Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into YAN-913-async-exec
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Jun 2, 2022
2 parents 221a86b + 7867702 commit f9cfd56
Show file tree
Hide file tree
Showing 25 changed files with 879 additions and 276 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
#

# To use docker later...
sudo: required
os: linux
dist: focal

# let's go!
language: python
matrix:
jobs:
include:
# - python: "3.10"
- python: "3.8"
Expand Down Expand Up @@ -70,7 +70,7 @@ matrix:
python: "3.8"
before_install:
install:
- pip install sphinx sphinx-rtd-theme gputil merklelib
- pip install sphinx-rtd-theme sphinx gputil merklelib
script:
- READTHEDOCS=True make -C docs html SPHINXOPTS="-W --keep-going"

Expand Down
63 changes: 42 additions & 21 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from . import constants
from .restutils import RestClient


logger = logging.getLogger(__name__)
compress = os.environ.get("DALIUGE_COMPRESSED_JSON", True)

Expand All @@ -49,7 +48,7 @@ def stop(self):
self._POST("/stop")

def cancelSession(self, sessionId):
self._POST("/sessions/%s/cancel" % quote(sessionId))
self._POST(f"/sessions/{quote(sessionId)}/cancel")

def create_session(self, sessionId):
"""
Expand All @@ -68,7 +67,7 @@ def deploy_session(self, sessionId, completed_uids=[]):
content = None
if completed_uids:
content = {"completed": ",".join(completed_uids)}
self._post_form("/sessions/%s/deploy" % (quote(sessionId),), content)
self._post_form(f"/sessions/{quote(sessionId)}/deploy", content)
logger.debug(
"Successfully deployed session %s on %s:%s", sessionId, self.host, self.port
)
Expand All @@ -79,7 +78,7 @@ def append_graph(self, sessionId, graphSpec):
but checking that the graph looks correct
"""
self._post_json(
"/sessions/%s/graph/append" % (quote(sessionId),),
f"/sessions/{quote(sessionId)}/graph/append",
graphSpec,
compress=compress,
)
Expand All @@ -94,7 +93,7 @@ def destroy_session(self, sessionId):
"""
Destroys session `sessionId`
"""
self._DELETE("/sessions/%s" % (quote(sessionId),))
self._DELETE(f"/sessions/{quote(sessionId)}")
logger.debug(
"Successfully deleted session %s on %s:%s", sessionId, self.host, self.port
)
Expand All @@ -104,7 +103,7 @@ def graph_status(self, sessionId):
Returns a dictionary where the keys are DROP UIDs and the values are
their corresponding status.
"""
ret = self._get_json("/sessions/%s/graph/status" % (quote(sessionId),))
ret = self._get_json(f"/sessions/{quote(sessionId)}/graph/status")
logger.debug(
"Successfully read graph status from session %s on %s:%s",
sessionId,
Expand All @@ -118,7 +117,7 @@ def graph(self, sessionId):
Returns a dictionary where the key are the DROP UIDs, and the values are
the DROP specifications.
"""
graph = self._get_json("/sessions/%s/graph" % (quote(sessionId),))
graph = self._get_json(f"/sessions/{quote(sessionId)}/graph")
logger.debug(
"Successfully read graph (%d nodes) from session %s on %s:%s",
len(graph),
Expand All @@ -137,15 +136,15 @@ def sessions(self):
"Successfully read %d sessions from %s:%s",
len(sessions),
self.host,
self.port,
self.port
)
return sessions

