Skip to content

Commit

Permalink
renamed nodeAttributes in PG to fields added constraintParams
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Aug 25, 2023
1 parent 989e848 commit 0ff471f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 27 deletions.
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
"dropclass",
"category",
"storage",
"nodeAttributes",
"fields",
"streaming",
"persist",
"rank",
Expand Down
44 changes: 18 additions & 26 deletions daliuge-translator/dlg/dropmake/lg_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ def weight(self):

@weight.setter
def weight(self, default_value):
"""
The weight of a data drop is its volume.
The weight of an app drop is the execution time.
"""
key = []
if self.is_app:
key = [
Expand Down Expand Up @@ -777,31 +781,20 @@ def _update_key_value_attributes(self, kwargs):
"""
get all the arguments from new fields dictionary in a backwards compatible way
"""
kwargs["applicationArgs"] = {}
kwargs["constraintParams"] = {}
if "fields" in self.jd:
self.jd.update({"nodeAttributes": {}})
kwargs.update({"nodeAttributes": {}})
kwargs["fields"] = self.jd["fields"]
for je in self.jd["fields"]:
# The field to be used is not the text, but the name field
self.jd[je["name"]] = je["value"]
kwargs[je["name"]] = je["value"]
self.jd["nodeAttributes"].update({je["name"]: je})
kwargs["nodeAttributes"].update({je["name"]: je})
kwargs[
"applicationArgs"
] = {} # make sure the dict always exists downstream
if "applicationArgs" in self.jd: # and fill it if provided
for je in self.jd["applicationArgs"]:
j = {je["name"]: {k: je[k] for k in je if k not in ["name"]}}
self.jd.update(j)
kwargs["applicationArgs"].update(j)
if "nodeAttributes" not in kwargs:
kwargs.update({"nodeAttributes": {}})
for k, na in kwargs["nodeAttributes"].items():
if (
"parameterType" in na
and na["parameterType"] == "ApplicationArgument"
):
kwargs["applicationArgs"].update({k: na})
if "parameterType" in je:
if je["parameterType"] == "ApplicationArgument":
kwargs["applicationArgs"].update({je["name"]: je})
elif je["parameterType"] == "ConstraintParameter":
kwargs["constraintParams"].update({je["name"]: je})

# NOTE: drop Argxx keywords

def _getPortName(
Expand Down Expand Up @@ -973,7 +966,6 @@ def _create_app_drop(self, drop_spec):
logger.debug("Might be a problem with this node: %s", self.jd)

self.dropclass = app_class
execTime = self.weight
self.jd["dropclass"] = app_class
self.dropclass = app_class
logger.debug(
Expand All @@ -984,17 +976,17 @@ def _create_app_drop(self, drop_spec):
if self.dropclass is None or self.dropclass == "":
logger.warning(f"Something wrong with this node: {self.jd}")
if self.weight is not None:
execTime = self.weight
if execTime < 0:
if self.weight < 0:
raise GraphException(
"Execution_time must be greater"
" than 0 for Node '%s'" % self.name
)
else:
kwargs["weight"] = self.weight
else:
execTime = random.randint(3, 8)
kwargs["weight"] = execTime
kwargs["weight"] = random.randint(3, 8)
if app_class == "dlg.apps.simple.SleepApp":
kwargs["sleep_time"] = execTime
kwargs["sleep_time"] = self.weight

kwargs["dropclass"] = app_class
kwargs["num_cpus"] = int(self.jd.get("num_cpus", 1))
Expand Down

0 comments on commit 0ff471f

Please sign in to comment.