Skip to content

Commit

Permalink
pools: make flush queue configurable as FIFO or LIFO
Browse files Browse the repository at this point in the history
Motivation:

Mover queues have the option to be configured to act
as LIFO or FIFO (the default); while neither guarantees
fairness, it has been observed that LIFO sometimes
can help throughput of new requests when queues
are filled with long-running jobs.

The same switch may prove useful for flush (store)
queues to tape.

Modification:

Implement the switch, with appropriate property,
admin command, and printSetup line.

Result:

Flush queue semantics can be controlled dynamically
and through a property.

Target: master
Requires-notes: yes
Requires-book: yes
Patch: https://rb.dcache.org/r/11944
Acked-by: Lea
  • Loading branch information
alrossi committed Sep 25, 2019
1 parent 6a5c3df commit 0dd6c45
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 7 deletions.
26 changes: 26 additions & 0 deletions docs/TheBook/src/main/markdown/config-hsm.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,32 @@ Login to the Admin Interface to change the entry of the pool 'setup' file for a
(pool_1) admin > rh set timeout 5
(pool_1) admin > save


As with mover queues, the flush queue can also be set to behave as either LIFO
(last-in-first-out) or FIFO (first-in-first-out). This can be done statically using
the property:

```
(one-of?fifo|lifo)pool.flush-controller.queue-order=fifo
```

or in the pool setup file:

```
flush set queue order lifo
```

or by using the admin command itself:

```
\s <pool-name> flush set queue order lifo
```

While neither queue order guarantees fairness, switching to LIFO when the
queue is heavily loaded with long-running jobs may provide better throughput
for late-arriving requests. (The default is FIFO.)


#### The namespace layout

In order to allow dCache to remove files from attached TSSes, the line “cleaner.enable.hsm = true” must be added immediately underneath the \[namespaceDomain/cleaner\] service declaration:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.PoolFlushDoFlushMessage;
import diskCacheV111.vehicles.PoolFlushGainControlMessage;

import dmg.cells.nucleus.CellCommandListener;
import dmg.cells.nucleus.CellInfo;
import dmg.cells.nucleus.CellInfoAware;
Expand All @@ -36,11 +37,14 @@
import dmg.util.command.Argument;
import dmg.util.command.Command;
import dmg.util.command.Option;

import org.dcache.pool.PoolDataBeanProvider;
import org.dcache.util.FireAndForgetTask;
import org.dcache.pool.classic.MoverRequestScheduler.Order;
import org.dcache.pool.classic.json.FlushControllerData;
import org.dcache.util.FireAndForgetTask;

import static com.google.common.base.Preconditions.checkArgument;
import static org.dcache.pool.classic.MoverRequestScheduler.Order.FIFO;

/**
* Controls flush to tape.
Expand All @@ -63,6 +67,7 @@ public class HsmFlushController
private long _flushingInterval = TimeUnit.MINUTES.toMillis(1);
private long _retryDelayOnError = TimeUnit.MINUTES.toMillis(1);
private int _maxActive = 1000;
private Order _order = FIFO;
private PoolV2Mode _poolMode;
private Supplier<CellInfo> _cellInfoSupplier;
private StorageClassContainer _storageQueue;
Expand Down Expand Up @@ -154,6 +159,17 @@ public synchronized void setRetryDelayOnError(long delay)
_retryDelayOnError = delay;
}

/**
 * Returns the order currently configured for the flush queue.
 *
 * The value is read while holding this controller's monitor, matching
 * the synchronized setter.
 *
 * @return the configured queue order
 */
public synchronized Order getQueueOrder()
{
return _order;
}

/**
 * Sets the order used for the flush queue.
 *
 * A null order is rejected immediately: the stored value is later
 * dereferenced when the setup is printed and when the flush task
 * switches on it, so failing here keeps the error next to its cause.
 *
 * @param order the queue order to use; must not be null
 * @throws NullPointerException if {@code order} is null
 */
@Required
public synchronized void setQueueOrder(Order order)
{
    _order = java.util.Objects.requireNonNull(order, "order");
}

public void start()
{
schedule();
Expand Down Expand Up @@ -193,6 +209,7 @@ public synchronized void printSetup(PrintWriter pw)
pw.println("flush set max active " + _maxActive);
pw.println("flush set interval " + TimeUnit.MILLISECONDS.toSeconds(_flushingInterval));
pw.println("flush set retry delay " + TimeUnit.MILLISECONDS.toSeconds(_retryDelayOnError));
pw.println("flush set queue order " + _order.name());
}

