Skip to content

Commit

Permalink
cells: Add fail fast behaviour for busy pools
Browse files Browse the repository at this point in the history
Motivation:

The idea is taken from the "Release It!" book. We already have support for
discarding requests when their TTL is about to be exceeded. This allows a cell
to recover more quickly/degrade more gracefully if it starts queuing requests,
but it doesn't help the requesting cell as it still has to wait for the request
to time out before it can try something else.

The book suggests a fail fast addition in which a service rejects a request if,
based on the current request processing times, it is unlikely that we can
process the request before it expires.

Modification:

Tracks the time the last dequeued request spent in the request queue. Upon
receiving a new request, if its TTL is smaller than the last queue time, we
immediately reply to the requesting cell with an error indicating that the
cell is busy.

In case the queue is empty the estimated queue time is 0.

The code avoids adding extra synchronization points beyond the access of a new
volatile field. The price is that updates of the last queue time are subject
to some race conditions in which the time used is not strictly the age of
the last message dequeued (it could be that of a message that got dequeued
simultaneously).

Result:

One will observe that cells are more inclined to fail fast rather than
to hang and time out. Depending on the cell, it may choose to retry with
another cell, e.g. a door could repeat pool selection if the pool states
that it is busy.

Target: trunk
Require-notes: yes
Require-book: yes
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Patch: https://rb.dcache.org/r/8787/
  • Loading branch information
