Skip to content

Commit

Permalink
dcache-bulk: implement periodic archiving of old completed requests (…
Browse files Browse the repository at this point in the history
…part 1)

Motivation:

While keeping the state of requests in an RDBMS has
certain advantages, the clear disadvantage is an
ever-growing set of tables which at some point
can impede performance.

Requests do have the option of being deleted
upon failure or completion, but clients cannot
usually be relied upon to utilize them.

Moreover, given the use of postgresql, deletion
can be time-consuming if done on large amounts
of data (autovacuuming).  It would be preferable
to allow sites to control the periodicity of
deletion/cleanup.

Modification:

This patch introduces an archiver which will
remove completed requests from the main tables
if their last modified timestamp exceeds a
certain window in the past.  The archiver
does not store the entire request in the
archival area, but rather preserves only
the original targets and configuration, plus
counts of the targets that completed
successfully or failed, and the error types encountered.

The archive table is not meant to be directly
manipulated (admins will have to clean
it eventually via a direct delete query).

Two serialization methods have been
relocated.

The first of two patches provides the
storage layer changes and the archiver
itself.  The second patch will add
user-facing functionality.

Result:

Archiving of bulk requests is implemented.

Target: master
Patch: https://rb.dcache.org/r/14056
Requires-notes: yes
Requires-book: yes (with next patch)
Acked-by: Tigran
  • Loading branch information
alrossi committed Aug 14, 2023
1 parent a29d8f4 commit e007ede
Show file tree
Hide file tree
Showing 15 changed files with 1,252 additions and 67 deletions.
Expand Up @@ -68,6 +68,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import javax.security.auth.Subject;
import org.dcache.auth.Subjects;
import org.dcache.auth.attributes.Restriction;
import org.dcache.services.bulk.BulkArchivedRequestInfo;
import org.dcache.services.bulk.BulkArchivedSummaryFilter;
import org.dcache.services.bulk.BulkArchivedSummaryInfo;
import org.dcache.services.bulk.BulkPermissionDeniedException;
import org.dcache.services.bulk.BulkRequest;
import org.dcache.services.bulk.BulkRequestInfo;
Expand Down Expand Up @@ -103,8 +106,6 @@ static String uidGidKey(Subject subject) {
/**
* Does not throw exception, as this is an internal termination of the request.
* <p>
* Should not clear the request from store unless automatic clear is set.
*
* @param request which failed.
* @param exception possibly associated with the abort.
*/
Expand Down Expand Up @@ -170,6 +171,23 @@ Collection<BulkRequest> find(Optional<BulkRequestFilter> filter, Integer limit)
*/
ListMultimap<String, String> getActiveRequestsByUser() throws BulkStorageException;

/**
* @param subject of request user.
* @param uid unique id for request.
* @return the archived request info
*/
BulkArchivedRequestInfo getArchivedInfo(Subject subject, String uid)
throws BulkStorageException, BulkPermissionDeniedException;

/**
* @param subject of request user.
* @param filter for the query
* @return list of matching summary info objects
* @throws BulkStorageException
*/
List<BulkArchivedSummaryInfo> getArchivedSummaryInfo(Subject subject,
BulkArchivedSummaryFilter filter) throws BulkStorageException;

/**
* @param uid unique id for request.
* @return the key (sequence number) of the request.
Expand Down Expand Up @@ -205,7 +223,7 @@ List<BulkRequestSummary> getRequestSummaries(Set<BulkRequestStatus> status, Set<
* @param subject of request user.
* @param uid unique id for request.
* @param offset into the list of targets ordered by sequence number
* @return optional of the corresponding request status.
* @return the corresponding request status.
* @throws BulkStorageException, BulkPermissionDeniedException
*/
BulkRequestInfo getRequestInfo(Subject subject, String uid, long offset)
Expand Down
Expand Up @@ -59,17 +59,9 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
*/
package org.dcache.services.bulk.store.jdbc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand All @@ -80,7 +72,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.stream.Stream;
import org.dcache.db.JdbcCriterion;
import org.dcache.db.JdbcUpdate;
import org.dcache.services.bulk.BulkStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
Expand Down Expand Up @@ -165,24 +156,6 @@ public int delete(JdbcCriterion criterion, String tableName, String secondaryTab
return support.getJdbcTemplate().update(sql, criterion.getArgumentsAsArray());
}

/**
* @throws SQLException in order to support the jdbc template API.
*/
public Object deserializeFromBase64(Long id, String field, String base64)
throws SQLException {
if (base64 == null) {
return null;
}
byte[] array = Base64.getDecoder().decode(base64);
ByteArrayInputStream bais = new ByteArrayInputStream(array);
try (ObjectInputStream istream = new ObjectInputStream(bais)) {
return istream.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new SQLException("problem deserializing " + field + " for "
+ id, e);
}
}

public <T> List<T> get(String select, JdbcCriterion criterion, int limit, String tableName,
JdbcDaoSupport support, RowMapper<T> mapper) {
LOGGER.trace("get {}, {}, limit {}.", select, criterion, limit);
Expand Down Expand Up @@ -222,18 +195,6 @@ public <T> void insertBatch(List<T> targets, String sql,
support.getJdbcTemplate().batchUpdate(sql, targets, 100, setter);
}

public String serializeToBase64(String field, Serializable serializable)
throws BulkStorageException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream ostream = new ObjectOutputStream(baos)) {
ostream.writeObject(serializable);
} catch (IOException e) {
throw new BulkStorageException("problem serializing "
+ field, e);
}
return Base64.getEncoder().encodeToString(baos.toByteArray());
}

