Skip to content

Commit

Permalink
dcache-bulk: propagate PID enum to distinguish between INITIAL and DI…
Browse files Browse the repository at this point in the history
…SCOVERED paths

Motivation:

Finish the work necessary to support elimination of
redundant target column/field.

This is the second part of #13789.

Modification:

Adds necessary PID parameters to various method calls
so that querying can be done on that basis, in
particular to distinguish paths stored initially
from those discovered through recursion on directories.

Result:

We are now ready to read the paths from the request_target
table on container start and to drop the bulk_request text
column.

Target: master
Patch: https://rb.dcache.org/r/13790
Requires-notes: no
Acked-by: Tigran
  • Loading branch information
alrossi committed Nov 30, 2022
1 parent 7dd2bdb commit 5facbaf
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 44 deletions.
Expand Up @@ -368,7 +368,7 @@ protected DirectoryStream getDirectoryListing(FsPath path)
Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES);
}

protected void expandDepthFirst(FsPath path, FileAttributes dirAttributes)
protected void expandDepthFirst(PID pid, FsPath path, FileAttributes dirAttributes)
throws CacheException, InterruptedException {
checkForRequestCancellation();

Expand All @@ -385,20 +385,20 @@ protected void expandDepthFirst(FsPath path, FileAttributes dirAttributes)
case ALL:
LOGGER.debug("expandDepthFirst {}, found directory {}, "
+ "expand ALL.", rid, childPath);
expandDepthFirst(childPath, childAttributes);
expandDepthFirst(PID.DISCOVERED, childPath, childAttributes);
break;
case TARGETS:
switch (targetType) {
case BOTH:
case DIR:
handleDirTarget(childPath, childAttributes);
handleDirTarget(PID.DISCOVERED, childPath, childAttributes);
}
break;
}
break;
case LINK:
case REGULAR:
handleFileTarget(childPath, childAttributes);
handleFileTarget(PID.DISCOVERED, childPath, childAttributes);
break;
case SPECIAL:
default:
Expand All @@ -413,14 +413,15 @@ protected void expandDepthFirst(FsPath path, FileAttributes dirAttributes)
switch (targetType) {
case BOTH:
case DIR:
handleDirTarget(path, dirAttributes);
handleDirTarget(pid, path, dirAttributes);
}
}

