Skip to content

Commit

Permalink
dcache-bulk: eliminate storage of initial targets as text column in bulk_requests
Browse files Browse the repository at this point in the history

Motivation:

The immediate storage of initial request targets in the DB
upon submission, introduced by #13782, effectively eliminates
the need to maintain the original targets as a text string
in the bulk_request table.

To do this, though, means we must also read back only the
initial targets upon container startup.

Modification:

This patch provides the new storage methods and container
logic, as well as necessary modifications to the
load and reset methods; it also drops the text column.

Result:

Much less DB bloat.

I would argue that we should be "prepared" to back-port
 #13789, #13790 and #13791 to 8.2 should we discover
that normal usage requires an unreasonable amount of
disk for the DB.  We may also find that querying
the bulk_request table experiences a speedup with this patch.

Target: master
Patch: https://rb.dcache.org/r/13791
Requires-notes: yes (see below)
Acked-by: Tigran
  • Loading branch information
alrossi committed Nov 30, 2022
1 parent 5facbaf commit 4d8d361
Show file tree
Hide file tree
Showing 12 changed files with 93 additions and 114 deletions.
Expand Up @@ -92,6 +92,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.handler.BulkSubmissionHandler;
import org.dcache.services.bulk.manager.BulkRequestManager;
import org.dcache.services.bulk.store.BulkRequestStore;
import org.dcache.services.bulk.store.BulkTargetStore;
import org.dcache.services.bulk.util.BulkServiceStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -116,6 +117,7 @@ public final class BulkService implements CellLifeCycleAware, CellMessageReceive
private BulkRequestManager requestManager;
private BulkActivityFactory activityFactory;
private BulkRequestStore requestStore;
private BulkTargetStore targetStore;
private BulkSubmissionHandler submissionHandler;
private BulkServiceStatistics statistics;
private ExecutorService incomingExecutorService;
Expand Down Expand Up @@ -352,6 +354,11 @@ public void setSubmissionHandler(BulkSubmissionHandler submissionHandler) {
this.submissionHandler = submissionHandler;
}

/**
 * Injects the store used to persist and read back request targets.
 * Spring-injected; see the "targetStore" property on the service bean definition.
 *
 * @param targetStore the bulk target store.
 */
@Required
public void setTargetStore(BulkTargetStore targetStore) {
this.targetStore = targetStore;
}

