Skip to content

Commit

Permalink
cells: reduce latency when notifying caller than a message has timed out
Browse files Browse the repository at this point in the history
Motivation:

Cell message delivery is unreliable and the target of a cell message may
be busy (so, delaying its response).  Cell messages may be sent with a
timout to handle these cases.

Timeouts have two main benefits:

First, it allows the message receiver to know when a response is no
longer needed.  Knowing this, the receiver can discard messages for
which the response is no longer needed.

Second, it provides a framework through which the cell that sends a
message is notified if the response is taking too long.

Currently, handling timed-out messages is done lazily, once every twenty
seconds.  This introduces a not insignificant latency: where the sender
is notified of the message timeout between zero and twenty seconds after
the message's actual timeout.

There are situations where a door is expected to respond within a
certain time-frame, and failing to do so could result in the client
considering the door as having "frozen".  Under these circumstances,
introducing such a latency is unacceptable.

Modification:

Provide a framework through which the callbacks of timed-out messages
are processed which greatly reduced latency.

A priority queue is used to keep a list of callbacks, ordered by when
they will time-out.

The code ensures that, when there is at least one callback, there is
also a task scheduled to run when the earliest callback will expire.

Note that this task uses the background-processing thread, which it
shares with other activity (in particular, updating information about
cell-routing), which may introduce a very modest delay.

If an expired callback could not be processed, it is retried with a
twenty second delay.  This is equivalent to the current behaviour.

Result:

dCache provides much more accurate timeout behaviour should it suffer
message loss or cells are overloaded.

This patch partially addresses #5642

Target: master
Requires-notes: yes
Requires-book: no
Patch: https://rb.dcache.org/r/12754/
Acked-by: Tigran Mkrtchyan
  • Loading branch information
paulmillar committed Jan 15, 2021
1 parent 3e99233 commit cbbe2ce
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 6 deletions.
6 changes: 6 additions & 0 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ public CellLock(CellMessage msg, CellMessageAnswerable callback,
_message = msg;
}

public CellLock withDelayedTimeout(long delay)
{
return new CellLock(_message, _callback, _executor,
addWithInfinity(_timeout, delay));
}

public CellMessageAnswerable getCallback() {
return _callback;
}
Expand Down
105 changes: 99 additions & 6 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.MDC;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;

import java.io.FileNotFoundException;
import java.io.Reader;
Expand All @@ -26,6 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
Expand All @@ -40,6 +43,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -53,11 +57,13 @@

import org.dcache.util.BoundedCachedExecutor;
import org.dcache.util.BoundedExecutor;
import org.dcache.util.FireAndForgetTask;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.consumingIterable;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Comparator.comparingLong;
import static org.dcache.util.CompletableFutures.fromListenableFuture;
import static org.dcache.util.MathUtils.addWithInfinity;
import static org.dcache.util.MathUtils.subWithInfinity;
Expand Down Expand Up @@ -125,6 +131,11 @@ private enum State

private volatile State _state = State.NEW;

private ScheduledFuture _scheduledTimeoutRun;
private boolean _processingTimedOutCallbacks;

private final Queue<CellLock> _callbackExpiry
= new PriorityQueue<>(comparingLong(CellLock::getTimeout));
private final ConcurrentMap<UOID, CellLock> _waitHash = new ConcurrentHashMap<>();
private String _cellClass;
private String _cellSimpleClass;
Expand Down Expand Up @@ -485,11 +496,6 @@ public Map<UOID,CellLock > getWaitQueue()

