diff --git a/README.rst b/README.rst index 227014db1..9fef7cc80 100644 --- a/README.rst +++ b/README.rst @@ -1,8 +1,8 @@ Data Activated 流 Graph Engine ============================== -.. image:: https://travis-ci.org/ICRAR/daliuge.svg?branch=master - :target: https://travis-ci.org/ICRAR/daliuge +.. image:: https://travis-ci.com/ICRAR/daliuge.svg?branch=master + :target: https://travis-ci.com/github/ICRAR/daliuge .. image:: https://coveralls.io/repos/github/ICRAR/daliuge/badge.svg?branch=master :target: https://coveralls.io/github/ICRAR/daliuge?branch=master diff --git a/daliuge-common/dlg/__init__.py b/daliuge-common/dlg/__init__.py index 0cb96f675..a3d589079 100644 --- a/daliuge-common/dlg/__init__.py +++ b/daliuge-common/dlg/__init__.py @@ -22,3 +22,10 @@ # Declaring this as a namespace package __path__ = __import__("pkgutil").extend_path(__path__, __name__) # @ReservedAssignment +# set the version +try: + from dlg.common import version + __version__ = version.full_version +except: + # This can happen when running from source + __version__ = 'unknown' diff --git a/daliuge-engine/build_engine.sh b/daliuge-engine/build_engine.sh index e35957ece..010f62726 100755 --- a/daliuge-engine/build_engine.sh +++ b/daliuge-engine/build_engine.sh @@ -13,14 +13,15 @@ case "$1" in echo "Build finished!" exit 0 ;; "dev") + C_TAG="master" [[ ! -z $2 ]] && C_TAG=$2 export VERSION=`git describe --tags --abbrev=0|sed s/v//` export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'` - echo "Building daliuge-engine development version using daliuge-common:${VCS_TAG}" + echo "Building daliuge-engine development version using daliuge-common:${C_TAG}" echo "$VERSION:$VCS_TAG" > dlg/manager/web/VERSION git rev-parse --verify HEAD >> dlg/manager/web/VERSION cp ../LICENSE dlg/manager/web/. - docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile.dev . + docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile.dev . echo "Build finished!" exit 0;; "devall") diff --git a/daliuge-engine/dlg/__init__.py b/daliuge-engine/dlg/__init__.py index 0cb96f675..a3d589079 100644 --- a/daliuge-engine/dlg/__init__.py +++ b/daliuge-engine/dlg/__init__.py @@ -22,3 +22,10 @@ # Declaring this as a namespace package __path__ = __import__("pkgutil").extend_path(__path__, __name__) # @ReservedAssignment +# set the version +try: + from dlg.common import version + __version__ = version.full_version +except: + # This can happen when running from source + __version__ = 'unknown' diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index 948d36c7d..78d635e74 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -74,7 +74,7 @@ def waitForIp(self, timeout=None): ## # @brief Docker -# @details +# @details A component wrapping docker based applications. # @par EAGLE_START # @param category Docker # @param tag template @@ -93,7 +93,7 @@ def waitForIp(self, timeout=None): # @param[in] cparam/command_line_arguments Command Line Arguments//String/readwrite/False//False/ # \~English Additional command line arguments to be added to the command line to be executed # @param[in] cparam/paramValueSeparator Param value separator/ /String/readwrite/False//False/ -# \~English Separator character(s) between parameters on the command line +# \~English Separator character(s) between parameters and their respective values on the command line # @param[in] cparam/argumentPrefix Argument prefix/"--"/String/readwrite/False//False/ # \~English Prefix to each keyed argument on the command line # @param[in] cparam/execution_time Execution Time/5/Float/readonly/False//False/ diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index f60e07746..ad0f37bda 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -65,13 +65,17 @@ def serialize_func(f): fser = dill.dumps(f) fdefaults = {"args":[], "kwargs": {}} + adefaults = {"args":[], "kwargs": {}} a = inspect.getfullargspec(f) if a.defaults: fdefaults["kwargs"] = dict( zip(a.args[-len(a.defaults):], [serialize_data(d) for d in a.defaults]) ) + adefaults["kwargs"] = dict( + zip(a.args[-len(a.defaults):], [d for d in a.defaults]) + ) logger.debug(f"Introspection of function {f}: {a}") - logger.debug("Defaults for function %r: %r", f, fdefaults) + logger.debug("Defaults for function %r: %r", f, adefaults) return fser, fdefaults @@ -363,7 +367,7 @@ def run(self): # #raise ValueError # use explicit mapping of inputs to arguments first - # TODO: Required by dlg_delayed?? Else, we should reall not do this. + # TODO: Required by dlg_delayed?? Else, we should really not do this. kwargs = { name: inputs.pop(uid) for name, uid in self.func_arg_mapping.items() @@ -382,7 +386,7 @@ def run(self): kwargs = {} if ('inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict)): logger.debug(f"Using named ports to identify inputs: "+\ - f"{self.parameters['inputs']}") + f"{self.parameters['inputs']}") for i in range(min(len(inputs),self.fn_nargs +\ len(self.arguments.kwonlyargs))): # key for final dict is value in named ports dict @@ -397,25 +401,75 @@ def run(self): logger.debug(f"updating funcargs with {kwargs}") self.funcargs.update(kwargs) + # Try to get values for still missing positional arguments from Application Args + if "applicationArgs" in self.parameters: + kwargs = {} + posargs = self.arguments.args[:self.fn_npos] + for pa in posargs: + if pa not in self.funcargs: + if pa in self.parameters["applicationArgs"]: + value = self.parameters["applicationArgs"][pa]['value'] + ptype = self.parameters["applicationArgs"][pa]['type'] + if ptype in ["Complex", "Json"]: + try: + value = ast.literal_eval(value) + except: + pass + kwargs.update({ + pa: + value + }) + else: + logger.warning(f"Required positional argument '{pa}' not found!") + logger.debug(f"updating funcargs with {kwargs}") + self.funcargs.update(kwargs) + + # Try to get values for still missing kwargs arguments from parameters + kwargs = {} + kws = self.arguments.args[self.fn_npos:] + for ka in kws: + if ka not in self.funcargs: + if ka in self.parameters["applicationArgs"]: + value = self.parameters["applicationArgs"][ka]['value'] + ptype = self.parameters["applicationArgs"][ka]['type'] + if ptype in ["Complex", "Json"]: + try: + value = ast.literal_eval(value) + except: + pass + kwargs.update({ + ka: + value + }) + else: + logger.warning(f"Keyword argument '{ka}' not found!") + logger.debug(f"updating funcargs with {kwargs}") + self.funcargs.update(kwargs) + # Fill rest with default arguments if there are any more kwargs = {} for kw in self.func_defaults.keys(): + value = self.func_defaults[kw] if kw not in self.funcargs: - kwargs.update({kw: self.func_defaults[kw]}) + kwargs.update({kw: value}) logger.debug(f"updating funcargs with {kwargs}") self.funcargs.update(kwargs) logger.debug(f"Running {self.func_name} with {self.funcargs}") result = self.f(**self.funcargs) + logger.debug(f"Finished execution of {self.func_name}.") # Depending on how many outputs we have we treat our result # as an iterable or as a single object. Each result is pickled # and written to its corresponding output outputs = self.outputs - if len(outputs) == 1: - result = [result] - for r, o in zip(result, outputs): - if self.pickle: - o.write(pickle.dumps(r)) # @UndefinedVariable - else: - o.write(repr(r).encode('utf-8')) + if len(outputs) > 0: + if len(outputs) == 1: + result = [result] + for r, o in zip(result, outputs): + p = pickle.dumps(r) + if self.pickle: + logger.debug(f"Writing pickeled result {type(r)} to {o}") + o.write(pickle.dumps(r)) # @UndefinedVariable + else: + o.write(repr(r).encode('utf-8')) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index de1102e73..f46792bd6 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -387,8 +387,9 @@ def getmembers(object, predicate=None): elif isinstance(obj, dlg_dict_param): value = kwargs.get(attr_name, obj.default_value) if isinstance(value, str): - value = {} - if value: + if value == "": + value = {} + else: value = ast.literal_eval(value) if value is not None and not isinstance(value, dict): raise Exception( @@ -1618,7 +1619,17 @@ def dataURL(self) -> str: hostname = os.uname()[1] return f"shmem://{hostname}/{os.getpid()}/{id(self._buf)}" - +## +# @brief NULL +# @details A Drop not storing any data (useful for just passing on events) +# @par EAGLE_START +# @param category Memory +# @param tag template +# @param[in] cparam/data_volume Data volume/0/Float/readonly/False//False/ +# \~English This never stores any data +# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/ +# \~English Is this node the end of a group? +# @par EAGLE_END class NullDROP(DataDROP): """ A DROP that doesn't store any data. @@ -1637,7 +1648,27 @@ class EndDROP(NullDROP): A DROP that ends the session when reached """ - +## +# @brief RDBMS +# @details A Drop allowing storage and retrieval from a SQL DB. +# @par EAGLE_START +# @param category File +# @param tag template +# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/ +# \~English Estimated size of the data contained in this node +# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/ +# \~English Is this node the end of a group? +# @param[in] cparam/dbmodule Python DB module//String/readwrite/False//False/ +# \~English Load path for python DB module +# @param[in] cparam/dbtable DB table name//String/readwrite/False//False/ +# \~English The name of the table to use +# @param[in] cparam/vals Values dictionary//Json/readwrite/False//False/ +# \~English Json encoded values dictionary used for INSERT. The keys of ``vals`` are used as the column names. +# @param[in] cparam/condition Whats used after WHERE//String/readwrite/False//False/ +# \~English Condition for SELECT. For this the WHERE statement must be written using the "{X}" or "{}" placeholders +# @param[in] cparam/selectVals values for WHERE//Json/readwrite/False//False/ +# \~English Values for the WHERE statement +# @par EAGLE_END class RDBMSDrop(DataDROP): """ A Drop that stores data in a table of a relational database diff --git a/daliuge-engine/test/test_dask_emulation.py b/daliuge-engine/test/test_dask_emulation.py index fb6415df4..9f20c003d 100644 --- a/daliuge-engine/test/test_dask_emulation.py +++ b/daliuge-engine/test/test_dask_emulation.py @@ -267,4 +267,5 @@ def delayed(self, f, *args, **kwargs): return dask_delayed(f, *args, **kwargs) def compute(self, val): + logger.info(f"Running compute...") return dask_compute(val)[0] diff --git a/daliuge-translator/dlg/__init__.py b/daliuge-translator/dlg/__init__.py index 0cb96f675..a3d589079 100644 --- a/daliuge-translator/dlg/__init__.py +++ b/daliuge-translator/dlg/__init__.py @@ -22,3 +22,10 @@ # Declaring this as a namespace package __path__ = __import__("pkgutil").extend_path(__path__, __name__) # @ReservedAssignment +# set the version +try: + from dlg.common import version + __version__ = version.full_version +except: + # This can happen when running from source + __version__ = 'unknown' diff --git a/tools/xml2palette/xml2palette.py b/tools/xml2palette/xml2palette.py old mode 100644 new mode 100755 index 9edd8aefb..f923723a1 --- a/tools/xml2palette/xml2palette.py +++ b/tools/xml2palette/xml2palette.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python import csv import getopt