Skip to content

Commit

Permalink
Merge pull request #4286 from garlick/purge
Browse files Browse the repository at this point in the history
job-manager: add inactive job purge capability
  • Loading branch information
mergify[bot] committed Apr 18, 2022
2 parents 77f1708 + 3cb8819 commit f800288
Show file tree
Hide file tree
Showing 25 changed files with 953 additions and 23 deletions.
21 changes: 21 additions & 0 deletions doc/man1/flux-job.rst
Expand Up @@ -20,6 +20,8 @@ SYNOPSIS

**flux** **job** **raiseall** [*OPTIONS*] *type* [*message...*]

**flux** **job** **purge** [*OPTIONS*]

DESCRIPTION
===========

Expand Down Expand Up @@ -90,6 +92,25 @@ type (positional argument) and accepts the following options:
**-f, --force**
Confirm the command.

PURGE
=====

Inactive job data may be purged from the Flux instance with ``flux job purge``.
The following options may be used to add selection criteria:

**--age-limit=FSD**
Purge inactive jobs older than the specified Flux Standard Duration.

**--num-limit=COUNT**
Purge the oldest inactive jobs until there are at most COUNT left.

**-f, --force**
Confirm the command.

Inactive jobs may also be purged automatically if the job manager is
configured as described in :man5:`flux-config-job-manager`.


RESOURCES
=========

Expand Down
13 changes: 13 additions & 0 deletions doc/man5/flux-config-job-manager.rst
Expand Up @@ -18,6 +18,16 @@ journal-size-limit
be retained in the in-memory journal used to answer queries. The default
is 1000.

inactive-age-limit
(optional) String (in RFC 23 Flux Standard Duration format) that specifies
the maximum age of inactive jobs retained in the KVS. The age is measured
from the time the job became inactive. Once a job is removed from the KVS, its job
data is only available via the job-archive, if configured. Inactive jobs
can also be manually purged with :man1:`flux-job` ``purge``.

inactive-num-limit
(optional) Integer maximum number of inactive jobs retained in the KVS.

plugins
(optional) An array of objects defining a list of jobtap plugin directives.
Each directive follows the format defined in the :ref:`plugin_directive`
Expand Down Expand Up @@ -55,6 +65,9 @@ EXAMPLE

journal-size-limit = 10000

inactive-age-limit = "7d"
inactive-num-limit = 10000

