Skip to content

Commit

Permalink
pnfsmanager: make list scheduling behavior optional (selectable)
Browse files Browse the repository at this point in the history
Motivation:

Commit d7653b9 introduced
sequential listing and folding of list requests for the same directory.
This works very well in environments where there are multiple simultaneous
listing requests for the same directories. But we discovered an edge case:
a site has a directory with 15M entries that is listed periodically.
Since it takes a long time to serve a 15M-entry listing, list requests for
other directories that are dispatched to the same queue (because their
hashCode modulo the number of queues clashes) back up even though the other threads are idle.

Modification:

Added dCache property variable:

pnfsmanager.enable.parallel-listing

that allows restoring the previous behavior: listing requests are dispatched to
a single queue that is processed by multiple parallel workers.

Result:

A dCache admin can change the behavior of the listing scheduler.

Target: trunk
Request: 8.2, 9.2, 9.1, 9.0

Patch: https://rb.dcache.org/r/14048/
Acked-by: Tigran, Lea

Require-book: yes
Require-notes: yes
  • Loading branch information
DmitryLitvintsev authored and mksahakyan committed Aug 10, 2023
1 parent 368f419 commit 8f26701
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 32 deletions.
Expand Up @@ -35,6 +35,7 @@
<property name="fileAttributesRelay" value="${pnfsmanager.destination.file-attributes-notification}"/>
<property name="logSlowThreshold" value="${pnfsmanager.limits.log-slow-threshold}"/>
<property name="folding" value="${pnfsmanager.enable.folding}"/>
<property name="useParallelListing" value="${pnfsmanager.enable.parallel-listing}"/>
<property name="directoryListLimit" value="${pnfsmanager.limits.list-chunk-size}"/>
<property name="permissionHandler" ref="permission-handler"/>
<property name="queueMaxSize" value="${pnfsmanager.limits.queue-length}"/>
Expand Down
Expand Up @@ -211,6 +211,7 @@ public class PnfsManagerV3
private boolean quotaEnabled;

private boolean useParentHashOnCreate;
private boolean useParallelListing;

/**
* Whether to use folding.
Expand Down Expand Up @@ -315,10 +316,16 @@ public void setQuotaEnabled(boolean quotaEnabled) {
this.quotaEnabled = quotaEnabled;
}

/**
 * Stores the flag in the {@code useParentHashOnCreate} field.
 *
 * NOTE(review): presumably wired from the
 * {@code pnfsmanager.use-parent-hash-on-create} property, which is described
 * as dispatching create messages to the thread associated with the entry's
 * parent directory - confirm against the Spring bean definition.
 *
 * @param useParentHashOnCreate the new value of the flag
 */
@Required
public void setUseParentHashOnCreate(boolean useParentHashOnCreate) {
this.useParentHashOnCreate = useParentHashOnCreate;
}

/**
 * Selects how directory list requests are scheduled. The value is injected
 * from the {@code pnfsmanager.enable.parallel-listing} property and is read
 * by {@code init()} when the list queues are created.
 *
 * When {@code true}, a single list queue is created and {@code _listThreads}
 * consumers serve it; list requests are always offered to queue 0 and folding
 * of queued list requests is skipped.
 *
 * When {@code false}, {@code _listThreads} queues are created, each with its
 * own consumer, and a list request is offered to the queue selected by the
 * hash code of its path modulo {@code _listThreads}.
 *
 * @param useParallelListing {@code true} for a single shared list queue,
 *                           {@code false} for one queue per list thread
 */
@Required
public void setUseParallelListing(boolean useParallelListing) {
this.useParallelListing = useParallelListing;
}

@Required
public void setScheduledExecutor(ScheduledExecutorService executor) {
scheduledExecutor = executor;
Expand Down Expand Up @@ -430,20 +437,44 @@ public void init() {
executor.execute(new ProcessThread(_fifos[i]));
}

/**
* Start separate queues for list operations.
*/
_listQueues = new BlockingQueue[_listThreads];
for (int i = 0; i < _listQueues.length; i++) {
if (_queueMaxSize > 0) {
_listQueues[i] = new LinkedBlockingQueue<>(_queueMaxSize);
} else {
_listQueues[i] = new LinkedBlockingQueue<>();
}
ProcessThread t = new ProcessThread(_listQueues[i]);
_listProcessThreads.add(t);
executor.execute(t);
}
if (useParallelListing) {
/**
* when using parallel listing we have _listThreads
* consumers serving a single queue.
*/
_listQueues = new BlockingQueue[1];
if (_queueMaxSize > 0) {
_listQueues[0] = new LinkedBlockingQueue<>(_queueMaxSize);
} else {
_listQueues[0] = new LinkedBlockingQueue<>();
}

/**
* spawn consumers
*/
for (int i = 0; i < _listThreads; i++) {
ProcessThread t = new ProcessThread(_listQueues[0]);
_listProcessThreads.add(t);
executor.execute(t);
}
} else {
/**
* Start separate _listThreads queues for list operations.
* each consumer processes a dedicated queue
*/
_listQueues = new BlockingQueue[_listThreads];
for (int i = 0; i < _listQueues.length; i++) {
if (_queueMaxSize > 0) {
_listQueues[i] = new LinkedBlockingQueue<>(_queueMaxSize);
} else {
_listQueues[i] = new LinkedBlockingQueue<>();
}
ProcessThread t = null;
t = new ProcessThread(_listQueues[i]);
_listProcessThreads.add(t);
executor.execute(t);
}
}
}

