Skip to content

Commit

Permalink
dcache-bulk: allow multi-threaded directory listing on expandDirectories
Browse files Browse the repository at this point in the history
Motivation:

In order to scale out bulk requests concurrency,
the bulk service was redesigned to run each request
in its own container.  Each container uses a single
thread with a reentrant semaphore but a thread pool
to process the async Futures for each action
performed.

However, if there is directory listing (expandDirectories),
this currently is done on the main thread depth-first.
This turns out to be rather slow (only about 2K
targets a minute, or ~33Hz).

Depth-first ordering of the target ids, on the other
hand, is only necessary for directory targets
(e.g., on DELETE), which are then processed serially.

Modification:

Allow the depth-first expansion to be asynchronous
and multithreaded (each directory given a
separate threaded task).

Since this means that target ids are no longer
generated in strictly monotonic (depth-first) order,
we also need to sort the deferred directory
targets (if any) in memory by path depth.

The container uses a BoundedCachedExecutor to
handle the listing tasks, but the expansion
method also looks at the current state
of the thread queue, and only sends
the directory task to the executor if there
are idle threads; otherwise, it allows
the directory to be listed (shallow) on
the main thread.  This permits a bit
more concurrency and avoids total
starvation of running requests.

Result:

Much improved performance (by about
an order of magnitude), with the
thread pool at about 20 (this is
of course adjustable).

`This patch is for master only,
not just because it is an enhancement,
but also because current demand for
recursive request usage is low.
Making this available in 9.2
seems reasonable to me.`

Target: master
Patch:  https://rb.dcache.org/r/14041
Acked-by: Tigran
  • Loading branch information
alrossi committed Aug 3, 2023
1 parent 50588d0 commit 2acfaf8
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 54 deletions.
Expand Up @@ -82,7 +82,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.security.auth.Subject;
import org.dcache.auth.attributes.Restriction;
import org.dcache.services.bulk.BulkRequest;
Expand All @@ -98,6 +100,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.util.BoundedExecutor;
import org.dcache.util.SignalAware;
import org.dcache.util.list.DirectoryEntry;
import org.dcache.util.list.DirectoryStream;
Expand Down Expand Up @@ -134,6 +137,22 @@ public static FsPath findAbsolutePath(String prefix, String path) {
return absPath;
}

/*
 * Runnable wrapper around a single (shallow) directory listing, so the
 * listing can be submitted to the shared dir-list executor or, when all
 * pool threads are busy, run inline on the caller's thread.
 */
abstract class DirListTask implements Runnable {

/* Routes failures to the container's uncaughtException handler by default. */
protected Consumer<Throwable> errorHandler = e -> uncaughtException(Thread.currentThread(), e);

public void run() {
try {
doList();
} catch (Throwable e) {
/*
 * Report the failure first; then rethrow only if unchecked.
 * NOTE(review): a checked Throwable is swallowed here after being
 * reported — presumably intentional, since Runnable cannot throw;
 * confirm the errorHandler fully accounts for such failures.
 */
errorHandler.accept(e);
Throwables.throwIfUnchecked(e);
}
}

/* Performs the actual listing; implementations supply the directory logic. */
protected abstract void doList() throws Throwable;
}

/*
 * Lifecycle phases of the container job. Files are processed before
 * deferred directories (WAIT gates the transition to PROCESS_DIRS —
 * see checkTransitionToDirs); exact per-state semantics are defined
 * by the surrounding container logic.
 */
