From 2f121bc4d5a65b74054591badd048cdeb34e797e Mon Sep 17 00:00:00 2001 From: Andreas Wicenec Date: Fri, 12 Apr 2024 22:54:44 +0800 Subject: [PATCH] avahi host names and bash command line arguments. Enabled dlg-trans.local and dg-engine.local as host names through avahi zeroconf. Enabled passing of command line argument values through data drops for bash shell components. --- daliuge-common/dlg/clients.py | 4 +- daliuge-common/dlg/restutils.py | 1 + daliuge-common/docker/Dockerfile.dev | 3 +- daliuge-engine/dlg/apps/bash_shell_app.py | 10 +- daliuge-engine/dlg/apps/pyfunc.py | 43 ++------ daliuge-engine/dlg/apps/simple.py | 14 +-- daliuge-engine/dlg/data/drops/s3_drop.py | 13 +-- daliuge-engine/dlg/group.template | 3 + daliuge-engine/dlg/manager/rest.py | 78 +++++-------- daliuge-engine/dlg/named_port_utils.py | 90 +++++++++++---- daliuge-engine/dlg/passwd.template | 1 + daliuge-engine/docker/Dockerfile.dev | 1 + daliuge-engine/run_engine.sh | 35 +++--- daliuge-engine/test/apps/test_pyfunc.py | 9 +- daliuge-translator/build_translator.sh | 2 +- .../dlg/dropmake/web/translator_rest.py | 103 +++++------------- .../dlg/dropmake/web/translator_utils.py | 34 ++++-- daliuge-translator/docker/Dockerfile.dev | 1 + daliuge-translator/run_translator.sh | 16 ++- 19 files changed, 218 insertions(+), 243 deletions(-) diff --git a/daliuge-common/dlg/clients.py b/daliuge-common/dlg/clients.py index a80868f08..8d27ba2cf 100644 --- a/daliuge-common/dlg/clients.py +++ b/daliuge-common/dlg/clients.py @@ -243,7 +243,7 @@ def get_log_file(self, sessionId): return self._request(f"/sessions/{sessionId}/logs", "GET") def get_submission_method(self): - return self._get_json("/submission_method") + return self._get_json("submission_method") class CompositeManagerClient(BaseDROPManagerClient): @@ -257,7 +257,7 @@ def remove_node(self, node): self._DELETE(f"/node/{node}") def get_submission_method(self): - return self._get_json("/submission_method") + return self._get_json("submission_method") 
class DataIslandManagerClient(CompositeManagerClient): diff --git a/daliuge-common/dlg/restutils.py b/daliuge-common/dlg/restutils.py index 108f6306c..7bfbb3cdc 100644 --- a/daliuge-common/dlg/restutils.py +++ b/daliuge-common/dlg/restutils.py @@ -190,6 +190,7 @@ def _request(self, url, method, content=None, headers={}, timeout=10): if content and hasattr(content, "read"): headers["Transfer-Encoding"] = "chunked" + headers["Origin"] = "http://dlg-trans.local:8084" content = chunked(content) self._conn = http.client.HTTPConnection(self.host, self.port) diff --git a/daliuge-common/docker/Dockerfile.dev b/daliuge-common/docker/Dockerfile.dev index eba084089..f3a6589fe 100644 --- a/daliuge-common/docker/Dockerfile.dev +++ b/daliuge-common/docker/Dockerfile.dev @@ -7,8 +7,9 @@ FROM ubuntu:20.04 ARG BUILD_ID LABEL stage=builder LABEL build=$BUILD_ID +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN apt-get update && \ - apt-get install -y gcc python3 python3.8-venv python3-pip python3-distutils python3-appdirs libmetis-dev curl git sudo && \ + apt-get install -y avahi-utils gcc python3 python3.8-venv python3-pip python3-distutils python3-appdirs libmetis-dev curl git sudo && \ apt-get clean COPY / /daliuge diff --git a/daliuge-engine/dlg/apps/bash_shell_app.py b/daliuge-engine/dlg/apps/bash_shell_app.py index b4e446f53..d790787bc 100644 --- a/daliuge-engine/dlg/apps/bash_shell_app.py +++ b/daliuge-engine/dlg/apps/bash_shell_app.py @@ -41,7 +41,11 @@ import json from .. 
import droputils, utils -from dlg.named_port_utils import replace_named_ports +from dlg.named_port_utils import ( + DropParser, + get_port_reader_function, + replace_named_ports, +) from ..ddap_protocol import AppDROPStates, DROPStates from ..apps.app_base import BarrierAppDROP, AppDROP from ..exceptions import InvalidDropException @@ -51,6 +55,7 @@ dlg_batch_input, dlg_batch_output, dlg_streaming_input, + dlg_enum_param, ) @@ -163,6 +168,7 @@ class BashShellBase(object): # TODO: use the shlex module for most of the construction of the # command line to get a proper and safe shell syntax command = dlg_string_param("Bash command", None) + input_parser: DropParser = dlg_enum_param(DropParser, "input_parser", DropParser.PICKLE) # type: ignore def initialize(self, **kwargs): super(BashShellBase, self).initialize(**kwargs) @@ -214,6 +220,7 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): outport_names = ( self.parameters["outputs"] if "outputs" in self.parameters else [] ) + reader = get_port_reader_function(self.input_parser) keyargs, pargs = replace_named_ports( inputs.items(), outputs.items(), @@ -222,6 +229,7 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE): self.appArgs, argumentPrefix=self._argumentPrefix, separator=self._paramValueSeparator, + parser=reader, ) argumentString = ( f"{' '.join(map(str,pargs + keyargs))}" # add kwargs to end of pargs diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index fcdd8c589..5cb042986 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -37,9 +37,14 @@ from io import StringIO from contextlib import redirect_stdout -from dlg import droputils, drop_loaders +from dlg import drop_loaders from dlg.utils import serialize_data, deserialize_data -from dlg.named_port_utils import check_ports_dict, identify_named_ports +from dlg.named_port_utils import ( + DropParser, + check_ports_dict, + 
get_port_reader_function, + identify_named_ports, +) from dlg.apps.app_base import BarrierAppDROP from dlg.exceptions import InvalidDropException from dlg.meta import ( @@ -136,16 +141,6 @@ def import_using_code(code): return dill.loads(code) -class DropParser(Enum): - RAW = "raw" - PICKLE = "pickle" - EVAL = "eval" - NPY = "npy" - # JSON = "json" - PATH = "path" # input only - DATAURL = "dataurl" # input only - - ## # @brief PythonMemberFunction # @details A placeholder APP to aid construction of new class member function applications. @@ -453,29 +448,7 @@ def run(self): """ funcargs = {} - # Inputs are un-pickled and treated as the arguments of the function - # Their order must be preserved, so we use an OrderedDict - if self.input_parser is DropParser.PICKLE: - # all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) - all_contents = drop_loaders.load_pickle - elif self.input_parser is DropParser.EVAL: - - def optionalEval(x): - # Null and Empty Drops will return an empty byte string - # which should propogate back to None - content: str = droputils.allDropContents(x).decode("utf-8") - return ast.literal_eval(content) if len(content) > 0 else None - - all_contents = optionalEval - elif self.input_parser is DropParser.NPY: - all_contents = drop_loaders.load_npy - elif self.input_parser is DropParser.PATH: - all_contents = lambda x: x.path - elif self.input_parser is DropParser.DATAURL: - all_contents = lambda x: x.dataurl - else: - raise ValueError(self.input_parser.__repr__()) - + all_contents = get_port_reader_function(self.input_parser) inputs = collections.OrderedDict() for uid, drop in self._inputs.items(): inputs[uid] = all_contents(drop) diff --git a/daliuge-engine/dlg/apps/simple.py b/daliuge-engine/dlg/apps/simple.py index f27223101..620b1d5c0 100644 --- a/daliuge-engine/dlg/apps/simple.py +++ b/daliuge-engine/dlg/apps/simple.py @@ -25,8 +25,7 @@ import pickle import random from typing import List, Optional -import urllib.error -import 
urllib.request +import requests import logging import time import numpy as np @@ -647,18 +646,17 @@ class UrlRetrieveApp(BarrierAppDROP): def run(self): try: - u = urllib.request.urlopen(self.url) - except urllib.error.URLError as e: + logger.info("Accessing URL %s", self.url) + u = requests.get(self.url) + except requests.exceptions.RequestException as e: raise e.reason - content = u.read() - outs = self.outputs if len(outs) < 1: raise Exception("At least one output should have been added to %r" % self) for o in outs: - o.len = len(content) - o.write(content) # send content to all outputs + o.len = len(u.content) + o.write(u.content) # send content to all outputs ## diff --git a/daliuge-engine/dlg/data/drops/s3_drop.py b/daliuge-engine/dlg/data/drops/s3_drop.py index acd15b889..4c1e2fac7 100644 --- a/daliuge-engine/dlg/data/drops/s3_drop.py +++ b/daliuge-engine/dlg/data/drops/s3_drop.py @@ -46,7 +46,7 @@ dlg_list_param, ) -from dlg.named_port_utils import identify_named_ports, check_ports_dict +# from dlg.named_port_utils import identify_named_ports, check_ports_dict ## @@ -128,10 +128,10 @@ def getIO(self) -> DataIO: :return: """ logger.debug("S3DROP producers: %s", self._producers) - if check_ports_dict(self._producers): - self.mapped_inputs = identify_named_ports( - self._producers, {}, self.keyargs, mode="inputs" - ) + # if check_ports_dict(self._producers): + # self.mapped_inputs = identify_named_ports( + # self._producers, {}, self.keyargs, mode="inputs" + # ) logger.debug("Parameters found: {}", self.parameters) return S3IO( self.aws_access_key_id, @@ -309,8 +309,7 @@ def _close(self, **kwargs): Bucket=self._bucket, Key=self._key, UploadId=self._uploadId ) parts = [ - {"ETag": p["ETag"], "PartNumber": p["PartNumber"]} - for p in res["Parts"] + {"ETag": p["ETag"], "PartNumber": p["PartNumber"]} for p in res["Parts"] ] # TODO: Check checksum! 
res = self._s3.complete_multipart_upload( diff --git a/daliuge-engine/dlg/group.template b/daliuge-engine/dlg/group.template index 6a3fce748..2dd1a8eae 100644 --- a/daliuge-engine/dlg/group.template +++ b/daliuge-engine/dlg/group.template @@ -39,3 +39,6 @@ nogroup:x:65534: crontab:x:101: messagebus:x:102: ssh:x:103: +avahi:x:103: +netdev:x:104: + diff --git a/daliuge-engine/dlg/manager/rest.py b/daliuge-engine/dlg/manager/rest.py index 6e35d4ae7..b48881d10 100644 --- a/daliuge-engine/dlg/manager/rest.py +++ b/daliuge-engine/dlg/manager/rest.py @@ -78,28 +78,24 @@ def fwrapper(*args, **kwargs): bottle.response.content_type = "application/json" # set CORS headers origin = bottle.request.headers.raw("Origin") - if origin is None: - origin = "http://localhost:8084" - elif not re.match( - r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin + logger.debug("CORS request comming from: %s", origin) + if origin is None or re.match( + r"http://dlg-trans.local:80[0-9][0-9]", origin ): + origin = "http://dlg-trans.local:8084" + elif re.match(r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin): origin = "http://localhost:8084" bottle.response.headers["Access-Control-Allow-Origin"] = origin - bottle.response.headers[ - "Access-Control-Allow-Credentials" - ] = "true" - bottle.response.headers[ - "Access-Control-Allow-Methods" - ] = "GET, POST, PUT, OPTIONS" - bottle.response.headers[ - "Access-Control-Allow-Headers" - ] = "Origin, Accept, Content-Type, Content-Encoding, X-Requested-With, X-CSRF-Token" - jres = ( - json.dumps(res) if res else json.dumps({"Status": "Success"}) - ) - logger.debug( - "Bottle sending back result: %s", jres[: min(len(jres), 80)] - ) + bottle.response.headers["Access-Control-Allow-Credentials"] = "true" + bottle.response.headers["Access-Control-Allow-Methods"] = ( + "GET, POST, PUT, OPTIONS" + ) + bottle.response.headers["Access-Control-Allow-Headers"] = ( + "Origin, Accept, Content-Type, Content-Encoding, X-Requested-With, X-CSRF-Token" + ) + 
logger.debug("CORS headers set to allow from: %s", origin) + jres = json.dumps(res) if res else json.dumps({"Status": "Success"}) + logger.debug("Bottle sending back result: %s", jres[: min(len(jres), 80)]) return json.dumps(res) except Exception as e: logger.exception("Error while fulfilling request") @@ -166,24 +162,14 @@ def __init__(self, dm, maxreqsize=10): app.post("/api/stop", callback=self.stop_manager) app.post("/api/sessions", callback=self.createSession) app.get("/api/sessions", callback=self.getSessions) - app.get( - "/api/sessions/", callback=self.getSessionInformation - ) + app.get("/api/sessions/", callback=self.getSessionInformation) app.delete("/api/sessions/", callback=self.destroySession) app.get("/api/sessions//logs", callback=self.getLogFile) - app.get( - "/api/sessions//status", callback=self.getSessionStatus - ) - app.post( - "/api/sessions//deploy", callback=self.deploySession - ) - app.post( - "/api/sessions//cancel", callback=self.cancelSession - ) + app.get("/api/sessions//status", callback=self.getSessionStatus) + app.post("/api/sessions//deploy", callback=self.deploySession) + app.post("/api/sessions//cancel", callback=self.cancelSession) app.get("/api/sessions//graph", callback=self.getGraph) - app.get( - "/api/sessions//graph/size", callback=self.getGraphSize - ) + app.get("/api/sessions//graph/size", callback=self.getGraphSize) app.get( "/api/sessions//graph/status", callback=self.getGraphStatus, @@ -201,9 +187,7 @@ def __init__(self, dm, maxreqsize=10): callback=self.getSessionReproStatus, ) - app.route( - "/api/sessions", method="OPTIONS", callback=self.acceptPreflight - ) + app.route("/api/sessions", method="OPTIONS", callback=self.acceptPreflight) app.route( "/api/sessions//graph/append", method="OPTIONS", @@ -226,9 +210,7 @@ def initializeSpecifics(self, app): @daliuge_aware def submit_methods(self): - return { - "methods": [DeploymentMethods.BROWSER, DeploymentMethods.SERVER] - } + return {"methods": [DeploymentMethods.BROWSER, 
DeploymentMethods.SERVER]} def _stop_manager(self): self.dm.shutdown() @@ -408,9 +390,7 @@ def initializeSpecifics(self, app): "/api/sessions//subscriptions", callback=self.add_node_subscriptions, ) - app.post( - "/api/sessions//trigger", callback=self.trigger_drops - ) + app.post("/api/sessions//trigger", callback=self.trigger_drops) # The non-REST mappings that serve HTML-related content app.get("/", callback=self.visualizeDM) app.get("/api/shutdown", callback=self.shutdown_node_manager) @@ -617,9 +597,7 @@ def visualizeDIM(self): tpl = file_as_string("web/dim.html") urlparts = bottle.request.urlparts selectedNode = ( - bottle.request.params["node"] - if "node" in bottle.request.params - else "" + bottle.request.params["node"] if "node" in bottle.request.params else "" ) serverUrl = urlparts.scheme + "://" + urlparts.netloc return bottle.template( @@ -698,9 +676,7 @@ def stopNM(self, host): def addNM(self, host, node): port = constants.ISLAND_DEFAULT_REST_PORT logger.debug("Adding NM %s to DIM %s", node, host) - with RestClient( - host=host, port=port, timeout=10, url_prefix="/api" - ) as c: + with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: return json.loads( c._POST( f"/node/{node}", @@ -711,9 +687,7 @@ def addNM(self, host, node): def removeNM(self, host, node): port = constants.ISLAND_DEFAULT_REST_PORT logger.debug("Removing NM %s from DIM %s", node, host) - with RestClient( - host=host, port=port, timeout=10, url_prefix="/api" - ) as c: + with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c: return json.loads(c._DELETE(f"/node/{node}").read()) @daliuge_aware diff --git a/daliuge-engine/dlg/named_port_utils.py b/daliuge-engine/dlg/named_port_utils.py index e24146dad..51ec112a9 100644 --- a/daliuge-engine/dlg/named_port_utils.py +++ b/daliuge-engine/dlg/named_port_utils.py @@ -1,20 +1,30 @@ +import ast +from enum import Enum import logging import collections from typing import Tuple -import dlg.common as common 
+from dlg import droputils, drop_loaders logger = logging.getLogger(__name__) +class DropParser(Enum): + RAW = "raw" + PICKLE = "pickle" + EVAL = "eval" + NPY = "npy" + # JSON = "json" + PATH = "path" # input only + DATAURL = "dataurl" # input only + + def serialize_kwargs(keyargs, prefix="--", separator=" "): kwargs = [] for name, value in iter(keyargs.items()): if prefix == "--" and len(name) == 1: kwargs += [f"-{name} {value}"] else: - kwargs += [ - f"{prefix.strip()}{name.strip()}{separator}{str(value).strip()}" - ] + kwargs += [f"{prefix.strip()}{name.strip()}{separator}{str(value).strip()}"] logger.debug("kwargs after serialization: %s", kwargs) return kwargs @@ -78,6 +88,7 @@ def identify_named_ports( keyargs: dict, check_len: int = 0, mode: str = "inputs", + parser: callable = None, ) -> dict: """ Checks port names for matches with arguments and returns mapped ports. @@ -89,6 +100,7 @@ def identify_named_ports( keyargs (dict): keyword arguments check_len (int): number of of ports to be checked mode (str ["inputs"]): mode, used just for logging messages + parser (function): parser function for this port Returns: dict: port arguments @@ -118,17 +130,21 @@ def identify_named_ports( if value is None: value = "" # make sure we are passing NULL drop events if key in posargs: + if parser: + logger.debug("Reading from port using %s", parser.__repr__()) + value = parser(port_dict[keys[i]]["drop"]) pargsDict.update({key: value}) # portargs.update({key: value}) logger.debug("Using %s '%s' for parg %s", mode, value, key) posargs.pop(posargs.index(key)) elif key in keyargs: + if parser: + logger.debug("Reading from port using %s", parser.__repr__()) + value = parser(port_dict[keys[i]]["drop"]) # if not found in appArgs we don't put them into portargs either portargs.update({key: value}) # pargsDict.update({key: value}) - logger.debug( - "Using %s of type %s for kwarg %s", mode, type(value), key - ) + logger.debug("Using %s of type %s for kwarg %s", mode, type(value), 
key) _ = keyargs.pop(key) # remove from original arg list else: logger.debug( @@ -168,6 +184,7 @@ def replace_named_ports( appArgs: dict, argumentPrefix: str = "--", separator: str = " ", + parser: callable = None, ) -> Tuple[str, str]: """ Function attempts to identify CLI component arguments that match port names. @@ -180,6 +197,7 @@ def replace_named_ports( appArgs: dictionary of all arguments argumentPrefix: prefix for keyword arguments separator: character used between keyword and value + parser: reader function for ports Returns: tuple of serialized keyword arguments and positional arguments @@ -192,27 +210,27 @@ def replace_named_ports( ) inputs_dict = collections.OrderedDict() for uid, drop in iitems: - inputs_dict[uid] = {"path": drop.path if hasattr(drop, "path") else ""} + inputs_dict[uid] = { + "drop": drop, + "path": drop.path if hasattr(drop, "path") else "", + } outputs_dict = collections.OrderedDict() for uid, drop in oitems: outputs_dict[uid] = { - "path": drop.path if hasattr(drop, "path") else "" + "drop": drop, + "path": drop.path if hasattr(drop, "path") else "", } # logger.debug("appArgs: %s", appArgs) # get positional args posargs = [arg for arg in appArgs if appArgs[arg]["positional"]] # get kwargs keyargs = { - arg: appArgs[arg]["value"] - for arg in appArgs - if not appArgs[arg]["positional"] + arg: appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"] } # we will need an ordered dict for all positional arguments # thus we create it here and fill it with values - portPosargsDict = collections.OrderedDict( - zip(posargs, [None] * len(posargs)) - ) + portPosargsDict = collections.OrderedDict(zip(posargs, [None] * len(posargs))) logger.debug( "posargs: %s; keyargs: %s, %s", posargs, @@ -234,6 +252,7 @@ def replace_named_ports( keyargs, check_len=len(iitems), mode="inputs", + parser=parser, ) portkeyargs.update(ipkeyargs) else: @@ -268,16 +287,12 @@ def replace_named_ports( appArgs = clean_applicationArgs(appArgs) # get 
cleaned positional args posargs = { - arg: appArgs[arg]["value"] - for arg in appArgs - if appArgs[arg]["positional"] + arg: appArgs[arg]["value"] for arg in appArgs if appArgs[arg]["positional"] } logger.debug("posargs: %s", posargs) # get cleaned kwargs keyargs = { - arg: appArgs[arg]["value"] - for arg in appArgs - if not appArgs[arg]["positional"] + arg: appArgs[arg]["value"] for arg in appArgs if not appArgs[arg]["positional"] } for k, v in portkeyargs.items(): if v not in [None, ""]: @@ -298,7 +313,34 @@ def replace_named_ports( ) pargs = list(posargs.values()) pargs = [""] if len(pargs) == 0 or None in pargs else pargs - logger.debug( - "After port replacement: pargs: %s; keyargs: %s", pargs, keyargs - ) + logger.debug("After port replacement: pargs: %s; keyargs: %s", pargs, keyargs) return keyargs, pargs + + +def get_port_reader_function(input_parser: DropParser): + """ + Return the function used to read input from a named port + """ + # Inputs are un-pickled and treated as the arguments of the function + # Their order must be preserved, so we use an OrderedDict + if input_parser is DropParser.PICKLE: + # all_contents = lambda x: pickle.loads(droputils.allDropContents(x)) + reader = drop_loaders.load_pickle + elif input_parser is DropParser.EVAL: + + def optionalEval(x): + # Null and Empty Drops will return an empty byte string + # which should propogate back to None + content: str = droputils.allDropContents(x).decode("utf-8") + return ast.literal_eval(content) if len(content) > 0 else None + + reader = optionalEval + elif input_parser is DropParser.NPY: + reader = drop_loaders.load_npy + elif input_parser is DropParser.PATH: + reader = lambda x: x.path + elif input_parser is DropParser.DATAURL: + reader = lambda x: x.dataurl + else: + raise ValueError(input_parser.__repr__()) + return reader diff --git a/daliuge-engine/dlg/passwd.template b/daliuge-engine/dlg/passwd.template index c9043069a..98f539b7a 100644 --- a/daliuge-engine/dlg/passwd.template +++ 
b/daliuge-engine/dlg/passwd.template @@ -18,3 +18,4 @@ gnats:x:41:41:Gnats Bug-Reporting System (admin):/var/lib/gnats:/usr/sbin/nologi nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin _apt:x:100:65534::/nonexistent:/usr/sbin/nologin messagebus:x:101:102::/nonexistent:/usr/sbin/nologin +avahi:x:102:103:Avahi mDNS daemon,,,:/var/run/avahi-daemon:/usr/sbin/nologin diff --git a/daliuge-engine/docker/Dockerfile.dev b/daliuge-engine/docker/Dockerfile.dev index 4a3cb7743..dd7e8eef3 100644 --- a/daliuge-engine/docker/Dockerfile.dev +++ b/daliuge-engine/docker/Dockerfile.dev @@ -5,6 +5,7 @@ FROM icrar/daliuge-common:${VCS_TAG:-latest} # RUN sudo apt-get update && sudo DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends tzdata \ # gcc g++ gdb casacore-dev clang-tidy-10 clang-tidy libboost1.71-all-dev libgsl-dev +RUN service dbus start && service avahi-daemon start && avahi-set-host-name dlg-engine RUN useradd --create-home awicenec COPY / /daliuge RUN . /dlg/bin/activate && pip install --upgrade pip && pip install wheel && cd /daliuge && \ diff --git a/daliuge-engine/run_engine.sh b/daliuge-engine/run_engine.sh index 6c5cdc4bc..78300eb4c 100755 --- a/daliuge-engine/run_engine.sh +++ b/daliuge-engine/run_engine.sh @@ -4,6 +4,7 @@ DOCKER_OPTS="\ --rm \ $([[ $(nvidia-docker version) ]] && echo '--gpus=all' || echo '') \ --name daliuge-engine \ +-h dlg-engine \ -v /var/run/docker.sock:/var/run/docker.sock \ -p 5555:5555 -p 6666:6666 \ -p 8000:8000 -p 8001:8001 \ @@ -43,8 +44,10 @@ case "$1" in echo "Running Engine deployment version in background..." 
echo "docker run -td "${DOCKER_OPTS}" icrar/daliuge-engine:${VCS_TAG}" docker run -td ${DOCKER_OPTS} icrar/daliuge-engine:${VCS_TAG} - echo "Engine IP address: "`docker exec daliuge-engine sh -c "hostname --ip-address"` - exit 0 + sleep 3 + ENGINE_NAME=`docker exec daliuge-engine sh -c "hostname"` + ENGINE_IP=`docker exec daliuge-engine sh -c "hostname --ip-address"` + # exit 0 fi;; "dev") export DLG_ROOT="$HOME/dlg" @@ -53,11 +56,12 @@ case "$1" in echo "docker run -td ${DOCKER_OPTS} icrar/daliuge-engine:${C_TAG}" docker run -td ${DOCKER_OPTS} icrar/daliuge-engine:${C_TAG} sleep 3 + docker exec -u root daliuge-engine bash -c "service avahi-daemon stop && service dbus restart && service avahi-daemon start" + ENGINE_NAME=`docker exec daliuge-engine sh -c "hostname"` ENGINE_IP=`docker exec daliuge-engine sh -c "hostname --ip-address"` curl -X POST http://${ENGINE_IP}:9000/managers/island/start - echo - echo "Engine IP address: ${ENGINE_IP}" - exit 0;; + curl -X POST http://${ENGINE_IP}:8001/api/node/dlg-engine.local;; + # exit 0;; "casa") DLG_ROOT="/tmp/dlg" export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'` @@ -67,11 +71,10 @@ case "$1" in echo "docker run -td ${DOCKER_OPTS} ${CONTAINER_NM}" docker run -td ${DOCKER_OPTS} ${CONTAINER_NM} sleep 3 + ENGINE_NAME=`docker exec daliuge-engine sh -c "hostname"` ENGINE_IP=`docker exec daliuge-engine sh -c "hostname --ip-address"` - curl -X POST http://${ENGIONE_IP}:9000/managers/island/start - echo - echo "Engine IP address: ${ENGINE_IP}" - exit 0;; + curl -X POST http://${ENGINE_IP}:9000/managers/island/start;; + # exit 0;; "slim") export DLG_ROOT="$HOME/dlg" common_prep @@ -79,11 +82,10 @@ case "$1" in echo "docker run -td ${DOCKER_OPTS} icrar/daliuge-engine:${VCS_TAG}" docker run -td ${DOCKER_OPTS} icrar/daliuge-engine:${VCS_TAG} sleep 3 + ENGINE_NAME=`docker exec daliuge-engine sh -c "hostname"` ENGINE_IP=`docker exec daliuge-engine sh -c "hostname --ip-address"` - curl -X POST 
http://${ENGINE_IP}:9000/managers/island/start - echo - echo "Engine IP address: ${ENGINE_IP}" - exit 0;; + curl -X POST http://${ENGINE_IP}:9000/managers/island/start;; + # exit 0;; "local") common_prep echo "Starting managers locally in background.." @@ -92,8 +94,13 @@ case "$1" in echo echo "Use any of the following URLs to access the DIM:" python -c "from dlg.utils import get_local_ip_addr; print([f'http://{addr}:8001' for addr,name in get_local_ip_addr() if not name.startswith('docker')])" - echo "Log files can be found in ${DLG_ROOT}/log";; + echo "Log files can be found in ${DLG_ROOT}/log" + ENGINE_NAME="localhost" + ENGINE_IP=`hostname --ip-address`;; *) echo "Usage run_engine.sh " exit 0;; esac +echo +echo "Engine NAME/IP address: ${ENGINE_NAME}/${ENGINE_IP}" + diff --git a/daliuge-engine/test/apps/test_pyfunc.py b/daliuge-engine/test/apps/test_pyfunc.py index b94464ad0..866361a74 100644 --- a/daliuge-engine/test/apps/test_pyfunc.py +++ b/daliuge-engine/test/apps/test_pyfunc.py @@ -28,6 +28,7 @@ import numpy from dlg import droputils, drop_loaders +from dlg.named_port_utils import DropParser from dlg.apps import pyfunc from dlg.apps.simple_functions import string2json from dlg.ddap_protocol import DROPStates, DROPRel, DROPLinkType @@ -154,8 +155,8 @@ def test_eval_func(self, f=lambda x: x, input_data=None, output_data=None): "b", "b", f, - input_parser=pyfunc.DropParser.EVAL, - output_parser=pyfunc.DropParser.EVAL, + input_parser=DropParser.EVAL, + output_parser=DropParser.EVAL, ) c = InMemoryDROP("c", "c") @@ -203,8 +204,8 @@ def test_npy_func(self, f=lambda x: x, input_data=None, output_data=None): "b", "b", f, - input_parser=pyfunc.DropParser.NPY, - output_parser=pyfunc.DropParser.NPY, + input_parser=DropParser.NPY, + output_parser=DropParser.NPY, ) c = InMemoryDROP("c", "c") diff --git a/daliuge-translator/build_translator.sh b/daliuge-translator/build_translator.sh index bce978c8a..17e1e6f58 100755 --- a/daliuge-translator/build_translator.sh +++ 
b/daliuge-translator/build_translator.sh @@ -47,7 +47,7 @@ case "$1" in echo "" echo ">>>>> docker-slim output <<<<<<<<<" docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock dslim/docker-slim build --include-shell \ - --include-path /usr/local/lib --include-path /usr/local/bin --include-path /daliuge --include-path /dlg \ + --include-path /usr/bin/hostname --include-path /usr/local/lib --include-path /usr/local/bin --include-path /daliuge --include-path /dlg \ --http-probe=false --tag=icrar/daliuge-translator:${VCS_TAG} icrar/daliuge-translator.big:${VCS_TAG} ;; *) diff --git a/daliuge-translator/dlg/dropmake/web/translator_rest.py b/daliuge-translator/dlg/dropmake/web/translator_rest.py index 93d79dae1..748bfa773 100644 --- a/daliuge-translator/dlg/dropmake/web/translator_rest.py +++ b/daliuge-translator/dlg/dropmake/web/translator_rest.py @@ -138,17 +138,13 @@ global lg_dir global pgt_dir global pg_mgr -LG_SCHEMA = json.loads( - file_as_string("lg.graph.schema", package="dlg.dropmake") -) +LG_SCHEMA = json.loads(file_as_string("lg.graph.schema", package="dlg.dropmake")) @app.post("/jsonbody", tags=["Original"]) def jsonbody_post_lg( lg_name: str = Form(description="The name of the lg to use"), - lg_content: str = Form( - description="The content of the lg to save to file" - ), + lg_content: str = Form(description="The content of the lg to save to file"), rmode: str = Form(default=str(REPRO_DEFAULT.value)), ): """ @@ -172,9 +168,7 @@ def jsonbody_post_lg( except Exception as e: raise HTTPException( status_code=500, - detail="Failed to save logical graph {0}:{1}".format( - lg_name, str(e) - ), + detail="Failed to save logical graph {0}:{1}".format(lg_name, str(e)), ) finally: post_sem.release() @@ -303,9 +297,7 @@ def get_gantt_chart( except GraphException as ge: raise HTTPException( status_code=500, - detail="Failed to generate Gantt chart for {0}: {1}".format( - pgt_id, ge - ), + detail="Failed to generate Gantt chart for {0}: {1}".format(pgt_id, 
ge), ) @@ -345,9 +337,7 @@ def get_schedule_matrices( except Exception as e: raise HTTPException( status_code=500, - detail="Failed to get schedule matrices for {0}: {1}".format( - pgt_id, e - ), + detail="Failed to get schedule matrices for {0}: {1}".format(pgt_id, e), ) @@ -429,18 +419,14 @@ def gen_pgt( logger.info("Schedule Exception") raise HTTPException( status_code=500, - detail="Graph scheduling exception {1}: {0}".format( - str(se), lg_name - ), + detail="Graph scheduling exception {1}: {0}".format(str(se), lg_name), ) except Exception: logger.info("Partition / Other exception") trace_msg = traceback.format_exc() raise HTTPException( status_code=500, - detail="Graph partition exception {1}: {0}".format( - trace_msg, lg_name - ), + detail="Graph partition exception {1}: {0}".format(trace_msg, lg_name), ) @@ -450,9 +436,7 @@ async def gen_pgt_post( lg_name: str = Form( description="If present, translator will attempt to load this lg from file" ), - json_data: str = Form( - description="The graph data used as the graph if supplied" - ), + json_data: str = Form(description="The graph data used as the graph if supplied"), rmode: str = Form( str(REPRO_DEFAULT.value), description="Reproducibility mode setting level of provenance tracking. 
Refer to main documentation for more information", @@ -549,18 +533,14 @@ async def gen_pgt_post( logger.info("SCHEDULE EXCEPTION") raise HTTPException( status_code=500, - detail="Graph scheduling exception {1}: {0}".format( - str(se), lg_name - ), + detail="Graph scheduling exception {1}: {0}".format(str(se), lg_name), ) except Exception: logger.info("OTHER EXCEPTION") trace_msg = traceback.format_exc() raise HTTPException( status_code=500, - detail="Graph partition exception {1}: {0}".format( - trace_msg, lg_name - ), + detail="Graph partition exception {1}: {0}".format(trace_msg, lg_name), ) @@ -634,9 +614,7 @@ def gen_pg( pgtpj = pgtp._gojs_json_obj reprodata = pgtp.reprodata - num_partitions = len( - list(filter(lambda n: "isGroup" in n, pgtpj["nodeDataArray"])) - ) + num_partitions = len(list(filter(lambda n: "isGroup" in n, pgtpj["nodeDataArray"]))) if mhost is None: if tpl_nodes_len > 0: @@ -684,9 +662,7 @@ def gen_pg( except restutils.RestClientException as re: raise HTTPException( status_code=500, - detail="Failed to interact with DALiUGE Drop Manager: {0}".format( - re - ), + detail="Failed to interact with DALiUGE Drop Manager: {0}".format(re), ) except Exception as ex: logger.error(traceback.format_exc()) @@ -725,9 +701,7 @@ def gen_pg_spec( logger.error("%s", traceback.format_exc()) raise HTTPException( status_code=500, - detail="Unable to parse json body of request for pg_spec: {0}".format( - ex - ), + detail="Unable to parse json body of request for pg_spec: {0}".format(ex), ) pgtp = pg_mgr.get_pgt(pgt_id) if pgtp is None: @@ -738,9 +712,7 @@ def gen_pg_spec( ), ) if node_list is None: - raise HTTPException( - status_code=500, detail="Must specify DALiuGE nodes list" - ) + raise HTTPException(status_code=500, detail="Must specify DALiuGE nodes list") try: pg_spec = pgtp.to_pg_spec( @@ -787,9 +759,7 @@ def gen_pg_helm( pgtpj = pgtp._gojs_json_obj logger.info("PGTP: %s", pgtpj) - num_partitions = len( - list(filter(lambda n: "isGroup" in n, 
pgtpj["nodeDataArray"])) - ) + num_partitions = len(list(filter(lambda n: "isGroup" in n, pgtpj["nodeDataArray"]))) # Send pgt_data to helm_start try: start_helm(pgtp, num_partitions, pgt_dir) @@ -845,17 +815,13 @@ def load_graph(graph_content: str, graph_name: str): ) if not lg_exists(lg_dir, graph_name): if not graph_content: - raise HTTPException( - status_code=400, detail="LG content is nonexistent" - ) + raise HTTPException(status_code=400, detail="LG content is nonexistent") else: try: out_graph = json.loads(graph_content) except JSONDecodeError as jerror: logger.error(jerror) - raise HTTPException( - status_code=400, detail="LG content is malformed" - ) + raise HTTPException(status_code=400, detail="LG content is malformed") else: lgp = lg_path(lg_dir, graph_name) with open(lgp, "r") as f: @@ -884,10 +850,7 @@ def lg_fill( ), rmode: str = Form( REPRO_DEFAULT.name, - enum=[ - roption.name - for roption in [ReproducibilityFlags.NOTHING] + ALL_RMODES - ], + enum=[roption.name for roption in [ReproducibilityFlags.NOTHING] + ALL_RMODES], description="Reproducibility mode setting level of provenance tracking. Refer to main documentation for more information", ), ): @@ -901,9 +864,7 @@ def lg_fill( params = json.loads(parameters) except JSONDecodeError as jerror: logger.error(jerror) - raise HTTPException( - status_code=400, detail="Parameter string is invalid" - ) + raise HTTPException(status_code=400, detail="Parameter string is invalid") output_graph = dlg.dropmake.pg_generator.fill(lg_graph, params) output_graph = init_lg_repro_data(init_lgt_repro_data(output_graph, rmode)) return JSONResponse(output_graph) @@ -937,9 +898,7 @@ def lg_unroll( One of lg_name or lg_content, but not both, needs to be specified. 
""" lg_graph = load_graph(lg_content, lg_name) - pgt = dlg.dropmake.pg_generator.unroll( - lg_graph, oid_prefix, zero_run, default_app - ) + pgt = dlg.dropmake.pg_generator.unroll(lg_graph, oid_prefix, zero_run, default_app) pgt = init_pgt_unroll_repro_data(pgt) return JSONResponse(pgt) @@ -986,9 +945,7 @@ def pgt_partition( return JSONResponse(pgt) -@app.post( - "/unroll_and_partition", response_class=JSONResponse, tags=["Updated"] -) +@app.post("/unroll_and_partition", response_class=JSONResponse, tags=["Updated"]) def lg_unroll_and_partition( lg_name: str = Form( default=None, @@ -1030,9 +987,7 @@ def lg_unroll_and_partition( One of lg_name and lg_content, but not both, must be specified. """ lg_graph = load_graph(lg_content, lg_name) - pgt = dlg.dropmake.pg_generator.unroll( - lg_graph, oid_prefix, zero_run, default_app - ) + pgt = dlg.dropmake.pg_generator.unroll(lg_graph, oid_prefix, zero_run, default_app) pgt = init_pgt_unroll_repro_data(pgt) reprodata = pgt.pop() pgt = dlg.dropmake.pg_generator.partition( @@ -1090,9 +1045,7 @@ def pgt_map( if not pgt[-1].get("oid"): reprodata = pgt.pop() logger.info(nodes) - pg = dlg.dropmake.pg_generator.resource_map( - pgt, nodes, num_islands, co_host_dim - ) + pg = dlg.dropmake.pg_generator.resource_map(pgt, nodes, num_islands, co_host_dim) pg.append(reprodata) pg = init_pg_repro_data(pg) return JSONResponse(pg) @@ -1121,11 +1074,7 @@ def get_submission_method( if check_k8s_env(): available_methods.append(DeploymentMethods.HELM) if mhost is not None: - host_available_methods = get_mgr_deployment_methods( - mhost, mport, mprefix - ) - if DeploymentMethods.BROWSER in host_available_methods: - available_methods.append(DeploymentMethods.SERVER) + available_methods = get_mgr_deployment_methods(mhost, mport, mprefix) logger.debug("Methods available: %s", available_methods) return {"methods": available_methods} @@ -1260,9 +1209,7 @@ def handler(*_args): signal.signal(signal.SIGINT, handler) logging.debug("Starting uvicorn 
verbose %s", options.verbose) - uvicorn.run( - app=app, host=options.host, port=options.port, debug=options.verbose - ) + uvicorn.run(app=app, host=options.host, port=options.port, debug=options.verbose) if __name__ == "__main__": diff --git a/daliuge-translator/dlg/dropmake/web/translator_utils.py b/daliuge-translator/dlg/dropmake/web/translator_utils.py index 39a7f043d..f934e6b49 100644 --- a/daliuge-translator/dlg/dropmake/web/translator_utils.py +++ b/daliuge-translator/dlg/dropmake/web/translator_utils.py @@ -5,8 +5,12 @@ from dlg import common from dlg.clients import CompositeManagerClient -from dlg.common.reproducibility.reproducibility import init_lg_repro_data, init_lgt_repro_data, \ - init_pgt_unroll_repro_data, init_pgt_partition_repro_data +from dlg.common.reproducibility.reproducibility import ( + init_lg_repro_data, + init_lgt_repro_data, + init_pgt_unroll_repro_data, + init_pgt_partition_repro_data, +) from dlg.dropmake.lg import load_lg from dlg.dropmake.pg_generator import unroll, partition from dlg.restutils import RestClientException @@ -117,7 +121,17 @@ def parse_mgr_url(mgr_url): return mparse.hostname, mport, mprefix -def make_algo_param_dict(min_goal, ptype, max_load_imb, max_cpu, time_greedy, deadline, topk, swam_size, max_mem): +def make_algo_param_dict( + min_goal, + ptype, + max_load_imb, + max_cpu, + time_greedy, + deadline, + topk, + swam_size, + max_mem, +): return { "min_goal": min_goal, "ptype": ptype, @@ -127,13 +141,19 @@ def make_algo_param_dict(min_goal, ptype, max_load_imb, max_cpu, time_greedy, de "deadline": deadline, "topk": topk, "swarm_size": swam_size, - "max_mem": max_mem + "max_mem": max_mem, } -def unroll_and_partition_with_params(lgt: dict, test: bool, algorithm: str = "none", - num_partitions: int = 1, num_islands: int = 0, - par_label: str = "Partition", algorithm_parameters=None): +def unroll_and_partition_with_params( + lgt: dict, + test: bool, + algorithm: str = "none", + num_partitions: int = 1, + num_islands: 
int = 0, + par_label: str = "Partition", + algorithm_parameters=None, +): if algorithm_parameters is None: algorithm_parameters = {} app = "dlg.apps.simple.SleepApp" if test else None diff --git a/daliuge-translator/docker/Dockerfile.dev b/daliuge-translator/docker/Dockerfile.dev index 873aa6878..d6f2f8f51 100644 --- a/daliuge-translator/docker/Dockerfile.dev +++ b/daliuge-translator/docker/Dockerfile.dev @@ -12,6 +12,7 @@ LABEL build=$BUILD_ID # apt-get clean && \ # apt install -y gcc python3-venv python3-distutils +RUN service avahi-daemon stop && service dbus start && service avahi-daemon start && avahi-set-host-name dlg-engine COPY / /daliuge RUN . /dlg/bin/activate && \ cd /daliuge && \ diff --git a/daliuge-translator/run_translator.sh b/daliuge-translator/run_translator.sh index ce0450496..cdefc0317 100755 --- a/daliuge-translator/run_translator.sh +++ b/daliuge-translator/run_translator.sh @@ -3,22 +3,20 @@ case "$1" in "dep") VCS_TAG=`git describe --tags --abbrev=0|sed s/v//` echo "Running Translator deployment version in background..." - docker run --name daliuge-translator --rm -td -p 8084:8084 icrar/daliuge-translator:${VCS_TAG} - echo "Translator URL: http://"`docker exec daliuge-translator sh -c "hostname --ip-address"`:8084 - exit 0;; + docker run -h dlg-trans --name daliuge-translator --rm -td -p 8084:8084 icrar/daliuge-translator:${VCS_TAG};; "dev") export VCS_TAG=`git rev-parse --abbrev-ref HEAD| tr '[:upper:]' '[:lower:]'` echo "Running Translator development version in foreground..." 
- docker run --volume $PWD/dlg/dropmake:/dlg/lib/python3.8/site-packages/dlg/dropmake --name daliuge-translator --rm -t -p 8084:8084 icrar/daliuge-translator:${VCS_TAG} - echo "Translator URL: http://"`docker exec daliuge-translator sh -c "hostname --ip-address"`:8084 - exit 0;; + docker run -h dlg-trans --volume $PWD/dlg/dropmake:/dlg/lib/python3.8/site-packages/dlg/dropmake --name daliuge-translator --rm -t -p 8084:8084 icrar/daliuge-translator:${VCS_TAG};; "casa") export VCS_TAG=`git rev-parse --abbrev-ref HEAD| tr '[:upper:]' '[:lower:]'`-casa echo "Running Translator development version in foreground..." - docker run --volume $PWD/dlg/dropmake:/dlg/lib/python3.8/site-packages/dlg/dropmake --name daliuge-translator --rm -t -p 8084:8084 icrar/daliuge-translator:${VCS_TAG} - echo "Translator URL: http://"`docker exec daliuge-translator sh -c "hostname --ip-address"`:8084 - exit 0;; + docker run -h dlg-trans --volume $PWD/dlg/dropmake:/dlg/lib/python3.8/site-packages/dlg/dropmake --name daliuge-translator --rm -t -p 8084:8084 icrar/daliuge-translator:${VCS_TAG};; *) echo "Usage run_translator.sh " exit 0;; esac +sleep 3 +TRANS_NAME=`docker exec daliuge-translator sh -c "hostname"` +TRANS_IP=`docker exec daliuge-translator sh -c "hostname --ip-address"` +echo "Translator URL: http://${TRANS_NAME}.local:8084"