From d7653b9a01649ad30dd29541af467b95daefe6b8 Mon Sep 17 00:00:00 2001 From: Dmitry Litvintsev Date: Mon, 22 May 2023 08:26:07 -0500 Subject: [PATCH] pnfsmanager: process listings of the same path sequentially, implement list message folding Motivation: Multiple batch jobs executing listings of the same directories, especially if directories contain many entries, quickly exhaust active threads, each thread executing the same listing, denying listing to all other customers and creating significant load on namespace server (and DB). Clients time out and retry, creating even more load on dCache. Modification: The problem is solved by implementing PnfsListDirectoryMessage folding and dispatching list requests to multiple queues (number of queues is configurable) based on directory path hashCode. Each queue is served by a single thread. This guarantees that a listing of the same path will be queued to the same queue and only one listing of a unique path can be active. Once listing is complete, the queue is scanned for all other requests for the same path and these messages are folded (that is their payload is populated from just completed listing) and routed back to sender without ever hitting the database. Ancillary changes include - instead of hardcoded time in the future when quota and FS stat scan start, use already available settings for durations between runs. This is to avoid triggering these scans when PnfsManager is restarted often (when testing for instance). And a few "final" qualifiers added where suggested by IDE. Result: Lists of large directories no longer hog PnfsManager denying service to all other list calls. Additionally observed significant reduction of CPU and IO load on chimera DB host. Patch has been running in production at Fermilab and has been shown to solve the issue of listings timing out. 
patch: https://rb.dcache.org/r/13992/ Acked-by: Al, Paul, Tigran Target: master Request: 9.x Request: 8.x Require-book: no Require-notes: yes --- .../vehicles/PnfsListDirectoryMessage.java | 34 +++++- .../namespace/PnfsManagerV3.java | 110 +++++++++++++----- 2 files changed, 112 insertions(+), 32 deletions(-) diff --git a/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/PnfsListDirectoryMessage.java b/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/PnfsListDirectoryMessage.java index cc96b1aed50..426594065ee 100644 --- a/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/PnfsListDirectoryMessage.java +++ b/modules/dcache-vehicles/src/main/java/org/dcache/vehicles/PnfsListDirectoryMessage.java @@ -1,10 +1,12 @@ package org.dcache.vehicles; import static java.util.Objects.requireNonNull; +import static org.dcache.auth.Subjects.getDisplayName; import com.google.common.collect.BoundType; import com.google.common.collect.Range; import diskCacheV111.vehicles.Message; +import diskCacheV111.util.FsPath; import diskCacheV111.vehicles.PnfsMessage; import java.util.ArrayList; import java.util.Collection; @@ -42,7 +44,7 @@ public enum PathType * The last message has the following field set to true and a non-zero message count; */ private boolean _isFinal; - private int _messageCount; + private int _messageCount = 0; @@ -169,8 +171,38 @@ public int getMessageCount() { return _messageCount; } + public void setMessageCount(int messageCount) { + _messageCount = messageCount; + } + @Override public boolean invalidates(Message message) { return false; } + + @Override + public boolean fold(Message message) { + if (message instanceof PnfsListDirectoryMessage) { + String path = getPnfsPath(); + Set requested = getRequestedAttributes(); + PnfsListDirectoryMessage other = + (PnfsListDirectoryMessage) message; + + if (path.equals(other.getPnfsPath()) && + getDisplayName(getSubject()).equals(getDisplayName(other.getSubject())) && + getMessageCount() == 
other.getMessageCount()-1 && + other.getRequestedAttributes().containsAll(requested)) { + other.getEntries().forEach(e -> addEntry(e.getName(), + e.getFileAttributes())); + if (other.isFinal()) { + setSucceeded(other.getMessageCount()); + } + _messageCount = other.getMessageCount(); + return true; + } + } + return false; + } + + } diff --git a/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java b/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java index 09191272a02..6d432b48d35 100644 --- a/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java +++ b/modules/dcache/src/main/java/diskCacheV111/namespace/PnfsManagerV3.java @@ -97,6 +97,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.Set; @@ -217,9 +218,9 @@ public class PnfsManagerV3 private boolean _canFold; /** - * Queues for list operations. There is one queue per thread group. + * Queues for list operations. */ - private BlockingQueue _listQueue; + private BlockingQueue[] _listQueues; /** * Tasks queues used for messages that do not operate on cache locations. @@ -253,11 +254,11 @@ public class PnfsManagerV3 private List _flushNotificationTargets; private List _cancelUploadNotificationTargets = Collections.emptyList(); - private List _listProcessThreads = new ArrayList<>(); + private final List _listProcessThreads = new ArrayList<>(); private JdbcQuota quotaSystem; - private Function pathResolver = p -> resolveSymlinks(p.toString()); + private final Function pathResolver = p -> resolveSymlinks(p.toString()); private void populateRequestMap() { _gauges.addGauge(PnfsAddCacheLocationMessage.class); @@ -429,13 +430,17 @@ public void init() { executor.execute(new ProcessThread(_fifos[i])); } - /* Start a seperate queue for list operations. 
We use a shared queue, - * as list operations are read only and thus there is no need - * to serialize the operations. + /** + * Start separate queues for list operations. */ - _listQueue = new LinkedBlockingQueue<>(); - for (int j = 0; j < _listThreads; j++) { - ProcessThread t = new ProcessThread(_listQueue); + _listQueues = new BlockingQueue[_listThreads]; + for (int i = 0; i < _listQueues.length; i++) { + if (_queueMaxSize > 0) { + _listQueues[i] = new LinkedBlockingQueue<>(_queueMaxSize); + } else { + _listQueues[i] = new LinkedBlockingQueue<>(); + } + ProcessThread t = new ProcessThread(_listQueues[i]); _listProcessThreads.add(t); executor.execute(t); } @@ -443,7 +448,7 @@ public void init() { public void shutdown() throws InterruptedException { drainQueues(_fifos); - drainQueue(_listQueue); + drainQueues(_listQueues); MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS); } @@ -480,7 +485,7 @@ public void run() { } } }), - 100000, + updateFsStatIntervalUnit.toMillis(updateFsStatInterval), updateFsStatIntervalUnit.toMillis(updateFsStatInterval), TimeUnit.MILLISECONDS); @@ -493,7 +498,7 @@ public void run() { quotaSystem.updateGroupQuotas(); } }), - 600000, + updateQuotaIntervalUnit.toMillis(updateQuotaInterval), updateQuotaIntervalUnit.toMillis(updateQuotaInterval), TimeUnit.MILLISECONDS); @@ -505,7 +510,7 @@ public void run() { quotaSystem.updateUserQuotas(); } }), - 600000, + updateQuotaIntervalUnit.toMillis(updateQuotaInterval), updateQuotaIntervalUnit.toMillis(updateQuotaInterval), TimeUnit.MILLISECONDS); } @@ -529,7 +534,9 @@ public void getInfo(PrintWriter pw) { pw.println(TimeUnit.MILLISECONDS.toSeconds(_atimeGap)); } pw.println(); - pw.println("List queue: " + _listQueue.size()); + pw.println("List queue: " + + Arrays.stream(_listQueues) + .mapToInt(BlockingQueue::size).sum()); pw.println(); pw.println("Threads (" + _fifos.length + ") Queue"); for (int i = 0; i < _fifos.length; i++) { @@ -1522,9 +1529,13 @@ public class 
ShowListActivityCommand implements Callable { public String call() { ColumnWriter writer = buildColumnWriter(); - if (!_listQueue.isEmpty()) { + if (Arrays.stream(_listQueues).anyMatch(q -> !q.isEmpty())) { writer.section("QUEUED REQUESTS"); - _listQueue.forEach(e -> addRow(writer.row(), e)); + for (BlockingQueue queue : _listQueues) { + if (!queue.isEmpty()) { + queue.forEach(e -> addRow(writer.row(), e)); + } + } } List activity = _listProcessThreads.stream() @@ -2156,10 +2167,12 @@ private class ListHandlerImpl implements ListHandler { private final Restriction _restriction; private long _deadline; private int _messageCount; + private final BlockingQueue _fifo; public ListHandlerImpl(CellPath requestor, UOID uoid, - PnfsListDirectoryMessage msg, - long initialDelay, long delay) { + PnfsListDirectoryMessage msg, + long initialDelay, long delay, + BlockingQueue fifo) { _msg = msg; _requestor = requestor; _uoid = uoid; @@ -2172,16 +2185,38 @@ public ListHandlerImpl(CellPath requestor, UOID uoid, (delay == Long.MAX_VALUE) ? 
Long.MAX_VALUE : System.currentTimeMillis() + initialDelay; + _fifo = fifo; } private void sendPartialReply() { _msg.setReply(); - CellMessage envelope = new CellMessage(_requestor, _msg); envelope.setLastUOID(_uoid); sendMessage(envelope); _messageCount++; + _msg.setMessageCount(_messageCount); + + /** + * fold other list requests for the same target in the queue + */ + + for (CellMessage message : _fifo) { + PnfsMessage other = (PnfsMessage) message.getMessageObject(); + + if (other.invalidates(_msg)) { + break; + } + + if (other.fold(_msg)) { + other.setReply(); + CellPath source = message.getSourcePath().revert(); + CellMessage parcel = new CellMessage(source, other); + parcel.setLastUOID(message.getUOID()); + sendMessage(parcel); + ((PnfsListDirectoryMessage)other).clear(); + } + } _msg.clear(); } @@ -2205,7 +2240,7 @@ public int getMessageCount() { } } - private void listDirectory(CellMessage envelope, PnfsListDirectoryMessage msg) { + private void listDirectory(CellMessage envelope, PnfsListDirectoryMessage msg, BlockingQueue fifo) { if (!msg.getReplyRequired()) { return; } @@ -2224,7 +2259,7 @@ private void listDirectory(CellMessage envelope, PnfsListDirectoryMessage msg) { CellPath source = envelope.getSourcePath().revert(); ListHandlerImpl handler = new ListHandlerImpl(source, envelope.getUOID(), - msg, initialDelay, delay); + msg, initialDelay, delay, fifo); if (msg.getPathType() == PnfsListDirectoryMessage.PathType.LABEL) { _nameSpaceProvider.listVirtualDirectory(msg.getSubject(), path.substring(1), @@ -2314,12 +2349,25 @@ public void run() { sendTimeout(message, "TTL exceeded"); continue; } - - processPnfsMessage(message, pnfs); + if (!(pnfs instanceof PnfsListDirectoryMessage)) { + processPnfsMessage(message, pnfs); + } else { + long ctime = System.currentTimeMillis(); + listDirectory(message, (PnfsListDirectoryMessage) pnfs, _fifo); + long duration = System.currentTimeMillis() - ctime; + _gauges.update(pnfs.getClass(), duration); + if 
(_logSlowThreshold != THRESHOLD_DISABLED && + duration > _logSlowThreshold) { + LOGGER.warn("{} processed in {} ms", pnfs.getClass(), duration); + } else { + LOGGER.info("{} processed in {} ms", pnfs.getClass(), duration); + } + postProcessMessage(message, pnfs); + } fold(pnfs); } catch (Throwable e) { - LOGGER.warn("processPnfsMessage: {} : {}", Thread.currentThread().getName(), - e); + LOGGER.warn("processPnfsMessage: {} : {}", + Thread.currentThread().getName(), e); } finally { clearActivity(); CDC.clearMessageContext(); @@ -2345,10 +2393,8 @@ protected void fold(PnfsMessage message) { if (other.fold(message)) { LOGGER.info("Folded {}", other.getClass().getSimpleName()); _foldedCounters.incrementRequests(message.getClass()); - i.remove(); envelope.revertDirection(); - sendMessage(envelope); } } @@ -2556,11 +2602,15 @@ private Quota getQuota(Integer qid, QuotaType type) { public void messageArrived(CellMessage envelope, PnfsListDirectoryMessage message) throws CacheException { + String path = message.getPnfsPath(); if (path == null) { throw new InvalidMessageCacheException("Missing PNFS id and path"); } - if (!_listQueue.offer(envelope)) { + + int index = (int)(Math.abs((long)Objects.hashCode(path.toString())) % _listThreads); + + if (!_listQueues[index].offer(envelope)) { throw new MissingResourceCacheException("PnfsManager queue limit exceeded"); } } @@ -2663,8 +2713,6 @@ boolean processMessageTransactionally(CellMessage message, PnfsMessage pnfsMessa processFlushMessage((PoolFileFlushedMessage) pnfsMessage); } else if (pnfsMessage instanceof PnfsGetParentMessage) { getParent((PnfsGetParentMessage) pnfsMessage); - } else if (pnfsMessage instanceof PnfsListDirectoryMessage) { - listDirectory(message, (PnfsListDirectoryMessage) pnfsMessage); } else if (pnfsMessage instanceof PnfsGetFileAttributes) { getFileAttributes((PnfsGetFileAttributes) pnfsMessage); } else if (pnfsMessage instanceof PnfsSetFileAttributes) {