enum ContainerState {
START, PROCESS_FILES, WAIT, PROCESS_DIRS, STOP
}
Expand All @@ -150,9 +169,10 @@ enum ContainerState {
* A temporary placeholder for tracking purposes; it will not be the same as the actual
* autogenerated key in the database.
*/
protected final AtomicLong id = new AtomicLong(0L);

protected final AtomicLong id;
protected final BulkServiceStatistics statistics;
protected final AtomicInteger dirExpansionCount;

protected BulkTargetStore targetStore;
protected PnfsHandler pnfsHandler;
protected Semaphore semaphore;
Expand All @@ -168,10 +188,12 @@ enum ContainerState {
private ListDirectoryHandler listHandler;
private SignalAware callback;

private BoundedExecutor dirListExecutor;
private Thread runThread;

AbstractRequestContainerJob(BulkActivity activity, BulkRequestTarget target,
BulkRequest request, BulkServiceStatistics statistics) {
id = new AtomicLong(0L);
this.request = request;
this.activity = activity;
this.target = target;
Expand All @@ -187,6 +209,7 @@ enum ContainerState {
targetType = activity.getTargetType();
semaphore = new Semaphore(activity.getMaxPermits());
containerState = ContainerState.START;
dirExpansionCount = new AtomicInteger(0);
}

public void cancel() {
Expand Down Expand Up @@ -320,6 +343,10 @@ public void setListHandler(ListDirectoryHandler listHandler) {
this.listHandler = listHandler;
}

/**
 * Injects the bounded executor on which DirListTask instances run.
 * The executor is shared among containers (see expandDepthFirst),
 * so listing work from multiple requests competes for its threads.
 *
 * @param dirListExecutor shared pool for asynchronous directory listing
 */
public void setDirListExecutor(BoundedExecutor dirListExecutor) {
this.dirListExecutor = dirListExecutor;
}

@Override
public void setNamespaceHandler(PnfsHandler pnfsHandler) {
this.pnfsHandler = pnfsHandler;
Expand Down Expand Up @@ -380,61 +407,86 @@ protected DirectoryStream getDirectoryListing(FsPath path)
protected void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes)
throws BulkServiceException, CacheException, InterruptedException {
checkForRequestCancellation();
DirListTask task = new DirListTask() {
@Override
public void doList() throws Throwable {
try {
DirectoryStream stream = getDirectoryListing(path);
for (DirectoryEntry entry : stream) {
LOGGER.trace("expandDepthFirst {}, directory {}, entry {}", ruid, path,
entry.getName());
FsPath childPath = path.child(entry.getName());
FileAttributes childAttributes = entry.getFileAttributes();

switch (childAttributes.getFileType()) {
case DIR:
switch (depth) {
case ALL:
LOGGER.debug("expandDepthFirst {}, found directory {}, "
+ "expand ALL.", ruid, childPath);
expandDepthFirst(null, PID.DISCOVERED, childPath,
childAttributes);
break;
case TARGETS:
switch (targetType) {
case BOTH:
case DIR:
handleDirTarget(null, PID.DISCOVERED, childPath,
childAttributes);
}
break;
}
break;
case LINK:
case REGULAR:
handleFileTarget(PID.DISCOVERED, childPath, childAttributes);
break;
case SPECIAL:
default:
LOGGER.trace("expandDepthFirst {}, cannot handle special "
+ "file {}.", ruid, childPath);
break;
}

checkForRequestCancellation();
}

DirectoryStream stream = getDirectoryListing(path);
for (DirectoryEntry entry : stream) {
LOGGER.trace("expandDepthFirst {}, directory {}, entry {}", ruid, path,
entry.getName());
FsPath childPath = path.child(entry.getName());
FileAttributes childAttributes = entry.getFileAttributes();

switch (childAttributes.getFileType()) {
case DIR:
switch (depth) {
case ALL:
LOGGER.debug("expandDepthFirst {}, found directory {}, "
+ "expand ALL.", ruid, childPath);
expandDepthFirst(null, PID.DISCOVERED, childPath, childAttributes);
switch (targetType) {
case BOTH:
case DIR:
handleDirTarget(id, pid, path, dirAttributes);
break;
case TARGETS:
switch (targetType) {
case BOTH:
case DIR:
handleDirTarget(null, PID.DISCOVERED, childPath, childAttributes);
case FILE:
/*
* Because we now store all initial targets immediately,
* we need to mark such a directory as SKIPPED; otherwise
* the request will not complete on the basis of querying for
* completed targets and finding this one unhandled.
*/
if (pid == PID.INITIAL) {
targetStore.storeOrUpdate(
toTarget(id, pid, path, Optional.of(dirAttributes),
SKIPPED, null));
}
break;
}
break;
case LINK:
case REGULAR:
handleFileTarget(PID.DISCOVERED, childPath, childAttributes);
break;
case SPECIAL:
default:
LOGGER.trace("expandDepthFirst {}, cannot handle special "
+ "file {}.", ruid, childPath);
break;
} finally {
dirExpansionCount.decrementAndGet();
checkTransitionToDirs();
}
}
};

checkForRequestCancellation();
}
dirExpansionCount.incrementAndGet();

switch (targetType) {
case BOTH:
case DIR:
handleDirTarget(id, pid, path, dirAttributes);
break;
case FILE:
/*
* Because we now store all initial targets immediately,
* we need to mark such a directory as SKIPPED; otherwise
* the request will not complete on the basis of querying for
* completed targets and finding this one unhandled.
*/
if (pid == PID.INITIAL) {
targetStore.storeOrUpdate(toTarget(id, pid, path, Optional.of(dirAttributes),
SKIPPED, null));
}
/*
* The executor is shared among containers. To avoid total inactivity should
* the running container be starved by other containers, we allow it
* to execute on its own thread if all other threads are currently occupied.
*/
if (dirListExecutor.getWorkQueueSize() >= dirListExecutor.getMaximumPoolSize()) {
task.run();
} else {
dirListExecutor.execute(task);
}
}

Expand Down Expand Up @@ -484,7 +536,7 @@ protected FsPath resolvePath(String targetPath) throws CacheException {
}

private void checkTransitionToDirs() {
if (semaphore.availablePermits() == activity.getMaxPermits()) {
if (dirExpansionCount.get() <= 0 && semaphore.availablePermits() == activity.getMaxPermits()) {
synchronized (this) {
if (containerState == ContainerState.WAIT) {
containerState = ContainerState.PROCESS_DIRS;
Expand Down
Expand Up @@ -68,6 +68,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import com.google.common.util.concurrent.ListenableFuture;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FsPath;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
Expand All @@ -94,17 +96,28 @@ public final class RequestContainerJob extends AbstractRequestContainerJob {

private static final Logger LOGGER = LoggerFactory.getLogger(RequestContainerJob.class);

private static final DirTargetSorter SORTER = new DirTargetSorter();

/**
 * Immutable holder for a directory target whose processing is deferred
 * until file targets have completed. Records the namespace depth of the
 * path so that deferred directories can later be ordered deepest-first
 * (required because multithreaded expansion no longer yields targets in
 * strict depth-first id order).
 */
static class DirTarget {
    final FsPath path;
    final FileAttributes attributes;
    final PID pid;
    final Long id;
    final int depth;

    DirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) {
        this.id = id;
        this.pid = pid;
        this.path = path;
        this.attributes = attributes;
        /* Depth = number of '/' separators in the absolute path string. */
        String absolute = path.toString();
        int separators = 0;
        for (int i = 0; i < absolute.length(); ++i) {
            if (absolute.charAt(i) == '/') {
                ++separators;
            }
        }
        depth = separators;
    }
}

/**
 * Orders DirTargets by descending path depth, so the deepest directories
 * are handled first — preserving depth-first semantics (e.g. for
 * recursive deletion) even though targets were discovered concurrently.
 */
static class DirTargetSorter implements Comparator<DirTarget> {
    @Override
    public int compare(DirTarget o1, DirTarget o2) {
        /* Deeper sorts earlier; equivalent to Integer.compare(o2.depth, o1.depth). */
        if (o1.depth > o2.depth) {
            return -1;
        }
        return o1.depth < o2.depth ? 1 : 0;
    }
}

Expand Down Expand Up @@ -159,7 +172,9 @@ protected void processFileTargets() throws InterruptedException {

@Override
protected void processDirTargets() throws InterruptedException {
for (DirTarget dirTarget : dirs) {
DirTarget[] sorted = dirs.toArray(new DirTarget[0]);
Arrays.sort(sorted, SORTER);
for (DirTarget dirTarget : sorted) {
checkForRequestCancellation();
perform(dirTarget.id, dirTarget.pid, dirTarget.path, dirTarget.attributes);
}
Expand Down
Expand Up @@ -79,6 +79,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.util.BoundedExecutor;
import org.dcache.util.list.ListDirectoryHandler;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
Expand All @@ -99,6 +100,7 @@ public final class RequestContainerJobFactory {
private ListDirectoryHandler listHandler;
private BulkTargetStore targetStore;
private BulkServiceStatistics statistics;
private BoundedExecutor dirListExecutor;

public AbstractRequestContainerJob createRequestJob(BulkRequest request)
throws BulkServiceException {
Expand Down Expand Up @@ -127,6 +129,7 @@ public AbstractRequestContainerJob createRequestJob(BulkRequest request)
containerJob.setNamespaceHandler(pnfsHandler);
containerJob.setTargetStore(targetStore);
containerJob.setListHandler(listHandler);
containerJob.setDirListExecutor(dirListExecutor);
containerJob.initialize();
return containerJob;
}
Expand All @@ -136,6 +139,11 @@ public void setActivityFactory(BulkActivityFactory activityFactory) {
this.activityFactory = activityFactory;
}

/**
 * Injects the shared executor handed to each container job for
 * asynchronous directory listing (see createRequestJob).
 *
 * @param dirListExecutor bounded pool for directory listing tasks
 */
@Required
public void setDirListExecutor(BoundedExecutor dirListExecutor) {
this.dirListExecutor = dirListExecutor;
}

@Required
public void setListHandler(ListDirectoryHandler listHandler) {
this.listHandler = listHandler;
Expand Down
Expand Up @@ -80,7 +80,7 @@
</bean>

<bean id="cancellation-executor" class="org.dcache.util.CDCScheduledExecutorServiceDecorator">
<description>Used to run delayed clear requests.</description>
<description>Used to cancel requests.</description>
<constructor-arg>
<bean class="java.util.concurrent.ScheduledThreadPoolExecutor">
<constructor-arg value="${bulk.limits.cancellation-threads}"/>
Expand All @@ -104,6 +104,10 @@
</constructor-arg>
</bean>

<!-- Bounded, cached thread pool used by container jobs for asynchronous
     (shallow) directory listing; sized via bulk.limits.dir-list-threads. -->
<bean id="dir-list-executor" class="org.dcache.util.BoundedCachedExecutor">
<constructor-arg value="${bulk.limits.dir-list-threads}"/>
</bean>

<bean id="bulk-data-source" class="com.zaxxer.hikari.HikariDataSource">
<description>Encapsulates the bulk database connection pool and properties.</description>
<constructor-arg>
Expand Down Expand Up @@ -250,6 +254,7 @@
<property name="requestStore" ref="request-store"/>
<property name="targetStore" ref="target-store"/>
<property name="statistics" ref="statistics"/>
<property name="dirListExecutor" ref="dir-list-executor"/>
</bean>

<bean id="statistics" class="org.dcache.services.bulk.util.BulkServiceStatistics">
Expand Down Expand Up @@ -326,6 +331,7 @@
<ref bean="cancellation-executor"/>
<ref bean="manager-executor"/>
<ref bean="reset-executor"/>
<ref bean="dir-list-executor"/>
</list>
</property>
<property name="dataSource" ref="bulk-data-source"/>
Expand Down
3 changes: 2 additions & 1 deletion skel/share/defaults/bulk.properties
Expand Up @@ -60,7 +60,8 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir
bulk.limits.container-processing-threads=200
bulk.limits.activity-callback-threads=50
bulk.limits.incoming-request-threads=10
bulk.limits.cancellation-threads=25
bulk.limits.cancellation-threads=20
bulk.limits.dir-list-threads=20
(deprecated)bulk.limits.delay-clear-threads=

# ---- Expiration of the cache serving to front the request storage.
Expand Down
1 change: 1 addition & 0 deletions skel/share/services/bulk.batch
Expand Up @@ -11,6 +11,7 @@ check -strong bulk.limits.container-processing-threads
check -strong bulk.limits.activity-callback-threads
check -strong bulk.limits.incoming-request-threads
check -strong bulk.limits.cancellation-threads
check -strong bulk.limits.dir-list-threads
check -strong bulk.limits.request-cache-expiration
check -strong bulk.limits.request-cache-expiration.unit
check -strong bulk.limits.max-requests-per-user
Expand Down

0 comments on commit 2acfaf8

Please sign in to comment.