public void shutdown() throws InterruptedException {
Expand Down Expand Up @@ -2196,25 +2227,28 @@ private void sendPartialReply() {
_messageCount++;
_msg.setMessageCount(_messageCount);

/**
* fold other list requests for the same target in the queue
*/

for (CellMessage message : _fifo) {
if (!useParallelListing) {
/**
* fold other list requests for the same target in the queue
*/

PnfsMessage other = (PnfsMessage) message.getMessageObject();
for (CellMessage message : _fifo) {

if (other.invalidates(_msg)) {
break;
}
PnfsMessage other = (PnfsMessage) message.getMessageObject();

if (other.invalidates(_msg)) {
break;
}

if (other.fold(_msg)) {
other.setReply();
CellPath source = message.getSourcePath().revert();
CellMessage parcel = new CellMessage(source, other);
parcel.setLastUOID(message.getUOID());
sendMessage(parcel);
((PnfsListDirectoryMessage)other).clear();
if (other.fold(_msg)) {
other.setReply();
CellPath source = message.getSourcePath().revert();
CellMessage parcel = new CellMessage(source, other);
parcel.setLastUOID(message.getUOID());
sendMessage(parcel);
((PnfsListDirectoryMessage)other).clear();
}
}
}
_msg.clear();
Expand Down Expand Up @@ -2608,7 +2642,16 @@ public void messageArrived(CellMessage envelope, PnfsListDirectoryMessage messag
throw new InvalidMessageCacheException("Missing PNFS id and path");
}

int index = (int)(Math.abs((long)Objects.hashCode(path.toString())) % _listThreads);
int index = 0;

if (!useParallelListing) {
index = (int)(Math.abs((long)Objects.hashCode(path.toString())) % _listThreads);
}

/**
* when useParallelListing is true, we only have 1 queue in the
* list of queues below
*/

if (!_listQueues[index].offer(envelope)) {
throw new MissingResourceCacheException("PnfsManager queue limit exceeded");
Expand Down
27 changes: 25 additions & 2 deletions skel/share/defaults/pnfsmanager.properties
Expand Up @@ -83,11 +83,12 @@ pnfsmanager.limits.threads = ${pnfsmanager.limits.threads-per-group}
# directory. This leads to all available threads being busy/hanging
# processing create entry messages denying other users from
# accessing the namespace. The switch below, if enabled, would cause
# the create messages to be dispatched to a thread associated
# with that entry's parent (that is the target directory).
# the create messages to be dispatched to a thread associated
# with that entry's parent (that is the target directory).
#
(one-of?true|false)pnfsmanager.use-parent-hash-on-create = false


# ---- Number of list threads
#
# The PnfsManager uses dedicated threads for directory list
Expand All @@ -105,6 +106,28 @@ pnfsmanager.limits.list-threads = 2
#
pnfsmanager.limits.list-chunk-size = 100

# ---- Determines listing scheduling algorithm behavior
#
# When set to false, PnfsManager spawns pnfsmanager.limits.list-threads
# threads with each thread processing a dedicated FIFO queue. Listing
# requests are dispatched to these queues based on the path name hash code,
# so that listings of the same directory are dispatched to the same
# queue. If there are multiple listing requests for the same directory
# queued up, once the first listing completes the rest will be filled from
# the result of the just-completed request (a mechanism referred to as
# folding).
#
# When set to true, the PnfsManager spawns pnfsmanager.limits.list-threads
# threads serving a single list processing FIFO queue. If there are multiple
# list requests for the same directory they will all be served simultaneously
# (hence "parallel-listing") provided there are sufficient active threads
# remaining.
#

(one-of?true|false)pnfsmanager.enable.parallel-listing = false



# ---- Threshold for when to log slow requests
#
# Threshold in milliseconds for when to log slow requests. Requests
Expand Down
1 change: 1 addition & 0 deletions skel/share/services/pnfsmanager.batch
Expand Up @@ -9,6 +9,7 @@ check -strong pnfsmanager.enable.folding
check -strong pnfsmanager.enable.acl
check -strong pnfsmanager.default-retention-policy
check -strong pnfsmanager.default-access-latency
check -strong pnfsmanager.enable.parallel-listing
check pnfsmanager.destination.flush-notification
check pnfsmanager.destination.cache-notification
check pnfsmanager.destination.cancel-upload-notification
Expand Down

0 comments on commit 8f26701

Please sign in to comment.