Skip to content

Commit

Permalink
Don't send keep-alive requests for sessions that have been updated vi…
Browse files Browse the repository at this point in the history
…a commands.
  • Loading branch information
kuujo committed Jun 27, 2017
1 parent 408b36e commit 4ff4021
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 13 deletions.
Expand Up @@ -34,7 +34,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
Expand All @@ -44,6 +44,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -253,17 +254,36 @@ private void keepAliveSessions() {
* Sends a keep-alive request to the cluster.
*/
private synchronized void keepAliveSessions(boolean retryOnFailure) {
Map<Long, RaftProxyState> sessions = new HashMap<>(this.sessions);
long[] sessionIds = new long[sessions.size()];
long[] commandResponses = new long[sessions.size()];
long[] eventIndexes = new long[sessions.size()];
final long currentTime = System.currentTimeMillis();

// Filter the list of sessions that need keep-alive requests to be sent.
// If a session has been recently updated (a command has recently been committed via the session)
// then sending a keep-alive for the session is redundant.
List<RaftProxyState> needKeepAlive = sessions.values()
.stream()
.filter(s -> currentTime - s.getLastUpdated() > s.getSessionTimeout() / 2)
.collect(Collectors.toList());

// If no sessions need keep-alives to be sent, skip and reschedule the keep-alive.
if (needKeepAlive.isEmpty()) {
scheduleKeepAlive();
return;
}

// Allocate session IDs, command response sequence numbers, and event index arrays.
long[] sessionIds = new long[needKeepAlive.size()];
long[] commandResponses = new long[needKeepAlive.size()];
long[] eventIndexes = new long[needKeepAlive.size()];

// For each session that needs to be kept alive, populate batch request arrays.
int i = 0;
for (RaftProxyState sessionState : sessions.values()) {
sessionIds[i] = sessionState.getSessionId();
commandResponses[i] = sessionState.getCommandResponse();
eventIndexes[i] = sessionState.getEventIndex();
i++;
for (RaftProxyState sessionState : needKeepAlive) {
if (currentTime - sessionState.getLastUpdated() > sessionState.getSessionTimeout() / 2) {
sessionIds[i] = sessionState.getSessionId();
commandResponses[i] = sessionState.getCommandResponse();
eventIndexes[i] = sessionState.getEventIndex();
i++;
}
}

KeepAliveRequest request = KeepAliveRequest.newBuilder()
Expand All @@ -280,7 +300,7 @@ private synchronized void keepAliveSessions(boolean retryOnFailure) {
// If the request was successful, update the address selector and schedule the next keep-alive.
if (response.status() == RaftResponse.Status.OK) {
selectorManager.resetAll(response.leader(), response.members());
sessions.values().forEach(s -> s.setState(RaftProxy.State.CONNECTED));
needKeepAlive.forEach(s -> s.setState(RaftProxy.State.CONNECTED));
scheduleKeepAlive();
}
// If a leader is still set in the address selector, unset the leader and attempt to send another keep-alive.
Expand All @@ -291,7 +311,7 @@ else if (retryOnFailure && connection.leader() != null) {
}
// If no leader was set, set the session state to unstable and schedule another keep-alive.
else {
sessions.values().forEach(s -> s.setState(RaftProxy.State.SUSPENDED));
needKeepAlive.forEach(s -> s.setState(RaftProxy.State.SUSPENDED));
selectorManager.resetAll();
scheduleKeepAlive();
}
Expand All @@ -304,7 +324,7 @@ else if (retryOnFailure && connection.leader() != null) {
}
// If no leader was set, set the session state to unstable and schedule another keep-alive.
else {
sessions.values().forEach(s -> s.setState(RaftProxy.State.SUSPENDED));
needKeepAlive.forEach(s -> s.setState(RaftProxy.State.SUSPENDED));
selectorManager.resetAll();
scheduleKeepAlive();
}
Expand Down
Expand Up @@ -37,6 +37,7 @@ public final class RaftProxyState {
private volatile long commandResponse;
private long responseIndex;
private volatile long eventIndex;
private volatile long lastUpdated;
private final Set<Consumer<RaftProxy.State>> changeListeners = new CopyOnWriteArraySet<>();

RaftProxyState(long sessionId, String name, String type, long timeout) {
Expand Down Expand Up @@ -215,4 +216,21 @@ public long getEventIndex() {
return eventIndex;
}

/**
* Sets the last time the session was updated.
*
* @param lastUpdated The last time the session was updated.
*/
public void setLastUpdated(long lastUpdated) {
this.lastUpdated = lastUpdated;
}

/**
* Returns the last time the session was updated.
*
* @return The last time the session was updated.
*/
public long getLastUpdated() {
return lastUpdated;
}
}
Expand Up @@ -317,6 +317,8 @@ public void retry(Duration after) {
* Command operation attempt.
*/
private final class CommandAttempt<T> extends OperationAttempt<CommandRequest, CommandResponse, T> {
private final long time = System.currentTimeMillis();

public CommandAttempt(long sequence, CommandRequest request, CompletableFuture<T> future) {
super(sequence, 1, request, future);
}
Expand Down Expand Up @@ -389,6 +391,7 @@ public void fail(Throwable cause) {
@SuppressWarnings("unchecked")
protected void complete(CommandResponse response) {
sequence(response, () -> {
state.setLastUpdated(time);
state.setCommandResponse(request.sequenceNumber());
state.setResponseIndex(response.index());
future.complete((T) response.result());
Expand Down

0 comments on commit 4ff4021

Please sign in to comment.