Skip to content

Commit

Permalink
dcache-bulk: refine container executor model
Browse files Browse the repository at this point in the history
Motivation:

In https://rb.dcache.org/r/14115
master@8a5c358af45586383d87873952407c646f81f4c6

the container executor model was made more like SRM
by using an unbounded cached executor.  While this
brought Bulk performance in line with SRM, it also
drives memory usage to be very near to physical
memory.  In JFR, after a sustained usage/submission
of about 1700 requests of 10K targets each over 6
hours, one can observe this warning (see attached).

However, a totally unbounded executor is not necessary.
All that is required is an executor which has enough
slots to accommodate the in-flight semaphore.

Modification:

Split the pooled executor into a main and a callback
executor, each initialized with max threads equal
to the in-flight semaphore value.  Use the callback
executor exclusively for message/activity callbacks
(instead of direct executor).

Result:

Same performance as before, but with a total memory
footprint of a little more than half of the available
physical memory instead of being very close to it.

Target: master
Request: 9.2
Requires-notes: yes
Patch: https://rb.dcache.org/r/14118/
Acked-by: Tigran
Acked-by: Lea
  • Loading branch information
alrossi authored and lemora committed Oct 11, 2023
1 parent 3cd0912 commit be306f4
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
Expand Up @@ -73,7 +73,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import com.google.common.collect.Range;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.NamespaceHandlerAware;
Expand Down Expand Up @@ -480,7 +479,7 @@ public void failure(int rc, Object error) {
remove();
}
}
}, MoreExecutors.directExecutor());
}, callbackExecutor);
}

/**
Expand Down Expand Up @@ -514,7 +513,7 @@ public void failure(int rc, Object error) {
remove();
}
}
}, MoreExecutors.directExecutor());
}, callbackExecutor);
}

/**
Expand Down Expand Up @@ -567,7 +566,7 @@ private void performActivity(boolean async) throws InterruptedException {
try {
activityFuture = activity.perform(ruid, id == null ? seqNo : id, path, attributes);
if (async) {
activityFuture.addListener(() -> handleCompletion(), executor);
activityFuture.addListener(() -> handleCompletion(), callbackExecutor);
}
} catch (BulkServiceException | UnsupportedOperationException e) {
LOGGER.error("{}, perform failed for {}: {}", ruid, target, e.getMessage());
Expand Down Expand Up @@ -669,6 +668,7 @@ private void storeOrUpdate(Throwable error) throws InterruptedException {
private SignalAware callback;
private Thread runThread;
private ExecutorService executor;
private ExecutorService callbackExecutor;
private Semaphore dirListSemaphore;
private Semaphore inFlightSemaphore;

Expand Down Expand Up @@ -839,6 +839,10 @@ public void setListHandler(ListDirectoryHandler listHandler) {
this.listHandler = listHandler;
}

/**
 * Sets the executor on which this job's asynchronous callbacks are run.
 * <p>
 * It is passed as the executor argument when registering the future
 * callbacks and the activity-completion listener in this class.
 *
 * @param callbackExecutor executor used to run callbacks and listeners.
 */
public void setCallbackExecutor(ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

/**
 * Sets the main executor service for this job.
 * <p>
 * NOTE(review): presumably used for the job's own task execution, as
 * distinct from callback handling -- confirm against the rest of the class.
 *
 * @param executor executor service assigned to this job.
 */
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}
Expand Down
Expand Up @@ -92,6 +92,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.util.BoundedCachedExecutor;
import org.dcache.util.FireAndForgetTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -309,10 +310,15 @@ private ListMultimap<String, String> userRequests() {
*/
private ExecutorService processorExecutorService;

/**
* Executor dedicated to job callbacks.
*/
private ExecutorService callbackExecutor;

/**
* Executor dedicated to running jobs.
*/
private ExecutorService pooledExecutorService;
private ExecutorService executorService;

/**
* Records number of jobs and requests processed.
Expand Down Expand Up @@ -357,7 +363,6 @@ public void initialize() throws Exception {
schedulerProvider.initialize();
processor = new ConcurrentRequestProcessor(schedulerProvider.getRequestScheduler());
processorExecutorService = Executors.newSingleThreadScheduledExecutor();
pooledExecutorService = Executors.newCachedThreadPool();
processorFuture = processorExecutorService.submit(processor);
}

Expand Down Expand Up @@ -420,11 +425,21 @@ public int getMaxActiveRequests() {
return maxActiveRequests;
}

/**
 * Injects the bounded executor reserved for job callbacks.
 * <p>
 * The executor is handed to each container job when the job is started.
 *
 * @param callbackExecutor bounded executor to be used for job callbacks.
 */
@Required
public void setCallbackExecutor(BoundedCachedExecutor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

/**
 * Injects the request completion handler.
 * <p>
 * NOTE(review): presumably notified when a bulk request finishes --
 * confirm against the usages elsewhere in this class.
 *
 * @param completionHandler handler stored for later use by this manager.
 */
@Required
public void setCompletionHandler(BulkRequestCompletionHandler completionHandler) {
this.completionHandler = completionHandler;
}

/**
 * Injects the bounded executor used to run container jobs.
 * <p>
 * Jobs are submitted to this executor when they are started, and the same
 * executor is also passed to each job via its own setter.
 *
 * @param pooledExecutor bounded executor on which jobs are executed.
 */
@Required
public void setExecutor(BoundedCachedExecutor pooledExecutor) {
this.executorService = pooledExecutor;
}

@Required
public void setTargetStore(BulkTargetStore targetStore) {
this.targetStore = targetStore;
Expand Down Expand Up @@ -518,12 +533,13 @@ void startJob(BulkRequestContainerJob job) {
String key = job.getTarget().getKey();
LOGGER.trace("submitting job {} to executor, target {}.", key,
job.getTarget());
job.setExecutor(pooledExecutorService);
job.setExecutor(executorService);
job.setCallbackExecutor(callbackExecutor);
job.setCallback(this);
try {
if (isJobValid(job)) { /* possibly cancelled in flight */
job.update(State.RUNNING);
pooledExecutorService.submit(new FireAndForgetTask(job));
executorService.submit(new FireAndForgetTask(job));
}
} catch (RuntimeException e) {
job.getTarget().setErrorObject(e);
Expand Down
Expand Up @@ -243,6 +243,16 @@
<property name="submissionHandler" ref="request-handler"/>
<property name="schedulerProvider" ref="scheduler-provider"/>
<property name="maxActiveRequests" value="${bulk.limits.container-processing-threads}"/>
<property name="executor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.in-flight-semaphore}"/>
</bean>
</property>
<property name="callbackExecutor">
<bean class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.db.connections.max}"/>
</bean>
</property>
<property name="timeout" value="${bulk.limits.sweep-interval}"/>
<property name="timeoutUnit" value="${bulk.limits.sweep-interval.unit}"/>
</bean>
Expand Down

0 comments on commit be306f4

Please sign in to comment.