Skip to content

Commit

Permalink
Merge 3932537 into 866f4c7
Browse files Browse the repository at this point in the history
  • Loading branch information
garlick committed Apr 26, 2018
2 parents 866f4c7 + 3932537 commit 5903b60
Show file tree
Hide file tree
Showing 7 changed files with 518 additions and 27 deletions.
84 changes: 79 additions & 5 deletions src/bindings/lua/wreck.lua
Expand Up @@ -177,17 +177,20 @@ end
-- Return kvs path to job id `id`
--
local kvs_paths = {}
-- Record `path` as the cached kvs path for job `id` in the module-local
-- `kvs_paths` table. Any entry already stored for `id` is overwritten.
local function kvs_path_insert (id, path)
kvs_paths [id] = path
end
-- Return the kvs path for job `id`, using flux handle `f` for the lookup.
--
-- Results are memoized in `kvs_paths`, so job_kvspath() is consulted at
-- most once per id (it was previously invoked twice on every cache miss:
-- once for a direct table store and once more via kvs_path_insert()).
local function kvs_path (f, id)
    local path = kvs_paths [id]
    if not path then
        -- Cache miss: resolve once and remember the answer.
        path = job_kvspath (f, id)
        kvs_path_insert (id, path)
    end
    return path
end

-- Resolve the kvs paths for every job id in the array `ids` with a single
-- job_kvspath() call, cache each one via kvs_path_insert(), and return the
-- job_kvspath() result unchanged.
--
-- NOTE(review): the result is indexed by job id here (`result [id]`), not
-- by position in `ids`. If job_kvspath() returns a positional array this
-- caches the wrong entries -- confirm against job_kvspath().
local function kvs_path_multi (f, ids)
    local result = job_kvspath (f, ids)
    for _,id in ipairs (ids) do
        -- Store each entry exactly once (the old direct table store that
        -- duplicated this call has been dropped).
        kvs_path_insert (id, result [id])
    end
    return result
end
Expand Down Expand Up @@ -610,6 +613,12 @@ local function reverse (t)
return r
end

