Skip to content

Commit

Permalink
KAFKA-12161; Support raft observers with optional id (#9871)
Browse files Browse the repository at this point in the history
We would like to be able to use `KafkaRaftClient` for tooling/debugging use cases. For this, we need the localId to be optional so that the client can be used more like a consumer. This is already supported in the `Fetch` protocol by setting `replicaId=-1`, which the Raft implementation checks for. We just need to alter `QuorumState` so that the `localId` is optional. The main benefit of doing this is that it saves tools the need to generate an arbitrary id (which might cause conflicts given limited Int32 space) and it lets the leader avoid any local state for these observers (such as `ReplicaState` inside `LeaderState`).

Reviewers: Ismael Juma <ismael@juma.me.uk>, Boyang Chen <boyang@confluent.io>
  • Loading branch information
hachikuji committed Jan 15, 2021
1 parent b5c1073 commit 7ac0606
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 117 deletions.
15 changes: 9 additions & 6 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
package kafka.raft

import java.util.OptionalInt

import java.io.File
import java.nio.file.Files
import java.util.concurrent.CompletableFuture

import kafka.log.{Log, LogConfig, LogManager}
import kafka.raft.KafkaRaftManager.RaftIoThread
import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaServer, LogDirFailureChannel}
Expand All @@ -31,9 +37,6 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.raft.{FileBasedStateStore, KafkaRaftClient, RaftClient, RaftConfig, RaftRequest, RecordSerde}

import java.io.File
import java.nio.file.Files
import java.util.concurrent.CompletableFuture
import scala.jdk.CollectionConverters._

object KafkaRaftManager {
Expand Down Expand Up @@ -166,20 +169,20 @@ class KafkaRaftManager[T](
}

private def buildRaftClient(): KafkaRaftClient[T] = {

val expirationTimer = new SystemTimer("raft-expiration-executor")
val expirationService = new TimingWheelExpirationService(expirationTimer)
val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))

new KafkaRaftClient(
recordSerde,
netChannel,
metadataLog,
new FileBasedStateStore(new File(dataDir, "quorum-state")),
quorumStateStore,
time,
metrics,
expirationService,
logContext,
nodeId
OptionalInt.of(nodeId)
)
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import java.util.concurrent.CompletableFuture

import kafka.raft.KafkaRaftManager.RaftIoThread
import org.apache.kafka.raft.KafkaRaftClient
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito._
import org.junit.Test

