From 1a805f9c52f976dcad3a706df91fe36db4e68c0d Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Mon, 22 Aug 2022 16:32:42 -0700 Subject: [PATCH] KAFKA-14142; Expose kraft high watermark update time in quorum command --- .../kafka/clients/admin/KafkaAdminClient.java | 24 +- .../kafka/clients/admin/QuorumInfo.java | 41 ++- .../requests/DescribeQuorumResponse.java | 22 +- .../message/DescribeQuorumResponse.json | 4 +- .../clients/admin/KafkaAdminClientTest.java | 8 +- .../kafka/admin/MetadataQuorumCommand.scala | 17 +- .../apache/kafka/raft/KafkaRaftClient.java | 11 +- .../org/apache/kafka/raft/LeaderState.java | 260 +++++++++------- .../kafka/raft/KafkaRaftClientTest.java | 2 +- .../apache/kafka/raft/LeaderStateTest.java | 286 ++++++++++++------ .../kafka/raft/RaftClientTestContext.java | 17 +- .../raft/internals/KafkaRaftMetricsTest.java | 2 +- 12 files changed, 445 insertions(+), 249 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e5df779b616e..a39178dda693 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4357,12 +4357,26 @@ private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData } private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { + OptionalLong highWatermarkUpdateTimeMs = partition.highWatermarkUpdateTimeMs() >= 0 ? + OptionalLong.of(partition.highWatermarkUpdateTimeMs()) : + OptionalLong.empty(); + + List voters = partition.currentVoters().stream() + .map(this::translateReplicaState) + .collect(Collectors.toList()); + + List observers = partition.observers().stream() + .map(this::translateReplicaState) + .collect(Collectors.toList()); + return new QuorumInfo( - partition.leaderId(), - partition.leaderEpoch(), - partition.highWatermark(), - partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()), - partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList())); + partition.leaderId(), + partition.leaderEpoch(), + partition.highWatermark(), + highWatermarkUpdateTimeMs, + voters, + observers + ); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java index 3a0b6cf6f749..78467de6586a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java @@ -24,32 +24,45 @@ * This class is used to describe the state of the quorum received in DescribeQuorumResponse. */ public class QuorumInfo { - private final Integer leaderId; - private final Integer leaderEpoch; - private final Long highWatermark; + private final int leaderId; + private final long leaderEpoch; + private final long highWatermark; + private final OptionalLong highWatermarkUpdateTimeMs; private final List voters; private final List observers; - QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List voters, List observers) { + QuorumInfo( + int leaderId, + long leaderEpoch, + long highWatermark, + OptionalLong highWatermarkUpdateTimeMs, + List voters, + List observers + ) { this.leaderId = leaderId; this.leaderEpoch = leaderEpoch; this.highWatermark = highWatermark; + this.highWatermarkUpdateTimeMs = highWatermarkUpdateTimeMs; this.voters = voters; this.observers = observers; } - public Integer leaderId() { + public int leaderId() { return leaderId; } - public Integer leaderEpoch() { + public long leaderEpoch() { return leaderEpoch; } - public Long highWatermark() { + public long highWatermark() { return highWatermark; } + public OptionalLong highWatermarkUpdateTimeMs() { + return highWatermarkUpdateTimeMs; + } + public List voters() { return voters; } @@ -63,20 +76,26 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; QuorumInfo that = (QuorumInfo) o; - return leaderId.equals(that.leaderId) - && voters.equals(that.voters) - && observers.equals(that.observers); + return leaderId == that.leaderId + && leaderEpoch == that.leaderEpoch + && highWatermark == that.highWatermark + && Objects.equals(highWatermarkUpdateTimeMs, that.highWatermarkUpdateTimeMs) + && Objects.equals(voters, that.voters) + && Objects.equals(observers, that.observers); } @Override public int hashCode() { - return Objects.hash(leaderId, voters, observers); + return Objects.hash(leaderId, leaderEpoch, highWatermark, highWatermarkUpdateTimeMs, voters, observers); } @Override public String toString() { return "QuorumInfo(" + "leaderId=" + leaderId + + ", leaderEpoch=" + leaderEpoch + + ", highWatermark=" + highWatermark + + ", highWatermarkUpdateTimeMs=" + highWatermarkUpdateTimeMs + ", voters=" + voters + ", observers=" + observers + ')'; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java index 06ae681bc5c1..9f58e52970c4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DescribeQuorumResponseData; -import org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; @@ -26,7 +25,6 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -85,23 +83,15 @@ public static DescribeQuorumResponseData singletonErrorResponse( } - public static DescribeQuorumResponseData singletonResponse(TopicPartition topicPartition, - int leaderId, - int leaderEpoch, - long highWatermark, - List voterStates, - List observerStates) { + public static DescribeQuorumResponseData singletonResponse( + TopicPartition topicPartition, + DescribeQuorumResponseData.PartitionData partitionData + ) { return new DescribeQuorumResponseData() .setTopics(Collections.singletonList(new DescribeQuorumResponseData.TopicData() .setTopicName(topicPartition.topic()) - .setPartitions(Collections.singletonList(new DescribeQuorumResponseData.PartitionData() - .setPartitionIndex(topicPartition.partition()) - .setErrorCode(Errors.NONE.code()) - .setLeaderId(leaderId) - .setLeaderEpoch(leaderEpoch) - .setHighWatermark(highWatermark) - .setCurrentVoters(voterStates) - .setObservers(observerStates))))); + .setPartitions(Collections.singletonList(partitionData + .setPartitionIndex(topicPartition.partition()))))); } public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/resources/common/message/DescribeQuorumResponse.json b/clients/src/main/resources/common/message/DescribeQuorumResponse.json index 0ea6271238b2..4dee6a44cbfa 100644 --- a/clients/src/main/resources/common/message/DescribeQuorumResponse.json +++ b/clients/src/main/resources/common/message/DescribeQuorumResponse.json @@ -38,7 +38,9 @@ "about": "The latest known leader epoch"}, { "name": "HighWatermark", "type": "int64", "versions": "0+"}, { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" }, - { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" } + { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }, + { "name": "HighWatermarkUpdateTimeMs", "type": "int64", "default": -1, "tag": 0, + "taggedVersions": "1+", "ignorable": true, "about": "The wall clock time that the high watermark was last updated on the leader" } ]} ]}], "commonStructs": [ diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 193457655a52..5bad7d94b1c9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -607,8 +607,8 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures .setErrorCode(error.code())); } - private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) { - return new QuorumInfo(1, 1, 1L, + private static QuorumInfo defaultQuorumInfo(boolean emptyOptionals) { + return new QuorumInfo(1, 1, 1L, OptionalLong.empty(), singletonList(new QuorumInfo.ReplicaState(1, 100, emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000), emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))), @@ -637,8 +637,8 @@ private static DescribeQuorumResponse prepareDescribeQuorumResponse( replica.setLastCaughtUpTimestamp(emptyOptionals ? -1 : 1000); partitions.add(new DescribeQuorumResponseData.PartitionData().setPartitionIndex(partitionIndex) .setLeaderId(1) - .setLeaderEpoch(0) - .setHighWatermark(0) + .setLeaderEpoch(1) + .setHighWatermark(1) .setCurrentVoters(singletonList(replica)) .setObservers(singletonList(replica)) .setErrorCode(partitionLevelError.code())); diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala index b6e4e1597b55..c36e0021c53b 100644 --- a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala +++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala @@ -158,14 +158,15 @@ object MetadataQuorumCommand { -1 } println( - s"""|ClusterId: $clusterId - |LeaderId: ${quorumInfo.leaderId} - |LeaderEpoch: ${quorumInfo.leaderEpoch} - |HighWatermark: ${quorumInfo.highWatermark} - |MaxFollowerLag: $maxFollowerLag - |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs - |CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")} - |CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")} + s"""|ClusterId: $clusterId + |LeaderId: ${quorumInfo.leaderId} + |LeaderEpoch: ${quorumInfo.leaderEpoch} + |HighWatermark: ${quorumInfo.highWatermark} + |HighWatermarkUpdateTimeMs: ${quorumInfo.highWatermarkUpdateTimeMs} + |MaxFollowerLag: $maxFollowerLag + |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs + |CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")} + |CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")} |""".stripMargin ) } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 69d2025b6bb4..97ddea5a2a6c 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -1014,7 +1014,7 @@ private FetchResponseData tryCompleteFetchRequest( if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) { LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); - if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata, log.endOffset().offset)) { + if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { onUpdateLeaderHighWatermark(state, currentTimeMs); } @@ -1176,12 +1176,9 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest( } LeaderState leaderState = quorum.leaderStateOrThrow(); - return DescribeQuorumResponse.singletonResponse(log.topicPartition(), - leaderState.localId(), - leaderState.epoch(), - leaderState.highWatermark().isPresent() ? leaderState.highWatermark().get().offset : -1, - leaderState.quorumResponseVoterStates(currentTimeMs), - leaderState.quorumResponseObserverStates(currentTimeMs) + return DescribeQuorumResponse.singletonResponse( + log.topicPartition(), + leaderState.describeQuorum(currentTimeMs) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 0b8ebad8bda7..46796fbfd6d2 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -17,23 +17,21 @@ package org.apache.kafka.raft; import org.apache.kafka.common.message.DescribeQuorumResponseData; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.raft.internals.BatchAccumulator; -import org.slf4j.Logger; - import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.LeaderChangeMessage.Voter; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.ControlRecordUtils; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.slf4j.Logger; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.OptionalLong; import java.util.Set; import java.util.stream.Collectors; @@ -51,6 +49,7 @@ public class LeaderState implements EpochState { private final long epochStartOffset; private Optional highWatermark; + private long highWatermarkUpdateTimeMs; private final Map voterStates = new HashMap<>(); private final Map observerStates = new HashMap<>(); private final Set grantingVoters = new HashSet<>(); @@ -73,6 +72,7 @@ protected LeaderState( this.epoch = epoch; this.epochStartOffset = epochStartOffset; this.highWatermark = Optional.empty(); + this.highWatermarkUpdateTimeMs = -1; for (int voterId : voters) { boolean hasAcknowledgedLeader = voterId == localId; @@ -147,7 +147,7 @@ public Set nonAcknowledgingVoters() { return nonAcknowledging; } - private boolean updateHighWatermark() { + private boolean maybeUpdateHighWatermark(long currentTimeMs) { // Find the largest offset which is replicated to a majority of replicas (the leader counts) List followersByDescendingFetchOffset = followersByDescendingFetchOffset(); @@ -172,10 +172,9 @@ private boolean updateHighWatermark() { if (highWatermarkUpdateOffset > currentHighWatermarkMetadata.offset || (highWatermarkUpdateOffset == currentHighWatermarkMetadata.offset && !highWatermarkUpdateMetadata.metadata.equals(currentHighWatermarkMetadata.metadata))) { - highWatermark = highWatermarkUpdateOpt; - log.trace( - "High watermark updated to {} based on indexOfHw {} and voters {}", - highWatermark, + setHighWatermark( + currentTimeMs, + highWatermarkUpdateOpt, indexOfHw, followersByDescendingFetchOffset ); @@ -190,10 +189,9 @@ private boolean updateHighWatermark() { return false; } } else { - highWatermark = highWatermarkUpdateOpt; - log.trace( - "High watermark set to {} based on indexOfHw {} and voters {}", - highWatermark, + setHighWatermark( + currentTimeMs, + highWatermarkUpdateOpt, indexOfHw, followersByDescendingFetchOffset ); @@ -204,50 +202,84 @@ private boolean updateHighWatermark() { return false; } + private void setHighWatermark( + long currentTimeMs, + Optional newHighWatermark, + int indexOfHw, + List followersByDescendingFetchOffset + ) { + highWatermark = newHighWatermark; + highWatermarkUpdateTimeMs = currentTimeMs; + log.trace( + "High watermark set to {} based on indexOfHw {} and voters {}", + highWatermark, + indexOfHw, + followersByDescendingFetchOffset + ); + } + /** * Update the local replica state. * - * See {@link #updateReplicaState(int, long, LogOffsetMetadata, long)} + * @param currentTimeMs current time in milliseconds + * @param endOffsetMetadata updated log end offset of local replica + * @return true if the high watermark is updated as a result of this call */ - public boolean updateLocalState(long fetchTimestamp, LogOffsetMetadata logOffsetMetadata) { - return updateReplicaState(localId, fetchTimestamp, logOffsetMetadata, logOffsetMetadata.offset); + public boolean updateLocalState( + long currentTimeMs, + LogOffsetMetadata endOffsetMetadata + ) { + ReplicaState state = getOrCreateReplicaState(localId); + state.endOffset.ifPresent(currentEndOffset -> { + if (currentEndOffset.offset > endOffsetMetadata.offset) { + throw new IllegalStateException("Detected non-monotonic update of local " + + "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); + } + }); + state.updateLeaderState(endOffsetMetadata); + return maybeUpdateHighWatermark(currentTimeMs); } /** * Update the replica state in terms of fetch time and log end offsets. * * @param replicaId replica id - * @param fetchTimestamp fetch timestamp - * @param logOffsetMetadata new log offset and metadata - * @param leaderLogEndOffset current log end offset of the leader - * @return true if the high watermark is updated too + * @param currentTimeMs current time in milliseconds + * @param fetchOffsetMetadata new log offset and metadata + * @return true if the high watermark is updated as a result of this call */ public boolean updateReplicaState( int replicaId, - long fetchTimestamp, - LogOffsetMetadata logOffsetMetadata, - long leaderLogEndOffset + long currentTimeMs, + LogOffsetMetadata fetchOffsetMetadata ) { // Ignore fetches from negative replica id, as it indicates // the fetch is from non-replica. For example, a consumer. if (replicaId < 0) { return false; + } else if (replicaId == localId) { + throw new IllegalStateException("Remote replica ID " + replicaId + " matches the local leader ID"); } - ReplicaState state = getReplicaState(replicaId); + ReplicaState state = getOrCreateReplicaState(replicaId); - // Only proceed with updating the states if the offset update is valid - verifyEndOffsetUpdate(state, logOffsetMetadata); + state.endOffset.ifPresent(currentEndOffset -> { + if (currentEndOffset.offset > fetchOffsetMetadata.offset) { + log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}", + state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset); + } + }); - // Update the Last CaughtUp Time - if (logOffsetMetadata.offset >= leaderLogEndOffset) { - state.updateLastCaughtUpTimestamp(fetchTimestamp); - } else if (logOffsetMetadata.offset >= state.lastFetchLeaderLogEndOffset.orElse(-1L)) { - state.updateLastCaughtUpTimestamp(state.lastFetchTimestamp.orElse(-1L)); - } + Optional leaderEndOffsetOpt = + voterStates.get(localId).endOffset; - state.updateFetchTimestamp(fetchTimestamp, leaderLogEndOffset); - return updateEndOffset(state, logOffsetMetadata); + state.updateFollowerState( + currentTimeMs, + fetchOffsetMetadata, + leaderEndOffsetOpt + ); + + return isVoter(state.nodeId) && maybeUpdateHighWatermark(currentTimeMs); } public List nonLeaderVotersByDescendingFetchOffset() { @@ -263,31 +295,6 @@ private List followersByDescendingFetchOffset() { .collect(Collectors.toList()); } - private void verifyEndOffsetUpdate( - ReplicaState state, - LogOffsetMetadata endOffsetMetadata - ) { - state.endOffset.ifPresent(currentEndOffset -> { - if (currentEndOffset.offset > endOffsetMetadata.offset) { - if (state.nodeId == localId) { - throw new IllegalStateException("Detected non-monotonic update of local " + - "end offset: " + currentEndOffset.offset + " -> " + endOffsetMetadata.offset); - } else { - log.warn("Detected non-monotonic update of fetch offset from nodeId {}: {} -> {}", - state.nodeId, currentEndOffset.offset, endOffsetMetadata.offset); - } - } - }); - } - private boolean updateEndOffset( - ReplicaState state, - LogOffsetMetadata endOffsetMetadata - ) { - state.endOffset = Optional.of(endOffsetMetadata); - state.hasAcknowledgedLeader = true; - return isVoter(state.nodeId) && updateHighWatermark(); - } - public void addAcknowledgementFrom(int remoteNodeId) { ReplicaState voterState = ensureValidVoter(remoteNodeId); voterState.hasAcknowledgedLeader = true; @@ -304,7 +311,7 @@ public long epochStartOffset() { return epochStartOffset; } - private ReplicaState getReplicaState(int remoteNodeId) { + private ReplicaState getOrCreateReplicaState(int remoteNodeId) { ReplicaState state = voterStates.get(remoteNodeId); if (state == null) { observerStates.putIfAbsent(remoteNodeId, new ReplicaState(remoteNodeId, false)); @@ -313,43 +320,71 @@ private ReplicaState getReplicaState(int remoteNodeId) { return state; } - List quorumResponseVoterStates(long currentTimeMs) { - return quorumResponseReplicaStates(voterStates.values(), localId, currentTimeMs); + public DescribeQuorumResponseData.PartitionData describeQuorum(long currentTimeMs) { + clearInactiveObservers(currentTimeMs); + + return new DescribeQuorumResponseData.PartitionData() + .setErrorCode(Errors.NONE.code()) + .setLeaderId(localId) + .setLeaderEpoch(epoch) + .setHighWatermark(highWatermark().map(offsetMetadata -> offsetMetadata.offset).orElse(-1L)) + .setHighWatermarkUpdateTimeMs(highWatermarkUpdateTimeMs) + .setCurrentVoters(describeReplicaStates(voterStates, currentTimeMs)) + .setObservers(describeReplicaStates(observerStates, currentTimeMs)); } - List quorumResponseObserverStates(long currentTimeMs) { - clearInactiveObservers(currentTimeMs); - return quorumResponseReplicaStates(observerStates.values(), localId, currentTimeMs); + // Visible for testing + DescribeQuorumResponseData.ReplicaState describeVoterState( + int voterId, + long currentTimeMs + ) { + ReplicaState replicaState = voterStates.get(voterId); + return describeReplicaState(replicaState, currentTimeMs); } - private static List quorumResponseReplicaStates( - Collection state, - int leaderId, + // Visible for testing + DescribeQuorumResponseData.ReplicaState describeObserverState( + int observerId, long currentTimeMs ) { - return state.stream().map(s -> { - final long lastCaughtUpTimestamp; - final long lastFetchTimestamp; - if (s.nodeId == leaderId) { - lastCaughtUpTimestamp = currentTimeMs; - lastFetchTimestamp = currentTimeMs; - } else { - lastCaughtUpTimestamp = s.lastCaughtUpTimestamp.orElse(-1); - lastFetchTimestamp = s.lastFetchTimestamp.orElse(-1); - } - return new DescribeQuorumResponseData.ReplicaState() - .setReplicaId(s.nodeId) - .setLogEndOffset(s.endOffset.map(md -> md.offset).orElse(-1L)) - .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) - .setLastFetchTimestamp(lastFetchTimestamp); - }).collect(Collectors.toList()); + ReplicaState replicaState = observerStates.get(observerId); + return describeReplicaState(replicaState, currentTimeMs); + } + + private List describeReplicaStates( + Map state, + long currentTimeMs + ) { + return state.values().stream() + .map(replicaState -> describeReplicaState(replicaState, currentTimeMs)) + .collect(Collectors.toList()); + } + + private DescribeQuorumResponseData.ReplicaState describeReplicaState( + ReplicaState replicaState, + long currentTimeMs + ) { + final long lastCaughtUpTimestamp; + final long lastFetchTimestamp; + if (replicaState.nodeId == localId) { + lastCaughtUpTimestamp = currentTimeMs; + lastFetchTimestamp = currentTimeMs; + } else { + lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp; + lastFetchTimestamp = replicaState.lastFetchTimestamp; + } + return new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(replicaState.nodeId) + .setLogEndOffset(replicaState.endOffset.map(md -> md.offset).orElse(-1L)) + .setLastCaughtUpTimestamp(lastCaughtUpTimestamp) + .setLastFetchTimestamp(lastFetchTimestamp); + } private void clearInactiveObservers(final long currentTimeMs) { - observerStates.entrySet().removeIf( - integerReplicaStateEntry -> - currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp.orElse(-1) - >= OBSERVER_SESSION_TIMEOUT_MS); + observerStates.entrySet().removeIf(integerReplicaStateEntry -> + currentTimeMs - integerReplicaStateEntry.getValue().lastFetchTimestamp >= OBSERVER_SESSION_TIMEOUT_MS + ); } private boolean isVoter(int remoteNodeId) { @@ -359,31 +394,46 @@ private boolean isVoter(int remoteNodeId) { private static class ReplicaState implements Comparable { final int nodeId; Optional endOffset; - OptionalLong lastFetchTimestamp; - OptionalLong lastFetchLeaderLogEndOffset; - OptionalLong lastCaughtUpTimestamp; + long lastFetchTimestamp; + long lastFetchLeaderLogEndOffset; + long lastCaughtUpTimestamp; boolean hasAcknowledgedLeader; public ReplicaState(int nodeId, boolean hasAcknowledgedLeader) { this.nodeId = nodeId; this.endOffset = Optional.empty(); - this.lastFetchTimestamp = OptionalLong.empty(); - this.lastFetchLeaderLogEndOffset = OptionalLong.empty(); - this.lastCaughtUpTimestamp = OptionalLong.empty(); + this.lastFetchTimestamp = -1; + this.lastFetchLeaderLogEndOffset = -1; + this.lastCaughtUpTimestamp = -1; this.hasAcknowledgedLeader = hasAcknowledgedLeader; } - void updateFetchTimestamp(long currentFetchTimeMs, long leaderLogEndOffset) { - // To be resilient to system time shifts we do not strictly - // require the timestamp be monotonically increasing. - lastFetchTimestamp = OptionalLong.of(Math.max(lastFetchTimestamp.orElse(-1L), currentFetchTimeMs)); - lastFetchLeaderLogEndOffset = OptionalLong.of(leaderLogEndOffset); + void updateLeaderState( + LogOffsetMetadata endOffsetMetadata + ) { + // For the leader, we only update the end offset. The remaining fields + // (such as the caught up time) are determined implicitly. + this.endOffset = Optional.of(endOffsetMetadata); } - void updateLastCaughtUpTimestamp(long lastCaughtUpTime) { - // This value relies on the fetch timestamp which does not - // require monotonicity - lastCaughtUpTimestamp = OptionalLong.of(Math.max(lastCaughtUpTimestamp.orElse(-1L), lastCaughtUpTime)); + void updateFollowerState( + long currentTimeMs, + LogOffsetMetadata fetchOffsetMetadata, + Optional leaderEndOffsetOpt + ) { + leaderEndOffsetOpt.ifPresent(leaderEndOffset -> { + if (fetchOffsetMetadata.offset >= leaderEndOffset.offset) { + lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, currentTimeMs); + } else if (lastFetchLeaderLogEndOffset > 0 + && fetchOffsetMetadata.offset >= lastFetchLeaderLogEndOffset) { + lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp); + } + lastFetchLeaderLogEndOffset = leaderEndOffset.offset; + }); + + lastFetchTimestamp = Math.max(lastFetchTimestamp, currentTimeMs); + endOffset = Optional.of(fetchOffsetMetadata); + hasAcknowledgedLeader = true; } @Override diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index 678648505bfd..5a265c832ad4 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -2011,7 +2011,7 @@ public void testDescribeQuorum() throws Exception { context.deliverRequest(DescribeQuorumRequest.singletonRequest(context.metadataPartition)); context.pollUntilResponse(); - context.assertSentDescribeQuorumResponse(localId, epoch, 3L, + context.assertSentDescribeQuorumResponse(localId, epoch, 3L, closeFollowerFetchTime, Arrays.asList( new ReplicaState() .setReplicaId(localId) diff --git a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java index fa54d5cbc6b2..ff26fe7d788f 100644 --- a/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.raft.internals.BatchAccumulator; - import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -29,16 +28,13 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; -import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -128,7 +124,7 @@ public void testLastCaughtUpTimeVoters() { int node2 = 2; int currentTime = 1000; int fetchTime = 0; - int caughtupTime = -1; + int caughtUpTime = -1; LeaderState state = newLeaderState(mkSet(localId, node1, node2), 10L); assertEquals(Optional.empty(), state.highWatermark()); assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); @@ -136,41 +132,43 @@ public void testLastCaughtUpTimeVoters() { assertEquals(Optional.empty(), state.highWatermark()); // Node 1 falls behind - assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L), 11L)); - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(11L))); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeVoterState(node1, currentTime).lastCaughtUpTimestamp()); // Node 1 catches up to leader - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L), 11L)); - caughtupTime = fetchTime; - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); + caughtUpTime = fetchTime; + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeVoterState(node1, currentTime).lastCaughtUpTimestamp()); // Node 1 falls behind - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L), 100L)); - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(100L))); + assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeVoterState(node1, currentTime).lastCaughtUpTimestamp()); // Node 1 catches up to the last fetch offset int prevFetchTime = fetchTime; - assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L), 200L)); - caughtupTime = prevFetchTime; - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtupTime, state.quorumResponseVoterStates(currentTime).get(node1).lastCaughtUpTimestamp()); + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(200L))); + assertTrue(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(100L))); + caughtUpTime = prevFetchTime; + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeVoterState(node1, currentTime).lastCaughtUpTimestamp()); // Node2 has never caught up to leader - assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp()); - assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(202L), 300L)); - assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp()); - assertFalse(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L), 300L)); - assertEquals(-1L, state.quorumResponseVoterStates(currentTime).get(node2).lastCaughtUpTimestamp()); + assertEquals(-1L, state.describeVoterState(node2, currentTime).lastCaughtUpTimestamp()); + assertFalse(state.updateLocalState(++fetchTime, new LogOffsetMetadata(300L))); + assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(200L))); + assertEquals(-1L, state.describeVoterState(node2, currentTime).lastCaughtUpTimestamp()); + assertTrue(state.updateReplicaState(node2, ++fetchTime, new LogOffsetMetadata(250L))); + assertEquals(-1L, state.describeVoterState(node2, currentTime).lastCaughtUpTimestamp()); } @Test public void testLastCaughtUpTimeObserver() { - int node1Index = 0; - int node1Id = 1; + int node1 = 1; int currentTime = 1000; int fetchTime = 0; int caughtUpTime = -1; @@ -179,34 +177,36 @@ public void testLastCaughtUpTimeObserver() { assertEquals(emptySet(), state.nonAcknowledgingVoters()); // Node 1 falls behind - assertTrue(state.updateLocalState(++fetchTime, new LogOffsetMetadata(10L))); - assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(10L), 11L)); - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + assertTrue(state.updateLocalState(++fetchTime, new LogOffsetMetadata(11L))); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(10L))); + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeObserverState(node1, currentTime).lastCaughtUpTimestamp()); // Node 1 catches up to leader - assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(11L), 11L)); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(11L))); caughtUpTime = fetchTime; - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeObserverState(node1, currentTime).lastCaughtUpTimestamp()); // Node 1 falls behind - assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(50L), 100L)); - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + assertTrue(state.updateLocalState(++fetchTime, new LogOffsetMetadata(100L))); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(50L))); + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeObserverState(node1, currentTime).lastCaughtUpTimestamp()); // Node 1 catches up to the last fetch offset int prevFetchTime = fetchTime; - assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(102L), 200L)); + assertTrue(state.updateLocalState(++fetchTime, new LogOffsetMetadata(200L))); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(102L))); caughtUpTime = prevFetchTime; - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeObserverState(node1, currentTime).lastCaughtUpTimestamp()); // Node 1 catches up to leader - assertFalse(state.updateReplicaState(node1Id, ++fetchTime, new LogOffsetMetadata(202L), 200L)); + assertFalse(state.updateReplicaState(node1, ++fetchTime, new LogOffsetMetadata(200L))); caughtUpTime = fetchTime; - assertEquals(currentTime, state.quorumResponseVoterStates(currentTime).get(localId).lastCaughtUpTimestamp()); - assertEquals(caughtUpTime, state.quorumResponseObserverStates(currentTime).get(node1Index).lastCaughtUpTimestamp()); + assertEquals(currentTime, state.describeVoterState(localId, currentTime).lastCaughtUpTimestamp()); + assertEquals(caughtUpTime, state.describeObserverState(node1, currentTime).lastCaughtUpTimestamp()); } @Test @@ -239,12 +239,12 @@ public void testUpdateHighWatermarkQuorumSizeTwo() { assertFalse(state.updateLocalState(0, new LogOffsetMetadata(13L))); assertEquals(singleton(otherNodeId), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L), 11L)); + assertFalse(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(10L))); assertEquals(emptySet(), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L), 12L)); + assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(11L))); assertEquals(Optional.of(new LogOffsetMetadata(11L)), state.highWatermark()); - assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L), 14L)); + assertTrue(state.updateReplicaState(otherNodeId, 0, new LogOffsetMetadata(13L))); assertEquals(Optional.of(new LogOffsetMetadata(13L)), state.highWatermark()); } @@ -256,19 +256,19 @@ public void testUpdateHighWatermarkQuorumSizeThree() { assertFalse(state.updateLocalState(0, new LogOffsetMetadata(15L))); assertEquals(mkSet(node1, node2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L), 11L)); + assertFalse(state.updateReplicaState(node1, 0, new LogOffsetMetadata(10L))); assertEquals(singleton(node2), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L), 11L)); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(10L))); assertEquals(emptySet(), state.nonAcknowledgingVoters()); assertEquals(Optional.empty(), state.highWatermark()); - assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L), 16L)); + assertTrue(state.updateReplicaState(node2, 0, new LogOffsetMetadata(15L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); assertFalse(state.updateLocalState(0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(15L)), state.highWatermark()); - assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L), 21L)); + assertTrue(state.updateReplicaState(node1, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); - assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L), 21L)); + assertFalse(state.updateReplicaState(node2, 0, new LogOffsetMetadata(20L))); assertEquals(Optional.of(new LogOffsetMetadata(20L)), state.highWatermark()); } @@ -278,13 +278,13 @@ public void testNonMonotonicHighWatermarkUpdate() { int node1 = 1; LeaderState state = newLeaderState(mkSet(localId, node1), 0L); state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(10L)); - state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L), 11L); + state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(10L)); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state. // The leader will report an error in the logs, but will not let the high watermark rewind - assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L), 11L)); - assertEquals(5L, state.quorumResponseVoterStates(time.milliseconds()).get(node1).logEndOffset()); + assertFalse(state.updateReplicaState(node1, time.milliseconds(), new LogOffsetMetadata(5L))); + assertEquals(5L, state.describeVoterState(node1, time.milliseconds()).logEndOffset()); assertEquals(Optional.of(new LogOffsetMetadata(10L)), state.highWatermark()); } @@ -302,21 +302,112 @@ public void testGetNonLeaderFollowersByFetchOffsetDescending() { } @Test - public void testGetVoterStates() { - int node1 = 1; - int node2 = 2; + public void testDescribeQuorumWithSingleVoter() { + MockTime time = new MockTime(); long leaderStartOffset = 10L; long leaderEndOffset = 15L; - LeaderState state = setUpLeaderAndFollowers(node1, node2, leaderStartOffset, leaderEndOffset); + LeaderState state = newLeaderState(mkSet(localId), leaderStartOffset); + + // Until we have updated local state, high watermark should be uninitialized + assertEquals(Optional.empty(), state.highWatermark()); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(-1, partitionData.highWatermark()); + assertEquals(-1, partitionData.highWatermarkUpdateTimeMs()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + assertEquals(1, partitionData.currentVoters().size()); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(localId) + .setLogEndOffset(-1) + .setLastFetchTimestamp(time.milliseconds()) + .setLastCaughtUpTimestamp(time.milliseconds()), + partitionData.currentVoters().get(0)); + + + // Now update the high watermark and verify that describe output + long highWatermarkUpdateTimeMs = time.milliseconds(); + assertTrue(state.updateLocalState(highWatermarkUpdateTimeMs, new LogOffsetMetadata(leaderEndOffset))); + assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); + + time.sleep(500); + + partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(leaderEndOffset, partitionData.highWatermark()); + assertEquals(highWatermarkUpdateTimeMs, partitionData.highWatermarkUpdateTimeMs()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + assertEquals(1, partitionData.currentVoters().size()); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(localId) + .setLogEndOffset(leaderEndOffset) + .setLastFetchTimestamp(time.milliseconds()) + .setLastCaughtUpTimestamp(time.milliseconds()), + partitionData.currentVoters().get(0)); + } + + @Test + public void testDescribeQuorumWithMultipleVoters() { + MockTime time = new MockTime(); + int activeFollowerId = 1; + int inactiveFollowerId = 2; + long leaderStartOffset = 10L; + long leaderEndOffset = 15L; - assertEquals(mkMap( - mkEntry(localId, leaderEndOffset), - mkEntry(node1, leaderStartOffset), - mkEntry(node2, leaderEndOffset) - ), state.quorumResponseVoterStates(0) - .stream() - .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset))); + LeaderState state = newLeaderState(mkSet(localId, activeFollowerId, inactiveFollowerId), leaderStartOffset); + assertFalse(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(leaderEndOffset))); + assertEquals(Optional.empty(), state.highWatermark()); + + long activeFollowerFetchTimeMs = time.milliseconds(); + assertTrue(state.updateReplicaState(activeFollowerId, activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset))); + assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), state.highWatermark()); + + time.sleep(500); + + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(leaderEndOffset, partitionData.highWatermark()); + assertEquals(activeFollowerFetchTimeMs, partitionData.highWatermarkUpdateTimeMs()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + assertEquals(Collections.emptyList(), partitionData.observers()); + + List voterStates = partitionData.currentVoters(); + assertEquals(3, voterStates.size()); + + DescribeQuorumResponseData.ReplicaState leaderState = voterStates.stream() + .filter(voterState -> voterState.replicaId() == localId) + .findFirst() + .orElseThrow(() -> new AssertionError("")); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(localId) + .setLogEndOffset(leaderEndOffset) + .setLastFetchTimestamp(time.milliseconds()) + .setLastCaughtUpTimestamp(time.milliseconds()), + leaderState); + + DescribeQuorumResponseData.ReplicaState activeFollowerState = voterStates.stream() + .filter(voterState -> voterState.replicaId() == activeFollowerId) + .findFirst() + .orElseThrow(() -> new AssertionError("")); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(activeFollowerId) + .setLogEndOffset(leaderEndOffset) + .setLastFetchTimestamp(activeFollowerFetchTimeMs) + .setLastCaughtUpTimestamp(activeFollowerFetchTimeMs), + activeFollowerState); + + DescribeQuorumResponseData.ReplicaState inactiveFollowerState = voterStates.stream() + .filter(voterState -> voterState.replicaId() == inactiveFollowerId) + .findFirst() + .orElseThrow(() -> new AssertionError("")); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(inactiveFollowerId) + .setLogEndOffset(-1) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1), + inactiveFollowerState); } private LeaderState setUpLeaderAndFollowers(int follower1, @@ -326,35 +417,57 @@ private LeaderState setUpLeaderAndFollowers(int follower1, LeaderState state = newLeaderState(mkSet(localId, follower1, follower2), leaderStartOffset); state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset)); assertEquals(Optional.empty(), state.highWatermark()); - state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset), leaderEndOffset); - state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset), leaderEndOffset); + state.updateReplicaState(follower1, 0, new LogOffsetMetadata(leaderStartOffset)); + state.updateReplicaState(follower2, 0, new LogOffsetMetadata(leaderEndOffset)); return state; } @Test - public void testGetObserverStatesWithObserver() { + public void testDescribeQuorumWithObservers() { + MockTime time = new MockTime(); int observerId = 10; long epochStartOffset = 10L; LeaderState state = newLeaderState(mkSet(localId), epochStartOffset); - long timestamp = 20L; - assertFalse(state.updateReplicaState(observerId, timestamp, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10)); - - assertEquals(Collections.singletonMap(observerId, epochStartOffset), - state.quorumResponseObserverStates(timestamp) - .stream() - .collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, DescribeQuorumResponseData.ReplicaState::logEndOffset))); + long highWatermarkUpdateTime = time.milliseconds(); + assertTrue(state.updateLocalState(time.milliseconds(), new LogOffsetMetadata(epochStartOffset + 1))); + assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), state.highWatermark()); + + time.sleep(500); + long observerFetchTimeMs = time.milliseconds(); + assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, new LogOffsetMetadata(epochStartOffset + 1))); + + time.sleep(500); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(epochStartOffset + 1, partitionData.highWatermark()); + assertEquals(highWatermarkUpdateTime, partitionData.highWatermarkUpdateTimeMs()); + assertEquals(localId, partitionData.leaderId()); + assertEquals(epoch, partitionData.leaderEpoch()); + + List observerStates = partitionData.observers(); + assertEquals(1, observerStates.size()); + + DescribeQuorumResponseData.ReplicaState observerState = observerStates.get(0); + assertEquals(new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(observerId) + .setLogEndOffset(epochStartOffset + 1) + .setLastFetchTimestamp(observerFetchTimeMs) + .setLastCaughtUpTimestamp(observerFetchTimeMs), + observerState); } @Test public void testNoOpForNegativeRemoteNodeId() { - int observerId = -1; + MockTime time = new MockTime(); + int replicaId = -1; long epochStartOffset = 10L; LeaderState state = newLeaderState(mkSet(localId), epochStartOffset); - assertFalse(state.updateReplicaState(observerId, 0, new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10)); + assertFalse(state.updateReplicaState(replicaId, 0, new LogOffsetMetadata(epochStartOffset))); - assertEquals(emptyList(), state.quorumResponseObserverStates(10)); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + List observerStates = partitionData.observers(); + assertEquals(Collections.emptyList(), observerStates); } @Test @@ -364,14 +477,17 @@ public void testObserverStateExpiration() { long epochStartOffset = 10L; LeaderState state = newLeaderState(mkSet(localId), epochStartOffset); - state.updateReplicaState(observerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset), epochStartOffset + 10); - assertEquals(singleton(observerId), - state.quorumResponseObserverStates(time.milliseconds()) - .stream().map(o -> o.replicaId()) - .collect(Collectors.toSet())); + state.updateReplicaState(observerId, time.milliseconds(), new LogOffsetMetadata(epochStartOffset)); + DescribeQuorumResponseData.PartitionData partitionData = state.describeQuorum(time.milliseconds()); + List observerStates = partitionData.observers(); + assertEquals(1, observerStates.size()); + + DescribeQuorumResponseData.ReplicaState observerState = observerStates.get(0); + assertEquals(observerId, observerState.replicaId()); time.sleep(LeaderState.OBSERVER_SESSION_TIMEOUT_MS); - assertEquals(emptyList(), state.quorumResponseObserverStates(time.milliseconds())); + partitionData = state.describeQuorum(time.milliseconds()); + assertEquals(Collections.emptyList(), partitionData.observers()); } @ParameterizedTest diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index b825fc8867a1..dd7f69046fe1 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -454,17 +454,24 @@ void assertSentDescribeQuorumResponse( int leaderId, int leaderEpoch, long highWatermark, + long highWatermarkUpdateTimeMs, List voterStates, List observerStates ) { DescribeQuorumResponseData response = collectDescribeQuorumResponse(); + + DescribeQuorumResponseData.PartitionData partitionData = new DescribeQuorumResponseData.PartitionData() + .setErrorCode(Errors.NONE.code()) + .setLeaderId(leaderId) + .setLeaderEpoch(leaderEpoch) + .setHighWatermark(highWatermark) + .setHighWatermarkUpdateTimeMs(highWatermarkUpdateTimeMs) + .setCurrentVoters(voterStates) + .setObservers(observerStates); DescribeQuorumResponseData expectedResponse = DescribeQuorumResponse.singletonResponse( metadataPartition, - leaderId, - leaderEpoch, - highWatermark, - voterStates, - observerStates); + partitionData + ); assertEquals(expectedResponse, response); } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index cc2700bb17a7..0d64eac1cc2e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -103,7 +103,7 @@ public void shouldRecordVoterQuorumState() throws IOException { assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); state.leaderStateOrThrow().updateLocalState(0, new LogOffsetMetadata(5L)); - state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L), 6L); + state.leaderStateOrThrow().updateReplicaState(1, 0, new LogOffsetMetadata(5L)); assertEquals((double) 5L, getMetric(metrics, "high-watermark").metricValue()); state.transitionToFollower(2, 1);