Skip to content

Commit

Permalink
Merge 852d53b into 12a1880
Browse files Browse the repository at this point in the history
  • Loading branch information
garlick committed May 3, 2018
2 parents 12a1880 + 852d53b commit 3984f47
Show file tree
Hide file tree
Showing 18 changed files with 195 additions and 372 deletions.
17 changes: 4 additions & 13 deletions doc/man3/tmrpc_then.c
@@ -1,36 +1,28 @@
#include <inttypes.h>
#include <flux/core.h>
#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/log.h"

void get_rank (flux_mrpc_t *mrpc, void *arg)
{
const char *json_str;
json_object *o;
const char *rank;
uint32_t nodeid;

if (flux_mrpc_get_nodeid (mrpc, &nodeid) < 0)
log_err_exit ("flux_mrpc_get_nodeid");
if (flux_mrpc_get (mrpc, &json_str) < 0)
if (flux_mrpc_get_unpack (mrpc, "{s:s}", "value", &rank) < 0)
log_err_exit ("flux_mrpc_get");
if (!json_str
|| !(o = Jfromstr (json_str))
|| !Jget_str (o, "value", &rank))
log_msg_exit ("response protocol error");
printf ("[%" PRIu32 "] rank is %s\n", nodeid, rank);
Jput (o);
}

int main (int argc, char **argv)
{
flux_t *h;
flux_mrpc_t *mrpc;
json_object *o = Jnew();

Jadd_str (o, "name", "rank");
if (!(h = flux_open (NULL, 0)))
log_err_exit ("flux_open");
if (!(mrpc = flux_mrpc (h, "attr.get", Jtostr (o), "all", 0)))
if (!(mrpc = flux_mrpc_pack (h, "attr.get", "all", 0,
"{s:s}", "name", "rank")))
log_err_exit ("flux_mrpc");
if (flux_mrpc_then (mrpc, get_rank, NULL) < 0)
log_err_exit ("flux_mrpc_then");
Expand All @@ -39,6 +31,5 @@ int main (int argc, char **argv)

flux_mrpc_destroy (mrpc);
flux_close (h);
Jput (o);
return (0);
}
24 changes: 9 additions & 15 deletions doc/man3/treduce.c
@@ -1,7 +1,7 @@
#include <stdio.h>
#include <inttypes.h>
#include <flux/core.h>
#include "src/common/libutil/shortjson.h"
#include <jansson.h>
#include "src/common/libutil/nodeset.h"
#include "src/common/libutil/xzmalloc.h"

Expand Down Expand Up @@ -41,13 +41,11 @@ void forward (flux_reduce_t *r, int batchnum, void *arg)
flux_future_t *f;

while ((item = flux_reduce_pop (r))) {
json_object *out = Jnew ();
Jadd_int (out, "batchnum", batchnum);
Jadd_str (out, "nodeset", item);
f = flux_rpc (ctx->h, "treduce.forward", Jtostr (out),
FLUX_NODEID_UPSTREAM, FLUX_RPC_NORESPONSE);
f = flux_rpc_pack (ctx->h, "treduce.forward",
FLUX_NODEID_UPSTREAM, FLUX_RPC_NORESPONSE,
"{s:i s:s}", "batchnum", batchnum,
"nodeset", item);
flux_future_destroy (f);
Jput (out);
free (item);
}
}
Expand Down Expand Up @@ -77,21 +75,17 @@ void forward_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct context *ctx = arg;
const char *json_str, *nodeset_str;
json_object *in = NULL;
const char *nodeset_str;
int batchnum;
char *item;

if (flux_request_decode (msg, NULL, &json_str) < 0
|| !json_str
|| !(in = Jfromstr (json_str))
|| !Jget_int (in, "batchnum", &batchnum)
|| !Jget_str (in, "nodeset", &nodeset_str))
if (flux_request_unpack (msg, NULL, "{s:i s:s}",
"batchnum", &batchnum,
"nodeset", &nodeset_str) < 0)
return;
item = xstrdup (nodeset_str);
if (flux_reduce_append (ctx->r, item, batchnum) < 0)
free (item);
Jput (in);
}

