Skip to content

Commit

Permalink
Merge 6946e7e into b9e1148
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Apr 21, 2022
2 parents b9e1148 + 6946e7e commit beaf931
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 22 deletions.
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Data Activated 流 Graph Engine
==============================

.. image:: https://travis-ci.org/ICRAR/daliuge.svg?branch=master
:target: https://travis-ci.org/ICRAR/daliuge
.. image:: https://travis-ci.com/ICRAR/daliuge.svg?branch=master
:target: https://travis-ci.com/github/ICRAR/daliuge

.. image:: https://coveralls.io/repos/github/ICRAR/daliuge/badge.svg?branch=master
:target: https://coveralls.io/github/ICRAR/daliuge?branch=master
Expand Down
7 changes: 7 additions & 0 deletions daliuge-common/dlg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@

# Declaring this as a namespace package
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # @ReservedAssignment
# set the version
try:
from dlg.common import version
__version__ = version.full_version
except:
# This can happen when running from source
__version__ = 'unknown'
5 changes: 3 additions & 2 deletions daliuge-engine/build_engine.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ case "$1" in
echo "Build finished!"
exit 0 ;;
"dev")
C_TAG="master"
[[ ! -z $2 ]] && C_TAG=$2
export VERSION=`git describe --tags --abbrev=0|sed s/v//`
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-engine development version using daliuge-common:${VCS_TAG}"
echo "Building daliuge-engine development version using daliuge-common:${C_TAG}"
echo "$VERSION:$VCS_TAG" > dlg/manager/web/VERSION
git rev-parse --verify HEAD >> dlg/manager/web/VERSION
cp ../LICENSE dlg/manager/web/.
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile.dev .
docker build --build-arg VCS_TAG=${C_TAG} --no-cache -t icrar/daliuge-engine:${VCS_TAG} -f docker/Dockerfile.dev .
echo "Build finished!"
exit 0;;
"devall")
Expand Down
7 changes: 7 additions & 0 deletions daliuge-engine/dlg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@

# Declaring this as a namespace package
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # @ReservedAssignment
# set the version
try:
from dlg.common import version
__version__ = version.full_version
except:
# This can happen when running from source
__version__ = 'unknown'
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def waitForIp(self, timeout=None):

##
# @brief Docker
# @details
# @details A component wrapping docker based applications.
# @par EAGLE_START
# @param category Docker
# @param tag template
Expand All @@ -93,7 +93,7 @@ def waitForIp(self, timeout=None):
# @param[in] cparam/command_line_arguments Command Line Arguments//String/readwrite/False//False/
# \~English Additional command line arguments to be added to the command line to be executed
# @param[in] cparam/paramValueSeparator Param value separator/ /String/readwrite/False//False/
# \~English Separator character(s) between parameters on the command line
# \~English Separator character(s) between parameters and their respective values on the command line
# @param[in] cparam/argumentPrefix Argument prefix/"--"/String/readwrite/False//False/
# \~English Prefix to each keyed argument on the command line
# @param[in] cparam/execution_time Execution Time/5/Float/readonly/False//False/
Expand Down
76 changes: 65 additions & 11 deletions daliuge-engine/dlg/apps/pyfunc.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,17 @@ def serialize_func(f):

fser = dill.dumps(f)
fdefaults = {"args":[], "kwargs": {}}
adefaults = {"args":[], "kwargs": {}}
a = inspect.getfullargspec(f)
if a.defaults:
fdefaults["kwargs"] = dict(
zip(a.args[-len(a.defaults):], [serialize_data(d) for d in a.defaults])
)
adefaults["kwargs"] = dict(
zip(a.args[-len(a.defaults):], [d for d in a.defaults])
)
logger.debug(f"Introspection of function {f}: {a}")
logger.debug("Defaults for function %r: %r", f, fdefaults)
logger.debug("Defaults for function %r: %r", f, adefaults)
return fser, fdefaults


Expand Down Expand Up @@ -363,7 +367,7 @@ def run(self):
# #raise ValueError

# use explicit mapping of inputs to arguments first
# TODO: Required by dlg_delayed?? Else, we should reall not do this.
# TODO: Required by dlg_delayed?? Else, we should really not do this.
kwargs = {
name: inputs.pop(uid)
for name, uid in self.func_arg_mapping.items()
Expand All @@ -382,7 +386,7 @@ def run(self):
kwargs = {}
if ('inputs' in self.parameters and isinstance(self.parameters['inputs'][0], dict)):
logger.debug(f"Using named ports to identify inputs: "+\
f"{self.parameters['inputs']}")
f"{self.parameters['inputs']}")
for i in range(min(len(inputs),self.fn_nargs +\
len(self.arguments.kwonlyargs))):
# key for final dict is value in named ports dict
Expand All @@ -397,25 +401,75 @@ def run(self):
logger.debug(f"updating funcargs with {kwargs}")
self.funcargs.update(kwargs)

# Try to get values for still missing positional arguments from Application Args
if "applicationArgs" in self.parameters:
kwargs = {}
posargs = self.arguments.args[:self.fn_npos]
for pa in posargs:
if pa not in self.funcargs:
if pa in self.parameters["applicationArgs"]:
value = self.parameters["applicationArgs"][pa]['value']
ptype = self.parameters["applicationArgs"][pa]['type']
if ptype in ["Complex", "Json"]:
try:
value = ast.literal_eval(value)
except:
pass
kwargs.update({
pa:
value
})
else:
logger.warning(f"Required positional argument '{pa}' not found!")
logger.debug(f"updating funcargs with {kwargs}")
self.funcargs.update(kwargs)

