Skip to content

Commit

Permalink
fix(backup): do not take backup when one already exists
Browse files Browse the repository at this point in the history
Previously, we were only checking for backups taken by this node.
However, there could be backups taken by another node. For example, after
restore the checkpoint record is processed again, but it must not re-take
the backup. To prevent that, we should check for all backups with the given
backup id.
  • Loading branch information
deepthidevaki committed May 2, 2023
1 parent b6e73bb commit a886d80
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
package io.camunda.zeebe.backup.management;

import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;

class BackupAlreadyExistsException extends RuntimeException {

BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) {
BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatusCode status) {
super("Backup with id %s already exists, status of the backup is %s".formatted(id, status));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -44,16 +46,22 @@ ActorFuture<Void> takeBackup(

backupsInProgress.add(inProgressBackup);

final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id());
final var checkCurrentBackup =
backupStore.list(
new BackupIdentifierWildcardImpl(
Optional.empty(),
Optional.of(inProgressBackup.id().partitionId()),
Optional.of(inProgressBackup.checkpointId())));

final ActorFuture<Void> backupSaved = concurrencyControl.createFuture();

checkCurrentBackup.whenCompleteAsync(
(status, error) -> {
(availableBackups, error) -> {
if (error != null) {
backupSaved.completeExceptionally(error);
} else {
takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved);
takeBackupIfDoesNotExist(
availableBackups, inProgressBackup, concurrencyControl, backupSaved);
}
},
concurrencyControl::run);
Expand All @@ -63,12 +71,17 @@ ActorFuture<Void> takeBackup(
}

private void takeBackupIfDoesNotExist(
final BackupStatus status,
final Collection<BackupStatus> availableBackups,
final InProgressBackup inProgressBackup,
final ConcurrencyControl concurrencyControl,
final ActorFuture<Void> backupSaved) {

switch (status.statusCode()) {
final BackupStatusCode existingBackupStatus =
availableBackups.isEmpty()
? BackupStatusCode.DOES_NOT_EXIST
: Collections.max(availableBackups, Comparator.comparing(BackupStatus::statusCode))
.statusCode();
switch (existingBackupStatus) {
case COMPLETED -> {
LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id());
backupSaved.complete(null);
Expand All @@ -77,9 +90,9 @@ private void takeBackupIfDoesNotExist(
LOG.error(
"Backup {} already exists with status {}, will not take a new one",
inProgressBackup.id(),
status);
existingBackupStatus);
backupSaved.completeExceptionally(
new BackupAlreadyExistsException(inProgressBackup.id(), status));
new BackupAlreadyExistsException(inProgressBackup.id(), existingBackupStatus));
}
default -> {
final ActorFuture<Void> snapshotFound = concurrencyControl.createFuture();
Expand Down

0 comments on commit a886d80

Please sign in to comment.