Skip to content

Commit

Permalink
KAFKA-14142; Expose kraft high watermark update time in quorum command
Browse files Browse the repository at this point in the history
  • Loading branch information
hachikuji committed Aug 23, 2022
1 parent add4ca6 commit 1a805f9
Show file tree
Hide file tree
Showing 12 changed files with 445 additions and 249 deletions.
Expand Up @@ -4357,12 +4357,26 @@ private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData
}

private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
OptionalLong highWatermarkUpdateTimeMs = partition.highWatermarkUpdateTimeMs() >= 0 ?
OptionalLong.of(partition.highWatermarkUpdateTimeMs()) :
OptionalLong.empty();

List<QuorumInfo.ReplicaState> voters = partition.currentVoters().stream()
.map(this::translateReplicaState)
.collect(Collectors.toList());

List<QuorumInfo.ReplicaState> observers = partition.observers().stream()
.map(this::translateReplicaState)
.collect(Collectors.toList());

return new QuorumInfo(
partition.leaderId(),
partition.leaderEpoch(),
partition.highWatermark(),
partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()),
partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList()));
partition.leaderId(),
partition.leaderEpoch(),
partition.highWatermark(),
highWatermarkUpdateTimeMs,
voters,
observers
);
}

@Override
Expand Down
Expand Up @@ -24,32 +24,45 @@
* This class is used to describe the state of the quorum received in DescribeQuorumResponse.
*/
public class QuorumInfo {
private final Integer leaderId;
private final Integer leaderEpoch;
private final Long highWatermark;
private final int leaderId;
private final long leaderEpoch;
private final long highWatermark;
private final OptionalLong highWatermarkUpdateTimeMs;
private final List<ReplicaState> voters;
private final List<ReplicaState> observers;

QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List<ReplicaState> voters, List<ReplicaState> observers) {
QuorumInfo(
int leaderId,
long leaderEpoch,
long highWatermark,
OptionalLong highWatermarkUpdateTimeMs,
List<ReplicaState> voters,
List<ReplicaState> observers
) {
this.leaderId = leaderId;
this.leaderEpoch = leaderEpoch;
this.highWatermark = highWatermark;
this.highWatermarkUpdateTimeMs = highWatermarkUpdateTimeMs;
this.voters = voters;
this.observers = observers;
}

public Integer leaderId() {
public int leaderId() {
return leaderId;
}

public Integer leaderEpoch() {
public long leaderEpoch() {
return leaderEpoch;
}

public Long highWatermark() {
public long highWatermark() {
return highWatermark;
}

public OptionalLong highWatermarkUpdateTimeMs() {
return highWatermarkUpdateTimeMs;
}

public List<ReplicaState> voters() {
return voters;
}
Expand All @@ -63,20 +76,26 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QuorumInfo that = (QuorumInfo) o;
return leaderId.equals(that.leaderId)
&& voters.equals(that.voters)
&& observers.equals(that.observers);
return leaderId == that.leaderId
&& leaderEpoch == that.leaderEpoch
&& highWatermark == that.highWatermark
&& Objects.equals(highWatermarkUpdateTimeMs, that.highWatermarkUpdateTimeMs)
&& Objects.equals(voters, that.voters)
&& Objects.equals(observers, that.observers);
}

@Override
public int hashCode() {
return Objects.hash(leaderId, voters, observers);
return Objects.hash(leaderId, leaderEpoch, highWatermark, highWatermarkUpdateTimeMs, voters, observers);
}

@Override
public String toString() {
return "QuorumInfo(" +
"leaderId=" + leaderId +
", leaderEpoch=" + leaderEpoch +
", highWatermark=" + highWatermark +
", highWatermarkUpdateTimeMs=" + highWatermarkUpdateTimeMs +
", voters=" + voters +
", observers=" + observers +
')';
Expand Down
Expand Up @@ -18,15 +18,13 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -85,23 +83,15 @@ public static DescribeQuorumResponseData singletonErrorResponse(
}


public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition,
int leaderId,
int leaderEpoch,
long highWatermark,
List<ReplicaState> voterStates,
List<ReplicaState> observerStates) {
public static DescribeQuorumResponseData singletonResponse(
TopicPartition topicPartition,
DescribeQuorumResponseData.PartitionData partitionData
) {
return new DescribeQuorumResponseData()
.setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
.setHighWatermark(highWatermark)
.setCurrentVoters(voterStates)
.setObservers(observerStates)))));
.setPartitions(Collections.singletonList(partitionData
.setPartitionIndex(topicPartition.partition())))));
}