@Required
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
Expand Down
@@ -0,0 +1,145 @@
/*
COPYRIGHT STATUS:
Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and
software are sponsored by the U.S. Department of Energy under Contract No.
DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide
non-exclusive, royalty-free license to publish or reproduce these documents
and software for U.S. Government purposes. All documents and software
available from this server are protected under the U.S. and Foreign
Copyright Laws, and FNAL reserves all rights.
Distribution of the software available from this server is free of
charge subject to the user following the terms of the Fermitools
Software Legal Information.
Redistribution and/or modification of the software shall be accompanied
by the Fermitools Software Legal Information (including the copyright
notice).
The user is asked to feed back problems, benefits, and/or suggestions
about the software to the Fermilab Software Providers.
Neither the name of Fermilab, the URA, nor the names of the contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
DISCLAIMER OF LIABILITY (BSD):
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB,
OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Liabilities of the Government:
This software is provided by URA, independent from its Prime Contract
with the U.S. Department of Energy. URA is acting independently from
the Government and in its own private capacity and is not acting on
behalf of the U.S. Government, nor as its contractor nor its agent.
Correspondingly, it is understood and agreed that the U.S. Government
has no connection to this software and in no manner whatsoever shall
be liable for nor assume any responsibility or obligation for any claim,
cost, or damages arising out of or resulting from the use of the software
available from this server.
Export Control:
All documents and software available from this server are subject to U.S.
export control laws. Anyone downloading information from this server is
obligated to secure any necessary Government licenses before exporting
documents or software obtained from this server.
*/
package org.dcache.services.bulk.store.jdbc.request;

import static org.dcache.services.bulk.util.BulkServiceStatistics.getTimestamp;

import java.sql.Timestamp;
import java.text.ParseException;
import java.util.Set;
import org.dcache.db.JdbcCriterion;
import org.dcache.services.bulk.BulkArchivedSummaryFilter;

/**
* Implementation of criterion class for querying the request archive table.
*/
/**
 * Criterion builder for queries against the bulk request archive table.
 * <p>
 * Each fluent method contributes a clause to the underlying {@link JdbcCriterion};
 * a {@code null} argument (or empty varargs array) leaves the criterion unchanged,
 * so callers may pass through unset filter fields directly.
 */
public final class JdbcArchivedBulkRequestCriterion extends JdbcCriterion {

    /** Results are ordered by last modification time unless a sorter is set explicitly. */
    public JdbcArchivedBulkRequestCriterion() {
        sorter = "last_modified";
    }

    /** Matches any of the given request uids (OR-ed together). */
    public JdbcArchivedBulkRequestCriterion uids(String... uids) {
        addOrClause("uid = ?", (Object[]) uids);
        return this;
    }

    /** Matches any of the given request owners (OR-ed together). */
    public JdbcArchivedBulkRequestCriterion owner(String... owner) {
        addOrClause("owner = ?", (Object[]) owner);
        return this;
    }

    /** Matches any of the given activities (OR-ed together). */
    public JdbcArchivedBulkRequestCriterion activity(String... activity) {
        addOrClause("activity = ?", (Object[]) activity);
        return this;
    }

    /**
     * Restricts to requests last modified at or before the given instant.
     *
     * @param lastModified epoch milliseconds; ignored when {@code null}.
     */
    public JdbcArchivedBulkRequestCriterion modifiedBefore(Long lastModified) {
        if (lastModified == null) {
            return this;
        }
        addClause("last_modified <= ?", new Timestamp(lastModified));
        return this;
    }

    /**
     * Restricts to requests last modified at or after the given instant.
     *
     * @param lastModified epoch milliseconds; ignored when {@code null}.
     */
    public JdbcArchivedBulkRequestCriterion modifiedAfter(Long lastModified) {
        if (lastModified == null) {
            return this;
        }
        addClause("last_modified >= ?", new Timestamp(lastModified));
        return this;
    }

    /** Matches any of the given statuses (OR-ed together). */
    public JdbcArchivedBulkRequestCriterion status(String... status) {
        addOrClause("status = ?", (Object[]) status);
        return this;
    }

    /**
     * Populates this criterion from a user-supplied summary filter.
     *
     * @throws ParseException if a before/after value cannot be parsed as a timestamp.
     */
    public JdbcArchivedBulkRequestCriterion fromFilter(BulkArchivedSummaryFilter filter)
          throws ParseException {
        // NOTE(review): "getActvity" looks like a typo in the filter API; the spelling
        // must stay in sync with BulkArchivedSummaryFilter — confirm upstream.
        activity(toArray(filter.getActvity()));
        owner(toArray(filter.getOwner()));
        status(toArray(filter.getStatus()));
        modifiedBefore(getTimestamp(filter.getBefore()));
        modifiedAfter(getTimestamp(filter.getAfter()));
        return this;
    }

    /** Sets the GROUP BY / classifier column. */
    public JdbcArchivedBulkRequestCriterion classifier(String classifier) {
        this.classifier = classifier;
        return this;
    }

    /** Sets the ORDER BY column. */
    public JdbcArchivedBulkRequestCriterion sorter(String sorter) {
        this.sorter = sorter;
        return this;
    }

    /** Sets descending vs. ascending order; semantics are defined by JdbcCriterion. */
    public JdbcArchivedBulkRequestCriterion reverse(Boolean reverse) {
        this.reverse = reverse;
        return this;
    }

    /** Converts a possibly-null set to an array, preserving null for "no constraint". */
    private static String[] toArray(Set<String> set) {
        return set == null ? null : set.toArray(new String[0]);
    }
}

0 comments on commit e007ede

Please sign in to comment.