pnfsmanager: process listings of the same path sequentially, implement list message folding

Motivation:

Multiple batch jobs listing the same directories, especially directories
containing many entries, quickly exhaust the active threads, with each
thread executing the same listing. This denies listing to all other
clients and creates significant load on the namespace server (and its DB).
Clients then time out and retry, creating even more load on dCache.

Modification:

The problem is solved by implementing PnfsListDirectoryMessage folding
and by dispatching list requests to multiple queues (the number of queues
is configurable) based on the hashCode of the directory path. Each queue
is served by a single thread. This guarantees that listings of the same
path are queued to the same queue and that only one listing of a unique
path can be active at a time. Once a listing completes, the queue is
scanned for all other requests for the same path; these messages are
folded (that is, their payload is populated from the just-completed
listing) and routed back to their senders without ever hitting the
database.
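
For illustration, a minimal sketch of the dispatch scheme described above
(the class and method names here are hypothetical; the actual logic lives
in PnfsManagerV3):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: identical paths always hash to the same queue index, so at
    // most one listing of a given path can be in flight at any time.
    final class ListDispatcher {

        private final BlockingQueue<Runnable>[] queues;

        @SuppressWarnings("unchecked")
        ListDispatcher(int queueCount) {
            queues = new BlockingQueue[queueCount];
            for (int i = 0; i < queueCount; i++) {
                queues[i] = new LinkedBlockingQueue<>();
            }
        }

        // Cast to long before Math.abs so that Integer.MIN_VALUE cannot
        // yield a negative index.
        boolean dispatch(String path, Runnable listing) {
            int index = (int) (Math.abs((long) path.hashCode()) % queues.length);
            return queues[index].offer(listing);
        }
    }

The folding itself is shown in the PnfsListDirectoryMessage.fold
implementation in the diff below: a queued request absorbs a completed
reply's entries when the path, user, requested attributes, and message
sequence all match.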

Ancillary changes: instead of a hardcoded future time at which the quota
and FS stat scans start, use the already available settings for the
duration between runs. This avoids triggering these scans when
PnfsManager is restarted often (when testing, for instance). A few
"final" qualifiers were also added where suggested by the IDE.
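
A sketch of the scheduling change, assuming a plain
ScheduledExecutorService (the actual executor in PnfsManagerV3 may
differ; quotaSystem, updateQuotaInterval, and updateQuotaIntervalUnit are
the fields visible in the diff below):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    long interval = updateQuotaIntervalUnit.toMillis(updateQuotaInterval);
    // Use the configured interval as the initial delay as well, instead
    // of a hardcoded 600000 ms, so frequent restarts do not trigger
    // premature scans.
    scheduler.scheduleWithFixedDelay(
        () -> quotaSystem.updateGroupQuotas(),
        interval,   // initial delay: previously a hardcoded constant
        interval,   // delay between subsequent runs
        TimeUnit.MILLISECONDS);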

Result:

Listings of large directories no longer hog PnfsManager, denying service
to all other list calls. A significant reduction of CPU and IO load on
the Chimera DB host was also observed. The patch has been running in
production at Fermilab and has been shown to solve the issue of listings
timing out.

patch: https://rb.dcache.org/r/13992/
Acked-by: Al, Paul, Tigran

Target: master
Request: 9.x
Request: 8.x

Require-book: no
Require-notes: yes
DmitryLitvintsev committed May 22, 2023
1 parent 9680a85 commit d7653b9
Showing 2 changed files with 112 additions and 32 deletions.
PnfsListDirectoryMessage.java:
@@ -1,10 +1,12 @@
package org.dcache.vehicles;

import static java.util.Objects.requireNonNull;
import static org.dcache.auth.Subjects.getDisplayName;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import diskCacheV111.vehicles.Message;
import diskCacheV111.util.FsPath;
import diskCacheV111.vehicles.PnfsMessage;
import java.util.ArrayList;
import java.util.Collection;
@@ -42,7 +44,7 @@ public enum PathType
* The last message has the following field set to true and a non-zero message count;
*/
private boolean _isFinal;
private int _messageCount;
private int _messageCount = 0;



@@ -169,8 +171,38 @@ public int getMessageCount() {
return _messageCount;
}

public void setMessageCount(int messageCount) {
_messageCount = messageCount;
}

@Override
public boolean invalidates(Message message) {
return false;
}

@Override
public boolean fold(Message message) {
if (message instanceof PnfsListDirectoryMessage) {
String path = getPnfsPath();
Set<FileAttribute> requested = getRequestedAttributes();
PnfsListDirectoryMessage other =
(PnfsListDirectoryMessage) message;

if (path.equals(other.getPnfsPath()) &&
getDisplayName(getSubject()).equals(getDisplayName(other.getSubject())) &&
getMessageCount() == other.getMessageCount()-1 &&
other.getRequestedAttributes().containsAll(requested)) {
other.getEntries().forEach(e -> addEntry(e.getName(),
e.getFileAttributes()));
if (other.isFinal()) {
setSucceeded(other.getMessageCount());
}
_messageCount = other.getMessageCount();
return true;
}
}
return false;
}


}
PnfsManagerV3.java:
@@ -97,6 +97,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
@@ -217,9 +218,9 @@ public class PnfsManagerV3
private boolean _canFold;

/**
* Queues for list operations. There is one queue per thread group.
* Queues for list operations.
*/
private BlockingQueue<CellMessage> _listQueue;
private BlockingQueue<CellMessage>[] _listQueues;

/**
* Tasks queues used for messages that do not operate on cache locations.
@@ -253,11 +254,11 @@ public class PnfsManagerV3
private List<String> _flushNotificationTargets;
private List<String> _cancelUploadNotificationTargets = Collections.emptyList();

private List<ProcessThread> _listProcessThreads = new ArrayList<>();
private final List<ProcessThread> _listProcessThreads = new ArrayList<>();

private JdbcQuota quotaSystem;

private Function<FsPath, FsPath> pathResolver = p -> resolveSymlinks(p.toString());
private final Function<FsPath, FsPath> pathResolver = p -> resolveSymlinks(p.toString());

private void populateRequestMap() {
_gauges.addGauge(PnfsAddCacheLocationMessage.class);
@@ -429,21 +430,25 @@ public void init() {
executor.execute(new ProcessThread(_fifos[i]));
}

/* Start a seperate queue for list operations. We use a shared queue,
* as list operations are read only and thus there is no need
* to serialize the operations.
/**
* Start separate queues for list operations.
*/
_listQueue = new LinkedBlockingQueue<>();
for (int j = 0; j < _listThreads; j++) {
ProcessThread t = new ProcessThread(_listQueue);
_listQueues = new BlockingQueue[_listThreads];
for (int i = 0; i < _listQueues.length; i++) {
if (_queueMaxSize > 0) {
_listQueues[i] = new LinkedBlockingQueue<>(_queueMaxSize);
} else {
_listQueues[i] = new LinkedBlockingQueue<>();
}
ProcessThread t = new ProcessThread(_listQueues[i]);
_listProcessThreads.add(t);
executor.execute(t);
}
}

public void shutdown() throws InterruptedException {
drainQueues(_fifos);
drainQueue(_listQueue);
drainQueues(_listQueues);
MoreExecutors.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS);
}

@@ -480,7 +485,7 @@ public void run() {
}
}
}),
100000,
updateFsStatIntervalUnit.toMillis(updateFsStatInterval),
updateFsStatIntervalUnit.toMillis(updateFsStatInterval),
TimeUnit.MILLISECONDS);

@@ -493,7 +498,7 @@ public void run() {
quotaSystem.updateGroupQuotas();
}
}),
600000,
updateQuotaIntervalUnit.toMillis(updateQuotaInterval),
updateQuotaIntervalUnit.toMillis(updateQuotaInterval),
TimeUnit.MILLISECONDS);

@@ -505,7 +510,7 @@
quotaSystem.updateUserQuotas();
}
}),
600000,
updateQuotaIntervalUnit.toMillis(updateQuotaInterval),
updateQuotaIntervalUnit.toMillis(updateQuotaInterval),
TimeUnit.MILLISECONDS);
}
@@ -529,7 +534,7 @@ public void getInfo(PrintWriter pw) {
pw.println(TimeUnit.MILLISECONDS.toSeconds(_atimeGap));
}
pw.println();
pw.println("List queue: " + _listQueue.size());
pw.println("List queue: "
+ Arrays.stream(_listQueues)
.mapToInt(BlockingQueue::size).sum());
pw.println();
pw.println("Threads (" + _fifos.length + ") Queue");
for (int i = 0; i < _fifos.length; i++) {
@@ -1522,9 +1529,13 @@ public class ShowListActivityCommand implements Callable<String> {
public String call() {
ColumnWriter writer = buildColumnWriter();

if (!_listQueue.isEmpty()) {
if (Arrays.stream(_listQueues).anyMatch(q -> !q.isEmpty())) {
writer.section("QUEUED REQUESTS");
_listQueue.forEach(e -> addRow(writer.row(), e));
for (BlockingQueue<CellMessage> queue : _listQueues) {
if (!queue.isEmpty()) {
queue.forEach(e -> addRow(writer.row(), e));
}
}
}

List<ActivityReport> activity = _listProcessThreads.stream()
@@ -2156,10 +2167,12 @@ private class ListHandlerImpl implements ListHandler {
private final Restriction _restriction;
private long _deadline;
private int _messageCount;
private final BlockingQueue<CellMessage> _fifo;

public ListHandlerImpl(CellPath requestor, UOID uoid,
PnfsListDirectoryMessage msg,
long initialDelay, long delay) {
PnfsListDirectoryMessage msg,
long initialDelay, long delay,
BlockingQueue<CellMessage> fifo) {
_msg = msg;
_requestor = requestor;
_uoid = uoid;
@@ -2172,16 +2185,38 @@ public ListHandlerImpl(CellPath requestor, UOID uoid,
(delay == Long.MAX_VALUE)
? Long.MAX_VALUE
: System.currentTimeMillis() + initialDelay;
_fifo = fifo;
}

private void sendPartialReply() {
_msg.setReply();

CellMessage envelope = new CellMessage(_requestor, _msg);
envelope.setLastUOID(_uoid);
sendMessage(envelope);
_messageCount++;
_msg.setMessageCount(_messageCount);

/**
* fold other list requests for the same target in the queue
*/

for (CellMessage message : _fifo) {

PnfsMessage other = (PnfsMessage) message.getMessageObject();

if (other.invalidates(_msg)) {
break;
}

if (other.fold(_msg)) {
other.setReply();
CellPath source = message.getSourcePath().revert();
CellMessage parcel = new CellMessage(source, other);
parcel.setLastUOID(message.getUOID());
sendMessage(parcel);
((PnfsListDirectoryMessage)other).clear();
}
}
_msg.clear();
}

Expand All @@ -2205,7 +2240,7 @@ public int getMessageCount() {
}
}

private void listDirectory(CellMessage envelope, PnfsListDirectoryMessage msg) {
private void listDirectory(CellMessage envelope, PnfsListDirectoryMessage msg, BlockingQueue<CellMessage> fifo) {
if (!msg.getReplyRequired()) {
return;
}
@@ -2224,7 +2259,7 @@ private void listDirectory(CellMessage envelope, PnfsListDirectoryMessage msg) {
CellPath source = envelope.getSourcePath().revert();
ListHandlerImpl handler =
new ListHandlerImpl(source, envelope.getUOID(),
msg, initialDelay, delay);
msg, initialDelay, delay, fifo);

if (msg.getPathType() == PnfsListDirectoryMessage.PathType.LABEL) {
_nameSpaceProvider.listVirtualDirectory(msg.getSubject(), path.substring(1),
@@ -2314,12 +2349,25 @@ public void run() {
sendTimeout(message, "TTL exceeded");
continue;
}

processPnfsMessage(message, pnfs);
if (!(pnfs instanceof PnfsListDirectoryMessage)) {
processPnfsMessage(message, pnfs);
} else {
long ctime = System.currentTimeMillis();
listDirectory(message, (PnfsListDirectoryMessage) pnfs, _fifo);
long duration = System.currentTimeMillis() - ctime;
_gauges.update(pnfs.getClass(), duration);
if (_logSlowThreshold != THRESHOLD_DISABLED &&
duration > _logSlowThreshold) {
LOGGER.warn("{} processed in {} ms", pnfs.getClass(), duration);
} else {
LOGGER.info("{} processed in {} ms", pnfs.getClass(), duration);
}
postProcessMessage(message, pnfs);
}
fold(pnfs);
} catch (Throwable e) {
LOGGER.warn("processPnfsMessage: {} : {}", Thread.currentThread().getName(),
e);
LOGGER.warn("processPnfsMessage: {} : {}",
Thread.currentThread().getName(), e);
} finally {
clearActivity();
CDC.clearMessageContext();
@@ -2345,10 +2393,8 @@ protected void fold(PnfsMessage message) {
if (other.fold(message)) {
LOGGER.info("Folded {}", other.getClass().getSimpleName());
_foldedCounters.incrementRequests(message.getClass());

i.remove();
envelope.revertDirection();

sendMessage(envelope);
}
}
@@ -2556,11 +2602,15 @@ private Quota getQuota(Integer qid, QuotaType type) {

public void messageArrived(CellMessage envelope, PnfsListDirectoryMessage message)
throws CacheException {

String path = message.getPnfsPath();
if (path == null) {
throw new InvalidMessageCacheException("Missing PNFS id and path");
}
if (!_listQueue.offer(envelope)) {

int index = (int)(Math.abs((long)Objects.hashCode(path.toString())) % _listThreads);

if (!_listQueues[index].offer(envelope)) {
throw new MissingResourceCacheException("PnfsManager queue limit exceeded");
}
}
@@ -2663,8 +2713,6 @@ boolean processMessageTransactionally(CellMessage message, PnfsMessage pnfsMessage
processFlushMessage((PoolFileFlushedMessage) pnfsMessage);
} else if (pnfsMessage instanceof PnfsGetParentMessage) {
getParent((PnfsGetParentMessage) pnfsMessage);
} else if (pnfsMessage instanceof PnfsListDirectoryMessage) {
listDirectory(message, (PnfsListDirectoryMessage) pnfsMessage);
} else if (pnfsMessage instanceof PnfsGetFileAttributes) {
getFileAttributes((PnfsGetFileAttributes) pnfsMessage);
} else if (pnfsMessage instanceof PnfsSetFileAttributes) {
