Skip to content

Commit

Permalink
avahi host names and bash command line arguments.
Browse files Browse the repository at this point in the history
Enabled dlg-trans.local and dg-engine.local as
host names through avahi zeroconf.

Enabled passing of command line argument values
through data drops for bash shell components.
  • Loading branch information
awicenec committed Apr 12, 2024
1 parent b11f970 commit 2f121bc
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 243 deletions.
4 changes: 2 additions & 2 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def get_log_file(self, sessionId):
return self._request(f"/sessions/{sessionId}/logs", "GET")

def get_submission_method(self):
return self._get_json("/submission_method")
return self._get_json("submission_method")


class CompositeManagerClient(BaseDROPManagerClient):
Expand All @@ -257,7 +257,7 @@ def remove_node(self, node):
self._DELETE(f"/node/{node}")

def get_submission_method(self):
return self._get_json("/submission_method")
return self._get_json("submission_method")


class DataIslandManagerClient(CompositeManagerClient):
Expand Down
1 change: 1 addition & 0 deletions daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def _request(self, url, method, content=None, headers={}, timeout=10):

if content and hasattr(content, "read"):
headers["Transfer-Encoding"] = "chunked"
headers["Origin"] = "http://dlg-trans.local:8084"
content = chunked(content)

self._conn = http.client.HTTPConnection(self.host, self.port)
Expand Down
3 changes: 2 additions & 1 deletion daliuge-common/docker/Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ FROM ubuntu:20.04
ARG BUILD_ID
LABEL stage=builder
LABEL build=$BUILD_ID
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN apt-get update && \
apt-get install -y gcc python3 python3.8-venv python3-pip python3-distutils python3-appdirs libmetis-dev curl git sudo && \
apt-get install -y avahi-utils gcc python3 python3.8-venv python3-pip python3-distutils python3-appdirs libmetis-dev curl git sudo && \
apt-get clean

COPY / /daliuge
Expand Down
10 changes: 9 additions & 1 deletion daliuge-engine/dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@
import json

from .. import droputils, utils
from dlg.named_port_utils import replace_named_ports
from dlg.named_port_utils import (
DropParser,
get_port_reader_function,
replace_named_ports,
)
from ..ddap_protocol import AppDROPStates, DROPStates
from ..apps.app_base import BarrierAppDROP, AppDROP
from ..exceptions import InvalidDropException
Expand All @@ -51,6 +55,7 @@
dlg_batch_input,
dlg_batch_output,
dlg_streaming_input,
dlg_enum_param,
)


Expand Down Expand Up @@ -163,6 +168,7 @@ class BashShellBase(object):
# TODO: use the shlex module for most of the construction of the
# command line to get a proper and safe shell syntax
command = dlg_string_param("Bash command", None)
input_parser: DropParser = dlg_enum_param(DropParser, "input_parser", DropParser.PICKLE) # type: ignore

