Skip to content

Commit

Permalink
pool: allow sort output of 'mover ls' by access time and size
Browse files Browse the repository at this point in the history
Motivation:
On a running system it's time-to-time required to list movers and
sort them by last modification time or transfered data.

Modification:
added '-t' and '-S' options (as by unix's ls command) to sort output of
mover ls by last modification time (time since last io request from client)
or transfer size. If no extra switch is provided, then output is sorted
by job ids, which is same as mover creation time. In both, '-t' and '-S'
are provided, then output is sorted by last access time. Additionally,
with '-r' option reverse sorting is performed (ascending)

Result:
admins can specify sorting order of mover ls commend.

Acked-by: Paul Millar
Target: master
Require-book: no
Require-notes: no
  • Loading branch information
kofemann committed Sep 1, 2016
1 parent 26a2bd7 commit 5679b20
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 44 deletions.
Expand Up @@ -10,6 +10,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -204,22 +205,8 @@ private String moverSetMaxActive(MoverRequestScheduler js, int active)
return "Max Active Io Movers set to " + active;
}

private Serializable list(MoverRequestScheduler js, boolean binary)
{
return list(Collections.singleton(js), binary);
}

private Serializable list(Collection<MoverRequestScheduler> jobSchedulers, boolean isBinary)
{
if (isBinary) {
return jobSchedulers.stream().flatMap(s -> s.getJobInfos().stream()).toArray(IoJobInfo[]::new);
} else {
StringBuffer sb = new StringBuffer();
for (MoverRequestScheduler js : jobSchedulers) {
js.printJobQueue(sb);
}
return sb.toString();
}
private static void toMoverString(MoverRequestScheduler.PrioritizedRequest j, StringBuilder sb) {
sb.append(j.getId()).append(" : ").append(j).append('\n');
}

@AffectsSetup
Expand Down Expand Up @@ -410,32 +397,75 @@ public class MoverLsCommand implements Callable<Serializable>
@Option(name = "binary", usage = "Use binary output format.")
boolean isBinary;

@Option(name = "t", usage = "Sort output by last access time.")
boolean sortByTime;

@Option(name = "S", usage = "Sort output by transfer size.")
boolean sortBySize;

@Option(name = "r", usage = "Sort output in reverse order.")
boolean reverseSort;

@Override
public Serializable call() throws NoSuchElementException
{
if (id != null) {
return getQueueByJobId(id).getJobInfo(id);
}

if (queueName == null) {
return list(queuesById.values(), isBinary);
boolean groupByQueue;
Collection<MoverRequestScheduler> queues;
if (queueName != null && !queueName.isEmpty()) {
MoverRequestScheduler js = queuesByName.get(queueName);
if (js == null) {
throw new NoSuchElementException("Not found : " + queueName);
}
queues = Collections.singleton(js);
groupByQueue = false;
} else {
groupByQueue = queueName != null && queueName.isEmpty();
queues = queuesById.values();
}

if (queueName.isEmpty()) {
if (isBinary) {
// ignore sortin and grouping by queue name if binnary
return queues.stream().flatMap(s -> s.getJobInfos().stream()).toArray(IoJobInfo[]::new);
} else {

Comparator<MoverRequestScheduler.PrioritizedRequest> comparator;
if (sortBySize) {
comparator = (b, a) -> Long.compare(
a.getMover().getBytesTransferred(), b.getMover().getBytesTransferred()
);
} else if (sortByTime) {
comparator = (b, a) -> Long.compare(
a.getMover().getLastTransferred(), b.getMover().getLastTransferred()
);
} else {
comparator = (b, a) -> Integer.compare(
a.getId(), b.getId()
);
}

if (reverseSort) {
comparator = comparator.reversed();
}

StringBuilder sb = new StringBuilder();
for (MoverRequestScheduler js : queues()) {
sb.append("[").append(js.getName()).append("]\n");
sb.append(list(js, isBinary).toString());
if (groupByQueue) {
queues.stream().forEach(q -> {
sb.append("[").append(q.getName()).append("]\n");
q.getJobs()
.sorted()
.forEach(j -> IoQueueManager.toMoverString(j, sb));
});
} else {
queues.stream().flatMap(s -> s.getJobs())
.sorted(comparator)
.forEach(j -> IoQueueManager.toMoverString(j, sb));
}
return sb.toString();
}

MoverRequestScheduler js = queuesByName.get(queueName);
if (js == null) {
throw new NoSuchElementException("Not found : " + queueName);
}

return list(js, isBinary);
}
}

Expand All @@ -455,7 +485,14 @@ public Serializable call() throws NoSuchElementException
if (id != null) {
return p2pQueue.getJobInfo(id);
}
return list(p2pQueue, isBinary);
if (isBinary) {
return p2pQueue.getJobs().toArray(IoJobInfo[]::new);
} else {
StringBuilder sb = new StringBuilder();
p2pQueue.getJobs()
.forEach(j -> IoQueueManager.toMoverString(j, sb));
return sb.toString();
}
}
}

Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import diskCacheV111.util.CacheException;
import diskCacheV111.util.DiskErrorCacheException;
Expand Down Expand Up @@ -316,6 +317,16 @@ public List<IoJobInfo> getJobInfos()
);
}

/**
* Get a {@link Stream} of all jobs in this queue.
*
* @return list of all jobs
*/
Stream<PrioritizedRequest> getJobs() {

return _jobs.values().stream();
}

/**
* Get the maximal number allowed of concurrently running jobs by this scheduler.
*
Expand Down Expand Up @@ -431,19 +442,6 @@ public void failed(Throwable exc, Void attachment)
}
}

/**
* Print job list and status into provided {@link StringBuffer}
*
* @param sb int
* @return provided string buffer
*/
public StringBuffer printJobQueue(StringBuffer sb)
{
_jobs.values()
.forEach(j -> sb.append(j.getId()).append(" : ").append(j).append('\n'));
return sb;
}

/**
* Shutdown the scheduler. All subsequent execution request will be rejected.
*/
Expand Down Expand Up @@ -584,7 +582,7 @@ public synchronized void setTotal(long total)
_total = total;
}

private static class PrioritizedRequest implements IoPrioritizable
static class PrioritizedRequest implements IoPrioritizable
{
private final Mover<?> _mover;
private final IoPriority _priority;
Expand Down

0 comments on commit 5679b20

Please sign in to comment.