Skip to content

Commit

Permalink
dcache-bulk: fix thread executor injection
Browse files Browse the repository at this point in the history
Motivation:

Recent changes to the job container implementation
inadvertently scrambled thread pool injection
such that one of the executors (intended to run
the main container jobs) never gets used (the
container job is executed by the same thread
pool as its own tasks).

Modification:

Fix the injection.  For the sake of consistency,
all the thread pools are injected into the
container by the job factory rather than
the manager.

Result:

Container jobs run on their own thread pool,
as intended.

Target: master
Request: 9.2
Requires-notes:  no (this is largely invisible)
Patch: https://rb.dcache.org/r/14135
Acked-by: Tigran
  • Loading branch information
alrossi committed Oct 17, 2023
1 parent d466c36 commit ea7e261
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 50 deletions.
Expand Up @@ -80,6 +80,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.util.BoundedCachedExecutor;
import org.dcache.util.list.ListDirectoryHandler;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
Expand All @@ -102,6 +103,9 @@ public final class RequestContainerJobFactory {
private BulkServiceStatistics statistics;
private Semaphore dirListSemaphore;
private Semaphore inFlightSemaphore;
private BoundedCachedExecutor taskExecutor;
private BoundedCachedExecutor callbackExecutor;
private BoundedCachedExecutor listExecutor;

public BulkRequestContainerJob createRequestJob(BulkRequest request)
throws BulkServiceException {
Expand Down Expand Up @@ -132,6 +136,9 @@ public BulkRequestContainerJob createRequestJob(BulkRequest request)
containerJob.setListHandler(listHandler);
containerJob.setDirListSemaphore(dirListSemaphore);
containerJob.setInFlightSemaphore(inFlightSemaphore);
containerJob.setExecutor(taskExecutor);
containerJob.setListExecutor(listExecutor);
containerJob.setCallbackExecutor(callbackExecutor);
containerJob.initialize();
return containerJob;
}
Expand All @@ -149,11 +156,21 @@ public void setActivityFactory(BulkActivityFactory activityFactory) {
this.activityFactory = activityFactory;
}

@Required
public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

@Required
public void setListHandler(ListDirectoryHandler listHandler) {
this.listHandler = listHandler;
}

@Required
public void setListExecutor(BoundedCachedExecutor listExecutor) {
this.listExecutor = listExecutor;
}

@Required
public void setDirListSemaphore(int permits) {
dirListSemaphore = new Semaphore(permits);
Expand Down Expand Up @@ -184,6 +201,11 @@ public void setTargetStore(BulkTargetStore targetStore) {
this.targetStore = targetStore;
}

@Required
public void setTaskExecutor(BoundedCachedExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

BulkActivity create(BulkRequest request) throws BulkServiceException {
String rid = request.getUid();

Expand Down
Expand Up @@ -311,19 +311,9 @@ private ListMultimap<String, String> userRequests() {
private ExecutorService processorExecutorService;

/**
* Thread dedicated to job callbacks.
* Thread dedicated to running containerJobs.
*/
private ExecutorService callbackExecutor;

/**
* Thread dedicated to directory listing.
*/
private ExecutorService listExecutor;

/**
* Thread dedicated to jobs.
*/
private ExecutorService executorService;
private ExecutorService containerExecutor;

/**
* Records number of jobs and requests processed.
Expand Down Expand Up @@ -430,24 +420,14 @@ public int getMaxActiveRequests() {
return maxActiveRequests;
}

@Required
public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

@Required
public void setCompletionHandler(BulkRequestCompletionHandler completionHandler) {
this.completionHandler = completionHandler;
}

@Required
public void setExecutor(BoundedCachedExecutor pooledExecutor) {
this.executorService = pooledExecutor;
}

@Required
public void setListExecutor(BoundedCachedExecutor listExecutor) {
this.listExecutor = listExecutor;
public void setContainerExecutor(BoundedCachedExecutor containerExecutor) {
this.containerExecutor = containerExecutor;
}

@Required
Expand Down Expand Up @@ -543,14 +523,11 @@ void startJob(BulkRequestContainerJob job) {
String key = job.getTarget().getKey();
LOGGER.trace("submitting job {} to executor, target {}.", key,
job.getTarget());
job.setExecutor(executorService);
job.setListExecutor(listExecutor);
job.setCallbackExecutor(callbackExecutor);
job.setCallback(this);
try {
if (isJobValid(job)) { /* possibly cancelled in flight */
job.update(State.RUNNING);
executorService.submit(new FireAndForgetTask(job));
containerExecutor.submit(new FireAndForgetTask(job));
}
} catch (RuntimeException e) {
job.getTarget().setErrorObject(e);
Expand Down
Expand Up @@ -67,13 +67,21 @@
</constructor-arg>
</bean>

<bean id="container-job-executor" class="org.dcache.util.CDCExecutorServiceDecorator">
<bean id="container-job-executor" class="org.dcache.util.BoundedCachedExecutor">
<description>Used to execute jobs that are executed by a batch container.</description>
<constructor-arg>
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.container-processing-threads}"/>
</bean>
</constructor-arg>
<constructor-arg value="${bulk.limits.container-processing-threads}"/>
</bean>

<bean id="task-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.in-flight-semaphore}"/>
</bean>

<bean id="callback-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>

<bean id="dir-list-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-semaphore}"/>
</bean>

<bean id="cancellation-executor" class="org.dcache.util.CDCScheduledExecutorServiceDecorator">
Expand Down Expand Up @@ -209,6 +217,9 @@
<property name="statistics" ref="statistics"/>
<property name="dirListSemaphore" value="${bulk.limits.dir-list-semaphore}"/>
<property name="inFlightSemaphore" value="${bulk.limits.in-flight-semaphore}"/>
<property name="taskExecutor" ref="task-executor"/>
<property name="callbackExecutor" ref="callback-executor"/>
<property name="listExecutor" ref="dir-list-executor"/>
</bean>

<bean id="statistics" class="org.dcache.services.bulk.util.BulkServiceStatistics">
Expand Down Expand Up @@ -243,21 +254,7 @@
<property name="submissionHandler" ref="request-handler"/>
<property name="schedulerProvider" ref="scheduler-provider"/>
<property name="maxActiveRequests" value="${bulk.limits.container-processing-threads}"/>
<property name="executor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.in-flight-semaphore}"/>
</bean>
</property>
<property name="callbackExecutor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>
</property>
<property name="listExecutor" >
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-semaphore}"/>
</bean>
</property>
<property name="containerExecutor" ref="container-job-executor"/>
<property name="timeout" value="${bulk.limits.sweep-interval}"/>
<property name="timeoutUnit" value="${bulk.limits.sweep-interval.unit}"/>
</bean>
Expand Down Expand Up @@ -299,6 +296,9 @@
<list>
<ref bean="incoming-thread-executor"/>
<ref bean="container-job-executor"/>
<ref bean="task-executor"/>
<ref bean="callback-executor"/>
<ref bean="dir-list-executor"/>
<ref bean="cancellation-executor"/>
</list>
</property>
Expand Down
2 changes: 1 addition & 1 deletion skel/share/defaults/bulk.properties
Expand Up @@ -69,7 +69,7 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir
#
bulk.limits.container-processing-threads=100
bulk.limits.incoming-request-threads=10
bulk.limits.cancellation-threads=10
bulk.limits.cancellation-threads=${bulk.limits.container-processing-threads}

# ---- Expiration of the cache serving to front the request storage.
#
Expand Down

0 comments on commit ea7e261

Please sign in to comment.