Skip to content

Commit

Permalink
Merge eec7e1c into 3ed8516
Browse files Browse the repository at this point in the history
  • Loading branch information
dongahn committed Apr 24, 2018
2 parents 3ed8516 + eec7e1c commit 40a2cc7
Show file tree
Hide file tree
Showing 8 changed files with 239 additions and 18 deletions.
60 changes: 59 additions & 1 deletion resrc/resrc.c
Expand Up @@ -125,6 +125,44 @@ void resrc_api_fini (resrc_api_ctx_t *ctx)
/* tree_root_resrc should already have been destroyed */
}

resrc_api_map_t *resrc_api_map_new ()
{
resrc_api_map_t *o = (resrc_api_map_t *)malloc (sizeof (resrc_api_map_t));
o->map = zhash_new ();
return o;
}

void resrc_api_map_destroy (resrc_api_map_t **m)
{
if (!m || !*m)
return;
if ((*m)->map)
zhash_destroy (&((*m)->map));
free (*m);
*m = NULL;
}

void *resrc_api_map_get (resrc_api_map_t *m, const char *key)
{
if (!m || !key || !(m->map))
return NULL;
return zhash_lookup (m->map, key);
}

void resrc_api_map_put (resrc_api_map_t *m, const char *key, void *val)
{
if (!m || !key || !(m->map))
return;
zhash_insert (m->map, key, (void *)val);
}

void resrc_api_map_rm (resrc_api_map_t *m, const char *key)
{
if (!m || !key || !(m->map))
return;
zhash_delete (m->map, key);
}

