Skip to content

Commit

Permalink
dcache-bulk: implement periodic archiving of old completed requests (…
Browse files Browse the repository at this point in the history
…part 2)

Motivation:

See https://rb.dcache.org/r/14056/
master@e007ede840012ab321022f0a57ff4b58a627ec61

Modification:

Adds message layer, admin commands `REST` API.

Result:

Archiving fully implemented.

Target: master
Depends-on: #14056
Patch:  https://rb.dcache.org/r/14057
Requires-notes: yes
Requires-book:  yes (included)
Acked-by: Tigran
  • Loading branch information
alrossi committed Aug 16, 2023
1 parent e36714d commit ef26f40
Show file tree
Hide file tree
Showing 9 changed files with 735 additions and 48 deletions.
179 changes: 179 additions & 0 deletions docs/TheBook/src/main/markdown/config-bulk.md
Expand Up @@ -188,3 +188,182 @@ is rescanned in that case, and only the stored targets which have
successfully completed or failed are skipped; hence, the entire
directory list is reconstituted on restart for any requests that
had not been completed before shutdown of the service.

## Archiving of completed requests

As of 9.2, the archiving of completed or cancelled requests is available.

The archiver is set by default to run every 6 hours, checking requests whose last
modified timestamp is more than 14 days in the past and which are in a terminal
state. For each of these requests, a summary info object is constructed which
records the original configuration and initial targets, with counts of the
number of successful and failed targets and types of exceptions encountered.
This summary is stored in a special table, and the original request is deleted
from the main tables.

The archiver is meant to provide a configurable way of relieving some of the
pressure on database querying with respect to active requests by allowing for
such periodic deletion. The relevant parameters can be initially controlled by
the two properties:

```
# ---- Distance in the past from current time serving as threshold for archiving
# older completed or cancelled requests. The archiver will archive and delete
# all requests older than this window.
#
bulk.limits.archiver-window=14
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.limits.archiver-window.unit=DAYS
# ---- How often the archiver should run.
#
bulk.limits.archiver-period=6
(one-of?MILLISECONDS|SECONDS|MINUTES|HOURS|DAYS)bulk.limits.archiver-period.unit=HOURS
```

but can also be changed dynamically in memory using the admin command:

```
[fndcatemp2] (bulk@bulkDomain) admin > \h archiver reset
NAME
archiver reset -- Change the archiver settings.
SYNOPSIS
archiver reset [-period=long]
[-periodUnit=milliseconds|seconds|minutes|hours|days]
[-window=long]
[-windowUnit=milliseconds|seconds|minutes|hours|days]
DESCRIPTION
Interrupts the archiver and resets the period and/or window.
OPTIONS
-period=long
how often to run the archiver.
-periodUnit=milliseconds|seconds|minutes|hours|days
unit for how often to run the archiver.
-window=long
how far into the past from now the cutoff should be;
requests whose last modification time is before this point
and have completed will be deleted.
-windowUnit=milliseconds|seconds|minutes|hours|days
unit for the cutoff window.
```

The archiver can also be forced to run using:

```
[fndcatemp2] (bulk@bulkDomain) admin > \h archiver run
NAME
archiver run -- Run the archiver now.
SYNOPSIS
archiver run
DESCRIPTION
Interrupts the archiver, schedules it to run now, and then
periodically, as before.
```

**Note that clearing the request either through the `REST` API or the admin command
will now similarly archive the request before deleting it from the active tables.**

The admin commands available for listing and retrieving archived requests are:

```
[fndcatemp2] (bulk@bulkDomain) admin > \h request archived ls
NAME
request archived ls -- List archived requests
SYNOPSIS
request archived ls [-activity=string[,string]...]...
[-after=yyyy/mm/dd-hh:mm:ss] [-before=yyyy/mm/dd-hh:mm:ss]
[-limit=integer] [-owner=string[,string]...]...
[-status=completed|cancelled[,completed|cancelled]...]...
DESCRIPTION
Display the uid, owner, last modified, activity and status for
the archived request.
OPTIONS
-activity=string[,string]...
The request activity.
-after=yyyy/mm/dd-hh:mm:ss
Select requests with last modified date-time after
date-time.
-before=yyyy/mm/dd-hh:mm:ss
Select requests with last modified date-time before
date-time.
-limit=integer
Return maximum of this many entries. Defaults to 5000.
-owner=string[,string]...
The owner of the request.
-status=completed|cancelled[,completed|cancelled]...
Status of the request.
```

whose output looks like this:

