Skip to content

Commit

Permalink
dcache-bulk: do not PIN or STAGE files with AL ONLINE
Browse files Browse the repository at this point in the history
Motivation:

Online files should not accumulate useless user pins.

Modification:

If a file's AL is ONLINE, immediately return
a future indicating indefinite lifetime.
The completion handler will understand these
to be ONLINE and mark them SKIPPED.

Result:

No unnecessary pins created.

Target: master
Request: 9.2
Request: 9.1
Request: 9.0
Request: 8.2
Requires-notes: yes
Patch: https://rb.dcache.org/r/14114/
Acked-by: Tigran
  • Loading branch information
alrossi committed Sep 29, 2023
1 parent 8479f44 commit 2507eea
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 1 deletion.
Expand Up @@ -93,7 +93,8 @@ public enum TargetType {

public static final Set<FileAttribute> MINIMALLY_REQUIRED_ATTRIBUTES
= Collections.unmodifiableSet(EnumSet.of(FileAttribute.PNFSID, FileAttribute.TYPE,
FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.RETENTION_POLICY));
FileAttribute.OWNER_GROUP, FileAttribute.OWNER, FileAttribute.ACCESS_LATENCY,
FileAttribute.RETENTION_POLICY));

private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy();

Expand Down
Expand Up @@ -74,6 +74,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.services.bulk.BulkServiceException;
Expand Down Expand Up @@ -118,6 +119,12 @@ public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
= new PinManagerPinMessage(attributes, getProtocolInfo(), id,
lifetimeInMillis);
message.setSubject(subject);

Optional<ListenableFuture<Message>> skipOption = skipIfOnline(attributes, message);
if (skipOption.isPresent()) {
return skipOption.get();
}

return pinManager.send(message, Long.MAX_VALUE);
} catch (URISyntaxException | CacheException e) {
return Futures.immediateFailedFuture(e);
Expand Down
Expand Up @@ -61,18 +61,23 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
import static diskCacheV111.util.CacheException.INVALID_ARGS;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import diskCacheV111.util.AccessLatency;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.NamespaceHandlerAware;
import diskCacheV111.util.PnfsHandler;
import diskCacheV111.util.PnfsId;
import diskCacheV111.vehicles.Message;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import org.dcache.cells.CellStub;
import org.dcache.pinmanager.PinManagerAware;
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.pinmanager.PinManagerUnpinMessage;
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BulkRequestTarget;
Expand Down Expand Up @@ -105,6 +110,9 @@ protected void handleCompletion(BulkRequestTarget target, ListenableFuture<Messa
reply = getUninterruptibly(future);
if (reply.getReturnCode() != 0) {
target.setErrorObject(reply.getErrorObject());
} else if (reply instanceof PinManagerPinMessage
&& ((PinManagerPinMessage) reply).getLifetime() == -1L) {
target.setState(SKIPPED);
} else {
target.setState(State.COMPLETED);
}
Expand Down Expand Up @@ -144,4 +152,16 @@ protected void checkPinnable(FileAttributes attributes) throws CacheException {
throw new CacheException(INVALID_ARGS, "Not a regular file.");
}
}

protected Optional<ListenableFuture<Message>> skipIfOnline(FileAttributes attributes,
PinManagerPinMessage message) {
ListenableFuture<Message> future = null;
if (attributes.getAccessLatency() == AccessLatency.ONLINE) {
message.setReply();
message.setLifetime(-1L);
future = Futures.immediateFuture(message);
}

return Optional.ofNullable(future);
}
}
Expand Up @@ -78,6 +78,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.dcache.pinmanager.PinManagerPinMessage;
import org.dcache.services.bulk.BulkServiceException;
Expand Down Expand Up @@ -128,6 +129,12 @@ public ListenableFuture<Message> perform(String rid, long tid, FsPath target,
= new PinManagerPinMessage(attributes, getProtocolInfo(), id,
getLifetimeInMillis(target));
message.setSubject(subject);

Optional<ListenableFuture<Message>> skipOption = skipIfOnline(attributes, message);
if (skipOption.isPresent()) {
return skipOption.get();
}

return pinManager.send(message, Long.MAX_VALUE);
} catch (URISyntaxException | CacheException e) {
return Futures.immediateFailedFuture(e);
Expand Down

0 comments on commit 2507eea

Please sign in to comment.