Skip to content

Commit

Permalink
spacemanager: Do not use ThreadManager
Browse files Browse the repository at this point in the history
Space manager uses a regular Java thread pool instead. Introduces
the following new configuration property:

spacemanager.limits.threads=30

Target: trunk
Require-notes: yes
Require-book: yes
Acked-by: Paul Millar <paul.millar@desy.de>
Acked-by: Dmitry Litvintsev <litvinse@fnal.gov>
Patch: http://rb.dcache.org/r/6376/
  • Loading branch information
gbehrmann committed Jan 10, 2014
1 parent 5c786c2 commit b6ec60c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
Expand Up @@ -33,6 +33,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Longs;
import org.dcache.util.CDCExecutorServiceDecorator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
Expand All @@ -55,6 +56,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import diskCacheV111.poolManager.PoolPreferenceLevel;
import diskCacheV111.poolManager.PoolSelectionUnit;
Expand All @@ -80,7 +83,6 @@
import diskCacheV111.util.PnfsHandler;
import diskCacheV111.util.PnfsId;
import diskCacheV111.util.RetentionPolicy;
import diskCacheV111.util.ThreadManager;
import diskCacheV111.util.VOInfo;
import diskCacheV111.vehicles.DoorTransferFinishedMessage;
import diskCacheV111.vehicles.IpProtocolInfo;
Expand Down Expand Up @@ -151,6 +153,8 @@ public final class Manager

private SpaceManagerAuthorizationPolicy authorizationPolicy;

private Executor executor;

private JdbcConnectionPool connection_pool;
private DBManager dbManager;
private static final Logger logger = LoggerFactory.getLogger(Manager.class);
Expand Down Expand Up @@ -237,6 +241,12 @@ public void setLinkGroupAuthorizationFileName(String linkGroupAuthorizationFileN
this.linkGroupAuthorizationFileName = linkGroupAuthorizationFileName;
}

@Required
public void setExecutor(ExecutorService executor)
{
this.executor = new CDCExecutorServiceDecorator(executor);
}

@Required
public void setDbManager(DBManager manager)
{
Expand Down Expand Up @@ -2458,7 +2468,7 @@ public void messageArrived(final CellMessage envelope,
if (!isNotificationMessage(message) && !isSpaceManagerMessage(message)) {
messageToForward(envelope, message);
} else if (spaceManagerEnabled) {
ThreadManager.execute(new Runnable()
executor.execute(new Runnable()
{
@Override
public void run()
Expand Down Expand Up @@ -2503,7 +2513,7 @@ public void messageToForward(final CellMessage envelope, final Message message)

if (envelope.nextDestination()) {
if (spaceManagerEnabled && isInterceptedMessage(message)) {
ThreadManager.execute(new Runnable()
executor.execute(new Runnable()
{
@Override
public void run()
Expand Down
Expand Up @@ -36,6 +36,19 @@
<constructor-arg value="1"/>
</bean>

<bean id="executor"
class="java.util.concurrent.ThreadPoolExecutor"
destroy-method="shutdown">
<description>Thread pool for message processing</description>
<constructor-arg value="1"/>
<constructor-arg value="${spacemanager.limits.threads}"/>
<constructor-arg value="60"/>
<constructor-arg value="SECONDS"/>
<constructor-arg>
<bean class="java.util.concurrent.LinkedBlockingQueue"/>
</constructor-arg>
</bean>

<bean id="pool-manager-stub" class="org.dcache.cells.CellStub">
<description>Pool manager communication stub</description>
<property name="destination" value="${srm.service.poolmanager}"/>
Expand Down Expand Up @@ -91,6 +104,7 @@
<property name="pnfsHandler" ref="pnfs"/>
<property name="poolManagerStub" ref="pool-manager-stub"/>
<property name="poolMonitor" ref="pool-monitor"/>
<property name="executor" ref="executor"/>
<property name="spaceManagerEnabled"
value="${spacemanager.enable.space-reservation}" />
<property name="updateLinkGroupsPeriod"
Expand Down
7 changes: 6 additions & 1 deletion skel/share/defaults/spacemanager.properties
Expand Up @@ -80,7 +80,12 @@ spacemanager.service.poolmanager=${dcache.service.poolmanager}
spacemanager.service.poolmanager.timeout=300
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)spacemanager.service.poolmanager.timeout.unit=SECONDS


# ---- Number of concurrent threads used by space manager
#
# Each thread will likely obtain a connection to the database, meaning the thread limit
# and the database connection limit should be in the same ballpark.
#
spacemanager.limits.threads=30

# ---- Whether space manager is enabled
#
Expand Down
2 changes: 1 addition & 1 deletion skel/share/services/spacemanager.batch
Expand Up @@ -3,7 +3,6 @@
#

onerror shutdown
exec file:${dcache.paths.share}/cells/threadmanager.fragment

onerror shutdown
check -strong spacemanager.cell.name
Expand All @@ -15,6 +14,7 @@ check -strong spacemanager.db.password
check -strong spacemanager.db.connections.partition-count
check -strong spacemanager.db.connections.min-per-partition
check -strong spacemanager.db.connections.max-per-partition
check -strong spacemanager.limits.threads
check -strong spacemanager.default-access-latency
check -strong spacemanager.enable.reserve-space-for-non-srm-transfers
check spacemanager.authz.link-group-file-name
Expand Down

0 comments on commit b6ec60c

Please sign in to comment.