Skip to content

Commit

Permalink
pool: bypass IO queue if there is a slot available
Browse files Browse the repository at this point in the history
Acked-by: Gerd Behrmann
Target: master
Require-book: no
Require-notes: no
  • Loading branch information
kofemann committed Dec 2, 2013
1 parent 3caea80 commit 3834456
Showing 1 changed file with 53 additions and 51 deletions.
Expand Up @@ -126,8 +126,17 @@ public synchronized int add(Mover<?> mover, IoPriority priority) {
_log.warn("A task was added to queue '{}', however the queue is not configured to execute any tasks.", _name);
}

PrioritizedRequest wrapper = new PrioritizedRequest(id, mover, priority);
_queue.add(wrapper);
final PrioritizedRequest wrapper = new PrioritizedRequest(id, mover, priority);

if (_semaphore.tryAcquire()) {
/*
* there is a free slot in the queue - use it!
*/
sendToExecution(wrapper);
} else {
_queue.add(wrapper);
}

_jobs.put(id, wrapper);

return id;
Expand Down Expand Up @@ -284,65 +293,58 @@ public void run() {
_semaphore.acquire();
try {
final PrioritizedRequest request = _queue.take();
request.getCdc().restore();
request.transfer(
new CompletionHandler<Void,Void>()
{
@Override
public void completed(Void result, Void attachment)
{
postprocess();
}

@Override
public void failed(Throwable exc, Void attachment)
{
if (exc instanceof InterruptedException || exc instanceof InterruptedIOException) {
request.getMover().setTransferStatus(CacheException.DEFAULT_ERROR_CODE, "Transfer was killed");
}
postprocess();
}

private void postprocess()
{
request.getMover().postprocess(
new CompletionHandler<Void, Void>()
{
@Override
public void completed(Void result,
Void attachment)
{
release();
}

@Override
public void failed(Throwable exc,
Void attachment)
{
release();
}

private void release()
{
request.done();
_jobs.remove(request.getId());
_semaphore.release();
}
});
}
});
sendToExecution(request);
} catch (RuntimeException | Error | InterruptedException e) {
_semaphore.release();
throw e;
} finally {
CDC.clear();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void sendToExecution(final PrioritizedRequest request) {
try (CDC ignore = request.getCdc().restore()) {
request.transfer(
new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
postprocess();
}

@Override
public void failed(Throwable exc, Void attachment) {
if (exc instanceof InterruptedException || exc instanceof InterruptedIOException) {
request.getMover().setTransferStatus(CacheException.DEFAULT_ERROR_CODE, "Transfer was killed");
}
postprocess();
}

private void postprocess() {
request.getMover().postprocess(
new CompletionHandler<Void, Void>() {
@Override
public void completed(Void result, Void attachment) {
release();
}

@Override
public void failed(Throwable exc, Void attachment) {
release();
}

private void release() {
request.done();
_jobs.remove(request.getId());
_semaphore.release();
}
});
}
});
}
}

private static class PrioritizedRequest implements IoPrioritizable {
private final Mover<?> _mover;
private final IoPriority _priority;
Expand Down

0 comments on commit 3834456

Please sign in to comment.