From a886d809b6ec48e9bc5779b4856669210d8b7200 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Tue, 2 May 2023 14:31:30 +0200 Subject: [PATCH 1/3] fix(backup): do not take backup when one already exists Previously, we were only checking for backups taken by this node. However, there could be backups taken by another node. For example, after restore the checkpoint record is processed again, but it must not re-take the backup. To prevent that, we should check for all backups with the given backup id. --- .../BackupAlreadyExistsException.java | 4 +-- .../backup/management/BackupServiceImpl.java | 27 ++++++++++++++----- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java index 330965e9608f..09f929f9fa1f 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java @@ -8,11 +8,11 @@ package io.camunda.zeebe.backup.management; import io.camunda.zeebe.backup.api.BackupIdentifier; -import io.camunda.zeebe.backup.api.BackupStatus; +import io.camunda.zeebe.backup.api.BackupStatusCode; class BackupAlreadyExistsException extends RuntimeException { - BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) { + BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatusCode status) { super("Backup with id %s already exists, status of the backup is %s".formatted(id, status)); } } diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java index f18a16a38dd6..5ed078e45358 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java @@ -15,6 +15,8 @@ import io.camunda.zeebe.scheduler.ConcurrencyControl; import io.camunda.zeebe.scheduler.future.ActorFuture; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -44,16 +46,22 @@ ActorFuture takeBackup( backupsInProgress.add(inProgressBackup); - final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id()); + final var checkCurrentBackup = + backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), + Optional.of(inProgressBackup.id().partitionId()), + Optional.of(inProgressBackup.checkpointId()))); final ActorFuture backupSaved = concurrencyControl.createFuture(); checkCurrentBackup.whenCompleteAsync( - (status, error) -> { + (availableBackups, error) -> { if (error != null) { backupSaved.completeExceptionally(error); } else { - takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved); + takeBackupIfDoesNotExist( + availableBackups, inProgressBackup, concurrencyControl, backupSaved); } }, concurrencyControl::run); @@ -63,12 +71,17 @@ ActorFuture takeBackup( } private void takeBackupIfDoesNotExist( - final BackupStatus status, + final Collection availableBackups, final InProgressBackup inProgressBackup, final ConcurrencyControl concurrencyControl, final ActorFuture backupSaved) { - switch (status.statusCode()) { + final BackupStatusCode existingBackupStatus = + availableBackups.isEmpty() + ? BackupStatusCode.DOES_NOT_EXIST + : Collections.max(availableBackups, Comparator.comparing(BackupStatus::statusCode)) + .statusCode(); + switch (existingBackupStatus) { case COMPLETED -> { LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id()); backupSaved.complete(null); @@ -77,9 +90,9 @@ private void takeBackupIfDoesNotExist( LOG.error( "Backup {} already exists with status {}, will not take a new one", inProgressBackup.id(), - status); + existingBackupStatus); backupSaved.completeExceptionally( - new BackupAlreadyExistsException(inProgressBackup.id(), status)); + new BackupAlreadyExistsException(inProgressBackup.id(), existingBackupStatus)); } default -> { final ActorFuture snapshotFound = concurrencyControl.createFuture(); From 876371704c42fa224ffc0722d88077c69fc3cd4b Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Tue, 2 May 2023 14:46:02 +0200 Subject: [PATCH 2/3] test(backup): verify backup is not re-taken when already exist Updated the test to remove mocking of InProgressBackup. The existing test was updated to use list instead of getStatus to find duplicate backups. --- .../management/BackupServiceImplTest.java | 195 ++++++++++++------ 1 file changed, 132 insertions(+), 63 deletions(-) diff --git a/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java b/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java index 6a6b42852a52..84a4880f4f69 100644 --- a/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java +++ b/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java @@ -18,14 +18,16 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.camunda.zeebe.backup.api.Backup; +import io.camunda.zeebe.backup.api.BackupIdentifier; import io.camunda.zeebe.backup.api.BackupStatus; import io.camunda.zeebe.backup.api.BackupStatusCode; import io.camunda.zeebe.backup.api.BackupStore; import io.camunda.zeebe.backup.common.BackupIdentifierImpl; import io.camunda.zeebe.backup.common.BackupIdentifierWildcardImpl; import io.camunda.zeebe.backup.common.BackupStatusImpl; -import io.camunda.zeebe.scheduler.ConcurrencyControl; import io.camunda.zeebe.scheduler.future.ActorFuture; +import io.camunda.zeebe.scheduler.testing.TestActorFuture; import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl; import java.time.Duration; import java.util.List; @@ -43,13 +45,12 @@ @ExtendWith(MockitoExtension.class) class BackupServiceImplTest { - @Mock InProgressBackup inProgressBackup; @Mock BackupStore backupStore; @Mock BackupStatus notExistingBackupStatus; private BackupServiceImpl backupService; - private final ConcurrencyControl concurrencyControl = new TestConcurrencyControl(); + private final TestConcurrencyControl concurrencyControl = new TestConcurrencyControl(); @BeforeEach void setup() { @@ -61,12 +62,19 @@ void setup() { lenient() .when(backupStore.getStatus(any())) .thenReturn(CompletableFuture.completedFuture(notExistingBackupStatus)); + lenient() + .when( + backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), Optional.of(2), Optional.of(3L)))) + .thenReturn(CompletableFuture.completedFuture(List.of())); } @Test void shouldTakeBackup() { // given - mockInProgressBackup(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); + mockSaveBackup(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -79,10 +87,10 @@ void shouldTakeBackup() { @Test void shouldCloseAllInProgressBackupsWhenClosing() { // given - final InProgressBackup backup1 = mock(InProgressBackup.class); - final InProgressBackup backup2 = mock(InProgressBackup.class); - when(backup1.findSegmentFiles()).thenReturn(concurrencyControl.createFuture()); - when(backup2.findSegmentFiles()).thenReturn(concurrencyControl.createFuture()); + final ControllableInProgressBackup backup1 = + new ControllableInProgressBackup().waitOnFindSegmentFiles(); + final ControllableInProgressBackup backup2 = + new ControllableInProgressBackup().waitOnFindSegmentFiles(); backupService.takeBackup(backup1, concurrencyControl); backupService.takeBackup(backup2, concurrencyControl); @@ -91,14 +99,15 @@ void shouldCloseAllInProgressBackupsWhenClosing() { backupService.close(); // then - verify(backup1).close(); - verify(backup2).close(); + assertThat(backup1.isClosed()).isTrue(); + assertThat(backup2.isClosed()).isTrue(); } @Test void shouldCloseInProgressBackupsAfterBackupIsTaken() { // given - mockInProgressBackup(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); + mockSaveBackup(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -110,8 +119,8 @@ void shouldCloseInProgressBackupsAfterBackupIsTaken() { @Test void shouldFailBackupWhenNoValidSnapshotFound() { // given - mockFindSegmentFiles(); - when(inProgressBackup.findValidSnapshot()).thenReturn(failedFuture()); + final ControllableInProgressBackup inProgressBackup = + new ControllableInProgressBackup().failOnFindValidSnapshot(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -121,60 +130,53 @@ void shouldFailBackupWhenNoValidSnapshotFound() { .failsWithin(Duration.ofMillis(1000)) .withThrowableOfType(ExecutionException.class) .withMessageContaining("Expected"); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSnapshotCannotBeReserved() { // given - mockFindSegmentFiles(); - mockFindValidSnapshot(); - when(inProgressBackup.reserveSnapshot()).thenReturn(failedFuture()); + final var inProgressBackup = new ControllableInProgressBackup().failOnReserveSnapshot(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSnapshotFilesCannotBeCollected() { // given - mockFindSegmentFiles(); - mockFindValidSnapshot(); - mockReserveSnapshot(); - when(inProgressBackup.findSnapshotFiles()).thenReturn(failedFuture()); + final var inProgressBackup = new ControllableInProgressBackup().failOnFindSnapshotFiles(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSegmentFilesCannotBeCollected() { // given - when(inProgressBackup.findSegmentFiles()).thenReturn(failedFuture()); + final ControllableInProgressBackup inProgressBackup = + new ControllableInProgressBackup().failOnFindSegmentFiles(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupIfStoringFailed() { // given - mockFindValidSnapshot(); - mockReserveSnapshot(); - mockFindSnapshotFiles(); - mockFindSegmentFiles(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); when(backupStore.save(any())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected"))); @@ -183,7 +185,7 @@ void shouldFailBackupIfStoringFailed() { // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test @@ -285,9 +287,15 @@ void shouldMarkInProgressBackupsAsFailed() { @Test void shouldNotTakeNewBackupIfBackupAlreadyCompleted() { // given + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); final BackupStatus status = mock(BackupStatus.class); when(status.statusCode()).thenReturn(BackupStatusCode.COMPLETED); - when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status)); + when(backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), + Optional.of(inProgressBackup.id.partitionId()), + Optional.of(inProgressBackup.checkpointId())))) + .thenReturn(CompletableFuture.completedFuture(List.of(status))); // when backupService.takeBackup(inProgressBackup, concurrencyControl).join(); @@ -302,9 +310,10 @@ void shouldNotTakeNewBackupIfBackupAlreadyCompleted() { names = {"IN_PROGRESS", "FAILED"}) void shouldNotTakeNewBackupIfBackupAlreadyExists(final BackupStatusCode statusCode) { // given + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); final BackupStatus status = mock(BackupStatus.class); when(status.statusCode()).thenReturn(statusCode); - when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status)); + when(backupStore.list(any())).thenReturn(CompletableFuture.completedFuture(List.of(status))); // when assertThat(backupService.takeBackup(inProgressBackup, concurrencyControl)) @@ -390,44 +399,104 @@ void shouldListAvailableBackups() { } private ActorFuture failedFuture() { - final ActorFuture future = concurrencyControl.createFuture(); - future.completeExceptionally(new RuntimeException("Expected")); - return future; - } - - private void mockInProgressBackup() { - mockFindValidSnapshot(); - mockReserveSnapshot(); - mockFindSnapshotFiles(); - mockFindSegmentFiles(); - mockSaveBackup(); + return concurrencyControl.failedFuture(new RuntimeException("Expected")); } - private void mockFindSnapshotFiles() { - when(inProgressBackup.findSnapshotFiles()) - .thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void mockReserveSnapshot() { - when(inProgressBackup.reserveSnapshot()).thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void mockFindValidSnapshot() { - when(inProgressBackup.findValidSnapshot()) - .thenReturn(concurrencyControl.createCompletedFuture()); + private void verifyInProgressBackupIsCleanedUpAfterFailure( + final ControllableInProgressBackup inProgressBackup) { + verify(backupStore).markFailed(any(), any()); + assertThat(inProgressBackup.isClosed()).isTrue(); } private void mockSaveBackup() { when(backupStore.save(any())).thenReturn(CompletableFuture.completedFuture(null)); } - private void mockFindSegmentFiles() { - when(inProgressBackup.findSegmentFiles()) - .thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void verifyInProgressBackupIsCleanedUpAfterFailure() { - verify(backupStore).markFailed(any(), any()); - verify(inProgressBackup).close(); + class ControllableInProgressBackup implements InProgressBackup { + + private final BackupIdentifier id; + private ActorFuture findValidSnapshotFuture = TestActorFuture.completedFuture(null); + private ActorFuture reserveSnapshotFuture = TestActorFuture.completedFuture(null); + private ActorFuture findSnapshotFilesFuture = TestActorFuture.completedFuture(null); + private ActorFuture findSegmentFilesFuture = TestActorFuture.completedFuture(null); + private boolean closed; + + ControllableInProgressBackup() { + id = new BackupIdentifierImpl(1, 2, 3); + } + + @Override + public long checkpointId() { + return id.checkpointId(); + } + + @Override + public long checkpointPosition() { + return 1; + } + + @Override + public BackupIdentifier id() { + return id; + } + + @Override + public ActorFuture findValidSnapshot() { + return findValidSnapshotFuture; + } + + @Override + public ActorFuture reserveSnapshot() { + return reserveSnapshotFuture; + } + + @Override + public ActorFuture findSnapshotFiles() { + return findSnapshotFilesFuture; + } + + @Override + public ActorFuture findSegmentFiles() { + return findSegmentFilesFuture; + } + + @Override + public Backup createBackup() { + return null; + } + + @Override + public void close() { + closed = true; + } + + boolean isClosed() { + return closed; + } + + ControllableInProgressBackup waitOnFindSegmentFiles() { + findSegmentFilesFuture = concurrencyControl.createFuture(); + return this; + } + + ControllableInProgressBackup failOnFindSegmentFiles() { + findSegmentFilesFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnFindSnapshotFiles() { + findSnapshotFilesFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnReserveSnapshot() { + reserveSnapshotFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnFindValidSnapshot() { + findValidSnapshotFuture = failedFuture(); + return this; + } } } From 65e08d97ea7530e4d554e48dde53a89ebb516f9e Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Wed, 3 May 2023 14:01:55 +0200 Subject: [PATCH 3/3] refactor(backup): use existing comparator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ole Schönburg --- .../io/camunda/zeebe/backup/management/BackupServiceImpl.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java index 5ed078e45358..764aed238236 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java @@ -16,7 +16,6 @@ import io.camunda.zeebe.scheduler.future.ActorFuture; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -79,8 +78,7 @@ private void takeBackupIfDoesNotExist( final BackupStatusCode existingBackupStatus = availableBackups.isEmpty() ? BackupStatusCode.DOES_NOT_EXIST - : Collections.max(availableBackups, Comparator.comparing(BackupStatus::statusCode)) - .statusCode(); + : Collections.max(availableBackups, BackupStatusCode.BY_STATUS).statusCode(); switch (existingBackupStatus) { case COMPLETED -> { LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id());