diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java index 0af319d402e0..09f929f9fa1f 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java @@ -8,11 +8,11 @@ package io.camunda.zeebe.backup.management; import io.camunda.zeebe.backup.api.BackupIdentifier; -import io.camunda.zeebe.backup.api.BackupStatus; +import io.camunda.zeebe.backup.api.BackupStatusCode; -public class BackupAlreadyExistsException extends RuntimeException { +class BackupAlreadyExistsException extends RuntimeException { - public BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) { + BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatusCode status) { super("Backup with id %s already exists, status of the backup is %s".formatted(id, status)); } } diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java index 66091f014312..5fad49c15432 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java @@ -18,6 +18,7 @@ import io.camunda.zeebe.scheduler.future.ActorFuture; import io.camunda.zeebe.scheduler.future.CompletableActorFuture; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -46,16 +47,22 @@ ActorFuture takeBackup( backupsInProgress.add(inProgressBackup); - final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id()); + final var checkCurrentBackup = + backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), + Optional.of(inProgressBackup.id().partitionId()), + Optional.of(inProgressBackup.checkpointId()))); final ActorFuture backupSaved = concurrencyControl.createFuture(); checkCurrentBackup.whenCompleteAsync( - (status, error) -> { + (availableBackups, error) -> { if (error != null) { backupSaved.completeExceptionally(error); } else { - takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved); + takeBackupIfDoesNotExist( + availableBackups, inProgressBackup, concurrencyControl, backupSaved); } }, concurrencyControl::run); @@ -65,12 +72,16 @@ ActorFuture takeBackup( } private void takeBackupIfDoesNotExist( - final BackupStatus status, + final Collection availableBackups, final InProgressBackup inProgressBackup, final ConcurrencyControl concurrencyControl, final ActorFuture backupSaved) { - switch (status.statusCode()) { + final BackupStatusCode existingBackupStatus = + availableBackups.isEmpty() + ? BackupStatusCode.DOES_NOT_EXIST + : Collections.max(availableBackups, BackupStatusCode.BY_STATUS).statusCode(); + switch (existingBackupStatus) { case COMPLETED -> { LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id()); backupSaved.complete(null); @@ -79,9 +90,9 @@ private void takeBackupIfDoesNotExist( LOG.error( "Backup {} already exists with status {}, will not take a new one", inProgressBackup.id(), - status); + existingBackupStatus); backupSaved.completeExceptionally( - new BackupAlreadyExistsException(inProgressBackup.id(), status)); + new BackupAlreadyExistsException(inProgressBackup.id(), existingBackupStatus)); } default -> { final ActorFuture snapshotFound = concurrencyControl.createFuture(); diff --git a/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java b/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java index 110f4d2ee698..fa74d7cd3860 100644 --- a/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java +++ b/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java @@ -16,13 +16,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.camunda.zeebe.backup.api.Backup; +import io.camunda.zeebe.backup.api.BackupIdentifier; import io.camunda.zeebe.backup.api.BackupStatus; import io.camunda.zeebe.backup.api.BackupStatusCode; import io.camunda.zeebe.backup.api.BackupStore; import io.camunda.zeebe.backup.common.BackupIdentifierImpl; import io.camunda.zeebe.backup.common.BackupStatusImpl; -import io.camunda.zeebe.scheduler.ConcurrencyControl; import io.camunda.zeebe.scheduler.future.ActorFuture; +import io.camunda.zeebe.scheduler.testing.TestActorFuture; import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl; import java.time.Duration; import java.util.List; @@ -40,13 +42,12 @@ @ExtendWith(MockitoExtension.class) class BackupServiceImplTest { - @Mock InProgressBackup inProgressBackup; @Mock BackupStore backupStore; @Mock BackupStatus notExistingBackupStatus; private BackupServiceImpl backupService; - private final ConcurrencyControl concurrencyControl = new TestConcurrencyControl(); + private final TestConcurrencyControl concurrencyControl = new TestConcurrencyControl(); @BeforeEach void setup() { @@ -58,12 +59,19 @@ void setup() { lenient() .when(backupStore.getStatus(any())) .thenReturn(CompletableFuture.completedFuture(notExistingBackupStatus)); + lenient() + .when( + backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), Optional.of(2), Optional.of(3L)))) + .thenReturn(CompletableFuture.completedFuture(List.of())); } @Test void shouldTakeBackup() { // given - mockInProgressBackup(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); + mockSaveBackup(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -76,10 +84,10 @@ void shouldTakeBackup() { @Test void shouldCloseAllInProgressBackupsWhenClosing() { // given - final InProgressBackup backup1 = mock(InProgressBackup.class); - final InProgressBackup backup2 = mock(InProgressBackup.class); - when(backup1.findSegmentFiles()).thenReturn(concurrencyControl.createFuture()); - when(backup2.findSegmentFiles()).thenReturn(concurrencyControl.createFuture()); + final ControllableInProgressBackup backup1 = + new ControllableInProgressBackup().waitOnFindSegmentFiles(); + final ControllableInProgressBackup backup2 = + new ControllableInProgressBackup().waitOnFindSegmentFiles(); backupService.takeBackup(backup1, concurrencyControl); backupService.takeBackup(backup2, concurrencyControl); @@ -88,14 +96,15 @@ void shouldCloseAllInProgressBackupsWhenClosing() { backupService.close(); // then - verify(backup1).close(); - verify(backup2).close(); + assertThat(backup1.isClosed()).isTrue(); + assertThat(backup2.isClosed()).isTrue(); } @Test void shouldCloseInProgressBackupsAfterBackupIsTaken() { // given - mockInProgressBackup(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); + mockSaveBackup(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -107,8 +116,8 @@ void shouldCloseInProgressBackupsAfterBackupIsTaken() { @Test void shouldFailBackupWhenNoValidSnapshotFound() { // given - mockFindSegmentFiles(); - when(inProgressBackup.findValidSnapshot()).thenReturn(failedFuture()); + final ControllableInProgressBackup inProgressBackup = + new ControllableInProgressBackup().failOnFindValidSnapshot(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -118,60 +127,53 @@ void shouldFailBackupWhenNoValidSnapshotFound() { .failsWithin(Duration.ofMillis(1000)) .withThrowableOfType(ExecutionException.class) .withMessageContaining("Expected"); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSnapshotCannotBeReserved() { // given - mockFindSegmentFiles(); - mockFindValidSnapshot(); - when(inProgressBackup.reserveSnapshot()).thenReturn(failedFuture()); + final var inProgressBackup = new ControllableInProgressBackup().failOnReserveSnapshot(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSnapshotFilesCannotBeCollected() { // given - mockFindSegmentFiles(); - mockFindValidSnapshot(); - mockReserveSnapshot(); - when(inProgressBackup.findSnapshotFiles()).thenReturn(failedFuture()); + final var inProgressBackup = new ControllableInProgressBackup().failOnFindSnapshotFiles(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSegmentFilesCannotBeCollected() { // given - when(inProgressBackup.findSegmentFiles()).thenReturn(failedFuture()); + final ControllableInProgressBackup inProgressBackup = + new ControllableInProgressBackup().failOnFindSegmentFiles(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupIfStoringFailed() { // given - mockFindValidSnapshot(); - mockReserveSnapshot(); - mockFindSnapshotFiles(); - mockFindSegmentFiles(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); when(backupStore.save(any())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected"))); @@ -180,7 +182,7 @@ void shouldFailBackupIfStoringFailed() { // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test @@ -310,9 +312,15 @@ void shouldMarkRemainingBackupsAsFailedWhenThrowsError() { @Test void shouldNotTakeNewBackupIfBackupAlreadyCompleted() { // given + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); final BackupStatus status = mock(BackupStatus.class); when(status.statusCode()).thenReturn(BackupStatusCode.COMPLETED); - when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status)); + when(backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), + Optional.of(inProgressBackup.id.partitionId()), + Optional.of(inProgressBackup.checkpointId())))) + .thenReturn(CompletableFuture.completedFuture(List.of(status))); // when backupService.takeBackup(inProgressBackup, concurrencyControl).join(); @@ -327,9 +335,10 @@ void shouldNotTakeNewBackupIfBackupAlreadyCompleted() { names = {"IN_PROGRESS", "FAILED"}) void shouldNotTakeNewBackupIfBackupAlreadyExists(final BackupStatusCode statusCode) { // given + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); final BackupStatus status = mock(BackupStatus.class); when(status.statusCode()).thenReturn(statusCode); - when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status)); + when(backupStore.list(any())).thenReturn(CompletableFuture.completedFuture(List.of(status))); // when assertThat(backupService.takeBackup(inProgressBackup, concurrencyControl)) @@ -342,44 +351,104 @@ void shouldNotTakeNewBackupIfBackupAlreadyExists(final BackupStatusCode statusCo } private ActorFuture failedFuture() { - final ActorFuture future = concurrencyControl.createFuture(); - future.completeExceptionally(new RuntimeException("Expected")); - return future; + return concurrencyControl.failedFuture(new RuntimeException("Expected")); } - private void mockInProgressBackup() { - mockFindValidSnapshot(); - mockReserveSnapshot(); - mockFindSnapshotFiles(); - mockFindSegmentFiles(); - mockSaveBackup(); - } - - private void mockFindSnapshotFiles() { - when(inProgressBackup.findSnapshotFiles()) - .thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void mockReserveSnapshot() { - when(inProgressBackup.reserveSnapshot()).thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void mockFindValidSnapshot() { - when(inProgressBackup.findValidSnapshot()) - .thenReturn(concurrencyControl.createCompletedFuture()); + private void verifyInProgressBackupIsCleanedUpAfterFailure( + final ControllableInProgressBackup inProgressBackup) { + verify(backupStore).markFailed(any(), any()); + assertThat(inProgressBackup.isClosed()).isTrue(); } private void mockSaveBackup() { when(backupStore.save(any())).thenReturn(CompletableFuture.completedFuture(null)); } - private void mockFindSegmentFiles() { - when(inProgressBackup.findSegmentFiles()) - .thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void verifyInProgressBackupIsCleanedUpAfterFailure() { - verify(backupStore).markFailed(any(), any()); - verify(inProgressBackup).close(); + class ControllableInProgressBackup implements InProgressBackup { + + private final BackupIdentifier id; + private ActorFuture findValidSnapshotFuture = TestActorFuture.completedFuture(null); + private ActorFuture reserveSnapshotFuture = TestActorFuture.completedFuture(null); + private ActorFuture findSnapshotFilesFuture = TestActorFuture.completedFuture(null); + private ActorFuture findSegmentFilesFuture = TestActorFuture.completedFuture(null); + private boolean closed; + + ControllableInProgressBackup() { + id = new BackupIdentifierImpl(1, 2, 3); + } + + @Override + public long checkpointId() { + return id.checkpointId(); + } + + @Override + public long checkpointPosition() { + return 1; + } + + @Override + public BackupIdentifier id() { + return id; + } + + @Override + public ActorFuture findValidSnapshot() { + return findValidSnapshotFuture; + } + + @Override + public ActorFuture reserveSnapshot() { + return reserveSnapshotFuture; + } + + @Override + public ActorFuture findSnapshotFiles() { + return findSnapshotFilesFuture; + } + + @Override + public ActorFuture findSegmentFiles() { + return findSegmentFilesFuture; + } + + @Override + public Backup createBackup() { + return null; + } + + @Override + public void close() { + closed = true; + } + + boolean isClosed() { + return closed; + } + + ControllableInProgressBackup waitOnFindSegmentFiles() { + findSegmentFilesFuture = concurrencyControl.createFuture(); + return this; + } + + ControllableInProgressBackup failOnFindSegmentFiles() { + findSegmentFilesFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnFindSnapshotFiles() { + findSnapshotFilesFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnReserveSnapshot() { + reserveSnapshotFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnFindValidSnapshot() { + findValidSnapshotFuture = failedFuture(); + return this; + } } }