def initialize(self, **kwargs):
super(BashShellBase, self).initialize(**kwargs)
Expand Down Expand Up @@ -214,6 +220,7 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
outport_names = (
self.parameters["outputs"] if "outputs" in self.parameters else []
)
reader = get_port_reader_function(self.input_parser)
keyargs, pargs = replace_named_ports(
inputs.items(),
outputs.items(),
Expand All @@ -222,6 +229,7 @@ def _run_bash(self, inputs, outputs, stdin=None, stdout=subprocess.PIPE):
self.appArgs,
argumentPrefix=self._argumentPrefix,
separator=self._paramValueSeparator,
parser=reader,
)
argumentString = (
f"{' '.join(map(str,pargs + keyargs))}" # add kwargs to end of pargs
Expand Down
43 changes: 8 additions & 35 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,14 @@
from io import StringIO
from contextlib import redirect_stdout

from dlg import droputils, drop_loaders
from dlg import drop_loaders
from dlg.utils import serialize_data, deserialize_data
from dlg.named_port_utils import check_ports_dict, identify_named_ports
from dlg.named_port_utils import (
DropParser,
check_ports_dict,
get_port_reader_function,
identify_named_ports,
)
from dlg.apps.app_base import BarrierAppDROP
from dlg.exceptions import InvalidDropException
from dlg.meta import (
Expand Down Expand Up @@ -136,16 +141,6 @@ def import_using_code(code):
return dill.loads(code)


class DropParser(Enum):
RAW = "raw"
PICKLE = "pickle"
EVAL = "eval"
NPY = "npy"
# JSON = "json"
PATH = "path" # input only
DATAURL = "dataurl" # input only


##
# @brief PythonMemberFunction
# @details A placeholder APP to aid construction of new class member function applications.
Expand Down Expand Up @@ -453,29 +448,7 @@ def run(self):
"""
funcargs = {}
# Inputs are un-pickled and treated as the arguments of the function
# Their order must be preserved, so we use an OrderedDict
if self.input_parser is DropParser.PICKLE:
# all_contents = lambda x: pickle.loads(droputils.allDropContents(x))
all_contents = drop_loaders.load_pickle
elif self.input_parser is DropParser.EVAL:

def optionalEval(x):
# Null and Empty Drops will return an empty byte string
# which should propogate back to None
content: str = droputils.allDropContents(x).decode("utf-8")
return ast.literal_eval(content) if len(content) > 0 else None

all_contents = optionalEval
elif self.input_parser is DropParser.NPY:
all_contents = drop_loaders.load_npy
elif self.input_parser is DropParser.PATH:
all_contents = lambda x: x.path
elif self.input_parser is DropParser.DATAURL:
all_contents = lambda x: x.dataurl
else:
raise ValueError(self.input_parser.__repr__())

all_contents = get_port_reader_function(self.input_parser)
inputs = collections.OrderedDict()
for uid, drop in self._inputs.items():
inputs[uid] = all_contents(drop)
Expand Down
14 changes: 6 additions & 8 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
import pickle
import random
from typing import List, Optional
import urllib.error
import urllib.request
import requests
import logging
import time
import numpy as np
Expand Down Expand Up @@ -647,18 +646,17 @@ class UrlRetrieveApp(BarrierAppDROP):

def run(self):
try:
u = urllib.request.urlopen(self.url)
except urllib.error.URLError as e:
logger.info("Accessing URL %s", self.url)
u = requests.get(self.url)
except requests.exceptions.RequestException as e:
raise e.reason

content = u.read()

outs = self.outputs
if len(outs) < 1:
raise Exception("At least one output should have been added to %r" % self)
for o in outs:
o.len = len(content)
o.write(content) # send content to all outputs
o.len = len(u.content)
o.write(u.content) # send content to all outputs


##
Expand Down
13 changes: 6 additions & 7 deletions daliuge-engine/dlg/data/drops/s3_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
dlg_list_param,
)

from dlg.named_port_utils import identify_named_ports, check_ports_dict
# from dlg.named_port_utils import identify_named_ports, check_ports_dict


##
Expand Down Expand Up @@ -128,10 +128,10 @@ def getIO(self) -> DataIO:
:return:
"""
logger.debug("S3DROP producers: %s", self._producers)
if check_ports_dict(self._producers):
self.mapped_inputs = identify_named_ports(
self._producers, {}, self.keyargs, mode="inputs"
)
# if check_ports_dict(self._producers):
# self.mapped_inputs = identify_named_ports(
# self._producers, {}, self.keyargs, mode="inputs"
# )
logger.debug("Parameters found: {}", self.parameters)
return S3IO(
self.aws_access_key_id,
Expand Down Expand Up @@ -309,8 +309,7 @@ def _close(self, **kwargs):
Bucket=self._bucket, Key=self._key, UploadId=self._uploadId
)
parts = [
{"ETag": p["ETag"], "PartNumber": p["PartNumber"]}
for p in res["Parts"]
{"ETag": p["ETag"], "PartNumber": p["PartNumber"]} for p in res["Parts"]
]
# TODO: Check checksum!
res = self._s3.complete_multipart_upload(
Expand Down
3 changes: 3 additions & 0 deletions daliuge-engine/dlg/group.template
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ nogroup:x:65534:
crontab:x:101:
messagebus:x:102:
ssh:x:103:
avahi:x:103:
netdev:x:104:

78 changes: 26 additions & 52 deletions daliuge-engine/dlg/manager/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,24 @@ def fwrapper(*args, **kwargs):
bottle.response.content_type = "application/json"
# set CORS headers
origin = bottle.request.headers.raw("Origin")
if origin is None:
origin = "http://localhost:8084"
elif not re.match(
r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin
logger.debug("CORS request comming from: %s", origin)
if origin is None or re.match(
r"http://dlg-trans.local:80[0-9][0-9]", origin
):
origin = "http://dlg-trans.local:8084"
elif re.match(r"http://((localhost)|(127.0.0.1)):80[0-9][0-9]", origin):
origin = "http://localhost:8084"
bottle.response.headers["Access-Control-Allow-Origin"] = origin
bottle.response.headers[
"Access-Control-Allow-Credentials"
] = "true"
bottle.response.headers[
"Access-Control-Allow-Methods"
] = "GET, POST, PUT, OPTIONS"
bottle.response.headers[
"Access-Control-Allow-Headers"
] = "Origin, Accept, Content-Type, Content-Encoding, X-Requested-With, X-CSRF-Token"
jres = (
json.dumps(res) if res else json.dumps({"Status": "Success"})
)
logger.debug(
"Bottle sending back result: %s", jres[: min(len(jres), 80)]
)
bottle.response.headers["Access-Control-Allow-Credentials"] = "true"
bottle.response.headers["Access-Control-Allow-Methods"] = (
"GET, POST, PUT, OPTIONS"
)
bottle.response.headers["Access-Control-Allow-Headers"] = (
"Origin, Accept, Content-Type, Content-Encoding, X-Requested-With, X-CSRF-Token"
)
logger.debug("CORS headers set to allow from: %s", origin)
jres = json.dumps(res) if res else json.dumps({"Status": "Success"})
logger.debug("Bottle sending back result: %s", jres[: min(len(jres), 80)])
return json.dumps(res)
except Exception as e:
logger.exception("Error while fulfilling request")
Expand Down Expand Up @@ -166,24 +162,14 @@ def __init__(self, dm, maxreqsize=10):
app.post("/api/stop", callback=self.stop_manager)
app.post("/api/sessions", callback=self.createSession)
app.get("/api/sessions", callback=self.getSessions)
app.get(
"/api/sessions/<sessionId>", callback=self.getSessionInformation
)
app.get("/api/sessions/<sessionId>", callback=self.getSessionInformation)
app.delete("/api/sessions/<sessionId>", callback=self.destroySession)
app.get("/api/sessions/<sessionId>/logs", callback=self.getLogFile)
app.get(
"/api/sessions/<sessionId>/status", callback=self.getSessionStatus
)
app.post(
"/api/sessions/<sessionId>/deploy", callback=self.deploySession
)
app.post(
"/api/sessions/<sessionId>/cancel", callback=self.cancelSession
)
app.get("/api/sessions/<sessionId>/status", callback=self.getSessionStatus)
app.post("/api/sessions/<sessionId>/deploy", callback=self.deploySession)
app.post("/api/sessions/<sessionId>/cancel", callback=self.cancelSession)
app.get("/api/sessions/<sessionId>/graph", callback=self.getGraph)
app.get(
"/api/sessions/<sessionId>/graph/size", callback=self.getGraphSize
)
app.get("/api/sessions/<sessionId>/graph/size", callback=self.getGraphSize)
app.get(
"/api/sessions/<sessionId>/graph/status",
callback=self.getGraphStatus,
Expand All @@ -201,9 +187,7 @@ def __init__(self, dm, maxreqsize=10):
callback=self.getSessionReproStatus,
)

app.route(
"/api/sessions", method="OPTIONS", callback=self.acceptPreflight
)
app.route("/api/sessions", method="OPTIONS", callback=self.acceptPreflight)
app.route(
"/api/sessions/<sessionId>/graph/append",
method="OPTIONS",
Expand All @@ -226,9 +210,7 @@ def initializeSpecifics(self, app):

@daliuge_aware
def submit_methods(self):
return {
"methods": [DeploymentMethods.BROWSER, DeploymentMethods.SERVER]
}
return {"methods": [DeploymentMethods.BROWSER, DeploymentMethods.SERVER]}

def _stop_manager(self):
self.dm.shutdown()
Expand Down Expand Up @@ -408,9 +390,7 @@ def initializeSpecifics(self, app):
"/api/sessions/<sessionId>/subscriptions",
callback=self.add_node_subscriptions,
)
app.post(
"/api/sessions/<sessionId>/trigger", callback=self.trigger_drops
)
app.post("/api/sessions/<sessionId>/trigger", callback=self.trigger_drops)
# The non-REST mappings that serve HTML-related content
app.get("/", callback=self.visualizeDM)
app.get("/api/shutdown", callback=self.shutdown_node_manager)
Expand Down Expand Up @@ -617,9 +597,7 @@ def visualizeDIM(self):
tpl = file_as_string("web/dim.html")
urlparts = bottle.request.urlparts
selectedNode = (
bottle.request.params["node"]
if "node" in bottle.request.params
else ""
bottle.request.params["node"] if "node" in bottle.request.params else ""
)
serverUrl = urlparts.scheme + "://" + urlparts.netloc
return bottle.template(
Expand Down Expand Up @@ -698,9 +676,7 @@ def stopNM(self, host):
def addNM(self, host, node):
port = constants.ISLAND_DEFAULT_REST_PORT
logger.debug("Adding NM %s to DIM %s", node, host)
with RestClient(
host=host, port=port, timeout=10, url_prefix="/api"
) as c:
with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
return json.loads(
c._POST(
f"/node/{node}",
Expand All @@ -711,9 +687,7 @@ def addNM(self, host, node):
def removeNM(self, host, node):
port = constants.ISLAND_DEFAULT_REST_PORT
logger.debug("Removing NM %s from DIM %s", node, host)
with RestClient(
host=host, port=port, timeout=10, url_prefix="/api"
) as c:
with RestClient(host=host, port=port, timeout=10, url_prefix="/api") as c:
return json.loads(c._DELETE(f"/node/{node}").read())

@daliuge_aware
Expand Down
Loading

0 comments on commit 2f121bc

Please sign in to comment.