Skip to content

Commit

Permalink
pool: Kill movers on shutdown
Browse files Browse the repository at this point in the history
The SimpleIoScheduler has a shutdown method, but it doesn't actually
do anything.

This patch updates the shutdown method to kill all movers. Movers
are given a moment to die before the shutdown method returns. This
prevents dependent components from being shut down before the movers
terminate.

An explicit dependency had to be added between the execution services
and the repository. This prevents Spring from shutting down the
repository component before the movers are terminated.

Target: trunk
Require-notes: yes
Require-book: no
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: http://rb.dcache.org/r/5300/
  • Loading branch information
gbehrmann committed Apr 2, 2013
1 parent e206e66 commit 806aa55
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 15 deletions.
Expand Up @@ -137,7 +137,7 @@ public JobInfo findJob(String client, long id) {
return null;
}

public synchronized void shutdown() {
public synchronized void shutdown() throws InterruptedException {
for (IoScheduler queue : _queues) {
queue.shutdown();
}
Expand Down
Expand Up @@ -68,7 +68,7 @@ public interface IoScheduler {
/**
* Shut down the scheduler. All subsequent execution requests will be rejected.
*/
public void shutdown();
public void shutdown() throws InterruptedException;

// legacy crap
public List<JobInfo> getJobInfos();
Expand Down
@@ -1,5 +1,7 @@
package org.dcache.pool.classic;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,6 +16,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.IoJobInfo;
Expand All @@ -29,6 +32,9 @@
import org.dcache.util.IoPriority;
import org.dcache.util.LifoPriorityComparator;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.transform;

/**
*
* @since 1.9.11
Expand Down Expand Up @@ -70,7 +76,7 @@ public class SimpleIoScheduler implements IoScheduler, Runnable {
/**
* Whether this scheduler has been asked to shut down.
*/
boolean _shutdown;
private volatile boolean _shutdown;

private final AdjustableSemaphore _semaphore = new AdjustableSemaphore();

Expand Down Expand Up @@ -117,6 +123,7 @@ public SimpleIoScheduler(String name,
*/
@Override
public synchronized int add(PoolIORequest request, IoPriority priority) {
checkState(!_shutdown);

int id = _queueId << 24 | nextId();

Expand Down Expand Up @@ -199,27 +206,29 @@ public String getName() {

/**
 * Cancels the job registered under the given id.
 *
 * @param id identifier of the job to cancel
 * @throws NoSuchElementException if no job is registered under {@code id}
 */
@Override
public synchronized void cancel(int id) throws NoSuchElementException {
    // Resolve the id first so that unknown jobs are reported to the caller.
    PrioritizedRequest job = _jobs.get(id);
    if (job == null) {
        throw new NoSuchElementException("Job " + id + " not found");
    }

    cancel(job);
}

if(_queue.remove(wrapper)) {
private void cancel(PrioritizedRequest wrapper)
{
if (_queue.remove(wrapper)) {
/*
* if request still in the queue, then we can cancel it right now.
* as request is still in the queue, we can cancel it right away.
*/
_jobs.remove(id);
final PoolIORequest request = wrapper.getRequest();
_jobs.remove(wrapper.getId());
PoolIORequest request = wrapper.getRequest();
request.setState(IoRequestState.DONE);
request.setTransferStatus(CacheException.DEFAULT_ERROR_CODE, "Transfer canceled");

/*
* go though standart procedure to update billing and notity door.
* go through the standard procedure to update billing and notify door.
*/
final String protocolName = protocolNameOf(request);
String protocolName = protocolNameOf(request);
_executorServices.getPostExecutorService(protocolName).execute(request);
} else {
wrapper.getRequest().kill();
Expand All @@ -240,8 +249,29 @@ public void setMaxActiveJobs(int maxJobs) {
}

@Override
public void shutdown() {
// NOP for now
/**
 * Shuts this scheduler down and cancels every job it still tracks.
 *
 * <p>The first call sets the shutdown flag, interrupts the worker thread and
 * cancels all entries of the job map. It then waits up to two seconds to
 * acquire the semaphore's maximum number of permits; if that fails, a warning
 * is logged listing the protocol version string of each job still present in
 * the job map. Any later call returns immediately without doing anything.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *         waiting for the permits
 */
public synchronized void shutdown() throws InterruptedException
{
    if (_shutdown) {
        return;
    }

    _shutdown = true;
    _worker.interrupt();
    for (PrioritizedRequest job : _jobs.values()) {
        cancel(job);
    }

    _log.info("Waiting for movers on queue '{}' to finish", _name);
    boolean drained =
            _semaphore.tryAcquire(_semaphore.getMaxPermits(), 2000L, TimeUnit.MILLISECONDS);
    if (drained) {
        return;
    }

    // This is often due to a mover not reacting to interrupt or the transfer
    // doing a lengthy checksum calculation during post processing.
    Function<PrioritizedRequest, String> toVersionString =
            new Function<PrioritizedRequest, String>()
            {
                @Override
                public String apply(PrioritizedRequest input)
                {
                    return input.getRequest().getProtocolInfo().getVersionString();
                }
            };
    _log.warn("Failed to terminate some movers prior to shutdown: {}",
            Joiner.on(",").join(transform(_jobs.values(), toVersionString)));
}

@Override
Expand Down
@@ -1,6 +1,7 @@
package org.dcache.util;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* A simple implementation of an adjustable semaphore.
Expand Down Expand Up @@ -83,6 +84,14 @@ public void acquire() throws InterruptedException {
this.semaphore.acquire();
}

/**
 * Tries to acquire the given number of permits, waiting at most the given
 * time. The call is forwarded unchanged to the wrapped semaphore.
 *
 * @param permits number of permits to acquire
 * @param timeout maximum time to wait for the permits
 * @param unit time unit of {@code timeout}
 * @return the result reported by the wrapped semaphore
 * @throws InterruptedException if the wrapped semaphore's call throws it
 * @see Semaphore#tryAcquire(int, long, java.util.concurrent.TimeUnit)
 */
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
{
    boolean acquired = this.semaphore.tryAcquire(permits, timeout, unit);
    return acquired;
}

/**
* Get a permit from this semaphore if one is available at the
* time of invocation.
Expand Down
Expand Up @@ -329,10 +329,10 @@
</bean>

<bean id="legacy-execution-service" class="org.dcache.pool.classic.LegacyMoverExecutorService"
destroy-method="shutdown"/>
destroy-method="shutdown" depends-on="rep"/>

<bean id="nfs-execution-service" class="org.dcache.chimera.nfsv41.mover.NfsExcecutionService"
init-method="init" destroy-method="shutdown">
init-method="init" destroy-method="shutdown" depends-on="rep">
<description>NFSv4.1 request execution service</description>
<property name="enableGss" value="${nfs.rpcsec_gss}" />
</bean>
Expand Down

0 comments on commit 806aa55

Please sign in to comment.