Skip to content

Commit

Permalink
pool: Use immutable collections in IoQueueManager
Browse files Browse the repository at this point in the history
IoQueueManager currently synchronizes all methods. It does
so despite never adding or removing any queues. This patch
replaces the collections with immutable counterparts.

At the moment this patch is mostly cosmetic, but I need it
for a subsequent patch that cleans up the shutdown logic
to avoid deadlocks.

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Paul Millar <paul.millar@desy.de>
Patch: http://rb.dcache.org/r/5295/
  • Loading branch information
Gerd Behrmann committed Mar 17, 2013
1 parent 797efe5 commit fe7635b
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 80 deletions.
@@ -1,12 +1,13 @@
package org.dcache.pool.classic;

import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -16,134 +17,118 @@

import org.dcache.util.IoPriority;

import static com.google.common.collect.Iterables.concat;
import static java.util.Arrays.asList;

public class IoQueueManager {

private final static Logger _log = LoggerFactory.getLogger(IoQueueManager.class);

public static final String DEFAULT_QUEUE = "regular";
private List<IoScheduler> _list = new ArrayList<>();
private final Map<String, IoScheduler> _hash = new HashMap<>();
private final ImmutableList<IoScheduler> _queues;
private final ImmutableMap<String, IoScheduler> _queuesByName;

public IoQueueManager(JobTimeoutManager jobTimeoutManager, String[] queues,
public IoQueueManager(JobTimeoutManager jobTimeoutManager, String[] names,
MoverExecutorServices executorServices) {

if(queues == null) {
throw new IllegalArgumentException("queue names can't be null");
}

addQueue(DEFAULT_QUEUE, jobTimeoutManager, executorServices);
for (String queueName : queues) {
queueName = queueName.trim();
if(queueName.isEmpty()) {
continue;
Map<String,IoScheduler> queuesByName = new HashMap<>();
List<IoScheduler> queues = new ArrayList<>();
for (String name : concat(asList(DEFAULT_QUEUE), asList(names))) {
name = name.trim();
if (!name.isEmpty()) {
boolean fifo = !name.startsWith("-");
if (!fifo) {
name = name.substring(1);
}
if (!queuesByName.containsKey(name)) {
_log.debug("Creating queue: {}", name);
IoScheduler job = new SimpleIoScheduler(name, executorServices, queues.size(), fifo);
queues.add(job);
queuesByName.put(name, job);
jobTimeoutManager.addScheduler(name, job);
} else {
_log.warn("Queue not created, name already exists: {}", name);
}
}

addQueue(queueName, jobTimeoutManager, executorServices);
}

_log.info("Defined IO queues {}: " + _hash.keySet());
}

private void addQueue(String queueName, JobTimeoutManager jobTimeoutManager,
MoverExecutorServices executorServices ) {
boolean fifo = !queueName.startsWith("-");
if (!fifo) {
queueName = queueName.substring(1);
}
if (_hash.get(queueName) == null) {
_log.info("adding queue: {}", queueName);
int id = _list.size();
IoScheduler job = new SimpleIoScheduler(queueName, executorServices, id, fifo);
_list.add(job);
_hash.put(queueName, job);
jobTimeoutManager.addScheduler(queueName, job);
}else{
_log.warn("Queue not created, name already exists: " + queueName);
}
}

public synchronized IoScheduler getDefaultScheduler() {
return _list.get(0);
_queues = ImmutableList.copyOf(queues);
_queuesByName = ImmutableMap.copyOf(queuesByName);
_log.debug("Defined IO queues: {}", _queuesByName.keySet());
}

public synchronized Collection<IoScheduler> getSchedulers() {
return Collections.unmodifiableCollection(_list);
public IoScheduler getDefaultQueue() {
return _queues.get(0);
}

public synchronized IoScheduler getQueue(String queueName) {
return _hash.get(queueName);
public ImmutableCollection<IoScheduler> getQueues() {
return _queues;
}

/**
* Get {@link List} of defined {@link JobScheduler}s.
* @return schedulers.
*/
public synchronized List<IoScheduler> getQueues() {
return new ArrayList<>(_list);
public IoScheduler getQueue(String queueName) {
return _queuesByName.get(queueName);
}

public synchronized IoScheduler getQueueByJobId(int id) {
public IoScheduler getQueueByJobId(int id) {
int pos = id >> 24;
if (pos >= _list.size()) {
throw new IllegalArgumentException("Invalid id (doesn't below to any known scheduler)");
if (pos >= _queues.size()) {
throw new IllegalArgumentException("Invalid id (doesn't belong to any known scheduler)");
}
return _list.get(pos);
return _queues.get(pos);
}

public synchronized int add(String queueName, PoolIORequest request, IoPriority priority)
public int add(String queueName, PoolIORequest request, IoPriority priority)
{
IoScheduler js = (queueName == null) ? null : _hash.get(queueName);
IoScheduler js = (queueName == null) ? null : _queuesByName.get(queueName);
return (js == null) ? add(request, priority) : js.add(request, priority);
}

public synchronized int add(PoolIORequest request, IoPriority priority)
public int add(PoolIORequest request, IoPriority priority)
{
return getDefaultScheduler().add(request, priority);
return getDefaultQueue().add(request, priority);
}

public synchronized void cancel(int jobId) throws NoSuchElementException {
public void cancel(int jobId) throws NoSuchElementException {
getQueueByJobId(jobId).cancel(jobId);
}

public synchronized int getMaxActiveJobs() {
public int getMaxActiveJobs() {
int sum = 0;
for (IoScheduler s : _list) {
for (IoScheduler s : _queues) {
sum += s.getMaxActiveJobs();
}
return sum;
}

public synchronized int getActiveJobs() {
public int getActiveJobs() {
int sum = 0;
for (IoScheduler s : _list) {
for (IoScheduler s : _queues) {
sum += s.getActiveJobs();
}
return sum;
}

public synchronized int getQueueSize() {
public int getQueueSize() {
int sum = 0;
for (IoScheduler s : _list) {
for (IoScheduler s : _queues) {
sum += s.getQueueSize();
}
return sum;
}

public synchronized List<JobInfo> getJobInfos() {
public List<JobInfo> getJobInfos() {
List<JobInfo> list = new ArrayList<>();
for (IoScheduler s : _list) {
for (IoScheduler s : _queues) {
list.addAll(s.getJobInfos());
}
return list;
}

public synchronized void printSetup(PrintWriter pw) {
for (IoScheduler s : _list) {
public void printSetup(PrintWriter pw) {
for (IoScheduler s : _queues) {
pw.println("mover set max active -queue=" + s.getName() + " " + s.getMaxActiveJobs());
}
}

public synchronized JobInfo findJob(String client, long id) {
public JobInfo findJob(String client, long id) {
for (JobInfo info : getJobInfos()) {
if (client.equals(info.getClientName()) && id == info.getClientId()) {
return info;
Expand All @@ -153,8 +138,8 @@ public synchronized JobInfo findJob(String client, long id) {
}

public synchronized void shutdown() {
for (IoScheduler s : _list) {
s.shutdown();
for (IoScheduler queue : _queues) {
queue.shutdown();
}
}
}
15 changes: 8 additions & 7 deletions modules/dcache/src/main/java/org/dcache/pool/classic/PoolV4.java
Expand Up @@ -15,6 +15,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
Expand Down Expand Up @@ -656,7 +657,7 @@ public void getInfo(PrintWriter pw)
pw.println("Inventory : " + _hybridCurrent);
}

for (IoScheduler js : _ioQueue.getSchedulers()) {
for (IoScheduler js : _ioQueue.getQueues()) {
pw.println("Mover Queue (" + js.getName() + ") "
+ js.getActiveJobs() + "(" + js.getMaxActiveJobs()
+ ")/" + js.getQueueSize());
Expand Down Expand Up @@ -1544,7 +1545,7 @@ private PoolCostInfo getPoolCostInfo()
info.getSpaceInfo().setParameter(_breakEven, _gap);
info.setMoverCostFactor(_moverCostFactor);

for (IoScheduler js : _ioQueue.getSchedulers()) {
for (IoScheduler js : _ioQueue.getQueues()) {
/*
* we skip p2p queue as it is handled differently
* FIXME: no special cases
Expand Down Expand Up @@ -1994,7 +1995,7 @@ public String ac_pool_enable(Args args)
String queueName = args.getOpt("queue");

if (queueName == null) {
return mover_set_max_active(_ioQueue.getDefaultScheduler(), args);
return mover_set_max_active(_ioQueue.getDefaultQueue(), args);
}

IoScheduler js = _ioQueue.getQueue(queueName);
Expand Down Expand Up @@ -2031,14 +2032,14 @@ private String mover_set_max_active(IoScheduler js, Args args)
StringBuilder sb = new StringBuilder();

if (args.hasOption("l")) {
for (IoScheduler js : _ioQueue.getSchedulers()) {
for (IoScheduler js : _ioQueue.getQueues()) {
sb.append(js.getName())
.append(" ").append(js.getActiveJobs())
.append(" ").append(js.getMaxActiveJobs())
.append(" ").append(js.getQueueSize()).append("\n");
}
} else {
for (IoScheduler js : _ioQueue.getSchedulers()) {
for (IoScheduler js : _ioQueue.getQueues()) {
sb.append(js.getName()).append("\n");
}
}
Expand All @@ -2061,7 +2062,7 @@ private String mover_set_max_active(IoScheduler js, Args args)

if (queueName.length() == 0) {
StringBuilder sb = new StringBuilder();
for (IoScheduler js : _ioQueue.getSchedulers()) {
for (IoScheduler js : _ioQueue.getQueues()) {
sb.append("[").append(js.getName()).append("]\n");
sb.append(mover_ls(js, binary).toString());
}
Expand Down Expand Up @@ -2095,7 +2096,7 @@ private Object mover_ls(IoScheduler js, boolean binary) {
return mover_ls(Arrays.asList(js), binary);
}

private Object mover_ls(List<IoScheduler> jobSchedulers, boolean binary) {
private Object mover_ls(Collection<IoScheduler> jobSchedulers, boolean binary) {

if (binary) {
List<JobInfo> list = new ArrayList<>();
Expand Down

0 comments on commit fe7635b

Please sign in to comment.