Skip to content

Commit

Permalink
dcache-bulk: remove target error object and always convert to type + …
Browse files Browse the repository at this point in the history
…message

Motivation:

While the BulkRequestTarget object has a Throwable error
field, we do not store the error in serialized form in
the database; rather, we store its type (canonical class name)
and message; moreover, we always store the root exception
metadata.

However, when deserializing a processed target from the
database fields, we have to reconstruct an error for
the object field as a placeholder (as a Throwable
with error type + message as message).   This
then leads to displays of target info with error type
always `Throwable` and the type name embedded into the
message.  This is rather ugly and redundant.

Modification:

Eliminate the actual Throwable object field in the
BulkRequestTarget object, and always store the type
and message of the root exception directly.  These
are then simply pushed to the database and retrieved
as such.

Result:

Economy and consistency of error metadata for bulk
target information listings.

I consider this a defect so I am suggesting
back-port to 8.2

Target: master
Request: 8.2
Patch: https://rb.dcache.org/r/13895/
Requires-notes: yes
Acked-by: Tigran
  • Loading branch information
alrossi committed Feb 20, 2023
1 parent 930b8e6 commit d80292b
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 67 deletions.
Expand Up @@ -1044,10 +1044,9 @@ public String call() throws Exception {

BulkRequestTarget target = optional.get();
StringBuilder builder = new StringBuilder(formatTarget(target, true)).append("\n");
Throwable t = target.getThrowable();
if (t != null) {
builder.append("ERROR: ").append(t.getClass().getCanonicalName()).append(" : ")
.append(t.getMessage()).append("\n");
if (target.getErrorType() != null) {
builder.append("ERROR[").append(target.getErrorType()).append(" : ")
.append(target.getErrorMessage()).append("]\n");
}
return builder.toString();
}
Expand Down
Expand Up @@ -68,6 +68,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED;
import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath;

import com.google.common.base.Throwables;
import com.google.common.collect.Range;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.FsPath;
Expand Down Expand Up @@ -228,7 +229,7 @@ public void cancel(long id) {
}

try {
targetStore.update(id, CANCELLED, null);
targetStore.update(id, CANCELLED, null, null);
} catch (BulkStorageException e) {
LOGGER.error("Failed to cancel {}::{}: {}.", ruid, id, e.toString());
}
Expand Down Expand Up @@ -358,7 +359,8 @@ public void uncaughtException(Thread t, Throwable e) {
public void update(State state) {
if (target.setState(state)) {
try {
targetStore.update(target.getId(), target.getState(), target.getThrowable());
targetStore.update(target.getId(), target.getState(), target.getErrorType(),
target.getErrorMessage());
} catch (BulkStorageException e) {
LOGGER.error("{}, updateJobState: {}", ruid, e.toString());
}
Expand Down Expand Up @@ -455,7 +457,7 @@ protected boolean hasBeenCancelled(Long id, PID pid, FsPath path, FileAttributes
if (id == null) {
targetStore.store(target);
} else {
targetStore.update(target.getId(), CANCELLED, null);
targetStore.update(target.getId(), CANCELLED, null, null);
}
} catch (BulkServiceException | UnsupportedOperationException e) {
LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, path, e.toString());
Expand Down Expand Up @@ -493,11 +495,20 @@ private void checkTransitionToDirs() {
}

protected BulkRequestTarget toTarget(Long id, PID pid, FsPath path, Optional<FileAttributes> attributes,
State state, Object errorObject) {
State state, Throwable throwable) {
String errorType = null;
String errorMessage = null;
Throwable root = null;
if (throwable != null) {
root = Throwables.getRootCause(throwable);
errorType = root.getClass().getCanonicalName();
errorMessage = root.getMessage();
}

return BulkRequestTargetBuilder.builder().attributes(attributes.orElse(null))
.activity(activity.getName()).id(id).pid(pid).rid(rid).ruid(ruid).state(state)
.createdAt(System.currentTimeMillis()).error(errorObject).path(path)
.build();
.createdAt(System.currentTimeMillis()).errorType(errorType)
.errorMessage(errorMessage).path(path).build();
}

protected abstract void handleFileTarget(PID pid, FsPath path, FileAttributes attributes)
Expand Down
Expand Up @@ -74,7 +74,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.ws.rs.HEAD;
import org.dcache.namespace.FileType;
import org.dcache.services.bulk.BulkRequest;
import org.dcache.services.bulk.BulkRequest.Depth;
Expand Down Expand Up @@ -219,7 +218,9 @@ protected void retryFailed(BatchedResult result, FileAttributes attributes)
perform(completedTarget);
} catch (InterruptedException e) {
LOGGER.debug("{}. retryFailed interrupted", ruid);
targetStore.update(result.getTarget().getId(), FAILED, e);
targetStore.update(result.getTarget().getId(), FAILED,
InterruptedException.class.getCanonicalName(),
"retryFailed interrupted for " + ruid);
}
}

Expand Down Expand Up @@ -261,7 +262,8 @@ private void handleCompletion(BatchedResult result) {
return;
}

targetStore.update(completedTarget.getId(), state, completedTarget.getThrowable());
targetStore.update(completedTarget.getId(), state, completedTarget.getErrorType(),
completedTarget.getErrorMessage());
} catch (BulkStorageException e) {
LOGGER.error("{} could not store target from result: {}: {}.", ruid, result.getTarget(),
e.toString());
Expand All @@ -282,7 +284,7 @@ private Optional<BulkRequestTarget> next(FileType type) throws BulkStorageExcept

BulkRequestTarget target = readyTargets.poll();
if (target != null) {
targetStore.update(target.getId(), READY, null);
targetStore.update(target.getId(), READY, null, null);
}

return Optional.ofNullable(target);
Expand Down Expand Up @@ -328,7 +330,7 @@ private void register(BulkRequestTarget target, ListenableFuture future, Throwab
if (error == null) {
target.setState(RUNNING);
try {
targetStore.update(target.getId(), RUNNING, error);
targetStore.update(target.getId(), RUNNING, null, null);
} catch (BulkStorageException e) {
LOGGER.error("{}, could not update target {},: {}.", ruid, target, e.toString());
}
Expand Down
Expand Up @@ -176,7 +176,9 @@ protected void retryFailed(BatchedResult result, FileAttributes attributes)
perform(id, pid, path, attributes);
} catch (InterruptedException e) {
LOGGER.debug("{}. retryFailed interrupted", ruid);
targetStore.update(result.getTarget().getId(), FAILED, e);
targetStore.update(result.getTarget().getId(), FAILED,
InterruptedException.class.getCanonicalName(),
"retryFailed interrupted for " + ruid);
}
}

Expand All @@ -193,7 +195,8 @@ private void handleCompletion(BatchedResult result, FileAttributes attributes) {
return;
}

targetStore.update(completedTarget.getId(), state, completedTarget.getThrowable());
targetStore.update(completedTarget.getId(), state, completedTarget.getErrorType(),
completedTarget.getErrorMessage());
} catch (BulkStorageException e) {
LOGGER.error("{} could not store target from result: {}, {}: {}.", ruid, result,
attributes, e.toString());
Expand Down
Expand Up @@ -539,7 +539,8 @@ private boolean isJobValid(AbstractRequestContainerJob job) {
case CANCELLED:
job.update(State.CANCELLED);
try {
targetStore.update(target.getId(), State.CANCELLED, target.getThrowable());
targetStore.update(target.getId(), State.CANCELLED, target.getErrorType(),
target.getErrorMessage());
} catch (BulkStorageException e) {
LOGGER.error("updateJobState", e.toString());
}
Expand Down
Expand Up @@ -181,8 +181,9 @@ List<BulkRequestTarget> nextReady(Long rid, FileType type, Integer limit)
*
* @param id
* @param state
* @param errorObject
* @param errorType
* @param errorMessage
* @throws BulkStorageException
*/
void update(Long id, State state, Throwable errorObject) throws BulkStorageException;
void update(Long id, State state, String errorType, String errorMessage) throws BulkStorageException;
}
Expand Up @@ -183,9 +183,13 @@ public void abort(BulkRequest request, Throwable exception) {
attributes.setFileType(FileType.SPECIAL);
attributes.setPnfsId(PLACEHOLDER_PNFSID);

Throwable root = Throwables.getRootCause(exception);

BulkRequestTarget target = BulkRequestTargetBuilder.builder().rid(request.getId())
.pid(PID.ROOT).activity(request.getActivity())
.path(ROOT_REQUEST_PATH).attributes(attributes).error(exception).build();
.path(ROOT_REQUEST_PATH).attributes(attributes)
.errorType(root.getClass().getCanonicalName())
.errorMessage(root.getMessage()).build();

try {
targetStore.abort(target);
Expand Down Expand Up @@ -455,7 +459,7 @@ public void load() throws BulkStorageException {
requestTargetDao.where().pids(PID.DISCOVERED.ordinal()).state(NON_TERMINAL));
requestTargetDao.update(
requestTargetDao.where().pids(PID.INITIAL.ordinal()).state(NON_TERMINAL),
requestTargetDao.set().state(CREATED).errorObject(null));
requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null));
requestDao.update(requestDao.where().status(STARTED, CANCELLING),
requestDao.set().status(QUEUED));
}
Expand All @@ -466,8 +470,8 @@ public List<BulkRequest> next(Optional<String> sortedBy, Optional<Boolean> rever
LOGGER.trace("next {}.", limit);
return requestDao.get(
requestDao.where().status(QUEUED).sorter(sortedBy.orElse("arrived_at"))
.reverse(reverse.orElse(false)), (int) limit, true).stream()
.collect(Collectors.toList());
.reverse(reverse.orElse(false)), (int) limit, true)
.stream().collect(Collectors.toList());
}

