Skip to content

Commit

Permalink
dcache-bulk: container rewrite to optimize threading
Browse files Browse the repository at this point in the history
Motivation:

Benchmarking Bulk PIN requests against SRM's
bringonline requests revealed that the current
bulk threading implementation was nearly an order
of magnitude slower (e.g., 10K target paths in
10 minutes rather than 75 seconds).

Investigation determined there were two main issues:

Bulk was executing preparatory calls to the
PnfsManager synchronously on the same thread
as that allocated for the main activity operation.
Thread pools were unnecessarily multiple,
and all of the BoundedCachedExecutor variety,
which does not allow for unbounded pool expansion
(unlike SRM, which uses the unbounded cached thread
pool).

The second point actually makes a huge difference in
performance.

All of this suggested that the container job needed
significant rewriting to get the performance SRM does.

Modification:

The container job classes have been collapsed into
a single concrete implementation class. This has
been refactored to use ContainerTask objects.

All jobs are given to the unbounded cached pool
  for execution, but they must acquire a semphore
  permit before running, releasing it when they have
  finished.

PnfsManager communication is now asynchronous.
  Callbacks are handled by the calling thread.
  The callback from the activity itself is resubmitted
  to the executor pool since it involves a bit more work,
  and always a database update.

Activity semaphores and executors have been removed,
along with their related properties.

There is one minor internal interface change (parameter
type) on the activity perform method.

The properties and documentation have been updated.
The defaults are set from the testing reported here,
though further tweaking may be desirable after thorough
stress-testing has been done.

Result:

See summary under testing.  Performance is now very good.
  Code is also cleaner (IMO).

Target: master
Request: 9.2
Patch: https://rb.dcache.org/r/14115
Requires-book: yes (changes included)
Requires-notes: yes
Acked-by: Tigran
  • Loading branch information
