diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java b/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java index 1c792288c6d..16d18abfc75 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java @@ -126,8 +126,17 @@ public synchronized int add(Mover mover, IoPriority priority) { _log.warn("A task was added to queue '{}', however the queue is not configured to execute any tasks.", _name); } - PrioritizedRequest wrapper = new PrioritizedRequest(id, mover, priority); - _queue.add(wrapper); + final PrioritizedRequest wrapper = new PrioritizedRequest(id, mover, priority); + + if (_semaphore.tryAcquire()) { + /* + * there is a free slot in the queue - use it! + */ + sendToExecution(wrapper); + } else { + _queue.add(wrapper); + } + _jobs.put(id, wrapper); return id; @@ -284,58 +293,10 @@ public void run() { _semaphore.acquire(); try { final PrioritizedRequest request = _queue.take(); - request.getCdc().restore(); - request.transfer( - new CompletionHandler() - { - @Override - public void completed(Void result, Void attachment) - { - postprocess(); - } - - @Override - public void failed(Throwable exc, Void attachment) - { - if (exc instanceof InterruptedException || exc instanceof InterruptedIOException) { - request.getMover().setTransferStatus(CacheException.DEFAULT_ERROR_CODE, "Transfer was killed"); - } - postprocess(); - } - - private void postprocess() - { - request.getMover().postprocess( - new CompletionHandler() - { - @Override - public void completed(Void result, - Void attachment) - { - release(); - } - - @Override - public void failed(Throwable exc, - Void attachment) - { - release(); - } - - private void release() - { - request.done(); - _jobs.remove(request.getId()); - _semaphore.release(); - } - }); - } - }); + sendToExecution(request); } catch (RuntimeException | Error | InterruptedException e) { _semaphore.release(); throw e; - } finally { - CDC.clear(); } } } catch (InterruptedException e) { @@ -343,6 +304,47 @@ private void release() } } + private void sendToExecution(final PrioritizedRequest request) { + try (CDC ignore = request.getCdc().restore()) { + request.transfer( + new CompletionHandler() { + @Override + public void completed(Void result, Void attachment) { + postprocess(); + } + + @Override + public void failed(Throwable exc, Void attachment) { + if (exc instanceof InterruptedException || exc instanceof InterruptedIOException) { + request.getMover().setTransferStatus(CacheException.DEFAULT_ERROR_CODE, "Transfer was killed"); + } + postprocess(); + } + + private void postprocess() { + request.getMover().postprocess( + new CompletionHandler() { + @Override + public void completed(Void result, Void attachment) { + release(); + } + + @Override + public void failed(Throwable exc, Void attachment) { + release(); + } + + private void release() { + request.done(); + _jobs.remove(request.getId()); + _semaphore.release(); + } + }); + } + }); + } + } + private static class PrioritizedRequest implements IoPrioritizable { private final Mover _mover; private final IoPriority _priority;