Skip to content

Commit

Permalink
Ensure one heartbeat is sent per client for all sessions.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 3, 2017
1 parent 336478f commit a93a103
Showing 1 changed file with 25 additions and 20 deletions.
Expand Up @@ -18,6 +18,7 @@
import io.atomix.protocols.raft.RaftError;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
import io.atomix.protocols.raft.cluster.impl.RaftMemberContext;
Expand Down Expand Up @@ -86,7 +87,7 @@ public final class LeaderRole extends ActiveRole {

private final LeaderAppender appender;
private Scheduled appendTimer;
private final Map<String, Scheduled> heartbeatTimers = new HashMap<>();
private final Map<MemberId, Scheduled> heartbeatTimers = new HashMap<>();
private long configuring;
private boolean transferring;

Expand Down Expand Up @@ -191,18 +192,18 @@ private void startHeartbeatTimer() {
raft.getSessions().getSessions().forEach(s -> s.resetHeartbeats());
log.trace("Starting heartbeat timers");
raft.getSessions().getSessions().stream()
.map(s -> s.memberId().id())
.map(s -> s.memberId())
.distinct()
.forEach(this::resetHeartbeatTimer);
}

/**
* Resets the heartbeat timer.
*/
private void resetHeartbeatTimer(String member) {
private void resetHeartbeatTimer(MemberId member) {
// Compute the smallest timeout of all open sessions for the member.
OptionalLong minTimeout = raft.getSessions().getSessions().stream()
.filter(s -> s.memberId().id().equals(member))
.filter(s -> s.memberId().equals(member))
.mapToLong(s -> s.minTimeout())
.min();

Expand All @@ -228,16 +229,16 @@ private void resetHeartbeatTimer(String member) {
*
* @param member the member to which to send heartbeats
*/
private void sendHeartbeats(String member) {
raft.getSessions().getSessions().stream()
.filter(s -> s.memberId().id().equals(member))
.forEach(this::sendHeartbeat);
private void sendHeartbeats(MemberId member) {
sendHeartbeat(member, raft.getSessions().getSessions().stream()
.filter(s -> s.memberId().equals(member))
.collect(Collectors.toList()));
}

/**
* Attempts to send a heartbeat to the given session.
*/
private void sendHeartbeat(RaftSessionContext session) {
private void sendHeartbeat(MemberId member, Collection<RaftSessionContext> sessions) {
long timestamp = System.currentTimeMillis();
HeartbeatRequest request = HeartbeatRequest.newBuilder()
.withLeader(raft.getCluster().getMember().memberId())
Expand All @@ -246,20 +247,24 @@ private void sendHeartbeat(RaftSessionContext session) {
.filter(m -> m != null)
.collect(Collectors.toList()))
.build();
raft.getProtocol().heartbeat(session.memberId(), request).whenCompleteAsync((response, error) -> {
log.trace("Sending {}", request);
raft.getProtocol().heartbeat(member, request).whenCompleteAsync((response, error) -> {
if (error == null && response.status() == RaftResponse.Status.OK) {
session.setHeartbeat(timestamp);
log.trace("Received {}", response);
sessions.forEach(s -> s.setHeartbeat(timestamp));
} else {
// If no heartbeats have been received, use the session's minimum timeout.
if (session.getHeartbeat() == 0) {
if (timestamp - raft.getLastHeartbeatTime() > session.minTimeout()) {
sessions.forEach(session -> {
// If no heartbeats have been received, use the session's minimum timeout.
if (session.getHeartbeat() == 0) {
if (timestamp - raft.getLastHeartbeatTime() > session.minTimeout()) {
expireSession(session);
}
}
// Otherwise, if the session is still active but has failed according to the failure detector, expire the session.
else if (session.getState().active() && session.isFailed(raft.getSessionFailureThreshold())) {
expireSession(session);
}
}
// Otherwise, if the session is still active but has failed according to the failure detector, expire the session.
else if (session.getState().active() && session.isFailed(raft.getSessionFailureThreshold())) {
expireSession(session);
}
});
}
}, raft.getThreadContext());
}
Expand Down Expand Up @@ -860,7 +865,7 @@ public CompletableFuture<OpenSessionResponse> onOpenSession(OpenSessionRequest r
if (commitError == null) {
raft.getStateMachine().<Long>apply(entry.index()).whenComplete((sessionId, sessionError) -> {
if (sessionError == null) {
resetHeartbeatTimer(request.member());
resetHeartbeatTimer(MemberId.from(request.member()));
future.complete(logResponse(OpenSessionResponse.newBuilder()
.withStatus(RaftResponse.Status.OK)
.withSession(sessionId)
Expand Down

0 comments on commit a93a103

Please sign in to comment.