Skip to content

Commit

Permalink
dcache-qos: rework verifier operation handling so most of it is in me…
Browse files Browse the repository at this point in the history
…mory (as in resilience)

Motivation:

In adapting the original resilience code to the qos services,
we decided to move away from completely in-memory processing
(with lossy to-file checkpointing of the operation map) to
using an underlying database to hold the operations.  This
was done in such a way that (a) operations were held in
memory up to a cache capacity limit, and were replenished
from the database store, and, as a consequence, (b) the
state of each operation was updated (written
through) to the database during its lifecycle.

In testing the soon-to-be posted rule engine extension,
I noticed that this design was not scaling efficiently.
Part of the problem was a mistake in the service's
message receiver, causing message backlog (this bug will
be fixed downstream), but the second part of the problem
was due to the database pressure.  What is more, the constant
updating of the underlying postgresql tables causes heavy
fragmentation/holes and requires autovacuum
to run more frequently.

Modification:

This patch moves us back in the direction of what resilience
used to do. All operations are held in memory for their
entire lifecyle. (16 GiB was usually sufficient for
Resilience and this should be no different; JFR
profiling reveals a stable memory footprint so far
not exceeding 1 GiB, in fact).

The RDMBS store is still used for recovery, but it now
holds a reduced operation descriptor with many state-related
fields eliminated. The operation is written once to the
database, and persists until the entire verification
sequence is completed, at which point it is eliminated
(via a batch delete).  Moreover, only `cache location`
or `qos modified` message types are stored, since they
originate outside of the qos components (scans, on
the other hand, need not store their operations as
they are repeating and based on queries).

The overhaul largely involves code elimination,
but this refactoring also reworks the central
components: what was called the `operation map`
and is now called the `operation manager`, and
the internal queues.  The new queueing schema
replaces the single-threaded clock algorithm
with independent thread pools for each queue
type.  The queue types are also now configurable
through the spring.xml.  Extra properties have
been added to control thread pool sizes, etc.,
for this new setup.

Result:

Under rule-engine extension testing, the
re-implementation performs much better,
particularly in terms of the turnover
of VOIDed operations, which on system
or pool scans normally constitute the
majority.

It is recommended this change accompany
the introduction of the rule engine
extensions (to follow).

Target: master
Patch: https://rb.dcache.org/r/14063/
Requires-notes: yes (eventually, for 9.2)
Acked-by: Tigran
  • Loading branch information
alrossi committed Aug 29, 2023
1 parent 8c92f2c commit 09e8e32
Show file tree
Hide file tree
Showing 22 changed files with 1,236 additions and 1,900 deletions.
Expand Up @@ -89,15 +89,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.qos.services.verifier.data.PoolInfoFilter;
import org.dcache.qos.services.verifier.data.PoolInfoMap;
import org.dcache.qos.services.verifier.data.VerifyOperationCancelFilter;
import org.dcache.qos.services.verifier.data.VerifyOperationDaoDelegate;
import org.dcache.qos.services.verifier.data.VerifyOperationDelegatingMap;
import org.dcache.qos.services.verifier.data.VerifyOperationFilter;
import org.dcache.qos.services.verifier.data.VerifyOperationManager;
import org.dcache.qos.services.verifier.data.VerifyOperationState;
import org.dcache.qos.services.verifier.handlers.VerifyOperationHandler;
import org.dcache.qos.services.verifier.util.QoSVerifierCounters;
import org.dcache.qos.util.InitializerAwareCommand;
import org.dcache.qos.util.MapInitializer;
import org.dcache.qos.util.MessageGuard;
import org.dcache.qos.util.QoSHistory;
import org.dcache.vehicles.FileAttributes;

