diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java index 32e8097ac16..44f48839daa 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDaoDelegate.java @@ -252,11 +252,6 @@ static int indexFromMessageType(QoSMessageType type) { */ private int lastReadyIndex = 0; - /* - * If there is a refresh thread currently running. - */ - private boolean refreshing = false; - @Override public Map aggregateCounts(String classifier) { read.lock(); @@ -424,42 +419,56 @@ public int maxRunning() { /* * Dequeues the next READY operation, if it exists. Uses a simple clock algorithm - * to alternate among the queues. Note that operations have been appended during refresh - * in the sort order of the query (by last updated timestamp), so the queues are - * prioritized. + * to alternate among the queues. *

- * If the total size of the ready queues < the maximum running limit, a refresh is triggered. */ public VerifyOperation next() { VerifyOperation operation = null; - int qsz; - write.lock(); try { for (int i = 0; i < queues.length; ++i) { operation = queues[lastReadyIndex].poll(); + lastReadyIndex = (lastReadyIndex + 1) % queues.length; + if (operation != null) { break; } } + } finally { + write.unlock(); + } - qsz = Arrays.stream(queues).mapToInt(Deque::size).sum(); + return operation; + } + + /* + * Checks each queue to see if it is empty. If it is, its message types are + * added to a query to see if there are READY operations. The entire set is + * passed off to the queueSupplier thread which will repopulate the empty queues + * from the persistent store. + */ + public void refresh() { + List toRefresh = new LinkedList<>(); + read.lock(); + try { + for (int i = 0; i < queues.length; ++i) { + if (queues[i].peek() == null) { + toRefresh.addAll(Arrays.asList(Queue.at(i).messageTypes)); + } + } - if (!refreshing && qsz < maxRunning) { - int ready = dao.count(dao.where().state(READY)); - LOGGER.debug("next, not currently refreshing; queue size {}, ready {}.", qsz, - ready); + if (!toRefresh.isEmpty()) { + QoSMessageType[] messageTypes = toRefresh.toArray(QoSMessageType[]::new); + int ready = dao.count(dao.where().state(READY).messageType(messageTypes)); if (ready > 0) { - refreshing = true; + LOGGER.trace("next, toRefresh {}, ready {}.", toRefresh, ready); queueSupplier.submit(this::doRefresh); } } } finally { - write.unlock(); + read.unlock(); } - - return operation; } /* @@ -467,14 +476,9 @@ public VerifyOperation next() { * last stopped so they can be reprocessed. The queues are then refreshed. */ public void reload() { - write.lock(); - try { - dao.update(dao.where().state(RUNNING, WAITING), dao.set().state(READY)); - if (dao.count(dao.where().state(READY)) > 0) { - queueSupplier.submit(this::doRefresh); - } - } finally { - write.unlock(); + dao.update(dao.where().state(RUNNING, WAITING), dao.set().state(READY)); + if (dao.count(dao.where().state(READY)) > 0) { + queueSupplier.submit(this::doRefresh); } } @@ -700,40 +704,37 @@ public void voidOperation(VerifyOperation operation) { } /* - * Triggered if the sum of the queue sizes is below max running, - * if there are ready operations in the store. + * Triggered if any queue is empty and if there are ready operations in the store. * * For each queue it will attempt to pull into memory up to cache capacity operations - * corresponding to queue message types, clearing the queue first. + * corresponding to queue message types. * - * Run on a separate thread, so it needs to lock. + * Run on a separate thread. */ private void doRefresh() { LOGGER.debug("doRefresh starting."); write.lock(); - try { - /* - * ORDER BY default is "updated". Prioritization sets it back - * to arrival time (done on resetOperation). - * - * For fairness, we want to load the earliest of each of the separate types. - */ for (int i = 0; i < queues.length; ++i) { - populateQueue(i); + if (queues[i].isEmpty()) { + populateQueue(i); + } } - callback.signal(); } finally { write.unlock(); } LOGGER.debug("signalled callback; refresh finished."); - - refreshing = false; } + /* + * ORDER BY default is "updated". Prioritization sets it back + * to arrival time (done on resetOperation). + * + * For fairness, we want to load the earliest of each of the separate types. + */ @GuardedBy("lock") private void populateQueue(int index) { Queue queue = Queue.at(index); @@ -742,7 +743,6 @@ private void populateQueue(int index) { VerifyOperationCriterion criterion = dao.where().state(READY) .messageType(queue.messageTypes); List results = dao.get(criterion, capacity); - queues[index].clear(); results.forEach(queues[index]::addLast); LOGGER.debug("populateQueue {}, loaded {} operations.", queue.name(), queues[index].size()); } diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegate.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegate.java index b92896821eb..41b7cfba331 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegate.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegate.java @@ -108,7 +108,12 @@ public interface VerifyOperationDelegate extends VerifyOperationMap { void reload(); /** - * Remove the operation from any in memory caches and from including any backing persistence. + * Repopulate memory caches, if applicable. + */ + void refresh(); + + /** + * Remove the operation from any in memory caches and from any backing persistence. * * @param pnfsId of the operation. */ diff --git a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegatingMap.java b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegatingMap.java index cffa7021953..ecafb5420c3 100644 --- a/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegatingMap.java +++ b/modules/dcache-qos/src/main/java/org/dcache/qos/services/verifier/data/VerifyOperationDelegatingMap.java @@ -357,6 +357,8 @@ void processReady() { submit(operation); --available; } + + delegate.refresh(); } } @@ -583,12 +585,8 @@ public void run() { */ @VisibleForTesting public void scan() { - try { - terminalProcessor.processTerminated(); - readyProcessor.processReady(); - } catch (Throwable t) { - t.printStackTrace(); - } + terminalProcessor.processTerminated(); + readyProcessor.processReady(); } public void setCounters(QoSVerifierCounters counters) {