Skip to content

Commit

Permalink
dcache-bulk: split arguments out into a separate table
Browse files Browse the repository at this point in the history
Motivation:

The `bulk_request` table stores the request arguments
as a text string.  Originally, these arguments were
conceived of as globally applicable to the entire
request, but with the advent of the `TAPE STAGE`
json schema, they had to be adapted to accommodate
arguments mapped on a per-target basis.  This
means that the text field for a single request
could be quite sizeable if the number of targets
is large.

That situation has two drawbacks.
The first is that normal listing of requests
is slowed down as the request table grows.
The second is that inspection via the (postgres)
interpreter is difficult in that lots of whitespace
is dumped to stdout for each request with
a large text field.

One remedy for this might be to normalize
`arguments` into a table `(rid, tid, name, value)`
with each argument for the target being
mapped to its numerical id via a join using
the given path string as identifier.  I
have tested this and it works, but severely
handicaps initial throughput because of the
processing of the request on insert.

A compromise which offers improvement
over the current situation is simply to
break out the argument text into a separate
table (rid, arguments) with no further
pre-processing.  While this is not Third Normal
Form, it does allow for the bulk_request table
to remain more stable in size.  The arguments
are only necessary for internal processing
and are never relevant for external querying
(for lists, request info, etc.), so the
two situations can be separated for efficiency.

Modification:

Add the `request_arguments (rid, arguments)`
table; insert the text string there instead
of into `bulk_request`, and drop its arguments
column.  For queries originating from the
outside (REST or admin), don't join on
the arguments; for loading/caching and
finding queued requests, we use a LEFT OUTER JOIN
so that a `null` value for the arguments
(should there be none) can be returned.

The liquibase changeset takes the legacy
text and merely moves it over to the new table
before dropping the original column.

Result:

Improved query performance; the RDBMS (postgres)
interpreter inspection of the `bulk_request`
table is also friendlier.

Target: master
Requires-notes: no (unreleased)
Patch: https://rb.dcache.org/r/13889/
Acked-by: Tigran
  • Loading branch information
alrossi committed Feb 14, 2023
1 parent 98dc921 commit 45f2ae2
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 34 deletions.
Expand Up @@ -208,8 +208,7 @@ public <T> List<T> get(String sql, List args, int limit, JdbcDaoSupport support,
LOGGER.trace("get {}, {}, limit {}.", sql, args, limit);
JdbcTemplate template = support.getJdbcTemplate();
template.setFetchSize(fetchSize);
args.add(limit);
return template.query(sql, args.toArray(Object[]::new), mapper);
return template.query(sql + " LIMIT " + limit, args.toArray(Object[]::new), mapper);
}

public Optional<KeyHolder> insert(JdbcUpdate update, String tableName, JdbcDaoSupport support) {
Expand Down Expand Up @@ -263,7 +262,7 @@ public int update(JdbcCriterion criterion, JdbcUpdate update, String tableName,
concatArguments(update.getArguments(), criterion.getArguments()));
}

