Skip to content

Commit

Permalink
Merge pull request #1149 from jameshcorbett/reader-fix-jgf-properties
Browse files Browse the repository at this point in the history
reader: fix jgf properties
  • Loading branch information
mergify[bot] committed Apr 5, 2024
2 parents 68c636a + f2f2899 commit d0b9ac3
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 49 deletions.
7 changes: 7 additions & 0 deletions resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,13 @@ int resource_reader_jgf_t::fill_fetcher (json_t *element, fetch_helper_t &f,
}
*properties = json_object_get (metadata, "properties");
*paths = p;
if (*properties && !json_is_object (*properties)) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": key (properties) must be an object or null for ";
m_err_msg += std::string (f.vertex_id) + ".\n";
goto done;
}
rc = 0;
done:
return rc;
Expand Down
45 changes: 15 additions & 30 deletions src/python/fluxion/resourcegraph/V1.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
exclusive -- Exclusivity
unit -- Unit of this resource
size -- Amount of individual resources in this resource pool in unit
properties -- Comma-separated list of property strings
properties -- mapping from property name to value
paths -- Fully qualified paths dictionary
status -- Resource status (0 for 'up', 1 for 'down'), defaults to 0
"""
Expand Down Expand Up @@ -127,31 +127,18 @@ def _extract_id_from_hn(self, hostName):
rc = int(postfix[0])
return rc

def _add_to_rankdict(self, key, gLabel, rankdict):
if key in rankdict:
rankdict[key].append(gLabel)
else:
rankdict[key] = [gLabel]

def _contains_any(self, prop_str, charset):
for c in charset:
if c in prop_str:
return True
return False

def _per_rank_property_dict(self, properties, rankdict):
for p in properties:
# This can be extended later to support scheduler specific
# string (@suffix)
if self._contains_any(p, "!&'\"^`|()"):
raise ValueError(f"invalid character used in property={p}")
self._add_to_rankdict("node", p, rankdict)

def _encode_child(self, ppid, hPath, rank, resType, i, rpd):
def _encode_child(self, ppid, hPath, rank, resType, i, properties):
path = f"{hPath}/{resType}{i}"
properties = []
properties = {}
# This can be extended later to support fine grained property
# attachment using rpd
# attachment using properties passed in from parent;
# for now, set empty properties
vtx = FluxionResourcePoolV1(
self._uniqId,
resType,
Expand All @@ -169,14 +156,11 @@ def _encode_child(self, ppid, hPath, rank, resType, i, rpd):
edg = FluxionResourceRelationshipV1(ppid, vtx.get_id())
self._add_and_tick_uniq_id(vtx, edg)

def _encode_rank(self, ppid, rank, children, hList, rdict, rpd):
def _encode_rank(self, ppid, rank, children, hList, rdict, properties):
if rdict[rank] >= len(hList):
raise ValueError(f"nodelist doesn't include node for rank={rank}")
hPath = f"/cluster0/{hList[rdict[rank]]}"
iden = self._extract_id_from_hn(hList[rdict[rank]])
properties = []
if "node" in rpd:
properties = rpd["node"]
vtx = FluxionResourcePoolV1(
self._uniqId,
"node",
Expand All @@ -195,14 +179,13 @@ def _encode_rank(self, ppid, rank, children, hList, rdict, rpd):
self._add_and_tick_uniq_id(vtx, edg)
for key, val in children.items():
for i in IDset(val):
self._encode_child(vtx.get_id(), hPath, rank, str(key), i, rpd)
self._encode_child(vtx.get_id(), hPath, rank, str(key), i, properties)

def _encode_rlite(self, ppid, entry, hList, rdict, pdict):
for rank in list(IDset(entry["rank"])):
rpd = {}
if rank in pdict:
self._per_rank_property_dict(pdict[rank], rpd)
self._encode_rank(ppid, rank, entry["children"], hList, rdict, rpd)
self._encode_rank(
ppid, rank, entry["children"], hList, rdict, pdict.get(rank, {})
)

def _encode(self):
hList = Hostlist(self._rv1NoSched["execution"]["nodelist"])
Expand All @@ -217,7 +200,7 @@ def _encode(self):
True,
"",
1,
[],
{},
"/cluster0",
)
self._add_and_tick_uniq_id(vtx, None)
Expand All @@ -234,11 +217,13 @@ def _encode(self):
props = self._rv1NoSched["execution"]["properties"]
pdict = {}
for p in props:
if self._contains_any(p, "!&'\"^`|()"):
raise ValueError(f"invalid character used in property={p}")
for rank in list(IDset(props[p])):
if rank in pdict:
pdict[rank].append(p)
pdict[rank][p] = ""
else:
pdict[rank] = [p]
pdict[rank] = {p: ""}

for entry in self._rv1NoSched["execution"]["R_lite"]:
self._encode_rlite(vtx.get_id(), entry, hList, rdict, pdict)
33 changes: 32 additions & 1 deletion t/python/t10001-resourcegraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@
},
}

RV1_3 = {
"version": 1,
"execution": {
"R_lite": [{"rank": "0-19", "children": {"gpu": "0-1", "core": "0-7"}}],
"starttime": 0.0,
"expiration": 0.0,
"nodelist": ["compute[0-19]"],
"properties": {"pdebug": "0-9", "pbatch": "10-19"},
},
}


class TestResourceGraph(unittest.TestCase):
"""Test for the ResourceGraph class."""
Expand All @@ -53,7 +64,7 @@ def _check_metadata(self, metadata):
if metadata["type"] in ("node", "core", "gpu", "cluster"):
self.assertEqual(metadata["unit"], "")
self.assertEqual(metadata["size"], 1)
self.assertEqual(metadata["properties"], [])
self.assertEqual(metadata["properties"], {})
else:
raise ValueError(metadata["type"])

Expand All @@ -79,5 +90,25 @@ def test_basic_2(self):
for node in graph.get_nodes():
self._check_metadata(node.get_metadata())

def test_basic_3(self):
graph = FluxionResourceGraphV1(RV1_3)
self.assertTrue(graph.is_directed())
j = graph.to_JSON()
self.assertTrue(j["graph"]["directed"])
self.assertEqual(len(j["graph"]["nodes"]), len(graph.get_nodes()))
self.assertEqual(len(j["graph"]["edges"]), len(graph.get_edges()))
for node in graph.get_nodes():
metadata = node.get_metadata()
if metadata["type"] != "node":
self._check_metadata(node.get_metadata())
else:
self.assertEqual(metadata["unit"], "")
self.assertEqual(metadata["size"], 1)
self.assertEqual(len(metadata["properties"]), 1)
if metadata["id"] < 10:
self.assertEqual(metadata["properties"]["pdebug"], "")
else:
self.assertEqual(metadata["properties"]["pbatch"], "")


unittest.main(testRunner=TAPTestRunner())
36 changes: 18 additions & 18 deletions t/t8001-util-ion-R.t
Original file line number Diff line number Diff line change
Expand Up @@ -159,24 +159,24 @@ test_expect_success 'fluxion-R: can detect insufficient nodelist' '

test_expect_success 'fluxion-R: encoding properties on heterogeneity works' '
cat <<-EOF >expected6 &&
/cluster0 -1 []
/cluster0/foo2 0 ["arm-v9@core"]
/cluster0/foo2/core0 0 []
/cluster0/foo2/core1 0 []
/cluster0/foo2/gpu0 0 []
/cluster0/foo2/gpu1 0 []
/cluster0/foo3 2 ["arm-v9@core","amd-mi60@gpu"]
/cluster0/foo3/core0 2 []
/cluster0/foo3/core1 2 []
/cluster0/foo3/gpu0 2 []
/cluster0/foo3/gpu1 2 []
/cluster0/foo1 3 ["arm-v9@core","amd-mi60@gpu"]
/cluster0/foo1/core0 3 []
/cluster0/foo1/core1 3 []
/cluster0/foo1/gpu0 3 []
/cluster0/foo1/gpu1 3 []
/cluster0/foo4 1 ["arm-v8@core"]
/cluster0/foo4/core0 1 []
/cluster0 -1 {}
/cluster0/foo2 0 {"arm-v9@core":""}
/cluster0/foo2/core0 0 {}
/cluster0/foo2/core1 0 {}
/cluster0/foo2/gpu0 0 {}
/cluster0/foo2/gpu1 0 {}
/cluster0/foo3 2 {"arm-v9@core":"","amd-mi60@gpu":""}
/cluster0/foo3/core0 2 {}
/cluster0/foo3/core1 2 {}
/cluster0/foo3/gpu0 2 {}
/cluster0/foo3/gpu1 2 {}
/cluster0/foo1 3 {"arm-v9@core":"","amd-mi60@gpu":""}
/cluster0/foo1/core0 3 {}
/cluster0/foo1/core1 3 {}
/cluster0/foo1/gpu0 3 {}
/cluster0/foo1/gpu1 3 {}
/cluster0/foo4 1 {"arm-v8@core":""}
/cluster0/foo4/core0 1 {}
EOF
flux R encode -r 0 -c 0-1 -g 0-1 -p "arm-v9@core:0" -H foo2 > out6 &&
flux R encode -r 1 -c 0 -H foo3 -p "arm-v8@core:1" >> out6 &&
Expand Down

0 comments on commit d0b9ac3

Please sign in to comment.