Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/liu-294' into liu-296
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Wicenec authored and Moritz Wicenec committed Sep 2, 2022
2 parents 1dbf7c6 + 314d2aa commit 0dcbe0f
Show file tree
Hide file tree
Showing 69 changed files with 544 additions and 483 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
- name: Install test dependencies
run: |
pip install -U coveralls pytest pytest-cov
pip install -U setuptools pip wheel dask dlg-example-cmpts
pip install -U setuptools pip wheel dask
- name: Install daliuge-common
run: pip install -e daliuge-common/
Expand Down
12 changes: 6 additions & 6 deletions OpenAPI/composite_manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ paths:
type: string
operationId: getCMNodes
description: Returns the list of nodes covered by this Composite Manager
'/api/nodes/{node}':
'/api/node/{node}':
parameters:
- schema:
type: string
Expand All @@ -58,7 +58,7 @@ paths:
'200':
description: OK
description: "Add `node` to this Composite Manager's nodes list"
'/api/nodes/{node}/sessions':
'/api/node/{node}/sessions':
parameters:
- schema:
type: string
Expand Down Expand Up @@ -103,7 +103,7 @@ paths:
description: Generic session information type
operationId: getNodeSessions
description: 'Get the list of sessions on `node`, managed by this Manager'
'/api/nodes/{node}/sessions/{sessionId}':
'/api/node/{node}/sessions/{sessionId}':
parameters:
- schema:
type: string
Expand All @@ -127,7 +127,7 @@ paths:
$ref: './manager_common.yaml#/components/schemas/session'
operationId: getNodeSessionInformation
description: Returns the main information of session `sessionId` in node `node`
'/api/nodes/{node}/sessions/{sessionId}/status':
'/api/node/{node}/sessions/{sessionId}/status':
parameters:
- schema:
type: string
Expand All @@ -151,7 +151,7 @@ paths:
type: integer
operationId: getNodeSessionStatus
description: Returns the status of `sessionId` in `node`
'/api/nodes/{node}/sessions/{sessionId}/graph':
'/api/node/{node}/sessions/{sessionId}/graph':
parameters:
- schema:
type: string
Expand All @@ -175,7 +175,7 @@ paths:
$ref: './manager_common.yaml#/components/schemas/physical_graph'
operationId: getNodeGraph
description: Returns the physical graph of session `sessionId` in `node`
'/api/nodes/{node}/sessions/{sessionId}/graph/status':
'/api/node/{node}/sessions/{sessionId}/graph/status':
parameters:
- schema:
type: string
Expand Down
Empty file removed __init__.py
Empty file.
14 changes: 7 additions & 7 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ def nodes(self):
return self._get_json("/nodes")

def add_node(self, node):
self._POST(f"/nodes/{node}", content=None)
self._POST(f"/node/{node}", content=None)

def remove_node(self, node):
self._DELETE(f"/nodes/{node}")
self._DELETE(f"/node/{node}")


class DataIslandManagerClient(CompositeManagerClient):
Expand Down Expand Up @@ -277,27 +277,27 @@ def __init__(

def create_island(self, island_host, nodes):
self._post_json(
f"/managers/{quote(island_host)}/dataisland", {"nodes": nodes}
f"/managers/{quote(island_host)}/island", {"nodes": nodes}
)

def dims(self):
return self._get_json("/islands")

def add_dim(self, dim):
self._POST(f"/islands/{dim}", content=None)
self._POST(f"/island/{dim}", content=None)

def remove_dim(self, dim):
self._DELETE(f"/islands/{dim}")
self._DELETE(f"/island/{dim}")

def add_node_to_dim(self, dim, nm):
"""
Adds a nm to a dim
"""
self._POST(
f"managers/{dim}/nodes/{nm}", content=None, )
f"/managers/{dim}/node/{nm}", content=None, )