# Try to get values for still missing kwargs arguments from parameters
kwargs = {}
kws = self.arguments.args[self.fn_npos:]
for ka in kws:
if ka not in self.funcargs:
if ka in self.parameters["applicationArgs"]:
value = self.parameters["applicationArgs"][ka]['value']
ptype = self.parameters["applicationArgs"][ka]['type']
if ptype in ["Complex", "Json"]:
try:
value = ast.literal_eval(value)
except:
pass
kwargs.update({
ka:
value
})
else:
logger.warning(f"Keyword argument '{ka}' not found!")
logger.debug(f"updating funcargs with {kwargs}")
self.funcargs.update(kwargs)

# Fill rest with default arguments if there are any more
kwargs = {}
for kw in self.func_defaults.keys():
value = self.func_defaults[kw]
if kw not in self.funcargs:
kwargs.update({kw: self.func_defaults[kw]})
kwargs.update({kw: value})
logger.debug(f"updating funcargs with {kwargs}")
self.funcargs.update(kwargs)

logger.debug(f"Running {self.func_name} with {self.funcargs}")
result = self.f(**self.funcargs)
logger.debug(f"Finished execution of {self.func_name}.")

# Depending on how many outputs we have we treat our result
# as an iterable or as a single object. Each result is pickled
# and written to its corresponding output
outputs = self.outputs
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
if self.pickle:
o.write(pickle.dumps(r)) # @UndefinedVariable
else:
o.write(repr(r).encode('utf-8'))
if len(outputs) > 0:
if len(outputs) == 1:
result = [result]
for r, o in zip(result, outputs):
p = pickle.dumps(r)
if self.pickle:
logger.debug(f"Writing pickeled result {type(r)} to {o}")
o.write(pickle.dumps(r)) # @UndefinedVariable
else:
o.write(repr(r).encode('utf-8'))
39 changes: 35 additions & 4 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,9 @@ def getmembers(object, predicate=None):
elif isinstance(obj, dlg_dict_param):
value = kwargs.get(attr_name, obj.default_value)
if isinstance(value, str):
value = {}
if value:
if value == "":
value = {}
else:
value = ast.literal_eval(value)
if value is not None and not isinstance(value, dict):
raise Exception(
Expand Down Expand Up @@ -1618,7 +1619,17 @@ def dataURL(self) -> str:
hostname = os.uname()[1]
return f"shmem://{hostname}/{os.getpid()}/{id(self._buf)}"


##
# @brief NULL
# @details A Drop not storing any data (useful for just passing on events)
# @par EAGLE_START
# @param category Memory
# @param tag template
# @param[in] cparam/data_volume Data volume/0/Float/readonly/False//False/
# \~English This never stores any data
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/
# \~English Is this node the end of a group?
# @par EAGLE_END
class NullDROP(DataDROP):
"""
A DROP that doesn't store any data.
Expand All @@ -1637,7 +1648,27 @@ class EndDROP(NullDROP):
A DROP that ends the session when reached
"""


##
# @brief RDBMS
# @details A Drop allowing storage and retrieval from a SQL DB.
# @par EAGLE_START
# @param category File
# @param tag template
# @param[in] cparam/data_volume Data volume/5/Float/readwrite/False//False/
# \~English Estimated size of the data contained in this node
# @param[in] cparam/group_end Group end/False/Boolean/readwrite/False//False/
# \~English Is this node the end of a group?
# @param[in] cparam/dbmodule Python DB module//String/readwrite/False//False/
# \~English Load path for python DB module
# @param[in] cparam/dbtable DB table name//String/readwrite/False//False/
# \~English The name of the table to use
# @param[in] cparam/vals Values dictionary//Json/readwrite/False//False/
# \~English Json encoded values dictionary used for INSERT. The keys of ``vals`` are used as the column names.
# @param[in] cparam/condition Whats used after WHERE//String/readwrite/False//False/
# \~English Condition for SELECT. For this the WHERE statement must be written using the "{X}" or "{}" placeholders
# @param[in] cparam/selectVals values for WHERE//Json/readwrite/False//False/
# \~English Values for the WHERE statement
# @par EAGLE_END
class RDBMSDrop(DataDROP):
"""
A Drop that stores data in a table of a relational database
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/test/test_dask_emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,4 +267,5 @@ def delayed(self, f, *args, **kwargs):
return dask_delayed(f, *args, **kwargs)

def compute(self, val):
logger.info(f"Running compute...")
return dask_compute(val)[0]
7 changes: 7 additions & 0 deletions daliuge-translator/dlg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,10 @@

# Declaring this as a namespace package
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # @ReservedAssignment
# set the version
try:
from dlg.common import version
__version__ = version.full_version
except:
# This can happen when running from source
__version__ = 'unknown'
2 changes: 1 addition & 1 deletion tools/xml2palette/xml2palette.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python

import csv
import getopt
Expand Down

0 comments on commit beaf931

Please sign in to comment.