Skip to content

Commit

Permalink
dcache-bulk: convert pid field/column to enum/integer expressing node…
Browse files Browse the repository at this point in the history
… type

Motivation:

The Bulk database schema and target object currently have a pid
column/field.  This originally meant "parent id" and was used
to walk the recursion tree, storing the parent sequence number (id).

This field was made obsolete by subsequence changes to the
recursion logic put in place during the experimentation that
led to version 2, but I neglected to remove the field/column.

However, we can repurpose this (changing its data type to simple
integer) to hold an enum which is useful in distinguishing
the paths initially submitted with the request (now stored
immediately) from paths subsequently discovered through
directory recursion.

Modification:

Change the pid to hold the enum (ROOT, INTIAL, DISCOVERED).
Change the database column from big decimal to integer.
(we also add a change of type from text to varchar for
the error field).

Result:

Pid now usable to track initial vs discovered targets.

Target: master
Patch: https://rb.dcache.org/r/13789
Acked-by: Tigran
Requires-notes: no
  • Loading branch information
alrossi committed Nov 30, 2022
1 parent 0d9397f commit 1dd3b4c
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 78 deletions.
Expand Up @@ -104,6 +104,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.store.BulkTargetStore;
import org.dcache.services.bulk.util.BulkRequestFilter;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.services.bulk.util.BulkTargetFilter;
Expand Down Expand Up @@ -1080,8 +1081,10 @@ class TargetLs implements Callable<PagedTargetResult> {
long offset = 0L;

@Option(name = "pid",
usage = "Id of the parent of the target.")
Long pid;
valueSpec = "ROOT|INITIAL|DISCOVERED",
usage = "Node type of the target.")

String pid;

@Option(name = "rid",
separator = ",",
Expand Down Expand Up @@ -1154,7 +1157,9 @@ public PagedTargetResult call() throws Exception {
}
}

