Skip to content

Commit

Permalink
Ensure last applied index is sent in query responses.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 5, 2017
1 parent b69fb4a commit b037587
Showing 1 changed file with 8 additions and 8 deletions.
Expand Up @@ -195,13 +195,13 @@ private void tick(long index, long timestamp) {
maybeInstallSnapshot(index);

// Expire sessions that have timed out.
expireSessions(currentIndex, currentTimestamp);
expireSessions(currentTimestamp);
}

/**
* Expires sessions that have timed out.
*/
private void expireSessions(long index, long timestamp) {
private void expireSessions(long timestamp) {
// Iterate through registered sessions.
for (RaftSessionContext session : sessions.getSessions()) {

Expand Down Expand Up @@ -594,17 +594,17 @@ private void sequenceQuery(long index, long sequence, long timestamp, RaftSessio
private void indexQuery(long index, long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) {
// If the query index is greater than the session's last applied index, queue the request for handling once the
// state machine is caught up.
if (index > session.getLastApplied()) {
session.registerIndexQuery(index, () -> applyQuery(index, timestamp, session, operation, future));
if (index > currentIndex) {
session.registerIndexQuery(index, () -> applyQuery(timestamp, session, operation, future));
} else {
applyQuery(index, timestamp, session, operation, future);
applyQuery(timestamp, session, operation, future);
}
}

/**
* Applies a query to the state machine.
*/
private void applyQuery(long index, long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) {
private void applyQuery(long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) {
// If the session is not open, fail the request.
if (!session.getState().active()) {
future.completeExceptionally(new RaftException.UnknownSession("Unknown session: " + session.sessionId()));
Expand All @@ -620,9 +620,9 @@ private void applyQuery(long index, long timestamp, RaftSessionContext session,

OperationResult result;
try {
result = OperationResult.succeeded(index, eventIndex, stateMachine.apply(commit));
result = OperationResult.succeeded(currentIndex, eventIndex, stateMachine.apply(commit));
} catch (Exception e) {
result = OperationResult.failed(index, eventIndex, e);
result = OperationResult.failed(currentIndex, eventIndex, e);
}
future.complete(result);
}
Expand Down

0 comments on commit b037587

Please sign in to comment.