gbehrmann committed Nov 20, 2015
1 parent 46ff75b commit e1fc107
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 8 deletions.
12 changes: 12 additions & 0 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellInfo.java
Expand Up @@ -22,6 +22,7 @@ public class CellInfo implements Serializable {
private int _state;
private int _eventQueueSize;
private int _threadCount;
private long _expectedQueueTime;
private CellVersion _version = new CellVersion() ;

private static final String [] _stateNames =
Expand All @@ -37,6 +38,7 @@ public CellInfo( CellInfo info ){
_privateInfo = info._privateInfo ;
_state = info._state ;
_eventQueueSize = info._eventQueueSize ;
_expectedQueueTime = info._expectedQueueTime;
_threadCount = info._threadCount ;
_version = info._version ;
}
Expand All @@ -50,11 +52,20 @@ public CellInfo( CellInfo info ){
/**
 * Stores the private info text of the cell.
 *
 * @param info the text to keep; it is stored as given
 */
public void setPrivateInfo(String info) {
    this._privateInfo = info;
}
/**
 * Stores the short info text of the cell.
 *
 * @param info the text to keep; it is stored as given
 */
public void setShortInfo(String info) {
    this._shortInfo = info;
}
/**
 * Stores the event queue size reported for the cell.
 *
 * @param size the queue size to record
 */
public void setEventQueueSize(int size) {
    this._eventQueueSize = size;
}
/**
 * Stores the expected queue time reported for the cell.
 *
 * @param millis the expected queue time in milliseconds
 */
public void setExpectedQueueTime(long millis) {
    this._expectedQueueTime = millis;
}
/**
 * Stores the thread count reported for the cell.
 *
 * @param threadCount the number of threads to record
 */
public void setThreadCount(int threadCount) {
    this._threadCount = threadCount;
}
/**
 * Records the state of the cell as an index into {@code _stateNames}.
 *
 * <p>An argument outside {@code [0, _stateNames.length)} is replaced by the
 * marker value {@code _stateNames.length}.
 *
 * <p>The upper bound is checked against the argument. Checking the previously
 * stored field instead would let oversized arguments through unchecked, and
 * once the field held an out-of-range value every later valid update would be
 * turned into the marker.
 *
 * @param state the new state index
 */
public void setState(int state) {
    // NOTE(review): the marker is one past the last valid index, so it must
    // not be used to index _stateNames without a range check.
    boolean outOfRange = (state < 0) || (state >= _stateNames.length);
    _state = outOfRange ? _stateNames.length : state;
}

/**
 * Returns the numeric state of the cell as stored by {@code setState}.
 *
 * @return the stored state value
 */
public int getState() {
    final int current = this._state;
    return current;
}

/**
 * Returns the human-readable name of the current state.
 *
 * <p>{@code setState} stores {@code _stateNames.length}, which is one past
 * the last valid index, as a marker for an invalid state. The index is
 * therefore range-checked here instead of letting the array access throw
 * {@link ArrayIndexOutOfBoundsException}.
 *
 * @return the matching entry of {@code _stateNames}, or {@code "Unknown"}
 *         when the stored state has no entry
 */
public String getStateName() {
    // NOTE(review): "Unknown" is a label chosen for this guard; confirm it
    // matches how the rest of the class presents an invalid state.
    if (_state < 0 || _state >= _stateNames.length) {
        return "Unknown";
    }
    return _stateNames[_state];
}
//
// and now the public getter's
//
Expand All @@ -69,6 +80,7 @@ public String toString(){
/**
 * Returns the version object held by this info record.
 *
 * @return the stored {@code CellVersion}
 */
public CellVersion getCellVersion() {
    return this._version;
}
/**
 * Returns the private info text stored via {@code setPrivateInfo}.
 *
 * @return the stored private info text
 */
public String getPrivatInfo() {
    return this._privateInfo;
}
/**
 * Returns the event queue size stored via {@code setEventQueueSize}.
 *
 * @return the recorded queue size
 */
public int getEventQueueSize() {
    return this._eventQueueSize;
}
/**
 * Returns the expected queue time stored via {@code setExpectedQueueTime}.
 *
 * @return the recorded expected queue time in milliseconds
 */
public long getExpectedQueueTime() {
    return this._expectedQueueTime;
}
/**
 * Returns the name of the cell.
 *
 * @return the stored cell name
 */
public String getCellName() {
    return this._cellName;
}
/**
 * Returns the type of the cell.
 *
 * @return the stored cell type
 */
public String getCellType() {
    return this._cellType;
}
/**
 * Returns the class name recorded for the cell.
 *
 * @return the stored cell class
 */
public String getCellClass() {
    return this._cellClass;
}
Expand Down
24 changes: 23 additions & 1 deletion modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java
Expand Up @@ -96,6 +96,7 @@ public class CellNucleus implements ThreadFactory
private Pinboard _pinboard;
private FilterThresholdSet _loggingThresholds;
private final Queue<Runnable> _deferredTasks = Queues.synchronizedQueue(new ArrayDeque<>());
private volatile long _lastQueueTime;

public CellNucleus(Cell cell, String name, String type, Executor executor)
{
Expand Down Expand Up @@ -260,7 +261,9 @@ CellInfo _getCellInfo() {
}
info.setCellClass(_cellClass);
try {
info.setEventQueueSize(getEventQueueSize());
int eventQueueSize = getEventQueueSize();
info.setEventQueueSize(eventQueueSize);
info.setExpectedQueueTime((eventQueueSize == 0) ? 0 : _lastQueueTime);
info.setState(_state.get());
info.setThreadCount(_threads.activeCount());
} catch(Exception e) {
Expand Down Expand Up @@ -817,6 +820,24 @@ void addToEventQueue(MessageEvent ce)
LOGGER.error("Dropping reply: {}", e.getMessage());
}
} else {
/* Fail fast for requests if the cell is busy. We consider the cell busy
* if the last queue time exceeds the TTL of the request.
*/
if (_eventQueueSize.get() == 0) {
_lastQueueTime = 0;
} else if (!msg.isReply()) {
long queueTime = _lastQueueTime;
if (msg.getTtl() < queueTime) {
msg.setMessageObject(
new NoRouteToCellException(msg, getCellName() + "@" + getCellDomainName() +
" is busy (its estimated response time of " +
queueTime + " ms is longer than the message TTL of " +
msg.getTtl() + " ms)."));
msg.revertDirection();
sendMessage(msg, true, true);
}
}

try {
EventLogger.queueBegin(ce);
_eventQueueSize.incrementAndGet();
Expand Down Expand Up @@ -1015,6 +1036,7 @@ public void run()
try (CDC ignored = CDC.reset(CellNucleus.this)) {
try {
EventLogger.queueEnd(_event);
_lastQueueTime = _event.getMessage().getLocalAge();
_eventQueueSize.decrementAndGet();

if (_event instanceof RoutedMessageEvent) {
Expand Down
16 changes: 9 additions & 7 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellShell.java
Expand Up @@ -700,15 +700,17 @@ public String call()
continue;
}
if (full) {
sb.append(" -- Short Info about Cell ").append(aCellName)
.append(" --\n");
sb.append(info.toString()).append("\n");
sb.append("-- Info --\n");
sb.append(" Cell : ").append(info.getCellName()).append('@').append(info.getDomainName()).append('\n');
sb.append(" Class : ").append(info.getCellClass()).append('\n');
sb.append(" State : ").append(info.getStateName()).append('\n');
sb.append(" Queue length : ").append(info.getEventQueueSize()).append('\n');
sb.append(" Queue time : ").append(info.getExpectedQueueTime()).append(" ms \n");
CellVersion version = info.getCellVersion();
if (version != null) {
sb.append(" -- Version : ").append(version.toString())
.append("\n");
sb.append(" Version : ").append(version).append("\n");
}
sb.append(" -- Threads --\n");
sb.append("-- Threads --\n");
Thread[] threads = _nucleus.getThreads(aCellName);
for (int j = 0;
(j < threads.length) && (threads[j] != null); j++) {
Expand All @@ -719,7 +721,7 @@ public String call()
.append(isAlive ? " Alive" : " Dead")
.append("\n");
}
sb.append(" -- Private Infos --\n");
sb.append("-- Private Infos --\n");
}
sb.append(info.getPrivatInfo()).append("\n");
}
Expand Down

0 comments on commit e1fc107

Please sign in to comment.