dcache-bulk: fix message thread invocation of database query for statistics

Motivation:

The bulk admin interface provides an `info` command
for monitoring the current state of the service and
tracking request progress. There are currently two issues,
however, with the collection of these request and target
statistics.

1.  The request manager calls `updateJobStatistics` after each
    sweep, which in turn calls `requestStore.countActive`.
2.  `getInfo` [`CellInfoProvider`] similarly reads the
    active-request count and calls `targetStore.countsByState`.

Both of these call sites can thus block the executing thread on a
database connection.  As the respective tables fill up, the count
and group-by count queries can in fact become very slow.

The issue reveals itself most clearly when `getInfo` is called
periodically via messaging (e.g., by the frontend or httpd
collectors); since the call is executed on the message thread, it
can block further message processing (including other admin
commands) until the db queries have completed.

Modification:

The problem has been solved by the following steps:

- Have the request manager only update the sweep time.
- Create a new table `counts_by_state` to store updated counts
  for both active requests and the states of a target.
- Have `getInfo` read its values from this table only.
- Offload the table updates onto a scheduled repeating task which
  runs the expensive queries periodically (default = 1 minute, per
  the new `bulk.limits.counts-update-interval` property); a sketch
  of the pattern follows below.

We have opted to store the values in a DB table so that the most
recent counts are immediately visible upon restart.  We have
avoided maintaining them with a trigger, for the by now well-known
reasons regarding Postgres autovacuum (per-row trigger updates on
busy tables generate dead tuples and vacuum pressure).
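
In outline, the new flow reduces to the following minimal sketch
(not the patch itself: `CountsRefresher` and `CountsDao` are
hypothetical stand-ins for the store/`JdbcBulkDaoUtils` wiring in
the diff below; the interval mirrors the new property defaults):

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CountsRefresher {

    /** Hypothetical facade over the count queries and the counts_by_state table. */
    public interface CountsDao {
        int countActiveRequests();                 // count on bulk_request
        Map<String, Long> targetCountsByState();   // group-by count on request_target
        void writeCounts(int active, Map<String, Long> byState);
        Map<String, Long> readCounts();            // SELECT * FROM counts_by_state
    }

    private final ScheduledExecutorService countUpdater =
          Executors.newSingleThreadScheduledExecutor();
    private final long updateInterval = 1;         // bulk.limits.counts-update-interval
    private final TimeUnit updateIntervalUnit = TimeUnit.MINUTES;
    private final CountsDao dao;

    public CountsRefresher(CountsDao dao) {
        this.dao = dao;
    }

    public void start() {
        countUpdater.schedule(this::updateCounts, updateInterval, updateIntervalUnit);
    }

    /** The only place the expensive queries run; never on a message thread. */
    private void updateCounts() {
        try {
            dao.writeCounts(dao.countActiveRequests(), dao.targetCountsByState());
        } catch (RuntimeException e) {
            // log and fall through; the next run may succeed
        } finally {
            // one-shot self-rescheduling: a slow query delays the next
            // refresh instead of piling up overlapping runs
            countUpdater.schedule(this::updateCounts, updateInterval, updateIntervalUnit);
        }
    }

    /** getInfo-side read: a cheap primary-key scan of a table with ~9 rows. */
    public Map<String, Long> currentCounts() {
        return dao.readCounts();
    }
}
```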

Of course, issuing:

```
\s bulk request ls -count
\s bulk target ls -count
```

with or without filters (`-status` or `-state`, etc., respectively)
will still take longer as the `bulk_request` and `request_target`
tables become large.  But at least the cell will not
have a very long ping/roundtrip time or be classified as
unavailable.

This is a substantial change, but I believe the problem
is important enough to warrant a back-port to 8.2.

Target:  master
Request: 8.2
Patch: https://rb.dcache.org/r/13766
Requires-notes: yes
Acked-by: Tigran
alrossi committed Nov 11, 2022
1 parent a806be1 commit b6bbfcc
Showing 8 changed files with 157 additions and 23 deletions.
@@ -207,7 +207,7 @@ private void doRun() throws InterruptedException {
throw new InterruptedException();
}

updateJobStatistics(start);
statistics.sweepFinished(System.currentTimeMillis() - start);

LOGGER.trace("doRun() completed");
}
@@ -256,15 +256,6 @@ private void removeTerminatedRequests() {
}
}

private void updateJobStatistics(long start) {
try {
statistics.activeRequests(requestStore.countActive());
} catch (BulkStorageException e) {
LOGGER.error("problem finding number of active requests: {}.", e.toString());
}
statistics.sweepFinished(System.currentTimeMillis() - start);
}

private ListMultimap<String, String> userRequests() {
ListMultimap<String, String> requestsByUser;
try {
@@ -81,6 +81,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.db.JdbcCriterion;
import org.dcache.db.JdbcUpdate;
import org.dcache.services.bulk.BulkStorageException;
import org.dcache.services.bulk.util.BulkRequestTarget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
@@ -99,6 +100,12 @@ public final class JdbcBulkDaoUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcBulkDaoUtils.class);

private static final String SELECT_COUNTS_BY_STATE =
"SELECT * FROM counts_by_state";

private static final String UPDATE_STATE_COUNT =
"UPDATE counts_by_state SET count = ? WHERE state = ?";

public static <T> Set<T> toSetOrNull(T[] array) {
return array == null ? null : Arrays.stream(array).collect(Collectors.toSet());
}
@@ -117,6 +124,16 @@ public int count(JdbcCriterion criterion, String tableName, JdbcDaoSupport suppo
.queryForObject(sql, criterion.getArgumentsAsArray(), Integer.class);
}

public Map<String, Long> countsByState(JdbcDaoSupport support) {
LOGGER.trace("countStates.");
SqlRowSet rowSet = support.getJdbcTemplate().queryForRowSet(SELECT_COUNTS_BY_STATE);
Map<String, Long> counts = new HashMap<>();
while (rowSet.next()) {
counts.put(rowSet.getString(1), rowSet.getLong(2));
}
return counts;
}

public Map<String, Long> countGrouped(JdbcCriterion criterion, String tableName,
JdbcDaoSupport support) {
LOGGER.trace("countGrouped {}.", criterion);
@@ -194,7 +211,8 @@ public Optional<KeyHolder> insert(JdbcUpdate update, String tableName, JdbcDaoSu
try {
support.getJdbcTemplate().update(
con -> {
PreparedStatement ps = con.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
PreparedStatement ps = con.prepareStatement(sql,
Statement.RETURN_GENERATED_KEYS);
Collection<Object> arguments = update.getArguments();

int i = 1;
@@ -240,4 +258,15 @@ public int update(JdbcCriterion criterion, JdbcUpdate update, String tableName,
return support.getJdbcTemplate().update(sql,
concatArguments(update.getArguments(), criterion.getArguments()));
}

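/**
 * Persists the latest tallies to counts_by_state: the ACTIVE row receives the
 * active-request count, and each target-state row its grouped count
 * (defaulting to 0 when a state has no targets).
 */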
public void updateCounts(int countActive, Map<String, Long> countsByState,
JdbcDaoSupport support) {
LOGGER.trace("updateCounts {} : {}.", countActive, countsByState);
support.getJdbcTemplate().update(UPDATE_STATE_COUNT, countActive, "ACTIVE");
Arrays.stream(BulkRequestTarget.State.values()).forEach(state -> {
Long count = countsByState.get(state.name());
support.getJdbcTemplate()
.update(UPDATE_STATE_COUNT, count == null ? 0L : count, state.name());
});
}
}
@@ -83,6 +83,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
@@ -99,6 +100,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.services.bulk.BulkRequestTargetInfo;
import org.dcache.services.bulk.BulkStorageException;
import org.dcache.services.bulk.store.BulkRequestStore;
import org.dcache.services.bulk.store.jdbc.JdbcBulkDaoUtils;
import org.dcache.services.bulk.store.jdbc.rtarget.JdbcBulkTargetStore;
import org.dcache.services.bulk.store.jdbc.rtarget.JdbcRequestTargetDao;
import org.dcache.services.bulk.util.BulkRequestFilter;
@@ -137,6 +139,7 @@ public Optional<BulkRequest> load(String id) throws Exception {
private final Map<String, String> userOfActiveRequest = new HashMap<>();

private LoadingCache<String, Optional<BulkRequest>> requestCache;
private JdbcBulkDaoUtils utils;
private JdbcBulkRequestDao requestDao;
private JdbcBulkRequestPermissionsDao requestPermissionsDao;
private JdbcBulkTargetStore targetStore;
@@ -145,11 +148,19 @@ public Optional<BulkRequest> load(String id) throws Exception {
private TimeUnit expiryUnit;
private long capacity;

/**
* For updating counts to the counts table.
*/
private ScheduledExecutorService countUpdater;
private long updateInterval;
private TimeUnit updateIntervalUnit;

public void initialize() {
requestCache = CacheBuilder.newBuilder()
.expireAfterAccess(expiry, expiryUnit)
.maximumSize(capacity)
.build(new RequestLoader());
countUpdater.schedule(this::updateCounts, updateInterval, updateIntervalUnit);
}

@Override
@@ -499,11 +510,31 @@ public void setRequestTargetDao(JdbcRequestTargetDao requestTargetDao) {
this.requestTargetDao = requestTargetDao;
}

@Required
public void setCountUpdater(ScheduledExecutorService countUpdater) {
this.countUpdater = countUpdater;
}

@Required
public void setUpdateInterval(long updateInterval) {
this.updateInterval = updateInterval;
}

@Required
public void setUpdateIntervalUnit(TimeUnit updateIntervalUnit) {
this.updateIntervalUnit = updateIntervalUnit;
}

@Required
public void setRequestPermissionsDao(JdbcBulkRequestPermissionsDao requestPermissionsDao) {
this.requestPermissionsDao = requestPermissionsDao;
}

@Required
public void setUtils(JdbcBulkDaoUtils utils) {
this.utils = utils;
}

@Override
public void store(Subject subject, Restriction restriction, BulkRequest request)
throws BulkStorageException {
@@ -736,6 +767,16 @@ private BulkRequestTargetInfo toRequestTargetInfo(BulkRequestTarget target) {
return info;
}

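/**
 * Refreshes the counts_by_state table via the expensive count queries, then
 * reschedules itself; the one-shot schedule (rather than a fixed rate) ensures
 * a slow query delays the next refresh instead of overlapping with it.
 */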
private void updateCounts() {
try {
utils.updateCounts(countActive(), targetStore.countsByState(), requestDao);
} catch (BulkStorageException e) {
LOGGER.error("Problem updating counts by state: {}.", e.toString());
} finally {
countUpdater.schedule(this::updateCounts, updateInterval, updateIntervalUnit);
}
}

private BulkRequest valid(String id) throws BulkStorageException {
BulkRequest stored = get(id);
if (stored == null) {
@@ -68,7 +68,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.dcache.services.bulk.store.BulkTargetStore;
import org.dcache.services.bulk.store.jdbc.JdbcBulkDaoUtils;
import org.dcache.services.bulk.store.jdbc.request.JdbcBulkRequestDao;
import org.springframework.beans.factory.annotation.Required;

/**
@@ -90,15 +91,11 @@ public final class BulkServiceStatistics implements CellInfoProvider {
private final Map<String, AtomicLong> requestTypes = new TreeMap<>();
private final Map<String, AtomicLong> userRequests = new TreeMap<>();

private BulkTargetStore targetStore;
private JdbcBulkDaoUtils utils;
private JdbcBulkRequestDao requestDao;

private long lastSweep = started.getTime();
private long lastSweepDuration = 0;
private int activeRequests = 0;

public void activeRequests(int count) {
activeRequests = count;
}

public void addUserRequest(String user) {
AtomicLong counter = userRequests.get(user);
@@ -126,10 +123,13 @@ public void getInfo(PrintWriter pw) {
TimeUnit.MILLISECONDS.toSeconds(lastSweepDuration)));
pw.println();

Map<String, Long> counts = targetStore.countsByState();
pw.println("------------------ TARGETS BY STATE ------------------");
Map<String, Long> counts = utils.countsByState(requestDao);
Long active = counts.remove("ACTIVE");

counts.entrySet()
.forEach(e -> pw.println(String.format(STATS_FORMAT, e.getKey(), e.getValue())));

long aborted = jobsAborted.get();
if (aborted > 0) {
pw.println(String.format(STATS_FORMAT, "ABORTED", aborted));
@@ -142,7 +142,7 @@
pw.println(String.format(STATS_FORMAT, "Requests received", received));
pw.println(String.format(STATS_FORMAT, "Requests completed", requestsCompleted.get()));
pw.println(String.format(STATS_FORMAT, "Requests cancelled", requestsCancelled.get()));
pw.println(String.format(STATS_FORMAT, "Active requests", activeRequests));
pw.println(String.format(STATS_FORMAT, "Active requests", active == null ? 0 : active));
pw.println();

pw.println("--------------- REQUESTS (since start) ---------------");
@@ -187,7 +187,12 @@ public void sweepFinished(long duration) {
}

@Required
public void setTargetStore(BulkTargetStore targetStore) {
this.targetStore = targetStore;
public void setRequestDao(JdbcBulkRequestDao requestDao) {
this.requestDao = requestDao;
}

@Required
public void setDaoUtils(JdbcBulkDaoUtils utils) {
this.utils = utils;
}
}
@@ -176,6 +176,14 @@
<property name="expiry" value="${bulk.limits.request-cache-expiration}"/>
<property name="expiryUnit" value="${bulk.limits.request-cache-expiration.unit}"/>
<property name="capacity" value="${bulk.limits.container-processing-threads}"/>
<property name="countUpdater">
<bean class="java.util.concurrent.ScheduledThreadPoolExecutor">
<constructor-arg value="1"/>
</bean>
</property>
<property name="updateInterval" value="${bulk.limits.counts-update-interval}"/>
<property name="updateIntervalUnit" value="${bulk.limits.counts-update-interval.unit}"/>
<property name="utils" ref="bulk-jdbc-dao-utils"/>
</bean>

<bean id="target-store" class="org.dcache.services.bulk.store.jdbc.rtarget.JdbcBulkTargetStore">
@@ -254,7 +262,8 @@

<bean id="statistics" class="org.dcache.services.bulk.util.BulkServiceStatistics">
<description>Tracks request and target states (counts), sweeper state, etc.</description>
<property name="targetStore" ref="target-store"/>
<property name="requestDao" ref="bulk-request-dao"/>
<property name="daoUtils" ref="bulk-jdbc-dao-utils"/>
</bean>

<bean id="request-handler" class="org.dcache.services.bulk.handler.BulkRequestHandler">
@@ -322,4 +322,55 @@
CREATE INDEX CONCURRENTLY idx_request_prestore on bulk_request(prestore);
</sql>
</changeSet>

<changeSet author="arossi" id="3.0">
<preConditions onFail="MARK_RAN">
<not>
<tableExists tableName="counts_by_state"/>
</not>
</preConditions>

<createTable tableName="counts_by_state">
<column name="state" type="varchar(16)">
<constraints nullable="false" primaryKey="true" primaryKeyName="idx_counts_state_id"/>
</column>
<column name="count" type="bigint">
<constraints nullable="false"/>
</column>
</createTable>

<insert tableName="counts_by_state">
<column name="state" value="ACTIVE"/>
<column name="count" value="0"/>
</insert>
<insert tableName="counts_by_state">
<column name="state" value="CREATED"/>
<column name="count" value="0"/>
</insert>
<insert tableName="counts_by_state">
<column name="state" value="READY"/>
<column name="count" value="0"/>
</insert>
<insert tableName="counts_by_state">
<column name="state" value="RUNNING"/>
<column name="count" value="0"/>
</insert>
<insert tableName="counts_by_state">
<column name="state" value="CANCELLED"/>
<column name="count" value="0"/>
</insert>
<insert tableName="counts_by_state">
<column name="state" value="COMPLETED"/>
<column name="count" value="0"/>
</insert>
<insert tableName="counts_by_state">
<column name="state" value="FAILED"/>
<column name="count" value="0"/>
</insert>
<insert tableName="counts_by_state">
<column name="state" value="SKIPPED"/>
<column name="count" value="0"/>
</insert>
<rollback/>
</changeSet>
</databaseChangeLog>
6 changes: 6 additions & 0 deletions skel/share/defaults/bulk.properties
@@ -99,6 +99,12 @@ bulk.limits.max.targets-per-recursive-request=10
bulk.limits.sweep-interval=5
(one-of?MILLISECONDS|SECONDS|MINUTES)bulk.limits.sweep-interval.unit=SECONDS

# ---- Interval between sweeps to update the request and target counts.
# Calls the underlying stores (database tables).
#
bulk.limits.counts-update-interval=1
(one-of?MILLISECONDS|SECONDS|MINUTES)bulk.limits.counts-update-interval.unit=MINUTES

# ---- Endpoint for contacting pnfs manager.
#
bulk.service.pnfsmanager=${dcache.service.pnfsmanager}
2 changes: 2 additions & 0 deletions skel/share/services/bulk.batch
@@ -19,6 +19,8 @@ check -strong bulk.limits.max.targets-per-shallow-request
check -strong bulk.limits.max.targets-per-recursive-request
check -strong bulk.limits.sweep-interval
check -strong bulk.limits.sweep-interval.unit
check -strong bulk.limits.counts-update-interval
check -strong bulk.limits.counts-update-interval.unit
check -strong bulk.service.pnfsmanager
check -strong bulk.service.pnfsmanager.timeout
check -strong bulk.service.pnfsmanager.timeout.unit
