Skip to content

Commit

Permalink
Use failure detection for timing out messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Dec 6, 2017
1 parent 812661c commit 100efcd
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 33 deletions.
Expand Up @@ -150,13 +150,13 @@ public ManagedMessagingService build() {
}
}

private static final long DEFAULT_TIMEOUT_MILLIS = 1000;
private static final long HISTORY_EXPIRE_MILLIS = Duration.ofMinutes(1).toMillis();
private static final long MIN_TIMEOUT_MILLIS = 100;
private static final long MAX_TIMEOUT_MILLIS = 5000;
private static final long MAX_TIMEOUT_MILLIS = 15000;
private static final long TIMEOUT_INTERVAL = 50;
private static final int WINDOW_SIZE = 100;
private static final double TIMEOUT_MULTIPLIER = 2.5;
private static final int MIN_SAMPLES = 25;
private static final double PHI_FACTOR = 1.0 / Math.log(10.0);
private static final int PHI_FAILURE_THRESHOLD = 5;
private static final int CHANNEL_POOL_SIZE = 8;

private static final byte[] EMPTY_PAYLOAD = new byte[0];
Expand Down Expand Up @@ -279,7 +279,7 @@ private void logKeyStore(KeyStore ks, String ksLocation, char[] ksPwd) {
if (log.isInfoEnabled()) {
log.info("Loaded cluster key store from: {}", ksLocation);
try {
for (Enumeration<String> e = ks.aliases(); e.hasMoreElements();) {
for (Enumeration<String> e = ks.aliases(); e.hasMoreElements(); ) {
String alias = e.nextElement();
Key key = ks.getKey(alias, ksPwd);
Certificate[] certs = ks.getCertificateChain(alias);
Expand Down Expand Up @@ -706,7 +706,6 @@ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
*
* @param msg inbound message
* @return true if {@code msg} is {@link InternalMessage} instance.
*
* @see SimpleChannelInboundHandler#acceptInboundMessage(Object)
*/
@Override
Expand Down Expand Up @@ -774,7 +773,7 @@ private interface ServerConnection {
* Sends a reply to the other side of the connection.
*
* @param message the message to which to reply
* @param status the reply status
* @param status the reply status
* @param payload the response payload
*/
void reply(InternalRequest message, InternalReply.Status status, Optional<byte[]> payload);
Expand Down Expand Up @@ -851,7 +850,7 @@ private final class RemoteClientConnection implements ClientConnection {
private final Channel channel;
private final Map<Long, Callback> futures = Maps.newConcurrentMap();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Cache<String, TimeoutHistory> timeoutHistories = CacheBuilder.newBuilder()
private final Cache<String, RequestMonitor> requestMonitors = CacheBuilder.newBuilder()
.expireAfterAccess(HISTORY_EXPIRE_MILLIS, TimeUnit.MILLISECONDS)
.build();

Expand All @@ -872,24 +871,18 @@ private void timeoutCallbacks() {
while (iterator.hasNext()) {
Callback callback = iterator.next().getValue();
try {
TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
long currentTimeout = timeoutHistory.currentTimeout;
if (currentTime - callback.time > currentTimeout) {
RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
long elapsedTime = currentTime - callback.time;
if (elapsedTime > MAX_TIMEOUT_MILLIS || requestMonitor.isTimedOut(elapsedTime)) {
iterator.remove();
long elapsedTime = currentTime - callback.time;
timeoutHistory.addReplyTime(elapsedTime);
requestMonitor.addReplyTime(elapsedTime);
callback.completeExceptionally(
new TimeoutException("Request type " + callback.type + " timed out in " + elapsedTime + " milliseconds"));
new TimeoutException("Request timed out in " + elapsedTime + " milliseconds"));
}
} catch (ExecutionException e) {
throw new AssertionError();
}
}

// Iterate through all timeout histories and recompute the timeout.
for (TimeoutHistory timeoutHistory : timeoutHistories.asMap().values()) {
timeoutHistory.recomputeTimeoutMillis();
}
}

@Override
Expand Down Expand Up @@ -943,8 +936,8 @@ private void dispatch(InternalReply message) {
}

try {
TimeoutHistory timeoutHistory = timeoutHistories.get(callback.type, TimeoutHistory::new);
timeoutHistory.addReplyTime(System.currentTimeMillis() - callback.time);
RequestMonitor requestMonitor = requestMonitors.get(callback.type, RequestMonitor::new);
requestMonitor.addReplyTime(System.currentTimeMillis() - callback.time);
} catch (ExecutionException e) {
throw new AssertionError();
}
Expand Down Expand Up @@ -1011,30 +1004,50 @@ public void reply(InternalRequest message, InternalReply.Status status, Optional
/**
* Request-reply timeout history tracker.
*/
private static final class TimeoutHistory {
private final DescriptiveStatistics timeoutHistory = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);
private final AtomicLong maxReplyTime = new AtomicLong();
private volatile long currentTimeout = DEFAULT_TIMEOUT_MILLIS;
private static final class RequestMonitor {
private final DescriptiveStatistics samples = new SynchronizedDescriptiveStatistics(WINDOW_SIZE);

/**
* Adds a reply time to the history.
*
* @param replyTime the reply time to add to the history
*/
void addReplyTime(long replyTime) {
maxReplyTime.getAndAccumulate(replyTime, Math::max);
samples.addValue(replyTime);
}

/**
 * Decides whether a request that has been outstanding for the given time should be timed out.
 * <p>
 * The request is considered timed out once the phi value computed for the elapsed time reaches
 * {@code PHI_FAILURE_THRESHOLD}.
 *
 * @param elapsedTime the elapsed request time
 * @return {@code true} if the request should be timed out, {@code false} otherwise
 */
boolean isTimedOut(long elapsedTime) {
  final double suspicion = phi(elapsedTime);
  return suspicion >= PHI_FAILURE_THRESHOLD;
}

/**
* Computes the current timeout.
* Computes phi for the given elapsed request time.
*
* @param elapsedTime the duration since the request was sent
* @return phi value
*/
private void recomputeTimeoutMillis() {
double nextTimeout = maxReplyTime.getAndSet(0) * TIMEOUT_MULTIPLIER;
timeoutHistory.addValue(
Math.min(Math.max(nextTimeout, MIN_TIMEOUT_MILLIS), MAX_TIMEOUT_MILLIS));
if (timeoutHistory.getN() == WINDOW_SIZE) {
this.currentTimeout = (long) timeoutHistory.getMax();
private double phi(long elapsedTime) {
if (samples.getN() < MIN_SAMPLES) {
return 0.0;
}
return computePhi(samples, elapsedTime);
}

/**
 * Derives the phi value from the mean of the given samples.
 * <p>
 * When at least one sample is present the result is {@code PHI_FACTOR * elapsedTime / mean};
 * when there are no samples the result is the fixed value {@code 100}.
 *
 * @param samples the samples from which to compute phi
 * @param elapsedTime the duration since the request was sent
 * @return phi
 */
private double computePhi(DescriptiveStatistics samples, long elapsedTime) {
  // No samples recorded: fall back to the fixed value rather than dividing by an undefined mean.
  if (samples.getN() <= 0) {
    return 100;
  }
  return PHI_FACTOR * elapsedTime / samples.getMean();
}
}
}
Empty file.

0 comments on commit 100efcd

Please sign in to comment.