Skip to content

Commit

Permalink
qos-adjuster: add waiting queue and state; polling
Browse files Browse the repository at this point in the history
Motivation:

In the current code committed for the QoS Adjuster service,
staging is throttled: a waiting staging request
continues to occupy a running slot (of which there are
at most 200) and acts synchronously by doing a get() on
the ListenableFuture returned from the PinManager.

This, however, will not play well with the future
incorporation of the restore optimization scheduling.

What we need is to allow the request to change to
a WAITING state and not to hold onto the execution
thread, allowing for greater throughput.

Modification:

A new state, WAITING, occurs when the staging adjuster
receives its reply from PinManager.  It is then
placed on the WAITING queue, liberating its
RUNNING slot for other adjuster tasks.

Waiting adjuster tasks are 'polled' (which means
doing future.isDone() on their futures and
triggering the completion handler if this is true)
by the main adjuster map thread just before it checks
the running tasks for completion (once a minute by
default).

Result:

The adjuster can now accept an arbitrary number
(governed only by memory limitations) of concurrently
WAITING tasks, allowing all such staging requests
to pass on through to the restore scheduling
(presumably Pin Manager).

Note that the QoSVerifier (yet to be reviewed)
has similarly been modified to fit this model.

Target: master
Patch: https://rb.dcache.org/r/13074/
Acked-by: Tigran
  • Loading branch information
alrossi committed Jun 17, 2021
1 parent e5bbab7 commit a5f691f
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,13 @@ public void adjustQoS(QoSAdjusterTask task) {
runAdjuster(task);
}

/**
 * Polling hook for asynchronous adjusters.
 * <p/>
 * If the adjuster is asynchronous (and the task is in a WAITING state), the
 * overriding implementation should call isDone on the Future and then call
 * the completion handler if true.
 * <p/>
 * This default implementation does nothing.
 */
public void poll() {
/* NOP here. */
}

/**
 * Adjuster-specific work for the given task; invoked from adjustQoS.
 *
 * @param task the adjuster task to act on.
 */