protected boolean hasBeenCancelled(FsPath path, FileAttributes attributes) {
protected boolean hasBeenCancelled(PID pid, FsPath path, FileAttributes attributes) {
synchronized (cancelledPaths) {
if (cancelledPaths.remove(path.toString())) {
BulkRequestTarget target = toTarget(path, Optional.of(attributes), CANCELLED, null);
BulkRequestTarget target = toTarget(pid, path, Optional.of(attributes), CANCELLED,
null);
try {
if (!targetStore.store(target)) {
targetStore.update(target.getId(), CANCELLED, null);
Expand Down Expand Up @@ -460,7 +461,7 @@ private void checkTransitionToDirs() {
}
}

protected BulkRequestTarget toTarget(FsPath path, Optional<FileAttributes> attributes,
protected BulkRequestTarget toTarget(PID pid, FsPath path, Optional<FileAttributes> attributes,
State state, Object errorObject) {
/* REVISIT pid should be INITIAL or DISCOVERED on basis of recursion. INITIAL is
* a placeholder until we add retrieval of initial paths from target table.
Expand All @@ -471,10 +472,10 @@ protected BulkRequestTarget toTarget(FsPath path, Optional<FileAttributes> attri
.build();
}

protected abstract void handleFileTarget(FsPath path, FileAttributes attributes)
protected abstract void handleFileTarget(PID pid, FsPath path, FileAttributes attributes)
throws InterruptedException;

protected abstract void handleDirTarget(FsPath path, FileAttributes attributes)
protected abstract void handleDirTarget(PID pid, FsPath path, FileAttributes attributes)
throws InterruptedException;

protected abstract void processFileTargets() throws InterruptedException;
Expand Down
Expand Up @@ -82,6 +82,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BatchedResult;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
Expand Down Expand Up @@ -127,17 +128,17 @@ public PrestoreRequestContainerJob(BulkActivity activity, BulkRequestTarget targ
}

@Override
protected void handleDirTarget(FsPath path, FileAttributes attributes)
protected void handleDirTarget(PID pid, FsPath path, FileAttributes attributes)
throws InterruptedException {
checkForRequestCancellation();
store(path, attributes);
store(pid, path, attributes);
}

@Override
protected void handleFileTarget(FsPath path, FileAttributes attributes)
protected void handleFileTarget(PID pid, FsPath path, FileAttributes attributes)
throws InterruptedException {
checkForRequestCancellation();
store(path, attributes);
store(pid, path, attributes);
}

@Override
Expand Down Expand Up @@ -198,14 +199,14 @@ protected void retryFailed(BatchedResult result, FileAttributes attributes)
}
}

private void addInfo(String target) {
private void addInfo(PID pid, String target) {
FsPath path = computeFsPath(targetPrefix, target);
try {
targetInfo.add(new TargetInfo(path, pnfsHandler.getFileAttributes(path,
MINIMALLY_REQUIRED_ATTRIBUTES)));
} catch (CacheException e) {
LOGGER.error("addInfo {}, path {}, error {}.", rid, path, e.getMessage());
BulkRequestTarget t = toTarget(path, Optional.ofNullable(null), FAILED, e);
BulkRequestTarget t = toTarget(pid, path, Optional.ofNullable(null), FAILED, e);
try {
targetStore.storeOrUpdate(t);
} catch (BulkStorageException ex) {
Expand Down Expand Up @@ -261,7 +262,7 @@ private ListenableFuture perform(BulkRequestTarget target)
FsPath path = target.getPath();
FileAttributes attributes = target.getAttributes();

if (hasBeenCancelled(path, attributes)) {
if (hasBeenCancelled(target.getPid(), path, attributes)) {
return Futures.immediateCancelledFuture();
}

Expand All @@ -285,7 +286,7 @@ private void register(BulkRequestTarget target, ListenableFuture future, Throwab
throws InterruptedException {
checkForRequestCancellation();

if (hasBeenCancelled(target.getPath(), target.getAttributes())) {
if (hasBeenCancelled(target.getPid(), target.getPath(), target.getAttributes())) {
return;
}

Expand All @@ -304,14 +305,14 @@ private void register(BulkRequestTarget target, ListenableFuture future, Throwab
}
}

private void store(FsPath path, FileAttributes attributes) throws InterruptedException {
private void store(PID pid, FsPath path, FileAttributes attributes) throws InterruptedException {
checkForRequestCancellation();
LOGGER.debug("store {}, path {}.", rid, path);
try {
if (hasBeenCancelled(path, attributes)) {
if (hasBeenCancelled(pid, path, attributes)) {
return;
}
BulkRequestTarget target = toTarget(path, Optional.of(attributes), CREATED, null);
BulkRequestTarget target = toTarget(pid, path, Optional.of(attributes), CREATED, null);
targetStore.storeOrUpdate(target);
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store target {}, {}: {}.", rid, path, attributes,
Expand All @@ -323,7 +324,7 @@ private void storeAll(List<String> targets) throws InterruptedException {
for (String target : targets) {
checkForRequestCancellation();
semaphore.acquire();
activity.getActivityExecutor().submit(() -> addInfo(target)); /* RELEASES SEMAPHORE */
activity.getActivityExecutor().submit(() -> addInfo(PID.INITIAL, target)); /* RELEASES SEMAPHORE */
}

/*
Expand Down Expand Up @@ -351,9 +352,9 @@ private void storeAll(List<String> targets) throws InterruptedException {
LOGGER.debug("storeAll {}, path {}.", rid, info.path);
try {
if (depth != Depth.NONE && info.attributes.getFileType() == FileType.DIR) {
expandDepthFirst(info.path, info.attributes);
expandDepthFirst(PID.INITIAL, info.path, info.attributes);
} else if (info.attributes.getFileType() != FileType.SPECIAL) {
store(info.path, info.attributes);
store(PID.INITIAL, info.path, info.attributes);
}
} catch (CacheException e) {
LOGGER.error("storeAll {}, path {}, error {}.", rid, info.path, e.getMessage());
Expand Down
Expand Up @@ -79,6 +79,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.activity.BulkActivity;
import org.dcache.services.bulk.util.BatchedResult;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
Expand All @@ -95,8 +96,10 @@ public final class RequestContainerJob extends AbstractRequestContainerJob {
static class DirTarget {
final FsPath path;
final FileAttributes attributes;
final PID pid;

DirTarget(FsPath path, FileAttributes attributes) {
DirTarget(PID pid, FsPath path, FileAttributes attributes) {
this.pid = pid;
this.attributes = attributes;
this.path = path;
}
Expand Down Expand Up @@ -125,10 +128,10 @@ protected void processFileTargets() throws InterruptedException {
FsPath path = computeFsPath(targetPrefix, tgt);
switch (depth) {
case NONE:
perform(path, null);
perform(PID.INITIAL, path, null);
break;
default:
handleTarget(path);
handleTarget(PID.INITIAL, path);
}
}
}
Expand All @@ -137,29 +140,30 @@ protected void processFileTargets() throws InterruptedException {
protected void processDirTargets() throws InterruptedException {
for (DirTarget dirTarget : dirs) {
checkForRequestCancellation();
perform(dirTarget.path, dirTarget.attributes);
perform(dirTarget.pid, dirTarget.path, dirTarget.attributes);
}
}

@Override
protected void handleDirTarget(FsPath path, FileAttributes attributes) {
dirs.add(new DirTarget(path, attributes));
protected void handleDirTarget(PID pid, FsPath path, FileAttributes attributes) {
dirs.add(new DirTarget(pid, path, attributes));
}

@Override
protected void handleFileTarget(FsPath path, FileAttributes attributes)
protected void handleFileTarget(PID pid, FsPath path, FileAttributes attributes)
throws InterruptedException {
perform(path, attributes);
perform(pid, path, attributes);
}

@Override
protected void retryFailed(BatchedResult result, FileAttributes attributes)
throws BulkStorageException {
BulkRequestTarget completedTarget = result.getTarget();
FsPath path = completedTarget.getPath();
PID pid = completedTarget.getPid();
completedTarget.resetToReady();
try {
perform(path, attributes);
perform(pid, path, attributes);
} catch (InterruptedException e) {
LOGGER.debug("{}. retryFailed interrupted", rid);
targetStore.update(result.getTarget().getId(), FAILED, e);
Expand Down Expand Up @@ -191,28 +195,28 @@ private void handleCompletion(BatchedResult result, FileAttributes attributes) {
}
}

private void handleTarget(FsPath path) throws InterruptedException {
private void handleTarget(PID pid, FsPath path) throws InterruptedException {
checkForRequestCancellation();
FileAttributes attributes = null;
LOGGER.debug("handleTarget {}, path {}.", rid, path);
try {
attributes = pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES);
if (attributes.getFileType() == FileType.DIR) {
expandDepthFirst(path, attributes);
expandDepthFirst(pid, path, attributes);
} else if (attributes.getFileType() != FileType.SPECIAL) {
perform(path, attributes);
perform(pid, path, attributes);
}
} catch (CacheException e) {
LOGGER.error("handleTarget {}, path {}, error {}.", rid, path, e.getMessage());
register(path, Futures.immediateFailedFuture(e), attributes, e);
register(pid, path, Futures.immediateFailedFuture(e), attributes, e);
}
}

private ListenableFuture perform(FsPath path, FileAttributes attributes)
private ListenableFuture perform(PID pid, FsPath path, FileAttributes attributes)
throws InterruptedException {
checkForRequestCancellation();

if (hasBeenCancelled(path, attributes)) {
if (hasBeenCancelled(pid, path, attributes)) {
return Futures.immediateCancelledFuture();
}

Expand All @@ -224,23 +228,23 @@ private ListenableFuture perform(FsPath path, FileAttributes attributes)
} catch (BulkServiceException | UnsupportedOperationException e) {
LOGGER.error("{}, perform failed for {}: {}", rid, path, e.toString());
future = Futures.immediateFailedFuture(e);
register(path, future, attributes, e);
register(pid, path, future, attributes, e);
return future;
}

register(path, future, attributes, null);
register(pid, path, future, attributes, null);
return future;
}

private void register(FsPath path, ListenableFuture future, FileAttributes attributes,
private void register(PID pid, FsPath path, ListenableFuture future, FileAttributes attributes,
Throwable error) throws InterruptedException {
checkForRequestCancellation();

if (hasBeenCancelled(path, attributes)) {
if (hasBeenCancelled(pid, path, attributes)) {
return;
}

BulkRequestTarget target = toTarget(path, Optional.ofNullable(attributes),
BulkRequestTarget target = toTarget(pid, path, Optional.ofNullable(attributes),
error == null ? RUNNING : FAILED, error);
BatchedResult result = new BatchedResult(target, future);

Expand Down
Expand Up @@ -392,4 +392,60 @@
<column name="rid"/>
</createIndex>
</changeSet>

<changeSet author="arossi" id="4.1" runInTransaction="false">
<preConditions onFail="MARK_RAN">
<and>
<columnExists tableName="request_target" columnName="pid"/>
</and>
</preConditions>
<sql splitStatements="false">
ALTER TABLE request_target ALTER COLUMN pid TYPE integer;
</sql>
</changeSet>

<!-- The following changes are necessary for consistency.
While '0' was not previously used, '-1' was, and this now
corresponds to the enum value '0'. Unfortunately there is no
way of distinguishing recursively found from original targets
in the prior implementation, but not observing that distinction
will result in replays of incomplete requests which for PIN, UNPIN,
and DELETE will be idempotent or will just fail because the file has
already been deleted.
It may be advisable to run this update offline using
`dcache database update`, depending on the size of the current
request_target table. -->
<changeSet author="arossi" id="4.1.1" runInTransaction="false">
<preConditions onFail="MARK_RAN">
<and>
<columnExists tableName="request_target" columnName="pid"/>
</and>
</preConditions>
<sql splitStatements="false">
UPDATE request_target SET pid = 1 WHERE pid = 0;
</sql>
</changeSet>

<changeSet author="arossi" id="4.1.2" runInTransaction="false">
<preConditions onFail="MARK_RAN">
<and>
<columnExists tableName="request_target" columnName="pid"/>
</and>
</preConditions>
<sql splitStatements="false">
UPDATE request_target SET pid = 0 WHERE pid = -1;
</sql>
</changeSet>

<changeSet author="arossi" id="4.2" runInTransaction="false">
<preConditions onFail="MARK_RAN">
<and>
<columnExists tableName="request_target" columnName="error"/>
</and>
</preConditions>
<sql splitStatements="false">
ALTER TABLE request_target ALTER COLUMN error TYPE varchar(256);
</sql>
</changeSet>
</databaseChangeLog>

0 comments on commit 5facbaf

Please sign in to comment.