Skip to content

Commit

Permalink
Expose heartbeat timestamps in RaftNodeReport
Browse files Browse the repository at this point in the history
This commit adds a few methods to RaftNodeReport to expose a few timestamps.
- Followers and learners expose the timestamp of the latest heartbeat they
receive from the leader.
- Leader exposes timestamps of heartbeats it receives from followers and
learners.
- Leader also exposes a concept called quorum timestamp. See javadoc for info.
  • Loading branch information
metanet committed Feb 13, 2023
1 parent 8c90e31 commit 917edc6
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 32 deletions.
34 changes: 27 additions & 7 deletions microraft/src/main/java/io/microraft/impl/RaftNodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -769,9 +770,19 @@ public CompletableFuture<Ordered<RaftNodeReport>> takeSnapshot() {
}

private RaftNodeReportImpl newReport(RaftNodeReportReason reason) {
Map<RaftEndpoint, Long> heartbeatTimestamps = state.leaderState() != null
? state.leaderState().responseTimestamps()
: Collections.emptyMap();
Optional<Long> quorumTimestamp = getQuorumHeartbeatTimestamp();
// non-empty if this node is not leader and received at least one heartbeat from
// the leader.
Optional<Long> leaderHeartbeatTimestamp = (quorumTimestamp.isEmpty() && this.lastLeaderHeartbeatTimestamp > 0)
? Optional.of(Math.min(this.lastLeaderHeartbeatTimestamp, clock.millis()))
: Optional.empty();

return new RaftNodeReportImpl(requireNonNull(reason), groupId, state.localEndpoint(), state.initialMembers(),
state.committedGroupMembers(), state.effectiveGroupMembers(), state.role(), status, state.termState(),
newLogReport());
newLogReport(), heartbeatTimestamps, quorumTimestamp, leaderHeartbeatTimestamp);
}

