Skip to content

Commit

Permalink
Record timestamp *after* heartbeat response in leader heartbeats.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 3, 2017
1 parent 62a6481 commit 16c03cc
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 32 deletions.
Expand Up @@ -272,7 +272,7 @@ private void restoreSession(SnapshotReader reader, DefaultServiceContext service
service, service,
raft, raft,
threadContextFactory); threadContextFactory);
session.setTimestamp(sessionTimestamp); session.setLastUpdated(sessionTimestamp);
session.setRequestSequence(reader.readLong()); session.setRequestSequence(reader.readLong());
session.setCommandSequence(reader.readLong()); session.setCommandSequence(reader.readLong());
session.setEventIndex(reader.readLong()); session.setEventIndex(reader.readLong());
Expand Down
Expand Up @@ -239,7 +239,6 @@ private void sendHeartbeats(MemberId member) {
* Attempts to send a heartbeat to the given session. * Attempts to send a heartbeat to the given session.
*/ */
private void sendHeartbeat(MemberId member, Collection<RaftSessionContext> sessions) { private void sendHeartbeat(MemberId member, Collection<RaftSessionContext> sessions) {
long timestamp = System.currentTimeMillis();
HeartbeatRequest request = HeartbeatRequest.newBuilder() HeartbeatRequest request = HeartbeatRequest.newBuilder()
.withLeader(raft.getCluster().getMember().memberId()) .withLeader(raft.getCluster().getMember().memberId())
.withMembers(raft.getCluster().getMembers().stream() .withMembers(raft.getCluster().getMembers().stream()
Expand All @@ -249,13 +248,14 @@ private void sendHeartbeat(MemberId member, Collection<RaftSessionContext> sessi
.build(); .build();
log.trace("Sending {}", request); log.trace("Sending {}", request);
raft.getProtocol().heartbeat(member, request).whenCompleteAsync((response, error) -> { raft.getProtocol().heartbeat(member, request).whenCompleteAsync((response, error) -> {
long timestamp = System.currentTimeMillis();
if (error == null && response.status() == RaftResponse.Status.OK) { if (error == null && response.status() == RaftResponse.Status.OK) {
log.trace("Received {}", response); log.trace("Received {}", response);
sessions.forEach(s -> s.setHeartbeat(timestamp)); sessions.forEach(s -> s.setLastHeartbeat(timestamp));
} else { } else {
sessions.forEach(session -> { sessions.forEach(session -> {
// If no heartbeats have been received, use the session's minimum timeout. // If no heartbeats have been received, use the session's minimum timeout.
if (session.getHeartbeat() == 0) { if (session.getLastHeartbeat() == 0) {
if (timestamp - raft.getLastHeartbeatTime() > session.minTimeout()) { if (timestamp - raft.getLastHeartbeatTime() > session.minTimeout()) {
expireSession(session); expireSession(session);
} }
Expand Down
Expand Up @@ -210,7 +210,7 @@ private void expireSessions(long timestamp) {
// Iterate through registered sessions. // Iterate through registered sessions.
for (RaftSessionContext session : sessions.getSessions()) { for (RaftSessionContext session : sessions.getSessions()) {
if (session.isTimedOut(timestamp)) { if (session.isTimedOut(timestamp)) {
log.debug("Session expired in {} milliseconds: {}", timestamp - session.getTimestamp(), session); log.debug("Session expired in {} milliseconds: {}", timestamp - session.getLastUpdated(), session);
log.debug("Closing session {}", session.sessionId()); log.debug("Closing session {}", session.sessionId());
sessions.expireSession(session); sessions.expireSession(session);
} }
Expand Down Expand Up @@ -279,7 +279,7 @@ private void maybeInstallSnapshot(long index) {
this, this,
raft, raft,
threadContextFactory); threadContextFactory);
session.setTimestamp(sessionTimestamp); session.setLastUpdated(sessionTimestamp);
session.setRequestSequence(reader.readLong()); session.setRequestSequence(reader.readLong());
session.setCommandSequence(reader.readLong()); session.setCommandSequence(reader.readLong());
session.setEventIndex(reader.readLong()); session.setEventIndex(reader.readLong());
Expand Down Expand Up @@ -343,7 +343,7 @@ public CompletableFuture<Long> takeSnapshot(long index) {
writer.writeString(session.readConsistency().name()); writer.writeString(session.readConsistency().name());
writer.writeLong(session.minTimeout()); writer.writeLong(session.minTimeout());
writer.writeLong(session.maxTimeout()); writer.writeLong(session.maxTimeout());
writer.writeLong(session.getTimestamp()); writer.writeLong(session.getLastUpdated());
writer.writeLong(session.getRequestSequence()); writer.writeLong(session.getRequestSequence());
writer.writeLong(session.getCommandSequence()); writer.writeLong(session.getCommandSequence());
writer.writeLong(session.getEventIndex()); writer.writeLong(session.getEventIndex());
Expand Down Expand Up @@ -391,7 +391,7 @@ public CompletableFuture<Long> openSession(long index, long timestamp, RaftSessi
log.debug("Opening session {}", session.sessionId()); log.debug("Opening session {}", session.sessionId());


// Update the session's timestamp to prevent it from being expired. // Update the session's timestamp to prevent it from being expired.
session.setTimestamp(timestamp); session.setLastUpdated(timestamp);


// Update the state machine index/timestamp. // Update the state machine index/timestamp.
tick(index, timestamp); tick(index, timestamp);
Expand Down Expand Up @@ -439,7 +439,7 @@ public CompletableFuture<Boolean> keepAlive(long index, long timestamp, RaftSess
// The session may have been closed by the time this update was executed on the service thread. // The session may have been closed by the time this update was executed on the service thread.
if (session.getState() != RaftSession.State.CLOSED) { if (session.getState() != RaftSession.State.CLOSED) {
// Update the session's timestamp to prevent it from being expired. // Update the session's timestamp to prevent it from being expired.
session.setTimestamp(timestamp); session.setLastUpdated(timestamp);


// Clear results cached in the session. // Clear results cached in the session.
session.clearResults(commandSequence); session.clearResults(commandSequence);
Expand Down Expand Up @@ -514,7 +514,7 @@ public CompletableFuture<Void> keepAliveSessions(long index, long timestamp) {
this.currentTimestamp = Math.max(currentTimestamp, timestamp); this.currentTimestamp = Math.max(currentTimestamp, timestamp);


for (RaftSessionContext session : sessions.getSessions()) { for (RaftSessionContext session : sessions.getSessions()) {
session.setTimestamp(timestamp); session.setLastUpdated(timestamp);
} }
}); });
return future; return future;
Expand All @@ -534,7 +534,7 @@ public CompletableFuture<Void> closeSession(long index, long timestamp, RaftSess
log.debug("Closing session {}", session.sessionId()); log.debug("Closing session {}", session.sessionId());


// Update the session's timestamp to prevent it from being expired. // Update the session's timestamp to prevent it from being expired.
session.setTimestamp(timestamp); session.setLastUpdated(timestamp);


// Update the state machine index/timestamp. // Update the state machine index/timestamp.
tick(index, timestamp); tick(index, timestamp);
Expand Down Expand Up @@ -585,7 +585,7 @@ public CompletableFuture<OperationResult> executeCommand(long index, long sequen
*/ */
private void executeCommand(long index, long sequence, long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) { private void executeCommand(long index, long sequence, long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) {
// Update the session's timestamp to prevent it from being expired. // Update the session's timestamp to prevent it from being expired.
session.setTimestamp(timestamp); session.setLastUpdated(timestamp);


// Update the state machine index/timestamp. // Update the state machine index/timestamp.
tick(index, timestamp); tick(index, timestamp);
Expand Down
Expand Up @@ -67,8 +67,8 @@ public class RaftSessionContext implements RaftSession {
private final RaftContext server; private final RaftContext server;
private final ThreadContext eventExecutor; private final ThreadContext eventExecutor;
private volatile State state = State.OPEN; private volatile State state = State.OPEN;
private volatile long timestamp; private volatile long lastUpdated;
private long heartbeat; private long lastHeartbeat;
private PhiAccrualFailureDetector failureDetector = new PhiAccrualFailureDetector(); private PhiAccrualFailureDetector failureDetector = new PhiAccrualFailureDetector();
private long requestSequence; private long requestSequence;
private volatile long commandSequence; private volatile long commandSequence;
Expand Down Expand Up @@ -165,17 +165,17 @@ public DefaultServiceContext getService() {
* *
* @return The session update timestamp. * @return The session update timestamp.
*/ */
public long getTimestamp() { public long getLastUpdated() {
return timestamp; return lastUpdated;
} }


/** /**
* Updates the session timestamp. * Updates the session timestamp.
* *
* @param timestamp The session timestamp. * @param lastUpdated The session timestamp.
*/ */
public void setTimestamp(long timestamp) { public void setLastUpdated(long lastUpdated) {
this.timestamp = Math.max(this.timestamp, timestamp); this.lastUpdated = Math.max(this.lastUpdated, lastUpdated);
} }


/** /**
Expand All @@ -185,7 +185,7 @@ public void setTimestamp(long timestamp) {
* @return indicates whether the session is timed out * @return indicates whether the session is timed out
*/ */
public boolean isTimedOut(long timestamp) { public boolean isTimedOut(long timestamp) {
long lastUpdated = this.timestamp; long lastUpdated = this.lastUpdated;
return lastUpdated > 0 && timestamp - lastUpdated > maxTimeout; return lastUpdated > 0 && timestamp - lastUpdated > maxTimeout;
} }


Expand All @@ -194,25 +194,25 @@ public boolean isTimedOut(long timestamp) {
* *
* @return The current heartbeat time. * @return The current heartbeat time.
*/ */
public long getHeartbeat() { public long getLastHeartbeat() {
return heartbeat; return lastHeartbeat;
} }


/** /**
* Sets the current heartbeat time. * Sets the last heartbeat time.
* *
* @param heartbeat The current heartbeat time. * @param lastHeartbeat The last heartbeat time.
*/ */
public void setHeartbeat(long heartbeat) { public void setLastHeartbeat(long lastHeartbeat) {
this.heartbeat = Math.max(this.heartbeat, heartbeat); this.lastHeartbeat = Math.max(this.lastHeartbeat, lastHeartbeat);
failureDetector.report(heartbeat); failureDetector.report(lastHeartbeat);
} }


/** /**
* Resets heartbeat times. * Resets heartbeat times.
*/ */
public void resetHeartbeats() { public void resetHeartbeats() {
this.heartbeat = 0; this.lastHeartbeat = 0;
this.failureDetector = new PhiAccrualFailureDetector(); this.failureDetector = new PhiAccrualFailureDetector();
} }


Expand Down Expand Up @@ -242,13 +242,13 @@ private void setState(State state) {
log.debug("State changed: {}", state); log.debug("State changed: {}", state);
switch (state) { switch (state) {
case OPEN: case OPEN:
eventListeners.forEach(l -> l.onEvent(new RaftSessionEvent(RaftSessionEvent.Type.OPEN, this, getTimestamp()))); eventListeners.forEach(l -> l.onEvent(new RaftSessionEvent(RaftSessionEvent.Type.OPEN, this, getLastUpdated())));
break; break;
case EXPIRED: case EXPIRED:
eventListeners.forEach(l -> l.onEvent(new RaftSessionEvent(RaftSessionEvent.Type.EXPIRE, this, getTimestamp()))); eventListeners.forEach(l -> l.onEvent(new RaftSessionEvent(RaftSessionEvent.Type.EXPIRE, this, getLastUpdated())));
break; break;
case CLOSED: case CLOSED:
eventListeners.forEach(l -> l.onEvent(new RaftSessionEvent(RaftSessionEvent.Type.CLOSE, this, getTimestamp()))); eventListeners.forEach(l -> l.onEvent(new RaftSessionEvent(RaftSessionEvent.Type.CLOSE, this, getLastUpdated())));
break; break;
} }
} }
Expand Down Expand Up @@ -594,7 +594,7 @@ public String toString() {
return toStringHelper(this) return toStringHelper(this)
.addValue(context) .addValue(context)
.add("session", sessionId) .add("session", sessionId)
.add("timestamp", TimestampPrinter.of(timestamp)) .add("timestamp", TimestampPrinter.of(lastUpdated))
.toString(); .toString();
} }


Expand Down

0 comments on commit 16c03cc

Please sign in to comment.