private void executeMaintenanceTasks()
{
long now = System.currentTimeMillis();
_waitHash.entrySet().stream()
.filter(e -> e.getValue().getTimeout() < now)
.forEach(e -> timeOutMessage(e.getKey(), e.getValue(), this::reregisterCallback));

// Execute delayed tasks; since those tasks may themselves add new deferred
// tasks we limit the operation to the number of tasks we started out with
// to avoid an infinite loop.
Expand Down Expand Up @@ -549,6 +555,7 @@ public void sendMessage(CellMessage msg,
* to avoid a race with shutdown.
*/
_waitHash.put(uoid, lock);
addCallbackTimeout(lock);

if (!_state.areCallbacksGuaranteed) {
/* Cell is shutting down so timeout the message.
Expand All @@ -560,11 +567,13 @@ public void sendMessage(CellMessage msg,
try {
__cellGlue.sendMessage(msg, local, remote);
} catch (SerializationException e) {
removeCallbackTimeout(lock);
if (_waitHash.remove(uoid, lock)) {
EventLogger.sendEnd(msg);
}
throw e;
} catch (RuntimeException e) {
removeCallbackTimeout(lock);
if (_waitHash.remove(uoid, lock)) {
try {
executor.execute(() -> {
Expand Down Expand Up @@ -843,6 +852,7 @@ void addToEventQueue(MessageEvent ce)

CellLock lock = _waitHash.remove(msg.getLastUOID());
if (lock != null) {
removeCallbackTimeout(lock);
//
// we were waiting for you (sync or async)
//
Expand Down Expand Up @@ -949,6 +959,78 @@ private Void doStart() throws Exception
return null;
}

private void addCallbackTimeout(CellLock lock)
{
synchronized (_callbackExpiry) {
_callbackExpiry.add(lock);

if (!_processingTimedOutCallbacks) {
rescheduleCallbackTimeout();
}
}
}

private void removeCallbackTimeout(CellLock lock)
{
synchronized (_callbackExpiry) {
boolean callbackRemoved = _callbackExpiry.remove(lock);

if (callbackRemoved && !_processingTimedOutCallbacks) {
if (_callbackExpiry.isEmpty()) {
if (_scheduledTimeoutRun != null) {
_scheduledTimeoutRun.cancel(false);
_scheduledTimeoutRun = null;
}
} else {
rescheduleCallbackTimeout();
}
}
}
}

@GuardedBy("_callbackExpiry")
private void rescheduleCallbackTimeout()
{
CellLock earliestExpiringCallback = _callbackExpiry.peek();

if (earliestExpiringCallback != null) {
long delayUntilFirstTimeout = earliestExpiringCallback.getTimeout() - System.currentTimeMillis();

if (_scheduledTimeoutRun != null) {
long delayUntilNextTimeoutRun = _scheduledTimeoutRun.getDelay(TimeUnit.MILLISECONDS);

if (delayUntilNextTimeoutRun <= delayUntilFirstTimeout) {
return; // Do nothing: the next scheduled run is soon enough.
}

_scheduledTimeoutRun.cancel(false);
}

_scheduledTimeoutRun = _timer.schedule(new FireAndForgetTask(this::processTimedOutMessages),
delayUntilFirstTimeout, TimeUnit.MILLISECONDS);
}
}

private void processTimedOutMessages()
{
synchronized (_callbackExpiry) {
_processingTimedOutCallbacks = true;
try {
long now = System.currentTimeMillis();
_waitHash.entrySet().stream()
.filter(e -> e.getValue().getTimeout() < now)
.forEach(e -> {
_callbackExpiry.remove(e.getValue());
timeOutMessage(e.getKey(), e.getValue(), this::reregisterCallback);
});
} finally {
_processingTimedOutCallbacks = false;
_scheduledTimeoutRun = null;
rescheduleCallbackTimeout();
}
}
}

void shutdown(KillEvent event)
{
try (CDC ignored = CDC.reset(CellNucleus.this)) {
Expand Down Expand Up @@ -1001,6 +1083,13 @@ void shutdown(KillEvent event)
/* Cancel callbacks.
*/
_waitHash.forEach((uoid, lock) -> timeOutMessage(uoid, lock, (u, l) -> {}));
synchronized (_callbackExpiry) {
_callbackExpiry.clear();
if (_scheduledTimeoutRun != null) {
_scheduledTimeoutRun.cancel(false);
_scheduledTimeoutRun = null;
}
}

/* Shut down message executor.
*/
Expand Down Expand Up @@ -1050,12 +1139,16 @@ void shutdown(KillEvent event)
/**
* Registers a callback, considering that the cell may have already shut down.
*/
private void reregisterCallback(UOID uoid, CellLock lock)
private void reregisterCallback(UOID uoid, CellLock oldLock)
{
/* Schedule reprocessing some time in the future. */
CellLock lock = oldLock.withDelayedTimeout(20_000);

/* Ordering here is important - need to insert into waitHash before checking the state
* to avoid a race with shutdown.
*/
_waitHash.put(uoid, lock);
addCallbackTimeout(lock);

if (!_state.areCallbacksGuaranteed) {
/* The cell is shutting down so we time out the message right away.
Expand Down

0 comments on commit cbbe2ce

Please sign in to comment.