def session(self, sessionId):
"""
Returns the details of sessions `sessionId`
"""
session = self._get_json("/sessions/%s" % (quote(sessionId),))
session = self._get_json(f"/sessions/{quote(sessionId)}")
logger.debug(
"Successfully read session %s from %s:%s", sessionId, self.host, self.port
)
Expand All @@ -155,7 +154,7 @@ def session_status(self, sessionId):
"""
Returns the status of session `sessionId`
"""
status = self._get_json("/sessions/%s/status" % (quote(sessionId),))
status = self._get_json(f"/sessions/{quote(sessionId)}/status")
logger.debug(
"Successfully read session %s status (%s) from %s:%s",
sessionId,
Expand All @@ -169,7 +168,7 @@ def session_repro_status(self, sessionId):
"""
Returns the reproducibility status of session `sessionId`.
"""
status = self._get_json("/sessions/%s/repro/status" % (quote(sessionId),))
status = self._get_json(f"/sessions/{quote(sessionId)}/repro/status")
logger.debug(
"Successfully read session %s reproducibility status (%s) from %s:%s",
sessionId,
Expand All @@ -183,7 +182,7 @@ def session_repro_data(self, sessionId):
"""
Returns the graph-wide reproducibility information of session `sessionId`.
"""
data = self._get_json("/sessions/%s/repro/data" % (quote(sessionId),))
data = self._get_json(f"/sessions/{quote(sessionId)}/repro/data")
logger.debug(
"Successfully read session %s reproducibility data from %s:%s",
sessionId,
Expand All @@ -196,7 +195,7 @@ def graph_size(self, sessionId):
"""
Returns the size of the graph of session `sessionId`
"""
count = self._get_json("/sessions/%s/graph/size" % (quote(sessionId)))
count = self._get_json(f"/sessions/{quote(sessionId)}/graph/size")
logger.debug(
"Successfully read session %s graph size (%d) from %s:%s",
sessionId,
Expand All @@ -223,17 +222,17 @@ class NodeManagerClient(BaseDROPManagerClient):
"""

def __init__(
self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.NODE_DEFAULT_REST_PORT, timeout=10
):
super(NodeManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def add_node_subscriptions(self, sessionId, node_subscriptions):
self._post_json(
"/sessions/%s/subscriptions" % (quote(sessionId),), node_subscriptions
f"/sessions/{quote(sessionId)}/subscriptions", node_subscriptions
)

def trigger_drops(self, sessionId, drop_uids):
self._post_json("/sessions/%s/trigger" % (quote(sessionId),), drop_uids)
self._post_json(f"/sessions/{quote(sessionId)}/trigger", drop_uids)

def shutdown_node_manager(self):
self._GET("/shutdown")
Expand All @@ -247,10 +246,10 @@ def nodes(self):
return self._get_json("/nodes")

def add_node(self, node):
self._POST("/nodes/%s" % (node,), content=None)
self._POST(f"/nodes/{node}", content=None)

def remove_node(self, node):
self._DELETE("/nodes/%s" % (node,))
self._DELETE(f"/nodes/{node}")


class DataIslandManagerClient(CompositeManagerClient):
Expand All @@ -259,7 +258,7 @@ class DataIslandManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.ISLAND_DEFAULT_REST_PORT, timeout=10
):
super(DataIslandManagerClient, self).__init__(
host=host, port=port, timeout=timeout
Expand All @@ -272,11 +271,33 @@ class MasterManagerClient(CompositeManagerClient):
"""

def __init__(
self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10
self, host="localhost", port=constants.MASTER_DEFAULT_REST_PORT, timeout=10
):
super(MasterManagerClient, self).__init__(host=host, port=port, timeout=timeout)

def create_island(self, island_host, nodes):
self._post_json(
"/managers/%s/dataisland" % (quote(island_host)), {"nodes": nodes}
f"/managers/{quote(island_host)}/dataisland", {"nodes": nodes}
)

def dims(self):
return self._get_json("/islands")

def add_dim(self, dim):
self._POST(f"/islands/{dim}", content=None)

def remove_dim(self, dim):
self._DELETE(f"/islands/{dim}")

def add_node_to_dim(self, dim, nm):
"""
Adds a nm to a dim
"""
self._POST(
f"managers/{dim}/nodes/{nm}", content=None, )

def remove_node_from_dim(self, dim, nm):
"""
Removes a nm from a dim
"""
self._DELETE(f"managers/{dim}/nodes/{nm}")
Loading

0 comments on commit f9cfd56

Please sign in to comment.