Skip to content

Commit

Permalink
dcache-bulk: modify qos update activity to support policies (qos rule…
Browse files Browse the repository at this point in the history
… engine 7)

Motivation:

Implement the rule engine extension to QoS services.

Modification:

Extensions to the bulk `UPDATE_QOS` activity
to support both the older style targets
(i.e., `tape`, `disk` and `disk+tape`)
as well as policy and policy state.

It is possible to move files
back and forth arbitrarily between these
two kinds of definitions; it is also
possible to start a file's policy
state at an internal index rather
than at the beginning of a policy.

Result:

Bulk control over qos changes now
includes policy manipulation.

Target: master
Patch: https://rb.dcache.org/r/14075/
Depends-on: #14074
Acked-by: Tigran
  • Loading branch information
alrossi committed Sep 6, 2023
1 parent 0ebfa05 commit 13580d9
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 12 deletions.
Expand Up @@ -214,7 +214,7 @@ public abstract ListenableFuture<R> perform(String rid, long tid, FsPath path, F
*
* @param arguments parameters of the specific activity.
*/
protected abstract void configure(Map<String, String> arguments);
protected abstract void configure(Map<String, String> arguments) throws BulkServiceException;

/**
* Internal implementation of completion handler taking full target.
Expand Down
Expand Up @@ -60,8 +60,11 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.qos;

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static org.dcache.services.bulk.activity.plugin.qos.UpdateQoSActivityProvider.QOS_POLICY;
import static org.dcache.services.bulk.activity.plugin.qos.UpdateQoSActivityProvider.QOS_STATE;
import static org.dcache.services.bulk.activity.plugin.qos.UpdateQoSActivityProvider.TARGET_QOS;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import diskCacheV111.util.CacheException;
Expand Down Expand Up @@ -100,6 +103,8 @@ public class UpdateQoSActivity extends BulkActivity<QoSTransitionCompletedMessag
private CellStub qosEngine;
private PnfsHandler pnfsHandler;
private String targetQos;
private String qosPolicy;
private Integer qosState;

public UpdateQoSActivity(String name, TargetType targetType) {
super(name, targetType);
Expand All @@ -123,8 +128,8 @@ public synchronized void cancel(BulkRequestTarget target) {
@Override
public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long tid,
FsPath path, FileAttributes attributes) throws BulkServiceException {
if (targetQos == null) {
return Futures.immediateFailedFuture(new IllegalArgumentException("no target qos given."));
if (targetQos == null && qosPolicy == null) {
return Futures.immediateFailedFuture(new IllegalArgumentException("no target qos or policy given."));
}

if (attributes == null) {
Expand All @@ -137,12 +142,17 @@ public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long

PnfsId pnfsId = attributes.getPnfsId();
FileQoSRequirements requirements = new FileQoSRequirements(pnfsId, attributes);
if (targetQos.contains("disk")) {
requirements.setRequiredDisk(1);
}
if (qosPolicy != null) {
requirements.setRequiredQoSPolicy(qosPolicy);
requirements.setRequiredQoSStateIndex(qosState == null ? 0 : qosState);
} else {
if (targetQos.contains("disk")) {
requirements.setRequiredDisk(1);
}

if (targetQos.contains("tape")) {
requirements.setRequiredTape(1);
if (targetQos.contains("tape")) {
requirements.setRequiredTape(1);
}
}

QoSTransitionFuture future = responseReceiver.register(pnfsId.toString());
Expand All @@ -161,11 +171,28 @@ public ListenableFuture<QoSTransitionCompletedMessage> perform(String rid, long
}

@Override
protected void configure(Map<String, String> arguments) {
protected void configure(Map<String, String> arguments) throws BulkServiceException {
if (arguments == null) {
return;
}
targetQos = arguments.get(TARGET_QOS);
qosPolicy = arguments.get(QOS_POLICY.getName());

if (qosPolicy == null) {
qosState = Integer.parseInt(QOS_STATE.getDefaultValue());
} else {
String state = arguments.get(QOS_STATE.getName());
if ( Strings.emptyToNull(state) == null) {
qosState = Integer.parseInt(QOS_STATE.getDefaultValue());
} else {
qosState = Integer.parseInt(state);
}
}

if (targetQos == null && qosPolicy == null) {
throw new BulkServiceException("either targetQos or qosPolicy must be "
+ "provided as argument.");
}
}

@Override
Expand Down
Expand Up @@ -60,6 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.activity.plugin.qos;

import static org.dcache.services.bulk.activity.BulkActivity.TargetType.FILE;
import static org.dcache.services.bulk.activity.BulkActivityArgumentDescriptor.EMPTY_DEFAULT;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
Expand All @@ -77,8 +78,22 @@ public final class UpdateQoSActivityProvider extends BulkActivityProvider<Update
"the desired qos transition ('disk' is limited to "
+ "files with volatile/unknown qos status)",
"disk|tape|disk+tape",
true,
null);
false,
EMPTY_DEFAULT);

static final BulkActivityArgumentDescriptor QOS_STATE =
new BulkActivityArgumentDescriptor("qosState",
"the index into the desired policy's list states",
"integer",
false,
"0");

static final BulkActivityArgumentDescriptor QOS_POLICY =
new BulkActivityArgumentDescriptor("qosPolicy",
"the name of the qos policy to apply to the file",
"string, max 64 chars",
false,
EMPTY_DEFAULT);

public UpdateQoSActivityProvider() {
super("UPDATE_QOS", FILE);
Expand All @@ -89,8 +104,9 @@ public Class<UpdateQoSActivity> getActivityClass() {
return UpdateQoSActivity.class;
}

@Override
public Set<BulkActivityArgumentDescriptor> getDescriptors() {
return ImmutableSet.of(DEFAULT_DESCRIPTOR);
return ImmutableSet.of(DEFAULT_DESCRIPTOR, QOS_STATE, QOS_POLICY);
}

@Override
Expand Down

0 comments on commit 13580d9

Please sign in to comment.