Skip to content

Commit

Permalink
MINOR: A few cleanups for DescribeQuorum APIs (#12548)
Browse files Browse the repository at this point in the history
A few small cleanups in the `DescribeQuorum` API and handling logic:

- Change field types in `QuorumInfo`:
  - `leaderId`: `Integer` -> `int`
  - `leaderEpoch`: `Integer` -> `long` (to allow for type expansion in the future)
  - `highWatermark`: `Long` -> `long`
- Use field names `lastFetchTimestamp` and `lastCaughtUpTimestamp` consistently
- Move construction of `DescribeQuorumResponseData.PartitionData` into `LeaderState`
- Consolidate fetch time/offset update logic into `LeaderState.ReplicaState.updateFollowerState`

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
  • Loading branch information
hachikuji committed Aug 24, 2022
1 parent 0507597 commit 5c52c61
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 279 deletions.
Expand Up @@ -4355,12 +4355,21 @@ private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData
}

private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
List<QuorumInfo.ReplicaState> voters = partition.currentVoters().stream()
.map(this::translateReplicaState)
.collect(Collectors.toList());

List<QuorumInfo.ReplicaState> observers = partition.observers().stream()
.map(this::translateReplicaState)
.collect(Collectors.toList());

return new QuorumInfo(
partition.leaderId(),
partition.leaderEpoch(),
partition.highWatermark(),
partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()),
partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList()));
partition.leaderId(),
partition.leaderEpoch(),
partition.highWatermark(),
voters,
observers
);
}