void heartbeat_cb (flux_t *h, flux_msg_handler_t *mh,
Expand Down
72 changes: 23 additions & 49 deletions src/cmd/cmdhelp.c
Expand Up @@ -27,11 +27,11 @@
#include <glob.h>
#include <string.h>
#include <czmq.h>
#include <jansson.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/sds.h"
#include "src/common/libutil/shortjson.h"

#include "cmdhelp.h"

Expand Down Expand Up @@ -74,44 +74,18 @@ static void cmd_list_destroy (zlist_t *zl)
zlist_destroy (&zl);
}

static json_object *command_list_file_read (const char *path)
static json_t *command_list_file_read (const char *path)
{
FILE *fp = NULL;
json_object *o = NULL;
enum json_tokener_error e = json_tokener_success;
json_tokener *tok = json_tokener_new ();
json_t *o;
json_error_t error;

if (tok == NULL) {
log_msg ("json_tokener_new: Out of memory");
return (NULL);
}

if (!(fp = fopen (path, "r"))) {
log_err ("%s", path);
return (NULL);
}

do {
int len;
char buf [4096];
char *s;
if (!(s = fgets (buf, sizeof (buf), fp)))
break;
len = strlen (s);
o = json_tokener_parse_ex (tok, s, len);
} while ((e = json_tokener_get_error(tok)) == json_tokener_continue);

fclose (fp);
json_tokener_free (tok);

if (!o || e != json_tokener_success) {
log_msg ("%s: %s", path, o ? json_tokener_error_desc (e) : "premature EOF");
return (NULL);
if (!(o = json_load_file (path, 0, &error))) {
log_msg ("%s::%d: %s", path, error.line, error.text);
return NULL;
}

if (json_object_get_type (o) != json_type_array) {
if (!json_is_array (o)) {
log_msg ("%s: not a JSON array", path);
json_object_put (o);
json_decref (o);
return (NULL);
}

Expand All @@ -123,26 +97,27 @@ static int command_list_read (zhash_t *h, const char *path)
int i;
int rc = -1;
int n = 0;
json_object *o = NULL;
struct array_list *l;

o = command_list_file_read (path);
json_t *o = NULL;

if (!(l = json_object_get_array (o))) {
log_msg ("%s: failed to get array list from JSON", path);
if (!(o = command_list_file_read (path)))
goto out;
}

n = array_list_length (l);
n = json_array_size (o);
for (i = 0; i < n; i++) {
json_object *entry = array_list_get_idx (l, i);
const char *category;
const char *command;
const char *description;
json_t *entry;
zlist_t *zl;
if (!Jget_str (entry, "category", &category)
|| !Jget_str (entry, "command", &command)
|| !Jget_str (entry, "description", &description)) {

if (!(entry = json_array_get (o, i))) {
log_msg ("%s: entry %d is not an object", path, i);
goto out;
}
if (json_unpack (entry, "{s:s s:s s:s}",
"category", &category,
"command", &command,
"description", &description) < 0) {
log_msg ("%s: Missing element in JSON entry %d", path, i);
goto out;
}
Expand All @@ -161,8 +136,7 @@ static int command_list_read (zhash_t *h, const char *path)
rc = 0;

out:
if (o)
json_object_put (o);
json_decref (o);
return (rc);

}
Expand Down
1 change: 0 additions & 1 deletion src/cmd/flux-comms.c
Expand Up @@ -33,7 +33,6 @@
#include <inttypes.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/shortjson.h"


#define OPTIONS "+hr:"
Expand Down
67 changes: 31 additions & 36 deletions src/cmd/flux-jstat.c
Expand Up @@ -31,11 +31,12 @@
#include <signal.h>
#include <unistd.h>
#include <stdbool.h>
#include <jansson.h>
#include <inttypes.h>
#include <flux/core.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/shortjson.h"


/******************************************************************************
Expand Down Expand Up @@ -111,20 +112,6 @@ static FILE *open_test_outfile (const char *fn)
return fp;
}

static inline void get_jobid (json_object *jcb, int64_t *j)
{
Jget_int64 (jcb, JSC_JOBID, j);
}

static inline void get_states (json_object *jcb, int64_t *os, int64_t *ns)
{
json_object *o = NULL;
Jget_obj (jcb, JSC_STATE_PAIR, &o);
Jget_int64 (o, JSC_STATE_PAIR_OSTATE, os);
Jget_int64 (o, JSC_STATE_PAIR_NSTATE, ns);
}


/******************************************************************************
* *
* Async notification callback *
Expand All @@ -133,32 +120,31 @@ static inline void get_states (json_object *jcb, int64_t *os, int64_t *ns)

static int job_status_cb (const char *jcbstr, void *arg, int errnum)
{
int64_t os = 0;
int64_t ns = 0;
int64_t j = 0;
int os;
int ns;
jstatctx_t *ctx = NULL;
flux_t *h = (flux_t *)arg;
json_object *jcb = NULL;
json_t *jcb = NULL;

ctx = getctx (h);
if (errnum > 0) {
flux_log (ctx->h, LOG_ERR, "job_status_cb: errnum passed in");
return -1;
}

if (!(jcb = Jfromstr (jcbstr))) {
if (!(jcb = json_loads (jcbstr, 0, NULL))
|| json_unpack (jcb, "{s:{s:i s:i}}", JSC_STATE_PAIR,
JSC_STATE_PAIR_OSTATE, &os,
JSC_STATE_PAIR_NSTATE, &ns) < 0) {
flux_log (ctx->h, LOG_ERR, "job_status_cb: error parsing JSON string");
json_decref (jcb);
return -1;
}
get_jobid (jcb, &j);
get_states (jcb, &os, &ns);
Jput (jcb);

fprintf (ctx->op, "%s->%s\n",
jsc_job_num2state ((job_state_t)os),
jsc_job_num2state ((job_state_t)ns));
fprintf (ctx->op, "%s->%s\n", jsc_job_num2state (os),
jsc_job_num2state (ns));
fflush (ctx->op);

json_decref (jcb);
return 0;
}

Expand Down Expand Up @@ -192,21 +178,30 @@ static int handle_notify_req (flux_t *h, const char *ofn)

static int handle_query_req (flux_t *h, int64_t j, const char *k, const char *n)
{
json_object *jcb = NULL;
jstatctx_t *ctx = NULL;
char *jcbstr;
jstatctx_t *ctx = getctx (h);
json_t *jcb = NULL;
char *jcbstr = NULL;
char *jcbstr_pretty = NULL;

ctx = getctx (h);
ctx->op = n? open_test_outfile (n) : stdout;
ctx->op = n ? open_test_outfile (n) : stdout;
if (jsc_query_jcb (h, j, k, &jcbstr) != 0) {
flux_log (h, LOG_ERR, "jsc_query_jcb reported an error");
return -1;
}
jcb = Jfromstr (jcbstr);
if (!(jcb = json_loads (jcbstr, 0, NULL))) {
flux_log (h, LOG_ERR, "error loading jcbstr");
goto done;
}
if (!(jcbstr_pretty = json_dumps (jcb, JSON_INDENT(2)))) {
flux_log (h, LOG_ERR, "error dumping jcbstr");
goto done;
}

fprintf (ctx->op, "Job Control Block: attribute %s for job %"PRIi64"\n", k, j);
fprintf (ctx->op, "%s\n", jcb == NULL ? jcbstr :
json_object_to_json_string_ext (jcb, JSON_C_TO_STRING_PRETTY));
Jput (jcb);
fprintf (ctx->op, "%s\n", jcbstr_pretty);
done:
free (jcbstr_pretty);
json_decref (jcb);
free (jcbstr);
return 0;
}
Expand Down
1 change: 0 additions & 1 deletion src/cmd/flux.c
Expand Up @@ -40,7 +40,6 @@

#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/environment.h"

#include "cmdhelp.h"
Expand Down
1 change: 0 additions & 1 deletion src/common/libflux/test/module.c
@@ -1,7 +1,6 @@
#include <argz.h>
#include <flux/core.h>

#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libtap/tap.h"

Expand Down

0 comments on commit 3984f47

Please sign in to comment.