def remove_node_from_dim(self, dim, nm):
"""
Removes a nm from a dim
"""
self._DELETE(f"managers/{dim}/nodes/{nm}")
self._DELETE(f"/managers/{dim}/node/{nm}")
21 changes: 17 additions & 4 deletions daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,25 @@ class Categories:


class DropType:
PLAIN = "plain"
# this gives the mapping to fields containing class paths
DATA = "data" # TODO: need to drop this one
DATACLASS = "dataclass"
APPCLASS = "appclass"
SOCKET = "socket"
APP = "app" # Application drop that terminates onces executed
APP = "app" # TODO: need to drop this one
SERVICE_APP = "serviceapp" # App drop that runs continously
CONTAINER = "container" # Drop that contains other drops

class CategoryType:
DATA = "Data"
APPLICATION = "Application"
GROUP = "Group"
UNKNOWN = "Unknown"
SERVICE = "Service"
CONTAINER = "Container"
SOCKET = "Socket"
CONTROL = "Control"
OTHER = "Other"

def b2s(b, enc="utf8"):
"Converts bytes into a string"
Expand Down Expand Up @@ -200,7 +213,7 @@ def get_roots(pg_spec):
if dropspec.get("outputs", None):
do = _sanitize_links(dropspec["outputs"])
nonroots |= set(do)
elif dropspec["type"] == DropType.PLAIN:
elif dropspec["type"] == DropType.DATA:
if dropspec.get("producers", None):
nonroots.add(oid)
if dropspec.get("consumers", None):
Expand Down Expand Up @@ -245,7 +258,7 @@ def get_leaves(pg_spec):
if dropspec.get("inputs", None):
di = _sanitize_links(dropspec["inputs"])
nonleaves |= set(di)
elif dropspec["type"] == DropType.PLAIN:
elif dropspec["type"] == DropType.DATA:
if dropspec.get("producers", None):
dp = _sanitize_links(dropspec["producers"])
nonleaves |= set(dp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ def pgt_unroll_block_fields(category_type, rmode: ReproducibilityFlags):
if rmode != ReproducibilityFlags.NOTHING:
data["type"] = FieldOps.STORE
if rmode != ReproducibilityFlags.REPRODUCE:
if category_type != "plain":
if category_type != "data":
data["dt"] = FieldOps.STORE
if category_type == "plain":
if category_type == "data":
data["storage"] = FieldOps.STORE
if rmode in (ReproducibilityFlags.RECOMPUTE, ReproducibilityFlags.REPLICATE_COMP):
data["rank"] = FieldOps.STORE
Expand Down
27 changes: 24 additions & 3 deletions daliuge-engine/dlg/dask_emulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import struct
import time

from dlg.common import CategoryType, DropType

from . import utils, droputils
from .apps import pyfunc
from .common import dropdict, Categories
Expand Down Expand Up @@ -107,10 +109,13 @@ def compute(value, **kwargs):
transmitter = dropdict(
{
"type": "app",
# "categoryType": CategoryType.APPLICATION,
"app": "dlg.dask_emulation.ResultTransmitter",
"appclass": "dlg.dask_emulation.ResultTransmitter",
"oid": transmitter_oid,
"port": port,
"nm": "result transmitter",
"text": "result transmitter",
}
)
for leaf_oid in droputils.get_leaves(graph.values()):
Expand Down Expand Up @@ -265,7 +270,13 @@ def __getitem__(self, i):

def make_dropdict(self):
return dropdict(
{"type": "app", "app": "dlg.dask_emulation._Listifier", "nm": "listifier"}
{
"type": "app",
"categoryType": CategoryType.APPLICATION,
"app": "dlg.dask_emulation._Listifier",
"nm": "listifier",
"text": "listifier",
}
)

def __repr__(self):
Expand All @@ -291,12 +302,19 @@ def make_dropdict(self):
self.kwarg_names = list(self.original_kwarg_names)
self.kwarg_names.reverse()
my_dropdict = dropdict(
{"type": "app", "app": "dlg.apps.pyfunc.PyFuncApp", "func_arg_mapping": {}}
{
"type": "app",
"categoryType": CategoryType.APPLICATION,
"app": "dlg.apps.pyfunc.PyFuncApp",
"appclass": "dlg.apps.pyfunc.PyFuncApp",
"func_arg_mapping": {}
}
)
if self.fname is not None:
simple_fname = self.fname.split(".")[-1]
my_dropdict["func_name"] = self.fname
my_dropdict["nm"] = simple_fname
my_dropdict["text"] = simple_fname
if self.fcode is not None:
my_dropdict["func_code"] = utils.b2s(base64.b64encode(self.fcode))
if self.fdefaults:
Expand Down Expand Up @@ -366,7 +384,10 @@ def __init__(self, producer=None, pydata=_no_data):
logger.debug("Created %r", self)

def make_dropdict(self):
my_dropdict = dropdict({"type": "plain", "storage": Categories.MEMORY})
my_dropdict = dropdict(
{"type": "data",
"categoryType": CategoryType.DATA,
"storage": Categories.MEMORY})
if not self.producer:
my_dropdict["pydata"] = pyfunc.serialize_data(self.pydata)
return my_dropdict
Expand Down
8 changes: 8 additions & 0 deletions daliuge-engine/dlg/data/drops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
"RDBMSDrop",
"PlasmaDROP",
"PlasmaFlightDROP",
"ParameterSetDROP",
"EnvironmentVarDROP",
"S3DROP",
"DataDummyDROP"
]

