From 66b67eabe35375a6ace1d60d75204391bda32d7d Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Tue, 10 Oct 2023 08:47:02 -0500 Subject: [PATCH] dcache-bulk: give directory listing a separate executor Motivation: The container rewrite originally put everything, including directory listing, on the same unbounded thread pool. Then a change was made to make that pool bounded. However, the extra semaphore permits involved can lead to a deadlock situation where no further listing takes place and tasks make no progress (this was observed and is reproducible). Modification: Return directory listing to its own executor. We also modify slightly the construction of the inner tasks so that only parent methods which manipulate the semaphore are necessary. Result: Lockup eliminated. Target: master Request: 9.2 Patch: https://rb.dcache.org/r/14126/ Requires-notes: yes, if backported after initial release. Acked-by: Tigran --- .../bulk/job/BulkRequestContainerJob.java | 57 +++++++++---------- .../manager/ConcurrentRequestManager.java | 11 ++++ .../org/dcache/services/bulk/bulk.xml | 5 ++ 3 files changed, 44 insertions(+), 29 deletions(-) diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java index b1444ab34b2..9047b348bea 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -206,6 +206,9 @@ abstract class ContainerTask implements Runnable { final long seqNo; Future taskFuture; + boolean holdingPermit; + ExecutorService taskExecutor; + Semaphore taskSemaphore; ContainerTask() { seqNo = taskCounter.getAndIncrement(); @@ -242,11 +245,17 @@ void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttribute void submitAsync() throws InterruptedException { checkForRequestCancellation(); + if (!holdingPermit) { + taskSemaphore.acquire(); + holdingPermit = true; + } + synchronized (running) { running.put(seqNo, this); LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, running.size()); } - taskFuture = executor.submit(this); + + taskFuture = taskExecutor.submit(this); } void remove() { @@ -255,6 +264,11 @@ void remove() { LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size()); } + if (holdingPermit) { + taskSemaphore.release(); + holdingPermit = false; + } + checkTransitionToDirs(); } @@ -272,6 +286,8 @@ class DirListTask extends ContainerTask { this.pid = pid; this.path = path; this.dirAttributes = dirAttributes; + taskExecutor = listExecutor; + taskSemaphore = dirListSemaphore; } void doInner() throws Throwable { @@ -345,15 +361,10 @@ private void addDirTarget(Long id, PID pid, FsPath path, FileAttributes attribut private DirectoryStream getDirectoryListing(FsPath path) throws CacheException, InterruptedException { - dirListSemaphore.acquire(); - try { - LOGGER.debug("{} - DirListTask, getDirectoryListing for path {}, calling list ...", - ruid, path); - return listHandler.list(subject, restriction, path, null, - Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES); - } finally { - dirListSemaphore.release(); - } + LOGGER.debug("{} - DirListTask, getDirectoryListing for path {}, calling list ...", + ruid, path); + return listHandler.list(subject, restriction, path, null, + Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES); } } @@ -371,11 +382,11 @@ class TargetTask extends ContainerTask { */ TaskState state; - boolean holdingPermit; - TargetTask(BulkRequestTarget target, TaskState initialState) { this.target = target; state = initialState; + taskExecutor = BulkRequestContainerJob.this.executor; + taskSemaphore = inFlightSemaphore; } void cancel() { @@ -415,23 +426,6 @@ void doInner() throws Throwable { } } - @Override - void submitAsync() throws InterruptedException { - if (!holdingPermit) { - inFlightSemaphore.acquire(); - holdingPermit = true; - } - super.submitAsync(); - } - - void remove() { - super.remove(); - if (holdingPermit) { - inFlightSemaphore.release(); - holdingPermit = false; - } - } - void performSync() throws InterruptedException { performActivity(false); @@ -668,6 +662,7 @@ private void storeOrUpdate(Throwable error) throws InterruptedException { private SignalAware callback; private Thread runThread; private ExecutorService executor; + private ExecutorService listExecutor; private ExecutorService callbackExecutor; private Semaphore dirListSemaphore; private Semaphore inFlightSemaphore; @@ -847,6 +842,10 @@ public void setExecutor(ExecutorService executor) { this.executor = executor; } + public void setListExecutor(ExecutorService listExecutor) { + this.listExecutor = listExecutor; + } + public void setNamespaceHandler(PnfsHandler pnfsHandler) { this.pnfsHandler = pnfsHandler; } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 951841c7143..1bef438619c 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -315,6 +315,11 @@ private ListMultimap userRequests() { */ private ExecutorService callbackExecutor; + /** + * Thread dedicated to directory listing. + */ + private ExecutorService listExecutor; + /** * Thread dedicated to jobs. */ @@ -440,6 +445,11 @@ public void setExecutor(BoundedCachedExecutor pooledExecutor) { this.executorService = pooledExecutor; } + @Required + public void setListExecutor(BoundedCachedExecutor listExecutor) { + this.listExecutor = listExecutor; + } + @Required public void setTargetStore(BulkTargetStore targetStore) { this.targetStore = targetStore; @@ -534,6 +544,7 @@ void startJob(BulkRequestContainerJob job) { LOGGER.trace("submitting job {} to executor, target {}.", key, job.getTarget()); job.setExecutor(executorService); + job.setListExecutor(listExecutor); job.setCallbackExecutor(callbackExecutor); job.setCallback(this); try { diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index ae4331a8a30..78909bb1b51 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -253,6 +253,11 @@ + + + + +