@Override
Expand All @@ -487,7 +491,7 @@ public void reset(String uid) throws BulkStorageException {
requestTargetDao.delete(requestTargetDao.where().pids(PID.ROOT.ordinal()).ruids(uid));
requestTargetDao.delete(requestTargetDao.where().pids(PID.DISCOVERED.ordinal()).ruids(uid));
requestTargetDao.update(requestTargetDao.where().pids(PID.INITIAL.ordinal()).ruids(uid),
requestTargetDao.set().state(CREATED).errorObject(null));
requestTargetDao.set().state(CREATED).errorType(null).errorMessage(null));
requestDao.update(requestDao.where().uids(uid),
requestDao.set().status(QUEUED));
try {
Expand Down Expand Up @@ -784,12 +788,8 @@ private BulkRequestTargetInfo toRequestTargetInfo(BulkRequestTarget target) {
if (target.isTerminated()) {
info.setFinishedAt(target.getLastUpdated());
}
Throwable errorObject = target.getThrowable();
if (errorObject != null) {
Throwable root = Throwables.getRootCause(errorObject);
info.setErrorType(root.getClass().getCanonicalName());
info.setErrorMessage(root.getMessage());
}
info.setErrorType(target.getErrorType());
info.setErrorMessage(target.getErrorMessage());
return info;
}

Expand Down
Expand Up @@ -88,16 +88,17 @@ public final class JdbcBulkTargetStore implements BulkTargetStore {
@Override
public void abort(BulkRequestTarget target)
throws BulkStorageException {
LOGGER.trace("targetAborted {}, {}, {}.", target.getRuid(), target.getPath(),
target.getThrowable());
LOGGER.trace("targetAborted {}, {}, {}, {}.", target.getRuid(), target.getPath(),
target.getErrorType(), target.getErrorMessage());

/*
* If aborted, the target has not yet been stored ...
*/
targetDao.insert(
targetDao.set().pid(target.getPid()).rid(target.getRid())
.pnfsid(target.getPnfsId()).path(target.getPath()).type(target.getType())
.errorObject(target.getThrowable()).aborted());
.errorType(target.getErrorType()).errorMessage(target.getErrorMessage())
.aborted());
}

@Override
Expand Down Expand Up @@ -196,9 +197,9 @@ public void storeOrUpdate(BulkRequestTarget target) throws BulkStorageException
}