@Override
Expand Down Expand Up @@ -273,7 +290,7 @@ public void run()
if (_poolMode.isDisabled(PoolV2Mode.DISABLED_DEAD)) {
LOGGER.warn("Pool mode prevents flushing to nearline storage.");
} else {
_storageQueue.flushAll(getMaxActive(), _retryDelayOnError);
_storageQueue.flushAll(getMaxActive(), _retryDelayOnError, getQueueOrder());
}
}
}
Expand All @@ -295,6 +312,24 @@ public String call() throws IllegalArgumentException
}
}

/**
 * Admin command that changes the flush queue order at runtime.
 *
 * The argument is matched case-insensitively against the names of the
 * {@code Order} enum; a value that matches no constant makes
 * {@code Order.valueOf} throw an IllegalArgumentException.
 */
@AffectsSetup
@Command(name = "flush set queue order",
        description = "Set ready queue order to LIFO (last-in-first-out) or "
                      + "FIFO (first-in-first-out, default).")
class SetQueueOrder implements Callable<String>
{
    @Argument
    String order = "FIFO";

    /**
     * Parses the argument and applies it to the enclosing controller.
     *
     * @return a confirmation message naming the order that was set
     * @throws IllegalArgumentException if the argument names no queue order
     */
    @Override
    public String call() throws IllegalArgumentException
    {
        // Upper-case with Locale.ROOT so the conversion does not depend on
        // the JVM's default locale (e.g. under a Turkish locale "fifo"
        // would otherwise become "FİFO" and fail to match the enum name).
        Order o = Order.valueOf(order.toUpperCase(java.util.Locale.ROOT));
        setQueueOrder(o);
        return "Ready queue set to " + o.name();
    }
}

@AffectsSetup
@Command(name = "flush set interval",
description = "Set the interval at which to flush files to tape")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import dmg.util.command.Option;

import org.dcache.pool.PoolDataBeanProvider;
import org.dcache.pool.classic.MoverRequestScheduler.Order;
import org.dcache.pool.classic.json.HSMFlushQManagerData;
import org.dcache.pool.nearline.NearlineStorageHandler;
import org.dcache.pool.repository.CacheEntry;
Expand Down Expand Up @@ -211,7 +212,7 @@ public void flush(PnfsId pnfsId, CompletionHandler<Void,PnfsId> callback)
_storageHandler.flush(hsm, Collections.singleton(pnfsId), callback);
}

public void flushAll(int maxActive, long retryDelayOnError)
public void flushAll(int maxActive, long retryDelayOnError, Order order)
{
long now = System.currentTimeMillis();
Map<Boolean, List<StorageClassInfo>> classes =
Expand All @@ -230,10 +231,21 @@ public void flushAll(int maxActive, long retryDelayOnError)
.limit(drainLimit)
.forEach(StorageClassInfo::drain);

ready.stream()
.sorted(Comparator.comparing(StorageClassInfo::getLastSubmitted))
.limit(flushLimit)
.forEach(i -> i.flush(Integer.MAX_VALUE, null, null));
switch (order) {
case LIFO:
ready.stream()
.sorted(Comparator.comparing(StorageClassInfo::getLastSubmitted)
.reversed())
.limit(flushLimit)
.forEach(i -> i.flush(Integer.MAX_VALUE, null, null));
break;
default:
ready.stream()
.sorted(Comparator.comparing(StorageClassInfo::getLastSubmitted))
.limit(flushLimit)
.forEach(i -> i.flush(Integer.MAX_VALUE, null, null));
break;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@
<description>Controller for centralising flushing</description>
<property name="storageClassContainer" ref="queue"/>
<property name="poolMode" ref="pool-mode"/>
<property name="queueOrder" value="${pool.flush-controller.queue-order}"/>
</bean>

<bean id="billing-stub" class="org.dcache.cells.CellStub">
Expand Down
5 changes: 5 additions & 0 deletions skel/share/defaults/pool.properties
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ pool.backend.ceph.pool-name = ${pool.name}
#
(immutable)pool.net.ports.tcp=${dcache.net.wan.port.min}-${dcache.net.wan.port.max} ${dcache.net.lan.port.min}-${dcache.net.lan.port.max}

#
# Set the order of the flush queue
#
(one-of?fifo|lifo)pool.flush-controller.queue-order=fifo

# Obsolete properties
(obsolete)pool.cell.export = See pool.cell.consume

Expand Down

0 comments on commit 0dd6c45

Please sign in to comment.