BulkTargetFilter filter = new BulkTargetFilter(ids, offset, pid, pnfsids, paths,
PID nodeType = pid == null ? null : PID.valueOf(pid);

BulkTargetFilter filter = new BulkTargetFilter(ids, offset, nodeType, pnfsids, paths,
activities, types, states);

if (count) {
Expand Down
Expand Up @@ -63,7 +63,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.BulkRequestStatus.CANCELLING;
import static org.dcache.services.bulk.BulkRequestStatus.COMPLETED;

import diskCacheV111.util.FsPath;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -81,9 +80,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.store.BulkRequestStore;
import org.dcache.services.bulk.store.BulkTargetStore;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
Expand All @@ -109,25 +106,6 @@ public final class BulkRequestHandler implements BulkSubmissionHandler,
private BulkServiceStatistics statistics;
private ExecutorService cancelExecutor;

/**
* Caused by an internal issue.
* <p>
* Essentially a premature failure.
*/
@Override
public void abortRequestTarget(BulkRequestTarget parent, FsPath path, FileAttributes attributes,
Throwable exception)
throws BulkServiceException {
LOGGER.trace("requestTargetAborted {}, {}, {}; calling abort on request store",
parent, path, exception.toString());
targetStore.abort(
BulkRequestTargetBuilder.builder().rid(parent.getRid()).pid(parent.getId())
.activity(parent.getActivity()).path(path).attributes(attributes)
.error(exception).build());
requestManager.signal();
statistics.incrementJobsAborted();
}

@Override
public void cancelRequest(Subject subject, String id) throws BulkServiceException {
LOGGER.trace("cancelRequest {}, {}.", subject, id);
Expand Down
Expand Up @@ -59,31 +59,16 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk.handler;

import diskCacheV111.util.FsPath;
import java.util.List;
import javax.security.auth.Subject;
import org.dcache.services.bulk.BulkRequest;
import org.dcache.services.bulk.BulkServiceException;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.vehicles.FileAttributes;

/**
* Defines the basic submission methods for interacting with the queue.
*/
public interface BulkSubmissionHandler {

/**
* Unrecoverable internal failure. This usually means that the target has not yet been
* processed into a job.
*
* @param parent target of failed request
* @param path of target
* @param attributes of target
* @param exception error
*/
void abortRequestTarget(BulkRequestTarget parent, FsPath path, FileAttributes attributes,
Throwable exception) throws BulkServiceException;

/**
* Services request (from user) to cancel the request.
*
Expand Down
Expand Up @@ -91,6 +91,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.store.BulkTargetStore;
import org.dcache.services.bulk.util.BatchedResult;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.util.SignalAware;
Expand All @@ -100,6 +101,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.vehicles.FileAttributes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.annotation.meta.field;

/**
* Base class for the implementations. Acts as a container for a list of targets which may or may
Expand Down Expand Up @@ -151,7 +153,6 @@ enum ContainerState {

protected volatile ContainerState containerState;

private final Long pid;
private final TargetType targetType;
private final BulkRequestTarget target;
private final Subject subject;
Expand All @@ -170,7 +171,6 @@ enum ContainerState {
this.target = target;
this.subject = activity.getSubject();
this.restriction = activity.getRestriction();
pid = target.getId();
waiting = new HashMap<>();
cancelledPaths = new HashSet<>();
rid = request.getId();
Expand Down Expand Up @@ -462,8 +462,11 @@ private void checkTransitionToDirs() {

protected BulkRequestTarget toTarget(FsPath path, Optional<FileAttributes> attributes,
State state, Object errorObject) {
/* REVISIT pid should be INITIAL or DISCOVERED on basis of recursion. INITIAL is
* a placeholder until we add retrieval of initial paths from target table.
*/
return BulkRequestTargetBuilder.builder().attributes(attributes.orElse(null))
.activity(activity.getName()).pid(pid).rid(rid).state(state)
.activity(activity.getName()).pid(PID.INITIAL).rid(rid).state(state)
.createdAt(System.currentTimeMillis()).error(errorObject).path(path)
.build();
}
Expand Down
Expand Up @@ -60,7 +60,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
package org.dcache.services.bulk.job;

import static org.dcache.services.bulk.util.BulkRequestTarget.PLACEHOLDER_PNFSID;
import static org.dcache.services.bulk.util.BulkRequestTarget.ROOT_REQUEST_PARENT;
import static org.dcache.services.bulk.util.BulkRequestTarget.ROOT_REQUEST_PATH;

import diskCacheV111.util.PnfsHandler;
Expand All @@ -77,6 +76,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.store.BulkRequestStore;
import org.dcache.services.bulk.store.BulkTargetStore;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.util.list.ListDirectoryHandler;
import org.dcache.vehicles.FileAttributes;
Expand Down Expand Up @@ -112,7 +112,7 @@ public AbstractRequestContainerJob createRequestJob(BulkRequest request)
attributes.setPnfsId(PLACEHOLDER_PNFSID);
BulkRequestTarget target = BulkRequestTargetBuilder.builder()
.activity(activity.getName())
.rid(request.getId()).pid(ROOT_REQUEST_PARENT).attributes(attributes)
.rid(request.getId()).pid(PID.ROOT).attributes(attributes)
.path(ROOT_REQUEST_PATH).build();

PnfsHandler pnfsHandler = new PnfsHandler(pnfsManager);
Expand Down
Expand Up @@ -59,12 +59,14 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk.store.jdbc;

import com.google.common.base.Throwables;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.SocketException;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
Expand All @@ -78,6 +80,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.HEAD;
import org.dcache.db.JdbcCriterion;
import org.dcache.db.JdbcUpdate;
import org.dcache.services.bulk.BulkStorageException;
Expand All @@ -87,6 +90,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;
Expand Down Expand Up @@ -181,7 +185,6 @@ public Object deserializeFromBase64(String request, String field, String base64)
public <T> List<T> get(JdbcCriterion criterion, int limit, String tableName,
JdbcDaoSupport support, RowMapper<T> mapper) {
LOGGER.trace("get {}, limit {}.", criterion, limit);

Boolean reverse = criterion.reverse();
String direction = reverse == null || !reverse ? "ASC" : "DESC";
String sql = "SELECT * FROM " + tableName + " WHERE " + criterion.getPredicate()
Expand Down Expand Up @@ -236,7 +239,8 @@ public int update(JdbcCriterion criterion, JdbcUpdate update, String tableName,
LOGGER.trace("update {} : {}.", criterion, update);
String sql = "UPDATE " + tableName + " SET " + update.getUpdate() + " WHERE "
+ criterion.getPredicate();
LOGGER.trace("update {} ({}, {}).", sql, update.getArguments(), criterion.getArguments());
LOGGER.trace("update {} ({}, {}).", sql, update.getArguments(),
criterion.getArguments());
return support.getJdbcTemplate().update(sql,
concatArguments(update.getArguments(), criterion.getArguments()));
}
Expand Down
Expand Up @@ -219,9 +219,11 @@ public int updateTo(BulkRequestStatus status, String id) {
Timestamp now = new Timestamp(System.currentTimeMillis());
switch (status) {
case COMPLETED:
return getJdbcTemplate().update(UPDATE_COMPLETED_IF_DONE, new Object[]{now, id});
return getJdbcTemplate().update(UPDATE_COMPLETED_IF_DONE,
new Object[]{now, id});
case CANCELLED:
return getJdbcTemplate().update(UPDATE_CANCELLED_IF_DONE, new Object[]{now, id});
return getJdbcTemplate().update(UPDATE_CANCELLED_IF_DONE,
new Object[]{now, id});
default:
return 0;
}
Expand Down
Expand Up @@ -66,7 +66,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.BulkRequestStatus.STARTED;
import static org.dcache.services.bulk.util.BulkRequestTarget.NON_TERMINAL;
import static org.dcache.services.bulk.util.BulkRequestTarget.PLACEHOLDER_PNFSID;
import static org.dcache.services.bulk.util.BulkRequestTarget.ROOT_REQUEST_PARENT;
import static org.dcache.services.bulk.util.BulkRequestTarget.ROOT_REQUEST_PATH;

import com.google.common.base.Throwables;
Expand Down Expand Up @@ -105,6 +104,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.store.jdbc.rtarget.JdbcRequestTargetDao;
import org.dcache.services.bulk.util.BulkRequestFilter;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.vehicles.FileAttributes;
Expand Down Expand Up @@ -183,7 +183,7 @@ public void abort(BulkRequest request, Throwable exception) {
attributes.setPnfsId(PLACEHOLDER_PNFSID);

BulkRequestTarget target = BulkRequestTargetBuilder.builder().rid(requestId)
.pid(ROOT_REQUEST_PARENT).activity(request.getActivity())
.pid(PID.ROOT).activity(request.getActivity())
.path(ROOT_REQUEST_PATH).attributes(attributes).error(exception).build();

try {
Expand Down Expand Up @@ -443,7 +443,7 @@ public void load() throws BulkStorageException {
* deletion of the bulk request job (parent = -1) is necessary to avoid processing
* a placeholder target as if it represented a real namespace entry.
*/
requestTargetDao.delete(requestTargetDao.where().pid(ROOT_REQUEST_PARENT));
requestTargetDao.delete(requestTargetDao.where().pid(PID.ROOT));
requestDao.update(requestDao.where().status(STARTED, CANCELLING),
requestDao.set().status(QUEUED));
}
Expand Down Expand Up @@ -735,10 +735,9 @@ private BulkRequestInfo processInfo(BulkRequest stored, long offset) {
* Order by id from offset. Limit is 10000 per swatch.
*/
List<BulkRequestTargetInfo> targets =
requestTargetDao.get(requestTargetDao.where().rid(requestId).offset(offset)
.sorter("id"), FETCH_SIZE)
.stream().filter(t -> !t.getPath().equals(ROOT_REQUEST_PATH))
.map(this::toRequestTargetInfo)
requestTargetDao.get(
requestTargetDao.where().rid(requestId).offset(offset).notRootRequest()
.sorter("id"), FETCH_SIZE).stream().map(this::toRequestTargetInfo)
.collect(Collectors.toList());
info.setTargets(targets);
int size = targets.size();
Expand Down
Expand Up @@ -64,7 +64,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.sql.Timestamp;
import org.dcache.db.JdbcCriterion;
import org.dcache.namespace.FileType;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkTargetFilter;

Expand All @@ -85,7 +85,7 @@ public JdbcRequestTargetCriterion id(Long id) {
}

public JdbcRequestTargetCriterion notRootRequest() {
addClause("pid != ?", BulkRequestTarget.ROOT_REQUEST_PARENT);
addClause("pid != ?", PID.ROOT.ordinal());
return this;
}

Expand All @@ -96,9 +96,9 @@ public JdbcRequestTargetCriterion offset(Long id) {
return this;
}

public JdbcRequestTargetCriterion pid(Long pid) {
public JdbcRequestTargetCriterion pid(PID pid) {
if (pid != null) {
addClause("pid = ?", pid);
addClause("pid = ?", pid.ordinal());
}
return this;
}
Expand Down
Expand Up @@ -79,6 +79,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.BulkRequest;
import org.dcache.services.bulk.store.jdbc.JdbcBulkDaoUtils;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkRequestTargetBuilder;
import org.dcache.vehicles.FileAttributes;
Expand Down Expand Up @@ -107,7 +108,7 @@ static class TargetPlaceholder {

static final ParameterizedPreparedStatementSetter<TargetPlaceholder> SETTER = (ps, target) -> {
Instant now = Instant.now();
ps.setBigDecimal(1, BigDecimal.ZERO); // DEPRECATED, will be removed REVISIT
ps.setInt(1, PID.INITIAL.ordinal());
ps.setString(2, target.rid);
ps.setString(3, "?");
ps.setString(4, target.path);
Expand Down Expand Up @@ -210,7 +211,7 @@ public BulkRequestTarget toRequestTarget(ResultSet rs, int row) throws SQLExcept

return BulkRequestTargetBuilder.builder()
.id(rs.getLong("id"))
.pid(rs.getLong("pid"))
.pid(PID.values()[rs.getInt("pid")])
.rid(rs.getString("rid"))
.activity(rs.getString("activity"))
.state(State.valueOf(rs.getString("state")))
Expand Down
Expand Up @@ -71,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.sql.Timestamp;
import org.dcache.db.JdbcUpdate;
import org.dcache.namespace.FileType;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;

/**
Expand Down Expand Up @@ -121,12 +122,8 @@ public JdbcRequestTargetUpdate errorObject(Throwable errorObject) {
return this;
}

public JdbcRequestTargetUpdate pid(Long pid) {
if (pid != null) {
set("pid", pid);
} else {
set("pid", 0L); // REVISIT will be deprecated
}
public JdbcRequestTargetUpdate pid(PID pid) {
set("pid", pid == null ? PID.INITIAL.ordinal() : pid.ordinal());
return this;
}

Expand Down

0 comments on commit 1dd3b4c

Please sign in to comment.