char *resrc_type (resrc_t *resrc)
{
if (resrc)
Expand Down Expand Up @@ -1080,7 +1118,6 @@ resrc_t *resrc_generate_hwloc_resources (resrc_api_ctx_t *ctx,
return resrc;
}


int resrc_to_json (json_t *o, resrc_t *resrc)
{
char uuid[40];
Expand All @@ -1100,6 +1137,27 @@ int resrc_to_json (json_t *o, resrc_t *resrc)
return rc;
}

int resrc_to_json_lite (json_t *o, resrc_t *resrc, bool reduce)
{
if (!resrc)
return -1;

if (!reduce) {
Jadd_str (o, resrc_type (resrc), resrc_name (resrc));
if (resrc_digest (resrc))
Jadd_str (o, "digest", resrc_digest (resrc));
} else {
const char *val = NULL;
if (Jget_str (o, resrc_type (resrc), &val))
Jadd_str (o, resrc_type (resrc), xasprintf ("%s,%"PRId64"",
val, resrc_id (resrc)));
else
Jadd_str (o, resrc_type (resrc), xasprintf ("%"PRId64"",
resrc_id (resrc)));
}
return 0;
}

char *resrc_to_string (resrc_t *resrc)
{
char *buf;
Expand Down
7 changes: 7 additions & 0 deletions resrc/resrc.h
Expand Up @@ -216,6 +216,13 @@ resrc_t *resrc_generate_hwloc_resources (resrc_api_ctx_t *ctx, TOPOLOGY topo,
*/
int resrc_to_json (json_t *o, resrc_t *resrc);

/*
* Add the lightweight input resource to the json object
* If reduce=true, only concatenate the resrc's id into
* o (e.g., 0,2,3,4).
*/
int resrc_to_json_lite (json_t *o, resrc_t *resrc, bool reduce);

/*
* Print details of a specific resource to a string buffer
* and return to the caller. The caller must free it.
Expand Down
10 changes: 9 additions & 1 deletion resrc/resrc_api.h
@@ -1,8 +1,12 @@
#ifndef _FLUX_RESRC_API_H
#define _FLUX_RESRC_API_H

#define REDUCE_UNDER_ME 1
#define GATHER_UNDER_ME 2
#define NONE_UNDER_ME 3

typedef struct resrc_api_ctx resrc_api_ctx_t;
typedef struct resrc_api_map resrc_api_map_t;

/*
* Create and initialize an instance of resource API and
Expand All @@ -19,7 +23,11 @@ resrc_api_ctx_t *resrc_api_init (void);
*/
void resrc_api_fini (resrc_api_ctx_t *ctx);

/* Pls add other API-level specialization functions here */
resrc_api_map_t *resrc_api_map_new ();
void resrc_api_map_destroy (resrc_api_map_t **m);
void *resrc_api_map_get (resrc_api_map_t *m, const char *key);
void resrc_api_map_put (resrc_api_map_t *m, const char *key, void *val);
void resrc_api_map_rm (resrc_api_map_t *m, const char *key);

#endif /* !_FLUX_RESRC_API_H */

Expand Down
4 changes: 4 additions & 0 deletions resrc/resrc_api_internal.h
Expand Up @@ -15,6 +15,10 @@ struct resrc_api_ctx {
/* additional API-level data here */
};

struct resrc_api_map {
zhash_t *map;
};

#endif /* !_FLUX_RESRC_API_INTERNAL_H */

/*
Expand Down
51 changes: 50 additions & 1 deletion resrc/resrc_tree.c
Expand Up @@ -42,7 +42,6 @@ struct resrc_tree_list {
zlist_t *list;
};


struct resrc_tree {
resrc_tree_t *parent;
resrc_t *resrc;
Expand Down Expand Up @@ -182,6 +181,56 @@ int resrc_tree_serialize (json_t *o, resrc_tree_t *resrc_tree)
return rc;
}

int resrc_tree_serialize_lite (json_t *gather, json_t *reduce,
resrc_tree_t *resrc_tree,
resrc_api_map_t *gather_m,
resrc_api_map_t *reduce_m)
{
int rc = 0;
void *val = NULL;
json_t *m_o = NULL;
json_t *m_gather = gather;
json_t *m_reduce = reduce;
const char *type = NULL;
bool reduce_under_me = false;

if (!resrc_tree)
return -1;

type = resrc_type (resrc_tree->resrc);
if ((val = resrc_api_map_get (reduce_m, type))) {
/* once the resource is reduced, please stop descending */
return resrc_to_json_lite (reduce, resrc_tree->resrc, true);
} else if ((val = resrc_api_map_get (gather_m, type))) {
m_o = Jnew ();
/* gather types require new json to accumulate the subtree */
reduce_under_me = ((intptr_t)val == REDUCE_UNDER_ME);
m_reduce = (reduce_under_me)? Jnew () : reduce;
m_gather = (!reduce_under_me)? Jnew_ar () : gather;
rc = resrc_to_json_lite (m_o, resrc_tree->resrc, false);
}

if (resrc_tree_num_children (resrc_tree)) {
resrc_tree_t *r = NULL;
resrc_tree_list_t *rl = resrc_tree->children;
for (r = resrc_tree_list_first (rl); r; r = resrc_tree_list_next (rl))
rc += resrc_tree_serialize_lite (m_gather, m_reduce, r,
gather_m, reduce_m);

if (m_o) {
if (reduce_under_me && json_object_size (m_reduce) == 0)
rc += -1;
json_t *co = (reduce_under_me)? m_reduce : m_gather;
json_object_set_new (m_o, "children", co);
}
}

if (m_o)
json_array_append (gather, m_o);

return rc;
}

resrc_tree_t *resrc_tree_deserialize (resrc_api_ctx_t *ctx, json_t *o,
resrc_tree_t *parent)
{
Expand Down
33 changes: 33 additions & 0 deletions resrc/resrc_tree.h
Expand Up @@ -68,6 +68,39 @@ void resrc_tree_print (resrc_tree_t *resrc_tree);
*/
int resrc_tree_serialize (json_t *o, resrc_tree_t *resrc_tree);

/*!
* Add the lightweight, programmable resource tree to the json object.
* gather_m and reduce_m control what and how specific types of
* resources will be serialized. Resource types passed in through
* gather_m map will be simply gathered while types passed in
* through reduce_m will be reduced.
*
* Gather form: [{"node": "quartz1", "children": {...}},
* {"node": "quartz2", "children": {...}}]
*
* Reduce form: {"core": "0,1,2"}
*
* Note that once a resource is "reduced", it can no longer have children
* resources serialized under it.
*
* \param gather[out] json array used for gathering resources whose type
* is contained within gather_m.
* \param reduce[out] json object used for reducing resources whose type
* is contained within reduce_m.
* \param resrc_tree resrc_tree object to serialize.
* \param gather_m map that contains resoures types to serialize
* in the gather form. If the value of the type is
* REDUCE_UNDER_ME, the first offspring resource type
* under it will be serialized in the reduce form.
* \param reduce_m map that contains resoures types to serialize
* in the reduce form.
*
*/
int resrc_tree_serialize_lite (json_t *gather, json_t *reduce,
resrc_tree_t *resrc_tree,
resrc_api_map_t *gather_m,
resrc_api_map_t *reduce_m);

/*
* Create a resource tree from a json object
*/
Expand Down
24 changes: 23 additions & 1 deletion resrc/test/tresrc.c
Expand Up @@ -355,6 +355,28 @@ static int test_a_resrc (resrc_api_ctx_t *rsapi, resrc_t *resrc, bool rdl)
printf ("The found resources serialized: %s\n", Jtostr (o));
}

json_t *gather = Jnew_ar ();
json_t *reduce = Jnew ();
resrc_api_map_t *gather_map = resrc_api_map_new ();
resrc_api_map_put (gather_map, "node", (void *)(intptr_t)REDUCE_UNDER_ME);
resrc_api_map_t *reduce_map = resrc_api_map_new ();
resrc_api_map_put (reduce_map, "core", (void *)(intptr_t)NONE_UNDER_ME);

init_time ();
rc = resrc_tree_serialize_lite (gather, reduce, found_tree,
gather_map, reduce_map);
ok (!rc, "found resource lightweight serialization took: %lf",
((double)get_time ())/1000000);

if (verbose) {
printf ("The lightweight serialization: %s\n",
json_dumps (gather, JSON_INDENT (3)));
}
Jput (gather);
Jput (reduce);
resrc_api_map_destroy (&gather_map);
resrc_api_map_destroy (&reduce_map);

/* serialized resrcs should be deserialized into a new API instance */
resrc_api_ctx_t *new_rsapi = resrc_api_init ();
deserialized_tree = resrc_tree_deserialize (new_rsapi, o, NULL);
Expand Down Expand Up @@ -545,7 +567,7 @@ int main (int argc, char *argv[])
{
int rc1 = 1, rc2 = 1;

plan (25 + num_temporal_allocation_tests);
plan (27 + num_temporal_allocation_tests);
test_temporal_allocation ();
rc1 = test_using_reader_rdl ();
rc2 = test_using_reader_hwloc ();
Expand Down
68 changes: 54 additions & 14 deletions sched/sched.c
Expand Up @@ -1224,18 +1224,15 @@ static inline void bridge_update_timer (ssrvctx_t *ctx)
queue_timer_change (ctx, "sched");
}

static inline int bridge_rs2rank_tab_query (ssrvctx_t *ctx, resrc_t *r,
uint32_t *rank)
static inline int bridge_rs2rank_tab_query (ssrvctx_t *ctx, const char *name,
const char *digest, uint32_t *rank)
{
int rc = -1;
if (ctx->sctx.in_sim) {
rc = rs2rank_tab_query_by_none (ctx->machs, resrc_digest (r),
false, rank);
rc = rs2rank_tab_query_by_none (ctx->machs, digest, false, rank);
} else {
flux_log (ctx->h, LOG_INFO, "hostname: %s, digest: %s", resrc_name (r),
resrc_digest (r));
rc = rs2rank_tab_query_by_sign (ctx->machs, resrc_name (r),
resrc_digest (r), false, rank);
flux_log (ctx->h, LOG_INFO, "hostname: %s, digest: %s", name, digest);
rc = rs2rank_tab_query_by_sign (ctx->machs, name, digest, false, rank);
}
if (rc == 0)
flux_log (ctx->h, LOG_INFO, "broker found, rank: %"PRIu32, *rank);
Expand Down Expand Up @@ -1314,7 +1311,7 @@ static int build_contain_req (ssrvctx_t *ctx, flux_lwj_t *job, resrc_tree_t *rt,
}
}
} else {
if (bridge_rs2rank_tab_query (ctx, r, &rank))
if (bridge_rs2rank_tab_query (ctx, resrc_name (r), resrc_digest (r), &rank))
goto done;
else {
int cores = job->req->corespernode ? job->req->corespernode :
Expand All @@ -1332,6 +1329,36 @@ static int build_contain_req (ssrvctx_t *ctx, flux_lwj_t *job, resrc_tree_t *rt,
return rc;
}

static int resolve_rank (ssrvctx_t *ctx, json_t *o)
{
int rc = -1;
size_t index = 0;
json_t *value = NULL;

json_array_foreach (o, index, value) {
uint32_t rank = 0;
char *hn = NULL;
char *digest = NULL;
if (json_unpack (value, "{s:s s:s}", "node", &hn, "digest", &digest))
goto done;
if (bridge_rs2rank_tab_query (ctx, hn, digest, &rank))
goto done;

json_t *j_rank = json_integer ((json_int_t)rank);
if (json_object_del (value, "node"))
goto done;
if (json_object_del (value, "digest"))
goto done;
if (json_object_set_new (value, "rank", j_rank))
goto done;
}
rc = 0;

done:
return rc;
}


/*
* Once the job gets allocated to its own copy of rdl, this
* 1) serializes the rdl and sends it to TP exec service
Expand All @@ -1344,22 +1371,32 @@ static int req_tpexec_allocate (ssrvctx_t *ctx, flux_lwj_t *job)
int rc = -1;
flux_t *h = ctx->h;
json_t *jcb = Jnew ();
json_t *ro = Jnew ();
json_t *arr = Jnew_ar ();

if (resrc_tree_serialize (ro, job->resrc_tree)) {
json_t *gat = Jnew_ar ();
json_t *red = Jnew ();
resrc_api_map_t *gmap = resrc_api_map_new ();
resrc_api_map_t *rmap = resrc_api_map_new ();

resrc_api_map_put (gmap, "node", (void *)(intptr_t)REDUCE_UNDER_ME);
resrc_api_map_put (rmap, "core", (void *)(intptr_t)NONE_UNDER_ME);
if (resrc_tree_serialize_lite (gat, red, job->resrc_tree, gmap, rmap)) {
flux_log (h, LOG_ERR, "%"PRId64" resource serialization failed: %s",
job->lwj_id, strerror (errno));
goto done;
} else if (resolve_rank (ctx, gat)) {
flux_log (ctx->h, LOG_ERR, "resolving a hostname to rank failed");
goto done;
}
json_object_set_new (jcb, JSC_RDL, ro);

json_object_set_new (jcb, JSC_R_LITE, gat);
jcbstr = Jtostr (jcb);
if (jsc_update_jcb (h, job->lwj_id, JSC_RDL, jcbstr) != 0) {
if (jsc_update_jcb (h, job->lwj_id, JSC_R_LITE, jcbstr) != 0) {
flux_log (h, LOG_ERR, "error jsc udpate: %"PRId64" (%s)", job->lwj_id,
strerror (errno));
goto done;
}
Jput (jcb);

jcb = Jnew ();
if (build_contain_req (ctx, job, job->resrc_tree, arr) != 0) {
flux_log (h, LOG_ERR, "error requesting containment for job");
Expand All @@ -1371,6 +1408,7 @@ static int req_tpexec_allocate (ssrvctx_t *ctx, flux_lwj_t *job)
flux_log (h, LOG_ERR, "error updating jcb");
goto done;
}

if ((update_state (h, job->lwj_id, job->state, J_ALLOCATED)) != 0) {
flux_log (h, LOG_ERR, "failed to update the state of job %"PRId64"",
job->lwj_id);
Expand All @@ -1381,6 +1419,8 @@ static int req_tpexec_allocate (ssrvctx_t *ctx, flux_lwj_t *job)
done:
if (jcb)
Jput (jcb);
if (red)
Jput (red);
return rc;
}

Expand Down

0 comments on commit 40a2cc7

Please sign in to comment.