From 6d3d6fb24d5c0cbc2b656e7a0580eeda549ed60f Mon Sep 17 00:00:00 2001
From: pritchardn <21726929@student.uwa.edu.au>
Date: Fri, 20 May 2022 13:21:22 +0800
Subject: [PATCH 001/183] Replicating changes from LIU-186 to avoid ~3000
 unnecessary commits...

Adds REST APIs for MM to add/remove DIMs & Nodes from DIMs
Adds zeroconf discovery of DIMs
Adds nm_dim_assigner - assigns NMs to DIMs as they come on/off-line
Adds associated tests asserting this functionality works when zeroconf is
enabled, and does not work when disabled.
---
 daliuge-common/dlg/clients.py                 |  34 ++++++
 .../dlg/manager/composite_manager.py          |   4 +
 daliuge-engine/dlg/manager/proc_daemon.py     |  61 ++++++++--
 daliuge-engine/dlg/manager/rest.py            |  35 +++++-
 daliuge-engine/dlg/nm_dim_assigner.py         |  71 ++++++++++++
 daliuge-engine/test/manager/test_daemon.py    | 108 +++++++++++++++++-
 6 files changed, 297 insertions(+), 16 deletions(-)
 create mode 100644 daliuge-engine/dlg/nm_dim_assigner.py

diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py
index bfef4ef66..dfa91344e 100644
--- a/daliuge-common/dlg/clients.py
+++ b/daliuge-common/dlg/clients.py
@@ -280,3 +280,37 @@ def create_island(self, island_host, nodes):
         self._post_json(
             "/managers/%s/dataisland" % (quote(island_host)), {"nodes": nodes}
         )
+
+    def dims(self):
+        return self._get_json("/islands")
+
+    def add_dim(self, dim):
+        self._POST("/islands/%s" % (dim,), content=None)
+
+    def remove_dim(self, dim):
+        self._DELETE("/islands/%s" % (dim,))
+
+    def add_node_to_dim(self, dim, nm):
+        """
+        Adds an NM to a DIM
+        """
+        self._POST(
+            "managers/%s/nodes/%s"
+            % (
+                dim,
+                nm,
+            ),
+            content=None,
+        )
+
+    def remove_node_from_dim(self, dim, nm):
+        """
+        Removes an NM from a DIM
+        """
+        self._DELETE(
+            "managers/%s/nodes/%s"
+            % (
+                dim,
+                nm,
+            )
+        )
diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py
index 8e34c9d6b..2de9265fa 100644
--- a/daliuge-engine/dlg/manager/composite_manager.py
+++ b/daliuge-engine/dlg/manager/composite_manager.py
@@ -211,6 +211,10 @@ def dmHosts(self):
     def addDmHost(self, host):
         self._dmHosts.append(host)

+    def removeDmHost(self, host):
+        if host in self._dmHosts:
+            self._dmHosts.remove(host)
+
     @property
     def nodes(self):
         return self._nodes[:]
diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py
index 5244a30a2..e76c0299b 100644
--- a/daliuge-engine/dlg/manager/proc_daemon.py
+++ b/daliuge-engine/dlg/manager/proc_daemon.py
@@ -37,6 +37,7 @@
 from . import constants, client
 from ..
import utils from ..restserver import RestServer +from dlg.nm_dim_assigner import NMAssigner logger = logging.getLogger(__name__) @@ -74,7 +75,6 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0 self._shutting_down = False self._verbosity = verbosity - # The three processes we run self._nm_proc = None self._dim_proc = None @@ -84,6 +84,7 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0 self._zeroconf = None if disable_zeroconf else zc.Zeroconf() self._nm_info = None self._mm_browser = None + self._mm_dim_browser = None # Starting managers app = self.app @@ -128,6 +129,9 @@ def _stop_zeroconf(self): if self._mm_browser: self._mm_browser.cancel() self._mm_browser.join() + if self._mm_dim_browser: + self._mm_dim_browser.cancel() + self._mm_dim_browser.join() self._zeroconf.close() logger.info("Zeroconf stopped") @@ -207,6 +211,19 @@ def startDIM(self, nodes): logger.info( "Started Data Island Drop Manager with PID %d" % (self._dim_proc.pid) ) + + # Registering the new DIM via zeroconf so it gets discovered + # by the Master Manager + if self._zeroconf: + addrs = utils.get_local_ip_addr() + logger.info("Registering this DIM with zeroconf: %s" % addrs) + self._nm_info = utils.register_service( + self._zeroconf, + "DIM", + socket.gethostname(), + addrs[0][0], + constants.ISLAND_DEFAULT_REST_PORT, + ) return def startMM(self): @@ -220,22 +237,20 @@ def startMM(self): # Also subscribe to zeroconf events coming from NodeManagers and feed # the Master Manager with the new hosts we find if self._zeroconf: - mm_client = client.MasterManagerClient() - node_managers = {} + nm_assigner = NMAssigner() def nm_callback(zeroconf, service_type, name, state_change): info = zeroconf.get_service_info(service_type, name) if state_change is zc.ServiceStateChange.Added: server = socket.inet_ntoa(_get_address(info)) port = info.port - node_managers[name] = (server, port) + nm_assigner.add_nm(name, server, port) logger.info( "Found a new Node Manager on %s:%d, will add it to the MM" % (server, port) ) - mm_client.add_node(server) elif state_change is zc.ServiceStateChange.Removed: - server, port = node_managers[name] + server, port = nm_assigner.NMs[name] logger.info( "Node Manager on %s:%d disappeared, removing it from the MM" % (server, port) @@ -245,14 +260,40 @@ def nm_callback(zeroconf, service_type, name, state_change): # we avoid hanging in here if the MM is down already but # we are trying to remove our NM who has just disappeared if not self._shutting_down: - try: - mm_client.remove_node(server) - finally: - del node_managers[name] + nm_assigner.remove_nm(name) + + def dim_callback(zeroconf, service_type, name, state_change): + info = zeroconf.get_service_info(service_type, name) + if state_change is zc.ServiceStateChange.Added: + server = socket.inet_ntoa(_get_address(info)) + port = info.port + nm_assigner.add_dim(name, server, port) + logger.info( + "Found a new Data Island Manager on %s:%d, will add it to the MM" + % (server, port) + ) + elif state_change is zc.ServiceStateChange.Removed: + server, port = nm_assigner.DIMs[name] + logger.info( + "Data Island Manager on %s:%d disappeared, removing it from the MM" + % (server, port) + ) + + # Don't bother to remove it if we're shutting down. 
This way
+                    # we avoid hanging in here if the MM is down already but
+                    # we are trying to remove our DIM which has just disappeared
+                    if not self._shutting_down:
+                        nm_assigner.remove_dim(name)

             self._mm_browser = utils.browse_service(
                 self._zeroconf, "NodeManager", "tcp", nm_callback
             )
+            self._mm_dim_browser = utils.browse_service(
+                self._zeroconf,
+                "DIM",
+                "tcp",
+                dim_callback,  # DIM since name must be < 15 bytes
+            )
             logger.info("Zeroconf started")

         return
diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py
index 5d1af58bf..56e3bfb45 100644
--- a/daliuge-engine/dlg/manager/rest.py
+++ b/daliuge-engine/dlg/manager/rest.py
@@ -576,12 +576,15 @@ def visualizeDIM(self):
 class MasterManagerRestServer(CompositeManagerRestServer):
     def initializeSpecifics(self, app):
         CompositeManagerRestServer.initializeSpecifics(self, app)
-
+        # DIM management
+        app.post("/api/islands/<dim>", callback=self.addDIM)
+        app.delete("/api/islands/<dim>", callback=self.removeDIM)
         # Query forwarding to daemons
-        app.post("/api/managers/<host>/dataisland", callback=self.createDataIsland)
         app.post("/api/managers/<host>/node/start", callback=self.startNM)
         app.post("/api/managers/<host>/node/stop", callback=self.stopNM)
+        app.post("/api/managers/<host>/nodes/<node>", callback=self.addNM)
+        app.delete("/api/managers/<host>/nodes/<node>", callback=self.removeNM)
         # Querying about managers
         app.get("/api/islands", callback=self.getDIMs)
         app.get("/api/nodes", callback=self.getNMs)
@@ -601,6 +604,16 @@ def createDataIsland(self, host):
     def getDIMs(self):
         return {"islands": self.dm.dmHosts}

+    @daliuge_aware
+    def addDIM(self, dim):
+        logger.debug("Adding DIM %s" % dim)
+        self.dm.addDmHost(dim)
+
+    @daliuge_aware
+    def removeDIM(self, dim):
+        logger.debug("Removing dim %s" % dim)
+        self.dm.removeDmHost(dim)
+
     @daliuge_aware
     def getNMs(self):
         return {"nodes": self.dm.nodes}
@@ -619,6 +632,24 @@ def stopNM(self, host):
         with RestClient(host=host, port=port, timeout=10) as c:
             return json.loads(c._POST("/managers/node/stop").read())

+    @daliuge_aware
+    def addNM(self, host, node):
+        port = constants.ISLAND_DEFAULT_REST_PORT
+        logger.debug("Adding NM %s to DIM %s" % (node, host))
+        with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
+            return json.loads(
+                c._POST(
+                    "/nodes/%s" % (node,),
+                ).read()
+            )
+
+    @daliuge_aware
+    def removeNM(self, host, node):
+        port = constants.ISLAND_DEFAULT_REST_PORT
+        logger.debug("Removing NM %s from DIM %s" % (node, host))
+        with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
+            return json.loads(c._DELETE("/nodes/%s" % (node,)).read())
+
     @daliuge_aware
     def getNMInfo(self, host):
         port = constants.DAEMON_DEFAULT_REST_PORT
diff --git a/daliuge-engine/dlg/nm_dim_assigner.py b/daliuge-engine/dlg/nm_dim_assigner.py
new file mode 100644
index 000000000..f8dd98d3d
--- /dev/null
+++ b/daliuge-engine/dlg/nm_dim_assigner.py
@@ -0,0 +1,71 @@
+import logging
+from dlg.manager import client
+
+logger = logging.getLogger(__name__)
+
+
+class NMAssigner:
+    """
+    Handles the logic of assigning/removing NMs from DIMs and handling new DIMs coming on/off-line.
+    Maintains a ledger of DIMs and NMs as name -> (server, port) mappings.
+    Maintains a mapping of NMs to DIMs.
+    Allocation logic is currently very simple, handling only the case of a single DIM, but with
+    room for more advanced load-balancing if required in the future.
+    """
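    # A hedged usage sketch (service names, addresses and ports here are
    # illustrative, not taken from the patch); the zeroconf callbacks in
    # proc_daemon.py above drive this class:
    #
    #     assigner = NMAssigner()
    #     assigner.add_dim("dim-service", "10.0.0.5", 8001)  # DIM comes online
    #     assigner.add_nm("nm-service", "10.0.0.7", 8000)    # NM comes online
    #     # allocate_nms() has now attached the NM to the single known DIM,
    #     # i.e. assigner.assignments["nm-service"] == "10.0.0.5"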
+ """ + def __init__(self): + self.DIMs = {} # Set of DIMs name -> (server, port) + self.NMs = {} # Set of NMs name -> (server, port) + self.assignments = {} # Maps NMs to DIMs + self.mm_client = client.MasterManagerClient() + + def add_dim(self, name, server, port): + self.DIMs[name] = (server, port) + self.mm_client.add_dim(server) + self.allocate_nms() + + def add_nm(self, name, server, port): + self.NMs[name] = (server, port) + self.mm_client.add_node(server) + self.allocate_nms() + + def remove_dim(self, name): + server, port = self.DIMs[name] + try: + for nm in self.NMs: + if self.assignments[nm] == server: + del self.assignments[nm] + self.mm_client.remove_dim(server) + finally: + del self.DIMs[name] + return server, port + + def remove_nm(self, name): + server, _ = self.NMs[name] + try: + if name in self.assignments: + dim_ip = self.assignments[name] + nm_ip = self.NMs[name][0] + self.mm_client.remove_node_from_dim(dim_ip, nm_ip) + self.mm_client.remove_node(server) + finally: + del self.NMs[name] + + def allocate_nms(self): + if self.DIMs == {}: + logger.info("We have no DIMs") + elif len(self.DIMs.keys()) == 1: + dim_ip = list(self.DIMs.values())[0][0] # IP of the first (only) DIM + for nm in self.NMs: + nm_ip = self.NMs[nm][0] + if nm not in self.assignments: + logger.info("Adding NM %s to DIM %s", nm_ip, dim_ip) + self.mm_client.add_node_to_dim(dim_ip, nm_ip) + self.assignments[nm] = dim_ip + elif self.assignments[nm] not in self.DIMs: # If we've removed a DIM + logger.info("Re-assigning %s to DIM %s", nm_ip, dim_ip) + self.mm_client.add_node_to_dim(dim_ip, nm_ip) + self.assignments[nm] = dim_ip + else: # We have lots of DIMs + # Will do nothing, it's up to the user/deployer to handle this. + logger.info("Multiple DIMs, handle node assignments externally.") + pass diff --git a/daliuge-engine/test/manager/test_daemon.py b/daliuge-engine/test/manager/test_daemon.py index 59a090a8e..962665ae1 100644 --- a/daliuge-engine/test/manager/test_daemon.py +++ b/daliuge-engine/test/manager/test_daemon.py @@ -27,7 +27,7 @@ from dlg import utils, restutils from dlg.manager import constants -from dlg.manager.client import MasterManagerClient +from dlg.manager.client import MasterManagerClient, DataIslandManagerClient from dlg.manager.proc_daemon import DlgDaemon from six.moves import http_client as httplib # @UnresolvedImport @@ -128,6 +128,79 @@ def test_zeroconf_discovery(self): "MasterManager didn't find the NodeManager running on the same node", ) + def _test_zeroconf_dim_mm(self, disable_zeroconf=False): + # Start daemon with no master and no NM + self.create_daemon(master=False, noNM=True, disable_zeroconf=disable_zeroconf) + # Start DIM - now, on it's own + self._start("island", http.HTTPStatus.OK, {"nodes": []}) + # Start daemon with master but no NM + self._start("master", http.HTTPStatus.OK) + # Check that dim registers to MM + timeout_time = time.time() + _TIMEOUT + dims = None + while time.time() < timeout_time: + dims = self._get_dims_from_master(_TIMEOUT) + if dims is not None and len(dims["islands"]) > 0: + break + time.sleep(0.1) + self.assertIsNotNone(dims) + return dims + + def test_zeroconf_dim_mm(self): + dims = self._test_zeroconf_dim_mm(disable_zeroconf=False) + self.assertEqual( + 1, + len(dims["islands"]), + "MasterManager didn't find the DataIslandManager with zeroconf", + ) + + def test_without_zeroconf_dim_mm(self): + dims = self._test_zeroconf_dim_mm(disable_zeroconf=True) + self.assertEqual( + 0, + len(dims["islands"]), + "MasterManager found the DataIslandManager 
without zeroconf!?", + ) + + def _add_zeroconf_nm(self): + self._start("node", http.HTTPStatus.OK) + timeout_time = time.time() + _TIMEOUT + nodes = None + while time.time() < timeout_time: + nodes = self._get_nodes_from_dim(_TIMEOUT) + if nodes is not None and len(nodes) > 0: + break + time.sleep(0.1) + return nodes + + def test_zeroconf_dim_nm_setup(self): + """ + Sets up a mm with a node manager + Sets up a DIM with zeroconf discovery + Asserts that the mm attaches the nm to the discovered dim + """ + self._test_zeroconf_dim_mm(disable_zeroconf=False) + nodes = self._add_zeroconf_nm() + self.assertIsNotNone(nodes) + + def test_without_zeroconf_dim_nm_setup(self): + self._test_zeroconf_dim_mm(disable_zeroconf=True) + nodes = self._add_zeroconf_nm() + self.assertIsNone(nodes) + + def test_zeroconf_nm_down(self): + self._test_zeroconf_dim_mm(disable_zeroconf=False) + nodes = self._add_zeroconf_nm() + self.assertIsNotNone(nodes) + self._stop("node", http.HTTPStatus.OK) + timeout_time = time.time() + _TIMEOUT + while time.time() < timeout_time: + nodes = self._get_nodes_from_dim(_TIMEOUT) + if nodes is None: + break + time.sleep(0.1) + self.assertIsNone(nodes) + def test_start_dataisland_via_rest(self): self.create_daemon(master=True, noNM=False, disable_zeroconf=False) @@ -222,6 +295,17 @@ def test_start_stop_master_via_rest(self): "The MM did not stop successfully", ) + def test_get_dims(self): + self.create_daemon(master=True, noNM=True, disable_zeroconf=False) + # Check that the DataIsland starts with the given nodes + dims = self._get_dims_from_master(_TIMEOUT) + self.assertIsNotNone(dims) + self.assertEqual( + 0, + len(dims["islands"]), + "MasterManager didn't find the DataIslandManager running on the same node", + ) + def _start(self, manager_name, expected_code, payload=None): conn = http.client.HTTPConnection("localhost", 9000) headers = {} @@ -253,11 +337,27 @@ def _stop(self, manager_name, expected_code, payload=None): response.close() conn.close() - def _get_nodes_from_master(self, timeout): - mc = MasterManagerClient() + def _get_nodes_from_client(self, timeout, client): timeout_time = time.time() + timeout while time.time() < timeout_time: - nodes = mc.nodes() + nodes = client.nodes() if nodes: return nodes time.sleep(0.1) + + def _get_nodes_from_master(self, timeout): + mc = MasterManagerClient() + return self._get_nodes_from_client(timeout, mc) + + def _get_nodes_from_dim(self, timeout): + dimc = DataIslandManagerClient() + return self._get_nodes_from_client(timeout, dimc) + + def _get_dims_from_master(self, timeout): + mc = MasterManagerClient() + timeout_time = time.time() + timeout + while time.time() < timeout_time: + dims = mc.dims() + if dims: + return dims + time.sleep(0.1) From f55d7e9f2386e07189a648fd419985068cc2d9dd Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 12:30:33 +0800 Subject: [PATCH 002/183] handle param attributes in base class --- daliuge-engine/dlg/apps/pyfunc.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 0deb323e9..9f50b0cbb 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -297,23 +297,15 @@ def initialize(self, **kwargs): "func_defaults" ] for kw in self.func_def_keywords: - dum_arg = new_arg = "gIbbERiSH:askldhgol" if kw in self._applicationArgs: # these are the preferred ones now if isinstance( self._applicationArgs[kw]["value"], bool - ): # always transfer booleans - new_arg = 
self._applicationArgs.pop(kw) - elif ( - self._applicationArgs[kw]["value"] + or self._applicationArgs[kw]["value"] or self._applicationArgs[kw]["precious"] ): # only transfer if there is a value or precious is True - new_arg = self._applicationArgs.pop(kw) + self._applicationArgs.pop(kw) - if new_arg != dum_arg: - logger.debug(f"Setting {kw} to {new_arg['value']}") - # we allow python expressions as values, means that strings need to be quoted - self.__setattr__(kw, new_arg["value"]) self.num_args = len( self._applicationArgs From 1b5cb0dede1c73b0b489eb7c7569df9970a3a8ae Mon Sep 17 00:00:00 2001 From: Nicholas Pritchard <30886786+pritchardn@users.noreply.github.com> Date: Mon, 23 May 2022 12:59:17 +0800 Subject: [PATCH 003/183] Updates blockdags.rst with storage note Remarks on where reprodata is stored throughout the translation and after execution. --- docs/architecture/reproducibility/blockdags.rst | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/architecture/reproducibility/blockdags.rst b/docs/architecture/reproducibility/blockdags.rst index c6d866bc6..84f57b932 100644 --- a/docs/architecture/reproducibility/blockdags.rst +++ b/docs/architecture/reproducibility/blockdags.rst @@ -7,7 +7,17 @@ The fundamental primitive powering workflow signatures are Merkle trees and Bloc acyclic graphs (BlockDAGs). These data structures cryptographically compress provenance and structural information. We describe the primitives of our approach and then their combination. -The most relevant code directory is found under ``dlg.common.reproducibility`` +The most relevant code directory is found under ``dlg.common.reproducibility``. + +Provenance data is stored internally within the graph data-structure throughout translation and execution. + +In the logical graph structure (dictionary) this information is keyed under 'reprodata'. +In the physical graph (template) structure this information is appended to the end of the droplist. + +Following graph execution, the reprodata is written to a log file, alongside the associated execution logs ($DLG_ROOT/logs). + +If the specified rmode is 'NOTHING', no reprodata is appended at any stage in translation and execution. + Merkle Trees ------------ From 0f87ddbb81f7062b2492cc6a56a6f8ea605bfb3d Mon Sep 17 00:00:00 2001 From: Nicholas Pritchard <30886786+pritchardn@users.noreply.github.com> Date: Mon, 23 May 2022 14:12:08 +0800 Subject: [PATCH 004/183] Updates rmodes.rst Indicates how to avoid signature generation --- docs/architecture/reproducibility/rmodes.rst | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/architecture/reproducibility/rmodes.rst b/docs/architecture/reproducibility/rmodes.rst index f5378f41d..fbf523bb1 100644 --- a/docs/architecture/reproducibility/rmodes.rst +++ b/docs/architecture/reproducibility/rmodes.rst @@ -7,10 +7,14 @@ Each drop's provenance information defines what a workflow signature claims. Inspired and extending current workflow literature, we define seven R-modes. R-mode selection occurs when submitting a workflow to |daliuge| for initial filling and unrolling; |daliuge| handles everything else automatically. + Additionally, the ALL mode will generate a signature structure containing separate hash graphs for all supported modes, which is a good choice when experimenting with new workflow concepts or certifying a particular workflow version. +Conversely, the NOTHING option avoids all provenance collection and processing, which may be of performance interest. 
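As a hedged sketch of how this choice travels with a graph (the 'reprodata'
key is the one described in the blockdags notes above; the exact payload
layout shown here is illustrative only)::

    # Tag a logical graph dictionary before submission; "NOTHING"
    # switches signature generation off entirely.
    logical_graph["reprodata"] = {"rmode": "NOTHING"}

Choosing NOTHING this way means no signatures are computed or stored.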
+For now, this is also the default option if no rmode is specified. + Rerunning --------- A workflow reruns another if they execute the same logical workflow; their logical components and @@ -75,4 +79,4 @@ A total replica repeats and reproduces a workflow execution. Total replicas allow for independent verification of results, adding direct credibility to results coming from a workflow. Moreover, if a workflow's original deployment environment is unavailable, a total replica is -the most robust assertion possibly placed on a workflow. \ No newline at end of file +the most robust assertion possibly placed on a workflow. From 45916a20dd9d7c4c1da2b76d0846b01e07e0daa3 Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Mon, 23 May 2022 14:47:01 +0800 Subject: [PATCH 005/183] Skipped and cancelled drops now file reproducibility data. Updates InMemoryDROP reproduce_data function to default to b"" if skipped or cancelled. --- daliuge-engine/dlg/drop.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 5b71f064a..a6e83c863 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -1118,12 +1118,14 @@ def cancel(self): if self.status in [DROPStates.INITIALIZED, DROPStates.WRITING]: self._closeWriters() self.status = DROPStates.CANCELLED + self.completedrop() def skip(self): """Moves this drop to the SKIPPED state closing any writers we opened""" if self.status in [DROPStates.INITIALIZED, DROPStates.WRITING]: self._closeWriters() self.status = DROPStates.SKIPPED + self.completedrop() @property def node(self): @@ -1830,7 +1832,11 @@ def dataURL(self) -> str: def generate_reproduce_data(self): from .droputils import allDropContents - data = allDropContents(self, self.size) + data = b"" + try: + data = allDropContents(self, self.size) + except Exception: + logger.debug("Could not read drop reproduce data") return {"data_hash": common_hash(data)} From 3661a544df74e0a560445708d4232136c801d437 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 16:37:18 +0800 Subject: [PATCH 006/183] tested rascil --- daliuge-engine/dlg/apps/pyfunc.py | 5 ++--- daliuge-engine/docker/Dockerfile.devall | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 9f50b0cbb..8fb542b99 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -383,7 +383,7 @@ def run(self): # Inputs are un-pickled and treated as the arguments of the function # Their order must be preserved, so we use an OrderedDict if DropParser(self.input_parser) is DropParser.PICKLE: - all_contents = lambda x: pickle.loads(x) + all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) elif DropParser(self.input_parser) is DropParser.EVAL: all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8')) elif DropParser(self.input_parser) is DropParser.PATH: @@ -395,9 +395,8 @@ def run(self): inputs = collections.OrderedDict() for uid, drop in self._inputs.items(): - contents = droputils.allDropContents(drop) # allow for Null DROPs to be passed around - inputs[uid] = all_contents(contents) if contents else None + inputs[uid] = all_contents(drop) if all_contents is not None else None self.funcargs = {} diff --git a/daliuge-engine/docker/Dockerfile.devall b/daliuge-engine/docker/Dockerfile.devall index 3a98e586b..32cdc2425 100644 --- 
a/daliuge-engine/docker/Dockerfile.devall +++ b/daliuge-engine/docker/Dockerfile.devall @@ -38,8 +38,8 @@ RUN apt install -y git python3-dev # RASCIL # RUN mkdir -p /tmp/rascil_data && cd /tmp/rascil_data &&\ -# curl https://ska-telescope.gitlab.io/external/rascil/rascil_data.tgz -o rascil_data.tgz -# RUN tar zxf rascil_data.tgz -C /dlg/lib/python3.8/site-packages +# curl https://ska-telescope.gitlab.io/external/rascil/rascil_data.tgz -o rascil_data.tgz &&\ +# tar zxf rascil_data.tgz -C /dlg/lib/python3.8/site-packages # RUN pip install --index-url=https://artefact.skao.int/repository/pypi-all/simple rascil CMD ["dlg", "daemon", "-vv"] \ No newline at end of file From b7f3ab541be6048618180f4a73afde9d27c7a422 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 17:12:58 +0800 Subject: [PATCH 007/183] update test graph --- daliuge-engine/test/graphs/compilePG.graph | 37 ++++++++++++++++++---- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/daliuge-engine/test/graphs/compilePG.graph b/daliuge-engine/test/graphs/compilePG.graph index 23380ebb5..830317918 100644 --- a/daliuge-engine/test/graphs/compilePG.graph +++ b/daliuge-engine/test/graphs/compilePG.graph @@ -37,15 +37,38 @@ "options": [], "positional": false }, - "pickle": { - "text": "Pickle", - "value": false, - "defaultValue": "false", - "description": "Whether the python arguments are pickled.", + "input_parser": { + "text": "Input Parser", + "value": "eval", + "defaultValue": "pickle", + "description": "Input port parsing technique", "readonly": false, - "type": "Boolean", + "type": "Select", "precious": false, - "options": [], + "options": [ + "pickle", + "eval", + "path", + "dataurl", + "npy" + ], + "positional": false + }, + "output_parser": { + "text": "Output Parser", + "value": "eval", + "defaultValue": "pickle", + "description": "Output port parsing technique", + "readonly": false, + "type": "Select", + "precious": false, + "options": [ + "pickle", + "eval", + "path", + "dataurl", + "npy" + ], "positional": false }, "func_defaults": { From 3cefef1c0cd217a2c578ff30eac026d9b9d43f87 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 17:54:15 +0800 Subject: [PATCH 008/183] use pickle in test graph --- daliuge-engine/dlg/apps/pyfunc.py | 4 ++-- daliuge-engine/test/graphs/compilePG.graph | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 8fb542b99..6330277ef 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -395,8 +395,8 @@ def run(self): inputs = collections.OrderedDict() for uid, drop in self._inputs.items(): - # allow for Null DROPs to be passed around - inputs[uid] = all_contents(drop) if all_contents is not None else None + # TODO: allow for Null DROPs to be passed around + inputs[uid] = all_contents(drop) self.funcargs = {} diff --git a/daliuge-engine/test/graphs/compilePG.graph b/daliuge-engine/test/graphs/compilePG.graph index 830317918..2233e3508 100644 --- a/daliuge-engine/test/graphs/compilePG.graph +++ b/daliuge-engine/test/graphs/compilePG.graph @@ -39,7 +39,7 @@ }, "input_parser": { "text": "Input Parser", - "value": "eval", + "value": "pickle", "defaultValue": "pickle", "description": "Input port parsing technique", "readonly": false, @@ -56,7 +56,7 @@ }, "output_parser": { "text": "Output Parser", - "value": "eval", + "value": "pickle", "defaultValue": "pickle", "description": "Output port parsing technique", "readonly": false, From 
1cdc1460f96312a00404241fb29ac4fe2bc7914e Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 18:21:29 +0800 Subject: [PATCH 009/183] use ast --- daliuge-engine/test/graphs/compilePG.graph | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daliuge-engine/test/graphs/compilePG.graph b/daliuge-engine/test/graphs/compilePG.graph index 2233e3508..830317918 100644 --- a/daliuge-engine/test/graphs/compilePG.graph +++ b/daliuge-engine/test/graphs/compilePG.graph @@ -39,7 +39,7 @@ }, "input_parser": { "text": "Input Parser", - "value": "pickle", + "value": "eval", "defaultValue": "pickle", "description": "Input port parsing technique", "readonly": false, @@ -56,7 +56,7 @@ }, "output_parser": { "text": "Output Parser", - "value": "pickle", + "value": "eval", "defaultValue": "pickle", "description": "Output port parsing technique", "readonly": false, From a46dab75059b77393bf6a6091d14e0bb36a62bb0 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 18:33:19 +0800 Subject: [PATCH 010/183] handle null case --- daliuge-engine/dlg/apps/pyfunc.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 6330277ef..92bdfe451 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -23,6 +23,7 @@ import ast import base64 +from calendar import c import collections from enum import Enum import importlib @@ -36,7 +37,7 @@ from contextlib import redirect_stdout from dlg import droputils, utils -from dlg.drop import BarrierAppDROP +from dlg.drop import BarrierAppDROP, NullDROP from dlg.exceptions import InvalidDropException from dlg.meta import ( dlg_bool_param, @@ -385,7 +386,10 @@ def run(self): if DropParser(self.input_parser) is DropParser.PICKLE: all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) elif DropParser(self.input_parser) is DropParser.EVAL: - all_contents = lambda x: ast.literal_eval(droputils.allDropContents(x).decode('utf-8')) + def astparse(x): + content = droputils.allDropContents(x) + return ast.literal_eval(content.decode('utf-8')) if content is not None else None + all_contents = astparse elif DropParser(self.input_parser) is DropParser.PATH: all_contents = lambda x: x.path elif DropParser(self.input_parser) is DropParser.DATAURL: @@ -396,7 +400,7 @@ def run(self): inputs = collections.OrderedDict() for uid, drop in self._inputs.items(): # TODO: allow for Null DROPs to be passed around - inputs[uid] = all_contents(drop) + inputs[uid] = all_contents(drop) if not isinstance(drop, NullDROP) else None self.funcargs = {} From cd32e3fff1f18a5accdf95ccffc04cd78ef3ddbd Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 18:48:54 +0800 Subject: [PATCH 011/183] check for falsy drop --- daliuge-engine/dlg/apps/pyfunc.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 92bdfe451..a045213cd 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -387,8 +387,9 @@ def run(self): all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) elif DropParser(self.input_parser) is DropParser.EVAL: def astparse(x): - content = droputils.allDropContents(x) - return ast.literal_eval(content.decode('utf-8')) if content is not None else None + content: bytes = droputils.allDropContents(x) + print(content) + return ast.literal_eval(content.decode('utf-8')) if content else None 
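            # A hedged illustration of the falsy check above (example values,
            # not from the patch): a drop holding b"[1, 2, 3]" parses to the
            # list [1, 2, 3], while an empty or Null drop (content == b"")
            # short-circuits to None instead of letting ast.literal_eval("")
            # raise a SyntaxError.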
all_contents = astparse elif DropParser(self.input_parser) is DropParser.PATH: all_contents = lambda x: x.path From 93cf42d7a33da8ebb207ffbcc9d5b65b5d376a6f Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 23 May 2022 19:00:33 +0800 Subject: [PATCH 012/183] cleanup --- daliuge-engine/dlg/apps/pyfunc.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index a045213cd..a3b97fa19 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -23,7 +23,6 @@ import ast import base64 -from calendar import c import collections from enum import Enum import importlib @@ -37,7 +36,7 @@ from contextlib import redirect_stdout from dlg import droputils, utils -from dlg.drop import BarrierAppDROP, NullDROP +from dlg.drop import BarrierAppDROP from dlg.exceptions import InvalidDropException from dlg.meta import ( dlg_bool_param, @@ -388,7 +387,6 @@ def run(self): elif DropParser(self.input_parser) is DropParser.EVAL: def astparse(x): content: bytes = droputils.allDropContents(x) - print(content) return ast.literal_eval(content.decode('utf-8')) if content else None all_contents = astparse elif DropParser(self.input_parser) is DropParser.PATH: @@ -401,7 +399,7 @@ def astparse(x): inputs = collections.OrderedDict() for uid, drop in self._inputs.items(): # TODO: allow for Null DROPs to be passed around - inputs[uid] = all_contents(drop) if not isinstance(drop, NullDROP) else None + inputs[uid] = all_contents(drop) self.funcargs = {} From b237d4f30d880acb36ad6b157dfbb9e6c4c1924f Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Tue, 24 May 2022 10:43:06 +0800 Subject: [PATCH 013/183] Fixed bug when building dev without common version --- daliuge-translator/build_translator.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/daliuge-translator/build_translator.sh b/daliuge-translator/build_translator.sh index 0fa60aec3..1bbce604b 100755 --- a/daliuge-translator/build_translator.sh +++ b/daliuge-translator/build_translator.sh @@ -16,6 +16,7 @@ case "$1" in echo "Build finished!" exit 0 ;; "dev") + C_TAG="master" [[ ! 
-z "$2" ]] && C_TAG=$2 export VCS_TAG=$DEV_TAG echo "Building daliuge-translator development version using daliuge-common:${C_TAG}" From 980bd2fcec20196d1f897d986da6af30dc0dce6e Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Tue, 24 May 2022 10:45:26 +0800 Subject: [PATCH 014/183] added git to standard build --- daliuge-common/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daliuge-common/docker/Dockerfile b/daliuge-common/docker/Dockerfile index 8750cab18..c30474e35 100644 --- a/daliuge-common/docker/Dockerfile +++ b/daliuge-common/docker/Dockerfile @@ -8,7 +8,7 @@ ARG BUILD_ID LABEL stage=builder LABEL build=$BUILD_ID RUN apt-get update && \ - apt-get install -y gcc python3 python3.8-venv python3-pip python3-distutils python3-appdirs libmetis-dev curl && \ + apt-get install -y gcc python3 python3.8-venv python3-pip python3-distutils python3-appdirs libmetis-dev curl git sudo && \ apt-get clean COPY / /daliuge From 0243f4aec6b1334c770bdc21c71ac84a1a8d5912 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Tue, 24 May 2022 12:36:45 +0800 Subject: [PATCH 015/183] remove todo --- daliuge-engine/dlg/apps/pyfunc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index a3b97fa19..31bc301d3 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -386,6 +386,8 @@ def run(self): all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) elif DropParser(self.input_parser) is DropParser.EVAL: def astparse(x): + # Null and Empty Drops will return an empty byte string + # which should propogate back to None content: bytes = droputils.allDropContents(x) return ast.literal_eval(content.decode('utf-8')) if content else None all_contents = astparse @@ -398,7 +400,6 @@ def astparse(x): inputs = collections.OrderedDict() for uid, drop in self._inputs.items(): - # TODO: allow for Null DROPs to be passed around inputs[uid] = all_contents(drop) self.funcargs = {} From 70eb047389199bce6a84727e0d7825c8c7295594 Mon Sep 17 00:00:00 2001 From: Nicholas Pritchard <30886786+pritchardn@users.noreply.github.com> Date: Wed, 25 May 2022 10:27:22 +0800 Subject: [PATCH 016/183] f-string formatting Co-authored-by: rtobar --- daliuge-common/dlg/clients.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index dfa91344e..7d7345e27 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -285,7 +285,7 @@ def dims(self): return self._get_json("/islands") def add_dim(self, dim): - self._POST("/islands/%s" % (dim,), content=None) + self._POST(f"/islands/{dim}", content=None) def remove_dim(self, dim): self._DELETE("/islands/%s" % (dim,)) From 272192946c25dbde6f225a4208fa1bcc135390e8 Mon Sep 17 00:00:00 2001 From: Nicholas Pritchard <30886786+pritchardn@users.noreply.github.com> Date: Wed, 25 May 2022 10:28:05 +0800 Subject: [PATCH 017/183] log strinrg formatting Co-authored-by: rtobar --- daliuge-engine/dlg/manager/proc_daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index e76c0299b..0f349c72f 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -216,7 +216,7 @@ def startDIM(self, nodes): # by the Master Manager if self._zeroconf: addrs = utils.get_local_ip_addr() - logger.info("Registering 
this DIM with zeroconf: %s" % addrs) + logger.info("Registering this DIM with zeroconf: %s", addrs) self._nm_info = utils.register_service( self._zeroconf, "DIM", From c01bce6c987c8ced0443bb0f350d5575270a53b9 Mon Sep 17 00:00:00 2001 From: Nicholas Pritchard <30886786+pritchardn@users.noreply.github.com> Date: Wed, 25 May 2022 10:29:05 +0800 Subject: [PATCH 018/183] Dictionary iteration with .items() Co-authored-by: rtobar --- daliuge-engine/dlg/nm_dim_assigner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/nm_dim_assigner.py b/daliuge-engine/dlg/nm_dim_assigner.py index f8dd98d3d..859e8d086 100644 --- a/daliuge-engine/dlg/nm_dim_assigner.py +++ b/daliuge-engine/dlg/nm_dim_assigner.py @@ -55,8 +55,8 @@ def allocate_nms(self): logger.info("We have no DIMs") elif len(self.DIMs.keys()) == 1: dim_ip = list(self.DIMs.values())[0][0] # IP of the first (only) DIM - for nm in self.NMs: - nm_ip = self.NMs[nm][0] + for nm, endpoint in self.NMs.items(): + nm_ip = endpoint[0] if nm not in self.assignments: logger.info("Adding NM %s to DIM %s", nm_ip, dim_ip) self.mm_client.add_node_to_dim(dim_ip, nm_ip) From 318cf2b8246c837c4f3f351e1d1a4c48ce795232 Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 12:56:28 +0800 Subject: [PATCH 019/183] De-duplicates timeout calls, introducing a generic method to do so. Adjusts two tests which now check for empty lists, rather than None accordingly. --- daliuge-engine/test/manager/test_daemon.py | 136 +++++++++++++-------- 1 file changed, 83 insertions(+), 53 deletions(-) diff --git a/daliuge-engine/test/manager/test_daemon.py b/daliuge-engine/test/manager/test_daemon.py index 962665ae1..658f9f4a6 100644 --- a/daliuge-engine/test/manager/test_daemon.py +++ b/daliuge-engine/test/manager/test_daemon.py @@ -34,6 +34,43 @@ _TIMEOUT = 10 +def _wait_until(update_condition, test_condition, timeout, interval=0.1, *args): + timeout_time = time.time() + timeout + output = None + while not test_condition(output) and time.time() < timeout_time: + output = update_condition(*args) + time.sleep(interval) + return output + + +def _get_dims_from_client(timeout, client): + def _update_nodes(*args): + return args[0].dims() + + def _test_dims(_dims): + return _dims + + dims = _wait_until(_update_nodes, _test_dims, timeout, 0.1, client) + return dims + + +def _get_nodes_from_client(timeout, client): + def _update_nodes(*args): + return args[0].nodes() + + def _test_nodes(_nodes): + if not _nodes or len(_nodes) == 0: + return False + return _nodes + + nodes = _wait_until(_update_nodes, _test_nodes, timeout, 0.1, client) + return nodes + + +def _update_nodes_with_timeout(*args): + return _get_nodes_from_client(_TIMEOUT, args[0]) + + class TestDaemon(unittest.TestCase): def create_daemon(self, *args, **kwargs): self._daemon_t = None @@ -120,7 +157,8 @@ def test_zeroconf_discovery(self): # Both managers started fine. 
If they zeroconf themselves correctly then # if we query the MM it should know about its nodes, which should have # one element - nodes = self._get_nodes_from_master(_TIMEOUT) + mc = MasterManagerClient() + nodes = _get_nodes_from_client(_TIMEOUT, mc) self.assertIsNotNone(nodes) self.assertEqual( 1, @@ -138,11 +176,19 @@ def _test_zeroconf_dim_mm(self, disable_zeroconf=False): # Check that dim registers to MM timeout_time = time.time() + _TIMEOUT dims = None - while time.time() < timeout_time: - dims = self._get_dims_from_master(_TIMEOUT) + mc = MasterManagerClient() + + def _update_dims(*args): + _dims = _get_dims_from_client(_TIMEOUT, args[0]) + return _dims + + def _test_dims(_dims): if dims is not None and len(dims["islands"]) > 0: - break - time.sleep(0.1) + return dims + else: + return False + + dims = _wait_until(_update_dims, _test_dims, _TIMEOUT, 0.1, mc) self.assertIsNotNone(dims) return dims @@ -164,13 +210,15 @@ def test_without_zeroconf_dim_mm(self): def _add_zeroconf_nm(self): self._start("node", http.HTTPStatus.OK) - timeout_time = time.time() + _TIMEOUT - nodes = None - while time.time() < timeout_time: - nodes = self._get_nodes_from_dim(_TIMEOUT) - if nodes is not None and len(nodes) > 0: - break - time.sleep(0.1) + mc = MasterManagerClient() + + def _test_nodes(_nodes): + if _nodes is not None and len(_nodes) > 0: + return _nodes + else: + return False + + nodes = _wait_until(_update_nodes_with_timeout, _test_nodes, _TIMEOUT, 0.1, mc) return nodes def test_zeroconf_dim_nm_setup(self): @@ -185,21 +233,23 @@ def test_zeroconf_dim_nm_setup(self): def test_without_zeroconf_dim_nm_setup(self): self._test_zeroconf_dim_mm(disable_zeroconf=True) - nodes = self._add_zeroconf_nm() - self.assertIsNone(nodes) + nodes = self._add_zeroconf_nm()['nodes'] + self.assertEqual(0, len(nodes)) def test_zeroconf_nm_down(self): self._test_zeroconf_dim_mm(disable_zeroconf=False) nodes = self._add_zeroconf_nm() self.assertIsNotNone(nodes) self._stop("node", http.HTTPStatus.OK) - timeout_time = time.time() + _TIMEOUT - while time.time() < timeout_time: - nodes = self._get_nodes_from_dim(_TIMEOUT) - if nodes is None: - break - time.sleep(0.1) - self.assertIsNone(nodes) + mc = MasterManagerClient() + + def _test_nodes(_nodes): + if not nodes['nodes']: + return nodes['nodes'] + return False + + nodes = _wait_until(_update_nodes_with_timeout, _test_nodes, _TIMEOUT, 0.1, mc)['nodes'] + self.assertEqual(0, len(nodes)) def test_start_dataisland_via_rest(self): @@ -208,7 +258,8 @@ def test_start_dataisland_via_rest(self): # Both managers started fine. If they zeroconf themselves correctly then # if we query the MM it should know about its nodes, which should have # one element - nodes = self._get_nodes_from_master(_TIMEOUT) + mc = MasterManagerClient() + nodes = _get_nodes_from_client(_TIMEOUT, mc) self.assertIsNotNone(nodes) self.assertEqual( 1, @@ -227,13 +278,15 @@ def test_stop_dataisland_via_rest(self): # start master and island manager self.create_daemon(master=True, noNM=False, disable_zeroconf=False) - nodes = self._get_nodes_from_master(_TIMEOUT) + mc = MasterManagerClient() + nodes = _get_nodes_from_client(_TIMEOUT, mc) self._start("island", http.HTTPStatus.OK, {"nodes": nodes}) # Both managers started fine. 
If they zeroconf themselves correctly then # if we query the MM it should know about its nodes, which should have # one element - nodes = self._get_nodes_from_master(_TIMEOUT) + mc = MasterManagerClient() + nodes = _get_nodes_from_client(_TIMEOUT, mc) self.assertIsNotNone(nodes) self.assertEqual( 1, @@ -256,7 +309,8 @@ def test_stop_start_node_via_rest(self): # Both managers started fine. If they zeroconf themselves correctly then # if we query the MM it should know about its nodes, which should have # one element - nodes = self._get_nodes_from_master(_TIMEOUT) + mc = MasterManagerClient() + nodes = _get_nodes_from_client(_TIMEOUT, mc) self.assertIsNotNone(nodes) self.assertEqual( 1, @@ -298,7 +352,8 @@ def test_start_stop_master_via_rest(self): def test_get_dims(self): self.create_daemon(master=True, noNM=True, disable_zeroconf=False) # Check that the DataIsland starts with the given nodes - dims = self._get_dims_from_master(_TIMEOUT) + mc = MasterManagerClient() + dims = _get_dims_from_client(_TIMEOUT, mc) self.assertIsNotNone(dims) self.assertEqual( 0, @@ -314,7 +369,7 @@ def _start(self, manager_name, expected_code, payload=None): headers["Content-Type"] = "application/json" conn.request( "POST", - "/managers/%s/start" % (manager_name,), + f"/managers/{manager_name}/start", body=payload, headers=headers, ) @@ -330,34 +385,9 @@ def _stop(self, manager_name, expected_code, payload=None): payload = json.dumps(payload) headers["Content-Type"] = "application/json" conn.request( - "POST", "/managers/%s/stop" % (manager_name,), body=payload, headers=headers + "POST", f"/managers/{manager_name}/stop", body=payload, headers=headers ) response = conn.getresponse() self.assertEqual(expected_code, response.status, response.read()) response.close() conn.close() - - def _get_nodes_from_client(self, timeout, client): - timeout_time = time.time() + timeout - while time.time() < timeout_time: - nodes = client.nodes() - if nodes: - return nodes - time.sleep(0.1) - - def _get_nodes_from_master(self, timeout): - mc = MasterManagerClient() - return self._get_nodes_from_client(timeout, mc) - - def _get_nodes_from_dim(self, timeout): - dimc = DataIslandManagerClient() - return self._get_nodes_from_client(timeout, dimc) - - def _get_dims_from_master(self, timeout): - mc = MasterManagerClient() - timeout_time = time.time() + timeout - while time.time() < timeout_time: - dims = mc.dims() - if dims: - return dims - time.sleep(0.1) From 14931a9115c8745663e0998d1a604c6cc4fb86e9 Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 13:23:18 +0800 Subject: [PATCH 020/183] Log strings now lazily evaluated --- daliuge-common/dlg/clients.py | 9 +++-- .../dlg/manager/composite_manager.py | 6 ++-- daliuge-engine/dlg/manager/proc_daemon.py | 36 +++++++++---------- daliuge-engine/dlg/manager/rest.py | 24 ++++++------- 4 files changed, 37 insertions(+), 38 deletions(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index 7d7345e27..f78fa42fc 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -26,7 +26,6 @@ from . 
import constants from .restutils import RestClient - logger = logging.getLogger(__name__) compress = os.environ.get("DALIUGE_COMPRESSED_JSON", True) @@ -137,7 +136,7 @@ def sessions(self): "Successfully read %d sessions from %s:%s", len(sessions), self.host, - self.port, + self.port ) return sessions @@ -223,7 +222,7 @@ class NodeManagerClient(BaseDROPManagerClient): """ def __init__( - self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10 + self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10 ): super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout) @@ -259,7 +258,7 @@ class DataIslandManagerClient(CompositeManagerClient): """ def __init__( - self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10 + self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10 ): super(DataIslandManagerClient, self).__init__( host=host, port=port, timeout=timeout @@ -272,7 +271,7 @@ class MasterManagerClient(CompositeManagerClient): """ def __init__( - self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10 + self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10 ): super(MasterManagerClient, self).__init__(host=host, port=port, timeout=timeout) diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 2de9265fa..86c09b4d4 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -351,7 +351,7 @@ def addGraphSpec(self, sessionId, graphSpec): # belong to the same host, so we can submit that graph into the individual # DMs. For this we need to make sure that our graph has a the correct # attribute set - logger.info(f"Separating graph using partition attribute {self._partitionAttr}") + logger.info("Separating graph using partition attribute %s", self._partitionAttr) perPartition = collections.defaultdict(list) if "rmode" in graphSpec[-1]: init_pg_repro_data(graphSpec) @@ -386,7 +386,7 @@ def addGraphSpec(self, sessionId, graphSpec): # At each partition the relationships between DROPs should be local at the # moment of submitting the graph; thus we record the inter-partition # relationships separately and remove them from the original graph spec - logger.info(f"Graph splitted into {perPartition.keys()}") + logger.info("Graph split into %r", perPartition.keys()) inter_partition_rels = [] for dropSpecs in perPartition.values(): inter_partition_rels += graph_loader.removeUnmetRelationships(dropSpecs) @@ -452,7 +452,7 @@ def deploySession(self, sessionId, completedDrops=[]): ) logger.info("Delivered node subscription list to node managers") logger.debug( - "Number of subscriptions: %s" % len(self._drop_rels[sessionId].items()) + "Number of subscriptions: %s", len(self._drop_rels[sessionId].items()) ) logger.info("Deploying Session %s in all hosts", sessionId) diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index 0f349c72f..71008d6b9 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -180,15 +180,15 @@ def startNM(self): tool = get_tool() args = ["--host", "0.0.0.0"] args += self._verbosity_as_cmdline() - logger.info("Starting Node Drop Manager with args: %s" % (" ".join(args))) + logger.info("Starting Node Drop Manager with args: %s", (" ".join(args))) self._nm_proc = tool.start_process("nm", args) - logger.info("Started Node Drop Manager with PID %d" % 
(self._nm_proc.pid)) + logger.info("Started Node Drop Manager with PID %d", self._nm_proc.pid) # Registering the new NodeManager via zeroconf so it gets discovered # by the Master Manager if self._zeroconf: addrs = utils.get_local_ip_addr() - logger.info("Registering this NM with zeroconf: %s" % addrs) + logger.info("Registering this NM with zeroconf: %s", addrs) self._nm_info = utils.register_service( self._zeroconf, "NodeManager", @@ -205,11 +205,11 @@ def startDIM(self, nodes): if nodes: args += ["--nodes", ",".join(nodes)] logger.info( - "Starting Data Island Drop Manager with args: %s" % (" ".join(args)) + "Starting Data Island Drop Manager with args: %s", (" ".join(args)) ) self._dim_proc = tool.start_process("dim", args) logger.info( - "Started Data Island Drop Manager with PID %d" % (self._dim_proc.pid) + "Started Data Island Drop Manager with PID %d", self._dim_proc.pid ) # Registering the new DIM via zeroconf so it gets discovered @@ -230,9 +230,9 @@ def startMM(self): tool = get_tool() args = ["--host", "0.0.0.0"] args += self._verbosity_as_cmdline() - logger.info("Starting Master Drop Manager with args: %s" % (" ".join(args))) + logger.info("Starting Master Drop Manager with args: %s", (" ".join(args))) self._mm_proc = tool.start_process("mm", args) - logger.info("Started Master Drop Manager with PID %d" % (self._mm_proc.pid)) + logger.info("Started Master Drop Manager with PID %d", self._mm_proc.pid) # Also subscribe to zeroconf events coming from NodeManagers and feed # the Master Manager with the new hosts we find @@ -246,14 +246,14 @@ def nm_callback(zeroconf, service_type, name, state_change): port = info.port nm_assigner.add_nm(name, server, port) logger.info( - "Found a new Node Manager on %s:%d, will add it to the MM" - % (server, port) + "Found a new Node Manager on %s:%d, will add it to the MM", + (server, port) ) elif state_change is zc.ServiceStateChange.Removed: server, port = nm_assigner.NMs[name] logger.info( - "Node Manager on %s:%d disappeared, removing it from the MM" - % (server, port) + "Node Manager on %s:%d disappeared, removing it from the MM", + (server, port) ) # Don't bother to remove it if we're shutting down. This way @@ -269,14 +269,14 @@ def dim_callback(zeroconf, service_type, name, state_change): port = info.port nm_assigner.add_dim(name, server, port) logger.info( - "Found a new Data Island Manager on %s:%d, will add it to the MM" - % (server, port) + "Found a new Data Island Manager on %s:%d, will add it to the MM", + (server, port) ) elif state_change is zc.ServiceStateChange.Removed: server, port = nm_assigner.DIMs[name] logger.info( - "Data Island Manager on %s:%d disappeared, removing it from the MM" - % (server, port) + "Data Island Manager on %s:%d disappeared, removing it from the MM", + (server, port) ) # Don't bother to remove it if we're shutting down. 
This way @@ -319,7 +319,7 @@ def _rest_stop_manager(self, proc, stop_method): def _rest_get_manager_info(self, proc): if proc: bottle.response.content_type = "application/json" - logger.info("Sending response: %s" % json.dumps({"pid": proc.pid})) + logger.info("Sending response: %s", json.dumps({"pid": proc.pid})) return json.dumps({"pid": proc.pid}) else: return json.dumps({"pid": None}) @@ -337,7 +337,7 @@ def rest_getMgrs(self): if mgrs["node"]: mgrs["node"] = self._nm_proc.pid - logger.info("Sending response: %s" % json.dumps(mgrs)) + logger.info("Sending response: %s", json.dumps(mgrs)) return json.dumps(mgrs) def rest_startNM(self): @@ -437,7 +437,7 @@ def handle_signal(signalNo, stack_frame): global terminating if terminating: return - logger.info("Received signal %d, will stop the daemon now" % (signalNo,)) + logger.info("Received signal %d, will stop the daemon now", signalNo) terminating = True daemon.stop(10) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index 56e3bfb45..f0ebe71a1 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -191,8 +191,8 @@ def _stop_manager(self): self.dm.shutdown() self.stop() logger.info( - "Thanks for using our %s, come back again :-)" - % (self.dm.__class__.__name__) + "Thanks for using our %s, come back again :-)", + self.dm.__class__.__name__ ) @daliuge_aware @@ -400,7 +400,7 @@ def linkGraphParts(self, sessionId): @daliuge_aware def add_node_subscriptions(self, sessionId): - logger.debug(f"NM REST call: add_subscriptions {bottle.request.json}") + logger.debug("NM REST call: add_subscriptions %s", bottle.request.json) if bottle.request.content_type != "application/json": bottle.response.status = 415 return @@ -471,12 +471,12 @@ def getAllCMNodes(self): @daliuge_aware def addCMNode(self, node): - logger.debug("Adding node %s" % node) + logger.debug("Adding node %s", node) self.dm.add_node(node) @daliuge_aware def removeCMNode(self, node): - logger.debug("Removing node %s" % node) + logger.debug("Removing node %s", node) self.dm.remove_node(node) @daliuge_aware @@ -606,12 +606,12 @@ def getDIMs(self): @daliuge_aware def addDIM(self, dim): - logger.debug("Adding DIM %s" % dim) + logger.debug("Adding DIM %s", dim) self.dm.addDmHost(dim) @daliuge_aware def removeDIM(self, dim): - logger.debug("Removing dim %s" % dim) + logger.debug("Removing dim %s", dim) self.dm.removeDmHost(dim) @daliuge_aware @@ -621,21 +621,21 @@ def getNMs(self): @daliuge_aware def startNM(self, host): port = constants.DAEMON_DEFAULT_REST_PORT - logger.debug("Sending NM start request to %s:%s" % (host, port)) + logger.debug("Sending NM start request to %s:%s", (host, port)) with RestClient(host=host, port=port, timeout=10) as c: return json.loads(c._POST("/managers/node/start").read()) @daliuge_aware def stopNM(self, host): port = constants.DAEMON_DEFAULT_REST_PORT - logger.debug("Sending NM stop request to %s:%s" % (host, port)) + logger.debug("Sending NM stop request to %s:%s", (host, port)) with RestClient(host=host, port=port, timeout=10) as c: return json.loads(c._POST("/managers/node/stop").read()) @daliuge_aware def addNM(self, host, node): port = constants.ISLAND_DEFAULT_REST_PORT - logger.debug("Adding NM %s to DIM %s" % (node, host)) + logger.debug("Adding NM %s to DIM %s", (node, host)) with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: return json.loads( c._POST( @@ -646,14 +646,14 @@ def addNM(self, host, node): @daliuge_aware def removeNM(self, host, node): port = 
constants.ISLAND_DEFAULT_REST_PORT - logger.debug("Removing NM %s from DIM %s" % (node, host)) + logger.debug("Removing NM %s from DIM %s", (node, host)) with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: return json.loads(c._DELETE("/nodes/%s" % (node,)).read()) @daliuge_aware def getNMInfo(self, host): port = constants.DAEMON_DEFAULT_REST_PORT - logger.debug("Sending request %s:%s/managers/node" % (host, port)) + logger.debug("Sending request %s:%s/managers/node", (host, port)) with RestClient(host=host, port=port, timeout=10) as c: return json.loads(c._GET("/managers/node").read()) From 9d53642952404a9542716f53dca57ac8ae6217a2 Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 14:29:06 +0800 Subject: [PATCH 021/183] Converts formatted strings to f-strings --- daliuge-common/dlg/clients.py | 50 +++++++------------ .../dlg/manager/composite_manager.py | 34 +++++-------- daliuge-engine/dlg/manager/rest.py | 14 +++--- 3 files changed, 39 insertions(+), 59 deletions(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index f78fa42fc..ed9f2bb82 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -48,7 +48,7 @@ def stop(self): self._POST("/stop") def cancelSession(self, sessionId): - self._POST("/sessions/%s/cancel" % quote(sessionId)) + self._POST(f"/sessions/{quote(sessionId)}/cancel") def create_session(self, sessionId): """ @@ -67,7 +67,7 @@ def deploy_session(self, sessionId, completed_uids=[]): content = None if completed_uids: content = {"completed": ",".join(completed_uids)} - self._post_form("/sessions/%s/deploy" % (quote(sessionId),), content) + self._post_form(f"/sessions/{quote(sessionId)}/deploy", content) logger.debug( "Successfully deployed session %s on %s:%s", sessionId, self.host, self.port ) @@ -78,7 +78,7 @@ def append_graph(self, sessionId, graphSpec): but checking that the graph looks correct """ self._post_json( - "/sessions/%s/graph/append" % (quote(sessionId),), + f"/sessions/{quote(sessionId)}/graph/append", graphSpec, compress=compress, ) @@ -93,7 +93,7 @@ def destroy_session(self, sessionId): """ Destroys session `sessionId` """ - self._DELETE("/sessions/%s" % (quote(sessionId),)) + self._DELETE(f"/sessions/{quote(sessionId)}") logger.debug( "Successfully deleted session %s on %s:%s", sessionId, self.host, self.port ) @@ -103,7 +103,7 @@ def graph_status(self, sessionId): Returns a dictionary where the keys are DROP UIDs and the values are their corresponding status. """ - ret = self._get_json("/sessions/%s/graph/status" % (quote(sessionId),)) + ret = self._get_json(f"/sessions/{quote(sessionId)}/graph/status") logger.debug( "Successfully read graph status from session %s on %s:%s", sessionId, @@ -117,7 +117,7 @@ def graph(self, sessionId): Returns a dictionary where the key are the DROP UIDs, and the values are the DROP specifications. 
""" - graph = self._get_json("/sessions/%s/graph" % (quote(sessionId),)) + graph = self._get_json(f"/sessions/{quote(sessionId)}/graph") logger.debug( "Successfully read graph (%d nodes) from session %s on %s:%s", len(graph), @@ -144,7 +144,7 @@ def session(self, sessionId): """ Returns the details of sessions `sessionId` """ - session = self._get_json("/sessions/%s" % (quote(sessionId),)) + session = self._get_json(f"/sessions/{quote(sessionId)}") logger.debug( "Successfully read session %s from %s:%s", sessionId, self.host, self.port ) @@ -154,7 +154,7 @@ def session_status(self, sessionId): """ Returns the status of session `sessionId` """ - status = self._get_json("/sessions/%s/status" % (quote(sessionId),)) + status = self._get_json(f"/sessions/{quote(sessionId)}/status") logger.debug( "Successfully read session %s status (%s) from %s:%s", sessionId, @@ -168,7 +168,7 @@ def session_repro_status(self, sessionId): """ Returns the reproducibility status of session `sessionId`. """ - status = self._get_json("/sessions/%s/repro/status" % (quote(sessionId),)) + status = self._get_json(f"/sessions/{quote(sessionId)}/repro/status") logger.debug( "Successfully read session %s reproducibility status (%s) from %s:%s", sessionId, @@ -182,7 +182,7 @@ def session_repro_data(self, sessionId): """ Returns the graph-wide reproducibility information of session `sessionId`. """ - data = self._get_json("/sessions/%s/repro/data" % (quote(sessionId),)) + data = self._get_json(f"/sessions/{quote(sessionId)}/repro/data") logger.debug( "Successfully read session %s reproducibility data from %s:%s", sessionId, @@ -195,7 +195,7 @@ def graph_size(self, sessionId): """ Returns the size of the graph of session `sessionId` """ - count = self._get_json("/sessions/%s/graph/size" % (quote(sessionId))) + count = self._get_json(f"/sessions/{quote(sessionId)}/graph/size") logger.debug( "Successfully read session %s graph size (%d) from %s:%s", sessionId, @@ -228,11 +228,11 @@ def __init__( def add_node_subscriptions(self, sessionId, node_subscriptions): self._post_json( - "/sessions/%s/subscriptions" % (quote(sessionId),), node_subscriptions + f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions ) def trigger_drops(self, sessionId, drop_uids): - self._post_json("/sessions/%s/trigger" % (quote(sessionId),), drop_uids) + self._post_json(f"/sessions/{quote(sessionId)}/trigger", drop_uids) def shutdown_node_manager(self): self._GET("/shutdown") @@ -246,10 +246,10 @@ def nodes(self): return self._get_json("/nodes") def add_node(self, node): - self._POST("/nodes/%s" % (node,), content=None) + self._POST(f"/nodes/{node}", content=None) def remove_node(self, node): - self._DELETE("/nodes/%s" % (node,)) + self._DELETE(f"/nodes/{node}") class DataIslandManagerClient(CompositeManagerClient): @@ -277,7 +277,7 @@ def __init__( def create_island(self, island_host, nodes): self._post_json( - "/managers/%s/dataisland" % (quote(island_host)), {"nodes": nodes} + f"/managers/{quote(island_host)}/dataisland", {"nodes": nodes} ) def dims(self): @@ -287,29 +287,17 @@ def add_dim(self, dim): self._POST(f"/islands/{dim}", content=None) def remove_dim(self, dim): - self._DELETE("/islands/%s" % (dim,)) + self._DELETE(f"/islands/{dim}") def add_node_to_dim(self, dim, nm): """ Adds a nm to a dim """ self._POST( - "managers/%s/nodes/%s" - % ( - dim, - nm, - ), - content=None, - ) + f"managers/{dim}/nodes/{nm}", content=None, ) def remove_node_from_dim(self, dim, nm): """ Removes a nm from a dim """ - self._DELETE( - 
"managers/%s/nodes/%s" - % ( - dim, - nm, - ) - ) + self._DELETE(f"managers/{dim}/nodes/{nm}") diff --git a/daliuge-engine/dlg/manager/composite_manager.py b/daliuge-engine/dlg/manager/composite_manager.py index 86c09b4d4..198f2e6a6 100644 --- a/daliuge-engine/dlg/manager/composite_manager.py +++ b/daliuge-engine/dlg/manager/composite_manager.py @@ -127,13 +127,13 @@ class allows for multiple levels of hierarchy seamlessly. __metaclass__ = abc.ABCMeta def __init__( - self, - dmPort, - partitionAttr, - subDmId, - dmHosts=[], - pkeyPath=None, - dmCheckTimeout=10, + self, + dmPort, + partitionAttr, + subDmId, + dmHosts=[], + pkeyPath=None, + dmCheckTimeout=10, ): """ Creates a new CompositeManager. The sub-DMs it manages are to be located @@ -239,7 +239,7 @@ def dmAt(self, host, port=None): if not self.check_dm(host, port): raise SubManagerException( - "Manager expected but not running in %s:%d" % (host, port) + f"Manager expected but not running in {host}:{port}" ) port = port or self._dmPort @@ -288,10 +288,7 @@ def replicate(self, sessionId, f, action, collect=None, iterable=None, port=None iterable, ) if thrExs: - msg = "More than one error occurred while %s on session %s" % ( - action, - sessionId, - ) + msg = f"More than one error occurred while {action} on session {sessionId}" raise SubManagerException(msg, thrExs) # @@ -364,19 +361,14 @@ def addGraphSpec(self, sessionId, graphSpec): graphSpec.pop() for dropSpec in graphSpec: if self._partitionAttr not in dropSpec: - msg = "Drop %s doesn't specify a %s attribute" % ( - dropSpec["oid"], - self._partitionAttr, - ) + msg = f"Drop {dropSpec.get('oid', None)} doesn't specify a {self._partitionAttr} " \ + f"attribute" raise InvalidGraphException(msg) partition = dropSpec[self._partitionAttr] if partition not in self._dmHosts: - msg = "Drop %s's %s %s does not belong to this DM" % ( - dropSpec["oid"], - self._partitionAttr, - partition, - ) + msg = f"Drop {dropSpec.get('oid', None)}'s {self._partitionAttr} {partition} " \ + f"does not belong to this DM" raise InvalidGraphException(msg) perPartition[partition].append(dropSpec) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index f0ebe71a1..0a3a7e5ab 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -482,7 +482,7 @@ def removeCMNode(self, node): @daliuge_aware def getNodeSessions(self, node): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.sessions() @@ -527,28 +527,28 @@ def getLogFile(self, sessionId): @daliuge_aware def getNodeSessionInformation(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.session(sessionId) @daliuge_aware def getNodeSessionStatus(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.session_status(sessionId) @daliuge_aware def getNodeGraph(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.graph(sessionId) @daliuge_aware def 
getNodeGraphStatus(self, node, sessionId): if node not in self.dm.nodes: - raise Exception("%s not in current list of nodes" % (node,)) + raise Exception(f"{node} not in current list of nodes") with NodeManagerClient(host=node) as dm: return dm.graph_status(sessionId) @@ -639,7 +639,7 @@ def addNM(self, host, node): with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: return json.loads( c._POST( - "/nodes/%s" % (node,), + f"/nodes/{node}", ).read() ) @@ -648,7 +648,7 @@ def removeNM(self, host, node): port = constants.ISLAND_DEFAULT_REST_PORT logger.debug("Removing NM %s from DIM %s", (node, host)) with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: - return json.loads(c._DELETE("/nodes/%s" % (node,)).read()) + return json.loads(c._DELETE(f"/nodes/{node}").read()) @daliuge_aware def getNMInfo(self, host): From b7bf63bf0d40b1850018d588b88de6ec870b448e Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 14:36:58 +0800 Subject: [PATCH 022/183] Renames _mm_browser to _mm_nm_browser --- daliuge-engine/dlg/manager/proc_daemon.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index 71008d6b9..725b8a00b 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -83,7 +83,7 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0 # Zeroconf for NM and MM self._zeroconf = None if disable_zeroconf else zc.Zeroconf() self._nm_info = None - self._mm_browser = None + self._mm_nm_browser = None self._mm_dim_browser = None # Starting managers @@ -126,9 +126,9 @@ def _stop_zeroconf(self): return # Stop the MM service browser, the NM registration, and ZC itself - if self._mm_browser: - self._mm_browser.cancel() - self._mm_browser.join() + if self._mm_nm_browser: + self._mm_nm_browser.cancel() + self._mm_nm_browser.join() if self._mm_dim_browser: self._mm_dim_browser.cancel() self._mm_dim_browser.join() @@ -285,7 +285,7 @@ def dim_callback(zeroconf, service_type, name, state_change): if not self._shutting_down: nm_assigner.remove_dim(name) - self._mm_browser = utils.browse_service( + self._mm_nm_browser = utils.browse_service( self._zeroconf, "NodeManager", "tcp", nm_callback ) self._mm_dim_browser = utils.browse_service( From 8b95f114ee92560415c0520b0130fb239f5cd57e Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 14:40:34 +0800 Subject: [PATCH 023/183] Adds _dim_info object used to register zeroconf service for DIM discovery --- daliuge-engine/dlg/manager/proc_daemon.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index 725b8a00b..e74430e39 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -83,6 +83,7 @@ def __init__(self, master=False, noNM=False, disable_zeroconf=False, verbosity=0 # Zeroconf for NM and MM self._zeroconf = None if disable_zeroconf else zc.Zeroconf() self._nm_info = None + self._dim_info = None self._mm_nm_browser = None self._mm_dim_browser = None @@ -170,6 +171,8 @@ def stopNM(self, timeout=10): return self._stop_manager("_nm_proc", timeout) def stopDIM(self, timeout=10): + if self._dim_info: + utils.deregister_service(self._zeroconf, self._dim_info) self._stop_manager("_dim_proc", timeout) def 
stopMM(self, timeout=10): @@ -217,7 +220,7 @@ def startDIM(self, nodes): if self._zeroconf: addrs = utils.get_local_ip_addr() logger.info("Registering this DIM with zeroconf: %s", addrs) - self._nm_info = utils.register_service( + self._dim_info = utils.register_service( self._zeroconf, "DIM", socket.gethostname(), From 9e250072871e5c4fdc2cc7fa33f1f45da532c174 Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 14:59:59 +0800 Subject: [PATCH 024/183] Makes dim/nm callback generic. --- daliuge-engine/dlg/manager/proc_daemon.py | 35 +++++++---------------- daliuge-engine/dlg/nm_dim_assigner.py | 6 ++++ 2 files changed, 16 insertions(+), 25 deletions(-) diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index e74430e39..20da9f4b6 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -242,18 +242,18 @@ def startMM(self): if self._zeroconf: nm_assigner = NMAssigner() - def nm_callback(zeroconf, service_type, name, state_change): + def _callback(zeroconf, service_type, name, state_change, adder, remover, accessor): info = zeroconf.get_service_info(service_type, name) if state_change is zc.ServiceStateChange.Added: server = socket.inet_ntoa(_get_address(info)) port = info.port - nm_assigner.add_nm(name, server, port) + adder(name, server, port) logger.info( "Found a new Node Manager on %s:%d, will add it to the MM", (server, port) ) elif state_change is zc.ServiceStateChange.Removed: - server, port = nm_assigner.NMs[name] + server, port = accessor(name) logger.info( "Node Manager on %s:%d disappeared, removing it from the MM", (server, port) @@ -263,30 +263,15 @@ def nm_callback(zeroconf, service_type, name, state_change): # we avoid hanging in here if the MM is down already but # we are trying to remove our NM who has just disappeared if not self._shutting_down: - nm_assigner.remove_nm(name) + remover(name) - def dim_callback(zeroconf, service_type, name, state_change): - info = zeroconf.get_service_info(service_type, name) - if state_change is zc.ServiceStateChange.Added: - server = socket.inet_ntoa(_get_address(info)) - port = info.port - nm_assigner.add_dim(name, server, port) - logger.info( - "Found a new Data Island Manager on %s:%d, will add it to the MM", - (server, port) - ) - elif state_change is zc.ServiceStateChange.Removed: - server, port = nm_assigner.DIMs[name] - logger.info( - "Data Island Manager on %s:%d disappeared, removing it from the MM", - (server, port) - ) + nm_callback = functools.partial(_callback, adder=nm_assigner.add_nm, + remover=nm_assigner.remove_nm, + accessor=nm_assigner.get_nm) - # Don't bother to remove it if we're shutting down. 
This way - # we avoid hanging in here if the MM is down already but - # we are trying to remove our NM who has just disappeared - if not self._shutting_down: - nm_assigner.remove_dim(name) + dim_callback = functools.partial(_callback, adder=nm_assigner.add_dim, + remover=nm_assigner.remove_dim, + accessor=nm_assigner.get_dim) self._mm_nm_browser = utils.browse_service( self._zeroconf, "NodeManager", "tcp", nm_callback diff --git a/daliuge-engine/dlg/nm_dim_assigner.py b/daliuge-engine/dlg/nm_dim_assigner.py index 859e8d086..2ffa0403c 100644 --- a/daliuge-engine/dlg/nm_dim_assigner.py +++ b/daliuge-engine/dlg/nm_dim_assigner.py @@ -28,6 +28,12 @@ def add_nm(self, name, server, port): self.mm_client.add_node(server) self.allocate_nms() + def get_dim(self, name): + return self.DIMs[name] + + def get_nm(self, name): + return self.NMs[name] + def remove_dim(self, name): server, port = self.DIMs[name] try: From bd91021a2a80423a501b3bfa49e5601060733721 Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 15:29:39 +0800 Subject: [PATCH 025/183] zeroconf callback service_type now dynamic in log statements --- daliuge-engine/dlg/manager/proc_daemon.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index 20da9f4b6..723ca01e1 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -249,14 +249,14 @@ def _callback(zeroconf, service_type, name, state_change, adder, remover, access port = info.port adder(name, server, port) logger.info( - "Found a new Node Manager on %s:%d, will add it to the MM", - (server, port) + "Found a new %s on %s:%d, will add it to the MM", + (service_type, server, port) ) elif state_change is zc.ServiceStateChange.Removed: server, port = accessor(name) logger.info( - "Node Manager on %s:%d disappeared, removing it from the MM", - (server, port) + "%s on %s:%d disappeared, removing it from the MM", + (service_type, server, port) ) # Don't bother to remove it if we're shutting down. This way From 9f94be51787b37f336db11eee1d94049f762288d Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 15:46:30 +0800 Subject: [PATCH 026/183] zeroconf callback service_type now dynamic in log statements --- daliuge-engine/dlg/manager/proc_daemon.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/daliuge-engine/dlg/manager/proc_daemon.py b/daliuge-engine/dlg/manager/proc_daemon.py index 723ca01e1..008fa3b5f 100644 --- a/daliuge-engine/dlg/manager/proc_daemon.py +++ b/daliuge-engine/dlg/manager/proc_daemon.py @@ -250,13 +250,13 @@ def _callback(zeroconf, service_type, name, state_change, adder, remover, access adder(name, server, port) logger.info( "Found a new %s on %s:%d, will add it to the MM", - (service_type, server, port) + service_type, server, port ) elif state_change is zc.ServiceStateChange.Removed: server, port = accessor(name) logger.info( "%s on %s:%d disappeared, removing it from the MM", - (service_type, server, port) + service_type, server, port ) # Don't bother to remove it if we're shutting down. 
This way @@ -265,11 +265,11 @@ def _callback(zeroconf, service_type, name, state_change, adder, remover, access if not self._shutting_down: remover(name) - nm_callback = functools.partial(_callback, adder=nm_assigner.add_nm, + nm_callback = functools.partial(_callback, service_type='NodeManager', adder=nm_assigner.add_nm, remover=nm_assigner.remove_nm, accessor=nm_assigner.get_nm) - dim_callback = functools.partial(_callback, adder=nm_assigner.add_dim, + dim_callback = functools.partial(_callback, service_type="DIM", adder=nm_assigner.add_dim, remover=nm_assigner.remove_dim, accessor=nm_assigner.get_dim) From 0620f5a60e905c2127269b9c708f9ba7a308815f Mon Sep 17 00:00:00 2001 From: pritchardn <21726929@student.uwa.edu.au> Date: Wed, 25 May 2022 15:49:48 +0800 Subject: [PATCH 027/183] Minor reformatting of debug statements in rest.py (not sending tuples) --- daliuge-engine/dlg/manager/rest.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index 0a3a7e5ab..2e8454f96 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -621,21 +621,21 @@ def getNMs(self): @daliuge_aware def startNM(self, host): port = constants.DAEMON_DEFAULT_REST_PORT - logger.debug("Sending NM start request to %s:%s", (host, port)) + logger.debug("Sending NM start request to %s:%s", host, port) with RestClient(host=host, port=port, timeout=10) as c: return json.loads(c._POST("/managers/node/start").read()) @daliuge_aware def stopNM(self, host): port = constants.DAEMON_DEFAULT_REST_PORT - logger.debug("Sending NM stop request to %s:%s", (host, port)) + logger.debug("Sending NM stop request to %s:%s", host, port) with RestClient(host=host, port=port, timeout=10) as c: return json.loads(c._POST("/managers/node/stop").read()) @daliuge_aware def addNM(self, host, node): port = constants.ISLAND_DEFAULT_REST_PORT - logger.debug("Adding NM %s to DIM %s", (node, host)) + logger.debug("Adding NM %s to DIM %s", node, host) with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: return json.loads( c._POST( @@ -646,14 +646,14 @@ def addNM(self, host, node): @daliuge_aware def removeNM(self, host, node): port = constants.ISLAND_DEFAULT_REST_PORT - logger.debug("Removing NM %s from DIM %s", (node, host)) + logger.debug("Removing NM %s from DIM %s", node, host) with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: return json.loads(c._DELETE(f"/nodes/{node}").read()) @daliuge_aware def getNMInfo(self, host): port = constants.DAEMON_DEFAULT_REST_PORT - logger.debug("Sending request %s:%s/managers/node", (host, port)) + logger.debug("Sending request %s:%s/managers/node", host, port) with RestClient(host=host, port=port, timeout=10) as c: return json.loads(c._GET("/managers/node").read()) From e6feb70c0c766afb9cf182fa38d57fb2354e377c Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Wed, 25 May 2022 14:22:26 +0800 Subject: [PATCH 028/183] pyfunc testing and docs --- daliuge-engine/dlg/apps/pyfunc.py | 39 +++++++++------- daliuge-engine/dlg/droputils.py | 46 +++++++++++-------- .../development/app_development/app_index.rst | 1 + .../app_development/datadrop_io.rst | 41 +++++++++++------ .../app_development/pyfunc_components.rst | 26 +++++++++++ .../app_development/python_components.rst | 25 ++++++---- .../app_development/special_components.rst | 1 - 7 files changed, 118 insertions(+), 61 deletions(-) create mode 100644 
docs/development/app_development/pyfunc_components.rst diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 31bc301d3..49ab044a6 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -30,7 +30,7 @@ import logging import pickle -from typing import Callable +from typing import Callable, Optional import dill from io import StringIO from contextlib import redirect_stdout @@ -116,10 +116,10 @@ def import_using_code(code): class DropParser(Enum): PICKLE = 'pickle' EVAL = 'eval' - PATH = 'path' - DATAURL = 'dataurl' NPY = 'npy' #JSON = "json" + PATH = 'path' # input only + DATAURL = 'dataurl' # input only ## # @brief PyFuncApp @@ -148,9 +148,9 @@ class DropParser(Enum): # \~English Python function name # @param[in] aparam/func_code Function Code//String/readwrite/False//False/ # \~English Python function code, e.g. 'def function_name(args): return args' -# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/ +# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,npy,eval,path,dataurl/False/ # \~English Input port parsing technique -# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,path,dataurl,npy/False/ +# @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,npy/False/ # \~English output port parsing technique # @param[in] aparam/func_defaults Function Defaults//String/readwrite/False//False/ # \~English Mapping from argname to default value. Should match only the last part of the argnames list. @@ -243,7 +243,7 @@ def _init_func_defaults(self): + "{self.f.__name__}: {self.func_defaults}, {type(self.func_defaults)}" ) raise ValueError - if DropParser(self.input_parser) is DropParser.PICKLE: + if self.input_parser is DropParser.PICKLE: # only values are pickled, get them unpickled for name, value in self.func_defaults.items(): self.func_defaults[name] = deserialize_data(value) @@ -382,18 +382,21 @@ def run(self): # Inputs are un-pickled and treated as the arguments of the function # Their order must be preserved, so we use an OrderedDict - if DropParser(self.input_parser) is DropParser.PICKLE: - all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) - elif DropParser(self.input_parser) is DropParser.EVAL: - def astparse(x): + if self.input_parser is DropParser.PICKLE: + #all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) + all_contents = droputils.load_pickle + elif self.input_parser is DropParser.EVAL: + def optionalEval(x): # Null and Empty Drops will return an empty byte string # which should propogate back to None - content: bytes = droputils.allDropContents(x) - return ast.literal_eval(content.decode('utf-8')) if content else None - all_contents = astparse - elif DropParser(self.input_parser) is DropParser.PATH: + content: str = droputils.allDropContents(x).decode('utf-8') + return ast.literal_eval(content) if len(content) > 0 else None + all_contents = optionalEval + elif self.input_parser is DropParser.NPY: + all_contents = droputils.load_npy + elif self.input_parser is DropParser.PATH: all_contents = lambda x: x.path - elif DropParser(self.input_parser) is DropParser.DATAURL: + elif self.input_parser is DropParser.DATAURL: all_contents = lambda x: x.dataurl else: raise ValueError(self.input_parser.__repr__()) @@ -547,11 +550,13 @@ def write_results(self, result): if len(outputs) == 1: result = [result] for r, o in zip(result, 
outputs): - if DropParser(self.output_parser) is DropParser.PICKLE: + if self.output_parser is DropParser.PICKLE: logger.debug(f"Writing pickeled result {type(r)} to {o}") o.write(pickle.dumps(r)) - elif DropParser(self.output_parser) is DropParser.EVAL: + elif self.output_parser is DropParser.EVAL: o.write(repr(r).encode('utf-8')) + elif self.output_parser is DropParser.NPY: + droputils.save_npy(o, r) else: ValueError(self.output_parser.__repr__()) diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 2c21042fb..4ce78c231 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -117,7 +117,7 @@ def __exit__(self, typ, value, tb): ) -def allDropContents(drop, bufsize=4096): +def allDropContents(drop, bufsize=4096) -> bytes: """ Returns all the data contained in a given DROP """ @@ -267,24 +267,24 @@ def listify(o): return [o] -# def save_pickle(drop: DataDROP, data: Any): -# """Saves a python object in pkl format""" -# pickle.dump(data, drop) +def save_pickle(drop: DataDROP, data: Any): + """Saves a python object in pkl format""" + pickle.dump(data, drop) -# def load_pickle(drop: DataDROP) -> Any: -# """Loads a pkl formatted data object stored in a DataDROP. -# Note: does not support streaming mode. -# """ -# buf = io.BytesIO() -# desc = drop.open() -# while True: -# data = drop.read(desc) -# if not data: -# break -# buf.write(data) -# drop.close(desc) -# return pickle.loads(buf.getbuffer()) +def load_pickle(drop: DataDROP) -> Any: + """Loads a pkl formatted data object stored in a DataDROP. + Note: does not support streaming mode. + """ + buf = io.BytesIO() + desc = drop.open() + while True: + data = drop.read(desc) + if not data: + break + buf.write(data) + drop.close(desc) + return pickle.loads(buf.getbuffer()) # async def save_pickle_iter(drop: DataDROP, data: Iterable[Any]): @@ -298,7 +298,7 @@ def listify(o): # yield pickle.load(p) -def save_numpy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False): +def save_npy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False): """ Saves a numpy ndarray to a drop in npy format """ @@ -312,7 +312,11 @@ def save_numpy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False): dropio.close() -def load_numpy(drop: DataDROP, allow_pickle=False) -> np.ndarray: +def save_numpy(drop: DataDROP, ndarray: np.ndarray): + save_npy(drop, ndarray) + + +def load_npy(drop: DataDROP, allow_pickle=False) -> np.ndarray: """ Loads a numpy ndarray from a drop in npy format """ @@ -323,6 +327,10 @@ def load_numpy(drop: DataDROP, allow_pickle=False) -> np.ndarray: return res +def load_numpy(drop: DataDROP): + return load_npy(drop) + + # def save_jsonp(drop: PathBasedDrop, data: Dict[str, object]): # with open(drop.path, 'r') as f: # json.dump(data, f) diff --git a/docs/development/app_development/app_index.rst b/docs/development/app_development/app_index.rst index b15e5ed45..a0959985b 100644 --- a/docs/development/app_development/app_index.rst +++ b/docs/development/app_development/app_index.rst @@ -25,6 +25,7 @@ integration and testing during component development. As mentioned already, for dynlib_components docker_components service_components + pyfunc_components datadrop_io wrap_existing test_and_debug diff --git a/docs/development/app_development/datadrop_io.rst b/docs/development/app_development/datadrop_io.rst index b9a8fc57b..b76feb82f 100644 --- a/docs/development/app_development/datadrop_io.rst +++ b/docs/development/app_development/datadrop_io.rst @@ -1,3 +1,5 @@ +.. 
_datadrop_io: + DataDROP I/O ============ @@ -24,15 +26,18 @@ Writing data into an output drop is similar but simpler. Application authors nee one or more times the :attr:`write ` method with the data that needs to be written. -String Serialization -------------------- +Serialization +------------- -Many data drops are capable of storing data in different formats managed by the app drop. +Many data components are capable of storing data in multiple formats determined by the drop component. The common data I/O interface allows app components to be compatible with many data component types, however different app components connected to the same data component must use compatible serialization and deserialization types and utilities. + +String Serialization +^^^^^^^^^^^^^^^^^^^^ Raw String """""""""" -The simplest serialization format supported directly by `DataDrop.write` and `DataDrop.read`. +The simplest deserialization format supported directly by `DataDrop.write` and `DataDrop.read`. JSON (.json) """""""""""" @@ -59,26 +64,34 @@ XML (.xml) """""""""" Markup format with similar features to YAML but with the addition of attributes. Serialization can be performed -using `dicttoxml` or both serialization and deserialiation using `xml.etree.ElementTree`. +using `dicttoxml` or both serialization and deserialization using `xml.etree.ElementTree`. + +Python Eval (.py) +""""""""""""""""" + +Python expressions and literals are valid string serialization formats whereby the string data is interpreted as python code. Serialization is typically performed using the `__repr__` instance method and deserialization using `eval` or `ast.literal_eval`. Binary Serialization --------------------- +^^^^^^^^^^^^^^^^^^^^ Data drops may also store binary formats that are typically more efficient than string formats and may utilize the python buffer protocol. +Raw Bytes +""""""""" + +Data drops can always be read as raw bytes using `droputils.allDropContents` and written to using `DataDROP.write`. Reading as a bytes object creates a readonly in-memory data copy that may not be as performant as other drop utilities. + Pickle (.pkl) """"""""""""" -Default serialazation format. Use `save_pickle` for serialization to this format and -`allDropContents` or `load_pickle` for deserialization. - +Default serialization format capable of serializing any python object. Use `save_pickle` for serialization to this format and `load_pickle` for deserialization. Numpy (.npy) """""""""""" -Portable numpy serialization format. Use `save_numpy` +Portable numpy serialization format. Use `save_numpy` for serialization and `load_numpy` for deserialization. Numpy Zipped (.npz) """"""""""""""""""" @@ -87,15 +100,15 @@ Portable zipped numpy serialization format. Consists of a .zip directory holding files. Table Serialization -------------------- +^^^^^^^^^^^^^^^^^^^ -parquet (.parquet) +parquet (.parquet) """"""""""""""""""" Open source column-based relational data format from Apache. -Drop Specialized Serialization ------------------------------- +Specialized Serialization +^^^^^^^^^^^^^^^^^^^^^^^^^ Data drops such as RDBMSDrop drops manage their own record format and are interfaced using relational data objects such as `dict`, `pyarrow.RecordBatch` or `pandas.DataFrame`. 
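As a quick, concrete illustration of the pickle and npy helpers above, a minimal sketch mirroring the round-trip tests added later in this series (the drop OIDs/UIDs ``"a"``/``"b"`` are illustrative):

.. code-block:: python

    import numpy
    from dlg import droputils
    from dlg.drop import InMemoryDROP

    # pickle round-trip of an arbitrary python object
    pkl_drop = InMemoryDROP("a", "a")
    droputils.save_pickle(pkl_drop, {"nested": {"data": 42}})
    pkl_drop.setCompleted()
    assert droputils.load_pickle(pkl_drop) == {"nested": {"data": 42}}

    # npy round-trip of a numpy ndarray
    npy_drop = InMemoryDROP("b", "b")
    droputils.save_npy(npy_drop, numpy.ones([3, 5]))
    numpy.testing.assert_equal(droputils.load_npy(npy_drop), numpy.ones([3, 5]))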
\ No newline at end of file diff --git a/docs/development/app_development/pyfunc_components.rst b/docs/development/app_development/pyfunc_components.rst new file mode 100644 index 000000000..82601186c --- /dev/null +++ b/docs/development/app_development/pyfunc_components.rst @@ -0,0 +1,26 @@ +.. _pyfunc_components: + +Pyfunc Components +================= + +Pyfunc components are generalized python component that can be configured to behave as a custom python component entirely through component parameters and application arguments. A pyfunc component +maps directly to an existing python function or a lambda expression, named application arguments and input ports are mapped to the function keyword args, and the result is mapping to the output port. + +Port Parsers +------------ + +Pyfunc components when interfacing with data drops may utilize one of several builtin port parsing formats. + +* Pickle +* Eval +* Path +* Url +* Npy + +Basic setup +----------- + +Note +---- + +Only a single parser and \ No newline at end of file diff --git a/docs/development/app_development/python_components.rst b/docs/development/app_development/python_components.rst index 5a9fee90c..f51d07c2c 100644 --- a/docs/development/app_development/python_components.rst +++ b/docs/development/app_development/python_components.rst @@ -57,18 +57,19 @@ GREAT! In exactly the same manner you can work along to change the functionality Obviously you can add more than one component class to the file ``app_components.py``, or add multiple files to the directory. Just don't forget to update the file ``__init__.py`` accordingly as well. -Remove boylerplate and add your documentation +Remove boilerplate and add your documentation --------------------------------------------- Next step is to clean up the mess from the boylerplate template and update the documentation of our new |daliuge| component. The first thing is to remove the files `ABOUT_THIS_TEMPLATE.md` and `CONTRIBUTING.md`. The next step is to update the file `README.md`. Open that file and remove everything above ```` and then do exactly what is written on that line: *Write your project README below!*. Then save the file. Make sure the LICENSE file contains a license you (and your employer) are happy with. If you had to install any additional Python packages, make sure they are listed in the ``requriements-test.txt`` and ``requirements.txt`` files and modify the file ``setup.py`` as required. Finally add more detailed documentation to the docs directory. This will then also be published on readthedocs whenever you push to the main branch. After that you will have a pretty nice and presentable component package already. -Using parameters ----------------- -Typically your code allows some user inputs in the form of parameters and/or keywords. |daliuge| supports that as well and the end user of your component will be able to populate the values for such parameters in EAGLE during the development of the workflows using your component. In order to make this happen you will need to expose the parameters through the component interface and also document them appropriately so that EAGLE can display that information to the end user. Since the end-users of your component will want to specify the values of these parameters through the EAGLE editor there are a few tricks required to enable that. For you as the developer of a component this is pretty much invisible, but you need to use the API. 
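Before the list of available parameter types, a minimal sketch of what declaring parameters looks like in component code (the class name is illustrative and the exact resolution point is shown in the figures below; it assumes the ``dlg.meta`` helpers and the ``BarrierAppDROP`` base class used elsewhere in this series):

.. code-block:: python

    from dlg.drop import BarrierAppDROP
    from dlg.meta import dlg_int_param, dlg_string_param

    class MyGreeterApp(BarrierAppDROP):
        # declared at class level; EAGLE exposes these and the graph
        # supplies the values at run time
        greet = dlg_string_param("greet", "World")
        index = dlg_int_param("index", 0)

        def run(self):
            # by this point the declared parameters have been resolved,
            # so self.greet and self.index hold the user-provided values
            print(f"{self.greet} #{self.index}")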
|daliuge| is currently offering six types of parameters: #. dlg_string_param #. dlg_int_param #. dlg_float_param #. dlg_bool_param +#. dlg_enum_param #. dlg_list_param #. dlg_dict_param @@ -76,23 +77,25 @@ For example to define a greeting parameter for a HelloWorld application you can .. code-block:: python - greet = dlg_string_param("greet", "World") + index = dlg_int_param("index", 0) -in the global block of your application class. This will give you access to the parameters passed on through the graph to your component at run time. Another example is shown below, if you have a parameter called ``index`` you can get the value from the graph at run time by adding a single line to your ``initialize`` method: +as a member of the custom component class. At runtime the param will be passed on through the graph to the component and converted to the declared type after class initialization. Another example is shown below: if you have a parameter called ``index`` you can get the value from the graph at run time by adding a single line to your ``initialize`` method: .. _graphs.figs.tmpl_params1.png: .. figure:: ../../images/tmpl_params1.png -you should always do that before calling the initialize of the base class, in the example the ``BarrierAppDROP`` class and add an appropriate variable to the object's name space (``self.index``). In that way all other methods will have access to the index parameter's value. Then you should also add a line to the doxygen in-line documentation like this: +you should always do that before calling the initialize of the base class, in the example the ``BarrierAppDROP`` class, and add an appropriate variable to the object (``self.index``) such that all other methods will have access to the index parameter's value. Then you should also add a line to the doxygen in-line documentation like this: .. _graphs.figs.tmpl_params2: .. figure:: ../../images/tmpl_params2.png see chapter :doc:`eagle_app_integration` for more details on the syntax. When you now checkin your code to the github repo a github action will generate the palette (JSON description of your components) automatically and you can load it into EAGLE to construct a workflow. -Adding input and output ports +Adding Input and Output Ports ----------------------------- -Ports are another way of getting data and information in and out of your component. Ports are always connected to data components and provide the application component with a homogeneous I/O interface. 
You write whatever you want to an output port, but be aware that other components, maybe not developed by yourself, will need to be able to understand and interpret correctly. In the same spirit you might not be responsible for what is presented to your component on the input ports, but you certainly need to be able to read and use that information. The first step to make sure this will fit in a workflow, is to document your own inputs and outputs and check the data on the inputs for compliance with what you are expecting. |daliuge|, or more precisely EAGLE is using that information to guide the users developing a workflow and by default allows connections only between matching ports. Again this is based on the doxygen description of your components ports, which look like this: +Ports are how runtime data and information move in and out of your component. Ports are always connected to data components and provide the application component with a homogeneous I/O interface. App components can write whatever data you want to an output port, but be aware that other components, maybe not developed by yourself, will need a compatible reader to interpret the data. In the same spirit you might not be responsible for what is presented to your component on the input ports, but you certainly need to be able to read and use that information. See chapter :doc:`datadrop_io` for more details. + +The first step to make sure this will fit in a workflow, is to document your own inputs and outputs and check the data on the inputs for compliance with what you are expecting. |daliuge|, or more precisely EAGLE is using that information to guide the users developing a workflow and by default allows connections only between matching ports. Again this is based on the doxygen description of your components ports, which look like this: .. _graphs.figs.tmpl_ports1: .. figure:: ../../images/tmpl_ports1.png @@ -112,4 +115,6 @@ Your ``run`` method could look very simple and essentially always the same, but Consider Granularity and Parallelism ------------------------------------ -You can put very complex and even complete applications inside a component, but that is not the idea. In fact components should perform quite limited tasks, which should in general be useful for other, ideally many workflows. There is always a trade-off between overhead and functionality as well. Although the template makes the development of components quite easy, it still is an overhead, compared to just adding a few lines of code in some existing component. One of the driving requirements to write a new component might thus be whether the functionality of the new component is generic enough to be useful. There might also be other ways of implementing that same functionality and thus there might be a choice of components providing that. The other, really important consideration is parallelism. In general you should never do that inside a component, but leave that to the developer of the workflow itself. |daliuge| is mainly about distributing and optimizing the distribution of such parallel tasks (instances of components). You should aim to give the |daliuge| engine as many degrees of freedom as possible to deploy the final workflow on the available platform. When developing a component you won't know in what kind of workflows it is going to be used, nor will you know how big and complex those workflows are. 
Thus, don't assume anything and implement just the functionality to deal with a single, atomic entity of the data the component has to deal with. That also makes the implementation easier and much more straight forward. +You can put very complex and even complete applications inside a component, but this limits code reusability and |daliuge| only provides scheduling and deployment parallelism down to the component level. In fact components should perform quite limited tasks, which should in general be useful for other, ideally many workflows. There is always a trade-off between overhead and functionality as well. Although the template makes the development of components quite easy, it still is an overhead, compared to just adding a few lines of code in some existing component. One of the driving requirements to write a new component might thus be whether the functionality of the new component is generic enough to be useful. There might also be other ways of implementing that same functionality and thus there might be a choice of components providing that. + +The other, really important consideration is parallelism. In general you should never implement parallelism inside a component, but leave that to the developer of the workflow itself. |daliuge| is mainly about distributing and optimizing the distribution of such parallel tasks (instances of components). You should aim to give the |daliuge| engine as many degrees of freedom as possible to deploy the final workflow on the available platform. When developing a component you won't know in what kind of workflows it is going to be used, nor will you know how big and complex those workflows are. Thus, don't assume anything and implement just the functionality to deal with a single, atomic entity of the data the component has to deal with. That also makes the implementation easier and much more straightforward. diff --git a/docs/development/app_development/special_components.rst index cdd1e44f2..23549419c 100644 --- a/docs/development/app_development/special_components.rst +++ b/docs/development/app_development/special_components.rst @@ -7,7 +7,6 @@ In addition users can develop a number of specialized components, which are base #. Start and Stop Components #. Branch Components #. MPI Components -#. Python-function Components #. 
Archiving/store Components Descriptions TODO From c5692b5a7f65d2ea9954ff2e1b222bfff9133e08 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Wed, 25 May 2022 18:08:53 +0800 Subject: [PATCH 029/183] test parsers --- daliuge-engine/test/apps/test_pyfunc.py | 71 +++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 5 deletions(-) diff --git a/daliuge-engine/test/apps/test_pyfunc.py b/daliuge-engine/test/apps/test_pyfunc.py index 4f7448e43..7a9181fbb 100644 --- a/daliuge-engine/test/apps/test_pyfunc.py +++ b/daliuge-engine/test/apps/test_pyfunc.py @@ -27,6 +27,7 @@ import unittest import pkg_resources import json +import numpy from ..manager import test_dm from dlg import droputils, graph_loader @@ -79,8 +80,6 @@ def _PyFuncApp(oid, uid, f, **kwargs): func_name=fname, func_code=fcode, func_defaults=fdefaults, - input_parser=pyfunc.DropParser.PICKLE, - output_parser=pyfunc.DropParser.PICKLE, **kwargs ) @@ -124,22 +123,84 @@ def inner_function(x, y): _PyFuncApp("a", "a", inner_function) - def _test_simple_functions(self, f, input_data, output_data): + def test_pickle_func(self, f = lambda x: x, input_data="hello", output_data="hello"): + a = InMemoryDROP("a", "a") + b = _PyFuncApp("b", "b", f) + c = InMemoryDROP("c", "c") + + b.addInput(a) + b.addOutput(c) + + with DROPWaiterCtx(self, c, 5): + droputils.save_pickle(a, input_data) + a.setCompleted() + for drop in a, b, c: + self.assertEqual(DROPStates.COMPLETED, drop.status) + self.assertEqual( + output_data, droputils.load_pickle(c) + ) + + def test_eval_func(self, f = lambda x: x, input_data=None, output_data=None): + input_data = [2,2] if input_data is None else input_data + output_data = [2,2] if output_data is None else output_data + + a = InMemoryDROP("a", "a") + b = _PyFuncApp("b", "b", f, + input_parser=pyfunc.DropParser.EVAL, + output_parser=pyfunc.DropParser.EVAL + ) + c = InMemoryDROP("c", "c") + + b.addInput(a) + b.addOutput(c) + + with DROPWaiterCtx(self, c, 5): + a.write(repr(input_data).encode('utf-8')) + a.setCompleted() + for drop in a, b, c: + self.assertEqual(DROPStates.COMPLETED, drop.status) + self.assertEqual( + output_data, eval(droputils.allDropContents(c).decode('utf-8'), {}, {}) + ) + + def test_npy_func(self, f = lambda x: x, input_data=None, output_data=None): + input_data = numpy.ones([2,2]) if input_data is None else input_data + output_data = numpy.ones([2,2]) if output_data is None else output_data + a = InMemoryDROP("a", "a") + b = _PyFuncApp("b", "b", f, + input_parser=pyfunc.DropParser.NPY, + output_parser=pyfunc.DropParser.NPY + ) + c = InMemoryDROP("c", "c") + + b.addInput(a) + b.addOutput(c) + + with DROPWaiterCtx(self, c, 5): + droputils.save_npy(a, input_data) + a.setCompleted() + for drop in a, b, c: + self.assertEqual(DROPStates.COMPLETED, drop.status) + numpy.testing.assert_equal( + output_data, droputils.load_npy(c) + ) + + def _test_simple_functions(self, f, input_data, output_data): a, c = [InMemoryDROP(x, x) for x in ("a", "c")] b = _PyFuncApp("b", "b", f) b.addInput(a) b.addOutput(c) with DROPWaiterCtx(self, c, 5): - a.write(pickle.dumps(input_data)) # @UndefinedVariable + a.write(pickle.dumps(input_data)) a.setCompleted() for drop in a, b, c: self.assertEqual(DROPStates.COMPLETED, drop.status) self.assertEqual( output_data, pickle.loads(droputils.allDropContents(c)) - ) # @UndefinedVariable + ) def test_func1(self): """Checks that func1 in this module works when wrapped""" From 8afb9cd1092bb73df81e5e76821ee7863c731223 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Wed, 25 May 2022 
22:36:34 +0800 Subject: [PATCH 030/183] Additional checks in pyfunc to remove arguments from initial list --- daliuge-engine/dlg/apps/pyfunc.py | 37 ++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 31bc301d3..f1c760e0e 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -430,6 +430,17 @@ def astparse(x): kwargs = {} self.pargs = [] pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) # Initialize pargs dictionary + if "applicationArgs" in self.parameters: + appArgs = self.parameters["applicationArgs"] # we'll pop the identified ones + if ('outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict)): + out_names = [list(i.values())[0] for i in self.parameters['outputs']] + logger.debug(f"Using named ports to remove outputs from arguments: "+\ + f"{out_names}") + _dum = [appArgs.pop(k) for k in out_names if k in appArgs] + if len(_dum) > 0: + logger.debug("Application arguments used as outputs removed : %s", + [i['text'] for i in _dum]) + if ('inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict)): logger.debug(f"Using named ports to identify inputs: "+\ f"{self.parameters['inputs']}") @@ -447,24 +458,34 @@ def astparse(x): for i in range(min(len(inputs),self.fn_nargs)): kwargs.update({self.arguments.args[i]: list(inputs.values())[i]}) - logger.debug(f"updating funcargs with input ports {kwargs}") + logger.debug(f"Updating funcargs with input ports {kwargs}") self.funcargs.update(kwargs) + _dum = [appArgs.pop(k) for k in kwargs if k in appArgs] + logger.debug("Application arguments used as inputs removed: %s", + [i['text'] for i in _dum]) + + logger.debug("Found input ports matching posargs: %s", list(pargsDict.keys())) # Try to get values for still missing positional arguments from Application Args if "applicationArgs" in self.parameters: - appArgs = self.parameters["applicationArgs"] # we'll pop them _dum = [appArgs.pop(k) for k in self.func_def_keywords if k in appArgs] + logger.debug("Identified keyword arguments removed: %s", + [i['text'] for i in _dum]) + _dum = [appArgs.pop(k) for k in pargsDict if k in appArgs] + logger.debug("Identified positional arguments removed: %s", + [i['text'] for i in _dum]) for pa in posargs: - if pa not in self.funcargs: - if pa in appArgs and pa != 'self': + if pa != 'self' and pa not in self.funcargs: + if pa in appArgs: arg = appArgs.pop(pa) value = arg['value'] ptype = arg['type'] if ptype in ["Complex", "Json"]: try: value = ast.literal_eval(value) - except ValueError: - pass + except Exception as e: + # just go on if this did not work + logger.warning("Eval raised an error: %s",e) elif ptype in ["Python"]: try: import numpy @@ -472,9 +493,9 @@ def astparse(x): except: pass pargsDict.update({pa: value}) - elif pa != 'self': + elif pa != 'self' and pa not in pargsDict: logger.warning(f"Required positional argument '{pa}' not found!") - logger.debug(f"updating posargs with {list(kwargs.values())}") + logger.debug(f"updating posargs with {list(pargsDict.keys())}") self.pargs.extend(list(pargsDict.values())) # Try to get values for still missing kwargs arguments from Application kws From 8635f9e907b8f65e32772d471552bfc15661d600 Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Thu, 26 May 2022 00:24:57 +0800 Subject: [PATCH 031/183] Additional fixes for posarg treatment --- daliuge-engine/dlg/apps/pyfunc.py | 20 ++++++++++++++------ 1 
file changed, 14 insertions(+), 6 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index f1c760e0e..200fef366 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -429,9 +429,18 @@ def astparse(x): posargs = self.arguments.args[:self.fn_npos] kwargs = {} self.pargs = [] - pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) # Initialize pargs dictionary + # Initialize pargs dictionary and update with provided argument values + pargsDict = collections.OrderedDict(zip(posargs,[None]*len(posargs))) if "applicationArgs" in self.parameters: appArgs = self.parameters["applicationArgs"] # we'll pop the identified ones + pargsDict.update({k:self.parameters[k] for k in pargsDict if k in + self.parameters}) + # if defined in both we use AppArgs values + pargsDict.update({k:appArgs[k]['value'] for k in pargsDict if k + in appArgs}) + logger.debug("Initial posargs dictionary: %s", pargsDict) + else: + appArgs = {} if ('outputs' in self.parameters and isinstance(self.parameters['outputs'][0], dict)): out_names = [list(i.values())[0] for i in self.parameters['outputs']] logger.debug(f"Using named ports to remove outputs from arguments: "+\ @@ -450,21 +459,20 @@ def astparse(x): key = list(self.parameters['inputs'][i].values())[0] # value for final dict is value in inputs dict value = inputs[list(self.parameters['inputs'][i].keys())[0]] + if not value: value = '' # make sure we are passing NULL drop events if key in posargs: pargsDict.update({key:value}) else: kwargs.update({key:value}) + _dum = appArgs.pop(key) + logger.debug("Using input %s for argument %s", value, key) + logger.debug("Argument used as input removed: %s", _dum) else: for i in range(min(len(inputs),self.fn_nargs)): kwargs.update({self.arguments.args[i]: list(inputs.values())[i]}) logger.debug(f"Updating funcargs with input ports {kwargs}") self.funcargs.update(kwargs) - _dum = [appArgs.pop(k) for k in kwargs if k in appArgs] - logger.debug("Application arguments used as inputs removed: %s", - [i['text'] for i in _dum]) - - logger.debug("Found input ports matching posargs: %s", list(pargsDict.keys())) # Try to get values for still missing positional arguments from Application Args if "applicationArgs" in self.parameters: From 9fda4d11328fcc7192cb557c4a2ff060462c47ca Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Thu, 26 May 2022 12:03:54 +0800 Subject: [PATCH 032/183] droputils tests --- daliuge-engine/test/test_droputils.py | 42 ++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/daliuge-engine/test/test_droputils.py b/daliuge-engine/test/test_droputils.py index 109ef2596..07b93a43a 100644 --- a/daliuge-engine/test/test_droputils.py +++ b/daliuge-engine/test/test_droputils.py @@ -25,11 +25,14 @@ @author: rtobar """ +import subprocess import unittest +import numpy + from dlg import droputils from dlg.common import dropdict, Categories -from dlg.drop import InMemoryDROP, FileDROP, BarrierAppDROP +from dlg.drop import InMemoryDROP, FileDROP, BarrierAppDROP, PlasmaDROP, SharedMemoryDROP from dlg.droputils import DROPFile @@ -141,6 +144,43 @@ def testGetEndNodes(self): endNodes = droputils.getLeafNodes(a) self.assertSetEqual(set([j, f]), set(endNodes)) + def _test_datadrop_function(self, test_function, input_data): + # basic datadrop + for drop_type in (InMemoryDROP,FileDROP): + test_function(drop_type, input_data) + + #plasma datadrop + store = None + try: + store = subprocess.Popen( + ["plasma_store", 
"-m", "1000000", "-s", "/tmp/plasma"] + ) + test_function(PlasmaDROP, input_data) + finally: + if store: + store.terminate() + + def _test_save_load_pickle(self, drop_type, data): + drop = drop_type("a", "a") + droputils.save_pickle(drop, data) + drop.setCompleted() + output_data = droputils.load_pickle(drop) + self.assertEqual(data, output_data) + + def test_save_load_pickle(self): + input_data = {'nested': {'data': {'object': {}}}} + self._test_datadrop_function(self._test_save_load_pickle, input_data) + + def _test_save_load_npy(self, drop_type, data): + drop = drop_type("a", "a") + droputils.save_npy(drop, data) + output_data = droputils.load_npy(drop) + numpy.testing.assert_equal(data, output_data) + + def test_save_load_npy(self): + input_data = numpy.ones([3,5]) + self._test_datadrop_function(self._test_save_load_npy, input_data) + def test_DROPFile(self): """ This test exercises the DROPFile mechanism to read the data represented by From b9f5eac6189b21e1b62dbc69db1ac0c41b211d73 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Thu, 26 May 2022 12:11:47 +0800 Subject: [PATCH 033/183] pyfunc docs --- daliuge-engine/test/test_droputils.py | 2 +- .../app_development/pyfunc_components.rst | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/daliuge-engine/test/test_droputils.py b/daliuge-engine/test/test_droputils.py index 07b93a43a..529ec9c33 100644 --- a/daliuge-engine/test/test_droputils.py +++ b/daliuge-engine/test/test_droputils.py @@ -32,7 +32,7 @@ from dlg import droputils from dlg.common import dropdict, Categories -from dlg.drop import InMemoryDROP, FileDROP, BarrierAppDROP, PlasmaDROP, SharedMemoryDROP +from dlg.drop import InMemoryDROP, FileDROP, BarrierAppDROP, PlasmaDROP from dlg.droputils import DROPFile diff --git a/docs/development/app_development/pyfunc_components.rst b/docs/development/app_development/pyfunc_components.rst index 82601186c..6e6fc1d23 100644 --- a/docs/development/app_development/pyfunc_components.rst +++ b/docs/development/app_development/pyfunc_components.rst @@ -11,16 +11,14 @@ Port Parsers Pyfunc components when interfacing with data drops may utilize one of several builtin port parsing formats. -* Pickle -* Eval -* Path -* Url -* Npy +* Pickle - Reads and writes data to pickle format +* Eval - Reads data using eval() function and writes using repr() function +* Npy - Reads and writes to .npy format +* Path - Reads the drop path rather than data +* Url - Reads the drop url rather than data -Basic setup ------------ Note ----- +"""" -Only a single parser and \ No newline at end of file +Only a single port parser can currently be used for all input ports of a Pyfunc. This is subject to change in future. From 7bb1ed77ad3acb8ca12c5fc91f0051e286d9fb09 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Thu, 26 May 2022 12:55:25 +0800 Subject: [PATCH 034/183] palette update --- daliuge-engine/dlg/apps/pyfunc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 49ab044a6..2f5b6a1f9 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -148,7 +148,7 @@ class DropParser(Enum): # \~English Python function name # @param[in] aparam/func_code Function Code//String/readwrite/False//False/ # \~English Python function code, e.g. 
-# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,npy,eval,path,dataurl/False/
+# @param[in] aparam/input_parser Input Parser/pickle/Select/readwrite/False/pickle,eval,npy,path,dataurl/False/
 #     \~English Input port parsing technique
 # @param[in] aparam/output_parser Output Parser/pickle/Select/readwrite/False/pickle,eval,npy/False/
 #     \~English Output port parsing technique

From f904cdfc95911e523bf7aeb438e1da02afc222e2 Mon Sep 17 00:00:00 2001
From: Callan Gray
Date: Fri, 27 May 2022 16:04:07 +0800
Subject: [PATCH 035/183] review changes

---
 daliuge-engine/dlg/apps/pyfunc.py                      | 4 +---
 docs/development/app_development/pyfunc_components.rst | 2 +-
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py
index 2f5b6a1f9..25490184c 100644
--- a/daliuge-engine/dlg/apps/pyfunc.py
+++ b/daliuge-engine/dlg/apps/pyfunc.py
@@ -30,7 +30,7 @@
 import logging
 import pickle

-from typing import Callable, Optional
+from typing import Callable
 import dill
 from io import StringIO
 from contextlib import redirect_stdout
@@ -39,10 +39,8 @@
 from dlg.drop import BarrierAppDROP
 from dlg.exceptions import InvalidDropException
 from dlg.meta import (
-    dlg_bool_param,
     dlg_string_param,
     dlg_enum_param,
-    dlg_float_param,
     dlg_dict_param,
     dlg_component,
     dlg_batch_input,

diff --git a/docs/development/app_development/pyfunc_components.rst b/docs/development/app_development/pyfunc_components.rst
index 6e6fc1d23..d6e4dd19c 100644
--- a/docs/development/app_development/pyfunc_components.rst
+++ b/docs/development/app_development/pyfunc_components.rst
@@ -3,7 +3,7 @@
 Pyfunc Components
 =================

-Pyfunc components are generalized python component that can be configured to behave as a custom python component entirely through component parameters and application arguments. A pyfunc component
+Pyfunc components are generalized python components that can be configured to behave as custom python components entirely through component parameters and application arguments. A pyfunc component
 maps directly to an existing python function or a lambda expression; named application arguments and input ports are mapped to the function's keyword args, and the result is mapped to the output port.

 Port Parsers

From 767c9eff808e8b64301624f41eb98dab4ea856d5 Mon Sep 17 00:00:00 2001
From: Moritz Wicenec
Date: Mon, 30 May 2022 16:11:52 +0800
Subject: [PATCH 036/183] Added a shortcuts system that makes adding new shortcuts as easy as in EAGLE, plus a new modal showing these shortcuts and a new About modal.
Also added a help menu in the navbar housing these options.

---
 daliuge-translator/dlg/dropmake/web/main.js | 28 ++++++-
 .../dlg/dropmake/web/pg_viewer.html         | 49 +++++++++++
 .../dlg/dropmake/web/src/main.css           | 73 +++++++++++++++++++
 3 files changed, 146 insertions(+), 4 deletions(-)

diff --git a/daliuge-translator/dlg/dropmake/web/main.js b/daliuge-translator/dlg/dropmake/web/main.js
index d931c176a..4937f7d96 100644
--- a/daliuge-translator/dlg/dropmake/web/main.js
+++ b/daliuge-translator/dlg/dropmake/web/main.js
@@ -22,14 +22,34 @@
 $(document).ready(function () {
     updateDeployOptionsDropdown()

     //keyboard shortcuts
+    var keyboardShortcuts = []
+    keyboardShortcuts.push({name:"Open Settings", shortcut:"O", code:79, action: "$('#settingsModal').modal('toggle')"})
+    keyboardShortcuts.push({name:"Deploy", shortcut:"D", code:68, action: "$('#activeDeployMethodButton').click()"})
+    keyboardShortcuts.push({name:"Open Keyboard Shortcuts Modal", shortcut:"K", code:75, action: "$('#shortcutsModal').modal('toggle')"})
+
+    //fill out keyboard shortcuts modal
+    keyboardShortcuts.forEach(element => {
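+        // build one list entry per registered shortcut and append it to the modal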
+        var shortCutItem = '<div class="col-6">'+
+            '<div class="shortCutsListItem">'+
+                '<span>'+element.name+'</span>'+
+                '<span>'+element.shortcut+'</span>'+
+            '</div>'+
+        '</div>'
+        $("#shortcutsModal .modal-body .row").append(shortCutItem)
+    })
+
+    //keyboard shortcuts execution
     $(document).keydown(function(e){
         if($("input").is(":focus")){
             return
         }
-        if (e.which == 79) //open settings modal on o
-        {
-            $('#settingsModal').modal('toggle')
-        };
+        keyboardShortcuts.forEach(element => {
+            if (e.which == element.code) //run the action registered for this key
+            {
+                eval(element.action)
+            }
+        })
     })
 });

diff --git a/daliuge-translator/dlg/dropmake/web/pg_viewer.html b/daliuge-translator/dlg/dropmake/web/pg_viewer.html
index 6ea40d164..88eacf453 100755
--- a/daliuge-translator/dlg/dropmake/web/pg_viewer.html
+++ b/daliuge-translator/dlg/dropmake/web/pg_viewer.html
@@ -111,6 +111,15 @@
 Screenshot
+
@@ -162,6 +171,7 @@
+
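Under this scheme, supporting an additional shortcut only requires pushing one more entry into the keyboardShortcuts array; the keydown handler and the shortcuts modal then both pick it up automatically. A minimal sketch of such an extension (the "Open Help" entry and the '#helpModal' selector are illustrative assumptions, not part of this patch):

    // Hypothetical additional shortcut for main.js; assumes the
    // keyboardShortcuts array and keydown handler introduced above.
    keyboardShortcuts.push({
        name: "Open Help",                         // label listed in the shortcuts modal
        shortcut: "H",                             // key displayed next to the label
        code: 72,                                  // keyCode for "H", matched against e.which
        action: "$('#helpModal').modal('toggle')"  // string eval()'d by the keydown handler
    })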