Skip to content

Commit

Permalink
Attempt keep-alive retries until session appears to have expired.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 12, 2017
1 parent b6944b1 commit 96f6ce5
Showing 1 changed file with 18 additions and 25 deletions.
Expand Up @@ -165,7 +165,7 @@ public CompletableFuture<RaftProxyClient> openSession(
}); });


// Ensure the proxy session info is reset and the session is kept alive. // Ensure the proxy session info is reset and the session is kept alive.
keepAliveSessions(state.getSessionTimeout()); keepAliveSessions(System.currentTimeMillis(), state.getSessionTimeout());


// Create the proxy client and complete the future. // Create the proxy client and complete the future.
RaftProxyClient client = new DiscreteRaftProxyClient( RaftProxyClient client = new DiscreteRaftProxyClient(
Expand Down Expand Up @@ -257,18 +257,11 @@ CompletableFuture<Void> resetIndexes(SessionId sessionId) {
/** /**
* Sends a keep-alive request to the cluster. * Sends a keep-alive request to the cluster.
*/ */
private void keepAliveSessions(long timeout) { private synchronized void keepAliveSessions(long lastKeepAliveTime, long sessionTimeout) {
keepAliveSessions(timeout, true);
}

/**
* Sends a keep-alive request to the cluster.
*/
private synchronized void keepAliveSessions(long timeout, boolean retryOnFailure) {
// Filter the list of sessions by timeout. // Filter the list of sessions by timeout.
List<RaftProxyState> needKeepAlive = sessions.values() List<RaftProxyState> needKeepAlive = sessions.values()
.stream() .stream()
.filter(session -> session.getSessionTimeout() == timeout) .filter(session -> session.getSessionTimeout() == sessionTimeout)
.collect(Collectors.toList()); .collect(Collectors.toList());


// If no sessions need keep-alives to be sent, skip and reschedule the keep-alive. // If no sessions need keep-alives to be sent, skip and reschedule the keep-alive.
Expand Down Expand Up @@ -298,10 +291,10 @@ private synchronized void keepAliveSessions(long timeout, boolean retryOnFailure
.withEventIndexes(eventIndexes) .withEventIndexes(eventIndexes)
.build(); .build();


long startTime = System.currentTimeMillis(); long keepAliveTime = System.currentTimeMillis();
connection.keepAlive(request).whenComplete((response, error) -> { connection.keepAlive(request).whenComplete((response, error) -> {
if (open.get()) { if (open.get()) {
long delta = System.currentTimeMillis() - startTime; long delta = System.currentTimeMillis() - keepAliveTime;
if (error == null) { if (error == null) {
// If the request was successful, update the address selector and schedule the next keep-alive. // If the request was successful, update the address selector and schedule the next keep-alive.
if (response.status() == RaftResponse.Status.OK) { if (response.status() == RaftResponse.Status.OK) {
Expand All @@ -316,32 +309,32 @@ private synchronized void keepAliveSessions(long timeout, boolean retryOnFailure
session.setState(RaftProxy.State.CLOSED); session.setState(RaftProxy.State.CLOSED);
} }
} }
scheduleKeepAlive(timeout, delta); scheduleKeepAlive(System.currentTimeMillis(), sessionTimeout, delta);
} }
// If a leader is still set in the address selector, unset the leader and attempt to send another keep-alive. // If the timeout has not been passed, attempt to keep the session alive again with no delay.
// This will ensure that the address selector selects all servers without filtering on the leader. // We will continue to retry until the session expiration has passed.
else if (retryOnFailure && connection.leader() != null) { else if (System.currentTimeMillis() - lastKeepAliveTime < sessionTimeout) {
selectorManager.resetAll(null, connection.servers()); selectorManager.resetAll(null, connection.servers());
keepAliveSessions(timeout, false); keepAliveSessions(lastKeepAliveTime, sessionTimeout);
} }
// If no leader was set, set the session state to unstable and schedule another keep-alive. // If no leader was set, set the session state to unstable and schedule another keep-alive.
else { else {
needKeepAlive.forEach(s -> s.setState(RaftProxy.State.SUSPENDED)); needKeepAlive.forEach(s -> s.setState(RaftProxy.State.SUSPENDED));
selectorManager.resetAll(); selectorManager.resetAll();
scheduleKeepAlive(timeout, delta); scheduleKeepAlive(lastKeepAliveTime, sessionTimeout, delta);
} }
} }
// If a leader is still set in the address selector, unset the leader and attempt to send another keep-alive. // If the timeout has not been passed, reset the connection and attempt to keep the session alive
// This will ensure that the address selector selects all servers without filtering on the leader. // again with no delay.
else if (retryOnFailure && connection.leader() != null) { else if (System.currentTimeMillis() - lastKeepAliveTime < sessionTimeout && connection.leader() != null) {
selectorManager.resetAll(null, connection.servers()); selectorManager.resetAll(null, connection.servers());
keepAliveSessions(timeout, false); keepAliveSessions(lastKeepAliveTime, sessionTimeout);
} }
// If no leader was set, set the session state to unstable and schedule another keep-alive. // If no leader was set, set the session state to unstable and schedule another keep-alive.
else { else {
needKeepAlive.forEach(s -> s.setState(RaftProxy.State.SUSPENDED)); needKeepAlive.forEach(s -> s.setState(RaftProxy.State.SUSPENDED));
selectorManager.resetAll(); selectorManager.resetAll();
scheduleKeepAlive(timeout, delta); scheduleKeepAlive(lastKeepAliveTime, sessionTimeout, delta);
} }
} }
}); });
Expand All @@ -350,7 +343,7 @@ else if (retryOnFailure && connection.leader() != null) {
/** /**
* Schedules a keep-alive request. * Schedules a keep-alive request.
*/ */
private synchronized void scheduleKeepAlive(long timeout, long delta) { private synchronized void scheduleKeepAlive(long lastKeepAliveTime, long timeout, long delta) {
ScheduledFuture<?> keepAliveFuture = keepAliveFutures.remove(timeout); ScheduledFuture<?> keepAliveFuture = keepAliveFutures.remove(timeout);
if (keepAliveFuture != null) { if (keepAliveFuture != null) {
keepAliveFuture.cancel(false); keepAliveFuture.cancel(false);
Expand All @@ -359,7 +352,7 @@ private synchronized void scheduleKeepAlive(long timeout, long delta) {
// Schedule the keep alive for 3/4 the timeout minus the delta from the last keep-alive request. // Schedule the keep alive for 3/4 the timeout minus the delta from the last keep-alive request.
keepAliveFutures.put(timeout, threadPoolExecutor.schedule(() -> { keepAliveFutures.put(timeout, threadPoolExecutor.schedule(() -> {
if (open.get()) { if (open.get()) {
keepAliveSessions(timeout); keepAliveSessions(lastKeepAliveTime, timeout);
} }
}, Math.max(Math.max((long)(timeout * .75) - delta, timeout - 2500 - delta), 0), TimeUnit.MILLISECONDS)); }, Math.max(Math.max((long)(timeout * .75) - delta, timeout - 2500 - delta), 0), TimeUnit.MILLISECONDS));
} }
Expand Down

0 comments on commit 96f6ce5

Please sign in to comment.