Skip to content

Commit

Permalink
Merge f73df10 into 3b62c49
Browse files Browse the repository at this point in the history
  • Loading branch information
grondo committed May 17, 2018
2 parents 3b62c49 + f73df10 commit 27ad508
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 138 deletions.
2 changes: 1 addition & 1 deletion src/bindings/lua/wreck.lua
Expand Up @@ -533,7 +533,7 @@ local function task_status (lwj, taskid)
if not tonumber (taskid) then return nil end
local t = lwj[taskid]
if not t.exit_status then
return 0, (t.procdesc and "starting" or "running")
return 0, "running"
end
local x = t.exit_code or (t.exit_sig + 128)
return x, exit_message (t)
Expand Down
180 changes: 54 additions & 126 deletions src/modules/wreck/wrexecd.c
Expand Up @@ -43,13 +43,14 @@
#include <lua.h>
#include <lauxlib.h>

#include <jansson.h>

#include <flux/core.h>

#include "src/common/liboptparse/optparse.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/sds.h"
#include "src/common/libutil/fdwalk.h"
#include "src/common/libutil/shortjson.h"
#include "src/common/libsubprocess/zio.h"
#include "src/common/libpmi/simple_server.h"
#include "src/common/libkz/kz.h"
Expand Down Expand Up @@ -332,8 +333,11 @@ const char * prog_ctx_getopt (struct prog_ctx *ctx, const char *opt)

/*
 *  Record option `opt` with value `val` in ctx->options.
 *
 *  The value is duplicated with strdup(), and free() is registered as
 *  the destructor for the entry, so the caller keeps ownership of `val`.
 *  Allocation failure is reported through wlog_fatal() with exit code 1.
 *
 *  Always returns 0.
 */
int prog_ctx_setopt (struct prog_ctx *ctx, const char *opt, const char *val)
{
    char *cpy = strdup (val);
    if (cpy == NULL)
        wlog_fatal (ctx, 1, "prog_ctx_setopt(%s=%s): Out of memory", opt, val);
    wlog_debug (ctx, "Setting option %s = %s", opt, val);
    /*
     *  Insert the checked copy exactly once. A second insert under the
     *  same key is rejected by the hash and would leak its argument, so
     *  release the copy whenever the hash did not take it.
     */
    if (zhash_insert (ctx->options, opt, cpy) < 0)
        free (cpy);
    zhash_freefn (ctx->options, opt, (zhash_free_fn *) free);
    return (0);
}
Expand Down Expand Up @@ -770,16 +774,16 @@ struct prog_ctx * prog_ctx_create (void)
}