@Required
public void setIncomingExecutorService(ExecutorService incomingExecutorService) {
this.incomingExecutorService = incomingExecutorService;
Expand Down Expand Up @@ -539,8 +546,8 @@ private void validateTargets(String requestId, Subject subject, List<String> pat
}

String prefix = request.getTargetPrefix();
Set<FsPath> submitted = request.getTarget().stream().map(p -> computeFsPath(prefix, p))
.collect(Collectors.toSet());
Set<FsPath> submitted = targetStore.getInitialTargetPaths(requestId, false).stream()
.map(p -> computeFsPath(prefix, p)).collect(Collectors.toSet());
for (String path : paths) {
if (!submitted.contains(findAbsolutePath(prefix, path))) {
throw new BulkServiceException(String.format(INVALID_TARGET,
Expand Down
Expand Up @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -194,7 +195,7 @@ public void cancel() {

synchronized (waiting) {
LOGGER.debug("cancel {}: waiting {}.", rid, waiting.size());
waiting.values().forEach(r->r.cancel(activity));
waiting.values().forEach(r -> r.cancel(activity));
LOGGER.debug("cancel {}: waiting targets cancelled.", rid);
waiting.clear();
}
Expand Down Expand Up @@ -417,6 +418,10 @@ protected void expandDepthFirst(PID pid, FsPath path, FileAttributes dirAttribut
}
}

/**
 * Reads back the paths of this request's initial (user-submitted) targets from
 * the target store, restricted (nonterminal = true) to those which have not yet
 * reached a terminal state.  Replaces the former in-memory target list now that
 * the targets are no longer stored as a text column on the request row.
 *
 * @return paths of the non-terminal initial targets of this request.
 */
protected List<String> getInitialTargetPaths() {
return targetStore.getInitialTargetPaths(rid, true);
}

protected boolean hasBeenCancelled(PID pid, FsPath path, FileAttributes attributes) {
synchronized (cancelledPaths) {
if (cancelledPaths.remove(path.toString())) {
Expand Down
Expand Up @@ -143,12 +143,15 @@ protected void handleFileTarget(PID pid, FsPath path, FileAttributes attributes)

@Override
protected void preprocessTargets() throws InterruptedException {
if (request.getTarget().isEmpty()) {
List<String> requestTargets = getInitialTargetPaths();

if (requestTargets.isEmpty()) {
containerState = ContainerState.STOP;
update(FAILED);
return;
}
storeAll(request.getTarget());

storeAll(requestTargets);
}

@Override
Expand Down Expand Up @@ -290,15 +293,19 @@ private void register(BulkRequestTarget target, ListenableFuture future, Throwab
return;
}

target.setErrorObject(error);
BatchedResult result = new BatchedResult(target, future);

try {
targetStore.update(target.getId(), RUNNING, error);
} catch (BulkStorageException e) {
LOGGER.error("{}, could not update target {},: {}.", rid, target, e.toString());
if (error == null) {
target.setState(RUNNING);
try {
targetStore.update(target.getId(), RUNNING, error);
} catch (BulkStorageException e) {
LOGGER.error("{}, could not update target {},: {}.", rid, target, e.toString());
}
} else {
target.setErrorObject(error);
}

BatchedResult result = new BatchedResult(target, future);

synchronized (waiting) {
waiting.put(target.getPath(), result);
future.addListener(() -> handleCompletion(result), activity.getCallbackExecutor());
Expand All @@ -312,8 +319,8 @@ private void store(PID pid, FsPath path, FileAttributes attributes) throws Inter
if (hasBeenCancelled(pid, path, attributes)) {
return;
}
BulkRequestTarget target = toTarget(pid, path, Optional.of(attributes), CREATED, null);
targetStore.storeOrUpdate(target);
targetStore.storeOrUpdate(toTarget(pid, path, Optional.of(attributes),
CREATED, null));
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store target {}, {}: {}.", rid, path, attributes,
e.toString());
Expand Down
Expand Up @@ -115,7 +115,7 @@ public RequestContainerJob(BulkActivity activity, BulkRequestTarget target,

@Override
protected void processFileTargets() throws InterruptedException {
List<String> requestTargets = request.getTarget();
List<String> requestTargets = getInitialTargetPaths();

if (requestTargets.isEmpty()) {
containerState = ContainerState.STOP;
Expand Down Expand Up @@ -246,13 +246,17 @@ private void register(PID pid, FsPath path, ListenableFuture future, FileAttribu

BulkRequestTarget target = toTarget(pid, path, Optional.ofNullable(attributes),
error == null ? RUNNING : FAILED, error);

BatchedResult result = new BatchedResult(target, future);

try {
targetStore.storeOrUpdate(target);
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", rid, result,
attributes, e.toString());
if (error == null) {
LOGGER.error("register, incrementing {} {}.", target.getId(), target.getState());
try {
targetStore.storeOrUpdate(target);
} catch (BulkStorageException e) {
LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", rid, result,
attributes, e.toString());
}
}

synchronized (waiting) {
Expand Down
Expand Up @@ -128,20 +128,13 @@ public interface BulkTargetStore {
*/
Map<String, Long> countsByState();

/**
 * Removes all jobs/targets belonging to the given request.
 *
 * @param rid remove all jobs belonging to this request.
 * @throws BulkStorageException if the deletion fails.
 */
void delete(String rid) throws BulkStorageException;

/**
 * Checks for the presence of a target belonging to a request.
 *
 * @param rid of the request.
 * @param path of the target.
 * @return true if it exists in the store, false otherwise.
 */
boolean exists(String rid, FsPath path);


/**
* @param filter on the target.
* @param limit max targets to return (can be <code>null</code>).
Expand All @@ -151,6 +144,13 @@ public interface BulkTargetStore {
List<BulkRequestTarget> find(BulkTargetFilter filter, Integer limit)
throws BulkStorageException;

/**
 * Fetches the paths of the initial (user-submitted) targets of a request.
 *
 * @param requestId of the request the targets belong to.
 * @param nonterminal if true, return only the initial targets which have not yet run.
 * @return paths of the targets.
 */
List<String> getInitialTargetPaths(String requestId, boolean nonterminal);

/**
* @param type REGULAR or DIR.
* @param limit max targets to return (can be <code>null</code>).
Expand Down
Expand Up @@ -59,14 +59,11 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk.store.jdbc.request;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.dcache.services.bulk.BulkRequest;
Expand Down Expand Up @@ -155,7 +152,6 @@ public BulkRequest toRequest(ResultSet rs, int row) throws SQLException {
request.setExpandDirectories(Depth.valueOf(rs.getString("expand_directories")));
request.setUrlPrefix(rs.getString("url_prefix"));
request.setTargetPrefix(rs.getString("target_prefix"));
request.setTarget(new ArrayList<>(Arrays.asList(rs.getString("target").split(","))));
request.setClearOnSuccess(rs.getBoolean("clear_on_success"));
request.setClearOnFailure(rs.getBoolean("clear_on_failure"));
request.setCancelOnFailure(rs.getBoolean("cancel_on_failure"));
Expand Down Expand Up @@ -236,7 +232,6 @@ public JdbcBulkRequestUpdate updateFrom(BulkRequest request, String user)
.clearOnSuccess(request.isClearOnSuccess()).clearOnFailure(request.isClearOnFailure())
.prestore(request.isPrestore())
.depth(request.getExpandDirectories())
.target(Joiner.on(",").join(request.getTarget()))
.targetPrefix(request.getTargetPrefix()).urlPrefix(request.getUrlPrefix()).user(user)
.status(BulkRequestStatus.QUEUED).arrivedAt(System.currentTimeMillis());
}
Expand Down
Expand Up @@ -67,6 +67,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import static org.dcache.services.bulk.util.BulkRequestTarget.NON_TERMINAL;
import static org.dcache.services.bulk.util.BulkRequestTarget.PLACEHOLDER_PNFSID;
import static org.dcache.services.bulk.util.BulkRequestTarget.ROOT_REQUEST_PATH;
import static org.dcache.services.bulk.util.BulkRequestTarget.State.CREATED;

import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
Expand Down Expand Up @@ -438,12 +439,19 @@ public boolean isRequestSubject(Subject subject, String id)
@Override
public void load() throws BulkStorageException {
LOGGER.trace("load called.");
requestTargetDao.delete(requestTargetDao.where().state(NON_TERMINAL));
/*
* deletion of the bulk request job (parent = -1) is necessary to avoid processing
* Deletion of the bulk request job (parent = -1) is necessary to avoid processing
* a placeholder target as if it represented a real namespace entry.
*
* We can safely delete all non-terminal discovered targets as they are produced
* from recursion and will be found again.
*
* Update non-terminal initial nodes to created.
*/
requestTargetDao.delete(requestTargetDao.where().pid(PID.ROOT));
requestTargetDao.delete(requestTargetDao.where().pid(PID.DISCOVERED).state(NON_TERMINAL));
requestTargetDao.update(requestTargetDao.where().pid(PID.INITIAL).state(NON_TERMINAL),
requestTargetDao.set().state(CREATED).errorObject(null));
requestDao.update(requestDao.where().status(STARTED, CANCELLING),
requestDao.set().status(QUEUED));
}
Expand All @@ -461,10 +469,16 @@ public List<BulkRequest> next(Optional<String> sortedBy, Optional<Boolean> rever
@Override
public void reset(String id) throws BulkStorageException {
/**
* Eliminate <i>all</i> targets for request; start from scratch.
* Start from scratch:
* - delete ROOT
* - delete DISCOVERED
* - set INITIAL to CREATED
*/
LOGGER.trace("reset {}.", id);
targetStore.delete(id);
requestTargetDao.delete(requestTargetDao.where().pid(PID.ROOT).rid(id));
requestTargetDao.delete(requestTargetDao.where().pid(PID.DISCOVERED).rid(id));
requestTargetDao.update(requestTargetDao.where().pid(PID.INITIAL).rid(id),
requestTargetDao.set().state(CREATED).errorObject(null));
requestDao.update(requestDao.where().requestIds(id),
requestDao.set().status(QUEUED));
try {
Expand Down
Expand Up @@ -139,13 +139,6 @@ public JdbcBulkRequestUpdate restriction(Restriction restriction) throws BulkSto
return this;
}

/**
 * Sets the comma-delimited target string on this update, if not blank.
 * NOTE(review): removed by this commit together with the bulk_request 'target'
 * text column; initial targets are now read back from the target store instead.
 *
 * @param target comma-delimited list of target paths.
 * @return this update, for chaining.
 */
public JdbcBulkRequestUpdate target(String target) {
if (Strings.emptyToNull(target) != null) {
set("target", target);
}
return this;
}

public JdbcBulkRequestUpdate targetPrefix(String targetPrefix) {
if (Strings.emptyToNull(targetPrefix) != null) {
set("target_prefix", targetPrefix);
Expand Down
Expand Up @@ -66,10 +66,12 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.dcache.namespace.FileType;
import org.dcache.services.bulk.BulkStorageException;
import org.dcache.services.bulk.store.BulkTargetStore;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.dcache.services.bulk.util.BulkRequestTarget.PID;
import org.dcache.services.bulk.util.BulkRequestTarget.State;
import org.dcache.services.bulk.util.BulkTargetFilter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -143,21 +145,6 @@ public Map<String, Long> countsByState() {
return targetDao.countStates();
}

/**
* Note that targets are not deleted singly by this implementation, but deleted by cascade on
* request id when clear is called, or en bloc during load (non-terminated targets) or reset.
*
* <p>Targets are left in the store to keep track of those processed,
* in case of interruption and restart.
*
* <p>The mass deletion by request id without elimination of the request itself
* provided by this method is only called on reset().
*/
@Override
public void delete(String rid) throws BulkStorageException {
targetDao.delete(targetDao.where().rid(rid));
}

@Override
public boolean exists(String rid, FsPath path) {
return targetDao.count(targetDao.where().rid(rid).path(path)) > 0;
Expand All @@ -169,6 +156,17 @@ public List<BulkRequestTarget> find(BulkTargetFilter jobFilter, Integer limit)
return targetDao.get(targetDao.where().filter(jobFilter).sorter("id"), limit);
}

@Override
public List<String> getInitialTargetPaths(String requestId, boolean nonterminal) {
    /*
     * Select only the user-submitted (INITIAL) targets of the request, ordered
     * by id; when asked, narrow the selection to targets that have not yet
     * reached a terminal state.
     */
    JdbcRequestTargetCriterion criterion =
          targetDao.where().rid(requestId).pid(PID.INITIAL).sorter("id");
    if (nonterminal) {
        criterion.state(NON_TERMINAL);
    }

    List<BulkRequestTarget> selected = targetDao.get(criterion, Integer.MAX_VALUE);
    return selected.stream()
          .map(BulkRequestTarget::getPath)
          .map(Object::toString)
          .collect(Collectors.toList());
}

@Override
public List<BulkRequestTarget> nextReady(String rid, FileType type, Integer limit)
throws BulkStorageException {
Expand Down
Expand Up @@ -309,6 +309,7 @@
<property name="namespace" ref="pnfs-manager-stub"/>
<property name="submissionHandler" ref="request-handler"/>
<property name="requestManager" ref="request-manager"/>
<property name="targetStore" ref="target-store"/>
</bean>

<bean id="admin-commands" class="org.dcache.services.bulk.BulkServiceCommands">
Expand Down

0 comments on commit 4d8d361

Please sign in to comment.