Skip to content

Commit

Permalink
merge: #7910
Browse files Browse the repository at this point in the history
7910: [Backport release-1.2.0] PartitionId in the logging context r=Zelldon a=github-actions[bot]

# Description
Backport of #7885 to `release-1.2.0`.

relates to #7859

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-cloud[bot] and Zelldon committed Sep 24, 2021
2 parents a24990c + 03fb4f9 commit 58844b2
Show file tree
Hide file tree
Showing 16 changed files with 328 additions and 207 deletions.
6 changes: 6 additions & 0 deletions atomix/cluster/src/main/java/io/atomix/raft/RaftServer.java
Expand Up @@ -421,6 +421,7 @@ abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
protected EntryValidator entryValidator = new NoopEntryValidator();
protected RaftElectionConfig electionConfig = RaftElectionConfig.ofDefaultElection();
protected RaftPartitionConfig partitionConfig = new RaftPartitionConfig();
protected int partitionId;

protected Builder(final MemberId localMemberId) {
this.localMemberId = checkNotNull(localMemberId, "localMemberId cannot be null");
Expand Down Expand Up @@ -500,6 +501,11 @@ public Builder withPartitionConfig(final RaftPartitionConfig partitionConfig) {
this.partitionConfig = partitionConfig;
return this;
}

public Builder withPartitionId(final int partitionId) {
this.partitionId = partitionId;
return this;
}
}

/**
Expand Down
Expand Up @@ -138,6 +138,26 @@ public CompletableFuture<Void> bootstrap(final Collection<MemberId> cluster) {
return bootstrapFutureRef.get().whenComplete((result, error) -> bootstrapFutureRef.set(null));
}

@Override
public RaftMember getLeader() {
return raft.getLeader();
}

@Override
public RaftMember getLocalMember() {
return localMember;
}

@Override
public Collection<RaftMember> getMembers() {
return new ArrayList<>(members);
}

@Override
public long getTerm() {
return raft.getTerm();
}

private void ensureConfigurationIsConsistent(final Collection<MemberId> cluster) {
final var configuration = configurationRef.get();
final var hasPersistedConfiguration = configuration != null;
Expand Down Expand Up @@ -191,26 +211,6 @@ private void createInitialConfig(final Collection<MemberId> cluster) {
configure(new Configuration(0, 0, localMember.getLastUpdated().toEpochMilli(), activeMembers));
}

@Override
public RaftMember getLeader() {
return raft.getLeader();
}

@Override
public RaftMember getLocalMember() {
return localMember;
}

@Override
public Collection<RaftMember> getMembers() {
return new ArrayList<>(members);
}

@Override
public long getTerm() {
return raft.getTerm();
}

/**
* Returns a member by ID.
*
Expand Down
Expand Up @@ -258,6 +258,7 @@ public RaftServer build() {
final RaftContext raft =
new RaftContext(
name,
partitionId,
localMemberId,
membershipService,
protocol,
Expand Down
10 changes: 10 additions & 0 deletions atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java
Expand Up @@ -80,6 +80,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.MDC;

/**
* Manages the volatile state and state transitions of a Raft server.
Expand Down Expand Up @@ -130,9 +131,11 @@ public class RaftContext implements AutoCloseable, HealthMonitorable {

private long lastHeartbeat;
private final RaftPartitionConfig partitionConfig;
private final int partitionId;

public RaftContext(
final String name,
final int partitionId,
final MemberId localMemberId,
final ClusterMembershipService membershipService,
final RaftServerProtocol protocol,
Expand All @@ -146,6 +149,7 @@ public RaftContext(
this.protocol = checkNotNull(protocol, "protocol cannot be null");
this.storage = checkNotNull(storage, "storage cannot be null");
random = randomFactory.get();
this.partitionId = partitionId;

raftRoleMetrics = new RaftRoleMetrics(name);

Expand All @@ -170,6 +174,8 @@ public RaftContext(
threadContext =
threadContextFactory.createContext(
namedThreads(baseThreadName, log), this::onUncaughtException);
// in order to set the partition id once in the raft thread
threadContext.execute(() -> MDC.put("partitionId", Integer.toString(partitionId)));

// Open the metadata store.
meta = storage.openMetaStore();
Expand Down Expand Up @@ -1090,6 +1096,10 @@ public Duration getMaxQuorumResponseTimeout() {
return partitionConfig.getMaxQuorumResponseTimeout();
}

public int getPartitionId() {
return partitionId;
}

/** Raft server state. */
public enum State {
ACTIVE,
Expand Down
Expand Up @@ -181,6 +181,7 @@ private RaftServer buildServer() {

return RaftServer.builder(localMemberId)
.withName(partition.name())
.withPartitionId(partitionId)
.withMembershipService(membershipService)
.withProtocol(createServerProtocol())
.withPartitionConfig(partitionConfig)
Expand Down
Expand Up @@ -153,6 +153,7 @@ public RaftContext createRaftContext(final MemberId memberId, final Random rando
final var raft =
new RaftContext(
memberId.id() + "-partition-1",
1,
memberId,
mock(ClusterMembershipService.class),
new ControllableRaftServerProtocol(memberId, serverProtocols, messageQueue),
Expand Down
Expand Up @@ -80,14 +80,15 @@ public final class ExporterDirector extends Actor implements HealthMonitorable,
private final ExporterMode exporterMode;
private final Duration distributionInterval;
private ExporterPositionsDistributionService exporterDistributionService;
private final int partitionId;

public ExporterDirector(final ExporterDirectorContext context, final boolean shouldPauseOnStart) {
name = context.getName();
containers =
context.getDescriptors().stream().map(ExporterContainer::new).collect(Collectors.toList());

logStream = Objects.requireNonNull(context.getLogStream());
final var partitionId = logStream.getPartitionId();
partitionId = logStream.getPartitionId();
metrics = new ExporterMetrics(partitionId);
recordExporter = new RecordExporter(metrics, containers, partitionId);
exportingRetryStrategy = new BackOffRetryStrategy(actor, Duration.ofSeconds(10));
Expand Down Expand Up @@ -134,6 +135,13 @@ public ActorFuture<ExporterPhase> getPhase() {
return actor.call(() -> exporterPhase);
}

@Override
protected Map<String, String> createContext() {
final var context = super.createContext();
context.put(ACTOR_PROP_PARTITION_ID, Integer.toString(partitionId));
return context;
}

@Override
public String getName() {
return name;
Expand Down
Expand Up @@ -13,11 +13,13 @@
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.util.sched.Actor;
import java.util.Collection;
import java.util.Map;

public final class LogDeletionService extends Actor implements PersistedSnapshotListener {
private final LogCompactor logCompactor;
private final String actorName;
private final Collection<PersistedSnapshotStore> persistedSnapshotStores;
private final int partitionId;

public LogDeletionService(
final int nodeId,
Expand All @@ -27,6 +29,14 @@ public LogDeletionService(
this.persistedSnapshotStores = persistedSnapshotStores;
this.logCompactor = logCompactor;
actorName = buildActorName(nodeId, "DeletionService", partitionId);
this.partitionId = partitionId;
}

@Override
protected Map<String, String> createContext() {
final var context = super.createContext();
context.put(ACTOR_PROP_PARTITION_ID, Integer.toString(partitionId));
return context;
}

@Override
Expand Down

0 comments on commit 58844b2

Please sign in to comment.