Skip to content

Commit

Permalink
block snapshot writing when namespace of entity is blocked
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 31, 2022
1 parent 4a7a109 commit dd6f1de
Showing 1 changed file with 35 additions and 13 deletions.
Expand Up @@ -20,6 +20,7 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeExceptionBuilder;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -32,6 +33,7 @@
import org.eclipse.ditto.internal.utils.akka.PingCommandResponse;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.ActivityCheckConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
Expand Down Expand Up @@ -96,6 +98,7 @@ public abstract class AbstractPersistenceActor<
protected final I entityId;

private long accessCounter = 0L;
private final BlockedNamespaces blockedNamespaces;

/**
* Instantiate the actor.
Expand All @@ -121,6 +124,7 @@ protected AbstractPersistenceActor(final I entityId) {
.build();

handleCleanups = super.createReceive();
blockedNamespaces = BlockedNamespaces.of(actorSystem);
}

/**
Expand Down Expand Up @@ -288,14 +292,14 @@ protected void becomeCreatedHandler() {
final CommandStrategy<C, S, K, E> commandStrategy = getCreatedStrategy();

final Receive receive = handleCleanups.orElse(ReceiveBuilder.create()
.match(commandStrategy.getMatchingClass(), commandStrategy::isDefined, this::handleByCommandStrategy)
.match(PersistEmptyEvent.class, this::handlePersistEmptyEvent)
.match(CheckForActivity.class, this::checkForActivity)
.match(PingCommand.class, this::processPingCommand)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.match(commandStrategy.getMatchingClass(), commandStrategy::isDefined, this::handleByCommandStrategy)
.match(PersistEmptyEvent.class, this::handlePersistEmptyEvent)
.match(CheckForActivity.class, this::checkForActivity)
.match(PingCommand.class, this::processPingCommand)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.orElse(matchAnyAfterInitialization());

getContext().become(receive);
Expand Down Expand Up @@ -416,11 +420,11 @@ protected void passivate() {

private Receive createDeletedBehavior() {
return handleCleanups.orElse(handleByDeletedStrategyReceiveBuilder()
.match(CheckForActivity.class, this::checkForActivity)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.match(CheckForActivity.class, this::checkForActivity)
.matchEquals(Control.TAKE_SNAPSHOT, this::takeSnapshotByInterval)
.match(SaveSnapshotSuccess.class, this::saveSnapshotSuccess)
.match(SaveSnapshotFailure.class, this::saveSnapshotFailure)
.build())
.orElse(matchAnyWhenDeleted());
}

Expand Down Expand Up @@ -557,6 +561,24 @@ sequence no (e.g. 2), but old entity revision no (e.g. 1) will be created -> can
}

private void takeSnapshot(final String reason) {

if (entityId instanceof NamespacedEntityId namespacedEntityId) {
final String namespace = namespacedEntityId.getNamespace();
blockedNamespaces.contains(namespace).thenAccept(namespaceIsBlocked -> {
if (namespaceIsBlocked) {
log.debug("Not taking snapshot for entity <{}> even if {}, because namespace is blocked.",
entityId, reason);
} else {
doTakeSnapshot(reason);
}
});
} else {
doTakeSnapshot(reason);
}

}

private void doTakeSnapshot(final String reason) {
final long revision = getRevisionNumber();
if (entity != null && lastSnapshotRevision != revision) {
log.debug("Taking snapshot for entity with ID <{}> and sequence number <{}> because {}.", entityId,
Expand Down

0 comments on commit dd6f1de

Please sign in to comment.