Skip to content

Commit

Permalink
Merge branch 'master' into issue509-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
milroy committed Dec 5, 2019
2 parents 2804da9 + af4447f commit 0adeef4
Show file tree
Hide file tree
Showing 40 changed files with 3,033 additions and 973 deletions.
1 change: 1 addition & 0 deletions resource/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ libresource_la_SOURCES = \
schema/color.cpp \
traversers/dfu.cpp \
traversers/dfu_impl.cpp \
traversers/dfu_impl_update.cpp \
policies/base/dfu_match_cb.cpp \
policies/base/matcher.cpp \
readers/resource_reader_base.cpp \
Expand Down
50 changes: 25 additions & 25 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ struct resource_ctx_t {
resource_args_t args; /* Module load options */
std::shared_ptr<dfu_match_cb_t> matcher; /* Match callback object */
std::shared_ptr<dfu_traverser_t> traverser; /* Graph traverser object */
resource_graph_db_t db; /* Resource graph data store */
std::shared_ptr<resource_graph_db_t> db; /* Resource graph data store */
std::shared_ptr<f_resource_graph_t> fgraph; /* Filtered graph */
std::shared_ptr<match_writers_t> writers; /* Vertex/Edge writers */
std::shared_ptr<match_writers_t> writers; /* Vertex/Edge writers */
match_perf_t perf; /* Match performance stats */
std::map<uint64_t, std::shared_ptr<job_info_t>> jobs; /* Jobs table */
std::map<uint64_t, uint64_t> allocations; /* Allocation table */
Expand Down Expand Up @@ -167,6 +167,7 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
try {
ctx = std::make_shared<resource_ctx_t> ();
ctx->traverser = std::make_shared<dfu_traverser_t> ();
ctx->db = std::make_shared<resource_graph_db_t> ();
} catch (std::bad_alloc &e) {
errno = ENOMEM;
goto done;
Expand Down Expand Up @@ -356,7 +357,7 @@ static int populate_resource_db_file (std::shared_ptr<resource_ctx_t> &ctx,
}
buffer << in_file.rdbuf ();
in_file.close ();
if ( (rc = ctx->db.load (buffer.str (), rd)) < 0) {
if ( (rc = ctx->db->load (buffer.str (), rd)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, rd->err_message ().c_str ());
goto done;
Expand All @@ -378,7 +379,7 @@ static int populate_resource_db_kvs (std::shared_ptr<resource_ctx_t> &ctx,
json_t *o = NULL;
flux_t *h = ctx->h;
const char *hwloc_xml = NULL;
resource_graph_db_t &db = ctx->db;
resource_graph_db_t &db = *(ctx->db);
vtx_t v = boost::graph_traits<resource_graph_t>::null_vertex ();

if (flux_get_size (h, &size) == -1) {
Expand All @@ -395,18 +396,18 @@ static int populate_resource_db_kvs (std::shared_ptr<resource_ctx_t> &ctx,
}
o = get_string_blocking (h, k);
hwloc_xml = json_string_value (o);
if ( (rc = ctx->db.load (hwloc_xml, rd, rank)) < 0) {
if ( (rc = db.load (hwloc_xml, rd, rank)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, rd->err_message ().c_str ());
goto done;
}
Jput (o);
if (db.metadata.roots->find ("containment") == db.metadata.roots->end ()) {
if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) {
flux_log (ctx->h, LOG_ERR, "%s: cluster vertex is unavailable",
__FUNCTION__);
goto done;
}
v = db.metadata.roots->at ("containment");
v = db.metadata.roots.at ("containment");

// For the rest of the ranks -- general case
for (rank=1; rank < size; rank++) {
Expand All @@ -417,7 +418,7 @@ static int populate_resource_db_kvs (std::shared_ptr<resource_ctx_t> &ctx,
}
o = get_string_blocking (h, k);
hwloc_xml = json_string_value (o);
if ( (rc = ctx->db.load (hwloc_xml, rd, v, rank)) < 0) {
if ( (rc = db.load (hwloc_xml, rd, v, rank)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, rd->err_message ().c_str ());
goto done;
Expand All @@ -438,7 +439,7 @@ static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
std::shared_ptr<resource_reader_base_t> rd;

if (ctx->args.reserve_vtx_vec != 0)
ctx->db.resource_graph.m_vertices.reserve (ctx->args.reserve_vtx_vec);
ctx->db->resource_graph.m_vertices.reserve (ctx->args.reserve_vtx_vec);
if ( (rd = create_resource_reader (ctx->args.load_format)) == nullptr) {
flux_log (ctx->h, LOG_ERR, "%s: can't create load reader",
__FUNCTION__);
Expand Down Expand Up @@ -494,15 +495,15 @@ static int select_subsystems (std::shared_ptr<resource_ctx_t> &ctx)
size_t found = token.find_first_of (":");
if (found == std::string::npos) {
subsystem = token;
if (!ctx->db.known_subsystem (subsystem)) {
if (!ctx->db->known_subsystem (subsystem)) {
rc = -1;
errno = EINVAL;
goto done;
}
ctx->matcher->add_subsystem (subsystem, "*");
} else {
subsystem = token.substr (0, found);
if (!ctx->db.known_subsystem (subsystem)) {
if (!ctx->db->known_subsystem (subsystem)) {
rc = -1;
errno = EINVAL;
goto done;
Expand All @@ -524,7 +525,7 @@ static std::shared_ptr<f_resource_graph_t> create_filtered_graph (
resource_ctx_t> &ctx)
{
std::shared_ptr<f_resource_graph_t> fg = nullptr;
resource_graph_t &g = ctx->db.resource_graph;
resource_graph_t &g = ctx->db->resource_graph;

try {
// Set vertex and edge maps
Expand Down Expand Up @@ -584,8 +585,7 @@ static int init_resource_graph (std::shared_ptr<resource_ctx_t> &ctx)
}

// Initialize the DFU traverser
if (ctx->traverser->initialize (ctx->fgraph,
ctx->db.metadata.roots, ctx->matcher) < 0) {
if (ctx->traverser->initialize (ctx->fgraph, ctx->db, ctx->matcher) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: traverser initialization",
__FUNCTION__);
return -1;
Expand Down Expand Up @@ -861,8 +861,8 @@ static void stat_request_cb (flux_t *h, flux_msg_handler_t *w,
min = ctx->perf.min;
}
if (flux_respond_pack (h, msg, "{s:I s:I s:f s:I s:f s:f s:f}",
"V", num_vertices (ctx->db.resource_graph),
"E", num_edges (ctx->db.resource_graph),
"V", num_vertices (ctx->db->resource_graph),
"E", num_edges (ctx->db->resource_graph),
"load-time", ctx->perf.load,
"njobs", ctx->perf.njobs,
"min-match", min,
Expand Down Expand Up @@ -936,9 +936,9 @@ static void set_property_request_cb (flux_t *h, flux_msg_handler_t *w,
property_key = keyval.substr (0, pos);
property_value = keyval.substr (pos + 1);

it = ctx->db.metadata.by_path.find (resource_path);
it = ctx->db->metadata.by_path.find (resource_path);

if (it == ctx->db.metadata.by_path.end ()) {
if (it == ctx->db->metadata.by_path.end ()) {
errno = ENOENT;
flux_log_error (h, "%s: Couldn't find %s in resource graph.",
__FUNCTION__, resource_path.c_str ());
Expand All @@ -947,12 +947,12 @@ static void set_property_request_cb (flux_t *h, flux_msg_handler_t *w,

v = it->second;

ret = ctx->db.resource_graph[v].properties.insert (
ret = ctx->db->resource_graph[v].properties.insert (
std::pair<std::string, std::string> (property_key,property_value));

if (ret.second == false) {
ctx->db.resource_graph[v].properties.erase (property_key);
ctx->db.resource_graph[v].properties.insert (
ctx->db->resource_graph[v].properties.erase (property_key);
ctx->db->resource_graph[v].properties.insert (
std::pair<std::string, std::string> (property_key,property_value));
}

Expand Down Expand Up @@ -985,9 +985,9 @@ static void get_property_request_cb (flux_t *h, flux_msg_handler_t *w,
resource_path = rp;
property_key = gp_key;

it = ctx->db.metadata.by_path.find (resource_path);
it = ctx->db->metadata.by_path.find (resource_path);

if (it == ctx->db.metadata.by_path.end ()) {
if (it == ctx->db->metadata.by_path.end ()) {
errno = ENOENT;
flux_log_error (h, "%s: Couldn't find %s in resource graph.",
__FUNCTION__, resource_path.c_str ());
Expand All @@ -996,8 +996,8 @@ static void get_property_request_cb (flux_t *h, flux_msg_handler_t *w,

v = it->second;

for (p_it = ctx->db.resource_graph[v].properties.begin ();
p_it != ctx->db.resource_graph[v].properties.end (); p_it++) {
for (p_it = ctx->db->resource_graph[v].properties.begin ();
p_it != ctx->db->resource_graph[v].properties.end (); p_it++) {

if (property_key.compare (p_it->first) == 0)
resp_value = p_it->second;
Expand Down
17 changes: 17 additions & 0 deletions resource/readers/resource_reader_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ class resource_reader_base_t {
virtual int unpack_at (resource_graph_t &g, resource_graph_metadata_t &m,
vtx_t &vtx, const std::string &str, int rank = -1) = 0;

/*! Update resource graph g with str.
*
* \param g resource graph
* \param m resource graph meta data
* \param str resource set string
* \param jobid jobid of str
* \param at start time of this job
* \param dur duration of this job
* \param rsv true if this update is for a reservation.
* \param trav_token
* token to be used by traverser
* \return 0 on success; non-zero integer on an error
*/
virtual int update (resource_graph_t &g, resource_graph_metadata_t &m,
const std::string &str, int64_t jobid, int64_t at,
uint64_t dur, bool rsv, uint64_t trav_token) = 0;

/*! Set the whitelist: only resources that are part of this whitelist
* will be unpacked into the graph.
*
Expand Down
21 changes: 17 additions & 4 deletions resource/readers/resource_reader_grug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ vtx_t dfs_emitter_t::emit_vertex (ggv_t u, gge_t e, const gg_t &recipe,
resource_graph_t &g = *m_g_p;
resource_graph_metadata_t &m = *m_gm_p;
if (src_v == boost::graph_traits<resource_graph_t>::null_vertex ())
if (m.roots->find (recipe[u].subsystem) != m.roots->end ())
return (*(m.roots))[recipe[u].subsystem];
if (m.roots.find (recipe[u].subsystem) != m.roots.end ())
return (m.roots)[recipe[u].subsystem];

vtx_t v = add_vertex (g);;
std::string pref = "";
Expand All @@ -198,7 +198,8 @@ vtx_t dfs_emitter_t::emit_vertex (ggv_t u, gge_t e, const gg_t &recipe,

if (src_v == boost::graph_traits<resource_graph_t>::null_vertex ()) {
// ROOT vertex of graph
(*(m.roots))[recipe[u].subsystem] = v;
m.roots.emplace (recipe[u].subsystem, v);
m.v_rt_edges.emplace (recipe[u].subsystem, relation_infra_t ());
id = 0;
} else {
id = gen_id (e, recipe, i, sz, j);
Expand All @@ -212,7 +213,7 @@ vtx_t dfs_emitter_t::emit_vertex (ggv_t u, gge_t e, const gg_t &recipe,
g[v].unit = recipe[u].unit;
g[v].schedule.plans = planner_new (0, INT64_MAX,
recipe[u].size, recipe[u].type.c_str ());
g[v].schedule.x_checker = planner_new (0, INT64_MAX,
g[v].idata.x_checker = planner_new (0, INT64_MAX,
X_CHECKER_NJOBS, X_CHECKER_JOBS_STR);
g[v].id = id;
g[v].name = recipe[u].basename + istr;
Expand Down Expand Up @@ -331,6 +332,7 @@ void dfs_emitter_t::tree_edge (gge_t e, const gg_t &recipe)
g[tgt_vtx].paths[recipe[e].e_subsystem]
= g[src_vtx].paths[recipe[e].e_subsystem]
+ "/" + g[tgt_vtx].name;
m.by_path[g[tgt_vtx].paths[recipe[e].e_subsystem]] = tgt_vtx;
g[tgt_vtx].idata.member_of[recipe[e].e_subsystem]
= "*";
emit_edges (e, recipe, src_vtx, tgt_vtx);
Expand Down Expand Up @@ -360,6 +362,7 @@ void dfs_emitter_t::tree_edge (gge_t e, const gg_t &recipe)
g[tgt_vtx].paths[recipe[e].e_subsystem]
= g[src_vtx].paths[recipe[e].e_subsystem]
+ "/" + g[tgt_vtx].name;
m.by_path[g[tgt_vtx].paths[recipe[e].e_subsystem]] = tgt_vtx;
g[tgt_vtx].idata.member_of[recipe[e].e_subsystem]
= "*";
emit_edges (e, recipe, src_vtx, tgt_vtx);
Expand Down Expand Up @@ -461,6 +464,16 @@ int resource_reader_grug_t::unpack_at (resource_graph_t &g,
return -1;
}

int resource_reader_grug_t::update (resource_graph_t &g,
resource_graph_metadata_t &m,
const std::string &str, int64_t jobid,
int64_t at, uint64_t dur, bool rsv,
uint64_t token)
{
errno = ENOTSUP; // GRUG reader currently does not support update
return -1;
}

bool resource_reader_grug_t::is_whitelist_supported ()
{
return false;
Expand Down
17 changes: 17 additions & 0 deletions resource/readers/resource_reader_grug.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ class resource_reader_grug_t : public resource_reader_base_t {
virtual int unpack_at (resource_graph_t &g, resource_graph_metadata_t &m,
vtx_t &vtx, const std::string &str, int rank = -1);

/*! Update resource graph g with str.
*
* \param g resource graph
* \param m resource graph meta data
* \param str resource set string
* \param jobid jobid of str
* \param at start time of this job
* \param dur duration of this job
* \param rsv true if this update is for a reservation.
* \param trav_token
* token to be used by traverser
* \return 0 on success; non-zero integer on an error
*/
virtual int update (resource_graph_t &g, resource_graph_metadata_t &m,
const std::string &str, int64_t jobid, int64_t at,
uint64_t dur, bool rsv, uint64_t trav_token);

/*! Is the selected reader format support whitelist
*
* \return false
Expand Down
46 changes: 25 additions & 21 deletions resource/readers/resource_reader_hwloc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ vtx_t resource_reader_hwloc_t::create_cluster_vertex (
graph_traits<resource_graph_t>::
null_vertex (),
0, subsys, "cluster", "cluster", 1);
(*(m.roots))[subsys] = v;
m.roots.emplace (subsys, v);
m.v_rt_edges.emplace (subsys, relation_infra_t ());

return v;
}
Expand Down Expand Up @@ -95,7 +96,7 @@ vtx_t resource_reader_hwloc_t::add_new_vertex (resource_graph_t &g,
g[v].uniq_id = v;
g[v].rank = rank;
g[v].schedule.plans = planner_new (0, INT64_MAX, size, type.c_str ());
g[v].schedule.x_checker = planner_new (0, INT64_MAX,
g[v].idata.x_checker = planner_new (0, INT64_MAX,
X_CHECKER_NJOBS, X_CHECKER_JOBS_STR);
g[v].id = id;
g[v].name = basename + istr;
Expand Down Expand Up @@ -263,25 +264,18 @@ void resource_reader_hwloc_t::walk_hwloc (resource_graph_t &g,
vtx_t v = add_new_vertex (g, m, parent,
id, subsys, type, basename, size, rank);
valid_ancestor = v;

// Create edge between parent/child
if (parent == boost::graph_traits<resource_graph_t>::null_vertex ()) {
// is root
(*(m.roots))[subsys] = v;
} else {
std::string relation = "contains";
std::string rev_relation = "in";
edg_t e;
bool inserted; // set to false when we try and insert a parallel edge

tie (e, inserted) = add_edge (parent, v, g);
g[e].idata.member_of[subsys] = relation;
g[e].name[subsys] = relation;

tie (e, inserted) = add_edge (v, parent, g);
g[e].idata.member_of[subsys] = rev_relation;
g[e].name[subsys] = rev_relation;
}
std::string relation = "contains";
std::string rev_relation = "in";
edg_t e;
bool inserted; // set to false when we try and insert a parallel edge

tie (e, inserted) = add_edge (parent, v, g);
g[e].idata.member_of[subsys] = relation;
g[e].name[subsys] = relation;

tie (e, inserted) = add_edge (v, parent, g);
g[e].idata.member_of[subsys] = rev_relation;
g[e].name[subsys] = rev_relation;
}

for (unsigned int i = 0; i < obj->arity; i++) {
Expand Down Expand Up @@ -364,6 +358,16 @@ int resource_reader_hwloc_t::unpack_at (resource_graph_t &g,
return unpack_internal (g, m, vtx, str, rank);
}

int resource_reader_hwloc_t::update (resource_graph_t &g,
resource_graph_metadata_t &m,
const std::string &str, int64_t jobid,
int64_t at, uint64_t dur, bool rsv,
uint64_t token)
{
errno = ENOTSUP; // GRUG reader currently does not support update
return -1;
}

bool resource_reader_hwloc_t::is_whitelist_supported ()
{
return true;
Expand Down
17 changes: 17 additions & 0 deletions resource/readers/resource_reader_hwloc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ class resource_reader_hwloc_t : public resource_reader_base_t {
virtual int unpack_at (resource_graph_t &g, resource_graph_metadata_t &m,
vtx_t &vtx, const std::string &str, int rank = -1);

/*! Update resource graph g with str.
*
* \param g resource graph
* \param m resource graph meta data
* \param str resource set string
* \param jobid jobid of str
* \param at start time of this job
* \param dur duration of this job
* \param rsv true if this update is for a reservation.
* \param trav_token
* token to be used by traverser
* \return 0 on success; non-zero integer on an error
*/
virtual int update (resource_graph_t &g, resource_graph_metadata_t &m,
const std::string &str, int64_t jobid, int64_t at,
uint64_t dur, bool rsv, uint64_t trav_token);

/*! Is the hwloc reader format support whitelist
*
* \return true
Expand Down
Loading

0 comments on commit 0adeef4

Please sign in to comment.