@Override
public void update(Long id, State state, Throwable errorObject) throws BulkStorageException {
public void update(Long id, State state, String errorType, String errorMessage) throws BulkStorageException {
targetDao.update(targetDao.where().id(id),
targetDao.set().state(state).errorObject(errorObject));
targetDao.set().state(state).errorType(errorType).errorMessage(errorMessage));
}

private JdbcRequestTargetUpdate prepareUpdate(BulkRequestTarget target) {
Expand All @@ -211,7 +212,7 @@ private JdbcRequestTargetUpdate prepareUpdate(BulkRequestTarget target) {
case FAILED:
case CANCELLED:
update = update.targetStart(target.getCreatedAt())
.errorObject(target.getThrowable());
.errorType(target.getErrorType()).errorMessage(target.getErrorMessage());
break;
case RUNNING:
update.targetStart(target.getCreatedAt());
Expand Down
Expand Up @@ -236,14 +236,7 @@ public BulkRequestTarget toRequestTarget(ResultSet rs, int row) throws SQLExcept
Long startedAt = timestamp == null ? null : timestamp.getTime();

String errorType = rs.getString("error_type");
Throwable error = null;

if (errorType != null) {
/** this is a placeholder **/
error = new Throwable(
"[errorType: " + errorType + "] " + rs.getString("error_message"));
}

String errorMessage = rs.getString("error_message");
return BulkRequestTargetBuilder.builder()
.id(rs.getLong("id"))
.pid(PID.values()[rs.getInt("pid")])
Expand All @@ -254,7 +247,7 @@ public BulkRequestTarget toRequestTarget(ResultSet rs, int row) throws SQLExcept
.createdAt(rs.getTimestamp("created_at").getTime())
.startedAt(startedAt)
.lastUpdated(rs.getTimestamp("last_updated").getTime())
.error(error).build();
.errorType(errorType).errorMessage(errorMessage).build();
}

public int update(JdbcRequestTargetCriterion criterion, JdbcRequestTargetUpdate update) {
Expand Down
Expand Up @@ -64,7 +64,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.util.BulkRequestTarget.State.RUNNING;
import static org.dcache.util.Strings.truncate;

import com.google.common.base.Throwables;
import diskCacheV111.util.FsPath;
import diskCacheV111.util.PnfsId;
import java.sql.Timestamp;
Expand Down Expand Up @@ -122,12 +121,13 @@ public JdbcRequestTargetUpdate aborted() {
return this;
}

public JdbcRequestTargetUpdate errorObject(Throwable errorObject) {
if (errorObject != null) {
Throwable root = Throwables.getRootCause(errorObject);
set("error_type", root.getClass().getCanonicalName());
set("error_message", truncate(root.getMessage(), 256, false));
}
public JdbcRequestTargetUpdate errorType(String errorType) {
set("error_type", errorType);
return this;
}

public JdbcRequestTargetUpdate errorMessage(String errorMessage) {
set("error_message", truncate(errorMessage, 256, false));
return this;
}

Expand Down

0 comments on commit d80292b

Please sign in to comment.