From c1a36da0fa751a67ad8b1e734bc5f4c5f2eee760 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 20 Apr 2018 10:59:10 -0700 Subject: [PATCH 01/10] wreck: add wreck_job_list() to wreck_job class Add a function to produce an object containing a JSON array that is a sorted list of jobs from a zhash_t of struct wreck_job entries. Each time the state of a job is updated with wreck_job_set_state(), set "mtime" of the job to the current time. Then sort the jobs in the array in mtime order. Limit the number of jobs returned in array to 'max' if > 0. If non-NULL, 'include_states' and 'exclude_states' are parsed as comma-separated lists of job states to include or exclude from the list, with blacklist/whitelist semantics. --- src/modules/wreck/wreck_job.c | 153 ++++++++++++++++++++++++++++++++-- src/modules/wreck/wreck_job.h | 20 ++++- 2 files changed, 162 insertions(+), 11 deletions(-) diff --git a/src/modules/wreck/wreck_job.c b/src/modules/wreck/wreck_job.c index aee82c942172..965c8ff5d0c0 100644 --- a/src/modules/wreck/wreck_job.c +++ b/src/modules/wreck/wreck_job.c @@ -25,9 +25,12 @@ #if HAVE_CONFIG_H #include "config.h" #endif +#include #include #include #include +#include +#include #include "wreck_job.h" @@ -63,18 +66,24 @@ struct wreck_job *wreck_job_create (void) void wreck_job_set_state (struct wreck_job *job, const char *status) { - assert (strlen (status) < sizeof (job->state)); - strcpy (job->state, status); + if (job && status != NULL && strlen (status) < sizeof (job->state)) { + strcpy (job->state, status); + clock_gettime (CLOCK_MONOTONIC, &job->mtime); + } } const char *wreck_job_get_state (struct wreck_job *job) { - return job->state; + return job ? job->state : ""; } int wreck_job_insert (struct wreck_job *job, zhash_t *hash) { hashkey_t key; + if (!job || !hash) { + errno = EINVAL; + return -1; + } if (zhash_lookup (hash, idkey (key, job->id)) != NULL) { errno = EEXIST; return -1; @@ -89,6 +98,10 @@ struct wreck_job *wreck_job_lookup (int64_t id, zhash_t *hash) hashkey_t key; struct wreck_job *job; + if (id <= 0 || !hash) { + errno = EINVAL; + return NULL; + } if (!(job = zhash_lookup (hash, idkey (key, id)))) { errno = ENOENT; return NULL; @@ -99,20 +112,142 @@ struct wreck_job *wreck_job_lookup (int64_t id, zhash_t *hash) void wreck_job_delete (int64_t id, zhash_t *hash) { hashkey_t key; - zhash_delete (hash, idkey (key, id)); + if (id > 0 && hash != NULL) + zhash_delete (hash, idkey (key, id)); } void wreck_job_set_aux (struct wreck_job *job, void *item, flux_free_f destroy) { - if (job->aux_destroy) - job->aux_destroy (job->aux); - job->aux = item; - job->aux_destroy = destroy; + if (job != NULL) { + if (job->aux_destroy) + job->aux_destroy (job->aux); + job->aux = item; + job->aux_destroy = destroy; + } } void *wreck_job_get_aux (struct wreck_job *job) { - return job->aux; + return job ? job->aux : NULL; +} + +/* Sort jobs in reverse mtime order. + * This has the zlist_compare_fn footprint and can be used with zlist_sort(). + */ +static int compare_job_mtime (struct wreck_job *j1, struct wreck_job *j2) +{ + if (j1->mtime.tv_sec < j2->mtime.tv_sec) + return 1; + if (j1->mtime.tv_sec > j2->mtime.tv_sec) + return -1; + if (j1->mtime.tv_nsec < j2->mtime.tv_nsec) + return 1; + if (j1->mtime.tv_nsec > j2->mtime.tv_nsec) + return -1; + return 0; +} + +/* If 'key' is one of the strings in an argz vector, return true. + */ +static bool findz (const char *argz, size_t argz_len, const char *key) +{ + const char *entry = NULL; + if (argz && key) { + while ((entry = argz_next (argz, argz_len, entry)) != NULL) + if (!strcmp (key, entry)) + return true; + } + return false; +} + +/* If 'key' survives black/white list filtering, return true. + */ +static bool bw_test (const char *key, const char *b, size_t b_len, + const char *w, size_t w_len) +{ + if (b && findz (b, b_len, key)) + return false; + if (w && !findz (w, w_len, key)) + return false; + return true; +} + +char *wreck_job_list (zhash_t *hash, int max, const char *include_states, + const char *exclude_states) +{ + json_t *array = NULL; + json_t *obj = NULL; + zlist_t *joblist = NULL; + char *json_str; + struct wreck_job *job; + char *white = NULL; + size_t white_len = 0; + char *black = NULL; + size_t black_len = 0; + int saved_errno; + + if (!hash) { + errno = EINVAL; + goto error; + } + if (include_states) { + if (argz_create_sep (include_states, ',', &white, &white_len) != 0) + goto nomem; + } + if (exclude_states) { + if (argz_create_sep (exclude_states, ',', &black, &black_len) != 0) + goto nomem; + } + if (!(joblist = zlist_new ())) + goto nomem; + if (!(array = json_array ())) + goto nomem; + job = zhash_first (hash); + while (job != NULL) { + if (bw_test (job->state, black, black_len, white, white_len)) { + if (zlist_append (joblist, job) < 0) + goto nomem; + } + job = zhash_next (hash); + } + zlist_sort (joblist, (zlist_compare_fn *)compare_job_mtime); + job = zlist_first (joblist); + while (job != NULL) { + json_t *entry; + if (!(entry = json_pack ("{s:I s:s s:s}", + "jobid", job->id, + "kvs_path", job->kvs_path ? job->kvs_path : "", + "state", job->state))) + goto nomem; + if (json_array_append_new (array, entry) < 0) { + json_decref (entry); + goto nomem; + } + if (max > 0 && json_array_size (array) == max) + break; + job = zlist_next (joblist); + } + if (!(obj = json_pack ("{s:O}", "jobs", array))) + goto nomem; + if (!(json_str = json_dumps (obj, JSON_COMPACT))) + goto error; + zlist_destroy (&joblist); + json_decref (array); + json_decref (obj); + free (black); + free (white); + return json_str; +nomem: + errno = ENOMEM; +error: + saved_errno = errno; + zlist_destroy (&joblist); + json_decref (array); + json_decref (obj); + free (black); + free (white); + errno = saved_errno; + return NULL; } /* diff --git a/src/modules/wreck/wreck_job.h b/src/modules/wreck/wreck_job.h index d9ea16fca283..c88af7c50c3b 100644 --- a/src/modules/wreck/wreck_job.h +++ b/src/modules/wreck/wreck_job.h @@ -1,6 +1,7 @@ #ifndef HAVE_WJOB_H #define HAVE_WJOB_H +#include #include #include #include @@ -16,13 +17,15 @@ struct wreck_job { int walltime; void *aux; flux_free_f aux_destroy; + struct timespec mtime; }; void wreck_job_destroy (struct wreck_job *job); struct wreck_job *wreck_job_create (void); /* Set job status. - * 'status' must be a string of 15 characters or less or function will assert. + * 'status' must be a string of 15 characters or less. + * wreck_job_get_state() returns an empty string if job is NULL. */ void wreck_job_set_state (struct wreck_job *job, const char *status); const char *wreck_job_get_state (struct wreck_job *job); @@ -40,10 +43,23 @@ int wreck_job_insert (struct wreck_job *job, zhash_t *hash); void wreck_job_delete (int64_t id, zhash_t *hash); /* Look up job in hash by id. - * Returns job on success, NULL on failure. + * Returns job on success, NULL on failure with errno set. */ struct wreck_job *wreck_job_lookup (int64_t id, zhash_t *hash); +/* List entries in a hash of jobs, returning a serialized JSON object. + * The object contains a single array entry under the key "jobs". + * The array contains entries sorted by reverse "mtime" order of the form: + * {"jobid":I "kvs_path":s "state":s} + * If max > 0, only the first 'max' entries are added to the array. + * If include_states or exclude_states are non-NULL, they represent a comma- + * separated list of states to include or exclude from the list, respectively. + * Return a string-encoded JSON object (caller must free) or NULL on failure + * with errno set. + */ +char *wreck_job_list (zhash_t *hash, int max, const char *include_states, + const char *exclude_states); + /* Associate data and optional destructor with job. */ void wreck_job_set_aux (struct wreck_job *job, void *item, flux_free_f destroy); From 97e44dbc2ca556271637702198916993c9c426fa Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 20 Apr 2018 10:32:21 -0700 Subject: [PATCH 02/10] wreck: track/list state of active jobs Track the state of active jobs in a hash in the job module. A job is no longer active when it reaches the "complete" or "failed" states. Add a 'job.list' RPC which returns a list of active jobs in the form returned by wreck_jobs_list(), passing max, include, and exclude parameters in the request to the function. --- src/modules/wreck/job.c | 78 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 543a7d75c214..8cee2b8aae8e 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -76,6 +76,8 @@ static int kvs_bits_per_dir = 7; uint32_t broker_rank; const char *local_uri = NULL; +zhash_t *active_jobs = NULL; + /* * Return as 64bit integer the portion of integer `n` * masked from bit position `a` to position `b`, @@ -733,12 +735,77 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w, flux_future_destroy (f); } +/* Track job state transition in active_jobs hash + * Currently only id, kvs_path, and state are tracked. + */ +static void wreck_state_cb (flux_t *h, flux_msg_handler_t *w, + const flux_msg_t *msg, void *arg) +{ + int64_t id; + const char *topic; + const char *kvs_path; + struct wreck_job *job; + + if (flux_event_unpack (msg, &topic, "{s:I s:s}", + "jobid", &id, + "kvs_path", &kvs_path) < 0) + goto error; + topic += 12; // state comes after "wreck.state." (12 chars) + if (strlen (topic) == 0 || strlen (topic) >= sizeof (job->state)) { + errno = EPROTO; + goto error; + } + if (!(job = wreck_job_lookup (id, active_jobs))) { + if (!(job = wreck_job_create ())) + goto error; + job->id = id; + if (!(job->kvs_path = strdup (kvs_path))) + goto error_destroy; + if (wreck_job_insert (job, active_jobs) < 0) + goto error_destroy; + } + wreck_job_set_state (job, topic); + if (!strcmp (job->state, "complete") || !strcmp (job->state, "failed")) + wreck_job_delete (id, active_jobs); + return; +error_destroy: + wreck_job_destroy (job); +error: + flux_log_error (h, "%s", __FUNCTION__); +} + +static void job_list_cb (flux_t *h, flux_msg_handler_t *w, + const flux_msg_t *msg, void *arg) +{ + char *json_str = NULL; + int max = 0; + const char *include = NULL; + const char *exclude = NULL; + + if (flux_request_unpack (msg, NULL, "{s?:i s?:s s?:s}", + "max", &max, + "include", &include, + "exclude", &exclude) < 0) + goto error; + if (!(json_str = wreck_job_list (active_jobs, max, include, exclude))) + goto error; + if (flux_respond (h, msg, 0, json_str) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); + free (json_str); + return; +error: + if (flux_respond (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); +} + static const struct flux_msg_handler_spec mtab[] = { { FLUX_MSGTYPE_REQUEST, "job.create", job_create_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "job.submit", job_create_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "job.submit-nocreate", job_submit_only, 0 }, { FLUX_MSGTYPE_REQUEST, "job.kvspath", job_kvspath_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "job.list", job_list_cb, 0 }, { FLUX_MSGTYPE_EVENT, "wrexec.run.*", runevent_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "wreck.state.*", wreck_state_cb, 0 }, FLUX_MSGHANDLER_TABLE_END }; @@ -747,14 +814,22 @@ int mod_main (flux_t *h, int argc, char **argv) flux_msg_handler_t **handlers = NULL; int rc = -1; + if (!(active_jobs = zhash_new ())) { + flux_log_error (h, "zhash_new"); + return (-1); + } if (flux_msg_handler_addvec (h, mtab, NULL, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); - return (-1); + goto done; } if ((flux_event_subscribe (h, "wrexec.run.") < 0)) { flux_log_error (h, "flux_event_subscribe"); goto done; } + if ((flux_event_subscribe (h, "wreck.state.") < 0)) { + flux_log_error (h, "flux_event_subscribe"); + goto done; + } if ((flux_attr_get_int (h, "wreck.lwj-dir-levels", &kvs_dir_levels) < 0) && (flux_attr_set_int (h, "wreck.lwj-dir-levels", kvs_dir_levels) < 0)) { @@ -786,6 +861,7 @@ int mod_main (flux_t *h, int argc, char **argv) rc = 0; done: flux_msg_handler_delvec (handlers); + zhash_destroy (&active_jobs); return rc; } From a783029c81a20fc49d3bb4311eff4f40e98fd383 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Fri, 20 Apr 2018 11:23:52 -0700 Subject: [PATCH 03/10] wreck/test: add unit test for wreck_job_list() --- src/modules/wreck/test/wreck_job.c | 144 ++++++++++++++++++++++++++++- 1 file changed, 143 insertions(+), 1 deletion(-) diff --git a/src/modules/wreck/test/wreck_job.c b/src/modules/wreck/test/wreck_job.c index 41b0abc0e592..cb4a52e636c3 100644 --- a/src/modules/wreck/test/wreck_job.c +++ b/src/modules/wreck/test/wreck_job.c @@ -2,6 +2,7 @@ #include "config.h" #endif #include +#include #include "src/common/libtap/tap.h" #include "src/modules/wreck/wreck_job.h" @@ -34,13 +35,30 @@ void basic (void) "wreck_job_set_aux calls destructor when aux overwritten"); wreck_job_destroy (job); ok (free_fun_count == 2, - "wreck_job_destry calls aux destructor"); + "wreck_job_destroy calls aux destructor"); +} + +static int count_entries (const char *json_str) +{ + json_t *obj = NULL; + json_t *array = NULL; + int n = 0; + + if (!json_str || !(obj = json_loads (json_str, 0, NULL)) + || json_unpack (obj, "{s:o}", "jobs", &array) < 0) + BAIL_OUT ("JSON parse error"); + n = json_array_size (array); + json_decref (obj); + return n; } void hash (void) { zhash_t *h = zhash_new (); struct wreck_job *job; + char *s = NULL; + json_t *obj = NULL; + json_t *array = NULL; if (!h) BAIL_OUT ("zhash_new failed"); @@ -49,6 +67,7 @@ void hash (void) if (!job) BAIL_OUT ("wreck_job_create failed"); job->id = 42; + wreck_job_set_state (job, "submitted"); ok (wreck_job_insert (job, h) == 0 && zhash_size (h) == 1, "wreck_job_insert 42 works"); @@ -56,6 +75,7 @@ void hash (void) if (!job) BAIL_OUT ("wreck_job_create failed"); job->id = 43; + wreck_job_set_state (job, "complete"); ok (wreck_job_insert (job, h) == 0 && zhash_size (h) == 2, "wreck_job_insert 43 works"); @@ -74,18 +94,140 @@ void hash (void) ok (job == NULL && errno == ENOENT, "wreck_job_lookup 2 fails with ENOENT"); + /* List has two entries, one "complete", one "submitted". + */ + s = wreck_job_list (h, 0, NULL, NULL); + ok (s != NULL && (obj = json_loads (s, 0, NULL)) && json_is_object (obj), + "wreck_job_list produces a JSON object"); + diag ("%s", s ? s : "(null)"); + ok (json_unpack (obj, "{s:o}", "jobs", &array) == 0, + "JSON object has one member"); + ok (json_is_array (array) && json_array_size (array) == 2, + "member is array with expected size"); + json_decref (obj); + free (s); + + /* 'max' can limit number of entries returned. + */ + s = wreck_job_list (h, 1, NULL, NULL); + ok (count_entries (s) == 1, + "wreck_job_list max=1 limits entries to 1"); + free (s); + + /* If 'include' and no match (and no 'exclude'), zero entries returned . + */ + s = wreck_job_list (h, 0, "badstate", NULL); + ok (count_entries (s) == 0, + "wreck_job_list include=badstate returns 0 entries"); + free (s); + + /* If 'exclude' and no match (and no 'include'), all entries returned + */ + s = wreck_job_list (h, 0, NULL, "badstate"); + ok (count_entries (s) == 2, + "wreck_job_list exclude=badstate returns 2 entries"); + free (s); + + /* If 'include' and one match (and no 'exclude'), 1 of 2 entries returned + */ + s = wreck_job_list (h, 0, "complete", NULL); + ok (count_entries (s) == 1, + "wreck_job_list include=complete returns 1 entry"); + free (s); + + /* If 'exclude' and one match (and no 'include'), 1 of 2 entries returned + */ + s = wreck_job_list (h, 0, NULL, "complete"); + ok (count_entries (s) == 1, + "wreck_job_list exclude=complete returns 1 entry"); + free (s); + + /* If 'include' and all match (and no 'exclude'), all entries returned + */ + s = wreck_job_list (h, 0, "complete,submitted", NULL); + ok (count_entries (s) == 2, + "wreck_job_list include=complete,submitted returns 2 entries"); + free (s); + + /* If 'exclude' and all match (and no 'include'), zero entries returned + */ + s = wreck_job_list (h, 0, NULL, "complete,submitted"); + ok (count_entries (s) == 0, + "wreck_job_list exclude=complete,submitted returns 0 entries"); + free (s); + free_fun_count = 0; zhash_destroy (&h); ok (free_fun_count == 2, "zhash_destroy frees jobs"); } +void corner (void) +{ + zhash_t *hash; + struct wreck_job *job; + const char *s; + + if (!(hash = zhash_new())) + BAIL_OUT ("zhash_new failed"); + if (!(job = wreck_job_create ())) + BAIL_OUT ("wreck_job_create failed)"); + + lives_ok ({wreck_job_destroy (NULL);}, + "wreck_job_destroy (job=NULL) doesn't crash"); + + lives_ok ({wreck_job_set_state (NULL, "completed");}, + "wreck_job_set_state (job=NULL doesn't crash"); + lives_ok ({wreck_job_set_state (job, NULL);}, + "wreck_job_set_state (state=NULL) doesn't crash"); + lives_ok ({wreck_job_set_state (job, "0123456789abcdef");}, + "wreck_job_set_state (state=too long) doesn't crash"); + + s = wreck_job_get_state (NULL); + ok (s != NULL && strlen (s) == 0, + "wreck_job_get_state (job=NULL) returns empty string"); + + errno = 0; + ok (wreck_job_insert (NULL, hash) < 0 && errno == EINVAL, + "wreck_job_insert (job=NULL) fails with EINVAL"); + errno = 0; + job->id = 42; + ok (wreck_job_insert (job, NULL) < 0 && errno == EINVAL, + "wreck_job_insert (hash=NULL) fails with EINVAL"); + + errno = 0; + ok (wreck_job_lookup (-1, hash) == NULL && errno == EINVAL, + "wreck_job_lookup (id=-1) fails with EINVAL"); + errno = 0; + ok (wreck_job_lookup (0, hash) == NULL && errno == EINVAL, + "wreck_job_lookup (id=0) fails with EINVAL"); + + errno = 0; + ok (wreck_job_lookup (1, NULL) == NULL && errno == EINVAL, + "wreck_job_lookup (hash=NULL) fails with EINVAL"); + + lives_ok ({wreck_job_delete (1, NULL);}, + "wreck_job_delete (job=NULL) doesn't crash"); + lives_ok ({wreck_job_set_aux (NULL, NULL, NULL);}, + "wreck_job_set_aux (job=NULL) doesn't crash"); + lives_ok ({wreck_job_get_aux (NULL);}, + "wreck_job_get_aux (job=NULL) doesn't crash"); + + errno = 0; + ok (wreck_job_list (NULL, 0, NULL, NULL) == NULL && errno == EINVAL, + "wreck_job_list (hash=NULL) fails with EINVAL"); + + zhash_destroy (&hash); + wreck_job_destroy (job); +} + int main (int argc, char *argv[]) { plan (NO_PLAN); basic (); hash (); + corner (); done_testing (); return (0); From 245cd1ff28cfa1e783bc0323b109b865fcd6c843 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 20 Apr 2018 15:27:19 -0700 Subject: [PATCH 04/10] wreck/lua: multiple improvements for wreck.joblist Improve wreck.joblist with the following additions: * retrieve a list of "active" jobs first using the new `job.list` rpc available from the wreck/job module. Inactive jobs are appended to this active list only if the returned list of active jobs does not meet or exceed arg.max. Fixes #1456 * Allow filtering jobs by state in wreck.joblist with a states table with two allowable members - include: include *only* states where include[state] == true - exclude: exclude states in this table where exclude[state] == true These states are passed to `job.list` rpc for the active job list and directly filtered on kvs job state for the jobs retrieved from the kvs. * Add an active_only flag to wreck.joblist which returns immediately after retrieving the active job list from the job module. This effectively skips kvs traversal and all complete and failed jobs. * Add a kvs_only flag to wreck.joblist which skips the retrieval of active jobs from the `job.list` rpc. This avoids an unnecessary rpc when it is known that no active jobs are required to be returned from the function. (Should be used in combination with exclude/include to restrict job states returned from kvs) --- src/bindings/lua/wreck.lua | 84 +++++++++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 5 deletions(-) diff --git a/src/bindings/lua/wreck.lua b/src/bindings/lua/wreck.lua index eebf6937a6d0..888b6c8d512e 100644 --- a/src/bindings/lua/wreck.lua +++ b/src/bindings/lua/wreck.lua @@ -177,9 +177,12 @@ end -- Return kvs path to job id `id` -- local kvs_paths = {} +local function kvs_path_insert (id, path) + kvs_paths [id] = path +end local function kvs_path (f, id) if not kvs_paths[id] then - kvs_paths [id] = job_kvspath (f, id) + kvs_path_insert (id, job_kvspath (f, id)) end return kvs_paths [id] end @@ -187,7 +190,7 @@ end local function kvs_path_multi (f, ids) local result = job_kvspath (f, ids) for i,id in ipairs (ids) do - kvs_paths [id] = result [id] + kvs_path_insert (id, result [id]) end return result end @@ -610,6 +613,12 @@ local function reverse (t) return r end +-- append array t2 to t1 +local function append (t1, t2) + for _,v in ipairs (t2) do table.insert (t1, v) end + return t1 +end + function wreck.jobids_to_kvspath (arg) local f = arg.flux local ids = arg.jobids @@ -619,7 +628,53 @@ function wreck.jobids_to_kvspath (arg) return kvs_path_multi (f, ids) end -function wreck.joblist (arg) +local function include_state (conf, state) + -- Include all states if no state conf is supplied + if not conf then return true end + + -- Otherwise, if there is a conf.include hash then *only* include + -- states that appear in this hash: + if conf.include then return conf.include[state] end + + -- Otherwise, exclude this state if it is in the conf.exclude hash: + if conf.exclude then return not conf.exclude[state] end + + -- No conf.include, no conf.exclude, I guess it is included: + return true +end + +-- Convert keys of table t to a comma-separated list of strings +local function to_string_list (t) + if not t then return nil end + local r = {} + for k,v in pairs (t) do + if v then table.insert (r, k) end + end + return table.concat (r, ",") +end + +-- Return a list and hash of active jobs by kvs path +local function joblist_active (arg) + local f = arg.flux + local conf = arg.states + local r, err = f:rpc ("job.list", + { max = arg.max, + include = to_string_list (conf and conf.include), + exclude = to_string_list (conf and conf.exclude) + }) + if not r then return nil, nil, err end + local active = {} + local results = {} + for _,job in ipairs (r.jobs) do + local path = job.kvs_path + kvs_path_insert (job.jobid, path) + active[path] = true + table.insert (results, path) + end + return results, active +end + +local function joblist_kvs (arg, exclude) local flux = require 'flux' local f = arg.flux if not f then f, err = flux.new () end @@ -632,10 +687,12 @@ function wreck.joblist (arg) for _,k in pairs (dirs) do local path = tostring (d) .. "." .. k local dir = f:kvsdir (path) - if dir then + if dir and (not exclude or not exclude[path]) then if dir.state then -- This is a lwj dir, add to results table: - table.insert (results, path) + if include_state (arg.states, dir.state) then + table.insert (results, path) + end else -- recurse to find lwj dirs lower in the directory tree visit (dir, results) @@ -655,6 +712,23 @@ function wreck.joblist (arg) return reverse (visit (dir)) end +function wreck.joblist (arg) + local results, active = {}, {} + if not arg.kvs_only then + results, active = joblist_active (arg) + if results and arg.max then + arg.max = arg.max - #results + end + if arg.active_only or arg.max == 0 then return results end + end + + -- Append a list of jobs from the kvs to the active jobs + local r, err = joblist_kvs (arg, active) + if not r then return nil, err end + + return append (results or {}, r) +end + local function shortprog () local prog = string.match (arg[0], "([^/]+)$") return prog:match ("flux%-(.+)$") From 085d6d41dc7c2b8ef48270c6e9d60df638f40742 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 20 Apr 2018 16:50:21 -0700 Subject: [PATCH 05/10] wreck: flux-wreck: pass new supported args to wreck.joblist For extensibility, change joblist_from_args from taking a parameter list to a table with parameters set by well-known keys. Include keys supported by the updated `wreck.joblist` function, including: - states dictionary for including, excluding states - kvs_only and active_only flags for skipping `job.list` or kvs walk respectively --- src/cmd/flux-wreck | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck index d0add8504715..0f47c2eede24 100755 --- a/src/cmd/flux-wreck +++ b/src/cmd/flux-wreck @@ -386,10 +386,17 @@ local function bracketify (args) return (r) end -local function joblist_from_args (self, args) +local function joblist_from_args (arg) + local self = arg.self + local args = arg.args + local max = tonumber (self.opt.n) or 25 if #args == 0 then - return wreck.joblist{ flux = f, max = max } + return wreck.joblist{ flux = f, + max = max, + states = arg.states, + active_only = arg.active_only, + kvs_only = arg.kvs_only } end -- otherwise use dirs on cmdline local hl,err = hostlist.union (unpack (bracketify (args))) @@ -407,7 +414,7 @@ prog:SubCommand { }, description = "List jobs in kvs", handler = function (self, arg) - local dirs,err = joblist_from_args (self, arg) + local dirs,err = joblist_from_args { self = self, args = arg } if not dirs then self:die (err) end if #dirs == 0 then return end local fmt = "%6s %6s %-9s %20s %12s %8s %-.13s\n"; @@ -440,7 +447,8 @@ prog:SubCommand { }, description = "List FLUX_URI for jobs that are Flux instances", handler = function (self, arg) - local dirs,err = joblist_from_args (self, arg) + local dirs, err = joblist_from_args { self = self, + args = arg } if not dirs then self:die (err) end if #dirs == 0 then return end local fmt = "%6s %6s %-9s %-40s %-.13s\n"; @@ -478,7 +486,7 @@ prog:SubCommand { }, description = "List timings of jobs in kvs", handler = function (self, arg) - local dirs,err = joblist_from_args (self, arg) + local dirs,err = joblist_from_args {self = self, args = arg} if not dirs then self:die (err) end if #dirs == 0 then return end local fmt = "%6s %12s %12s %12s %12s %12s\n" From 446323b0ffc93a5a6f3e0c1a3d2f53a164c48a8c Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Fri, 20 Apr 2018 16:40:47 -0700 Subject: [PATCH 06/10] wreck: flux-wreck: filter job states in wreck commands Use state filter and kvs_only, active_only flags to wreck.joblist to make some changes to behavior of flux-wreck commands: * Include only running jobs in flux-wreck uri * Skip complete jobs in flux-wreck `ls -x` and `uri` * Do not consider active jobs in flux-wreck purge --- src/cmd/flux-wreck | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck index 0f47c2eede24..34c6253e3c98 100755 --- a/src/cmd/flux-wreck +++ b/src/cmd/flux-wreck @@ -410,11 +410,15 @@ prog:SubCommand { options = { { name = "max", char = 'n', arg="COUNT", usage = "Display at most COUNT jobs", + }, + { name = "exclude-complete", char = 'x', + usage = "Skip the listing of complete jobs in the kvs" } }, description = "List jobs in kvs", handler = function (self, arg) - local dirs,err = joblist_from_args { self = self, args = arg } + local dirs,err = joblist_from_args { self = self, args = arg, + active_only = self.opt.x } if not dirs then self:die (err) end if #dirs == 0 then return end local fmt = "%6s %6s %-9s %20s %12s %8s %-.13s\n"; @@ -447,8 +451,12 @@ prog:SubCommand { }, description = "List FLUX_URI for jobs that are Flux instances", handler = function (self, arg) - local dirs, err = joblist_from_args { self = self, - args = arg } + -- Only include running jobs: + local states = { include = { running = true } } + local dirs,err = joblist_from_args { self = self, + args = arg, + states = states, + active_only = true } if not dirs then self:die (err) end if #dirs == 0 then return end local fmt = "%6s %6s %-9s %-40s %-.13s\n"; @@ -653,8 +661,15 @@ prog:SubCommand { --- -- Gather LWJ path list - -- - local dirs = wreck.joblist{ flux = f } or {} + -- Only include complete jobs + local states = { include = { complete = true, + failed = true + } + } + local dirs = wreck.joblist{ flux = f, + kvs_only = true, + states = states + } or {} if verbose then self:log ("%4.03fs: got lwj list (%d entries)\n", tt:get0(), #dirs) end From 392954eaa1fa939e4791005733abacf48af7ca99 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Tue, 24 Apr 2018 07:19:11 -0700 Subject: [PATCH 07/10] wreck: flux-wreck: ensure jobid is a number in kvs_path() Ensure jobid is converted to a number before passing to wreck.id_to_path(). --- src/cmd/flux-wreck | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck index 34c6253e3c98..39fba1fb6963 100755 --- a/src/cmd/flux-wreck +++ b/src/cmd/flux-wreck @@ -53,7 +53,8 @@ end local function kvs_path (id, fmt, ...) - local p = assert (wreck.id_to_path { flux = f, jobid = id }) + local id = assert (tonumber (id)) + local p, err = assert (wreck.id_to_path { flux = f, jobid = id }) if fmt then p = p .. "." .. string.format (fmt, ...) end From de323b83b0cce8237f0ca6a1b6dfe17a312260fe Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Tue, 24 Apr 2018 16:16:08 -0700 Subject: [PATCH 08/10] wreck: flux-wreck ls: allow --max=0 to disable max Allow --max=0 to set maximum number of jobs to return to unlimited. --- src/cmd/flux-wreck | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cmd/flux-wreck b/src/cmd/flux-wreck index 39fba1fb6963..483c32687237 100755 --- a/src/cmd/flux-wreck +++ b/src/cmd/flux-wreck @@ -392,6 +392,7 @@ local function joblist_from_args (arg) local args = arg.args local max = tonumber (self.opt.n) or 25 + if max <= 0 then max = math.huge end if #args == 0 then return wreck.joblist{ flux = f, max = max, From c65d4b20d351b985a09b079a3a1356c8d147dd6f Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sun, 22 Apr 2018 23:11:46 +0000 Subject: [PATCH 09/10] t2000-wreck: don't unlink kvsdir in wreck ls RANGE test The unlink of the last job's kvs directory in the flux-wreck ls RANGE test confuses flux-wreck purge test later on, since the job completion link under lwj-complete is not similarly removed. Avoid the unlink by instead moving the kvsdir to a temporary location then moving it back again after the `ls` output is captured. --- t/t2000-wreck.t | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index c5d5dd5fe427..17a335f16127 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -439,8 +439,9 @@ test_expect_success 'flux-wreck: ls RANGE ignores missing jobids' ' LASTID=$(last_job_id) && LWJ=$(last_job_path) && flux wreckrun hostname && - flux kvs unlink -R $LWJ && + flux kvs move $LWJ tmp && flux wreck ls $((${LASTID}-1))-$((${LASTID}+1)) > ls-range.out && + flux kvs move tmp $LWJ && test $(cat ls-range.out | wc -l) = 3 ' test_expect_success 'flux-wreck: purge works' ' From 3932537f262b92b27f4293609f8864671ce9658f Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Sun, 22 Apr 2018 23:52:28 +0000 Subject: [PATCH 10/10] t2000-wreck: add tests to exercise new job.list rpc Add a test to ensure that active jobs show up first in flux-wreck ls output, even when the active job is not the most recent, and ensure that flux-wreck purge doesn't consider "active" (i.e. running) jobs. --- t/t2000-wreck.t | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index 17a335f16127..af8b7b8e1bb1 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -444,6 +444,20 @@ test_expect_success 'flux-wreck: ls RANGE ignores missing jobids' ' flux kvs move tmp $LWJ && test $(cat ls-range.out | wc -l) = 3 ' +test_expect_success 'flux-wreck: ls lists most recent "active" job first' ' + flux wreckrun -d sleep 100 && + KILL=$(last_job_id) && + flux wreckrun -d sleep 100 && + LASTID=$(last_job_id) && + flux wreckrun hostname && flux wreckrun hostname && + flux wreck ls --max=1 > ls-active.out && + flux wreck kill $KILL && + flux wreck kill $LASTID && + test_debug "echo expecting $LASTID" && + test_debug "cat ls-active.out" && + test $(cat ls-active.out | wc -l) = 2 && + tail -1 ls-active.out | grep ^\ *$LASTID +' test_expect_success 'flux-wreck: purge works' ' flux wreck purge && flux wreck purge -t 2 -R && @@ -451,6 +465,14 @@ test_expect_success 'flux-wreck: purge works' ' COUNT=$(flux wreck ls | grep -v NTASKS | wc -l) && test "$COUNT" = 2 ' +test_expect_success 'flux-wreck: purge excludes active jobs' ' + flux wreckrun -d sleep 100 && + flux wreck purge -v -t 0 -R && + flux wreck kill $(last_job_id) && + flux wreck ls && + COUNT=$(flux wreck ls | grep -v NTASKS | wc -l) && + test "$COUNT" = 1 +' flux module list | grep -q sched || test_set_prereq NO_SCHED test_expect_success NO_SCHED 'flux-submit: returns ENOSYS when sched not loaded' '