from dlg.data.drops.directorycontainer import DirectoryContainer
Expand All @@ -42,3 +46,7 @@
from dlg.data.drops.ngas import NgasDROP
from dlg.data.drops.plasma import PlasmaDROP, PlasmaFlightDROP
from dlg.data.drops.rdbms import RDBMSDrop
from dlg.data.drops.parset_drop import ParameterSetDROP
from dlg.data.drops.environmentvar_drop import EnvironmentVarDROP
from dlg.data.drops.s3_drop import S3DROP
from dlg.data.drops.dataDummy_drop import DataDummyDROP
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@
"""
Drops that interact with AWS S3
"""
import boto3
import botocore
from asyncio.log import logger

from .drop import AbstractDROP
from dlg.data.io import ErrorIO
from .meta import dlg_string_param, dlg_list_param
try:
import boto3
import botocore
except ImportError:
logger.warning("BOTO bindings are not available")

from ...drop import AbstractDROP
from dlg.data.io import ErrorIO
from ...meta import dlg_string_param, dlg_list_param

##
# @brief S3
Expand Down
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/deploy/utils/monitor_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def parse_status(self, gexf_file, out_dir=None, remove_gexf=False):
def get_downstream_drop_ids(self, dropspec):
if dropspec["type"] == "app":
ds_kw = "outputs" # down stream key word
elif dropspec["type"] == "plain":
elif dropspec["type"] == "data":
ds_kw = "consumers"
else:
ds_kw = "None"
Expand Down Expand Up @@ -373,7 +373,7 @@ def build_drop_fullgraphs(self, do_subgraph=False, graph_lib="pygraphviz"):
G.add_node(
gid, shape="rect", label=""
) # , fixedsize=True, hight=.05, width=.05)
elif dropspec["type"] == "plain": # parallelogram
elif dropspec["type"] == "data": # parallelogram
G.add_node(
gid, shape="circle", label=""
) # , fixedsize=True, hight=.05, width=.05)
Expand All @@ -383,7 +383,7 @@ def build_drop_fullgraphs(self, do_subgraph=False, graph_lib="pygraphviz"):
gid = oid_gnid_dict[dropspec["oid"]]
if dropspec["type"] == "app":
ds_kw = "outputs" # down stream key word
elif dropspec["type"] == "plain":
elif dropspec["type"] == "data":
ds_kw = "consumers"
else:
ds_kw = "None"
Expand Down
Loading

0 comments on commit 0dcbe0f

Please sign in to comment.