From 8f26701d8868b8fc1bfd394895130ff8f7f2fbfa Mon Sep 17 00:00:00 2001
From: Dmitry Litvintsev
Date: Wed, 9 Aug 2023 14:40:53 -0500
Subject: [PATCH] pnfsmanager: make list scheduling behavior optional (selectable)

Motivation:

Commit d7653b9a01649ad30dd29541af467b95daefe6b8 introduced sequential
listing and folding of list requests for the same directories. This
works very well in environments where there are multiple simultaneous
listing requests for the same directories. But we discovered an edge
case: a site has a directory with 15M entries that is listed
periodically. Since serving a 15M-entry listing takes a long time, list
requests for other directories that are dispatched to the same queue
(due to a hashCode modulo clash) back up even though the other threads
are idle.

Modification:

Added a dCache property, pnfsmanager.enable.parallel-listing, that
allows restoring the previous behavior: listing requests are dispatched
to a single queue that is processed by multiple parallel workers.

Result:

A dCache admin can change the behavior of the listing scheduler.

Target: trunk
Request: 8.2, 9.2, 9.1, 9.0
Patch: https://rb.dcache.org/r/14048/
Acked-by: Tigran, Lea
Require-book: yes
Require-notes: yes
---
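
Note (below the fold, not part of the commit message): a minimal sketch
of how a site could opt back in to the pre-d7653b9a behavior by setting
the new property in its layout file; the domain name here is
illustrative:

    [namespaceDomain/pnfsmanager]
    pnfsmanager.enable.parallel-listing = true

With the default of false, the hash-dispatched per-thread queues and
request folding remain in effect.
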
 .../namespace/pnfsmanager-chimera.xml       |   1 +
 .../namespace/PnfsManagerV3.java            | 103 +++++++++++++-----
 skel/share/defaults/pnfsmanager.properties  |  27 ++++-
 skel/share/services/pnfsmanager.batch       |   1 +
 4 files changed, 100 insertions(+), 32 deletions(-)

diff --git a/modules/dcache-chimera/src/main/resources/diskCacheV111/namespace/pnfsmanager-chimera.xml b/modules/dcache-chimera/src/main/resources/diskCacheV111/namespace/pnfsmanager-chimera.xml
index 8bd1c78d279..480aae966bf 100644
--- a/modules/dcache-chimera/src/main/resources/diskCacheV111/namespace/pnfsmanager-chimera.xml
+++ b/modules/dcache-chimera/src/main/resources/diskCacheV111/namespace/pnfsmanager-chimera.xml
@@ -35,6 +35,7 @@
+        <property name="useParallelListing" value="${pnfsmanager.enable.parallel-listing}"/>

diff --git a/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java b/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java
index 81de1d8f3f7..ad8ce543e9b 100644
--- a/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java
+++ b/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java
@@ -211,6 +211,7 @@ public class PnfsManagerV3
     private boolean quotaEnabled;
     private boolean useParentHashOnCreate;
+    private boolean useParallelListing;
 
     /**
      * Whether to use folding.
@@ -315,10 +316,16 @@ public void setQuotaEnabled(boolean quotaEnabled) {
         this.quotaEnabled = quotaEnabled;
     }
 
+    @Required
     public void setUseParentHashOnCreate(boolean useParentHashOnCreate) {
         this.useParentHashOnCreate = useParentHashOnCreate;
     }
 
+    @Required
+    public void setUseParallelListing(boolean useParallelListing) {
+        this.useParallelListing = useParallelListing;
+    }
+
     @Required
     public void setScheduledExecutor(ScheduledExecutorService executor) {
         scheduledExecutor = executor;
@@ -430,20 +437,44 @@ public void init() {
             executor.execute(new ProcessThread(_fifos[i]));
         }
 
-        /**
-         * Start separate queues for list operations.
-         */
-        _listQueues = new BlockingQueue[_listThreads];
-        for (int i = 0; i < _listQueues.length; i++) {
-            if (_queueMaxSize > 0) {
-                _listQueues[i] = new LinkedBlockingQueue<>(_queueMaxSize);
-            } else {
-                _listQueues[i] = new LinkedBlockingQueue<>();
-            }
-            ProcessThread t = new ProcessThread(_listQueues[i]);
-            _listProcessThreads.add(t);
-            executor.execute(t);
-        }
+        if (useParallelListing) {
+            /**
+             * When using parallel listing we have _listThreads
+             * consumers serving a single queue.
+             */
+            _listQueues = new BlockingQueue[1];
+            if (_queueMaxSize > 0) {
+                _listQueues[0] = new LinkedBlockingQueue<>(_queueMaxSize);
+            } else {
+                _listQueues[0] = new LinkedBlockingQueue<>();
+            }
+
+            /**
+             * Spawn the consumers.
+             */
+            for (int i = 0; i < _listThreads; i++) {
+                ProcessThread t = new ProcessThread(_listQueues[0]);
+                _listProcessThreads.add(t);
+                executor.execute(t);
+            }
+        } else {
+            /**
+             * Start _listThreads separate queues for list operations;
+             * each consumer processes a dedicated queue.
+             */
+            _listQueues = new BlockingQueue[_listThreads];
+            for (int i = 0; i < _listQueues.length; i++) {
+                if (_queueMaxSize > 0) {
+                    _listQueues[i] = new LinkedBlockingQueue<>(_queueMaxSize);
+                } else {
+                    _listQueues[i] = new LinkedBlockingQueue<>();
+                }
+                ProcessThread t = new ProcessThread(_listQueues[i]);
+                _listProcessThreads.add(t);
+                executor.execute(t);
+            }
+        }
     }
 
     public void shutdown() throws InterruptedException {
@@ -2196,25 +2227,28 @@ private void sendPartialReply() {
             _messageCount++;
             _msg.setMessageCount(_messageCount);
 
-            /**
-             * fold other list requests for the same target in the queue
-             */
-            for (CellMessage message : _fifo) {
-
-                PnfsMessage other = (PnfsMessage) message.getMessageObject();
-
-                if (other.invalidates(_msg)) {
-                    break;
-                }
-
-                if (other.fold(_msg)) {
-                    other.setReply();
-                    CellPath source = message.getSourcePath().revert();
-                    CellMessage parcel = new CellMessage(source, other);
-                    parcel.setLastUOID(message.getUOID());
-                    sendMessage(parcel);
-                    ((PnfsListDirectoryMessage)other).clear();
+            if (!useParallelListing) {
+                /**
+                 * Fold other list requests for the same target in the queue.
+                 */
+                for (CellMessage message : _fifo) {
+
+                    PnfsMessage other = (PnfsMessage) message.getMessageObject();
+
+                    if (other.invalidates(_msg)) {
+                        break;
+                    }
+
+                    if (other.fold(_msg)) {
+                        other.setReply();
+                        CellPath source = message.getSourcePath().revert();
+                        CellMessage parcel = new CellMessage(source, other);
+                        parcel.setLastUOID(message.getUOID());
+                        sendMessage(parcel);
+                        ((PnfsListDirectoryMessage)other).clear();
+                    }
                 }
             }
             _msg.clear();
@@ -2608,7 +2642,16 @@ public void messageArrived(CellMessage envelope, PnfsListDirectoryMessage message)
             throw new InvalidMessageCacheException("Missing PNFS id and path");
         }
 
-        int index = (int)(Math.abs((long)Objects.hashCode(path.toString())) % _listThreads);
+        int index = 0;
+
+        if (!useParallelListing) {
+            index = (int)(Math.abs((long)Objects.hashCode(path.toString())) % _listThreads);
+        }
+
+        /**
+         * When useParallelListing is true, there is only one queue in the
+         * array of queues below.
+         */
 
         if (!_listQueues[index].offer(envelope)) {
             throw new MissingResourceCacheException("PnfsManager queue limit exceeded");

diff --git a/skel/share/defaults/pnfsmanager.properties b/skel/share/defaults/pnfsmanager.properties
index 872ec868fa2..cf59157e7b4 100644
--- a/skel/share/defaults/pnfsmanager.properties
+++ b/skel/share/defaults/pnfsmanager.properties
@@ -83,11 +83,12 @@ pnfsmanager.limits.threads = ${pnfsmanager.limits.threads-per-group}
 # directory. This leads to all available threads being busy/hanging
 # processing create entry messages denying other users from
 # accessing the namespace. The switch below, if enabled, would cause
-# the create mesages to be dispatched to a thread associated
-# with that entry's parent (that is the target directory).
+# the create messages to be dispatched to a thread associated
+# with that entry's parent (that is the target directory).
 #
 (one-of?true|false)pnfsmanager.use-parent-hash-on-create = false
+
 # ---- Number of list threads
 #
 # The PnfsManager uses dedicated threads for directory list
@@ -105,6 +106,28 @@ pnfsmanager.limits.list-threads = 2
 #
 pnfsmanager.limits.list-chunk-size = 100
 
+# ---- Determines listing scheduler behavior
+#
+# When set to false, PnfsManager spawns pnfsmanager.limits.list-threads
+# threads, each processing a dedicated FIFO queue. Listing requests are
+# dispatched to these queues based on the path name hashcode, so that
+# listings of the same directory always land on the same queue. If
+# multiple listing requests for the same directory are queued up, then
+# once the first listing completes the remaining ones are filled from
+# its result (a mechanism referred to as folding).
+#
+# When set to true, PnfsManager spawns pnfsmanager.limits.list-threads
+# threads serving a single list processing FIFO queue. If there are
+# multiple list requests for the same directory, they are all served
+# simultaneously (hence "parallel listing"), provided there are
+# sufficient idle threads remaining.
+#
+(one-of?true|false)pnfsmanager.enable.parallel-listing = false
+
+
 # ---- Threshold for when to log slow requests
 #
 # Threshold in milliseconds for when to log slow requests. Requests

diff --git a/skel/share/services/pnfsmanager.batch b/skel/share/services/pnfsmanager.batch
index d2b32a79f0e..e0762b8fba9 100644
--- a/skel/share/services/pnfsmanager.batch
+++ b/skel/share/services/pnfsmanager.batch
@@ -9,6 +9,7 @@ check -strong pnfsmanager.enable.folding
 check -strong pnfsmanager.enable.acl
 check -strong pnfsmanager.default-retention-policy
 check -strong pnfsmanager.default-access-latency
+check -strong pnfsmanager.enable.parallel-listing
 check pnfsmanager.destination.flush-notification
 check pnfsmanager.destination.cache-notification
 check pnfsmanager.destination.cancel-upload-notification
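
Appendix (reviewer aid, not part of the patch): a standalone sketch of
the queue-selection logic described above. Only the index computation
is taken from PnfsManagerV3.messageArrived(); the class and variable
names here are illustrative.

    import java.util.Objects;

    public class ListDispatchSketch {

        public static void main(String[] args) {
            int listThreads = 2;                 // pnfsmanager.limits.list-threads
            boolean useParallelListing = false;  // pnfsmanager.enable.parallel-listing

            String[] paths = {"/data/huge-dir", "/data/small-dir", "/home/user"};
            for (String path : paths) {
                int index = 0;
                if (!useParallelListing) {
                    // Cast to long before Math.abs: if hashCode() returns
                    // Integer.MIN_VALUE, Math.abs(int) would stay negative,
                    // while abs of the widened long is always non-negative,
                    // keeping the modulo result a valid queue index.
                    index = (int) (Math.abs((long) Objects.hashCode(path)) % listThreads);
                }
                // With parallel listing there is a single queue, so index stays 0
                // and all listThreads workers consume from it.
                System.out.println(path + " -> list queue " + index);
            }
        }
    }

This also shows why a single huge directory can starve others in the
hashed mode: every path that maps to the same index waits behind it,
which is the edge case the new property addresses.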