Skip to content

Commit

Permalink
Remove server-side command queueing to ensure clients reset sequence …
Browse files Browse the repository at this point in the history
…numbers as quickly as possible on leader changes.
  • Loading branch information
kuujo committed Sep 5, 2017
1 parent 6692034 commit 595780d
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 107 deletions.
Expand Up @@ -554,26 +554,19 @@ public CompletableFuture<CommandResponse> onCommand(final CommandRequest request

final long sequenceNumber = request.sequenceNumber();

// If a command with the given sequence number is already pending, return the existing future to ensure
// duplicate requests aren't committed as duplicate entries in the log.
PendingCommand existingCommand = session.getCommand(sequenceNumber);
if (existingCommand != null) {
if (sequenceNumber == session.nextRequestSequence()) {
session.removeCommand(sequenceNumber);
commitCommand(existingCommand.request(), existingCommand.future());
session.setRequestSequence(sequenceNumber);
drainCommands(session);
}
return existingCommand.future();
}

final CompletableFuture<CommandResponse> future = new CompletableFuture<>();

// If the request sequence number is greater than the next sequence number, that indicates a command is missing.
// Register the command request and return a future to be completed once commands are properly sequenced.
// If the session's current sequence number is less than one prior to the request sequence number, reject
// the command to force it to be resent in the correct order. Note that it's possible for the session
// sequence number to be greater than the request sequence number. In that case, it's likely that the
// command was submitted more than once to the cluster, and the command will be deduplicated once
// applied to the state machine.
if (sequenceNumber > session.nextRequestSequence()) {
session.registerCommand(request.sequenceNumber(), new PendingCommand(request, future));
return future;
return CompletableFuture.completedFuture(logResponse(CommandResponse.newBuilder()
.withStatus(RaftResponse.Status.ERROR)
.withError(RaftError.Type.COMMAND_FAILURE)
.withLastSequence(session.getRequestSequence())
.build()));
}

// If the command has already been applied to the state machine then return a cached result if possible, otherwise
Expand All @@ -595,30 +588,11 @@ public CompletableFuture<CommandResponse> onCommand(final CommandRequest request
else {
commitCommand(request, future);
session.setRequestSequence(sequenceNumber);

// Once the sequence number has been updated, drain any additional pending commands after this sequence number.
drainCommands(session);
}

return future.thenApply(this::logResponse);
}

/**
 * Commits every queued command that has become next in sequence for the given session.
 * <p>
 * Beginning at the session's next expected request sequence number, each pending command registered under that
 * number is removed from the session, committed, and recorded as the session's request sequence number. Draining
 * ends at the first sequence number for which the session holds no pending command.
 *
 * @param session the session for which to drain commands
 */
private void drainCommands(RaftSessionContext session) {
  while (true) {
    final long sequence = session.nextRequestSequence();
    final PendingCommand pending = session.removeCommand(sequence);
    if (pending == null) {
      // Nothing is queued for the next sequence number, so there is nothing further to drain.
      return;
    }
    commitCommand(pending.request(), pending.future());
    session.setRequestSequence(sequence);
  }
}

/**
* Commits a command.
*
Expand Down Expand Up @@ -821,14 +795,6 @@ public CompletableFuture<KeepAliveResponse> onKeepAlive(KeepAliveRequest request
if (commitError == null) {
raft.getStateMachine().<long[]>apply(entry.index()).whenCompleteAsync((sessionResult, sessionError) -> {
if (sessionError == null) {
// Iterate through kept alive session IDs and drain commands if necessary.
for (long sessionId : sessionResult) {
RaftSessionContext session = raft.getStateMachine().getSessions().getSession(sessionId);
if (session != null && session.getState().active()) {
drainCommands(session);
}
}

future.complete(logResponse(KeepAliveResponse.newBuilder()
.withStatus(RaftResponse.Status.OK)
.withLeader(raft.getCluster().getMember().memberId())
Expand Down Expand Up @@ -953,28 +919,12 @@ private void stepDown() {
}
}

/**
 * Completes every command still queued on any session with a {@code COMMAND_FAILURE} error response.
 * <p>
 * Each session's pending commands are cleared as they are failed. The response sent for a command names the
 * command's own sequence number in its message and carries the session's current request sequence number as
 * the last sequence.
 */
private void failPendingCommands() {
  for (RaftSessionContext session : raft.getStateMachine().getSessions().getSessions()) {
    session.clearCommands().forEach(pending -> {
      final String reason = "Request sequence number " + pending.request().sequenceNumber() + " out of sequence";
      pending.future().complete(logResponse(CommandResponse.newBuilder()
          .withStatus(RaftResponse.Status.ERROR)
          .withError(RaftError.Type.COMMAND_FAILURE, reason)
          .withLastSequence(session.getRequestSequence())
          .build()));
    });
  }
}

/**
 * Closes this role by chaining clean-up steps after {@code super.close()}: close the appender, cancel the
 * append timer, step down, then fail pending commands.
 * <p>
 * NOTE(review): the last two chain lines look like the "before" and "after" variants of the same statement
 * rendered together from a diff. As written, the trailing {@code .thenRun(this::stepDown);} follows an
 * already-terminated statement. Confirm which single terminator is intended before relying on this method.
 *
 * @return a future completed once the chained close steps have run
 */
@Override
public synchronized CompletableFuture<Void> close() {
return super.close()
.thenRun(appender::close)
.thenRun(this::cancelAppendTimer)
.thenRun(this::stepDown)
.thenRun(this::failPendingCommands);
.thenRun(this::stepDown);
}

}
Expand Up @@ -660,6 +660,7 @@ public CompletableFuture<OperationResult> executeQuery(long index, long sequence
private void executeQuery(long index, long sequence, long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) {
// If the session is not open, fail the request.
if (!session.getState().active()) {
log.warn("Inactive session: " + session.sessionId());
future.completeExceptionally(new RaftException.UnknownSession("Unknown session: " + session.sessionId()));
return;
}
Expand All @@ -674,7 +675,9 @@ private void executeQuery(long index, long sequence, long timestamp, RaftSession
private void sequenceQuery(long index, long sequence, long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) {
// If the query's sequence number is greater than the session's current sequence number, queue the request for
// handling once the state machine is caught up.
if (sequence > session.getCommandSequence()) {
long commandSequence = session.getCommandSequence();
if (sequence > commandSequence) {
log.trace("Registering query with sequence number " + sequence + " > " + commandSequence);
session.registerSequenceQuery(sequence, () -> indexQuery(index, timestamp, session, operation, future));
} else {
indexQuery(index, timestamp, session, operation, future);
Expand All @@ -688,6 +691,7 @@ private void indexQuery(long index, long timestamp, RaftSessionContext session,
// If the query index is greater than the session's last applied index, queue the request for handling once the
// state machine is caught up.
if (index > currentIndex) {
log.trace("Registering query with index " + index + " > " + currentIndex);
session.registerIndexQuery(index, () -> applyQuery(timestamp, session, operation, future));
} else {
applyQuery(timestamp, session, operation, future);
Expand All @@ -700,6 +704,7 @@ private void indexQuery(long index, long timestamp, RaftSessionContext session,
private void applyQuery(long timestamp, RaftSessionContext session, RaftOperation operation, CompletableFuture<OperationResult> future) {
// If the session is not open, fail the request.
if (!session.getState().active()) {
log.warn("Inactive session: " + session.sessionId());
future.completeExceptionally(new RaftException.UnknownSession("Unknown session: " + session.sessionId()));
return;
}
Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package io.atomix.protocols.raft.session.impl;

import com.google.common.collect.Lists;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.event.RaftEvent;
Expand All @@ -24,7 +23,6 @@
import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.roles.PendingCommand;
import io.atomix.protocols.raft.service.ServiceType;
import io.atomix.protocols.raft.service.impl.DefaultServiceContext;
import io.atomix.protocols.raft.session.RaftSession;
Expand All @@ -38,7 +36,6 @@
import io.atomix.utils.logging.LoggerContext;
import org.slf4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -77,7 +74,6 @@ public class RaftSessionContext implements RaftSession {
private volatile long completeIndex;
private final Map<Long, List<Runnable>> sequenceQueries = new HashMap<>();
private final Map<Long, List<Runnable>> indexQueries = new HashMap<>();
private final Map<Long, PendingCommand> pendingCommands = new HashMap<>();
private final Map<Long, OperationResult> results = new HashMap<>();
private final Queue<EventHolder> events = new LinkedList<>();
private volatile EventHolder currentEventList;
Expand Down Expand Up @@ -339,47 +335,6 @@ public void registerIndexQuery(long index, Runnable query) {
queries.add(query);
}

/**
 * Stores a command awaiting sequencing, keyed by its sequence number.
 * <p>
 * A command already registered under the same sequence number is replaced.
 *
 * @param sequence the pending command sequence number
 * @param pendingCommand the pending command to register
 */
public void registerCommand(long sequence, PendingCommand pendingCommand) {
  this.pendingCommands.put(sequence, pendingCommand);
}

/**
 * Looks up the command registered under the given sequence number without removing it.
 *
 * @param sequence the pending command sequence number
 * @return the pending command or {@code null} if no command is pending for this sequence number
 */
public PendingCommand getCommand(long sequence) {
  final PendingCommand pending = this.pendingCommands.get(sequence);
  return pending;
}

/**
 * Takes the command registered under the given sequence number out of the pending set.
 *
 * @param sequence the pending command sequence number
 * @return the removed pending command or {@code null} if no command is pending for this sequence number
 */
public PendingCommand removeCommand(long sequence) {
  final PendingCommand removed = this.pendingCommands.remove(sequence);
  return removed;
}

/**
 * Empties the pending command set and hands back what it contained.
 * <p>
 * The returned collection is a copy taken before clearing, so it is unaffected by later registrations.
 *
 * @return a collection of the commands that were pending
 */
public Collection<PendingCommand> clearCommands() {
  final Collection<PendingCommand> drained = Lists.newArrayList(this.pendingCommands.values());
  this.pendingCommands.clear();
  return drained;
}

/**
* Registers a session result.
* <p>
Expand Down

0 comments on commit 595780d

Please sign in to comment.