Skip to content

Commit

Permalink
Merge branch 'master' into yan-927
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed May 12, 2022
2 parents 2bd2891 + 897d74f commit 5d63459
Show file tree
Hide file tree
Showing 218 changed files with 26,414 additions and 5,299 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

# To use docker later...
sudo: required
dist: focal

# let's go!
language: python
Expand All @@ -34,6 +35,7 @@ matrix:
env: NO_DLG_RUNTIME=1
- python: "3.8"
env: NO_DLG_TRANSLATOR=1
- python: "3.9"
# NOTE: The OpenAPI code still needs to be removed
# - python: "3.8"
# env: TEST_OPENAPI=1
Expand Down Expand Up @@ -68,7 +70,7 @@ matrix:
python: "3.8"
before_install:
install:
- pip install sphinx sphinx-rtd-theme
- pip install sphinx sphinx-rtd-theme gputil merklelib
script:
- READTHEDOCS=True make -C docs html SPHINXOPTS="-W --keep-going"

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ RUN cd /home/ray/daliuge/daliuge-common && pip install . \
&& sudo apt-get remove cmake gcc -y \
&& sudo apt-get clean

CMD ["dlg", "daemon", "-vv", "--no-nm"]
CMD ["dlg", "daemon", "-vv", "--no-nm"]
1 change: 0 additions & 1 deletion OpenAPI/tests/managers_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import node_manager_client as nmc
from composite_manager_client.api.default_api import DefaultApi


nm_config = nmc.Configuration()
nm_config.host = "127.0.0.1:8000"
dim_config = cmc.Configuration()
Expand Down
1 change: 0 additions & 1 deletion OpenAPI/tests/translator_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import translator_client as tc


translator_config = tc.Configuration()
translator_config.host = "127.0.0.1:8084"

Expand Down
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Data Activated 流 Graph Engine
==============================

.. image:: https://travis-ci.org/ICRAR/daliuge.svg?branch=master
:target: https://travis-ci.org/ICRAR/daliuge
.. image:: https://travis-ci.com/ICRAR/daliuge.svg?branch=master
:target: https://travis-ci.com/github/ICRAR/daliuge

.. image:: https://coveralls.io/repos/github/ICRAR/daliuge/badge.svg?branch=master
:target: https://coveralls.io/github/ICRAR/daliuge?branch=master
Expand Down
Empty file added __init__.py
Empty file.
2 changes: 1 addition & 1 deletion daliuge-common/build_common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ case "$1" in
"devcuda")
export VCS_TAG=`git rev-parse --abbrev-ref HEAD | tr '[:upper:]' '[:lower:]'`
echo "Building daliuge-common development version using tag ${VCS_TAG}"
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG} -f docker/Dockerfile.cuda .
docker build --build-arg VCS_TAG=${VCS_TAG} --no-cache -t icrar/daliuge-common:${VCS_TAG} -f docker/Dockerfile.devcuda .
echo "Build finished!"
exit 0;;
"casa")
Expand Down
8 changes: 8 additions & 0 deletions daliuge-common/dlg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,11 @@

# Declaring this as a namespace package
__path__ = __import__("pkgutil").extend_path(__path__, __name__) # @ReservedAssignment
# set the version
try:
from dlg.common import version

__version__ = version.full_version
except:
# This can happen when running from source
__version__ = "unknown"
27 changes: 27 additions & 0 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,33 @@ def session_status(self, sessionId):
)
return status

def session_repro_status(self, sessionId):
"""
Returns the reproducibility status of session `sessionId`.
"""
status = self._get_json("/sessions/%s/repro/status" % (quote(sessionId),))
logger.debug(
"Successfully read session %s reproducibility status (%s) from %s:%s",
sessionId,
status,
self.host,
self.port,
)
return status

def session_repro_data(self, sessionId):
"""
Returns the graph-wide reproducibility information of session `sessionId`.
"""
data = self._get_json("/sessions/%s/repro/data" % (quote(sessionId),))
logger.debug(
"Successfully read session %s reproducibility data from %s:%s",
sessionId,
self.host,
self.port,
)
return data

def graph_size(self, sessionId):
"""
Returns the size of the graph of session `sessionId`
Expand Down
74 changes: 50 additions & 24 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class Categories:
PLASMA = "Plasma"
PLASMAFLIGHT = "PlasmaFlight"
PARSET = "ParameterSet"
ENVIRONMENTVARS = "EnvironmentVars"
ENVIRONMENTVARS = "EnvironmentVariables"

MKN = "MKN"
SCATTER = "Scatter"
Expand Down Expand Up @@ -74,7 +74,7 @@ class Categories:
Categories.PLASMA,
Categories.PLASMAFLIGHT,
Categories.PARSET,
Categories.ENVIRONMENTVARS
Categories.ENVIRONMENTVARS,
}
APP_DROP_TYPES = [
Categories.COMPONENT,
Expand Down Expand Up @@ -125,29 +125,47 @@ class dropdict(dict):
DROPManager.
"""

def _addSomething(self, other, key):
def _addSomething(self, other, key, IdText=None):
if key not in self:
self[key] = []
if other["oid"] not in self[key]:
self[key].append(other["oid"])
append = {other["oid"]: IdText} if IdText else other["oid"]
self[key].append(append)

def addConsumer(self, other):
self._addSomething(other, "consumers")
def addConsumer(self, other, IdText=None):
self._addSomething(other, "consumers", IdText=IdText)

def addStreamingConsumer(self, other):
self._addSomething(other, "streamingConsumers")
def addStreamingConsumer(self, other, IdText=None):
self._addSomething(other, "streamingConsumers", IdText=IdText)

def addInput(self, other):
self._addSomething(other, "inputs")
def addInput(self, other, IdText=None):
self._addSomething(other, "inputs", IdText=IdText)

