Skip to content

Commit

Permalink
Merge pull request #333 from dongahn/idset
Browse files Browse the repository at this point in the history
resrc: condense core id list in R_lite using libidset
  • Loading branch information
grondo committed May 5, 2018
2 parents 9c7211d + 69be01f commit ce8c530
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 33 deletions.
1 change: 1 addition & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ AX_BOOST_FILESYSTEM
AX_BOOST_GRAPH
AX_BOOST_REGEX
PKG_CHECK_MODULES([JOBSPEC], [flux-jobspec], [], [])
PKG_CHECK_MODULES([IDSET], [flux-idset], [], [])
AC_CHECK_LIB([readline], [readline],
[AC_SUBST([READLINE_LIBS], ["-lreadline"])
AC_DEFINE([HAVE_LIBREADLINE], [1],
Expand Down
4 changes: 2 additions & 2 deletions resrc/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ noinst_HEADERS = resrc_api.h resrc_api_internal.h \
resrc.h resrc_tree.h resrc_flow.h resrc_reqst.h

libflux_resrc_la_SOURCES = resrc.c resrc_tree.c resrc_flow.c resrc_reqst.c
libflux_resrc_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/rdl
libflux_resrc_la_CFLAGS = $(AM_CFLAGS) -I$(top_srcdir)/rdl $(IDSET_CFLAGS)
libflux_resrc_la_LIBADD = $(top_builddir)/rdl/libflux-rdl.la \
$(top_builddir)/src/common/libutil/libutil.la \
$(DL_LIBS) $(HWLOC_LIBS) $(UUID_LIBS) $(JANSSON_LIBS) $(CZMQ_LIBS)
$(DL_LIBS) $(HWLOC_LIBS) $(UUID_LIBS) $(JANSSON_LIBS) $(CZMQ_LIBS) $(IDSET_LIBS)
libflux_resrc_la_LDFLAGS = $(AM_LDFLAGS) $(fluxlib_ldflags) \
-Wl,--version-script=$(srcdir)/resrc_version.map

Expand Down
16 changes: 10 additions & 6 deletions resrc/resrc_reqst.c
Original file line number Diff line number Diff line change
Expand Up @@ -566,19 +566,23 @@ int64_t resrc_tree_search (resrc_api_ctx_t *ctx,
* and omit the intervening socket.
*/

if (*found_tree)
new_tree = resrc_tree_new (*found_tree, resrc_in);
else {
new_tree = resrc_tree_new (NULL, resrc_in);
*found_tree = new_tree;
}
new_tree = resrc_tree_new (NULL, resrc_in);
children = resrc_tree_children (resrc_phys_tree (resrc_in));
child_tree = resrc_tree_list_first (children);
while (child_tree) {
nfound += resrc_tree_search (ctx, resrc_tree_resrc (child_tree),
resrc_reqst, &new_tree, available);
child_tree = resrc_tree_list_next (children);
}

if (nfound) {
if (*found_tree)
resrc_tree_add_child (*found_tree, new_tree);
else
*found_tree = new_tree;
} else {
resrc_tree_destroy (ctx, new_tree, false, false);
}
}
ret:
return nfound;
Expand Down
97 changes: 78 additions & 19 deletions resrc/resrc_tree.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "resrc_tree.h"
#include "resrc_api_internal.h"
#include "src/common/libutil/xzmalloc.h"
#include "flux/idset.h"

struct resrc_tree_list {
zlist_t *list;
Expand Down Expand Up @@ -181,6 +182,42 @@ int resrc_tree_serialize (json_t *o, resrc_tree_t *resrc_tree)
return rc;
}

static json_t *condense (json_t *m_reduce)
{
char *out = NULL;
json_t *o = NULL;
json_t *v = NULL;
const char *k = NULL;
const char *sp = NULL;
struct idset *s = NULL;

/* An example layout of m_reduce: { "core": "0,1,3" }
* "core" can be any type, which depends on what input
* was given to the the upper layer. Further there can
* be multiple types.
*/
json_object_foreach (m_reduce, k, v) {
if (!(sp = json_string_value (v)))
goto done;
if (!(s = idset_decode (sp)))
goto done;
if (!(out = idset_encode (s, IDSET_FLAG_RANGE)))
goto done;
if (!(o = json_string (out)))
goto done;
idset_destroy (s);
s = NULL;
free (out);
out = NULL;
json_object_set_new (m_reduce, k, o);
}

done:
free (out);
idset_destroy (s);
return m_reduce;
}

int resrc_tree_serialize_lite (json_t *gather, json_t *reduce,
resrc_tree_t *resrc_tree,
resrc_api_map_t *gather_m,
Expand All @@ -194,40 +231,62 @@ int resrc_tree_serialize_lite (json_t *gather, json_t *reduce,
const char *type = NULL;
bool reduce_under_me = false;

if (!resrc_tree)
if (!gather || !reduce || !resrc_tree || !gather_m || !reduce_m)
return -1;

/*
* Preorder
*/
type = resrc_type (resrc_tree->resrc);
if ((val = resrc_api_map_get (reduce_m, type))) {
/* once the resource is reduced, please stop descending */
if (resrc_api_map_get (reduce_m, type)) {
/* During recursion, if my type matches w/ one of the reduce types,
* serialize the resrc in the reduced form.
* Return right away as once the resource is reduced
* in this form, it makes no sense to add children under it.
*/
return resrc_to_json_lite (reduce, resrc_tree->resrc, true);
} else if ((val = resrc_api_map_get (gather_m, type))) {
m_o = Jnew ();
/* gather types require new json to accumulate the subtree */
/* During recursion, if my match matches w/ one of the gather types,
* create new accumulators (m_reduce and m_gather) for
* further subtree walks.
*/
reduce_under_me = ((intptr_t)val == REDUCE_UNDER_ME);
m_reduce = (reduce_under_me)? Jnew () : reduce;
m_gather = (!reduce_under_me)? Jnew_ar () : gather;
rc = resrc_to_json_lite (m_o, resrc_tree->resrc, false);
rc += resrc_to_json_lite (m_o, resrc_tree->resrc, false);
/* If I'm a gather type, I still have to descend */
}

if (resrc_tree_num_children (resrc_tree)) {
resrc_tree_t *r = NULL;
resrc_tree_list_t *rl = resrc_tree->children;
for (r = resrc_tree_list_first (rl); r; r = resrc_tree_list_next (rl))
rc += resrc_tree_serialize_lite (m_gather, m_reduce, r,
gather_m, reduce_m);

if (m_o) {
if (reduce_under_me && json_object_size (m_reduce) == 0)
if (!resrc_tree_num_children (resrc_tree))
goto done;

/*
* Recurse
*/
resrc_tree_t *r = NULL;
resrc_tree_list_t *rl = resrc_tree->children;
for (r = resrc_tree_list_first (rl); r; r = resrc_tree_list_next (rl))
rc += resrc_tree_serialize_lite (m_gather, m_reduce, r, gather_m,
reduce_m);
/*
* Postorder: Now that I have processed my subtree.
*/
if (m_o) {
json_t *co = m_gather;
if (reduce_under_me) {
/* Assuming resrc_tree is well-formed, m_reduce
* accumulator cannot be empty.
*/
if (!json_object_size (m_reduce))
rc += -1;
json_t *co = (reduce_under_me)? m_reduce : m_gather;
json_object_set_new (m_o, "children", co);
co = condense (m_reduce);
}
}

if (m_o)
json_object_set_new (m_o, "children", co);
json_array_append_new (gather, m_o);
}

done:
return rc;
}

Expand Down
4 changes: 2 additions & 2 deletions resrc/resrc_tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ int resrc_tree_serialize (json_t *o, resrc_tree_t *resrc_tree);
* \param reduce[out] json object used for reducing resources whose type
* is contained within reduce_m.
* \param resrc_tree resrc_tree object to serialize.
* \param gather_m map that contains resoures types to serialize
* \param gather_m map that contains resoure type(s) to serialize
* in the gather form. If the value of the type is
* REDUCE_UNDER_ME, the first offspring resource type
* under it will be serialized in the reduce form.
* \param reduce_m map that contains resoures types to serialize
* \param reduce_m map that contains resource type(s) to serialize
* in the reduce form.
*
*/
Expand Down
13 changes: 9 additions & 4 deletions sched/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,7 @@ static int load_resources (ssrvctx_t *ctx)
goto done;
} else if (build_hwloc_rs2rank (ctx, r_mode) != 0) {
flux_log (ctx->h, LOG_ERR, "failed to build rs2rank");
if (errno != 0)
errno = EINVAL;
errno = EINVAL;
goto done;
} else if (rsreader_force_link2rank (ctx->rsapi, ctx->machs) != 0) {
flux_log (ctx->h, LOG_ERR, "failed to force a link to a rank");
Expand Down Expand Up @@ -1378,11 +1377,17 @@ static int req_tpexec_allocate (ssrvctx_t *ctx, flux_lwj_t *job)
resrc_api_map_t *gmap = resrc_api_map_new ();
resrc_api_map_t *rmap = resrc_api_map_new ();

if (ctx->arg.verbosity > 0) {
flux_log (h, LOG_DEBUG, "job(%"PRId64"): selected resource tree",
job->lwj_id);
dump_resrc_state (ctx->h, job->resrc_tree);
}

resrc_api_map_put (gmap, "node", (void *)(intptr_t)REDUCE_UNDER_ME);
resrc_api_map_put (rmap, "core", (void *)(intptr_t)NONE_UNDER_ME);
if (resrc_tree_serialize_lite (gat, red, job->resrc_tree, gmap, rmap)) {
flux_log (h, LOG_ERR, "%"PRId64" resource serialization failed: %s",
job->lwj_id, strerror (errno));
flux_log (h, LOG_ERR, "job (%"PRId64") resource serialization failed",
job->lwj_id);
goto done;
} else if (resolve_rank (ctx, gat)) {
flux_log (ctx->h, LOG_ERR, "resolving a hostname to rank failed");
Expand Down

0 comments on commit ce8c530

Please sign in to comment.