From a93a10309fdda43e41ff0e74da26ed53c0bb3c2b Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Wed, 1 Nov 2017 17:04:35 -0700 Subject: [PATCH] Ensure one heartbeat is sent per client for all sessions. --- .../protocols/raft/roles/LeaderRole.java | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/roles/LeaderRole.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/roles/LeaderRole.java index e138194195..b32ead92cb 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/roles/LeaderRole.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/roles/LeaderRole.java @@ -18,6 +18,7 @@ import io.atomix.protocols.raft.RaftError; import io.atomix.protocols.raft.RaftException; import io.atomix.protocols.raft.RaftServer; +import io.atomix.protocols.raft.cluster.MemberId; import io.atomix.protocols.raft.cluster.RaftMember; import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember; import io.atomix.protocols.raft.cluster.impl.RaftMemberContext; @@ -86,7 +87,7 @@ public final class LeaderRole extends ActiveRole { private final LeaderAppender appender; private Scheduled appendTimer; - private final Map heartbeatTimers = new HashMap<>(); + private final Map heartbeatTimers = new HashMap<>(); private long configuring; private boolean transferring; @@ -191,7 +192,7 @@ private void startHeartbeatTimer() { raft.getSessions().getSessions().forEach(s -> s.resetHeartbeats()); log.trace("Starting heartbeat timers"); raft.getSessions().getSessions().stream() - .map(s -> s.memberId().id()) + .map(s -> s.memberId()) .distinct() .forEach(this::resetHeartbeatTimer); } @@ -199,10 +200,10 @@ private void startHeartbeatTimer() { /** * Resets the heartbeat timer. */ - private void resetHeartbeatTimer(String member) { + private void resetHeartbeatTimer(MemberId member) { // Compute the smallest timeout of all open sessions for the member. OptionalLong minTimeout = raft.getSessions().getSessions().stream() - .filter(s -> s.memberId().id().equals(member)) + .filter(s -> s.memberId().equals(member)) .mapToLong(s -> s.minTimeout()) .min(); @@ -228,16 +229,16 @@ private void resetHeartbeatTimer(String member) { * * @param member the member to which to send heartbeats */ - private void sendHeartbeats(String member) { - raft.getSessions().getSessions().stream() - .filter(s -> s.memberId().id().equals(member)) - .forEach(this::sendHeartbeat); + private void sendHeartbeats(MemberId member) { + sendHeartbeat(member, raft.getSessions().getSessions().stream() + .filter(s -> s.memberId().equals(member)) + .collect(Collectors.toList())); } /** * Attempts to send a heartbeat to the given session. */ - private void sendHeartbeat(RaftSessionContext session) { + private void sendHeartbeat(MemberId member, Collection sessions) { long timestamp = System.currentTimeMillis(); HeartbeatRequest request = HeartbeatRequest.newBuilder() .withLeader(raft.getCluster().getMember().memberId()) @@ -246,20 +247,24 @@ private void sendHeartbeat(RaftSessionContext session) { .filter(m -> m != null) .collect(Collectors.toList())) .build(); - raft.getProtocol().heartbeat(session.memberId(), request).whenCompleteAsync((response, error) -> { + log.trace("Sending {}", request); + raft.getProtocol().heartbeat(member, request).whenCompleteAsync((response, error) -> { if (error == null && response.status() == RaftResponse.Status.OK) { - session.setHeartbeat(timestamp); + log.trace("Received {}", response); + sessions.forEach(s -> s.setHeartbeat(timestamp)); } else { - // If no heartbeats have been received, use the session's minimum timeout. - if (session.getHeartbeat() == 0) { - if (timestamp - raft.getLastHeartbeatTime() > session.minTimeout()) { + sessions.forEach(session -> { + // If no heartbeats have been received, use the session's minimum timeout. + if (session.getHeartbeat() == 0) { + if (timestamp - raft.getLastHeartbeatTime() > session.minTimeout()) { + expireSession(session); + } + } + // Otherwise, if the session is still active but has failed according to the failure detector, expire the session. + else if (session.getState().active() && session.isFailed(raft.getSessionFailureThreshold())) { expireSession(session); } - } - // Otherwise, if the session is still active but has failed according to the failure detector, expire the session. - else if (session.getState().active() && session.isFailed(raft.getSessionFailureThreshold())) { - expireSession(session); - } + }); } }, raft.getThreadContext()); } @@ -860,7 +865,7 @@ public CompletableFuture onOpenSession(OpenSessionRequest r if (commitError == null) { raft.getStateMachine().apply(entry.index()).whenComplete((sessionId, sessionError) -> { if (sessionError == null) { - resetHeartbeatTimer(request.member()); + resetHeartbeatTimer(MemberId.from(request.member())); future.complete(logResponse(OpenSessionResponse.newBuilder() .withStatus(RaftResponse.Status.OK) .withSession(sessionId)