Skip to content

Commit

Permalink
dcache-qos: repair faulty queue refresh algorithm in verifier
Browse files Browse the repository at this point in the history
Motivation:

Discovered while investigating
RT 10479 Time-outs using QoS in dCache 8.2.23 when doing a QoS transition through namespace and bulk
https://rt.dcache.org/Ticket/Display.html?id=10479

The current algorithm for refreshing the queues from the `qos_operation`
table uses the total size of the in-memory queues to trigger the query.
This is flawed and is susceptible to starving one or more of the queues.

Modfication:

Two changes have been made:

- the existence of any empty queue now triggers the refresh
- the refresh is run after available running slots have been filled

The second change is for efficiency.

The `delegate` API (internal) has been extended with a `refresh()`
method.

Result:

Queue starvation should be avoided and operations of all message types
should continue to be processed.

Target:  master
Request: 9.1
Request: 9.0
Request: 8.2
Patch: https://rb.dcache.org/r/14004/
Requires-book: no
Requires-notes: yes
Acked-by: Dmitry
  • Loading branch information
alrossi committed Jun 29, 2023
1 parent dba3b07 commit fb0acb2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 51 deletions.
Expand Up @@ -252,11 +252,6 @@ static int indexFromMessageType(QoSMessageType type) {
*/
private int lastReadyIndex = 0;

/*
* If there is a refresh thread currently running.
*/
private boolean refreshing = false;

@Override
public Map<String, Long> aggregateCounts(String classifier) {
read.lock();
Expand Down Expand Up @@ -424,57 +419,66 @@ public int maxRunning() {

/*
* Dequeues the next READY operation, if it exists. Uses a simple clock algorithm
* to alternate among the queues. Note that operations have been appended during refresh
* in the sort order of the query (by last updated timestamp), so the queues are
* prioritized.
* to alternate among the queues.
* <p/>
* If the total size of the ready queues < the maximum running limit, a refresh is triggered.
*/
public VerifyOperation next() {
VerifyOperation operation = null;
int qsz;

write.lock();
try {
for (int i = 0; i < queues.length; ++i) {
operation = queues[lastReadyIndex].poll();

lastReadyIndex = (lastReadyIndex + 1) % queues.length;

if (operation != null) {
break;
}
}
} finally {
write.unlock();
}

qsz = Arrays.stream(queues).mapToInt(Deque::size).sum();
return operation;
}

/*
* Checks each queue to see if it is empty. If it is, its message types are
* added to a query to see if there are READY operations. The entire set is
* passed off to the queueSupplier thread which will repopulate the empty queues
* from the persistent store.
*/
public void refresh() {
List<QoSMessageType> toRefresh = new LinkedList<>();
read.lock();
try {
for (int i = 0; i < queues.length; ++i) {
if (queues[i].peek() == null) {
toRefresh.addAll(Arrays.asList(Queue.at(i).messageTypes));
}
}

if (!refreshing && qsz < maxRunning) {
int ready = dao.count(dao.where().state(READY));
LOGGER.debug("next, not currently refreshing; queue size {}, ready {}.", qsz,
ready);
if (!toRefresh.isEmpty()) {
QoSMessageType[] messageTypes = toRefresh.toArray(QoSMessageType[]::new);
int ready = dao.count(dao.where().state(READY).messageType(messageTypes));
if (ready > 0) {
refreshing = true;
LOGGER.trace("next, toRefresh {}, ready {}.", toRefresh, ready);
queueSupplier.submit(this::doRefresh);
}
}
} finally {
write.unlock();
read.unlock();
}

return operation;
}

/*
* Resets to READY operations that were in the RUNNING or WAITING state when the service
* last stopped so they can be reprocessed. The queues are then refreshed.
*/
public void reload() {
write.lock();
try {
dao.update(dao.where().state(RUNNING, WAITING), dao.set().state(READY));
if (dao.count(dao.where().state(READY)) > 0) {
queueSupplier.submit(this::doRefresh);
}
} finally {
write.unlock();
dao.update(dao.where().state(RUNNING, WAITING), dao.set().state(READY));
if (dao.count(dao.where().state(READY)) > 0) {
queueSupplier.submit(this::doRefresh);
}
}

Expand Down Expand Up @@ -700,40 +704,37 @@ public void voidOperation(VerifyOperation operation) {
}

/*
* Triggered if the sum of the queue sizes is below max running,
* if there are ready operations in the store.
* Triggered if any queue is empty and if there are ready operations in the store.
*
* For each queue it will attempt to pull into memory up to cache capacity operations
* corresponding to queue message types, clearing the queue first.
* corresponding to queue message types.
*
* Run on a separate thread, so it needs to lock.
* Run on a separate thread.
*/
private void doRefresh() {
LOGGER.debug("doRefresh starting.");

write.lock();

try {
/*
* ORDER BY default is "updated". Prioritization sets it back
* to arrival time (done on resetOperation).
*
* For fairness, we want to load the earliest of each of the separate types.
*/
for (int i = 0; i < queues.length; ++i) {
populateQueue(i);
if (queues[i].isEmpty()) {
populateQueue(i);
}
}

callback.signal();
} finally {
write.unlock();
}

LOGGER.debug("signalled callback; refresh finished.");

refreshing = false;
}

/*
* ORDER BY default is "updated". Prioritization sets it back
* to arrival time (done on resetOperation).
*
* For fairness, we want to load the earliest of each of the separate types.
*/
@GuardedBy("lock")
private void populateQueue(int index) {
Queue queue = Queue.at(index);
Expand All @@ -742,7 +743,6 @@ private void populateQueue(int index) {
VerifyOperationCriterion criterion = dao.where().state(READY)
.messageType(queue.messageTypes);
List<VerifyOperation> results = dao.get(criterion, capacity);
queues[index].clear();
results.forEach(queues[index]::addLast);
LOGGER.debug("populateQueue {}, loaded {} operations.", queue.name(), queues[index].size());
}
Expand Down
Expand Up @@ -108,7 +108,12 @@ public interface VerifyOperationDelegate extends VerifyOperationMap {
void reload();

/**
* Remove the operation from any in memory caches and from including any backing persistence.
* Repopulate memory caches, if applicable.
*/
void refresh();

/**
* Remove the operation from any in memory caches and from any backing persistence.
*
* @param pnfsId of the operation.
*/
Expand Down
Expand Up @@ -357,6 +357,8 @@ void processReady() {
submit(operation);
--available;
}

delegate.refresh();
}
}

Expand Down Expand Up @@ -583,12 +585,8 @@ public void run() {
*/
@VisibleForTesting
public void scan() {
try {
terminalProcessor.processTerminated();
readyProcessor.processReady();
} catch (Throwable t) {
t.printStackTrace();
}
terminalProcessor.processTerminated();
readyProcessor.processReady();
}

public void setCounters(QoSVerifierCounters counters) {
Expand Down

0 comments on commit fb0acb2

Please sign in to comment.