```
[fndcatemp2] (bulk@bulkDomain) admin > request archived ls
346b2a47-264e-47da-91ea-1dd4a9bdf6c3 | 0:0 | Sat Aug 05 14:28:43 CDT 2023 | LOG_TARGET | COMPLETED
8fd1d7cf-2153-4572-8629-f761f7b6a382 | 0:0 | Sat Aug 05 14:30:02 CDT 2023 | LOG_TARGET | COMPLETED
f5d885aa-e447-4d33-974b-5087e5abcf01 | 0:0 | Sat Aug 05 14:33:53 CDT 2023 | LOG_TARGET | COMPLETED
...
To see the full information for a given archived request, use:
```

```
[fndcatemp2] (bulk@bulkDomain) admin > \h request archived info
NAME
request archived info -- Get archived request info
SYNOPSIS
request archived info rid
DESCRIPTION
Display the JSON info for a single archived request.
ARGUMENTS
rid
The id of the request.
```

which returns a JSON output:

```
[fndcatemp2] (bulk@bulkDomain) admin > request archived info 95b93210-e0d1-4cdd-9877-5250bbad2415
{
"initialTargets" : [ "/pnfs/fs/usr/fermilab/users/arossi/any/205/data-2023073107031690805037697499944" ],
"errors" : {
"diskCacheV111.util.TimeoutCacheException" : 1
},
"uid" : "95b93210-e0d1-4cdd-9877-5250bbad2415",
"owner" : "0:0",
"activity" : "UPDATE_QOS",
"depth" : "NONE",
"arrivedAt" : 1691596463050,
"startedAt" : 1691596463271,
"lastModified" : 1691596523300,
"status" : "COMPLETED",
"targetPrefix" : null,
"succeeded" : 0,
"cancelled" : 0,
"failed" : 1
}
```

The corresponding `REST` API resources for these two commands are:

```
/bulk-requests/archived
List the status information for an individual bulk request matching the query parameters (if any).
/bulk-requests/archived/{id}
Get the information for a bulk request which has been archived.
```

As usual, listing is world-readable but `info` requires either ownership of the original request
or admin privileges.
Expand Up @@ -206,17 +206,17 @@ public Reply messageArrived(BulkRequestStatusMessage message) {

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
Subject subject = message.getSubject();
String uuid = message.getRequestUuid();
long offset = message.getOffset();
try {
Subject subject = message.getSubject();
String uuid = message.getRequestUuid();
/*
* First check to see if the request corresponds to a stored one.
*/
requestStore.getKey(uuid);
checkRestrictions(message.getRestriction(), uuid);
matchActivity(message.getActivity(), uuid);
BulkRequestInfo status = requestStore.getRequestInfo(subject, uuid,
message.getOffset());
BulkRequestInfo status = requestStore.getRequestInfo(subject, uuid, offset);
message.setInfo(status);
reply.reply(message);
} catch (BulkServiceException e) {
Expand Down Expand Up @@ -296,6 +296,53 @@ public Reply messageArrived(BulkRequestClearMessage message) {
return reply;
}

public Reply messageArrived(BulkArchivedRequestInfoMessage message) {
LOGGER.trace("received BulkArchivedRequestInfoMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
String uuid = message.getRequestUuid();
Subject subject = message.getSubject();
message.setInfo(requestStore.getArchivedInfo(subject, uuid));
reply.reply(message);
} catch (BulkServiceException e) {
LOGGER.error("messageArrived(BulkArchivedRequestInfoMessage) {}: {}", message,
e.toString());
reply.fail(message, e);
} catch (Exception e) {
reply.fail(message, e);
Thread thisThread = Thread.currentThread();
thisThread.getUncaughtExceptionHandler().uncaughtException(thisThread, e);
}
});

return reply;
}

public Reply messageArrived(BulkArchivedSummaryInfoMessage message) {
LOGGER.trace("received BulkArchivedSummaryInfoMessage {}", message);

MessageReply<Message> reply = new MessageReply<>();
incomingExecutorService.execute(() -> {
try {
Subject subject = message.getSubject();
message.setInfo(requestStore.getArchivedSummaryInfo(subject, message.getFilter()));
reply.reply(message);
} catch (BulkServiceException e) {
LOGGER.error("messageArrived(BulkArchivedSummaryInfoMessage) {}: {}", message,
e.toString());
reply.fail(message, e);
} catch (Exception e) {
reply.fail(message, e);
Thread thisThread = Thread.currentThread();
thisThread.getUncaughtExceptionHandler().uncaughtException(thisThread, e);
}
});

return reply;
}

public synchronized int getMaxFlatTargets() {
return maxFlatTargets;
}
Expand Down

0 comments on commit ef26f40

Please sign in to comment.