From 0ff471fed605a192efd606f154da5a278d96d5a6 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Fri, 25 Aug 2023 13:51:42 +0800 Subject: [PATCH] renamed nodeAttributes in PG to fields added constraintParams --- daliuge-engine/dlg/drop.py | 2 +- daliuge-translator/dlg/dropmake/lg_node.py | 44 +++++++++------------- 2 files changed, 19 insertions(+), 27 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index a0d7a6c97..a491cede8 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -56,7 +56,7 @@ "dropclass", "category", "storage", - "nodeAttributes", + "fields", "streaming", "persist", "rank", diff --git a/daliuge-translator/dlg/dropmake/lg_node.py b/daliuge-translator/dlg/dropmake/lg_node.py index 7cdab7140..4e978bd27 100644 --- a/daliuge-translator/dlg/dropmake/lg_node.py +++ b/daliuge-translator/dlg/dropmake/lg_node.py @@ -354,6 +354,10 @@ def weight(self): @weight.setter def weight(self, default_value): + """ + The weight of a data drop is its volume. + The weight of an app drop is the execution time. + """ key = [] if self.is_app: key = [ @@ -777,31 +781,20 @@ def _update_key_value_attributes(self, kwargs): """ get all the arguments from new fields dictionary in a backwards compatible way """ + kwargs["applicationArgs"] = {} + kwargs["constraintParams"] = {} if "fields" in self.jd: - self.jd.update({"nodeAttributes": {}}) - kwargs.update({"nodeAttributes": {}}) + kwargs["fields"] = self.jd["fields"] for je in self.jd["fields"]: # The field to be used is not the text, but the name field self.jd[je["name"]] = je["value"] kwargs[je["name"]] = je["value"] - self.jd["nodeAttributes"].update({je["name"]: je}) - kwargs["nodeAttributes"].update({je["name"]: je}) - kwargs[ - "applicationArgs" - ] = {} # make sure the dict always exists downstream - if "applicationArgs" in self.jd: # and fill it if provided - for je in self.jd["applicationArgs"]: - j = {je["name"]: {k: je[k] for k in je if k not in ["name"]}} - self.jd.update(j) - kwargs["applicationArgs"].update(j) - if "nodeAttributes" not in kwargs: - kwargs.update({"nodeAttributes": {}}) - for k, na in kwargs["nodeAttributes"].items(): - if ( - "parameterType" in na - and na["parameterType"] == "ApplicationArgument" - ): - kwargs["applicationArgs"].update({k: na}) + if "parameterType" in je: + if je["parameterType"] == "ApplicationArgument": + kwargs["applicationArgs"].update({je["name"]: je}) + elif je["parameterType"] == "ConstraintParameter": + kwargs["constraintParams"].update({je["name"]: je}) + # NOTE: drop Argxx keywords def _getPortName( @@ -973,7 +966,6 @@ def _create_app_drop(self, drop_spec): logger.debug("Might be a problem with this node: %s", self.jd) self.dropclass = app_class - execTime = self.weight self.jd["dropclass"] = app_class self.dropclass = app_class logger.debug( @@ -984,17 +976,17 @@ def _create_app_drop(self, drop_spec): if self.dropclass is None or self.dropclass == "": logger.warning(f"Something wrong with this node: {self.jd}") if self.weight is not None: - execTime = self.weight - if execTime < 0: + if self.weight < 0: raise GraphException( "Execution_time must be greater" " than 0 for Node '%s'" % self.name ) + else: + kwargs["weight"] = self.weight else: - execTime = random.randint(3, 8) - kwargs["weight"] = execTime + kwargs["weight"] = random.randint(3, 8) if app_class == "dlg.apps.simple.SleepApp": - kwargs["sleep_time"] = execTime + kwargs["sleep_time"] = self.weight kwargs["dropclass"] = app_class kwargs["num_cpus"] = int(self.jd.get("num_cpus", 1))