plugins = [
{
load = "priority-custom.so",
Expand Down
1 change: 1 addition & 0 deletions doc/test/spell.en.pws
Expand Up @@ -622,3 +622,4 @@ ustar
xz
validator
statedir
num
80 changes: 80 additions & 0 deletions src/cmd/flux-job.c
Expand Up @@ -91,6 +91,7 @@ int cmd_info (optparse_t *p, int argc, char **argv);
int cmd_stats (optparse_t *p, int argc, char **argv);
int cmd_wait (optparse_t *p, int argc, char **argv);
int cmd_memo (optparse_t *p, int argc, char **argv);
int cmd_purge (optparse_t *p, int argc, char **argv);

int stdin_flags;

Expand Down Expand Up @@ -333,6 +334,22 @@ static struct optparse_option memo_opts[] = {
OPTPARSE_TABLE_END
};

/* Command line options accepted by the "purge" subcommand (see cmd_purge).
 * Entries are terminated by OPTPARSE_TABLE_END.
 */
static struct optparse_option purge_opts[] = {
    {
        .name = "age-limit",
        .has_arg = 1,
        .arginfo = "FSD",
        .usage = "Purge jobs that became inactive beyond age-limit.",
    },
    {
        .name = "num-limit",
        .has_arg = 1,
        .arginfo = "COUNT",
        .usage = "Purge oldest inactive jobs until COUNT are left.",
    },
    {
        .name = "force",
        .key = 'f',
        .has_arg = 0,
        .usage = "Perform the irreversible purge.",
    },
    {
        .name = "batch",
        .has_arg = 1,
        .arginfo = "COUNT",
        .usage = "Limit number of jobs per request (default 50).",
    },
    OPTPARSE_TABLE_END
};

static struct optparse_subcommand subcommands[] = {
{ "list",
"[OPTIONS]",
Expand Down Expand Up @@ -482,6 +499,13 @@ static struct optparse_subcommand subcommands[] = {
0,
memo_opts,
},
{ "purge",
"[--age-limit=FSD] [--count-limit=N]",
"Purge the oldest inactive jobs",
cmd_purge,
0,
purge_opts,
},
OPTPARSE_SUBCMD_END
};

Expand Down Expand Up @@ -3069,6 +3093,62 @@ int cmd_memo (optparse_t *p, int argc, char **argv)
return (0);
}

/* flux job purge [--age-limit=FSD] [--num-limit=COUNT] [--batch=COUNT] [-f]
 *
 * Send job-manager.purge requests carrying the age limit, number limit,
 * batch size, and force flag.  With --force, requests are repeated for as
 * long as each response reports a count equal to the batch size; without
 * it, a single request is sent.  Afterwards the inactive job count is
 * fetched from job-manager.stats.get and a one line summary is printed.
 *
 * Exits with usage on unexpected free arguments, and exits with an error
 * message if the broker connection or any RPC fails.  Returns 0 otherwise.
 */
int cmd_purge (optparse_t *p, int argc, char **argv)
{
    int optindex = optparse_option_index (p);
    /* -1 is passed through to the request when a limit option is absent */
    double age_limit = optparse_get_duration (p, "age-limit", -1.);
    int num_limit = optparse_get_int (p, "num-limit", -1);
    int batch = optparse_get_int (p, "batch", 50);
    int force;
    int purged = 0;
    int remaining;
    flux_t *h;
    flux_future_t *f;

    if (argc - optindex > 0) {
        optparse_print_usage (p);
        exit (1);
    }
    force = optparse_hasopt (p, "force") ? 1 : 0;
    if (!(h = flux_open (NULL, 0)))
        log_err_exit ("flux_open");

    for (;;) {
        int count;

        f = flux_rpc_pack (h,
                           "job-manager.purge",
                           0,
                           0,
                           "{s:f s:i s:i s:b}",
                           "age_limit", age_limit,
                           "num_limit", num_limit,
                           "batch", batch,
                           "force", force);
        if (!f || flux_rpc_get_unpack (f, "{s:i}", "count", &count) < 0)
            log_msg_exit ("purge: %s", future_strerror (f, errno));
        flux_future_destroy (f);
        purged += count;

        /* Keep going only while forcing and the last batch came back full */
        if (!force || count != batch)
            break;
    }

    f = flux_rpc (h, "job-manager.stats.get", NULL, 0, 0);
    if (!f
        || flux_rpc_get_unpack (f, "{s:i}", "inactive_jobs", &remaining) < 0)
        log_err_exit ("purge: failed to fetch inactive job count");
    flux_future_destroy (f);

    if (!force) {
        printf ("use --force to purge %d of %d inactive jobs\n",
                purged,
                remaining);
    }
    else
        printf ("purged %d inactive jobs, %d remaining\n", purged, remaining);

    flux_close (h);

    return 0;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
Expand Down
35 changes: 35 additions & 0 deletions src/modules/job-list/job-list.c
Expand Up @@ -64,6 +64,34 @@ static void job_stats_cb (flux_t *h, flux_msg_handler_t *mh,
flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
}

/* Handle a job-purge-inactive event.
 *
 * The event payload carries a "jobs" array; each entry is read as an
 * integer job id.  For every id found in the job index, the job's
 * contribution to the statistics is backed out, it is removed from the
 * inactive list (if it has a list handle), and it is deleted from the index.
 * Ids not present in the index are ignored.  The number of jobs removed is
 * logged at LOG_DEBUG.
 *
 * If the payload cannot be decoded, the error is logged and the event is
 * dropped; 'jobs' would otherwise be used uninitialized.
 */
static void purge_cb (flux_t *h,
                      flux_msg_handler_t *mh,
                      const flux_msg_t *msg,
                      void *arg)
{
    struct list_ctx *ctx = arg;
    json_t *jobs;
    size_t index;
    json_t *entry;
    int count = 0;

    if (flux_event_unpack (msg, NULL, "{s:o}", "jobs", &jobs) < 0) {
        flux_log_error (h, "job-purge-inactive message");
        return;
    }
    json_array_foreach (jobs, index, entry) {
        flux_jobid_t id = json_integer_value (entry);
        struct job *job;

        if ((job = zhashx_lookup (ctx->jsctx->index, &id))) {
            job_stats_purge (&ctx->jsctx->stats, job);
            /* Unlink from the inactive list before dropping the index entry */
            if (job->list_handle)
                zlistx_delete (ctx->jsctx->inactive, job->list_handle);
            zhashx_delete (ctx->jsctx->index, &id);
            count++;
        }
    }
    flux_log (h, LOG_DEBUG, "purged %d inactive jobs", count);
}

static const struct flux_msg_handler_spec htab[] = {
{ .typemask = FLUX_MSGTYPE_REQUEST,
.topic_glob = "job-list.list",
Expand Down Expand Up @@ -105,6 +133,11 @@ static const struct flux_msg_handler_spec htab[] = {
.cb = stats_cb,
.rolemask = 0
},
{ .typemask = FLUX_MSGTYPE_EVENT,
.topic_glob = "job-purge-inactive",
.cb = purge_cb,
.rolemask = 0
},
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -128,6 +161,8 @@ static struct list_ctx *list_ctx_create (flux_t *h)
if (!ctx)
return NULL;
ctx->h = h;
if (flux_event_subscribe (h, "job-purge-inactive") < 0)
goto error;
if (flux_msg_handler_addvec (h, htab, ctx, &ctx->handlers) < 0)
goto error;
if (!(ctx->jsctx = job_state_create (ctx)))
Expand Down
19 changes: 19 additions & 0 deletions src/modules/job-list/stats.c
Expand Up @@ -59,6 +59,25 @@ void job_stats_update (struct job_stats *stats,
}
}

/* Back out a purged inactive job's contribution to the statistics.
 *
 * The count for the job's state is decremented.  If the job was not
 * successful, the failed count is decremented as well, and if an exception
 * occurred with type "cancel" or "timeout", the matching canceled or
 * timeout count is decremented too.
 *
 * The job must be in FLUX_JOB_STATE_INACTIVE (asserted).
 */
void job_stats_purge (struct job_stats *stats, struct job *job)
{
    assert (job->state == FLUX_JOB_STATE_INACTIVE);

    stats->state_count[state_index(job->state)]--;

    /* Successful jobs only contribute to the per-state count */
    if (job->success)
        return;

    stats->failed--;
    if (!job->exception_occurred)
        return;

    if (!strcmp (job->exception_type, "cancel"))
        stats->canceled--;
    else if (!strcmp (job->exception_type, "timeout"))
        stats->timeout--;
}

static int object_set_integer (json_t *o,
const char *key,
unsigned int n)
Expand Down
2 changes: 2 additions & 0 deletions src/modules/job-list/stats.h
Expand Up @@ -28,6 +28,8 @@ void job_stats_update (struct job_stats *stats,
struct job *job,
flux_job_state_t newstate);

void job_stats_purge (struct job_stats *stats, struct job *job);

json_t * job_stats_encode (struct job_stats *stats);

#endif /* ! _FLUX_JOB_LIST_JOB_STATS_H */
Expand Down
2 changes: 2 additions & 0 deletions src/modules/job-manager/Makefile.am
Expand Up @@ -52,6 +52,8 @@ libjob_manager_la_SOURCES = \
start.c \
list.h \
list.c \
purge.h \
purge.c \
urgency.h \
urgency.c \
annotate.h \
Expand Down
6 changes: 3 additions & 3 deletions src/modules/job-manager/alloc.c
Expand Up @@ -649,7 +649,7 @@ int alloc_queue_recalc_pending (struct alloc *alloc)
while (alloc->alloc_limit
&& head
&& tail) {
if (job_comparator (head, tail) < 0) {
if (job_priority_comparator (head, tail) < 0) {
if (alloc_cancel_alloc_request (alloc, tail) < 0) {
flux_log_error (alloc->ctx->h, "%s: alloc_cancel_alloc_request",
__FUNCTION__);
Expand Down Expand Up @@ -845,13 +845,13 @@ struct alloc *alloc_ctx_create (struct job_manager *ctx)
if (!(alloc->queue = zlistx_new()))
goto error;
zlistx_set_destructor (alloc->queue, job_destructor);
zlistx_set_comparator (alloc->queue, job_comparator);
zlistx_set_comparator (alloc->queue, job_priority_comparator);
zlistx_set_duplicator (alloc->queue, job_duplicator);

if (!(alloc->pending_jobs = zlistx_new()))
goto error;
zlistx_set_destructor (alloc->pending_jobs, job_destructor);
zlistx_set_comparator (alloc->pending_jobs, job_comparator);
zlistx_set_comparator (alloc->pending_jobs, job_priority_comparator);
zlistx_set_duplicator (alloc->pending_jobs, job_duplicator);

if (flux_msg_handler_addvec (ctx->h, htab, ctx, &alloc->handlers) < 0)
Expand Down
5 changes: 4 additions & 1 deletion src/modules/job-manager/annotate.c
Expand Up @@ -149,7 +149,10 @@ void annotate_memo_request (flux_t *h,
|| flux_msg_get_cred (msg, &cred) < 0)
goto error;
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
errstr = "unknown job id";
if (!(job = zhashx_lookup (ctx->inactive_jobs, &id)))
errstr = "unknown job id";
else
errstr = "job is inactive";
errno = ENOENT;
goto error;
}
Expand Down
8 changes: 8 additions & 0 deletions src/modules/job-manager/event.c
Expand Up @@ -51,6 +51,7 @@
#include "wait.h"
#include "prioritize.h"
#include "annotate.h"
#include "purge.h"
#include "jobtap-internal.h"

#include "event.h"
Expand Down Expand Up @@ -392,6 +393,12 @@ int event_job_action (struct event *event, struct job *job)
case FLUX_JOB_STATE_INACTIVE:
if ((job->flags & FLUX_JOB_WAITABLE))
wait_notify_inactive (ctx->wait, job);
if (zhashx_insert (ctx->inactive_jobs, &job->id, job) < 0
|| purge_enqueue_job (ctx->purge, job) < 0) {
flux_log_error (event->ctx->h,
"%ju: error preserving inactive job",
(uintmax_t) job->id);
}
zhashx_delete (ctx->active_jobs, &job->id);
drain_check (ctx->drain);
break;
Expand Down Expand Up @@ -665,6 +672,7 @@ int event_job_update (struct job *job, json_t *event)
if (job->state != FLUX_JOB_STATE_CLEANUP)
goto inval;
job->state = FLUX_JOB_STATE_INACTIVE;
job->t_clean = timestamp;
}
else if (!strncmp (name, "prolog-", 7)) {
if (job->start_pending)
Expand Down
3 changes: 2 additions & 1 deletion src/modules/job-manager/getattr.c
Expand Up @@ -96,7 +96,8 @@ void getattr_handle_request (flux_t *h,
"attrs", &attrs) < 0
|| flux_msg_get_cred (msg, &cred) < 0)
goto error;
if (!(job = zhashx_lookup (ctx->active_jobs, &id))) {
if (!(job = zhashx_lookup (ctx->active_jobs, &id))
&& !(job = zhashx_lookup (ctx->inactive_jobs, &id))) {
errstr = "unknown job";
errno = EINVAL;
goto error;
Expand Down

0 comments on commit f800288

Please sign in to comment.