Navigation Menu

Skip to content

Commit

Permalink
spacemanager: Controlled shutdown
Browse files Browse the repository at this point in the history
Since the thread pool was shut down asynchronously, requests in the queue could
be processed after the database connection pool was closed. There was a
potentially large number of stack traces in the logs when shutting down space
manager while it was busy.

This refactors the thread pool handling such that the tasks of the space
manager service can be drained before the connection pool is closed. The space
manager classifies requests into important requests and unimportant requests.
The latter are returned to the sender with an error, while the former are
processed as losing them is undesirable.

The patch also fixes the use of an srm property inside the space manager
configuration.

Target: trunk
Require-notes: yes
Require-book: yes
Request: 2.12
Request: 2.11
Request: 2.10
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: https://rb.dcache.org/r/7947/
(cherry picked from commit 5e56f38)
  • Loading branch information
gbehrmann committed Mar 9, 2015
1 parent ddb3bd3 commit 42dcac4
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 38 deletions.
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import diskCacheV111.util.AccessLatency;
import diskCacheV111.util.CacheException;
Expand All @@ -31,6 +32,7 @@
import dmg.util.command.Option;

import org.dcache.auth.FQAN;
import org.dcache.util.CDCExecutorServiceDecorator;
import org.dcache.util.ColumnWriter;
import org.dcache.util.Glob;

Expand All @@ -46,9 +48,9 @@ public class SpaceManagerCommandLineInterface implements CellCommandListener
private Executor executor;

@Required
public void setExecutor(Executor executor)
public void setExecutor(ExecutorService executor)
{
this.executor = executor;
this.executor = new CDCExecutorServiceDecorator<>(executor);
}

@Required
Expand Down
Expand Up @@ -94,6 +94,7 @@
import org.dcache.namespace.FileAttribute;
import org.dcache.namespace.FileType;
import org.dcache.poolmanager.PoolMonitor;
import org.dcache.util.BoundedExecutor;
import org.dcache.util.CDCExecutorServiceDecorator;
import org.dcache.vehicles.FileAttributes;

Expand Down Expand Up @@ -124,12 +125,14 @@ public final class SpaceManagerService

private SpaceManagerAuthorizationPolicy authorizationPolicy;

private Executor executor;
private ExecutorService executor;

private PoolMonitor poolMonitor;
private SpaceManagerDatabase db;
private LinkGroupLoader linkGroupLoader;
private long perishedSpacePurgeDelay;
private int threads;
private volatile boolean isStopped;

@Required
public void setPoolManager(CellPath poolManager)
Expand Down Expand Up @@ -179,10 +182,16 @@ public void setShouldReturnFlushedSpaceToReservation(boolean shouldReturnFlushed
this.shouldReturnFlushedSpaceToReservation = shouldReturnFlushedSpaceToReservation;
}

/**
 * Sets the thread limit for message processing.
 *
 * The value is only stored here; start() passes it to the BoundedExecutor
 * that is wrapped around the injected executor.
 *
 * @param threads the limit handed to BoundedExecutor in start()
 */
@Required
public void setMaxThreads(int threads)
{
this.threads = threads;
}

/**
 * Injects the thread pool used for message processing.
 *
 * The pool is stored undecorated: start() wraps it in a BoundedExecutor and a
 * CDCExecutorServiceDecorator, so decorating it here as well would only create
 * an object that is discarded immediately.
 *
 * @param executor the executor service that start() builds upon
 */
@Required
public void setExecutor(ExecutorService executor)
{
    this.executor = executor;
}

@Required
Expand Down Expand Up @@ -211,15 +220,23 @@ public void setPerishedSpacePurgeDelay(long millis)

/**
 * Starts the service.
 *
 * Replaces the injected executor by a BoundedExecutor limited to the
 * configured number of threads, decorated with CDCExecutorServiceDecorator,
 * and then launches the "ExpireThreadReservations" thread running this object.
 */
public void start()
{
    BoundedExecutor bounded = new BoundedExecutor(executor, threads);
    executor = new CDCExecutorServiceDecorator<>(bounded);

    Thread expirationThread = new Thread(this, "ExpireThreadReservations");
    expireSpaceReservations = expirationThread;
    expirationThread.start();
}

/**
 * Stops the service in a controlled fashion.
 *
 * The stopped flag is raised first, so that tasks still queued in the executor
 * can tell that a shutdown is in progress. The executor is then shut down, the
 * expiration thread (if it was started) is interrupted and joined, and the
 * executor is given up to one second to finish its remaining tasks. Whatever
 * happens, shutdownNow() is called on the executor at the end.
 *
 * @throws InterruptedException if the calling thread is interrupted while
 *         joining the expiration thread or awaiting executor termination
 */
public void stop() throws InterruptedException
{
    try {
        // Must be set before draining: queued tasks check this flag.
        isStopped = true;
        executor.shutdown();
        if (expireSpaceReservations != null) {
            // One interrupt/join pair is sufficient; once join() has returned
            // the thread is dead and repeating the pair has no effect.
            expireSpaceReservations.interrupt();
            expireSpaceReservations.join();
        }
        executor.awaitTermination(1, TimeUnit.SECONDS);
    } finally {
        // Runs even when the wait above timed out or was interrupted.
        executor.shutdownNow();
    }
}


Expand Down Expand Up @@ -348,6 +365,18 @@ private boolean isInterceptedMessage(Message message)
|| (message instanceof PoolAcceptFileMessage && ((PoolAcceptFileMessage) message).getFileAttributes().getStorageInfo().getKey("LinkGroupId") != null && (!message.isReply() || message.getReturnCode() != 0));
}