class RaftManagerTest {

Expand Down
40 changes: 23 additions & 17 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private final LogContext logContext;
private final Time time;
private final int fetchMaxWaitMs;
private final int nodeId;
private final OptionalInt nodeId;
private final NetworkChannel channel;
private final ReplicatedLog log;
private final Random random;
Expand All @@ -162,6 +162,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private KafkaRaftMetrics kafkaRaftMetrics;
private RaftConfig raftConfig;

/**
* Create a new instance.
*
* Note that if the node ID is empty, then the client will behave as a
* non-participating observer.
*/
public KafkaRaftClient(
RecordSerde<T> serde,
NetworkChannel channel,
Expand All @@ -171,7 +177,7 @@ public KafkaRaftClient(
Metrics metrics,
ExpirationService expirationService,
LogContext logContext,
int nodeId
OptionalInt nodeId
) {
this(serde,
channel,
Expand All @@ -188,7 +194,7 @@ public KafkaRaftClient(
new Random());
}

public KafkaRaftClient(
KafkaRaftClient(
RecordSerde<T> serde,
NetworkChannel channel,
RaftMessageQueue messageQueue,
Expand All @@ -199,7 +205,7 @@ public KafkaRaftClient(
Metrics metrics,
ExpirationService expirationService,
int fetchMaxWaitMs,
int nodeId,
OptionalInt nodeId,
LogContext logContext,
Random random
) {
Expand Down Expand Up @@ -515,7 +521,7 @@ private VoteResponseData buildVoteResponse(Errors partitionLevelError, boolean v
log.topicPartition(),
partitionLevelError,
quorum.epoch(),
quorum.leaderIdOrNil(),
quorum.leaderIdOrSentinel(),
voteGranted);
}

Expand Down Expand Up @@ -687,7 +693,7 @@ private BeginQuorumEpochResponseData buildBeginQuorumEpochResponse(Errors partit
log.topicPartition(),
partitionLevelError,
quorum.epoch(),
quorum.leaderIdOrNil());
quorum.leaderIdOrSentinel());
}

/**
Expand Down Expand Up @@ -770,7 +776,7 @@ private EndQuorumEpochResponseData buildEndQuorumEpochResponse(Errors partitionL
log.topicPartition(),
partitionLevelError,
quorum.epoch(),
quorum.leaderIdOrNil());
quorum.leaderIdOrSentinel());
}

/**
Expand Down Expand Up @@ -823,7 +829,7 @@ private long endEpochElectionBackoff(List<Integer> preferredSuccessors) {
// election backoff time based on strict exponential mechanism so that the most up-to-date
// voter has a higher chance to be elected. If the node's priority is highest, become
// candidate immediately instead of waiting for next poll.
int position = preferredSuccessors.indexOf(quorum.localId);
int position = preferredSuccessors.indexOf(quorum.localIdOrThrow());
if (position <= 0) {
return 0;
} else {
Expand Down Expand Up @@ -882,7 +888,7 @@ private FetchResponseData buildFetchResponse(

partitionData.currentLeader()
.setLeaderEpoch(quorum.epoch())
.setLeaderId(quorum.leaderIdOrNil());
.setLeaderId(quorum.leaderIdOrSentinel());

divergingEpoch.ifPresent(partitionData::setDivergingEpoch);
});
Expand Down Expand Up @@ -1073,7 +1079,7 @@ private boolean handleFetchResponse(

log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
logger.info("Truncated to offset {} from Fetch response from leader {}",
truncationOffset, quorum.leaderIdOrNil());
truncationOffset, quorum.leaderIdOrSentinel());
});
} else if (partitionResponse.snapshotId().epoch() >= 0 ||
partitionResponse.snapshotId().endOffset() >= 0) {
Expand Down Expand Up @@ -1357,7 +1363,7 @@ List<ReplicaState> convertToReplicaStates(Map<Integer, Long> replicaEndOffsets)
private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) {
// Only elected leaders are sent in the request/response header, so if we have an elected
// leaderId, it should be consistent with what is in the message.
if (leaderId.isPresent() && leaderId.getAsInt() == quorum.localId) {
if (leaderId.isPresent() && leaderId.getAsInt() == quorum.localIdOrSentinel()) {
// The response indicates that we should be the leader, so we verify that is the case
return quorum.isLeader();
} else {
Expand Down Expand Up @@ -1670,7 +1676,7 @@ private EndQuorumEpochRequestData buildEndQuorumEpochRequest(
return EndQuorumEpochRequest.singletonRequest(
log.topicPartition(),
quorum.epoch(),
quorum.localId,
quorum.localIdOrThrow(),
state.preferredSuccessors()
);
}
Expand All @@ -1694,7 +1700,7 @@ private BeginQuorumEpochRequestData buildBeginQuorumEpochRequest() {
return BeginQuorumEpochRequest.singletonRequest(
log.topicPartition(),
quorum.epoch(),
quorum.localId
quorum.localIdOrThrow()
);
}

Expand All @@ -1703,7 +1709,7 @@ private VoteRequestData buildVoteRequest() {
return VoteRequest.singletonRequest(
log.topicPartition(),
quorum.epoch(),
quorum.localId,
quorum.localIdOrThrow(),
endOffset.epoch,
endOffset.offset
);
Expand All @@ -1718,7 +1724,7 @@ private FetchRequestData buildFetchRequest() {
});
return request
.setMaxWaitMs(fetchMaxWaitMs)
.setReplicaId(quorum.localId);
.setReplicaId(quorum.localIdOrSentinel());
}

private long maybeSendAnyVoterFetch(long currentTimeMs) {
Expand Down Expand Up @@ -1749,15 +1755,15 @@ private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapsh
}
);

return request.setReplicaId(quorum.localId);
return request.setReplicaId(quorum.localIdOrSentinel());
}

private FetchSnapshotResponseData.PartitionSnapshot addQuorumLeader(
FetchSnapshotResponseData.PartitionSnapshot partitionSnapshot
) {
partitionSnapshot.currentLeader()
.setLeaderEpoch(quorum.epoch())
.setLeaderId(quorum.leaderIdOrNil());
.setLeaderId(quorum.leaderIdOrSentinel());

return partitionSnapshot;
}
Expand Down
52 changes: 34 additions & 18 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
*
*/
public class QuorumState {
public final int localId;
private final OptionalInt localId;
private final Time time;
private final Logger log;
private final QuorumStateStore store;
Expand All @@ -84,7 +84,7 @@ public class QuorumState {

private volatile EpochState state;

public QuorumState(int localId,
public QuorumState(OptionalInt localId,
Set<Integer> voters,
int electionTimeoutMs,
int fetchTimeoutMs,
Expand Down Expand Up @@ -125,9 +125,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
final EpochState initialState;
if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
throw new IllegalStateException("Configured voter set: " + voters
+ " is different from the voter set read from the state file: " + election.voters() +
". Check if the quorum configuration is up to date, " +
"or wipe out the local state file if necessary");
+ " is different from the voter set read from the state file: " + election.voters()
+ ". Check if the quorum configuration is up to date, "
+ "or wipe out the local state file if necessary");
} else if (election.hasVoted() && !isVoter()) {
String localIdDescription = localId.isPresent() ?
localId.getAsInt() + " is not a voter" :
"is undefined";
throw new IllegalStateException("Initialized quorum state " + election
+ " with a voted candidate, which indicates this node was previously "
+ " a voter, but the local id " + localIdDescription);
} else if (election.epoch < logEndOffsetAndEpoch.epoch) {
log.warn("Epoch from quorum-state file is {}, which is " +
"smaller than last written epoch {} in the log",
Expand All @@ -139,7 +146,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
Optional.empty(),
randomElectionTimeoutMs()
);
} else if (election.isLeader(localId)) {
} else if (localId.isPresent() && election.isLeader(localId.getAsInt())) {
// If we were previously a leader, then we will start out as resigned
// in the same epoch. This serves two purposes:
// 1. It ensures that we cannot vote for another leader in the same epoch.
Expand All @@ -148,16 +155,16 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
// is lost after restarting.
initialState = new ResignedState(
time,
localId,
localId.getAsInt(),
election.epoch,
voters,
randomElectionTimeoutMs(),
Collections.emptyList()
);
} else if (election.isVotedCandidate(localId)) {
} else if (localId.isPresent() && election.isVotedCandidate(localId.getAsInt())) {
initialState = new CandidateState(
time,
localId,
localId.getAsInt(),
election.epoch,
voters,
Optional.empty(),
Expand Down Expand Up @@ -196,14 +203,22 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException,
}

public Set<Integer> remoteVoters() {
return voters.stream().filter(voterId -> voterId != localId).collect(Collectors.toSet());
return voters.stream().filter(voterId -> voterId != localIdOrSentinel()).collect(Collectors.toSet());
}

/**
 * Return the local id when one is configured, or the sentinel value -1
 * when the local id is empty.
 *
 * @return the local id, or -1 if it is not present
 */
public int localIdOrSentinel() {
    if (localId.isPresent()) {
        return localId.getAsInt();
    }
    return -1;
}

/**
 * Return the local id, failing if none is present.
 *
 * @return the local id
 * @throws IllegalStateException if the local id is empty
 */
public int localIdOrThrow() {
    if (!localId.isPresent()) {
        throw new IllegalStateException("Required local id is not present");
    }
    return localId.getAsInt();
}

/**
 * Return the epoch reported by the current epoch state.
 *
 * @return the epoch of the current state
 */
public int epoch() {
    final EpochState currentState = state;
    return currentState.epoch();
}

public int leaderIdOrNil() {
public int leaderIdOrSentinel() {
return leaderId().orElse(-1);
}

Expand All @@ -212,6 +227,7 @@ public Optional<LogOffsetMetadata> highWatermark() {
}

public OptionalInt leaderId() {

ElectionState election = state.election();
if (election.hasLeader())
return OptionalInt.of(state.election().leaderId());
Expand All @@ -224,11 +240,11 @@ public boolean hasLeader() {
}

public boolean hasRemoteLeader() {
return hasLeader() && leaderIdOrNil() != localId;
return hasLeader() && leaderIdOrSentinel() != localIdOrSentinel();
}

public boolean isVoter() {
return voters.contains(localId);
return localId.isPresent() && voters.contains(localId.getAsInt());
}

public boolean isVoter(int nodeId) {
Expand All @@ -249,7 +265,7 @@ public void transitionToResigned(List<Integer> preferredSuccessors) {
int epoch = state.epoch();
this.state = new ResignedState(
time,
localId,
localIdOrThrow(),
epoch,
voters,
randomElectionTimeoutMs(),
Expand Down Expand Up @@ -301,7 +317,7 @@ public void transitionToVoted(
int epoch,
int candidateId
) throws IOException {
if (candidateId == localId) {
if (localId.isPresent() && candidateId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
" and epoch=" + epoch + " since it matches the local broker.id");
} else if (isObserver()) {
Expand Down Expand Up @@ -341,7 +357,7 @@ public void transitionToFollower(
int epoch,
int leaderId
) throws IOException {
if (leaderId == localId) {
if (localId.isPresent() && leaderId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
" and epoch=" + epoch + " since it matches the local broker.id=" + localId);
} else if (!isVoter(leaderId)) {
Expand Down Expand Up @@ -384,7 +400,7 @@ public void transitionToCandidate() throws IOException {

transitionTo(new CandidateState(
time,
localId,
localIdOrThrow(),
newEpoch,
voters,
state.highWatermark(),
Expand Down Expand Up @@ -417,7 +433,7 @@ public void transitionToLeader(long epochStartOffset) throws IOException {
// we typically expect the state machine to be caught up anyway.

transitionTo(new LeaderState(
localId,
localIdOrThrow(),
epoch(),
epochStartOffset,
voters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public KafkaRaftMetrics(Metrics metrics, String metricGrpPrefix, QuorumState sta
this.currentVotedIdMetricName = metrics.metricName("current-vote", metricGroupName, "The current voted leader's id; -1 indicates not voted for anyone");
metrics.addMetric(this.currentVotedIdMetricName, (mConfig, currentTimeMs) -> {
if (state.isLeader() || state.isCandidate()) {
return state.localId;
return state.localIdOrThrow();
} else if (state.isVoted()) {
return state.votedStateOrThrow().votedId();
} else {
Expand Down
Loading

0 comments on commit 7ac0606

Please sign in to comment.