Skip to content

Commit

Permalink
Merge 9a4ba90 into 86d5b32
Browse files Browse the repository at this point in the history
  • Loading branch information
dongahn committed Apr 24, 2018
2 parents 86d5b32 + 9a4ba90 commit 7936861
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 20 deletions.
30 changes: 15 additions & 15 deletions src/cmd/flux-jstat.c
Expand Up @@ -41,7 +41,7 @@
/******************************************************************************
* *
* Internal types, macros and static variables *
* *
* *
******************************************************************************/
typedef struct {
flux_t *h;
Expand All @@ -62,7 +62,7 @@ static const struct option longopts[] = {
/******************************************************************************
* *
* Utilities *
* *
* *
******************************************************************************/
static void usage (int code)
{
Expand Down Expand Up @@ -104,9 +104,9 @@ static void sig_handler (int s)
static FILE *open_test_outfile (const char *fn)
{
FILE *fp;
if (!fn)
fp = NULL;
else if ( !(fp = fopen (fn, "w")))
if (!fn)
fp = NULL;
else if ( !(fp = fopen (fn, "w")))
fprintf (stderr, "Failed to open %s\n", fn);
return fp;
}
Expand All @@ -128,7 +128,7 @@ static inline void get_states (json_object *jcb, int64_t *os, int64_t *ns)
/******************************************************************************
* *
* Async notification callback *
* *
* *
******************************************************************************/

static int job_status_cb (const char *jcbstr, void *arg, int errnum)
Expand All @@ -154,8 +154,8 @@ static int job_status_cb (const char *jcbstr, void *arg, int errnum)
get_states (jcb, &os, &ns);
Jput (jcb);

fprintf (ctx->op, "%s->%s\n",
jsc_job_num2state ((job_state_t)os),
fprintf (ctx->op, "%s->%s\n",
jsc_job_num2state ((job_state_t)os),
jsc_job_num2state ((job_state_t)ns));
fflush (ctx->op);

Expand All @@ -174,17 +174,17 @@ static int handle_notify_req (flux_t *h, const char *ofn)
jstatctx_t *ctx = NULL;

sig_flux_h = h;
if (signal (SIGINT, sig_handler) == SIG_ERR)
if (signal (SIGINT, sig_handler) == SIG_ERR)
return -1;

ctx = getctx (h);
ctx->op = (ofn)? open_test_outfile (ofn) : stdout;

if (jsc_notify_status (h, job_status_cb, (void *)h) != 0) {
flux_log (h, LOG_ERR, "failed to reg a job status change CB");
return -1;
return -1;
}
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
flux_log (h, LOG_ERR, "error in flux_reactor_run");

return 0;
Expand All @@ -211,7 +211,7 @@ static int handle_query_req (flux_t *h, int64_t j, const char *k, const char *n)
return 0;
}

static int handle_update_req (flux_t *h, int64_t j, const char *k,
static int handle_update_req (flux_t *h, int64_t j, const char *k,
const char *jcbstr, const char *n)
{
jstatctx_t *ctx = NULL;
Expand All @@ -230,7 +230,7 @@ static int handle_update_req (flux_t *h, int64_t j, const char *k,

int main (int argc, char *argv[])
{
flux_t *h;
flux_t *h;
int ch = 0;
int rc = 0;
char *cmd = NULL;
Expand Down Expand Up @@ -267,15 +267,15 @@ int main (int argc, char *argv[])
else if (!strcmp ("query", cmd) && optind == argc - 2) {
j = (const char *)(*(argv+optind));
attr = (const char *)(*(argv+optind+1));
rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn);
rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn);
}
else if (!strcmp ("update", cmd) && optind == argc - 3) {
j = (const char *)(*(argv+optind));
attr = (const char *)(*(argv+optind+1));
jcbstr = (const char *)(*(argv+optind+2));
rc = handle_update_req (h, strtol (j, NULL, 10), attr, jcbstr, ofn);
}
else
else
usage (1);

flux_close (h);
Expand Down
1 change: 1 addition & 0 deletions src/common/libjsc/README.md
Expand Up @@ -74,6 +74,7 @@ service.
| rdesc | JSC_RDESC | dictionary | Information on the resources owned by this job. See Table 3-3. |
| rdl | JSC_RDL | string | RDL binary string allocated to the job |
| rdl_alloc | JSC_RDL\_ALLOC | array of per-broker resources | Resource descriptor array (Resources allocated per broker - rank order). See Table 3-4.|
| R_lite | JSC_R\_LITE | string | R\_lite serialized JSON string allocated to the job |
| pdesc | JSC_PDESC | dictionary | Information on the processes spawned by this job. See Table 3-5. |

**Table 3-1** Keys and values of top-level JCB attributes
Expand Down
77 changes: 75 additions & 2 deletions src/common/libjsc/jstatctl.c
Expand Up @@ -433,7 +433,7 @@ static int extract_raw_rdl (flux_t *h, int64_t j, char **rdlstr)
flux_future_t *f = NULL;

if (!key || !(f = flux_kvs_lookup (h, 0, key))
|| flux_kvs_lookup_get_unpack (f, "s", &s) < 0) {
|| flux_kvs_lookup_get (f, &s) < 0) {
flux_log_error (h, "extract %s", key);
rc = -1;
}
Expand All @@ -446,6 +446,27 @@ static int extract_raw_rdl (flux_t *h, int64_t j, char **rdlstr)
return rc;
}

static int extract_raw_r_lite (flux_t *h, int64_t j, char **rlitestr)
{
int rc = 0;
char *key = lwj_key (h, j, ".R_lite");
const char *s;
flux_future_t *f = NULL;

if (!key || !(f = flux_kvs_lookup (h, 0, key))
|| flux_kvs_lookup_get (f, &s) < 0) {
flux_log_error (h, "extract %s", key);
rc = -1;
}
else {
*rlitestr = xstrdup (s);
flux_log (h, LOG_DEBUG, "R_lite under %s extracted", key);
}
free (key);
flux_future_destroy (f);
return rc;
}

static int extract_raw_state (flux_t *h, int64_t j, int64_t *s)
{
int rc = 0;
Expand Down Expand Up @@ -681,6 +702,21 @@ static int query_rdl (flux_t *h, int64_t j, json_object **jcb)
return 0;
}

static int query_r_lite (flux_t *h, int64_t j, json_object **jcb)
{
char *rlitestr = NULL;

if (extract_raw_r_lite (h, j, &rlitestr) < 0) return -1;

*jcb = Jnew ();
Jadd_str (*jcb, JSC_R_LITE, (const char *)rlitestr);
/* Note: seems there is no mechanism to transfer ownership
* of this string to jcb */
if (rlitestr)
free (rlitestr);
return 0;
}

static int query_rdl_alloc (flux_t *h, int64_t j, json_object **jcb)
{
*jcb = Jnew ();
Expand Down Expand Up @@ -842,7 +878,7 @@ static int update_rdl (flux_t *h, int64_t j, const char *rs)
flux_log_error (h, "txn_create");
goto done;
}
if (flux_kvs_txn_pack (txn, 0, key, "s", rs) < 0) {
if (flux_kvs_txn_put (txn, 0, key, rs) < 0) {
flux_log_error (h, "update %s", key);
goto done;
}
Expand All @@ -861,6 +897,36 @@ static int update_rdl (flux_t *h, int64_t j, const char *rs)
return rc;
}

static int update_r_lite (flux_t *h, int64_t j, const char *rs)
{
int rc = -1;
char *key = lwj_key (h, j, ".R_lite");
flux_kvs_txn_t *txn = NULL;
flux_future_t *f = NULL;

if (!(txn = flux_kvs_txn_create ())) {
flux_log_error (h, "txn_create");
goto done;
}
if (flux_kvs_txn_put (txn, 0, key, rs) < 0) {
flux_log_error (h, "update %s", key);
goto done;
}
if (!(f = flux_kvs_commit (h, 0, txn)) || flux_future_get (f, NULL) < 0) {
flux_log_error (h, "commit failed");
goto done;
}
flux_log (h, LOG_DEBUG, "job (%"PRId64") assigned new R_lite.", j);
rc = 0;

done:
flux_kvs_txn_destroy (txn);
flux_future_destroy (f);
free (key);

return rc;
}

static int update_hash_1ra (flux_t *h, int64_t j, json_object *o, zhash_t *rtab)
{
int rc = 0;
Expand Down Expand Up @@ -1299,6 +1365,9 @@ int jsc_query_jcb (flux_t *h, int64_t jobid, const char *key, char **jcb_str)
} else if (!strcmp (key, JSC_RDL)) {
if ( (rc = query_rdl (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_rdl failed");
} else if (!strcmp (key, JSC_R_LITE)) {
if ( (rc = query_r_lite (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_r_lite failed");
} else if (!strcmp (key, JSC_RDL_ALLOC)) {
if ( (rc = query_rdl_alloc (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_rdl_alloc failed");
Expand Down Expand Up @@ -1341,6 +1410,10 @@ int jsc_update_jcb (flux_t *h, int64_t jobid, const char *key,
const char *s = NULL;
if (Jget_str (jcb, JSC_RDL, &s))
rc = update_rdl (h, jobid, s);
} else if (!strcmp (key, JSC_R_LITE)) {
const char *s = NULL;
if (Jget_str (jcb, JSC_R_LITE, &s))
rc = update_r_lite (h, jobid, s);
} else if (!strcmp (key, JSC_RDL_ALLOC)) {
if (Jget_obj (jcb, JSC_RDL_ALLOC, &o))
rc = update_rdl_alloc (h, jobid, o);
Expand Down
1 change: 1 addition & 0 deletions src/common/libjsc/jstatctl.h
Expand Up @@ -73,6 +73,7 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
# define JSC_RDESC_NGPUS "ngpus"
# define JSC_RDESC_WALLTIME "walltime"
#define JSC_RDL "rdl"
#define JSC_R_LITE "R_lite"
#define JSC_RDL_ALLOC "rdl_alloc"
# define JSC_RDL_ALLOC_CONTAINED "contained"
# define JSC_RDL_ALLOC_CONTAINING_RANK "cmbdrank"
Expand Down
15 changes: 12 additions & 3 deletions t/t2001-jsc.t
Expand Up @@ -230,15 +230,15 @@ EOF
"

test_expect_success 'jstat 12: update rdl' "
flux jstat update 1 rdl '{\"rdl\": \"fake_rdl_string\"}' &&
flux jstat update 1 rdl '{\"rdl\": {\"cluster\": \"fake_rdl_string\"}}' &&
flux kvs get --json $(flux wreck kvs-path 1).rdl > output.12.1 &&
cat > expected.12.1 <<-EOF &&
fake_rdl_string
{\"cluster\": \"fake_rdl_string\"}
EOF
test_cmp expected.12.1 output.12.1
"

test_expect_success 'jstat 13: update rdl_alloc' "
test_expect_success 'jstat 13.1: update rdl_alloc' "
flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdrank\": 0, \"cmbdncores\": 102, \"cmbdngpus\": 4}}]}' &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.cores > output.13.1 &&
flux kvs get --json $(flux wreck kvs-path 1).rank.0.gpus >> output.13.1 &&
Expand All @@ -249,6 +249,15 @@ EOF
test_cmp expected.13.1 output.13.1
"

test_expect_success 'jstat 13.2: update r_lite' "
flux jstat update 1 R_lite '{\"R_lite\": [{\"children\": {\"core\": \"0\"}, \"rank\": 0}]}' &&
flux kvs get --json $(flux wreck kvs-path 1).R_lite > output.13.2 &&
cat > expected.13.2 <<-EOF &&
[{\"children\": {\"core\": \"0\"}, \"rank\": 0}]
EOF
test_cmp expected.13.2 output.13.2
"

test_expect_success 'jstat 14: update detects bad inputs' "
test_expect_code 42 flux jstat update 1 jobid '{\"jobid\": 1}' &&
test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128, \"ncores\":128, \"walltime\": 1800}}' &&
Expand Down

0 comments on commit 7936861

Please sign in to comment.