Skip to content

Commit

Permalink
Implement failure detection for Raft leader elections.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 3, 2017
1 parent 7c5c29c commit 336478f
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 60 deletions.
Expand Up @@ -538,7 +538,9 @@ default CompletableFuture<RaftServer> listen(MemberId... cluster) {
abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
private static final Duration DEFAULT_ELECTION_TIMEOUT = Duration.ofMillis(750);
private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(250);
private static final int DEFAULT_ELECTION_THRESHOLD = 3;
private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofMillis(5000);
private static final int DEFAULT_SESSION_FAILURE_THRESHOLD = 5;
private static final ThreadModel DEFAULT_THREAD_MODEL = ThreadModel.SHARED_THREAD_POOL;
private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();

Expand All @@ -548,7 +550,9 @@ abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
protected RaftStorage storage;
protected Duration electionTimeout = DEFAULT_ELECTION_TIMEOUT;
protected Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
protected int electionThreshold = DEFAULT_ELECTION_THRESHOLD;
protected Duration sessionTimeout = DEFAULT_SESSION_TIMEOUT;
protected int sessionFailureThreshold = DEFAULT_SESSION_FAILURE_THRESHOLD;
protected final RaftServiceFactoryRegistry serviceRegistry = new RaftServiceFactoryRegistry();
protected ThreadModel threadModel = DEFAULT_THREAD_MODEL;
protected int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;
Expand Down Expand Up @@ -660,6 +664,22 @@ public Builder withHeartbeatInterval(Duration heartbeatInterval) {
return this;
}

/**
* Sets the election failure detection threshold.
* <p>
* This is the phi value at which a follower will attempt to start a new election after not receiving any
* communication from the leader.
*
* @param electionThreshold the election failure detection threshold
* @return The Raft server builder
* @throws IllegalArgumentException if the threshold is not positive
*/
public Builder withElectionThreshold(int electionThreshold) {
checkArgument(electionThreshold > 0, "electionThreshold must be positive");
this.electionThreshold = electionThreshold;
return this;
}

/**
* Sets the Raft session timeout, returning the Raft configuration for method chaining.
*
Expand All @@ -676,6 +696,21 @@ public Builder withSessionTimeout(Duration sessionTimeout) {
return this;
}

/**
* Sets the session failure detection threshold.
* <p>
* The threshold is a phi value at which sessions will be expired if the leader cannot communicate with the client.
*
* @param sessionFailureThreshold the session failure threshold
* @return The Raft server builder.
* @throws IllegalArgumentException if the threshold is not positive
*/
public Builder withSessionFailureThreshold(int sessionFailureThreshold) {
checkArgument(sessionFailureThreshold > 0, "sessionFailureThreshold must be positive");
this.sessionFailureThreshold = sessionFailureThreshold;
return this;
}

/**
* Sets the server thread pool size.
*
Expand Down
Expand Up @@ -241,7 +241,9 @@ public RaftServer build() {
RaftContext raft = new RaftContext(name, localMemberId, protocol, storage, serviceRegistry, threadModel, threadPoolSize);
raft.setElectionTimeout(electionTimeout);
raft.setHeartbeatInterval(heartbeatInterval);
raft.setElectionThreshold(electionThreshold);
raft.setSessionTimeout(sessionTimeout);
raft.setSessionFailureThreshold(sessionFailureThreshold);

return new DefaultRaftServer(raft);
}
Expand Down
Expand Up @@ -99,8 +99,10 @@ public class RaftContext implements AutoCloseable {
private final ThreadContext compactionContext;
protected RaftRole role = new InactiveRole(this);
private Duration electionTimeout = Duration.ofMillis(500);
private Duration sessionTimeout = Duration.ofMillis(5000);
private Duration heartbeatInterval = Duration.ofMillis(150);
private int electionThreshold = 3;
private Duration sessionTimeout = Duration.ofMillis(5000);
private int sessionFailureThreshold = 5;
private volatile MemberId leader;
private volatile long term;
private MemberId lastVotedFor;
Expand Down Expand Up @@ -315,6 +317,24 @@ public Duration getHeartbeatInterval() {
return heartbeatInterval;
}

/**
* Sets the election threshold.
*
* @param electionThreshold the election threshold
*/
public void setElectionThreshold(int electionThreshold) {
this.electionThreshold = electionThreshold;
}

/**
* Returns the election threshold.
*
* @return the election threshold
*/
public int getElectionThreshold() {
return electionThreshold;
}

/**
* Returns the session timeout.
*
Expand All @@ -333,6 +353,24 @@ public void setSessionTimeout(Duration sessionTimeout) {
this.sessionTimeout = checkNotNull(sessionTimeout, "sessionTimeout cannot be null");
}

/**
* Returns the session failure threshold.
*
* @return the session failure threshold
*/
public int getSessionFailureThreshold() {
return sessionFailureThreshold;
}

/**
* Sets the session failure threshold.
*
* @param sessionFailureThreshold the session failure threshold
*/
public void setSessionFailureThreshold(int sessionFailureThreshold) {
this.sessionFailureThreshold = sessionFailureThreshold;
}

/**
* Sets the state leader.
*
Expand Down
Expand Up @@ -15,16 +15,12 @@
*/
package io.atomix.protocols.raft.roles;

import io.atomix.protocols.phi.PhiAccrualFailureDetector;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
import io.atomix.protocols.raft.cluster.impl.RaftMemberContext;
import io.atomix.protocols.raft.impl.RaftContext;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
Expand All @@ -45,6 +41,7 @@
* Follower state.
*/
public final class FollowerRole extends ActiveRole {
private final PhiAccrualFailureDetector failureDetector = new PhiAccrualFailureDetector();
private final Random random = new Random();
private Scheduled heartbeatTimer;

Expand All @@ -59,51 +56,50 @@ public RaftServer.Role role() {

@Override
public synchronized CompletableFuture<RaftRole> open() {
return super.open().thenRun(this::startHeartbeatTimeout).thenApply(v -> this);
raft.setLastHeartbeatTime();
return super.open().thenRun(this::startHeartbeatTimer).thenApply(v -> this);
}

/**
* Starts the heartbeat timer.
*/
private void startHeartbeatTimeout() {
private void startHeartbeatTimer() {
log.trace("Starting heartbeat timer");
resetHeartbeatTimeout();
resetHeartbeatTimer();
}

/**
* Resets the heartbeat timer.
*/
private void resetHeartbeatTimeout() {
raft.checkThread();
if (isClosed())
return;

// If a timer is already set, cancel the timer.
if (heartbeatTimer != null) {
heartbeatTimer.cancel();
}

// Set the election timeout in a semi-random fashion with the random range
// being election timeout and 2 * election timeout.
Duration delay = raft.getElectionTimeout().plus(Duration.ofMillis(random.nextInt((int) raft.getElectionTimeout().toMillis())));
private void resetHeartbeatTimer() {
Duration delay = raft.getHeartbeatInterval().dividedBy(2)
.plus(Duration.ofMillis(random.nextInt((int) raft.getHeartbeatInterval().dividedBy(2).toMillis())));
heartbeatTimer = raft.getThreadContext().schedule(delay, () -> {
heartbeatTimer = null;
if (isOpen()) {
raft.setLeader(null);
log.debug("Heartbeat timed out in {}", delay);
sendPollRequests();
if (System.currentTimeMillis() - raft.getLastHeartbeatTime() > raft.getElectionTimeout().toMillis() || failureDetector.phi() >= raft.getElectionThreshold()) {
log.debug("Heartbeat timed out in {}", System.currentTimeMillis() - raft.getLastHeartbeatTime());
sendPollRequests();
} else {
resetHeartbeatTimer();
}
}
});
}

@Override
protected boolean updateTermAndLeader(long term, MemberId leader) {
failureDetector.report();
return super.updateTermAndLeader(term, leader);
}

/**
* Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
*/
private void sendPollRequests() {
// Set a new timer within which other nodes must respond in order for this node to transition to candidate.
heartbeatTimer = raft.getThreadContext().schedule(raft.getElectionTimeout(), () -> {
log.debug("Failed to poll a majority of the cluster in {}", raft.getElectionTimeout());
resetHeartbeatTimeout();
resetHeartbeatTimer();
});

// Create a quorum that will track the number of nodes that have responded to the poll request.
Expand All @@ -112,6 +108,7 @@ private void sendPollRequests() {

// If there are no other members in the cluster, immediately transition to leader.
if (votingMembers.isEmpty()) {
raft.setLeader(null);
raft.transition(RaftServer.Role.CANDIDATE);
return;
}
Expand All @@ -120,9 +117,10 @@ private void sendPollRequests() {
// If a majority of the cluster indicated they would vote for us then transition to candidate.
complete.set(true);
if (elected) {
raft.setLeader(null);
raft.transition(RaftServer.Role.CANDIDATE);
} else {
resetHeartbeatTimeout();
resetHeartbeatTimer();
}
});

Expand All @@ -137,6 +135,8 @@ private void sendPollRequests() {
lastTerm = 0;
}

final DefaultRaftMember leader = raft.getLeader();

log.debug("Polling members {}", votingMembers);

// Once we got the last log term, iterate through each current member
Expand All @@ -162,7 +162,12 @@ private void sendPollRequests() {

if (!response.accepted()) {
log.debug("Received rejected poll from {}", member);
quorum.fail();
if (leader != null && response.term() == raft.getTerm() && member.memberId().equals(leader.memberId())) {
quorum.cancel();
resetHeartbeatTimer();
} else {
quorum.fail();
}
} else if (response.term() != raft.getTerm()) {
log.debug("Received accepted poll for a different term from {}", member);
quorum.fail();
Expand All @@ -176,43 +181,20 @@ private void sendPollRequests() {
}
}

@Override
public CompletableFuture<InstallResponse> onInstall(InstallRequest request) {
CompletableFuture<InstallResponse> future = super.onInstall(request);
resetHeartbeatTimeout();
return future;
}

@Override
public CompletableFuture<ConfigureResponse> onConfigure(ConfigureRequest request) {
CompletableFuture<ConfigureResponse> future = super.onConfigure(request);
resetHeartbeatTimeout();
return future;
}

@Override
public CompletableFuture<AppendResponse> onAppend(AppendRequest request) {
CompletableFuture<AppendResponse> future = super.onAppend(request);

// Reset the heartbeat timeout.
resetHeartbeatTimeout();
return future;
}

@Override
protected VoteResponse handleVote(VoteRequest request) {
// Reset the heartbeat timeout if we voted for another candidate.
VoteResponse response = super.handleVote(request);
if (response.voted()) {
resetHeartbeatTimeout();
failureDetector.report();
}
return response;
}

/**
* Cancels the heartbeat timeout.
* Cancels the heartbeat timer.
*/
private void cancelHeartbeatTimeout() {
private void cancelHeartbeatTimer() {
if (heartbeatTimer != null) {
log.trace("Cancelling heartbeat timer");
heartbeatTimer.cancel();
Expand All @@ -221,7 +203,7 @@ private void cancelHeartbeatTimeout() {

@Override
public synchronized CompletableFuture<Void> close() {
return super.close().thenRun(this::cancelHeartbeatTimeout);
return super.close().thenRun(this::cancelHeartbeatTimer);
}

}
Expand Up @@ -257,7 +257,7 @@ private void sendHeartbeat(RaftSessionContext session) {
}
}
// Otherwise, if the session is still active but has failed according to the failure detector, expire the session.
else if (session.getState().active() && session.isFailed()) {
else if (session.getState().active() && session.isFailed(raft.getSessionFailureThreshold())) {
expireSession(session);
}
}
Expand Down
Expand Up @@ -218,10 +218,11 @@ public void resetHeartbeats() {
/**
* Returns a boolean indicating whether the session appears to have failed due to lack of heartbeats.
*
* @param threshold The phi failure threshold
* @return Indicates whether the session has failed.
*/
public boolean isFailed() {
return failureDetector.phi() >= PHI_FAILURE_THRESHOLD;
public boolean isFailed(int threshold) {
return failureDetector.phi() >= threshold;
}

@Override
Expand Down

0 comments on commit 336478f

Please sign in to comment.