Skip to content

Commit

Permalink
Ensure Raft heartbeat failure detector logs append times at semi-regu…
Browse files Browse the repository at this point in the history
…lar intervals.
  • Loading branch information
kuujo committed Nov 3, 2017
1 parent 16c03cc commit f500100
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 32 deletions.
Expand Up @@ -538,9 +538,9 @@ default CompletableFuture<RaftServer> listen(MemberId... cluster) {
abstract class Builder implements io.atomix.utils.Builder<RaftServer> { abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
private static final Duration DEFAULT_ELECTION_TIMEOUT = Duration.ofMillis(750); private static final Duration DEFAULT_ELECTION_TIMEOUT = Duration.ofMillis(750);
private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(250); private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(250);
private static final int DEFAULT_ELECTION_THRESHOLD = 3; private static final int DEFAULT_ELECTION_THRESHOLD = 5;
private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofMillis(5000); private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofMillis(5000);
private static final int DEFAULT_SESSION_FAILURE_THRESHOLD = 5; private static final int DEFAULT_SESSION_FAILURE_THRESHOLD = 3;
private static final ThreadModel DEFAULT_THREAD_MODEL = ThreadModel.SHARED_THREAD_POOL; private static final ThreadModel DEFAULT_THREAD_MODEL = ThreadModel.SHARED_THREAD_POOL;
private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors(); private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();


Expand Down
Expand Up @@ -17,13 +17,10 @@


import io.atomix.protocols.phi.PhiAccrualFailureDetector; import io.atomix.protocols.phi.PhiAccrualFailureDetector;
import io.atomix.protocols.raft.RaftServer; import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember; import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
import io.atomix.protocols.raft.cluster.impl.RaftMemberContext; import io.atomix.protocols.raft.cluster.impl.RaftMemberContext;
import io.atomix.protocols.raft.impl.RaftContext; import io.atomix.protocols.raft.impl.RaftContext;
import io.atomix.protocols.raft.protocol.PollRequest; import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry; import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.protocols.raft.utils.Quorum; import io.atomix.protocols.raft.utils.Quorum;
import io.atomix.storage.journal.Indexed; import io.atomix.storage.journal.Indexed;
Expand All @@ -35,6 +32,7 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;


/** /**
Expand All @@ -44,6 +42,7 @@ public final class FollowerRole extends ActiveRole {
private final PhiAccrualFailureDetector failureDetector = new PhiAccrualFailureDetector(); private final PhiAccrualFailureDetector failureDetector = new PhiAccrualFailureDetector();
private final Random random = new Random(); private final Random random = new Random();
private Scheduled heartbeatTimer; private Scheduled heartbeatTimer;
private Scheduled heartbeatTimeout;


public FollowerRole(RaftContext context) { public FollowerRole(RaftContext context) {
super(context); super(context);
Expand All @@ -65,41 +64,41 @@ public synchronized CompletableFuture<RaftRole> open() {
*/ */
private void startHeartbeatTimer() { private void startHeartbeatTimer() {
log.trace("Starting heartbeat timer"); log.trace("Starting heartbeat timer");
resetHeartbeatTimer(); AtomicLong lastHeartbeat = new AtomicLong();
heartbeatTimer = raft.getThreadContext().schedule(raft.getHeartbeatInterval(), () -> {
if (raft.getLastHeartbeatTime() > lastHeartbeat.get()) {
failureDetector.report(raft.getLastHeartbeatTime());
}
});
resetHeartbeatTimeout();
} }


/** /**
* Resets the heartbeat timer. * Resets the heartbeat timer.
*/ */
private void resetHeartbeatTimer() { private void resetHeartbeatTimeout() {
Duration delay = raft.getHeartbeatInterval().dividedBy(2) Duration delay = raft.getHeartbeatInterval().dividedBy(2)
.plus(Duration.ofMillis(random.nextInt((int) raft.getHeartbeatInterval().dividedBy(2).toMillis()))); .plus(Duration.ofMillis(random.nextInt((int) raft.getHeartbeatInterval().dividedBy(2).toMillis())));
heartbeatTimer = raft.getThreadContext().schedule(delay, () -> { heartbeatTimeout = raft.getThreadContext().schedule(delay, () -> {
if (isOpen()) { if (isOpen()) {
if (System.currentTimeMillis() - raft.getLastHeartbeatTime() > raft.getElectionTimeout().toMillis() || failureDetector.phi() >= raft.getElectionThreshold()) { if (System.currentTimeMillis() - raft.getLastHeartbeatTime() > raft.getElectionTimeout().toMillis() || failureDetector.phi() >= raft.getElectionThreshold()) {
log.debug("Heartbeat timed out in {}", System.currentTimeMillis() - raft.getLastHeartbeatTime()); log.debug("Heartbeat timed out in {}", System.currentTimeMillis() - raft.getLastHeartbeatTime());
sendPollRequests(); sendPollRequests();
} else { } else {
resetHeartbeatTimer(); resetHeartbeatTimeout();
} }
} }
}); });
} }


@Override
protected boolean updateTermAndLeader(long term, MemberId leader) {
failureDetector.report();
return super.updateTermAndLeader(term, leader);
}

/** /**
* Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state. * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
*/ */
private void sendPollRequests() { private void sendPollRequests() {
// Set a new timer within which other nodes must respond in order for this node to transition to candidate. // Set a new timer within which other nodes must respond in order for this node to transition to candidate.
heartbeatTimer = raft.getThreadContext().schedule(raft.getElectionTimeout(), () -> { heartbeatTimeout = raft.getThreadContext().schedule(raft.getElectionTimeout(), () -> {
log.debug("Failed to poll a majority of the cluster in {}", raft.getElectionTimeout()); log.debug("Failed to poll a majority of the cluster in {}", raft.getElectionTimeout());
resetHeartbeatTimer(); resetHeartbeatTimeout();
}); });


// Create a quorum that will track the number of nodes that have responded to the poll request. // Create a quorum that will track the number of nodes that have responded to the poll request.
Expand All @@ -120,7 +119,7 @@ private void sendPollRequests() {
raft.setLeader(null); raft.setLeader(null);
raft.transition(RaftServer.Role.CANDIDATE); raft.transition(RaftServer.Role.CANDIDATE);
} else { } else {
resetHeartbeatTimer(); resetHeartbeatTimeout();
} }
}); });