int json_array_to_argv (struct prog_ctx *ctx,
json_object *o, char ***argvp, int *argcp)
json_t *o, char ***argvp, int *argcp)
{
int i;
if (json_object_get_type (o) != json_type_array) {
if (json_typeof (o) != JSON_ARRAY) {
wlog_err (ctx, "json_array_to_argv: not an array");
errno = EINVAL;
return (-1);
}

*argcp = json_object_array_length (o);
*argcp = json_array_size (o);
if (*argcp <= 0) {
wlog_err (ctx, "json_array_to_argv: array length = %d", *argcp);
return (-1);
Expand All @@ -788,13 +792,14 @@ int json_array_to_argv (struct prog_ctx *ctx,
*argvp = xzmalloc ((*argcp + 1) * sizeof (char **));

for (i = 0; i < *argcp; i++) {
json_object *ox = json_object_array_get_idx (o, i);
if (json_object_get_type (ox) != json_type_string) {
json_t *ox = json_array_get (o, i);
if (json_typeof (ox) != JSON_STRING) {
wlog_err (ctx, "malformed cmdline");
free (*argvp);
return (-1);
}
(*argvp) [i] = strdup (json_object_get_string (ox));
if (!((*argvp) [i] = strdup (json_string_value (ox))))
wlog_fatal (ctx, 1, "json_array_to_argv: strdup: Out of memory");
}
return (0);
}
Expand Down Expand Up @@ -822,42 +827,42 @@ int prog_ctx_options_init (struct prog_ctx *ctx)
i = flux_kvsitr_create (opts);
while ((opt = flux_kvsitr_next (i))) {
char *json_str;
json_object *v;
json_t *v;
char s [64];

if (flux_kvsdir_get (opts, opt, &json_str) < 0) {
wlog_err (ctx, "skipping option '%s': %s", opt, flux_strerror (errno));
continue;
}

if (!(v = json_tokener_parse (json_str))) {
if (!(v = json_loads (json_str, JSON_DECODE_ANY, NULL))) {
wlog_err (ctx, "failed to parse json for option '%s'", opt);
free (json_str);
continue;
}

switch (json_object_get_type (v)) {
case json_type_null:
switch (json_typeof (v)) {
case JSON_NULL:
prog_ctx_setopt (ctx, opt, "");
break;
case json_type_string:
prog_ctx_setopt (ctx, opt, json_object_get_string (v));
case JSON_STRING:
prog_ctx_setopt (ctx, opt, json_string_value (v));
break;
case json_type_int:
snprintf (s, sizeof (s) -1, "%"PRId64,
json_object_get_int64 (v));
case JSON_INTEGER:
snprintf (s, sizeof (s) -1, "%ju",
(uintmax_t) json_integer_value (v));
prog_ctx_setopt (ctx, opt, s);
break;
case json_type_boolean:
if (json_object_get_boolean (v))
prog_ctx_setopt (ctx, opt, "");
case JSON_TRUE:
prog_ctx_setopt (ctx, opt, "");
case JSON_FALSE:
break;
default:
wlog_err (ctx, "skipping option '%s': invalid type", opt);
break;
}
free (json_str);
json_object_put (v);
json_decref (v);
}
flux_kvsitr_destroy (i);
flux_kvsdir_destroy (opts);
Expand Down Expand Up @@ -953,7 +958,7 @@ int prog_ctx_load_lwj_info (struct prog_ctx *ctx)
{
int i;
char *json_str;
json_object *v;
json_t *v;
flux_future_t *f = NULL;
char *key;

Expand Down Expand Up @@ -983,7 +988,7 @@ int prog_ctx_load_lwj_info (struct prog_ctx *ctx)
if (flux_kvsdir_get (ctx->kvs, "cmdline", &json_str) < 0)
wlog_fatal (ctx, 1, "kvs_get: cmdline");

if (!(v = json_tokener_parse (json_str)))
if (!(v = json_loads (json_str, 0, NULL)))
wlog_fatal (ctx, 1, "kvs_get: cmdline: json parser failed");

if (json_array_to_argv (ctx, v, &ctx->argv, &ctx->argc) < 0)
Expand All @@ -995,9 +1000,9 @@ int prog_ctx_load_lwj_info (struct prog_ctx *ctx)

wlog_msg (ctx, "lwj %" PRIi64 ": node%d: nprocs=%d, nnodes=%d, cmdline=%s",
ctx->id, ctx->rankinfo.nodeid, ctx->rankinfo.ntasks,
ctx->nnodes, json_object_to_json_string (v));
ctx->nnodes, json_str);
free (json_str);
json_object_put (v);
json_decref (v);

return (0);
}
Expand Down Expand Up @@ -1196,54 +1201,10 @@ int rexec_state_change (struct prog_ctx *ctx, const char *state)
}


/*
 *  Build a JSON object describing one task, with members:
 *    "command" - the string `cmd`
 *    "pid"     - `pid` as an integer
 *    "nodeid"  - ctx->noderank as an integer
 *
 *  Returns the new object; the caller releases it with json_object_put().
 */
json_object * json_task_info_object_create (struct prog_ctx *ctx,
    const char *cmd, pid_t pid)
{
    json_object *info = json_object_new_object ();

    json_object_object_add (info, "command", json_object_new_string (cmd));
    json_object_object_add (info, "pid", json_object_new_int (pid));
    json_object_object_add (info, "nodeid",
                            json_object_new_int (ctx->noderank));
    return (info);
}

/*
 *  Store the process description of local task `localid` in the job's
 *  kvs directory under the key "<globalid>.procdesc".
 *
 *  The description is built from argv[0], the task pid and the node rank
 *  by json_task_info_object_create(). The put is not committed here.
 *
 *  Returns 0 on success, or the result of wlog_err() if the put fails.
 *  Failure to format the key is reported through wlog_fatal().
 */
int rexec_taskinfo_put (struct prog_ctx *ctx, int localid)
{
    struct task_info *task = ctx->task [localid];
    json_object *desc;
    char *kvskey;
    int put_rc;

    desc = json_task_info_object_create (ctx, ctx->argv [0], task->pid);

    if (asprintf (&kvskey, "%d.procdesc", task->globalid) < 0) {
        errno = ENOMEM;
        wlog_fatal (ctx, 1, "rexec_taskinfo_put: asprintf: %s",
                    flux_strerror (errno));
    }

    /* The serialized string belongs to desc, so put it before releasing */
    put_rc = flux_kvsdir_put (ctx->kvs, kvskey,
                              json_object_to_json_string (desc));
    json_object_put (desc);
    free (kvskey);

    if (put_rc >= 0)
        return (0);
    return wlog_err (ctx, "kvs_put failure");
}

int send_startup_message (struct prog_ctx *ctx)
{
int i;
const char * state = "running";

for (i = 0; i < ctx->rankinfo.ntasks; i++) {
if (rexec_taskinfo_put (ctx, i) < 0)
return (-1);
}

if (prog_ctx_getopt (ctx, "stop-children-in-exec"))
state = "sync";

Expand All @@ -1262,9 +1223,10 @@ exitstatus_watcher (const char *key, const char *str, void *arg, int err)
struct prog_ctx *ctx = arg;
flux_t *h = ctx->flux;
int count;
json_object *o;
json_t *o;

if (err || !(o = Jfromstr (str))) {
if (err || !(o = json_loads (str, 0, NULL))
|| (json_unpack (o, "{s:i}", "count", &count) < 0)) {
if (err != ENOENT)
flux_log (h, LOG_ERR, "exitstatus_watcher: %s",
err ? flux_strerror (err) : "Jfromstr failed");
Expand All @@ -1274,43 +1236,15 @@ exitstatus_watcher (const char *key, const char *str, void *arg, int err)
/* Once count is fully populated, release the watch on the
* exit_status dir so reactor loop can exit
*/
if (Jget_int (o, "count", &count) && count == ctx->total_ntasks) {
if (count == ctx->total_ntasks) {
flux_kvs_unwatch (h, key);
prog_ctx_remove_completion_ref (ctx, "exit_status");
}

Jput (o);
json_decref (o);
return (0);
}

/*
 *  Build the JSON payload describing the exit status of task `t`:
 *
 *    { "key":     "<kvspath>.exit_status",
 *      "total":   <ctx->total_ntasks>,
 *      "entries": { "<globalid>": <status> } }
 *
 *  Returns the new object, or NULL if either string could not be
 *  formatted.
 */
static json_object *task_exit_tojson (struct task_info *t)
{
    struct prog_ctx *ctx = t->ctx;
    char *status_key = NULL;
    char *id = NULL;
    json_object *msg = NULL;
    json_object *entries;

    if (asprintf (&status_key, "%s.exit_status", ctx->kvspath) < 0)
        return (NULL);

    if (asprintf (&id, "%d", t->globalid) >= 0) {
        /* Single-entry map from global task id to its wait status */
        entries = Jnew ();
        Jadd_int64 (entries, id, t->status);

        msg = Jnew ();
        Jadd_str (msg, "key", status_key);
        Jadd_int (msg, "total", ctx->total_ntasks);
        json_object_object_add (msg, "entries", entries);

        free (id);
    }

    free (status_key);
    return (msg);
}

static int wait_for_task_exit_aggregate (struct prog_ctx *ctx)
{
int rc = 0;
Expand All @@ -1337,18 +1271,26 @@ static int wait_for_task_exit_aggregate (struct prog_ctx *ctx)

static int aggregator_push_task_exit (struct task_info *t)
{
int rc = 0;
flux_t *h = t->ctx->flux;
int n, rc = 0;
char key [1024];
char idstr [16];
struct prog_ctx *ctx = t->ctx;
flux_t *h = ctx->flux;
flux_future_t *f;
json_object *o = task_exit_tojson (t);

if (o == NULL) {
flux_log_error (h, "task_exit_tojson");
if ((n = snprintf (key, sizeof (key), "%s.exit_status", ctx->kvspath)) < 0
|| (n >= sizeof (key))
|| (n = snprintf (idstr, sizeof (idstr), "%d", t->globalid)) < 0
|| (n >= sizeof (idstr))) {
flux_log_error (h, "aggregator_push_task_exit: snprintf");
return (-1);
}

if (!(f = flux_rpc (h, "aggregator.push", Jtostr (o),
FLUX_NODEID_ANY, 0))) {
if (!(f = flux_rpc_pack (h, "aggregator.push", FLUX_NODEID_ANY, 0,
"{s:s,s:i,s:{s:i}}",
"key", key,
"total", ctx->total_ntasks,
"entries", idstr, t->status))) {
flux_log_error (h, "flux_rpc");
rc = -1;
}
Expand All @@ -1358,7 +1300,6 @@ static int aggregator_push_task_exit (struct task_info *t)
rc = -1;
}
flux_future_destroy (f);
Jput (o);

if (t->ctx->noderank == 0 && t->id == 0)
rc = wait_for_task_exit_aggregate (t->ctx);
Expand Down Expand Up @@ -2047,9 +1988,7 @@ void ev_cb (flux_t *f, flux_msg_handler_t *mw,
const flux_msg_t *msg, struct prog_ctx *ctx)
{
int base;
json_object *o = NULL;
const char *topic;
const char *json_str;

if (flux_msg_get_topic (msg, &topic) < 0) {
wlog_err (ctx, "flux_msg_get_topic: %s", flux_strerror (errno));
Expand All @@ -2060,26 +1999,14 @@ void ev_cb (flux_t *f, flux_msg_handler_t *mw,
flux_heartbeat_decode (msg, &ctx->epoch);
return;
}
if (flux_msg_get_json (msg, &json_str) < 0) {
wlog_err (ctx, "flux_msg_get_json");
return;
}
if (json_str && !(o = json_tokener_parse (json_str))) {
wlog_err (ctx, "json_tokener_parse");
return;
}


base = strlen (ctx->topic);
if (strcmp (topic+base, "kill") == 0) {
json_object *ox;
/* Default signal is 9 (SIGKILL) unless overridden by event payload */
int sig = 9;
if (json_object_object_get_ex (o, "signal", &ox))
sig = json_object_get_int (ox);
flux_msg_unpack (msg, "{s:i}", "signal", &sig);
wlog_msg (ctx, "Killing jobid %" PRIi64 " with signal %d", ctx->id, sig);
prog_ctx_signal (ctx, sig);
}
json_object_put (o);
}

int task_info_io_setup (struct task_info *t)
Expand Down Expand Up @@ -2309,7 +2236,8 @@ int prog_ctx_get_id (struct prog_ctx *ctx, optparse_t *p)

if (!optparse_getopt (p, "kvs-path", &kvspath))
wlog_fatal (ctx, 1, "Required arg --kvs-path missing");
ctx->kvspath = strdup (kvspath);
if (!(ctx->kvspath = strdup (kvspath)))
wlog_fatal (ctx, 1, "prog_ctx_get_id: strdup failed");

if (!optparse_getopt (p, "lwj-id", &id)) {
/* Assume lwj id is last component of kvs-path */
Expand Down
11 changes: 0 additions & 11 deletions t/t2001-jsc.t
Expand Up @@ -195,10 +195,6 @@ test_expect_success 'jstat 7.4: basic query works: rdesc' '
flux jstat query 1 rdesc
'

test_expect_success 'jstat 7.5: basic query works: pdesc' '
flux jstat query 1 pdesc
'

test_expect_success 'jstat 8: query detects bad inputs' '
test_expect_code 42 flux jstat query 0 jobid &&
test_expect_code 42 flux jstat query 99999 state-pair &&
Expand All @@ -215,13 +211,6 @@ EOF
test_cmp expected.9.1 output.9.1
"

test_expect_success 'jstat 10: update procdescs' "
flux kvs get --json $(flux wreck kvs-path 1).0.procdesc > output.10.1 &&
flux jstat update 1 pdesc '{\"pdesc\": {\"procsize\":1, \"hostnames\":[\"0\"], \"executables\":[\"fake\"], \"pdarray\":[{\"pid\":8482,\"eindx\":0,\"hindx\":0}]}}' &&
flux kvs get --json $(flux wreck kvs-path 1).0.procdesc > output.10.2 &&
test_expect_code 1 diff output.10.1 output.10.2
"

test_expect_success 'jstat 11: update rdesc' "
flux jstat update 1 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\":3600}}' &&
flux kvs get --json $(flux wreck kvs-path 1).ntasks > output.11.1 &&
Expand Down

0 comments on commit 27ad508

Please sign in to comment.