public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
Expand Down
Expand Up @@ -38,7 +38,9 @@
"about": "The latest known leader epoch"},
{ "name": "HighWatermark", "type": "int64", "versions": "0+"},
{ "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
{ "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
{ "name": "Observers", "type": "[]ReplicaState", "versions": "0+" },
{ "name": "HighWatermarkUpdateTimeMs", "type": "int64", "default": -1, "tag": 0,
"taggedVersions": "1+", "ignorable": true, "about": "The wall clock time that the high watermark was last updated on the leader" }
]}
]}],
"commonStructs": [
Expand Down
Expand Up @@ -607,8 +607,8 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
.setErrorCode(error.code()));
}

private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) {
return new QuorumInfo(1, 1, 1L,
private static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) {
return new QuorumInfo(1, 1, 1L, OptionalLong.empty(),
singletonList(new QuorumInfo.ReplicaState(1, 100,
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000),
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))),
Expand Down Expand Up @@ -637,8 +637,8 @@ private static DescribeQuorumResponse prepareDescribeQuorumResponse(
replica.setLastCaughtUpTimestamp(emptyOptionals ? -1 : 1000);
partitions.add(new DescribeQuorumResponseData.PartitionData().setPartitionIndex(partitionIndex)
.setLeaderId(1)
.setLeaderEpoch(0)
.setHighWatermark(0)
.setLeaderEpoch(1)
.setHighWatermark(1)
.setCurrentVoters(singletonList(replica))
.setObservers(singletonList(replica))
.setErrorCode(partitionLevelError.code()));
Expand Down
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
Expand Up @@ -158,14 +158,15 @@ object MetadataQuorumCommand {
-1
}
println(
s"""|ClusterId: $clusterId
|LeaderId: ${quorumInfo.leaderId}
|LeaderEpoch: ${quorumInfo.leaderEpoch}
|HighWatermark: ${quorumInfo.highWatermark}
|MaxFollowerLag: $maxFollowerLag
|MaxFollowerLagTimeMs: $maxFollowerLagTimeMs
|CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
|CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
s"""|ClusterId: $clusterId
|LeaderId: ${quorumInfo.leaderId}
|LeaderEpoch: ${quorumInfo.leaderEpoch}
|HighWatermark: ${quorumInfo.highWatermark}
|HighWatermarkUpdateTimeMs: ${quorumInfo.highWatermarkUpdateTimeMs}
|MaxFollowerLag: $maxFollowerLag
|MaxFollowerLagTimeMs: $maxFollowerLagTimeMs
|CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
|CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
|""".stripMargin
)
}
Expand Down
11 changes: 4 additions & 7 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Expand Up @@ -1014,7 +1014,7 @@ private FetchResponseData tryCompleteFetchRequest(
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);

if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata, log.endOffset().offset)) {
if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}

Expand Down Expand Up @@ -1176,12 +1176,9 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
}

LeaderState<T> leaderState = quorum.leaderStateOrThrow();
return DescribeQuorumResponse.singletonResponse(log.topicPartition(),
leaderState.localId(),
leaderState.epoch(),
leaderState.highWatermark().isPresent() ? leaderState.highWatermark().get().offset : -1,
leaderState.quorumResponseVoterStates(currentTimeMs),
leaderState.quorumResponseObserverStates(currentTimeMs)
return DescribeQuorumResponse.singletonResponse(
log.topicPartition(),
leaderState.describeQuorum(currentTimeMs)
);
}

Expand Down

0 comments on commit 1a805f9

Please sign in to comment.