diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java b/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java index 328281450c6..50b67840abd 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java @@ -137,7 +137,7 @@ public JobInfo findJob(String client, long id) { return null; } - public synchronized void shutdown() { + public synchronized void shutdown() throws InterruptedException { for (IoScheduler queue : _queues) { queue.shutdown(); } diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/IoScheduler.java b/modules/dcache/src/main/java/org/dcache/pool/classic/IoScheduler.java index 8ab735fad9c..fc830427e73 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/IoScheduler.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/IoScheduler.java @@ -68,7 +68,7 @@ public interface IoScheduler { /** * Shutdown the scheduler. All subsequent execution request will be rejected. */ - public void shutdown(); + public void shutdown() throws InterruptedException; // legacy crap public List getJobInfos(); diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java b/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java index ed1f5ef0759..a92d5835d45 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/SimpleIoScheduler.java @@ -1,5 +1,7 @@ package org.dcache.pool.classic; +import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,6 +16,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import diskCacheV111.util.CacheException; import diskCacheV111.vehicles.IoJobInfo; @@ -29,6 +32,9 @@ import org.dcache.util.IoPriority; import org.dcache.util.LifoPriorityComparator; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Iterables.transform; + /** * * @since 1.9.11 @@ -70,7 +76,7 @@ public class SimpleIoScheduler implements IoScheduler, Runnable { /** * are we need to shutdown. */ - boolean _shutdown; + private volatile boolean _shutdown; private final AdjustableSemaphore _semaphore = new AdjustableSemaphore(); @@ -117,6 +123,7 @@ public SimpleIoScheduler(String name, */ @Override public synchronized int add(PoolIORequest request, IoPriority priority) { + checkState(!_shutdown); int id = _queueId << 24 | nextId(); @@ -199,27 +206,29 @@ public String getName() { @Override public synchronized void cancel(int id) throws NoSuchElementException { - PrioritizedRequest wrapper; wrapper = _jobs.get(id); - if (wrapper == null) { throw new NoSuchElementException("Job " + id + " not found"); } + cancel(wrapper); + } - if(_queue.remove(wrapper)) { + private void cancel(PrioritizedRequest wrapper) + { + if (_queue.remove(wrapper)) { /* - * if request still in the queue, then we can cancel it right now. + * as request is still in the queue, we can cancel it right away. */ - _jobs.remove(id); - final PoolIORequest request = wrapper.getRequest(); + _jobs.remove(wrapper.getId()); + PoolIORequest request = wrapper.getRequest(); request.setState(IoRequestState.DONE); request.setTransferStatus(CacheException.DEFAULT_ERROR_CODE, "Transfer canceled"); /* - * go though standart procedure to update billing and notity door. + * go through the standard procedure to update billing and notify door. */ - final String protocolName = protocolNameOf(request); + String protocolName = protocolNameOf(request); _executorServices.getPostExecutorService(protocolName).execute(request); } else { wrapper.getRequest().kill(); @@ -240,8 +249,29 @@ public void setMaxActiveJobs(int maxJobs) { } @Override - public void shutdown() { - // NOP for now + public synchronized void shutdown() throws InterruptedException + { + if (!_shutdown) { + _shutdown = true; + _worker.interrupt(); + for (PrioritizedRequest request : _jobs.values()) { + cancel(request); + } + _log.info("Waiting for movers on queue '{}' to finish", _name); + if (!_semaphore.tryAcquire(_semaphore.getMaxPermits(), 2000L, TimeUnit.MILLISECONDS)) { + // This is often due to a mover not reacting to interrupt or the transfer + // doing a lengthy checksum calculation during post processing. + _log.warn("Failed to terminate some movers prior to shutdown: {}", + Joiner.on(",").join(transform(_jobs.values(), new Function() + { + @Override + public String apply(PrioritizedRequest input) + { + return input.getRequest().getProtocolInfo().getVersionString(); + } + }))); + } + } } @Override diff --git a/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java b/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java index d70ea147628..3eb4c9920cc 100644 --- a/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java +++ b/modules/dcache/src/main/java/org/dcache/util/AdjustableSemaphore.java @@ -1,6 +1,7 @@ package org.dcache.util; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; /** * A simple implementation of an adjustable semaphore. @@ -83,6 +84,14 @@ public void acquire() throws InterruptedException { this.semaphore.acquire(); } + /** + * @see Semaphore#tryAcquire(int, long, java.util.concurrent.TimeUnit) + */ + public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException + { + return semaphore.tryAcquire(permits, timeout, unit); + } + /** * Get a permit this semaphore if one is available at the * time of invocation. diff --git a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml index e116de6814f..cd26498a1ac 100644 --- a/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml +++ b/modules/dcache/src/main/resources/org/dcache/pool/classic/pool.xml @@ -329,10 +329,10 @@ + destroy-method="shutdown" depends-on="rep"/> + init-method="init" destroy-method="shutdown" depends-on="rep"> NFSv4.1 request execution service