From 5679b208e90f24c4c0f32e187b0b708d79f109a7 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Mon, 15 Aug 2016 12:51:33 +0200 Subject: [PATCH] pool: allow sort output of 'mover ls' by access time and size Motivation: On a running system it's time-to-time required to list movers and sort them by last modification time or transfered data. Modification: added '-t' and '-S' options (as by unix's ls command) to sort output of mover ls by last modification time (time since last io request from client) or transfer size. If no extra switch is provided, then output is sorted by job ids, which is same as mover creation time. In both, '-t' and '-S' are provided, then output is sorted by last access time. Additionally, with '-r' option reverse sorting is performed (ascending) Result: admins can specify sorting order of mover ls commend. Acked-by: Paul Millar Target: master Require-book: no Require-notes: no --- .../dcache/pool/classic/IoQueueManager.java | 97 +++++++++++++------ .../pool/classic/MoverRequestScheduler.java | 26 +++-- 2 files changed, 79 insertions(+), 44 deletions(-) diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java b/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java index f2d8bc13898..f8c4ae7a6b6 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/IoQueueManager.java @@ -10,6 +10,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Comparator; import java.util.NoSuchElementException; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -204,22 +205,8 @@ private String moverSetMaxActive(MoverRequestScheduler js, int active) return "Max Active Io Movers set to " + active; } - private Serializable list(MoverRequestScheduler js, boolean binary) - { - return list(Collections.singleton(js), binary); - } - - private Serializable list(Collection jobSchedulers, boolean isBinary) - { - if (isBinary) { - return jobSchedulers.stream().flatMap(s -> s.getJobInfos().stream()).toArray(IoJobInfo[]::new); - } else { - StringBuffer sb = new StringBuffer(); - for (MoverRequestScheduler js : jobSchedulers) { - js.printJobQueue(sb); - } - return sb.toString(); - } + private static void toMoverString(MoverRequestScheduler.PrioritizedRequest j, StringBuilder sb) { + sb.append(j.getId()).append(" : ").append(j).append('\n'); } @AffectsSetup @@ -410,6 +397,15 @@ public class MoverLsCommand implements Callable @Option(name = "binary", usage = "Use binary output format.") boolean isBinary; + @Option(name = "t", usage = "Sort output by last access time.") + boolean sortByTime; + + @Option(name = "S", usage = "Sort output by transfer size.") + boolean sortBySize; + + @Option(name = "r", usage = "Sort output in reverse order.") + boolean reverseSort; + @Override public Serializable call() throws NoSuchElementException { @@ -417,25 +413,59 @@ public Serializable call() throws NoSuchElementException return getQueueByJobId(id).getJobInfo(id); } - if (queueName == null) { - return list(queuesById.values(), isBinary); + boolean groupByQueue; + Collection queues; + if (queueName != null && !queueName.isEmpty()) { + MoverRequestScheduler js = queuesByName.get(queueName); + if (js == null) { + throw new NoSuchElementException("Not found : " + queueName); + } + queues = Collections.singleton(js); + groupByQueue = false; + } else { + groupByQueue = queueName != null && queueName.isEmpty(); + queues = queuesById.values(); } - if (queueName.isEmpty()) { + if (isBinary) { + // ignore sortin and grouping by queue name if binnary + return queues.stream().flatMap(s -> s.getJobInfos().stream()).toArray(IoJobInfo[]::new); + } else { + + Comparator comparator; + if (sortBySize) { + comparator = (b, a) -> Long.compare( + a.getMover().getBytesTransferred(), b.getMover().getBytesTransferred() + ); + } else if (sortByTime) { + comparator = (b, a) -> Long.compare( + a.getMover().getLastTransferred(), b.getMover().getLastTransferred() + ); + } else { + comparator = (b, a) -> Integer.compare( + a.getId(), b.getId() + ); + } + + if (reverseSort) { + comparator = comparator.reversed(); + } + StringBuilder sb = new StringBuilder(); - for (MoverRequestScheduler js : queues()) { - sb.append("[").append(js.getName()).append("]\n"); - sb.append(list(js, isBinary).toString()); + if (groupByQueue) { + queues.stream().forEach(q -> { + sb.append("[").append(q.getName()).append("]\n"); + q.getJobs() + .sorted() + .forEach(j -> IoQueueManager.toMoverString(j, sb)); + }); + } else { + queues.stream().flatMap(s -> s.getJobs()) + .sorted(comparator) + .forEach(j -> IoQueueManager.toMoverString(j, sb)); } return sb.toString(); } - - MoverRequestScheduler js = queuesByName.get(queueName); - if (js == null) { - throw new NoSuchElementException("Not found : " + queueName); - } - - return list(js, isBinary); } } @@ -455,7 +485,14 @@ public Serializable call() throws NoSuchElementException if (id != null) { return p2pQueue.getJobInfo(id); } - return list(p2pQueue, isBinary); + if (isBinary) { + return p2pQueue.getJobs().toArray(IoJobInfo[]::new); + } else { + StringBuilder sb = new StringBuilder(); + p2pQueue.getJobs() + .forEach(j -> IoQueueManager.toMoverString(j, sb)); + return sb.toString(); + } } } diff --git a/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java b/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java index 879f380df53..a37bf966f9f 100644 --- a/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java +++ b/modules/dcache/src/main/java/org/dcache/pool/classic/MoverRequestScheduler.java @@ -21,6 +21,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import diskCacheV111.util.CacheException; import diskCacheV111.util.DiskErrorCacheException; @@ -316,6 +317,16 @@ public List getJobInfos() ); } + /** + * Get a {@link Stream} of all jobs in this queue. + * + * @return list of all jobs + */ + Stream getJobs() { + + return _jobs.values().stream(); + } + /** * Get the maximal number allowed of concurrently running jobs by this scheduler. * @@ -431,19 +442,6 @@ public void failed(Throwable exc, Void attachment) } } - /** - * Print job list and status into provided {@link StringBuffer} - * - * @param sb int - * @return provided string buffer - */ - public StringBuffer printJobQueue(StringBuffer sb) - { - _jobs.values() - .forEach(j -> sb.append(j.getId()).append(" : ").append(j).append('\n')); - return sb; - } - /** * Shutdown the scheduler. All subsequent execution request will be rejected. */ @@ -584,7 +582,7 @@ public synchronized void setTotal(long total) _total = total; } - private static class PrioritizedRequest implements IoPrioritizable + static class PrioritizedRequest implements IoPrioritizable { private final Mover _mover; private final IoPriority _priority;