protected abstract void runAdjuster(QoSAdjusterTask task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected void runAdjuster(QoSAdjusterTask task) {
attributes = task.getAttributes();
executorService.submit(() -> {
handleStaging();
waitForStaging();
task.setToWaiting();
});
}

Expand All @@ -128,44 +128,15 @@ public synchronized void cancel(String explanation) {
}

/**
* Called when there are no available replicas, but the file can be retrieved from an HSM.
* Issues a request.>
* Polled by main map thread. If the future is done, this will trigger the
* completion handler.
*/
private synchronized void handleStaging() {
LOGGER.debug("handleStaging {}, pool group {}.", pnfsId, poolGroup);
try {
ACTIVITY_LOGGER.info("Staging {}", pnfsId);
PinManagerPinMessage message = new PinManagerPinMessage(attributes,
getProtocolInfo(),
QOS_PIN_REQUEST_ID,
QOS_PIN_TEMP_LIFETIME);
// Store the reply future in the field; this method does not wait on it.
future = pinManager.send(message, Long.MAX_VALUE);
LOGGER.debug("handleStaging, sent pin manager request for {}.", pnfsId);
} catch (URISyntaxException e) {
// Presumably raised by getProtocolInfo() (the message below speaks of the
// HTTP protocol), in which case no request has been sent -- TODO confirm.
completionHandler.taskFailed(pnfsId, Optional.empty(),
CacheExceptionUtils.getCacheException(CacheException.INVALID_ARGS,
"could not construct HTTP protocol: %s.",
pnfsId,
action,
e.getMessage(),
null));
}
}

private void cancelPin() {
LOGGER.debug("handleStaging, cancelling pin {}.", pnfsId);
ACTIVITY_LOGGER.info("handleStaging, cancelling pin {}", pnfsId);
PinManagerUnpinMessage message = new PinManagerUnpinMessage(pnfsId);
// The value returned by send is discarded, so the unpin reply is not inspected here.
pinManager.send(message, Long.MAX_VALUE);
LOGGER.debug("handleStaging, sent pin manager request to unpin {}.", pnfsId);
}

private void waitForStaging() {
public void poll() {
synchronized (this) {
if (future == null) {
completionHandler.taskFailed(pnfsId, Optional.empty(),
new CacheException(CacheException.SERVICE_UNAVAILABLE,
"no future returned by message send."));
new CacheException(CacheException.SERVICE_UNAVAILABLE,
"no future returned by message send."));
return;
}
}
Expand All @@ -174,7 +145,11 @@ private void waitForStaging() {
Object error = null;

try {
LOGGER.debug("handleStaging, waiting for pin request future for {}.", pnfsId);
LOGGER.debug("poll, checking pin request future.isDone() for {}.", pnfsId);
if (!future.isDone()) {
return;
}

migrationReply = getUninterruptibly(future);
if (migrationReply.getReturnCode() != 0) {
error = migrationReply.getErrorObject();
Expand All @@ -187,7 +162,7 @@ private void waitForStaging() {
error = e.getCause();
}

LOGGER.debug("handleStaging, calling completion handler for {}.", pnfsId);
LOGGER.debug("poll, calling completion handler for {}.", pnfsId);
String target = migrationReply.getPool();

if (error == null) {
Expand All @@ -200,4 +175,38 @@ private void waitForStaging() {
new CacheException(String.valueOf(error)));
}
}

/**
 * Sends a pin request for the file to the pin manager.
 * <p/>
 * Called when there are no available replicas, but the file can be
 * retrieved from an HSM. The future returned by the send is stored in the
 * {@code future} field and is not waited on here. If a URISyntaxException
 * is raised, the failure is reported to the completion handler instead.
 */
private synchronized void handleStaging() {
    LOGGER.debug("handleStaging {}, pool group {}.", pnfsId, poolGroup);
    try {
        ACTIVITY_LOGGER.info("Staging {}", pnfsId);

        PinManagerPinMessage pinRequest
              = new PinManagerPinMessage(attributes,
                                         getProtocolInfo(),
                                         QOS_PIN_REQUEST_ID,
                                         QOS_PIN_TEMP_LIFETIME);

        future = pinManager.send(pinRequest, Long.MAX_VALUE);

        LOGGER.debug("handleStaging, sent pin manager request for {}.", pnfsId);
    } catch (URISyntaxException e) {
        completionHandler.taskFailed(
              pnfsId,
              Optional.empty(),
              CacheExceptionUtils.getCacheException(
                    CacheException.INVALID_ARGS,
                    "could not construct HTTP protocol: %s.",
                    pnfsId,
                    action,
                    e.getMessage(),
                    null));
    }
}

/**
 * Sends an unpin request for the file to the pin manager. The value
 * returned by the send is not retained.
 */
private void cancelPin() {
    LOGGER.debug("handleStaging, cancelling pin {}.", pnfsId);
    ACTIVITY_LOGGER.info("handleStaging, cancelling pin {}", pnfsId);

    pinManager.send(new PinManagerUnpinMessage(pnfsId), Long.MAX_VALUE);

    LOGGER.debug("handleStaging, sent pin manager request to unpin {}.", pnfsId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ abstract class FilteredAdjusterTaskCommand extends InitializerAwareCommand {
protected QoSAction action;

@Option(name = "state",
valueSpec = "INITIALIZED|RUNNING|CANCELLED|DONE",
valueSpec = "INITIALIZED|RUNNING|WAITING|CANCELLED|DONE",
separator = ",",
usage = "Match only operations for files matching this comma-delimited set of states; "
+ "default is RUNNING.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public final class QoSAdjusterTaskMap extends RunnableModule implements CellInfo
private final Map<String, QoSAdjusterTask> index = new ConcurrentHashMap<>();
private final Deque<QoSAdjusterTask> runningQueue = new LinkedBlockingDeque<>();
private final Deque<QoSAdjusterTask> readyQueue = new LinkedBlockingDeque<>();
private final Deque<QoSAdjusterTask> waitingQueue = new LinkedBlockingDeque<>();
private final AtomicLong signalled = new AtomicLong(0L);

private QoSAdjusterFactory factory;
Expand All @@ -126,7 +127,7 @@ public final class QoSAdjusterTaskMap extends RunnableModule implements CellInfo

/*
* Note that this throttle is necessary. Staging adjustments and migration tasks
* relinguish the thread, so we cannot rely on the thread pool to put a barrier
* relinquish the thread, so we cannot rely on the thread pool to put a barrier
* on the number of concurrent (or "waiting") jobs.
*/
private int maxRunning = 200;
Expand Down Expand Up @@ -266,7 +267,8 @@ public void register(QoSAdjustmentRequest request) {
}

/**
* The consumer thread. When notified or times out, runs scan.
* The consumer thread. When notified or times out, polls any waiting tasks and
* then runs scan.
* <p/>
* Note that since the scan takes place outside of the monitor, the
* signals sent by various update methods will not be caught before
Expand All @@ -282,6 +284,8 @@ public void run() {
signalled.set(0);
long start = System.currentTimeMillis();

pollWaiting();

scan();

long end = System.currentTimeMillis();
Expand Down Expand Up @@ -320,15 +324,30 @@ public void scan() {
try {
final List<QoSAdjusterTask> toRemove = new ArrayList<>();
/*
* Check both queues in case of cancellation.
* Check all queues in case of cancellation.
* Also move tasks off running queue to waiting queue
* that have gone to the WAITING state.
*/
runningQueue.stream().filter(QoSAdjusterTask::isDone)
runningQueue.forEach(t -> {
if (t.isDone()) {
toRemove.add(t);
handleTerminatedTask(t);
} else if (t.isWaiting()) {
toRemove.add(t);
waitingQueue.add(t);
}
});

toRemove.forEach(runningQueue::remove);
toRemove.clear();

waitingQueue.stream().filter(QoSAdjusterTask::isDone)
.forEach(t -> {
toRemove.add(t);
handleTerminatedTask(t);
});

toRemove.stream().forEach(runningQueue::remove);
toRemove.forEach(waitingQueue::remove);
toRemove.clear();

readyQueue.stream().filter(QoSAdjusterTask::isDone)
Expand All @@ -337,7 +356,7 @@ public void scan() {
handleTerminatedTask(t);
});

toRemove.stream().forEach(readyQueue::remove);
toRemove.forEach(readyQueue::remove);
toRemove.clear();

int available = maxRunning - runningQueue.size();
Expand Down Expand Up @@ -502,6 +521,15 @@ private void handleTerminatedTask(QoSAdjusterTask task) {
}
}

/**
 * Calls poll on every task currently on the waiting queue, holding the
 * read lock for the duration of the pass.
 */
private void pollWaiting() {
    read.lock();
    try {
        for (QoSAdjusterTask waitingTask : waitingQueue) {
            waitingTask.poll();
        }
    } finally {
        read.unlock();
    }
}

/**
 * Hands the task's fire-and-forget wrapper to the executor service and
 * records the returned future on the task via setFuture.
 *
 * @param task the task to execute.
 */
private void submit(QoSAdjusterTask task) {
task.setFuture(executorService.submit(task.toFireAndForgetTask()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public final class QoSAdjusterTask extends ErrorAwareTask implements Cancellable
private long endTime;

enum Status {
INITIALIZED, RUNNING, CANCELLED, DONE
INITIALIZED, RUNNING, WAITING, CANCELLED, DONE
}

private Status status;
Expand Down Expand Up @@ -239,10 +239,16 @@ public synchronized boolean isRunning() {
return status == Status.RUNNING;
}

/**
 * @return true if the current status is WAITING.
 */
public synchronized boolean isWaiting() {
    return Status.WAITING == status;
}

/**
 * @return true if the current status is either DONE or CANCELLED.
 */
public synchronized boolean isDone() {
    final boolean cancelled = status == Status.CANCELLED;
    return cancelled || status == Status.DONE;
}

/**
 * Delegates to the adjuster's poll method while holding this task's monitor.
 */
public synchronized void poll() {
adjuster.poll();
}

public void relayMessage(PoolMigrationCopyFinishedMessage message) {
if (!message.getPnfsId().equals(pnfsId)) {
return;
Expand All @@ -261,6 +267,10 @@ public synchronized void setFuture(Future future) {
this.future = future;
}

/**
 * Sets this task's status to WAITING.
 */
public synchronized void setToWaiting() {
// NOTE(review): assigns unconditionally, so a status of DONE or CANCELLED
// would be overwritten as well -- confirm callers cannot reach this after
// the task has terminated.
status = Status.WAITING;
}

public synchronized void taskTerminated(Optional<String> target, Exception exception) {
if (target.isPresent()) {
this.target = target.get();
Expand Down

0 comments on commit a5f691f

Please sign in to comment.