Expand Down Expand Up @@ -296,7 +293,7 @@ protected String doCall() throws Exception {
try {
VerifyOperationFilter filter = getFilter();
forceRemoval |= Arrays.stream(state).collect(Collectors.toSet()).contains(WAITING);
fileOpMap.cancel(new VerifyOperationCancelFilter(filter, forceRemoval));
manager.cancel(new VerifyOperationCancelFilter(filter, forceRemoval));
return "Issued cancel command to cancel verify operations.";
} catch (IllegalArgumentException e) {
return "Improper input: " + e.getMessage();
Expand Down Expand Up @@ -422,10 +419,10 @@ protected String doCall() throws Exception {
filter.setReverse(reverse);

if (count) {
return fileOpMap.count(filter) + " matching pnfsids";
return manager.count(filter) + " matching pnfsids";
}

long size = fileOpMap.size();
long size = manager.size();
int limitValue = (int) size;

if (limit == null) {
Expand All @@ -439,7 +436,7 @@ protected String doCall() throws Exception {
limitValue = limit;
}

return fileOpMap.list(filter, limitValue);
return manager.list(filter, limitValue);
}
}

Expand All @@ -457,9 +454,6 @@ class VerifyResetCommand extends InitializerAwareCommand {
usage = "sweep interval unit.")
TimeUnit unit;

@Option(name = "capacity", usage = "Maximum size of in-memory operation cache.")
Integer capacity;

@Option(name = "maxRunning", usage = "Maximum number of concurrent running operations.")
Integer maxRunning;

Expand All @@ -473,25 +467,21 @@ class VerifyResetCommand extends InitializerAwareCommand {
@Override
protected String doCall() throws Exception {
if (sweep != null) {
fileOpMap.setTimeout(sweep);
manager.setTimeout(sweep);
if (unit != null) {
fileOpMap.setTimeoutUnit(unit);
manager.setTimeoutUnit(unit);
}
}

if (capacity != null) {
cache.setCapacity(capacity);
}

if (maxRunning != null) {
cache.setMaxRunning(maxRunning);
manager.setMaxRunning(maxRunning);
}

if (retries != null) {
fileOpMap.setMaxRetries(retries);
manager.setMaxRetries(retries);
}

return fileOpMap.infoMessage();
return manager.infoMessage();
}
}

Expand Down Expand Up @@ -529,36 +519,21 @@ protected String doCall() throws Exception {
}

private CellStub pnfsManager;
private MessageGuard messageGuard;
private MapInitializer initializer;
private PoolInfoMap poolInfoMap;
private VerifyOperationDelegatingMap fileOpMap;
private VerifyOperationDaoDelegate cache;
private VerifyOperationHandler fileOpHandler;
private VerifyOperationManager manager;
private QoSVerifierCounters counters;
private QoSHistory history;

/*
* Needs concrete implementation type to set initialization and running parameters.
*/
public void setCache(VerifyOperationDaoDelegate cache) {
this.cache = cache;
}

public void setCounters(QoSVerifierCounters counters) {
this.counters = counters;
}

/*
* Needs concrete implementation type to set initialization and running parameters.
*/
public void setFileOpMap(VerifyOperationDelegatingMap fileOpMap) {
this.fileOpMap = fileOpMap;
}

public void setFileOpHandler(
VerifyOperationHandler fileOpHandler) {
this.fileOpHandler = fileOpHandler;
public void setManager(VerifyOperationManager manager) {
this.manager = manager;
}

public void setHistory(QoSHistory history) {
Expand All @@ -569,10 +544,6 @@ public void setInitializer(MapInitializer initializer) {
this.initializer = initializer;
}

public void setMessageGuard(MessageGuard messageGuard) {
this.messageGuard = messageGuard;
}

public void setPnfsManager(CellStub pnfsManager) {
this.pnfsManager = pnfsManager;
}
Expand All @@ -591,7 +562,7 @@ private String runFileChecks(Collection<PnfsId> list) {
Iterator<String> it = attr.getLocations().iterator();
FileQoSUpdate update = new FileQoSUpdate(pnfsId, it.hasNext() ? it.next() : null,
QoSMessageType.VALIDATE_ONLY);
fileOpMap.createOrUpdateOperation(update);
manager.createOrUpdateOperation(update);
++successful;
} catch (NoSuchElementException | CacheException e) {
reply.append(pnfsId).append(" ").append(e.getMessage()).append("\n");
Expand All @@ -609,21 +580,4 @@ private PnfsHandler getPnfsHandler() {
handler.setRestriction(Restrictions.none());
return handler;
}

private void startAll() {
initializer.initialize();
if (fileOpMap.isRunning()) {
fileOpMap.shutdown();
}
fileOpMap.initialize();
messageGuard.enable();
}

private void shutdownAll() {
if (fileOpMap.isRunning()) {
fileOpMap.shutdown();
}
messageGuard.disable(true);
initializer.shutDown();
}
}
Expand Up @@ -60,10 +60,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.qos.services.verifier.data;

import java.util.function.Predicate;
import org.dcache.qos.data.QoSAction;
import org.dcache.qos.services.verifier.data.db.VerifyOperationDao;
import org.dcache.qos.services.verifier.data.db.VerifyOperationDao.VerifyOperationCriterion;
import org.dcache.qos.services.verifier.data.db.VerifyOperationDao.VerifyOperationUpdate;

/**
* Filter used specifically with cancel operations.
Expand All @@ -78,17 +76,6 @@ public VerifyOperationCancelFilter(VerifyOperationFilter filter, boolean remove)
this.remove = remove;
}

public VerifyOperationUpdate getUpdate(VerifyOperationDao dao) {
VerifyOperationUpdate update = dao.set().state(VerifyOperationState.CANCELED);
if (remove) {
/*
* Only if the operation is voided will it be removed.
*/
update.action(QoSAction.VOID);
}
return update;
}

public VerifyOperationCriterion getCriterion(VerifyOperationDao dao) {
return filter.toCriterion(dao);
}
Expand Down

0 comments on commit 09e8e32

Please sign in to comment.