alrossi committed Oct 4, 2023
1 parent a55628d commit b640b5e
Show file tree
Hide file tree
Showing 22 changed files with 1,127 additions and 1,268 deletions.
19 changes: 1 addition & 18 deletions docs/TheBook/src/main/markdown/config-bulk.md
Expand Up @@ -90,26 +90,9 @@ a different id. The default lifetime is five minutes (the same as for the NFS do
the [QoS Engine](config-qos-engine.md).
- **LOG_TARGET** : logs metadata for each target at the INFO level.

Each activity is associated with

- a permit count (used in connection with a semaphore for throttling execution);
- two thread queues, one for the execution of the container job,
and the other for the execution of callbacks on activity futures;
- a retry policy (currently the only retry policy is a NOP, i.e., no retry).

The permits are configurable using either the property or the admin shell
command ``request policy``.

Each activity is associated with a retry policy (currently the only retry policy is a NOP, i.e., no retry).
Should other retry policies become available, these can be set via a property.

The number and distribution of thread executors is hard-coded for the activities, but their
respective sizes can be adjusted using the properties:

```
bulk.limits.container-processing-threads=110
bulk.limits.activity-callback-threads=50
```

## Container Design

Version 2 of the bulk service has introduced improvements for better scalability and recovery.
Expand Down
Expand Up @@ -59,7 +59,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk;

import static org.dcache.services.bulk.job.AbstractRequestContainerJob.findAbsolutePath;
import static org.dcache.services.bulk.job.BulkRequestContainerJob.findAbsolutePath;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.base.Strings;
Expand Down
Expand Up @@ -93,6 +93,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
Expand Down Expand Up @@ -181,7 +182,7 @@ public final class BulkServiceCommands implements CellCommandListener {
/**
* name | class | type | permits
*/
private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s | %10s ";
private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s ";

/**
* name | required | description
Expand Down Expand Up @@ -267,8 +268,7 @@ private static String formatActivity(Entry<String, BulkActivityProvider> entry)
return String.format(FORMAT_ACTIVITY,
entry.getKey(),
provider.getActivityClass(),
provider.getTargetType(),
provider.getMaxPermits());
provider.getTargetType());
}

private static String formatArgument(BulkActivityArgumentDescriptor descriptor) {
Expand Down Expand Up @@ -550,7 +550,7 @@ public String call() throws Exception {
return "There are no mapped activities!";
}

return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE", "PERMITS")
return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE")
+ "\n" + activities;
}
}
Expand Down Expand Up @@ -1371,7 +1371,7 @@ public PagedTargetResult call() throws Exception {
private BulkActivityFactory activityFactory;
private BulkTargetStore targetStore;
private BulkServiceStatistics statistics;
private ExecutorService executor;
private ExecutorService executor = Executors.newSingleThreadExecutor();

private JdbcBulkArchiveDao archiveDao;

Expand All @@ -1390,11 +1390,6 @@ public void setArchiveDao(JdbcBulkArchiveDao archiveDao) {
this.archiveDao = archiveDao;
}

@Required
public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

@Required
public void setRequestManager(BulkRequestManager requestManager) {
this.requestManager = requestManager;
Expand Down
Expand Up @@ -65,14 +65,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.security.auth.Subject;
import org.dcache.auth.attributes.Restriction;
import org.dcache.namespace.FileAttribute;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.retry.BulkTargetRetryPolicy;
import org.dcache.services.bulk.activity.retry.NoRetryPolicy;
import org.dcache.services.bulk.util.BatchedResult;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.vehicles.FileAttributes;

Expand All @@ -98,36 +97,24 @@ public enum TargetType {

private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();

private static final int DEFAULT_PERMITS = 50;

protected final String name;
protected final TargetType targetType;

protected Subject subject;
protected Restriction restriction;
protected Set<FileAttribute> requiredAttributes;
protected int maxPermits;
protected ExecutorService activityExecutor;
protected ExecutorService callbackExecutor;
protected BulkTargetRetryPolicy retryPolicy;
protected Set<BulkActivityArgumentDescriptor> descriptors;

protected BulkActivity(String name, TargetType targetType) {
this.name = name;
this.targetType = targetType;
requiredAttributes = MINIMALLY_REQUIRED_ATTRIBUTES;
maxPermits = DEFAULT_PERMITS;
retryPolicy = DEFAULT_RETRY_POLICY;
}

public void cancel(BulkRequestTarget target) {
target.cancel();
}

public int getMaxPermits() {
return maxPermits;
}

public String getName() {
return name;
}
Expand All @@ -144,10 +131,6 @@ public TargetType getTargetType() {
return targetType;
}

public Set<FileAttribute> getRequiredAttributes() {
return requiredAttributes;
}

public Subject getSubject() {
return subject;
}
Expand All @@ -164,39 +147,10 @@ public void setRestriction(Restriction restriction) {
this.restriction = restriction;
}

public ExecutorService getActivityExecutor() {
return activityExecutor;
}

public void setActivityExecutor(ExecutorService activityExecutor) {
this.activityExecutor = activityExecutor;
}

public ExecutorService getCallbackExecutor() {
return callbackExecutor;
}

public void setCallbackExecutor(ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

public void setDescriptors(Set<BulkActivityArgumentDescriptor> descriptors) {
this.descriptors = descriptors;
}

public void setMaxPermits(int maxPermits) {
this.maxPermits = maxPermits;
}

/**
* Completion handler method. Calls the internal implementation.
*
* @param result of the targeted activity.
*/
public void handleCompletion(BatchedResult<R> result) {
handleCompletion(result.getTarget(), result.getFuture());
}

/**
* Performs the activity.
*
Expand All @@ -223,5 +177,5 @@ public abstract ListenableFuture<R> perform(String rid, long tid, FsPath path, F
* @param target which has terminate.
* @param future the future returned by the activity call to perform();
*/
protected abstract void handleCompletion(BulkRequestTarget target, ListenableFuture<R> future);
public abstract void handleCompletion(BulkRequestTarget target, Future<R> future);
}
Expand Up @@ -70,7 +70,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import javax.security.auth.Subject;
import org.dcache.auth.Subjects;
import org.dcache.auth.attributes.Restriction;
Expand Down Expand Up @@ -102,9 +101,6 @@ public final class BulkActivityFactory implements CellMessageSender, Environment
new HashMap<>());

private Map<String, BulkTargetRetryPolicy> retryPolicies;
private Map<String, ExecutorService> activityExecutors;
private Map<String, ExecutorService> callbackExecutors;
private Map<String, Integer> maxPermits;
private Map<String, Object> environment;

