From d0e725d3e168626b042d5d905b1936509053e0a5 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Tue, 2 May 2023 14:31:30 +0200 Subject: [PATCH 1/3] fix(backup): do not take backup when one already exists Previously, we were only checking for backups taken by this node. However, there could be backups taken by another node. For example, after restore the checkpoint record is processed again, but it must not re-take the backup. To prevent that, we should check for all backups with the given backup id. (cherry picked from commit a886d809b6ec48e9bc5779b4856669210d8b7200) --- .../BackupAlreadyExistsException.java | 6 ++--- .../backup/management/BackupServiceImpl.java | 27 ++++++++++++++----- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java index 0af319d402e0..09f929f9fa1f 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupAlreadyExistsException.java @@ -8,11 +8,11 @@ package io.camunda.zeebe.backup.management; import io.camunda.zeebe.backup.api.BackupIdentifier; -import io.camunda.zeebe.backup.api.BackupStatus; +import io.camunda.zeebe.backup.api.BackupStatusCode; -public class BackupAlreadyExistsException extends RuntimeException { +class BackupAlreadyExistsException extends RuntimeException { - public BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) { + BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatusCode status) { super("Backup with id %s already exists, status of the backup is %s".formatted(id, status)); } } diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java index 66091f014312..ac240c552c3b 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java @@ -18,6 +18,8 @@ import io.camunda.zeebe.scheduler.future.ActorFuture; import io.camunda.zeebe.scheduler.future.CompletableActorFuture; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -46,16 +48,22 @@ ActorFuture takeBackup( backupsInProgress.add(inProgressBackup); - final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id()); + final var checkCurrentBackup = + backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), + Optional.of(inProgressBackup.id().partitionId()), + Optional.of(inProgressBackup.checkpointId()))); final ActorFuture backupSaved = concurrencyControl.createFuture(); checkCurrentBackup.whenCompleteAsync( - (status, error) -> { + (availableBackups, error) -> { if (error != null) { backupSaved.completeExceptionally(error); } else { - takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved); + takeBackupIfDoesNotExist( + availableBackups, inProgressBackup, concurrencyControl, backupSaved); } }, concurrencyControl::run); @@ -65,12 +73,17 @@ ActorFuture takeBackup( } private void takeBackupIfDoesNotExist( - final BackupStatus status, + final Collection availableBackups, final InProgressBackup inProgressBackup, final ConcurrencyControl concurrencyControl, final ActorFuture backupSaved) { - switch (status.statusCode()) { + final BackupStatusCode existingBackupStatus = + availableBackups.isEmpty() + ? BackupStatusCode.DOES_NOT_EXIST + : Collections.max(availableBackups, Comparator.comparing(BackupStatus::statusCode)) + .statusCode(); + switch (existingBackupStatus) { case COMPLETED -> { LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id()); backupSaved.complete(null); @@ -79,9 +92,9 @@ private void takeBackupIfDoesNotExist( LOG.error( "Backup {} already exists with status {}, will not take a new one", inProgressBackup.id(), - status); + existingBackupStatus); backupSaved.completeExceptionally( - new BackupAlreadyExistsException(inProgressBackup.id(), status)); + new BackupAlreadyExistsException(inProgressBackup.id(), existingBackupStatus)); } default -> { final ActorFuture snapshotFound = concurrencyControl.createFuture(); From 36899fb88e5c3f7dd10f55f63758f72d7ca07187 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Tue, 2 May 2023 14:46:02 +0200 Subject: [PATCH 2/3] test(backup): verify backup is not re-taken when already exist Updated the test to remove mocking of InProgressBackup. The existing test was updated to use list instead of getStatus to find duplicate backups. (cherry picked from commit 876371704c42fa224ffc0722d88077c69fc3cd4b) --- .../management/BackupServiceImplTest.java | 195 ++++++++++++------ 1 file changed, 132 insertions(+), 63 deletions(-) diff --git a/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java b/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java index 110f4d2ee698..fa74d7cd3860 100644 --- a/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java +++ b/backup/src/test/java/io/camunda/zeebe/backup/management/BackupServiceImplTest.java @@ -16,13 +16,15 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.camunda.zeebe.backup.api.Backup; +import io.camunda.zeebe.backup.api.BackupIdentifier; import io.camunda.zeebe.backup.api.BackupStatus; import io.camunda.zeebe.backup.api.BackupStatusCode; import io.camunda.zeebe.backup.api.BackupStore; import io.camunda.zeebe.backup.common.BackupIdentifierImpl; import io.camunda.zeebe.backup.common.BackupStatusImpl; -import io.camunda.zeebe.scheduler.ConcurrencyControl; import io.camunda.zeebe.scheduler.future.ActorFuture; +import io.camunda.zeebe.scheduler.testing.TestActorFuture; import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl; import java.time.Duration; import java.util.List; @@ -40,13 +42,12 @@ @ExtendWith(MockitoExtension.class) class BackupServiceImplTest { - @Mock InProgressBackup inProgressBackup; @Mock BackupStore backupStore; @Mock BackupStatus notExistingBackupStatus; private BackupServiceImpl backupService; - private final ConcurrencyControl concurrencyControl = new TestConcurrencyControl(); + private final TestConcurrencyControl concurrencyControl = new TestConcurrencyControl(); @BeforeEach void setup() { @@ -58,12 +59,19 @@ void setup() { lenient() .when(backupStore.getStatus(any())) .thenReturn(CompletableFuture.completedFuture(notExistingBackupStatus)); + lenient() + .when( + backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), Optional.of(2), Optional.of(3L)))) + .thenReturn(CompletableFuture.completedFuture(List.of())); } @Test void shouldTakeBackup() { // given - mockInProgressBackup(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); + mockSaveBackup(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -76,10 +84,10 @@ void shouldTakeBackup() { @Test void shouldCloseAllInProgressBackupsWhenClosing() { // given - final InProgressBackup backup1 = mock(InProgressBackup.class); - final InProgressBackup backup2 = mock(InProgressBackup.class); - when(backup1.findSegmentFiles()).thenReturn(concurrencyControl.createFuture()); - when(backup2.findSegmentFiles()).thenReturn(concurrencyControl.createFuture()); + final ControllableInProgressBackup backup1 = + new ControllableInProgressBackup().waitOnFindSegmentFiles(); + final ControllableInProgressBackup backup2 = + new ControllableInProgressBackup().waitOnFindSegmentFiles(); backupService.takeBackup(backup1, concurrencyControl); backupService.takeBackup(backup2, concurrencyControl); @@ -88,14 +96,15 @@ void shouldCloseAllInProgressBackupsWhenClosing() { backupService.close(); // then - verify(backup1).close(); - verify(backup2).close(); + assertThat(backup1.isClosed()).isTrue(); + assertThat(backup2.isClosed()).isTrue(); } @Test void shouldCloseInProgressBackupsAfterBackupIsTaken() { // given - mockInProgressBackup(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); + mockSaveBackup(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -107,8 +116,8 @@ void shouldCloseInProgressBackupsAfterBackupIsTaken() { @Test void shouldFailBackupWhenNoValidSnapshotFound() { // given - mockFindSegmentFiles(); - when(inProgressBackup.findValidSnapshot()).thenReturn(failedFuture()); + final ControllableInProgressBackup inProgressBackup = + new ControllableInProgressBackup().failOnFindValidSnapshot(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); @@ -118,60 +127,53 @@ void shouldFailBackupWhenNoValidSnapshotFound() { .failsWithin(Duration.ofMillis(1000)) .withThrowableOfType(ExecutionException.class) .withMessageContaining("Expected"); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSnapshotCannotBeReserved() { // given - mockFindSegmentFiles(); - mockFindValidSnapshot(); - when(inProgressBackup.reserveSnapshot()).thenReturn(failedFuture()); + final var inProgressBackup = new ControllableInProgressBackup().failOnReserveSnapshot(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSnapshotFilesCannotBeCollected() { // given - mockFindSegmentFiles(); - mockFindValidSnapshot(); - mockReserveSnapshot(); - when(inProgressBackup.findSnapshotFiles()).thenReturn(failedFuture()); + final var inProgressBackup = new ControllableInProgressBackup().failOnFindSnapshotFiles(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupWhenSegmentFilesCannotBeCollected() { // given - when(inProgressBackup.findSegmentFiles()).thenReturn(failedFuture()); + final ControllableInProgressBackup inProgressBackup = + new ControllableInProgressBackup().failOnFindSegmentFiles(); // when final var result = backupService.takeBackup(inProgressBackup, concurrencyControl); // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test void shouldFailBackupIfStoringFailed() { // given - mockFindValidSnapshot(); - mockReserveSnapshot(); - mockFindSnapshotFiles(); - mockFindSegmentFiles(); + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); when(backupStore.save(any())) .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected"))); @@ -180,7 +182,7 @@ void shouldFailBackupIfStoringFailed() { // then assertThat(result).failsWithin(Duration.ofMillis(100)); - verifyInProgressBackupIsCleanedUpAfterFailure(); + verifyInProgressBackupIsCleanedUpAfterFailure(inProgressBackup); } @Test @@ -310,9 +312,15 @@ void shouldMarkRemainingBackupsAsFailedWhenThrowsError() { @Test void shouldNotTakeNewBackupIfBackupAlreadyCompleted() { // given + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); final BackupStatus status = mock(BackupStatus.class); when(status.statusCode()).thenReturn(BackupStatusCode.COMPLETED); - when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status)); + when(backupStore.list( + new BackupIdentifierWildcardImpl( + Optional.empty(), + Optional.of(inProgressBackup.id.partitionId()), + Optional.of(inProgressBackup.checkpointId())))) + .thenReturn(CompletableFuture.completedFuture(List.of(status))); // when backupService.takeBackup(inProgressBackup, concurrencyControl).join(); @@ -327,9 +335,10 @@ void shouldNotTakeNewBackupIfBackupAlreadyCompleted() { names = {"IN_PROGRESS", "FAILED"}) void shouldNotTakeNewBackupIfBackupAlreadyExists(final BackupStatusCode statusCode) { // given + final ControllableInProgressBackup inProgressBackup = new ControllableInProgressBackup(); final BackupStatus status = mock(BackupStatus.class); when(status.statusCode()).thenReturn(statusCode); - when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status)); + when(backupStore.list(any())).thenReturn(CompletableFuture.completedFuture(List.of(status))); // when assertThat(backupService.takeBackup(inProgressBackup, concurrencyControl)) @@ -342,44 +351,104 @@ void shouldNotTakeNewBackupIfBackupAlreadyExists(final BackupStatusCode statusCo } private ActorFuture failedFuture() { - final ActorFuture future = concurrencyControl.createFuture(); - future.completeExceptionally(new RuntimeException("Expected")); - return future; + return concurrencyControl.failedFuture(new RuntimeException("Expected")); } - private void mockInProgressBackup() { - mockFindValidSnapshot(); - mockReserveSnapshot(); - mockFindSnapshotFiles(); - mockFindSegmentFiles(); - mockSaveBackup(); - } - - private void mockFindSnapshotFiles() { - when(inProgressBackup.findSnapshotFiles()) - .thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void mockReserveSnapshot() { - when(inProgressBackup.reserveSnapshot()).thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void mockFindValidSnapshot() { - when(inProgressBackup.findValidSnapshot()) - .thenReturn(concurrencyControl.createCompletedFuture()); + private void verifyInProgressBackupIsCleanedUpAfterFailure( + final ControllableInProgressBackup inProgressBackup) { + verify(backupStore).markFailed(any(), any()); + assertThat(inProgressBackup.isClosed()).isTrue(); } private void mockSaveBackup() { when(backupStore.save(any())).thenReturn(CompletableFuture.completedFuture(null)); } - private void mockFindSegmentFiles() { - when(inProgressBackup.findSegmentFiles()) - .thenReturn(concurrencyControl.createCompletedFuture()); - } - - private void verifyInProgressBackupIsCleanedUpAfterFailure() { - verify(backupStore).markFailed(any(), any()); - verify(inProgressBackup).close(); + class ControllableInProgressBackup implements InProgressBackup { + + private final BackupIdentifier id; + private ActorFuture findValidSnapshotFuture = TestActorFuture.completedFuture(null); + private ActorFuture reserveSnapshotFuture = TestActorFuture.completedFuture(null); + private ActorFuture findSnapshotFilesFuture = TestActorFuture.completedFuture(null); + private ActorFuture findSegmentFilesFuture = TestActorFuture.completedFuture(null); + private boolean closed; + + ControllableInProgressBackup() { + id = new BackupIdentifierImpl(1, 2, 3); + } + + @Override + public long checkpointId() { + return id.checkpointId(); + } + + @Override + public long checkpointPosition() { + return 1; + } + + @Override + public BackupIdentifier id() { + return id; + } + + @Override + public ActorFuture findValidSnapshot() { + return findValidSnapshotFuture; + } + + @Override + public ActorFuture reserveSnapshot() { + return reserveSnapshotFuture; + } + + @Override + public ActorFuture findSnapshotFiles() { + return findSnapshotFilesFuture; + } + + @Override + public ActorFuture findSegmentFiles() { + return findSegmentFilesFuture; + } + + @Override + public Backup createBackup() { + return null; + } + + @Override + public void close() { + closed = true; + } + + boolean isClosed() { + return closed; + } + + ControllableInProgressBackup waitOnFindSegmentFiles() { + findSegmentFilesFuture = concurrencyControl.createFuture(); + return this; + } + + ControllableInProgressBackup failOnFindSegmentFiles() { + findSegmentFilesFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnFindSnapshotFiles() { + findSnapshotFilesFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnReserveSnapshot() { + reserveSnapshotFuture = failedFuture(); + return this; + } + + ControllableInProgressBackup failOnFindValidSnapshot() { + findValidSnapshotFuture = failedFuture(); + return this; + } } } From eaff4948dec83d6e326ce407033ca9dbd4902109 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Wed, 3 May 2023 14:01:55 +0200 Subject: [PATCH 3/3] refactor(backup): use existing comparator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Ole Schönburg (cherry picked from commit 65e08d97ea7530e4d554e48dde53a89ebb516f9e) --- .../io/camunda/zeebe/backup/management/BackupServiceImpl.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java index ac240c552c3b..5fad49c15432 100644 --- a/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java +++ b/backup/src/main/java/io/camunda/zeebe/backup/management/BackupServiceImpl.java @@ -19,7 +19,6 @@ import io.camunda.zeebe.scheduler.future.CompletableActorFuture; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -81,8 +80,7 @@ private void takeBackupIfDoesNotExist( final BackupStatusCode existingBackupStatus = availableBackups.isEmpty() ? BackupStatusCode.DOES_NOT_EXIST - : Collections.max(availableBackups, Comparator.comparing(BackupStatus::statusCode)) - .statusCode(); + : Collections.max(availableBackups, BackupStatusCode.BY_STATUS).statusCode(); switch (existingBackupStatus) { case COMPLETED -> { LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id());