Skip to content

Commit

Permalink
dcache-bulk: reconfigure activity providers to capture environment
Browse files Browse the repository at this point in the history
Motivation:

Recently there has been a user request to allow
the default lifetime of the `PIN` and `STAGE` activities
to be configurable (these are currently hardcoded
to `5 MINUTES` and `P2D`, respectively).

Modification:

In order to achieve this, the activity providers
need an extra `configure` method which would allow
an `EnvironmentAware` factory to pass in properties.

The internals of some of the providers then need
to handle the configuration call as is appropriate
to them.

Finally, the activities themselves should be able
to see the set of default value descriptors in
a systematic way.

These changes are accompanied by the addition of
three `bulk.plugin!` properties which enable
control of the default lifetime values that
the providers get set with upon SPI loading.

Result:

It is now possible to configure default lifetime
for these two activities.

Target: master
Patch: https://rb.dcache.org/r/14058/
Requires-notes: yes
Acked-by: Lea
  • Loading branch information
alrossi committed Aug 22, 2023
1 parent a32aa03 commit 841d537
Show file tree
Hide file tree
Showing 19 changed files with 225 additions and 66 deletions.
Expand Up @@ -641,7 +641,7 @@ public String call() throws Exception {
Sorter sorter = new Sorter(SortOrder.valueOf(sort.toUpperCase()));
Set<BulkActivityArgumentDescriptor> descriptors = activityFactory.getProviders()
.get(activity)
.getArguments();
.getDescriptors();
String arguments = descriptors.stream()
.map(BulkServiceCommands::formatArgument)
.sorted(sorter)
Expand Down
Expand Up @@ -109,6 +109,7 @@ public enum TargetType {
protected ExecutorService activityExecutor;
protected ExecutorService callbackExecutor;
protected BulkTargetRetryPolicy retryPolicy;
protected Set<BulkActivityArgumentDescriptor> descriptors;

protected BulkActivity(String name, TargetType targetType) {
this.name = name;
Expand Down Expand Up @@ -178,6 +179,10 @@ public void setCallbackExecutor(ExecutorService callbackExecutor) {
this.callbackExecutor = callbackExecutor;
}

public void setDescriptors(Set<BulkActivityArgumentDescriptor> descriptors) {
this.descriptors = descriptors;
}

public void setMaxPermits(int maxPermits) {
this.maxPermits = maxPermits;
}
Expand Down
Expand Up @@ -65,6 +65,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import diskCacheV111.util.PnfsHandler;
import dmg.cells.nucleus.CellEndpoint;
import dmg.cells.nucleus.CellMessageSender;
import dmg.cells.nucleus.EnvironmentAware;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -93,7 +94,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* For each activity (such as pinning, deletion, etc.), there must be an SPI provider which creates
* the class implementing the activity API contract.
*/
public final class BulkActivityFactory implements CellMessageSender {
public final class BulkActivityFactory implements CellMessageSender, EnvironmentAware {

private static final Logger LOGGER = LoggerFactory.getLogger(BulkActivityFactory.class);

Expand All @@ -104,6 +105,7 @@ public final class BulkActivityFactory implements CellMessageSender {
private Map<String, ExecutorService> activityExecutors;
private Map<String, ExecutorService> callbackExecutors;
private Map<String, Integer> maxPermits;
private Map<String, Object> environment;

private CellStub pnfsManager;
private CellStub pinManager;
Expand Down Expand Up @@ -159,6 +161,7 @@ public void initialize() {
for (BulkActivityProvider provider : serviceLoader) {
String activity = provider.getActivity();
provider.setMaxPermits(maxPermits.get(activity));
provider.configure(environment);
providers.put(provider.getActivity(), provider);
}
pnfsHandler = new PnfsHandler(pnfsManager);
Expand Down Expand Up @@ -229,6 +232,11 @@ public void setCallbackExecutors(Map<String, ExecutorService> callbackExecutors)
this.callbackExecutors = callbackExecutors;
}

@Override
public void setEnvironment(Map<String, Object> environment) {
this.environment = environment;
}

private void configureEndpoints(BulkActivity activity) {
if (activity instanceof NamespaceHandlerAware) {
PnfsHandler pnfsHandler = new PnfsHandler(pnfsManager);
Expand Down
Expand Up @@ -59,6 +59,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk.activity;

import java.util.Map;
import java.util.Set;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.BulkActivity.TargetType;
Expand Down Expand Up @@ -103,6 +104,7 @@ public void setMaxPermits(int maxPermits) {
public J createActivity() throws BulkServiceException {
J activity = activityInstance();
activity.setMaxPermits(maxPermits);
activity.setDescriptors(getDescriptors());
return activity;
}

Expand All @@ -118,7 +120,9 @@ public J createActivity() throws BulkServiceException {
*
* @return argument set.
*/
public abstract Set<BulkActivityArgumentDescriptor> getArguments();
public abstract Set<BulkActivityArgumentDescriptor> getDescriptors();

public abstract void configure(Map<String, Object> environment);

protected abstract J activityInstance() throws BulkServiceException;
}
Expand Up @@ -130,9 +130,12 @@ protected void handleCompletion(BulkRequestTarget target,
@Override
protected void configure(Map<String, String> arguments) {
if (arguments == null) {
skipDirs = Boolean.parseBoolean(SKIP_DIRS.getDefaultValue());
/*
* There is only one descriptor.
*/
skipDirs = Boolean.parseBoolean(descriptors.iterator().next().getDefaultValue());
} else {
skipDirs = Boolean.parseBoolean(arguments.get(SKIP_DIRS.getName()));
skipDirs = Boolean.parseBoolean(arguments.get(SKIP_DIRS));
}
}
}
Expand Up @@ -62,19 +62,21 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.activity.BulkActivity.TargetType.BOTH;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.BulkActivityArgumentDescriptor;
import org.dcache.services.bulk.activity.BulkActivityProvider;

public final class DeleteActivityProvider extends BulkActivityProvider<DeleteActivity> {
static final String SKIP_DIRS = "skipDirs";

static final BulkActivityArgumentDescriptor SKIP_DIRS =
new BulkActivityArgumentDescriptor("skipDirs",
"do not attempt to remove directories",
"true|false",
false,
"false");
private static final BulkActivityArgumentDescriptor DEFAULT_DESCRIPTOR
= new BulkActivityArgumentDescriptor(SKIP_DIRS,
"do not attempt to remove directories",
"true|false",
false,
"false");

public DeleteActivityProvider() {
super("DELETE", BOTH);
Expand All @@ -86,8 +88,13 @@ public Class<DeleteActivity> getActivityClass() {
}

@Override
public Set<BulkActivityArgumentDescriptor> getArguments() {
return ImmutableSet.of(SKIP_DIRS);
public Set<BulkActivityArgumentDescriptor> getDescriptors() {
return ImmutableSet.of(DEFAULT_DESCRIPTOR);
}

@Override
public void configure(Map<String, Object> environment) {
// NOP
}

@Override
Expand Down
Expand Up @@ -62,6 +62,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.activity.BulkActivity.TargetType.BOTH;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.BulkActivityArgumentDescriptor;
Expand All @@ -82,12 +83,17 @@ public Class<LogTargetActivity> getActivityClass() {
return LogTargetActivity.class;
}

public Set<BulkActivityArgumentDescriptor> getArguments() {
public Set<BulkActivityArgumentDescriptor> getDescriptors() {
return Collections.EMPTY_SET;
}

@Override
protected LogTargetActivity activityInstance() throws BulkServiceException {
return new LogTargetActivity(activity, targetType);
}

@Override
public void configure(Map<String, Object> environment) {
// NOP
}
}
Expand Up @@ -77,6 +77,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.concurrent.TimeUnit;
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.BulkActivityArgumentDescriptor;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.vehicles.FileAttributes;

Expand Down Expand Up @@ -125,14 +126,14 @@ public ListenableFuture<Message> perform(String rid, long tid, FsPath target,

@Override
protected void configure(Map<String, String> arguments) {
TimeUnit defaultUnit = TimeUnit.valueOf(LIFETIME_UNIT.getDefaultValue());
Long defaultValue = Long.parseLong(LIFETIME.getDefaultValue());
TimeUnit defaultUnit = TimeUnit.valueOf(lifetimeUnitDefault());
Long defaultValue = Long.parseLong(lifetimeDefault());

if (arguments == null) {
lifetimeInMillis = defaultUnit.toMillis(defaultValue);
} else {
String expire = arguments.get(LIFETIME.getName());
String unit = arguments.get(LIFETIME_UNIT.getName());
String expire = arguments.get(LIFETIME);
String unit = arguments.get(LIFETIME_UNIT);

Long value = (long) (Double.parseDouble(expire));

Expand All @@ -141,7 +142,7 @@ protected void configure(Map<String, String> arguments) {
: TimeUnit.valueOf(unit).toMillis(value);
}

id = arguments == null ? null : arguments.get(PIN_REQUEST_ID.getName());
id = arguments == null ? null : arguments.get(PIN_REQUEST_ID);
}

private ProtocolInfo getProtocolInfo() throws URISyntaxException {
Expand All @@ -150,4 +151,22 @@ private ProtocolInfo getProtocolInfo() throws URISyntaxException {
null, null, null,
new URI("http", "localhost", null, null));
}

private String lifetimeDefault() {
for (BulkActivityArgumentDescriptor d: descriptors) {
if (d.getName().equals(LIFETIME)) {
return d.getDefaultValue();
}
}
return null;
}

private String lifetimeUnitDefault() {
for (BulkActivityArgumentDescriptor d: descriptors) {
if (d.getName().equals(LIFETIME_UNIT)) {
return d.getDefaultValue();
}
}
return null;
}
}
Expand Up @@ -63,34 +63,32 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.activity.BulkActivityArgumentDescriptor.EMPTY_DEFAULT;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.BulkActivityArgumentDescriptor;
import org.dcache.services.bulk.activity.BulkActivityProvider;

public final class PinActivityProvider extends BulkActivityProvider<PinActivity> {

static final BulkActivityArgumentDescriptor LIFETIME
= new BulkActivityArgumentDescriptor("lifetime",
"duration of the pin",
"long",
false,
"5");

static final BulkActivityArgumentDescriptor LIFETIME_UNIT =
new BulkActivityArgumentDescriptor("lifetimeUnit",
"time unit for duration of the pin",
"SECONDS|MINUTES|HOURS|DAYS",
false,
"MINUTES");
static final String LIFETIME = "lifetime";
static final String LIFETIME_UNIT = "lifetimeUnit";
static final String PIN_REQUEST_ID = "id";

static final BulkActivityArgumentDescriptor PIN_REQUEST_ID
= new BulkActivityArgumentDescriptor("id",
static final BulkActivityArgumentDescriptor DEFAULT_PIN_REQUEST_ID
= new BulkActivityArgumentDescriptor(PIN_REQUEST_ID,
"to use for this pin. If null, the id of the current request will be used.",
"string",
false,
EMPTY_DEFAULT);

private static final String DEFAULT_PIN_LIFETIME = "bulk.plugin!pin.default-lifetime";
private static final String DEFAULT_PIN_LIFETIME_UNIT = "bulk.plugin!pin.default-lifetime.unit";

private String defaultLifetime;
private String defaultLifetimeUnit;

public PinActivityProvider() {
super("PIN", FILE);
}
Expand All @@ -101,12 +99,35 @@ public Class<PinActivity> getActivityClass() {
}

@Override
public Set<BulkActivityArgumentDescriptor> getArguments() {
return ImmutableSet.of(LIFETIME, LIFETIME_UNIT, PIN_REQUEST_ID);
public Set<BulkActivityArgumentDescriptor> getDescriptors() {
return ImmutableSet.of(getLifetime(), getLifetimeUnit(), DEFAULT_PIN_REQUEST_ID);
}

@Override
protected PinActivity activityInstance() throws BulkServiceException {
return new PinActivity(activity, targetType);
}

@Override
public void configure(Map<String, Object> environment) {
defaultLifetime = String.valueOf(environment.getOrDefault(DEFAULT_PIN_LIFETIME,5L));
defaultLifetimeUnit = String.valueOf(environment.getOrDefault(DEFAULT_PIN_LIFETIME_UNIT,
TimeUnit.MINUTES));
}

private BulkActivityArgumentDescriptor getLifetime() {
return new BulkActivityArgumentDescriptor(LIFETIME,
"duration of the pin",
"long",
false,
defaultLifetime);
}

private BulkActivityArgumentDescriptor getLifetimeUnit() {
return new BulkActivityArgumentDescriptor(LIFETIME_UNIT,
"time unit for duration of the pin",
"SECONDS|MINUTES|HOURS|DAYS",
false,
defaultLifetimeUnit);
}
}
Expand Up @@ -91,6 +91,6 @@ public ListenableFuture<Message> perform(String rid, long tid, FsPath target,

@Override
protected void configure(Map<String, String> arguments) {
id = arguments == null ? null : arguments.get(REQUEST_ID.getName());
id = arguments == null ? null : arguments.get(REQUEST_ID);
}
}
Expand Up @@ -61,13 +61,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import static org.dcache.services.bulk.activity.BulkActivity.TargetType.FILE;

import java.util.Map;
import java.util.Set;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.activity.BulkActivityArgumentDescriptor;
import org.dcache.services.bulk.activity.BulkActivityProvider;

public final class ReleaseActivityProvider extends BulkActivityProvider<ReleaseActivity> {
static final BulkActivityArgumentDescriptor REQUEST_ID
static final String REQUEST_ID = "id";

private static final BulkActivityArgumentDescriptor DEFAULT_DESCRIPTOR
= new BulkActivityArgumentDescriptor("id",
"to use for this release",
"string",
Expand All @@ -78,8 +81,8 @@ public ReleaseActivityProvider() {
}

@Override
public Set<BulkActivityArgumentDescriptor> getArguments() {
return Set.of(REQUEST_ID);
public Set<BulkActivityArgumentDescriptor> getDescriptors() {
return Set.of(DEFAULT_DESCRIPTOR);
}

@Override
Expand All @@ -91,4 +94,9 @@ public Class<ReleaseActivity> getActivityClass() {
protected ReleaseActivity activityInstance() throws BulkServiceException {
return new ReleaseActivity(activity, targetType);
}

@Override
public void configure(Map<String, Object> environment) {
// NOP
}
}

0 comments on commit 841d537

Please sign in to comment.