-- Push every element of sequence `t2` onto the end of `t1`, in order.
-- `t1` is modified in place and also returned for convenience.
local function append (t1, t2)
    for _, item in ipairs (t2) do
        t1 [#t1 + 1] = item
    end
    return t1
end

function wreck.jobids_to_kvspath (arg)
local f = arg.flux
local ids = arg.jobids
Expand All @@ -619,7 +628,53 @@ function wreck.jobids_to_kvspath (arg)
return kvs_path_multi (f, ids)
end

function wreck.joblist (arg)
-- Decide whether a job in `state` passes the state filter `conf`.
--
--  * no `conf` at all: every state passes
--  * `conf.include` present: the result is `conf.include[state]`, i.e.
--    only states listed in that hash pass (takes priority over exclude)
--  * `conf.exclude` present: a state passes unless listed in that hash
--  * neither present: every state passes
local function include_state (conf, state)
    if conf then
        local only = conf.include
        if only then return only [state] end

        local skip = conf.exclude
        if skip then return not skip [state] end
    end
    return true
end

-- Join the keys of hash `t` whose values are truthy into a single
-- comma-separated string. Returns nil when `t` is nil/false, and the
-- empty string when no key qualifies. Key order follows pairs() and is
-- therefore unspecified.
local function to_string_list (t)
    if not t then return nil end

    local names = {}
    for key, enabled in pairs (t) do
        if enabled then
            names [#names + 1] = key
        end
    end
    return table.concat (names, ",")
end

-- Query the "job.list" service for jobs and report them by kvs path.
--
-- Reads `arg.flux` (handle used for the rpc), `arg.max` and `arg.states`
-- (optional table with `include` / `exclude` hashes, sent to the service
-- as comma-separated strings).
--
-- On success returns two values: an array of kvs paths in the order the
-- service listed them, and a hash keyed by those same paths (value true).
-- On rpc failure returns nil, nil, err.
--
-- Side effect: every returned jobid -> kvs path pair is stored in the
-- kvs path cache via kvs_path_insert().
local function joblist_active (arg)
    local handle = arg.flux
    local states = arg.states

    local request = {
        max = arg.max,
        include = to_string_list (states and states.include),
        exclude = to_string_list (states and states.exclude)
    }
    local resp, err = handle:rpc ("job.list", request)
    if not resp then
        return nil, nil, err
    end

    local paths, seen = {}, {}
    for _, job in ipairs (resp.jobs) do
        local path = job.kvs_path
        kvs_path_insert (job.jobid, path)
        seen [path] = true
        paths [#paths + 1] = path
    end
    return paths, seen
end

local function joblist_kvs (arg, exclude)
local flux = require 'flux'
local f = arg.flux
if not f then f, err = flux.new () end
Expand All @@ -632,10 +687,12 @@ function wreck.joblist (arg)
for _,k in pairs (dirs) do
local path = tostring (d) .. "." .. k
local dir = f:kvsdir (path)
if dir then
if dir and (not exclude or not exclude[path]) then
if dir.state then
-- This is a lwj dir, add to results table:
table.insert (results, path)
if include_state (arg.states, dir.state) then
table.insert (results, path)
end
else
-- recurse to find lwj dirs lower in the directory tree
visit (dir, results)
Expand All @@ -655,6 +712,23 @@ function wreck.joblist (arg)
return reverse (visit (dir))
end

-- Return an array of kvs paths for jobs: first those reported by
-- joblist_active(), followed by jobs found by walking the kvs with
-- joblist_kvs() (paths already reported by joblist_active() are skipped).
--
-- Recognized fields of `arg`:
--   flux         flux handle; one is created with flux.new() if absent
--   max          maximum number of jobs to return
--   states       optional { include = {...} } / { exclude = {...} } filter
--   kvs_only     if set, skip joblist_active() and only walk the kvs
--   active_only  if set, return only what joblist_active() reported
--
-- Returns the array on success, or nil, err on failure. If the
-- joblist_active() query fails and `active_only` is not set, the function
-- falls back to the kvs walk alone.
function wreck.joblist (arg)
    -- Work on a shallow copy: `max` is decremented below and that must
    -- not leak into the table owned by the caller.
    local opts = {}
    for k, v in pairs (arg or {}) do opts [k] = v end

    -- joblist_kvs() tolerates a missing handle but joblist_active() does
    -- not, so make sure both see a valid one.
    if not opts.flux then
        local flux = require 'flux'
        local h, err = flux.new ()
        if not h then return nil, err end
        opts.flux = h
    end

    local results, active = {}, {}
    if not opts.kvs_only then
        local err
        results, active, err = joblist_active (opts)
        if results and opts.max then
            -- Only the remainder may still be filled from the kvs
            opts.max = opts.max - #results
        end
        if opts.active_only or opts.max == 0 then
            -- Nothing else to consult: surface the reason for a failure
            -- instead of returning a bare nil.
            if not results then return nil, err end
            return results
        end
    end

    -- Append a list of jobs from the kvs to the active jobs
    local r, err = joblist_kvs (opts, active)
    if not r then return nil, err end

    return append (results or {}, r)
end

local function shortprog ()
local prog = string.match (arg[0], "([^/]+)$")
return prog:match ("flux%-(.+)$")
Expand Down
41 changes: 33 additions & 8 deletions src/cmd/flux-wreck
Expand Up @@ -53,7 +53,8 @@ end
local function kvs_path (id, fmt, ...)
local p = assert (wreck.id_to_path { flux = f, jobid = id })
local id = assert (tonumber (id))
local p, err = assert (wreck.id_to_path { flux = f, jobid = id })
if fmt then
p = p .. "." .. string.format (fmt, ...)
end
Expand Down Expand Up @@ -386,10 +387,18 @@ local function bracketify (args)
return (r)
end
local function joblist_from_args (self, args)
local function joblist_from_args (arg)
local self = arg.self
local args = arg.args
local max = tonumber (self.opt.n) or 25
if max <= 0 then max = math.huge end
if #args == 0 then
return wreck.joblist{ flux = f, max = max }
return wreck.joblist{ flux = f,
max = max,
states = arg.states,
active_only = arg.active_only,
kvs_only = arg.kvs_only }
end
-- otherwise use dirs on cmdline
local hl,err = hostlist.union (unpack (bracketify (args)))
Expand All @@ -403,11 +412,15 @@ prog:SubCommand {
options = {
{ name = "max", char = 'n', arg="COUNT",
usage = "Display at most COUNT jobs",
},
{ name = "exclude-complete", char = 'x',
usage = "Skip the listing of complete jobs in the kvs"
}
},
description = "List jobs in kvs",
handler = function (self, arg)
local dirs,err = joblist_from_args (self, arg)
local dirs,err = joblist_from_args { self = self, args = arg,
active_only = self.opt.x }
if not dirs then self:die (err) end
if #dirs == 0 then return end
local fmt = "%6s %6s %-9s %20s %12s %8s %-.13s\n";
Expand Down Expand Up @@ -440,7 +453,12 @@ prog:SubCommand {
},
description = "List FLUX_URI for jobs that are Flux instances",
handler = function (self, arg)
local dirs,err = joblist_from_args (self, arg)
-- Only include running jobs:
local states = { include = { running = true } }
local dirs,err = joblist_from_args { self = self,
args = arg,
states = states,
active_only = true }
if not dirs then self:die (err) end
if #dirs == 0 then return end
local fmt = "%6s %6s %-9s %-40s %-.13s\n";
Expand Down Expand Up @@ -478,7 +496,7 @@ prog:SubCommand {
},
description = "List timings of jobs in kvs",
handler = function (self, arg)
local dirs,err = joblist_from_args (self, arg)
local dirs,err = joblist_from_args {self = self, args = arg}
if not dirs then self:die (err) end
if #dirs == 0 then return end
local fmt = "%6s %12s %12s %12s %12s %12s\n"
Expand Down Expand Up @@ -645,8 +663,15 @@ prog:SubCommand {
---
-- Gather LWJ path list
--
local dirs = wreck.joblist{ flux = f } or {}
-- Only include complete jobs
local states = { include = { complete = true,
failed = true
}
}
local dirs = wreck.joblist{ flux = f,
kvs_only = true,
states = states
} or {}
if verbose then
self:log ("%4.03fs: got lwj list (%d entries)\n", tt:get0(), #dirs)
end
Expand Down
78 changes: 77 additions & 1 deletion src/modules/wreck/job.c
Expand Up @@ -76,6 +76,8 @@ static int kvs_bits_per_dir = 7;
uint32_t broker_rank;
const char *local_uri = NULL;

zhash_t *active_jobs = NULL;

/*
* Return as 64bit integer the portion of integer `n`
* masked from bit position `a` to position `b`,
Expand Down Expand Up @@ -733,12 +735,77 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w,
flux_future_destroy (f);
}

/* Handler for "wreck.state.<state>" events: mirror the job's new state in
 * the active_jobs hash.
 *
 * The event payload must carry "jobid" and "kvs_path"; the state name is
 * the part of the topic string following the "wreck.state." prefix. A job
 * that is not yet in active_jobs is created and inserted (only id,
 * kvs_path and state are filled in). Once a job's state becomes
 * "complete" or "failed" it is deleted from active_jobs again.
 *
 * Failures (bad payload, empty or oversized state name, allocation or
 * insert failure) are logged with flux_log_error() and otherwise ignored.
 */
static void wreck_state_cb (flux_t *h, flux_msg_handler_t *w,
                            const flux_msg_t *msg, void *arg)
{
    static const char prefix[] = "wreck.state.";
    struct wreck_job *job;
    const char *topic;
    const char *kvs_path;
    const char *state;
    size_t len;
    int64_t id;

    if (flux_event_unpack (msg, &topic, "{s:I s:s}",
                           "jobid", &id,
                           "kvs_path", &kvs_path) < 0)
        goto error;

    /* Skip over the topic prefix (12 chars) to reach the state name */
    state = topic + (sizeof (prefix) - 1);
    len = strlen (state);
    if (len == 0 || len >= sizeof (job->state)) {
        errno = EPROTO;
        goto error;
    }

    job = wreck_job_lookup (id, active_jobs);
    if (job == NULL) {
        /* First event seen for this job: start tracking it */
        if ((job = wreck_job_create ()) == NULL)
            goto error;
        job->id = id;
        job->kvs_path = strdup (kvs_path);
        if (job->kvs_path == NULL
            || wreck_job_insert (job, active_jobs) < 0) {
            wreck_job_destroy (job);
            goto error;
        }
    }

    wreck_job_set_state (job, state);
    if (strcmp (job->state, "complete") == 0
        || strcmp (job->state, "failed") == 0)
        wreck_job_delete (id, active_jobs);
    return;
error:
    flux_log_error (h, "%s", __FUNCTION__);
}

/* Handler for "job.list" requests.
 *
 * Optional request fields: "max" (int, defaults to 0 here), "include" and
 * "exclude" (strings, default NULL). They are passed straight through to
 * wreck_job_list() together with the active_jobs hash, and the string it
 * returns is sent back as the response payload.
 *
 * If the request cannot be decoded or wreck_job_list() returns NULL, an
 * error response carrying the current errno is sent instead. A failure to
 * send either response is logged.
 */
static void job_list_cb (flux_t *h, flux_msg_handler_t *w,
                         const flux_msg_t *msg, void *arg)
{
    const char *include = NULL;
    const char *exclude = NULL;
    char *json_str = NULL;
    int max = 0;
    int rc;

    if (flux_request_unpack (msg, NULL, "{s?:i s?:s s?:s}",
                             "max", &max,
                             "include", &include,
                             "exclude", &exclude) < 0
        || !(json_str = wreck_job_list (active_jobs, max, include, exclude)))
        rc = flux_respond (h, msg, errno, NULL);
    else
        rc = flux_respond (h, msg, 0, json_str);

    if (rc < 0)
        flux_log_error (h, "%s: flux_respond", __FUNCTION__);
    free (json_str);    /* still NULL on the error path: no-op */
}

/* Message handler table for this module, installed by mod_main() with
 * flux_msg_handler_addvec(). Each row maps a message type and topic glob
 * to its callback. Request topics are served by the job_* callbacks;
 * the two event rows feed runevent_cb ("wrexec.run.*") and the
 * active_jobs tracking in wreck_state_cb ("wreck.state.*").
 */
static const struct flux_msg_handler_spec mtab[] = {
{ FLUX_MSGTYPE_REQUEST, "job.create", job_create_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "job.submit", job_create_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "job.submit-nocreate", job_submit_only, 0 },
{ FLUX_MSGTYPE_REQUEST, "job.kvspath", job_kvspath_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "job.list", job_list_cb, 0 },
{ FLUX_MSGTYPE_EVENT, "wrexec.run.*", runevent_cb, 0 },
{ FLUX_MSGTYPE_EVENT, "wreck.state.*", wreck_state_cb, 0 },
FLUX_MSGHANDLER_TABLE_END
};

Expand All @@ -747,14 +814,22 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_msg_handler_t **handlers = NULL;
int rc = -1;

if (!(active_jobs = zhash_new ())) {
flux_log_error (h, "zhash_new");
return (-1);
}
if (flux_msg_handler_addvec (h, mtab, NULL, &handlers) < 0) {
flux_log_error (h, "flux_msg_handler_addvec");
return (-1);
goto done;
}
if ((flux_event_subscribe (h, "wrexec.run.") < 0)) {
flux_log_error (h, "flux_event_subscribe");
goto done;
}
if ((flux_event_subscribe (h, "wreck.state.") < 0)) {
flux_log_error (h, "flux_event_subscribe");
goto done;
}

if ((flux_attr_get_int (h, "wreck.lwj-dir-levels", &kvs_dir_levels) < 0)
&& (flux_attr_set_int (h, "wreck.lwj-dir-levels", kvs_dir_levels) < 0)) {
Expand Down Expand Up @@ -786,6 +861,7 @@ int mod_main (flux_t *h, int argc, char **argv)
rc = 0;
done:
flux_msg_handler_delvec (handlers);
zhash_destroy (&active_jobs);
return rc;
}

Expand Down

0 comments on commit 5903b60

Please sign in to comment.