def addStreamingInput(self, other):
self._addSomething(other, "streamingInputs")
def addStreamingInput(self, other, IdText=None):
self._addSomething(other, "streamingInputs", IdText=IdText)

def addOutput(self, other):
self._addSomething(other, "outputs")
def addOutput(self, other, IdText=None):
self._addSomething(other, "outputs", IdText=IdText)

def addProducer(self, other):
self._addSomething(other, "producers")
def addProducer(self, other, IdText=None):
self._addSomething(other, "producers", IdText=IdText)


def _sanitize_links(links):
"""
Links can now be dictionaries, but we only need
the key.
"""
if isinstance(links, list):
nlinks = []
for l in links:
if isinstance(l, dict): # could be a list of dicts
nlinks.extend(list(l.keys()))
else:
nlinks.extend(l) if isinstance(l, list) else nlinks.append(l)
return nlinks
elif isinstance(links, dict):
return list(links.keys()) if isinstance(links, dict) else links


def get_roots(pg_spec):
Expand All @@ -169,14 +187,17 @@ def get_roots(pg_spec):
if dropspec.get("inputs", None) or dropspec.get("streamingInputs", None):
nonroots.add(oid)
if dropspec.get("outputs", None):
nonroots |= set(dropspec["outputs"])
do = _sanitize_links(dropspec["outputs"])
nonroots |= set(do)
elif dropspec["type"] == DropType.PLAIN:
if dropspec.get("producers", None):
nonroots.add(oid)
if dropspec.get("consumers", None):
nonroots |= set(dropspec["consumers"])
dc = _sanitize_links(dropspec["consumers"])
nonroots |= set(dc)
if dropspec.get("streamingConsumers", None):
nonroots |= set(dropspec["streamingConsumers"])
dsc = _sanitize_links(dropspec["streamingConsumers"])
nonroots |= set(dsc)

return all_oids - nonroots

Expand All @@ -200,18 +221,23 @@ def get_leaves(pg_spec):
if dropspec.get("outputs", None):
nonleaves.add(oid)
if dropspec.get("streamingInputs", None):
nonleaves |= set(dropspec["streamingInputs"])
dsi = _sanitize_links(dropspec["streamingInputs"])
nonleaves |= set(dsi)
if dropspec.get("inputs", None):
nonleaves |= set(dropspec["inputs"])
di = _sanitize_links(dropspec["inputs"])
nonleaves |= set(di)
if dropspec["type"] == DropType.SERVICE_APP:
nonleaves.add(oid) # services are never leaves
if dropspec.get("streamingInputs", None):
nonleaves |= set(dropspec["streamingInputs"])
dsi = _sanitize_links(dropspec["streamingInputs"])
nonleaves |= set(dsi)
if dropspec.get("inputs", None):
nonleaves |= set(dropspec["inputs"])
di = _sanitize_links(dropspec["inputs"])
nonleaves |= set(di)
elif dropspec["type"] == DropType.PLAIN:
if dropspec.get("producers", None):
nonleaves |= set(dropspec["producers"])
dp = _sanitize_links(dropspec["producers"])
nonleaves |= set(dp)
if dropspec.get("consumers", None) or dropspec.get(
"streamingConsumers", None
):
Expand Down
5 changes: 2 additions & 3 deletions daliuge-common/dlg/common/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import contextlib
import errno
import logging
import socket
import time
import contextlib


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -162,7 +161,7 @@ def connect_to(host, port, timeout=None):
return s


def write_to(host, port, data, timeout=None):
def write_to(host, port, data, timeout=5):
"""
Connects to ``host``:``port`` within the given timeout and write the given
piece of ``data`` into the connected socket.
Expand Down
1 change: 0 additions & 1 deletion daliuge-common/dlg/common/osutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import math
import time


logger = logging.getLogger(__name__)


Expand Down
Empty file.
83 changes: 83 additions & 0 deletions daliuge-common/dlg/common/reproducibility/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Contains several very basic apps to test python function reproducibility.
"""
import numpy as np

from dlg.apps.pyfunc import PyFuncApp


def write_in():
"""
:return: "world" always
"""
return "world"


def write_out(phrase="everybody"):
"""
Appends s to "Hello "
:param phrase: The string to be appended
:return: "Hello " + s
"""
return "Hello " + phrase


def numpy_av(nums):
"""
Finds the mean of a list of numbers using numpy.
:param nums: The numbers to be averaged.
:return: The mean.
"""
return np.asscalar(np.mean(nums))


def my_av(nums):
"""
Finds the mean of a list of numbers manually
:param nums: The numbers to be averaged
:return: The mean.
"""
res = 0.0
for num in nums:
res += num
return res / len(nums)


class HelloWorldPythonIn(PyFuncApp):
"""
Wrapper app turning writeIn into a Python function app
"""

def initialize(self, **kwargs):
fname = "dlg.common.reproducibility.apps.write_in"
super().initialize(func_name=fname)


class HelloWorldPythonOut(PyFuncApp):
"""
Wrapper app turning writeOut into a Python function app
"""

def initialize(self, **kwargs):
fname = "dlg.common.reproducibility.apps.write_out"
super().initialize(func_name=fname)


class NumpyAverage(PyFuncApp):
"""
Wrapper app turning numpy_av into a Python function app
"""

def initialize(self, **kwargs):
fname = "dlg.common.reproducibility.apps.numpy_av"
super().initialize(func_name=fname)


class MyAverage(PyFuncApp):
"""
Wrapper app turning my_av into a Python function app
"""

def initialize(self, **kwargs):
fname = "dlg.common.reproducibility.apps.my_av"
super().initialize(func_name=fname)
Loading

0 comments on commit 5d63459

Please sign in to comment.