Expand Down Expand Up @@ -164,7 +163,7 @@ private void sendPollRequests() {
log.debug("Received rejected poll from {}", member); log.debug("Received rejected poll from {}", member);
if (leader != null && response.term() == raft.getTerm() && member.memberId().equals(leader.memberId())) { if (leader != null && response.term() == raft.getTerm() && member.memberId().equals(leader.memberId())) {
quorum.cancel(); quorum.cancel();
resetHeartbeatTimer(); resetHeartbeatTimeout();
} else { } else {
quorum.fail(); quorum.fail();
} }
Expand All @@ -181,29 +180,22 @@ private void sendPollRequests() {
} }
} }


@Override
protected VoteResponse handleVote(VoteRequest request) {
// Reset the heartbeat timeout if we voted for another candidate.
VoteResponse response = super.handleVote(request);
if (response.voted()) {
failureDetector.report();
}
return response;
}

/** /**
* Cancels the heartbeat timer. * Cancels the heartbeat timer.
*/ */
private void cancelHeartbeatTimer() { private void cancelHeartbeatTimers() {
if (heartbeatTimer != null) { if (heartbeatTimer != null) {
log.trace("Cancelling heartbeat timer");
heartbeatTimer.cancel(); heartbeatTimer.cancel();
} }
if (heartbeatTimeout != null) {
log.trace("Cancelling heartbeat timer");
heartbeatTimeout.cancel();
}
} }


@Override @Override
public synchronized CompletableFuture<Void> close() { public synchronized CompletableFuture<Void> close() {
return super.close().thenRun(this::cancelHeartbeatTimer); return super.close().thenRun(this::cancelHeartbeatTimers);
} }


} }

0 comments on commit f500100

Please sign in to comment.