@Override
Expand Down
Expand Up @@ -24,29 +24,35 @@
* This class is used to describe the state of the quorum received in DescribeQuorumResponse.
*/
public class QuorumInfo {
private final Integer leaderId;
private final Integer leaderEpoch;
private final Long highWatermark;
private final int leaderId;
private final long leaderEpoch;
private final long highWatermark;
private final List<ReplicaState> voters;
private final List<ReplicaState> observers;

QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List<ReplicaState> voters, List<ReplicaState> observers) {
QuorumInfo(
int leaderId,
long leaderEpoch,
long highWatermark,
List<ReplicaState> voters,
List<ReplicaState> observers
) {
this.leaderId = leaderId;
this.leaderEpoch = leaderEpoch;
this.highWatermark = highWatermark;
this.voters = voters;
this.observers = observers;
}

public Integer leaderId() {
public int leaderId() {
return leaderId;
}

public Integer leaderEpoch() {
public long leaderEpoch() {
return leaderEpoch;
}

public Long highWatermark() {
public long highWatermark() {
return highWatermark;
}

Expand All @@ -63,20 +69,24 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QuorumInfo that = (QuorumInfo) o;
return leaderId.equals(that.leaderId)
&& voters.equals(that.voters)
&& observers.equals(that.observers);
return leaderId == that.leaderId
&& leaderEpoch == that.leaderEpoch
&& highWatermark == that.highWatermark
&& Objects.equals(voters, that.voters)
&& Objects.equals(observers, that.observers);
}

@Override
public int hashCode() {
return Objects.hash(leaderId, voters, observers);
return Objects.hash(leaderId, leaderEpoch, highWatermark, voters, observers);
}

@Override
public String toString() {
return "QuorumInfo(" +
"leaderId=" + leaderId +
", leaderEpoch=" + leaderEpoch +
", highWatermark=" + highWatermark +
", voters=" + voters +
", observers=" + observers +
')';
Expand All @@ -85,8 +95,8 @@ public String toString() {
public static class ReplicaState {
private final int replicaId;
private final long logEndOffset;
private final OptionalLong lastFetchTimeMs;
private final OptionalLong lastCaughtUpTimeMs;
private final OptionalLong lastFetchTimestamp;
private final OptionalLong lastCaughtUpTimestamp;

ReplicaState() {
this(0, 0, OptionalLong.empty(), OptionalLong.empty());
Expand All @@ -95,13 +105,13 @@ public static class ReplicaState {
ReplicaState(
int replicaId,
long logEndOffset,
OptionalLong lastFetchTimeMs,
OptionalLong lastCaughtUpTimeMs
OptionalLong lastFetchTimestamp,
OptionalLong lastCaughtUpTimestamp
) {
this.replicaId = replicaId;
this.logEndOffset = logEndOffset;
this.lastFetchTimeMs = lastFetchTimeMs;
this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
this.lastFetchTimestamp = lastFetchTimestamp;
this.lastCaughtUpTimestamp = lastCaughtUpTimestamp;
}

/**
Expand All @@ -121,19 +131,21 @@ public long logEndOffset() {
}

/**
* Return the lastFetchTime in milliseconds for this replica.
* Return the last millisecond timestamp that the leader received a
* fetch from this replica.
* @return The value of the lastFetchTime if known, empty otherwise
*/
public OptionalLong lastFetchTimeMs() {
return lastFetchTimeMs;
public OptionalLong lastFetchTimestamp() {
return lastFetchTimestamp;
}

/**
* Return the lastCaughtUpTime in milliseconds for this replica.
* Return the last millisecond timestamp at which this replica was known to be
* caught up with the leader.
* @return The value of the lastCaughtUpTime if known, empty otherwise
*/
public OptionalLong lastCaughtUpTimeMs() {
return lastCaughtUpTimeMs;
public OptionalLong lastCaughtUpTimestamp() {
return lastCaughtUpTimestamp;
}

@Override
Expand All @@ -143,22 +155,22 @@ public boolean equals(Object o) {
ReplicaState that = (ReplicaState) o;
return replicaId == that.replicaId
&& logEndOffset == that.logEndOffset
&& lastFetchTimeMs.equals(that.lastFetchTimeMs)
&& lastCaughtUpTimeMs.equals(that.lastCaughtUpTimeMs);
&& lastFetchTimestamp.equals(that.lastFetchTimestamp)
&& lastCaughtUpTimestamp.equals(that.lastCaughtUpTimestamp);
}

@Override
public int hashCode() {
return Objects.hash(replicaId, logEndOffset, lastFetchTimeMs, lastCaughtUpTimeMs);
return Objects.hash(replicaId, logEndOffset, lastFetchTimestamp, lastCaughtUpTimestamp);
}

@Override
public String toString() {
return "ReplicaState(" +
"replicaId=" + replicaId +
", logEndOffset=" + logEndOffset +
", lastFetchTimeMs=" + lastFetchTimeMs +
", lastCaughtUpTimeMs=" + lastCaughtUpTimeMs +
", lastFetchTimestamp=" + lastFetchTimestamp +
", lastCaughtUpTimestamp=" + lastCaughtUpTimestamp +
')';
}
}
Expand Down
Expand Up @@ -18,15 +18,13 @@

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -85,23 +83,15 @@ public static DescribeQuorumResponseData singletonErrorResponse(
}


public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition,
int leaderId,
int leaderEpoch,
long highWatermark,
List<ReplicaState> voterStates,
List<ReplicaState> observerStates) {
public static DescribeQuorumResponseData singletonResponse(
TopicPartition topicPartition,
DescribeQuorumResponseData.PartitionData partitionData
) {
return new DescribeQuorumResponseData()
.setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(Errors.NONE.code())
.setLeaderId(leaderId)
.setLeaderEpoch(leaderEpoch)
.setHighWatermark(highWatermark)
.setCurrentVoters(voterStates)
.setObservers(observerStates)))));
.setPartitions(Collections.singletonList(partitionData
.setPartitionIndex(topicPartition.partition())))));
}

public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
Expand Down
Expand Up @@ -644,7 +644,7 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures
.setErrorCode(error.code()));
}

private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) {
private static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) {
return new QuorumInfo(1, 1, 1L,
singletonList(new QuorumInfo.ReplicaState(1, 100,
emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000),
Expand Down Expand Up @@ -674,8 +674,8 @@ private static DescribeQuorumResponse prepareDescribeQuorumResponse(
replica.setLastCaughtUpTimestamp(emptyOptionals ? -1 : 1000);
partitions.add(new DescribeQuorumResponseData.PartitionData().setPartitionIndex(partitionIndex)
.setLeaderId(1)
.setLeaderEpoch(0)
.setHighWatermark(0)
.setLeaderEpoch(1)
.setHighWatermark(1)
.setCurrentVoters(singletonList(replica))
.setObservers(singletonList(replica))
.setErrorCode(partitionLevelError.code()));
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala
Expand Up @@ -127,13 +127,13 @@ object MetadataQuorumCommand {
Array(info.replicaId,
info.logEndOffset,
leader.logEndOffset - info.logEndOffset,
info.lastFetchTimeMs.orElse(-1),
info.lastCaughtUpTimeMs.orElse(-1),
info.lastFetchTimestamp.orElse(-1),
info.lastCaughtUpTimestamp.orElse(-1),
status
).map(_.toString)
}
prettyPrintTable(
Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"),
Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
(convertQuorumInfo(Seq(leader), "Leader")
++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower")
++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava,
Expand All @@ -152,8 +152,8 @@ object MetadataQuorumCommand {
val maxFollowerLagTimeMs =
if (leader == maxLagFollower) {
0
} else if (leader.lastCaughtUpTimeMs.isPresent && maxLagFollower.lastCaughtUpTimeMs.isPresent) {
leader.lastCaughtUpTimeMs.getAsLong - maxLagFollower.lastCaughtUpTimeMs.getAsLong
} else if (leader.lastCaughtUpTimestamp.isPresent && maxLagFollower.lastCaughtUpTimestamp.isPresent) {
leader.lastCaughtUpTimestamp.getAsLong - maxLagFollower.lastCaughtUpTimestamp.getAsLong
} else {
-1
}
Expand Down
Expand Up @@ -810,16 +810,16 @@ class KRaftClusterTest {
quorumInfo.voters.forEach { voter =>
assertTrue(0 < voter.logEndOffset,
s"logEndOffset for voter with ID ${voter.replicaId} was ${voter.logEndOffset}")
assertNotEquals(OptionalLong.empty(), voter.lastFetchTimeMs)
assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimeMs)
assertNotEquals(OptionalLong.empty(), voter.lastFetchTimestamp)
assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimestamp)
}

assertEquals(cluster.brokers.asScala.keySet, quorumInfo.observers.asScala.map(_.replicaId).toSet)
quorumInfo.observers.forEach { observer =>
assertTrue(0 < observer.logEndOffset,
s"logEndOffset for observer with ID ${observer.replicaId} was ${observer.logEndOffset}")
assertNotEquals(OptionalLong.empty(), observer.lastFetchTimeMs)
assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimeMs)
assertNotEquals(OptionalLong.empty(), observer.lastFetchTimestamp)
assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimestamp)
}
} finally {
admin.close()
Expand Down
13 changes: 5 additions & 8 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Expand Up @@ -275,7 +275,7 @@ private void updateLeaderEndOffsetAndTimestamp(
) {
final LogOffsetMetadata endOffsetMetadata = log.endOffset();

if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
if (state.updateLocalState(endOffsetMetadata)) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}

Expand Down Expand Up @@ -1014,7 +1014,7 @@ private FetchResponseData tryCompleteFetchRequest(
if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);

if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata, log.endOffset().offset)) {
if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
onUpdateLeaderHighWatermark(state, currentTimeMs);
}

Expand Down Expand Up @@ -1176,12 +1176,9 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
}

LeaderState<T> leaderState = quorum.leaderStateOrThrow();
return DescribeQuorumResponse.singletonResponse(log.topicPartition(),
leaderState.localId(),
leaderState.epoch(),
leaderState.highWatermark().isPresent() ? leaderState.highWatermark().get().offset : -1,
leaderState.quorumResponseVoterStates(currentTimeMs),
leaderState.quorumResponseObserverStates(currentTimeMs)
return DescribeQuorumResponse.singletonResponse(
log.topicPartition(),
leaderState.describeQuorum(currentTimeMs)
);
}

Expand Down

0 comments on commit 5c52c61

Please sign in to comment.