Skip to content

Commit

Permalink
feat: add partitionId to AsyncSnapshotDirector context
Browse files Browse the repository at this point in the history
(cherry picked from commit 741da77)
  • Loading branch information
Zelldon authored and github-actions[bot] committed Sep 24, 2021
1 parent a988182 commit e28cab3
Showing 1 changed file with 11 additions and 1 deletion.
Expand Up @@ -23,6 +23,7 @@
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -57,6 +58,7 @@ public final class AsyncSnapshotDirector extends Actor
private boolean persistingSnapshot;
private volatile HealthStatus healthStatus = HealthStatus.HEALTHY;
private long commitPosition;
private final int partitionId;

private AsyncSnapshotDirector(
final int nodeId,
Expand All @@ -69,14 +71,22 @@ private AsyncSnapshotDirector(
this.stateController = stateController;
processorName = streamProcessor.getName();
this.snapshotRate = snapshotRate;
actorName = buildActorName(nodeId, "SnapshotDirector", partitionId);
this.partitionId = partitionId;
actorName = buildActorName(nodeId, "SnapshotDirector", this.partitionId);
if (streamProcessorMode == StreamProcessorMode.REPLAY) {
isLastWrittenPositionCommitted = () -> true;
} else {
isLastWrittenPositionCommitted = () -> lastWrittenEventPosition <= commitPosition;
}
}

@Override
protected Map<String, String> createContext() {
final var context = super.createContext();
context.put(ACTOR_PROP_PARTITION_ID, Integer.toString(partitionId));
return context;
}

@Override
public String getName() {
return actorName;
Expand Down

0 comments on commit e28cab3

Please sign in to comment.