/**
 * Classifies a message for the purpose of shutdown handling.
 *
 * @param message the message to classify
 * @return true if the message must not be discarded during shutdown, that is,
 *         if it is a reply or one of the pool-remove-files, pool-file-flushed,
 *         pnfs-delete-entry-notification or door-transfer-finished messages
 */
private boolean isImportantMessage(Message message)
{
    if (message.isReply()) {
        return true;
    }
    if (message instanceof PoolRemoveFilesMessage) {
        return true;
    }
    if (message instanceof PoolFileFlushedMessage) {
        return true;
    }
    if (message instanceof PnfsDeleteEntryNotificationMessage) {
        return true;
    }
    return message instanceof DoorTransferFinishedMessage;
}

public void messageArrived(final CellMessage envelope,
final Message message)
{
Expand All @@ -363,26 +392,19 @@ public void messageArrived(final CellMessage envelope,
@Override
public void process() throws DeadlockLoserDataAccessException
{
processMessage(message);
if (message.getReplyRequired()) {
try {
envelope.revertDirection();
sendMessage(envelope);
} catch (NoRouteToCellException e) {
LOGGER.error("Failed to send reply: {}", e.getMessage());
if (!isStopped || isImportantMessage(message)) {
processMessage(message);
if (message.getReplyRequired()) {
returnMessage(envelope);
}
} else {
notifyShutdown(envelope);
}
}
});
} else if (message.getReplyRequired()) {
try {
message.setReply(1, "Space manager is disabled in configuration");
envelope.revertDirection();
sendMessage(envelope);
}
catch (NoRouteToCellException e) {
LOGGER.error("Failed to send reply: {}", e.getMessage());
}
message.setReply(1, "Space manager is disabled in configuration");
returnMessage(envelope);
}
}
}
Expand All @@ -407,10 +429,13 @@ public void messageToForward(final CellMessage envelope, final Message message)
@Override
public void process() throws DeadlockLoserDataAccessException
{
processMessage(message);

if (message.getReturnCode() != 0 && !isEnRouteToDoor) {
envelope.revertDirection();
if (!isStopped || isImportantMessage(message)) {
processMessage(message);
if (message.getReturnCode() != 0 && !isEnRouteToDoor) {
envelope.revertDirection();
}
} else {
notifyShutdown(envelope);
}

forwardMessage(envelope, isEnRouteToDoor);
Expand All @@ -422,6 +447,29 @@ public void process() throws DeadlockLoserDataAccessException
}
}

/**
 * Informs the sender of a request that the request was not processed because
 * the space manager is shutting down.
 *
 * The payload of the envelope is replaced by a NoRouteToCellException built
 * from the envelope's UOID and destination path; the envelope's direction is
 * then reverted and the envelope is sent. A failure to send is logged at
 * debug level only.
 *
 * @param envelope the envelope of the request being rejected
 */
private void notifyShutdown(CellMessage envelope)
{
    try {
        NoRouteToCellException shutdownNotice =
                new NoRouteToCellException(envelope.getUOID(),
                                           envelope.getDestinationPath(),
                                           "Space manager is shutting down.");
        envelope.setMessageObject(shutdownNotice);
        envelope.revertDirection();
        sendMessage(envelope);
    } catch (NoRouteToCellException unreachable) {
        LOGGER.debug("Failed to notify cell of space manager shutdown: {}", unreachable.getMessage());
    }
}

/**
 * Sends an envelope back after reverting its direction.
 *
 * A NoRouteToCellException raised while sending is not propagated; it is
 * logged as an error instead.
 *
 * @param envelope the envelope to revert and send
 */
private void returnMessage(CellMessage envelope)
{
    try {
        envelope.revertDirection();
        sendMessage(envelope);
    } catch (NoRouteToCellException noRoute) {
        String reason = noRoute.getMessage();
        LOGGER.error("Failed to send reply: {}", reason);
    }
}

private void forwardMessage(CellMessage envelope, boolean isEnRouteToDoor)
{
try {
Expand Down
Expand Up @@ -10,18 +10,12 @@
<context:property-placeholder/>
<context:annotation-config/>

<bean id="executor"
class="org.dcache.util.BoundedExecutor"
destroy-method="shutdown">
<description>Thread pool for message processing</description>
<constructor-arg>
<bean class="java.util.concurrent.Executors"
factory-method="newCachedThreadPool"
destroy-method="shutdown">
</bean>
</constructor-arg>
<constructor-arg value="${spacemanager.limits.threads}"/>
</bean>
<bean id="executor"
class="java.util.concurrent.Executors"
factory-method="newCachedThreadPool"
destroy-method="shutdown">
<description>Thread pool for message processing</description>
</bean>

<bean id="pool-manager-stub" class="org.dcache.cells.CellStub">
<description>Pool manager communication stub</description>
Expand Down Expand Up @@ -118,8 +112,9 @@
<property name="database" ref="database"/>
<property name="linkGroupLoader" ref="linkgroup-loader"/>
<property name="pnfsHandler" ref="pnfs"/>
<property name="poolManager" value="${srm.service.poolmanager}"/>
<property name="poolManager" value="${spacemanager.service.poolmanager}"/>
<property name="poolMonitor" ref="pool-monitor"/>
<property name="maxThreads" value="${spacemanager.limits.threads}"/>
<property name="executor" ref="executor"/>
<property name="perishedSpacePurgeDelay"
value="#{T(java.util.concurrent.TimeUnit).MILLISECONDS.convert(
Expand Down

0 comments on commit 42dcac4

Please sign in to comment.