Skip to content

Commit

Permalink
Ensure sessions are not expired prior to being registered on the stat…
Browse files Browse the repository at this point in the history
…e machine thread.
  • Loading branch information
kuujo committed Dec 11, 2017
1 parent 06368e4 commit 1ae9880
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
Expand Up @@ -392,8 +392,8 @@ public CompletableFuture<Long> openSession(long index, long timestamp, RaftSessi
serviceExecutor.execute(() -> {
log.debug("Opening session {}", session.sessionId());

// Update the session's timestamp to prevent it from being expired.
session.setLastUpdated(timestamp);
// Open the session at the current timestamp.
session.open(timestamp);

// If a snapshot exists prior to the given index and hasn't yet been installed, install the snapshot.
maybeInstallSnapshot(index);
Expand Down Expand Up @@ -476,7 +476,7 @@ public CompletableFuture<Boolean> keepAlive(long index, long timestamp, RaftSess
/**
* Completes a keep-alive.
*
* @param index the keep-alive index
* @param index the keep-alive index
* @param timestamp the keep-alive timestamp
* @return future to be completed once the keep alive is completed
*/
Expand All @@ -503,7 +503,7 @@ public CompletableFuture<Void> completeKeepAlive(long index, long timestamp) {
/**
* Keeps all sessions alive using the given timestamp.
*
* @param index the index of the timestamp
* @param index the index of the timestamp
* @param timestamp the timestamp with which to reset session timeouts
* @return future to be completed once all sessions have been preserved
*/
Expand Down Expand Up @@ -573,7 +573,7 @@ public CompletableFuture<Void> closeSession(long index, long timestamp, RaftSess
* @param timestamp The timestamp of the command.
* @param sequence The command sequence number.
* @param session The session that submitted the command.
* @param operation The command to execute.
* @param operation The command to execute.
* @return A future to be completed with the command result.
*/
public CompletableFuture<OperationResult> executeCommand(long index, long sequence, long timestamp, RaftSession session, PrimitiveOperation operation) {
Expand All @@ -597,6 +597,7 @@ private void executeCommand(long index, long sequence, long timestamp, RaftSessi

// If the session is not open, fail the request.
if (!session.getState().active()) {
log.warn("Session not open: {}", session);
future.completeExceptionally(new RaftException.UnknownSession("Unknown session: " + session.sessionId()));
return;
}
Expand All @@ -605,6 +606,7 @@ private void executeCommand(long index, long sequence, long timestamp, RaftSessi
// we've received a command that was previously applied to the state machine. Ensure linearizability by
// returning the cached response instead of applying it to the user defined state machine.
if (sequence > 0 && sequence < session.nextCommandSequence()) {
log.trace("Returning cached result for command with sequence number {} < {}", sequence, session.nextCommandSequence());
sequenceCommand(index, sequence, session, future);
}
// If we've made it this far, the command must have been applied in the proper order as sequenced by the
Expand Down Expand Up @@ -669,7 +671,7 @@ private void applyCommand(long index, long sequence, long timestamp, PrimitiveOp
* @param sequence The query sequence number.
* @param timestamp The timestamp of the query.
* @param session The session that submitted the query.
* @param operation The query to execute.
* @param operation The query to execute.
* @return A future to be completed with the query result.
*/
public CompletableFuture<OperationResult> executeQuery(long index, long sequence, long timestamp, RaftSession session, PrimitiveOperation operation) {
Expand Down
Expand Up @@ -65,7 +65,7 @@ public class RaftSession implements Session {
private final DefaultServiceContext context;
private final RaftContext server;
private final ThreadContext eventExecutor;
private volatile State state = State.OPEN;
private volatile State state = State.CLOSED;
private volatile long lastUpdated;
private long lastHeartbeat;
private PhiAccrualFailureDetector failureDetector = new PhiAccrualFailureDetector();
Expand Down Expand Up @@ -112,7 +112,6 @@ public RaftSession(
.add("type", context.serviceType())
.add("name", context.serviceName())
.build());
protocol.registerResetListener(sessionId, request -> resendEvents(request.index()), context.executor());
}

@Override
Expand Down Expand Up @@ -566,6 +565,15 @@ private void sendEvents(EventHolder event) {
}
}

/**
* Opens the session.
*/
public void open(long timestamp) {
setState(State.OPEN);
setLastUpdated(timestamp);
protocol.registerResetListener(sessionId, request -> resendEvents(request.index()), context.executor());
}

/**
* Expires the session.
*/
Expand Down
Expand Up @@ -16,8 +16,8 @@
package io.atomix.protocols.raft.session.impl;

import io.atomix.primitive.PrimitiveId;
import io.atomix.primitive.session.SessionListener;
import io.atomix.primitive.session.SessionId;
import io.atomix.primitive.session.SessionListener;

import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -111,6 +111,7 @@ public Collection<RaftSession> getSessions() {
public Collection<RaftSession> getSessions(PrimitiveId primitiveId) {
return sessions.values().stream()
.filter(session -> session.getService().serviceId().equals(primitiveId))
.filter(session -> session.getState().active())
.collect(Collectors.toSet());
}

Expand All @@ -126,7 +127,7 @@ public void removeSessions(PrimitiveId primitiveId) {
/**
* Adds a session listener.
*
* @param primitiveId the service ID for which to listen to sessions
* @param primitiveId the service ID for which to listen to sessions
* @param sessionListener the session listener
*/
public void addListener(PrimitiveId primitiveId, SessionListener sessionListener) {
Expand All @@ -137,7 +138,7 @@ public void addListener(PrimitiveId primitiveId, SessionListener sessionListener
/**
* Removes a session listener.
*
* @param primitiveId the service ID with which the listener is associated
* @param primitiveId the service ID with which the listener is associated
* @param sessionListener the session listener
*/
public void removeListener(PrimitiveId primitiveId, SessionListener sessionListener) {
Expand Down

0 comments on commit 1ae9880

Please sign in to comment.