private Optional<KeyHolder> insert(String sql, Collection<Object> arguments,
public Optional<KeyHolder> insert(String sql, Collection<Object> arguments,
JdbcDaoSupport support) {
KeyHolder keyHolder = new GeneratedKeyHolder();

Expand Down
Expand Up @@ -68,6 +68,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.dcache.services.bulk.BulkRequest;
import org.dcache.services.bulk.BulkRequest.Depth;
import org.dcache.services.bulk.BulkRequestInfo;
Expand All @@ -92,11 +93,23 @@ public final class JdbcBulkRequestDao extends JdbcDaoSupport {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBulkRequestDao.class);

private static final String ARGUMENTS_TABLE_NAME = "request_arguments";

private static final String INSERT_ARGS =
"INSERT INTO " + ARGUMENTS_TABLE_NAME + " VALUES (?, ?)";

private static final String SELECT = "SELECT bulk_request.*";

private static final String JOINED_TABLE_NAMES =
private static final String FULL_SELECT =
"SELECT " + TABLE_NAME + ".*, " + ARGUMENTS_TABLE_NAME + ".arguments as arguments";

private static final String JOINED_WITH_TARGET_TABLE_NAMES =
TABLE_NAME + ", " + JdbcRequestTargetDao.TABLE_NAME;

private static final String JOINED_WITH_ARGUMENTS_TABLE_NAMES =
TABLE_NAME + " LEFT OUTER JOIN " + ARGUMENTS_TABLE_NAME
+ " ON bulk_request.id = request_arguments.rid";

/**
* Update queries which permit the avoidance of fetch-and-set semantics requiring in-memory
* synchronization.
Expand Down Expand Up @@ -126,17 +139,35 @@ public int delete(JdbcBulkRequestCriterion criterion) {
return utils.delete(criterion, TABLE_NAME, this);
}

public List<BulkRequest> get(JdbcBulkRequestCriterion criterion, int limit) {
public List<BulkRequest> get(JdbcBulkRequestCriterion criterion, int limit,
boolean includeArgs) {
if (criterion.isJoined()) {
return utils.get(SELECT, criterion, limit, JOINED_TABLE_NAMES, this, this::toRequest);
return utils.get(SELECT, criterion, limit, JOINED_WITH_TARGET_TABLE_NAMES, this,
this::toRequest);
}

if (includeArgs) {
return utils.get(FULL_SELECT, criterion, limit, JOINED_WITH_ARGUMENTS_TABLE_NAMES, this,
this::toFullRequest);
} else {
return utils.get(criterion, limit, TABLE_NAME, this, this::toRequest);
}
return utils.get(criterion, limit, TABLE_NAME, this, this::toRequest);
}

/**
 * Inserts a new row into the bulk_request table from the given update.
 *
 * @param update the column/value bindings for the new request row.
 * @return holder for the generated key, if the insert produced one.
 */
public Optional<KeyHolder> insert(JdbcBulkRequestUpdate update) {
    return utils.insert(update, TABLE_NAME, this);
}

/**
 * Persists the request's argument map as a single serialized text row in the
 * request_arguments table.  The "key:value,key:value" encoding is kept so that
 * toFullRequest can continue to parse it by wrapping it in braces.  A request
 * without arguments gets no row; the read side uses a LEFT OUTER JOIN and
 * tolerates the absence.
 *
 * @param request the stored request whose arguments (if any) should be persisted.
 */
public void insertArguments(BulkRequest request) {
    Map<String, String> argumentMap = request.getArguments();
    if (argumentMap == null || argumentMap.isEmpty()) {
        return;
    }
    StringBuilder serialized = new StringBuilder();
    for (Map.Entry<String, String> entry : argumentMap.entrySet()) {
        if (serialized.length() > 0) {
            serialized.append(',');
        }
        serialized.append(entry.getKey()).append(':').append(entry.getValue());
    }
    utils.insert(INSERT_ARGS, List.of(request.getId(), serialized.toString()), this);
}

/**
 * @return a fresh update builder bound to this DAO's utils instance.
 */
public JdbcBulkRequestUpdate set() {
    return new JdbcBulkRequestUpdate(utils);
}
Expand All @@ -148,7 +179,7 @@ public void setUtils(JdbcBulkDaoUtils utils) {

/**
* Based on the ResultSet returned by the query, construct a BulkRequest object for a given
* request.
* request. Does not include arguments.
*
* @param rs from the query.
* @param row unused, but needs to be there to satisfy the template function signature.
Expand All @@ -168,16 +199,6 @@ public BulkRequest toRequest(ResultSet rs, int row) throws SQLException {
request.setClearOnFailure(rs.getBoolean("clear_on_failure"));
request.setCancelOnFailure(rs.getBoolean("cancel_on_failure"));
request.setPrestore(rs.getBoolean("prestore"));
String args = rs.getString("arguments");
if (Strings.emptyToNull(args) != null) {
JSONObject argObj = new JSONObject("{" + args + "}");
Map<String, String> arguments = new HashMap<>();
for (Iterator<String> keys = argObj.keys(); keys.hasNext();) {
String key = keys.next();
arguments.put(key, String.valueOf(argObj.get(key)));
}
request.setArguments(arguments);
}
BulkRequestStatusInfo statusInfo = new BulkRequestStatusInfo();
statusInfo.setUser(rs.getString("owner"));
statusInfo.setCreatedAt(rs.getTimestamp("arrived_at").getTime());
Expand All @@ -196,6 +217,29 @@ public BulkRequest toRequest(ResultSet rs, int row) throws SQLException {
return request;
}

/**
 * Based on the ResultSet returned by the query, construct a BulkRequest object for a given
 * request.  Unlike toRequest, this also attaches the deserialized argument map taken from
 * the joined request_arguments column (which may be null).
 *
 * @param rs from the query.
 * @param row unused, but needs to be there to satisfy the template function signature.
 * @return request wrapper object, with arguments set when present.
 * @throws SQLException if access to the ResultSet fails or there is a deserialization error.
 */
public BulkRequest toFullRequest(ResultSet rs, int row) throws SQLException {
    BulkRequest request = toRequest(rs, row);
    String serialized = Strings.emptyToNull(rs.getString("arguments"));
    if (serialized == null) {
        return request;
    }
    // The stored text is "key:value,key:value"; wrapping it in braces lets the
    // (lenient) org.json parser treat it as a JSON object.
    JSONObject parsed = new JSONObject("{" + serialized + "}");
    Map<String, String> argumentMap = new HashMap<>();
    Iterator<String> keyIterator = parsed.keys();
    while (keyIterator.hasNext()) {
        String key = keyIterator.next();
        argumentMap.put(key, String.valueOf(parsed.get(key)));
    }
    request.setArguments(argumentMap);
    return request;
}
/**
* Based on the ResultSet returned by the query, construct a BulkRequestInfo for a given
* request.
Expand Down Expand Up @@ -245,7 +289,7 @@ public int updateTo(BulkRequestStatus status, String uuid) {

public JdbcBulkRequestUpdate updateFrom(BulkRequest request, String user)
throws BulkStorageException {
return set().activity(request.getActivity()).arguments(request.getArguments())
return set().activity(request.getActivity())
.cancelOnFailure(request.isCancelOnFailure()).uid(request.getUid())
.clearOnSuccess(request.isClearOnSuccess()).clearOnFailure(request.isClearOnFailure())
.prestore(request.isPrestore())
Expand Down
Expand Up @@ -133,7 +133,7 @@ class RequestLoader extends CacheLoader<String, Optional<BulkRequest>> {

@Override
public Optional<BulkRequest> load(String uid) throws Exception {
List<BulkRequest> list = requestDao.get(requestDao.where().uids(uid), 1);
List<BulkRequest> list = requestDao.get(requestDao.where().uids(uid), 1, true);
if (list.isEmpty()) {
return Optional.empty();
}
Expand Down Expand Up @@ -287,7 +287,7 @@ public Collection<BulkRequest> find(Optional<BulkRequestFilter> requestFilter, I
limit = limit == null ? Integer.MAX_VALUE : limit;
BulkRequestFilter rfilter = requestFilter.orElse(null);
return requestDao.get(
requestDao.where().filter(rfilter).sorter("id"), limit).stream()
requestDao.where().filter(rfilter).sorter("id"), limit, false).stream()
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -339,7 +339,7 @@ public List<BulkRequestSummary> getRequestSummaries(Set<BulkRequestStatus> statu

List<BulkRequest> requests = requestDao.get(
requestDao.where().sorter("bulk_request.id").id(id).pnfsId(pnfsId).status(status)
.user(users), FETCH_SIZE);
.user(users), FETCH_SIZE, false);

List<BulkRequestSummary> summaries = new ArrayList<>();

Expand Down Expand Up @@ -466,7 +466,7 @@ public List<BulkRequest> next(Optional<String> sortedBy, Optional<Boolean> rever
LOGGER.trace("next {}.", limit);
return requestDao.get(
requestDao.where().status(QUEUED).sorter(sortedBy.orElse("arrived_at"))
.reverse(reverse.orElse(false)), (int) limit).stream()
.reverse(reverse.orElse(false)), (int) limit, true).stream()
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -566,6 +566,8 @@ public void store(Subject subject, Restriction restriction, BulkRequest request)
requestPermissionsDao.set().permId(request.getId()).subject(subject)
.restriction(restriction));

requestDao.insertArguments(request);

requestTargetDao.insertInitialTargets(request);
} catch (BulkStorageException e) {
throw new BulkStorageException("store failed for " + request.getUid(), e);
Expand Down
Expand Up @@ -61,8 +61,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING

import com.google.common.base.Strings;
import java.sql.Timestamp;
import java.util.Map;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.dcache.auth.attributes.Restriction;
import org.dcache.db.JdbcUpdate;
Expand All @@ -72,7 +70,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.store.jdbc.JdbcBulkDaoUtils;

/**
* Implementation of the update class for the bulk request table.
* Implementation of the update class for the bulk request table.
*/
public final class JdbcBulkRequestUpdate extends JdbcUpdate {

Expand Down Expand Up @@ -132,6 +130,7 @@ public JdbcBulkRequestUpdate user(String user) {
}
return this;
}

public JdbcBulkRequestUpdate subject(Subject subject) throws BulkStorageException {
if (subject != null) {
set("subject", utils.serializeToBase64("subject", subject));
Expand Down Expand Up @@ -185,14 +184,6 @@ public JdbcBulkRequestUpdate delayClear(int delayClear) {
return this;
}

public JdbcBulkRequestUpdate arguments(Map<String, String> arguments) {
if (arguments != null && !arguments.isEmpty()) {
set("arguments", arguments.entrySet().stream().map(e -> e.getKey() + ":" + e.getValue())
.collect(Collectors.joining(",")));
}
return this;
}

public JdbcBulkRequestUpdate depth(Depth depth) {
if (depth != null) {
set("expand_directories", depth.name());
Expand Down
Expand Up @@ -459,4 +459,64 @@
<column name="id"/>
</createIndex>
</changeSet>

<!-- Maintaining a separate table for the text arguments presents the
advantage that, in the case of very large maps (as, for instance, with
STAGE requests), queries on the bulk_request table that do not need
to return the arguments run faster. The requests can also be more easily
inspected in the db interpreter. -->

<changeSet author="arossi" id="6.0">
    <preConditions onFail="MARK_RAN">
        <not>
            <tableExists tableName="request_arguments"/>
        </not>
    </preConditions>

    <createTable tableName="request_arguments">
        <column name="rid" type="bigint">
            <constraints nullable="false"/>
        </column>
        <column name="arguments" type="text">
            <constraints nullable="true"/>
        </column>
    </createTable>

    <createIndex tableName="request_arguments"
          indexName="idx_request_arguments_rid">
        <column name="rid"/>
    </createIndex>

    <!-- CASCADE: argument rows are removed/updated automatically when the
         parent bulk_request row is deleted or its id changes, so the DAO never
         needs to clean this table up explicitly. -->
    <addForeignKeyConstraint baseColumnNames="rid" baseTableName="request_arguments"
          constraintName="request_arguments_rid_fkey"
          deferrable="false"
          initiallyDeferred="false"
          onDelete="CASCADE"
          onUpdate="CASCADE"
          referencedColumnNames="id"
          referencedTableName="bulk_request"/>
    <!-- NOTE(review): the explicit empty rollback makes rolling back this
         changeset a no-op (the table is left in place) — confirm intended. -->
    <rollback/>
</changeSet>

<!-- See above @5.2.6p -->
<!-- Copies the legacy argument text into the new table.  The column list is
     named explicitly so the migration does not depend on the physical column
     order of request_arguments; rows whose arguments are NULL are skipped,
     matching the service, which only inserts a row when the request actually
     has arguments (the read side uses a LEFT OUTER JOIN and tolerates the
     missing row). -->
<changeSet author="arossi" id="6.1p" runInTransaction="false" dbms="postgresql">
    <preConditions onFail="MARK_RAN">
        <and>
            <columnExists columnName="arguments" tableName="bulk_request"/>
        </and>
    </preConditions>
    <sql splitStatements="false">
        INSERT INTO request_arguments (rid, arguments)
        SELECT id, arguments FROM bulk_request WHERE arguments IS NOT NULL;
    </sql>
</changeSet>

<!-- Drops the legacy column; runs after the copy changeset above, so any
     existing argument text has already been moved to request_arguments. -->
<changeSet author="arossi" id="6.2">
    <preConditions onFail="MARK_RAN">
        <and>
            <columnExists columnName="arguments" tableName="bulk_request"/>
        </and>
    </preConditions>
    <dropColumn tableName="bulk_request" columnName="arguments"/>
</changeSet>

</databaseChangeLog>

0 comments on commit 45f2ae2

Please sign in to comment.