private RaftLogStatsImpl newLogReport() {
Expand Down Expand Up @@ -1760,23 +1771,32 @@ public RaftModelFactory getModelFactory() {
}

public boolean demoteToFollowerIfQuorumHeartbeatTimeoutElapsed() {
LeaderState leaderState = state.leaderState();
if (leaderState == null) {
Optional<Long> quorumTimestamp = getQuorumHeartbeatTimestamp();
if (quorumTimestamp.isEmpty()) {
return true;
}

long quorumTimestamp = leaderState.quorumResponseTimestamp(state.logReplicationQuorumSize());
boolean demoteToFollower = isLeaderHeartbeatTimeoutElapsed(quorumTimestamp);
boolean demoteToFollower = isLeaderHeartbeatTimeoutElapsed(quorumTimestamp.get());
if (demoteToFollower) {
LOGGER.warn("{} Demoting to {} since not received append entries responses from majority recently.",
localEndpointStr, FOLLOWER);
LOGGER.warn(
"{} Demoting to {} since not received append entries responses from majority recently. Latest quorum timestamp: {}",
localEndpointStr, FOLLOWER, quorumTimestamp.get());
toFollower(state.term());
invalidateFuturesFrom(state.commitIndex() + 1, new IndeterminateStateException());
}

return demoteToFollower;
}

/**
 * Computes the quorum heartbeat timestamp from the current leader state.
 *
 * @return the value reported by the leader state for the current log
 *         replication quorum size and the current clock reading, or an
 *         empty Optional when there is no leader state
 */
private Optional<Long> getQuorumHeartbeatTimestamp() {
    LeaderState currentLeaderState = state.leaderState();
    if (currentLeaderState != null) {
        // Same evaluation order as passing both directly as arguments:
        // quorum size first, then the clock.
        int quorumSize = state.logReplicationQuorumSize();
        long now = clock.millis();
        return Optional.of(currentLeaderState.quorumResponseTimestamp(quorumSize, now));
    }

    return Optional.empty();
}

/**
* Completes futures with the given exception for indices smaller than or equal
* to the given index. Note that the given index is inclusive.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import static java.util.Objects.requireNonNull;

import java.util.Map;
import java.util.Optional;

import javax.annotation.Nonnull;

import io.microraft.RaftEndpoint;
Expand All @@ -43,10 +46,15 @@ public final class RaftNodeReportImpl implements RaftNodeReport {
private final RaftNodeStatus status;
private final RaftTerm term;
private final RaftLogStats log;
private final Map<RaftEndpoint, Long> heartbeatTimestamps;
private final Optional<Long> quorumHeartbeatTimestamp;
private final Optional<Long> leaderHeartbeatTimestamp;

public RaftNodeReportImpl(RaftNodeReportReason reason, Object groupId, RaftEndpoint localEndpoint,
RaftGroupMembers initialMembers, RaftGroupMembers committedMembers, RaftGroupMembers effectiveMembers,
RaftRole role, RaftNodeStatus status, RaftTerm term, RaftLogStats log) {
RaftRole role, RaftNodeStatus status, RaftTerm term, RaftLogStats log,
Map<RaftEndpoint, Long> heartbeatTimestamps, Optional<Long> quorumHeartbeatTimestamp,
Optional<Long> leaderHeartbeatTimestamp) {
this.reason = requireNonNull(reason);
this.groupId = requireNonNull(groupId);
this.localEndpoint = requireNonNull(localEndpoint);
Expand All @@ -57,6 +65,9 @@ public RaftNodeReportImpl(RaftNodeReportReason reason, Object groupId, RaftEndpo
this.status = requireNonNull(status);
this.term = requireNonNull(term);
this.log = requireNonNull(log);
this.heartbeatTimestamps = requireNonNull(heartbeatTimestamps);
this.quorumHeartbeatTimestamp = requireNonNull(quorumHeartbeatTimestamp);
this.leaderHeartbeatTimestamp = requireNonNull(leaderHeartbeatTimestamp);
}

@Nonnull
Expand Down Expand Up @@ -119,12 +130,33 @@ public RaftLogStats getLog() {
return log;
}

@Nonnull
@Override
public Map<RaftEndpoint, Long> getHeartbeatTimestamps() {
// Hands back the map given to the constructor as-is (no defensive copy here);
// the constructor rejects null via requireNonNull.
return heartbeatTimestamps;
}

@Nonnull
@Override
public Optional<Long> getQuorumHeartbeatTimestamp() {
// The Optional itself is never null (constructor uses requireNonNull);
// absence of a value is expressed as an empty Optional.
return quorumHeartbeatTimestamp;
}

@Nonnull
@Override
public Optional<Long> getLeaderHeartbeatTimestamp() {
// The Optional itself is never null (constructor uses requireNonNull);
// absence of a value is expressed as an empty Optional.
return leaderHeartbeatTimestamp;
}

@Override
public String toString() {
return "RaftNodeReport{" + "reason=" + reason + ", groupId=" + groupId + ", localEndpoint=" + localEndpoint
+ ", initialMembers=" + initialMembers + ", committedMembers=" + committedMembers
+ ", effectiveMembers=" + effectiveMembers + ", role=" + role + ", status=" + status + ", term=" + term
+ ", log=" + log + '}';
+ ", log=" + log + ", heartbeatTimestamps=" + heartbeatTimestamps + ", quorumHeartbeatTimestamp="
+ (quorumHeartbeatTimestamp.isPresent() ? quorumHeartbeatTimestamp.get() : "-")
+ ", leaderHeartbeatTimestamp="
+ (leaderHeartbeatTimestamp.isPresent() ? leaderHeartbeatTimestamp.get() : "-") + '}';
}

}
22 changes: 18 additions & 4 deletions microraft/src/main/java/io/microraft/impl/state/LeaderState.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import io.microraft.RaftEndpoint;

Expand Down Expand Up @@ -154,18 +155,31 @@ public long flushedLogIndex() {
* Returns the earliest append entries response timestamp of the log replication
* quorum nodes.
*/
public long quorumResponseTimestamp(int quorumSize) {
public long quorumResponseTimestamp(int quorumSize, long localNodeTimestamp) {
long[] timestamps = new long[followerStates.size() + 1];
int i = 0;
// for the local RaftNode
timestamps[i] = Long.MAX_VALUE;
long maxFollowerTimestamp = 0;
for (FollowerState followerState : followerStates.values()) {
timestamps[++i] = followerState.responseTimestamp();
long followerTimestamp = followerState.responseTimestamp();
maxFollowerTimestamp = Math.max(maxFollowerTimestamp, followerTimestamp);
timestamps[i++] = followerTimestamp;
}
// this is for the local RaftNode. all timestamps are local,
// hence local RaftNode's timestamp cannot be smaller than
// any of the follower timestamps.
timestamps[i] = Math.max(maxFollowerTimestamp, localNodeTimestamp);

Arrays.sort(timestamps);

return timestamps[timestamps.length - quorumSize];
}

/**
 * Builds a new map that pairs each follower endpoint with the response
 * timestamp currently held by its follower state.
 *
 * @return a freshly created map from follower endpoint to response timestamp
 */
public Map<RaftEndpoint, Long> responseTimestamps() {
    Map<RaftEndpoint, Long> timestampsByEndpoint = new HashMap<>();
    followerStates.forEach(
            (endpoint, followerState) -> timestampsByEndpoint.put(endpoint, followerState.responseTimestamp()));
    return timestampsByEndpoint;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ public interface RaftLogStats {

/**
* Returns the indices of the last known appended Raft log entries on the
* followers.
* <p>
* This map is non-empty only for the leader Raft node. Followers return an
* empty map.
* followers. This map is non-empty only for the leader Raft node.
*
* @return the indices of the last known appended Raft log entries on the
* followers
Expand Down
42 changes: 41 additions & 1 deletion microraft/src/main/java/io/microraft/report/RaftNodeReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.microraft.report;

import java.util.Map;
import java.util.Optional;

import javax.annotation.Nonnull;

import io.microraft.RaftConfig;
Expand Down Expand Up @@ -104,7 +107,10 @@ public interface RaftNodeReport {
RaftGroupMembers getEffectiveMembers();

/**
* Returns the role of the Raft node in the current term.
* Returns the role of the Raft node in the current term. If the returned role
* is {@link RaftRole#LEADER}, it means the local Raft node has received
* heartbeats from the majority in the last
* {@link RaftConfig#leaderHeartbeatTimeoutSecs} seconds.
*
* @return the role of the Raft node in the current term
*/
Expand Down Expand Up @@ -137,6 +143,40 @@ public interface RaftNodeReport {
@Nonnull
RaftLogStats getLog();

/**
* Returns timestamps of latest heartbeats sent by the non-leader nodes to the
* leader Raft node, including both {@link RaftRole#FOLLOWER} and
* {@link RaftRole#LEARNER} nodes. The leader node's RaftEndpoint is not present
* in the map. This map is returned non-empty only by the leader Raft node.
*
* @return timestamps of latest heartbeats sent by the non-leader nodes
*/
@Nonnull
Map<RaftEndpoint, Long> getHeartbeatTimestamps();

/**
* Returns earliest heartbeat timestamp of the replication quorum. This method
* returns a non-empty value only for the leader Raft node. For instance, this
* method returns 8 for the following heartbeat timestamps of 5 Raft nodes, A
* (leader) ts = -, B (follower) ts = 10, C (follower) ts = 8, D (follower) ts =
* 6, E (follower) ts = 4. Please note that {@link RaftRole#LEARNER} nodes and
* their heartbeats are excluded from quorum calculations.
*
* @return earliest heartbeat timestamp of the replication quorum
*/
@Nonnull
Optional<Long> getQuorumHeartbeatTimestamp();

/**
* Returns timestamp of the latest heartbeat received from the leader Raft node.
* This method returns a non-empty value only on a non-leader Raft node that has
* received at least one heartbeat from the leader.
*
* @return timestamp of the latest heartbeat received from the leader Raft node
*/
@Nonnull
Optional<Long> getLeaderHeartbeatTimestamp();

/**
* Denotes the reason for a given report
*/
Expand Down
4 changes: 3 additions & 1 deletion microraft/src/main/java/io/microraft/report/RaftTerm.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public interface RaftTerm {

/**
* Returns the known Raft leader endpoint in the current term, or null if
* unknown.
* unknown. For a non-leader Raft node, if this method returns a non-null Raft
* endpoint, it means the local Raft node has recently received a heartbeat from
* the leader.
*
* @return the known Raft leader endpoint in the current term, or null if
* unknown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.junit.Assert.fail;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

Expand Down Expand Up @@ -130,6 +131,9 @@ public void when_newNodeJoinsAsFollower_then_itAppendsMissingEntries() {
assertThat(newNode.getEffectiveMembers().getMembers()).isEqualTo(leader.getEffectiveMembers().getMembers());
assertThat(newNode.getEffectiveMembers().getLogIndex()).isEqualTo(leader.getEffectiveMembers().getLogIndex());
assertThat(newNode.getEffectiveMembers().getMajorityQuorumSize()).isEqualTo(1 + majority(initialMemberCount));

Map<RaftEndpoint, Long> heartbeatTimestamps = leader.getReport().join().getResult().getHeartbeatTimestamps();
assertThat(heartbeatTimestamps).containsKey(newNode.getLocalEndpoint());
}

@Test(timeout = 300_000)
Expand Down Expand Up @@ -184,6 +188,9 @@ public void when_newNodeJoinsAsLearner_then_itAppendsMissingEntries() {
assertThat(newNode.getEffectiveMembers().getMembers()).isEqualTo(leader.getEffectiveMembers().getMembers());
assertThat(newNode.getEffectiveMembers().getLogIndex()).isEqualTo(leader.getEffectiveMembers().getLogIndex());
assertThat(newNode.getEffectiveMembers().getMajorityQuorumSize()).isEqualTo(majority(initialMemberCount));

Map<RaftEndpoint, Long> heartbeatTimestamps = leader.getReport().join().getResult().getHeartbeatTimestamps();
assertThat(heartbeatTimestamps).containsKey(newNode.getLocalEndpoint());
}

@Test(timeout = 300_000)
Expand Down Expand Up @@ -232,6 +239,10 @@ public void when_thereIsSingleLearner_then_followerIsAdded() {
eventually(() -> assertThat(getCommitIndex(newNode1)).isEqualTo(commitIndex));
eventually(() -> assertThat(getCommitIndex(newNode2)).isEqualTo(commitIndex));

Map<RaftEndpoint, Long> heartbeatTimestamps = leader.getReport().join().getResult().getHeartbeatTimestamps();
assertThat(heartbeatTimestamps).containsKey(newNode1.getLocalEndpoint());
assertThat(heartbeatTimestamps).containsKey(newNode2.getLocalEndpoint());

assertThat(newNode1.getReport().join().getResult().getRole()).isEqualTo(LEARNER);
assertThat(newNode2.getReport().join().getResult().getRole()).isEqualTo(FOLLOWER);

Expand Down Expand Up @@ -306,6 +317,10 @@ public void when_thereIsSingleLearner_then_secondLearnerIsAdded() {
eventually(() -> assertThat(getCommitIndex(newNode1)).isEqualTo(commitIndex));
eventually(() -> assertThat(getCommitIndex(newNode2)).isEqualTo(commitIndex));

Map<RaftEndpoint, Long> heartbeatTimestamps = leader.getReport().join().getResult().getHeartbeatTimestamps();
assertThat(heartbeatTimestamps).containsKey(newNode1.getLocalEndpoint());
assertThat(heartbeatTimestamps).containsKey(newNode2.getLocalEndpoint());

assertThat(newNode1.getReport().join().getResult().getRole()).isEqualTo(LEARNER);
assertThat(newNode2.getReport().join().getResult().getRole()).isEqualTo(LEARNER);

Expand Down Expand Up @@ -376,6 +391,11 @@ public void when_thereAreTwoLearners_then_followerIsAdded() {
eventually(() -> assertThat(getCommitIndex(newNode2)).isEqualTo(commitIndex));
eventually(() -> assertThat(getCommitIndex(newNode3)).isEqualTo(commitIndex));

Map<RaftEndpoint, Long> heartbeatTimestamps = leader.getReport().join().getResult().getHeartbeatTimestamps();
assertThat(heartbeatTimestamps).containsKey(newNode1.getLocalEndpoint());
assertThat(heartbeatTimestamps).containsKey(newNode2.getLocalEndpoint());
assertThat(heartbeatTimestamps).containsKey(newNode3.getLocalEndpoint());

assertThat(newNode1.getReport().join().getResult().getRole()).isEqualTo(LEARNER);
assertThat(newNode2.getReport().join().getResult().getRole()).isEqualTo(LEARNER);
assertThat(newNode3.getReport().join().getResult().getRole()).isEqualTo(FOLLOWER);
Expand Down

0 comments on commit 917edc6

Please sign in to comment.