private CellStub pnfsManager;
Expand Down Expand Up @@ -142,8 +138,6 @@ public BulkActivity createActivity(BulkRequest request, Subject subject,
bulkActivity.setSubject(subject);
bulkActivity.setRestriction(restriction);

bulkActivity.setActivityExecutor(activityExecutors.get(activity));
bulkActivity.setCallbackExecutor(callbackExecutors.get(activity));
BulkTargetRetryPolicy retryPolicy = retryPolicies.get(activity);
if (retryPolicy != null) {
bulkActivity.setRetryPolicy(retryPolicy);
Expand All @@ -163,8 +157,6 @@ public void initialize() {
ServiceLoader<BulkActivityProvider> serviceLoader
= ServiceLoader.load(BulkActivityProvider.class);
for (BulkActivityProvider provider : serviceLoader) {
String activity = provider.getActivity();
provider.setMaxPermits(maxPermits.get(activity));
provider.configure(environment);
providers.put(provider.getActivity(), provider);
}
Expand Down Expand Up @@ -215,26 +207,11 @@ public void setQoSResponseReceiver(QoSResponseReceiver qoSResponseReceiver) {
this.qoSResponseReceiver = qoSResponseReceiver;
}

@Required
public void setMaxPermits(Map<String, Integer> maxPermits) {
this.maxPermits = maxPermits;
}

@Required
public void setRetryPolicies(Map<String, BulkTargetRetryPolicy> retryPolicies) {
this.retryPolicies = retryPolicies;
}

@Required
public void setActivityExecutors(Map<String, ExecutorService> activityExecutors) {
this.activityExecutors = activityExecutors;
}

@Required
public void setCallbackExecutors(Map<String, ExecutorService> callbackExecutors) {
this.callbackExecutors = callbackExecutors;
}

@Override
public void setEnvironment(Map<String, Object> environment) {
this.environment = environment;
Expand Down
Expand Up @@ -73,7 +73,6 @@ public abstract class BulkActivityProvider<J extends BulkActivity> {

protected final String activity;
protected final TargetType targetType;
protected int maxPermits;

protected BulkActivityProvider(String activity, TargetType targetType) {
this.activity = activity;
Expand All @@ -88,22 +87,13 @@ public TargetType getTargetType() {
return targetType;
}

public int getMaxPermits() {
return maxPermits;
}

public void setMaxPermits(int maxPermits) {
this.maxPermits = maxPermits;
}

/**
* @return an instance of the specific activity type to be configured by factory.
*
* @throws BulkServiceException
*/
public J createActivity() throws BulkServiceException {
J activity = activityInstance();
activity.setMaxPermits(maxPermits);
activity.setDescriptors(getDescriptors());
return activity;
}
Expand Down
Expand Up @@ -71,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.dcache.namespace.FileType;
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BulkRequestTarget;
Expand Down Expand Up @@ -105,8 +106,8 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) {
}

@Override
protected void handleCompletion(BulkRequestTarget target,
ListenableFuture<PnfsDeleteEntryMessage> future) {
public void handleCompletion(BulkRequestTarget target,
Future<PnfsDeleteEntryMessage> future) {
PnfsDeleteEntryMessage reply;
try {
reply = getUninterruptibly(future);
Expand Down
Expand Up @@ -63,6 +63,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import com.google.common.util.concurrent.ListenableFuture;
import diskCacheV111.util.FsPath;
import java.util.Map;
import java.util.concurrent.Future;
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
Expand Down Expand Up @@ -110,7 +111,7 @@ public ListenableFuture<BulkRequestTarget> perform(String ruid, long tid, FsPath
}

@Override
protected void handleCompletion(BulkRequestTarget target, ListenableFuture future) {
public void handleCompletion(BulkRequestTarget target, Future future) {
target.setState(State.COMPLETED);
}

Expand Down
Expand Up @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.dcache.cells.CellStub;
import org.dcache.pinmanager.PinManagerAware;
import org.dcache.pinmanager.PinManagerPinMessage;
Expand Down Expand Up @@ -104,7 +105,7 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) {
}

@Override
protected void handleCompletion(BulkRequestTarget target, ListenableFuture<Message> future) {
public void handleCompletion(BulkRequestTarget target, Future<Message> future) {
Message reply;
try {
reply = getUninterruptibly(future);
Expand Down
Expand Up @@ -76,6 +76,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.dcache.cells.CellStub;
import org.dcache.qos.QoSException;
import org.dcache.qos.data.FileQoSRequirements;
Expand Down Expand Up @@ -197,7 +198,7 @@ protected void configure(Map<String, String> arguments) throws BulkServiceExcept

@Override
public void handleCompletion(BulkRequestTarget target,
ListenableFuture<QoSTransitionCompletedMessage> future) {
Future<QoSTransitionCompletedMessage> future) {
QoSTransitionCompletedMessage message;
try {
message = getUninterruptibly(future);
Expand Down
Expand Up @@ -74,7 +74,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.BulkRequestStatus;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.BulkStorageException;
import org.dcache.services.bulk.job.AbstractRequestContainerJob;
import org.dcache.services.bulk.job.BulkRequestContainerJob;
import org.dcache.services.bulk.job.RequestContainerJobFactory;
import org.dcache.services.bulk.manager.BulkRequestManager;
import org.dcache.services.bulk.store.BulkRequestStore;
Expand Down Expand Up @@ -210,7 +210,7 @@ public void setTargetStore(BulkTargetStore targetStore) {

@Override
public void submitRequestJob(BulkRequest request) throws BulkServiceException {
AbstractRequestContainerJob job = jobFactory.createRequestJob(request);
BulkRequestContainerJob job = jobFactory.createRequestJob(request);
if (storeJobTarget(job.getTarget())) {
requestManager.submit(job);
}
Expand Down

0 comments on commit b640b5e

Please sign in to comment.