Skip to content

Commit

Permalink
Merge branch 'master' into liu-304
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed Nov 3, 2022
2 parents 47e79db + 74b83f6 commit 14d65fb
Show file tree
Hide file tree
Showing 81 changed files with 1,385 additions and 1,757 deletions.
32 changes: 26 additions & 6 deletions OpenAPI/tests/test.graph
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
"height": 72,
"collapsed": true,
"flipPorts": false,
"streaming": false,
"precious": false,
"subject": null,
"expanded": false,
"readonly": true,
Expand Down Expand Up @@ -235,8 +233,6 @@
"height": 72,
"collapsed": true,
"flipPorts": false,
"streaming": false,
"precious": false,
"subject": null,
"expanded": false,
"readonly": true,
Expand Down Expand Up @@ -304,6 +300,19 @@
"readonly": false,
"type": "Unknown",
"precious": false
},
{
"defaultValue": true,
"description": "Specifies whether this data component contains data that should not be deleted after execution",
"keyAttribute": false,
"name": "persist",
"options": [],
"positional": false,
"precious": false,
"readonly": false,
"text": "Persist",
"type": "Boolean",
"value": false
}
],
"applicationArgs": [],
Expand Down Expand Up @@ -337,8 +346,6 @@
"height": 72,
"collapsed": true,
"flipPorts": false,
"streaming": false,
"precious": false,
"subject": null,
"expanded": false,
"readonly": true,
Expand Down Expand Up @@ -386,6 +393,19 @@
"readonly": false,
"type": "Unknown",
"precious": false
},
{
"defaultValue": true,
"description": "Specifies whether this data component contains data that should not be deleted after execution",
"keyAttribute": false,
"name": "persist",
"options": [],
"positional": false,
"precious": false,
"readonly": false,
"text": "Persist",
"type": "Boolean",
"value": false
}
],
"applicationArgs": [],
Expand Down
6 changes: 6 additions & 0 deletions daliuge-common/dlg/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ def shutdown_node_manager(self):
def get_log_file(self, sessionId):
return self._request(f"/sessions/{sessionId}/logs", "GET")

def get_submission_method(self):
return self._get_json("/submission_method")


class CompositeManagerClient(BaseDROPManagerClient):
def nodes(self):
Expand All @@ -252,6 +255,9 @@ def add_node(self, node):
def remove_node(self, node):
self._DELETE(f"/node/{node}")

def get_submission_method(self):
return self._get_json("/submission_method")


class DataIslandManagerClient(CompositeManagerClient):
"""
Expand Down
30 changes: 30 additions & 0 deletions daliuge-common/dlg/common/deployment_methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#

from enum import Enum


class DeploymentMethods(str, Enum):
SERVER = "SERVER",
BROWSER = "BROWSER",
HELM = "HELM",
OOD = "OOD"
40 changes: 40 additions & 0 deletions daliuge-common/dlg/common/k8s_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2016
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#


import re
import subprocess


def check_k8s_env():
"""
Makes sure kubectl can be called and is accessible.
"""
try:
output = subprocess.run(
["kubectl version"], capture_output=True, shell=True
).stdout
output = output.decode(encoding="utf-8").replace("\n", "")
pattern = re.compile(r"^Client Version:.*Server Version:.*")
return re.match(pattern, output)
except subprocess.SubprocessError:
return False
4 changes: 4 additions & 0 deletions daliuge-common/dlg/common/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def check_port(host, port, timeout=0, checking_open=True, return_socket=False):
)
return not checking_open

except socket.gaierror:
logger.debug("Endpoint service %s:%d not known", host, port)
return not checking_open

except socket.error as e:

# If the connection becomes suddenly closed from the server-side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,12 @@ def lgt_block_fields(rmode: ReproducibilityFlags):
"outputPorts": FieldOps.COUNT,
"inputLocalPorts": FieldOps.COUNT,
"outputLocalPorts": FieldOps.COUNT, # MKN Nodes
"streaming": FieldOps.STORE,
}
if rmode == ReproducibilityFlags.REPRODUCE:
del data["inputPorts"]
del data["outputPorts"]
del data["inputLocalPorts"]
del data["outputLocalPorts"]
del data["streaming"]
return data


Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/data/drops/dataDummy_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
# @param dataclass Application Class//String/ComponentParameter/readonly//False/False/Data class
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/data/drops/environmentvar_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ def _filter_parameters(parameters: dict):
# @par EAGLE_START
# @param category EnvironmentVariables
# @param tag daliuge
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
class EnvironmentVarDROP(AbstractDROP, KeyValueDROP):
Expand Down
13 changes: 8 additions & 5 deletions daliuge-engine/dlg/data/drops/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param check_filepath_exists Check file path exists/True/Boolean/ComponentParameter/readwrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param delete_parent_directory Delete parent directory/False/Boolean/ComponentParameter/readwrite//False/False/Also delete the parent directory of this file when deleting the file itself
# @param check_filepath_exists Check file path exists/False/Boolean/ComponentParameter/readwrite//False/False/Perform a check to make sure the file path exists before proceeding with the application
# @param filepath File Path//String/ComponentParameter/readwrite//False/False/Path to the file for this node
# @param dirname Directory name//String/ComponentParameter/readwrite//False/False/Path to the file for this node
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/True/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
Expand Down Expand Up @@ -89,11 +92,11 @@ class FileDROP(DataDROP, PathBasedDrop):
check_filepath_exists = dlg_bool_param("check_filepath_exists", False)

# Make sure files are not deleted by default and certainly not if they are
# marked as precious no matter what expireAfterUse said
# marked to be persisted no matter what expireAfterUse said
def __init__(self, *args, **kwargs):
if "precious" not in kwargs:
kwargs["precious"] = True
if kwargs["precious"] and "lifespan" not in kwargs:
if "persist" not in kwargs:
kwargs["persist"] = True
if kwargs["persist"] and "lifespan" not in kwargs:
kwargs["expireAfterUse"] = False
super().__init__(*args, **kwargs)

Expand Down
10 changes: 7 additions & 3 deletions daliuge-engine/dlg/data/drops/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
Expand All @@ -49,11 +51,11 @@ class InMemoryDROP(DataDROP):

# Allow in-memory drops to be automatically removed by default
def __init__(self, *args, **kwargs):
if "precious" not in kwargs:
kwargs["precious"] = False
if "persist" not in kwargs:
kwargs["persist"] = False
if "expireAfterUse" not in kwargs:
kwargs["expireAfterUse"] = True
if kwargs["precious"]:
if kwargs["persist"]:
kwargs["expireAfterUse"] = False
super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -101,6 +103,8 @@ def generate_reproduce_data(self):
# @param tag daliuge
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/data/drops/ngas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
# @param ngasConnectTimeout Connection timeout/2/Integer/ComponentParameter/readwrite//False/False/Timeout for connecting to the NGAS server
# @param ngasMime NGAS mime-type/"text/ascii"/String/ComponentParameter/readwrite//False/False/Mime-type to be used for archiving
# @param ngasTimeout NGAS timeout/2/Integer/ComponentParameter/readwrite//False/False/Timeout for receiving responses for NGAS
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/data/drops/parset_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param mode Parset mode/"YANDA"/String/ComponentParameter/readonly//False/False/To what standard DALiuGE should filter and serialize the parameters.
# @param config_data ConfigData/""/String/ComponentParameter/readwrite//False/False/Additional configuration information to be mixed in with the initial data
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param Config ConfigFile//Object.File/OutputPort/readwrite//False/False/The output configuration file
# @par EAGLE_END
class ParameterSetDROP(DataDROP):
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/data/drops/s3_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
# @param object_name Object Name//String/ComponentParameter/readwrite//False/False/The S3 object key
# @param profile_name Profile Name//String/ComponentParameter/readwrite//False/False/The S3 profile name
# @param endpoint_url Endpoint URL//String/ComponentParameter/readwrite//False/False/The URL exposing the S3 REST API
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
Expand Down
14 changes: 0 additions & 14 deletions daliuge-engine/dlg/deploy/deployment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,6 @@ def list_as_string(s):
return _parse_list_tokens(iter(_list_tokenizer(s)))


def check_k8s_env():
"""
Makes sure kubectl can be called and is accessible.
"""
try:
output = subprocess.run(
["kubectl version"], capture_output=True, shell=True
).stdout
pattern = re.compile(r"^Client Version:.*\nServer Version:.*")
return re.match(pattern, output.decode(encoding="utf-8"))
except subprocess.SubprocessError:
return False


def find_numislands(physical_graph_template_file):
"""
Given the physical graph data extract the graph name and the total number of
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/deploy/helm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
find_service_ips,
find_pod_ips,
wait_for_pods,
check_k8s_env,
)
from dlg.dropmake import pg_generator
from dlg.restutils import RestClient
from dlg.common.k8s_utils import check_k8s_env

logger = logging.getLogger(__name__)

Expand Down
16 changes: 9 additions & 7 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,10 @@ def __init__(self, oid, uid, **kwargs):
if "expectedSize" in kwargs and kwargs["expectedSize"]:
self._expectedSize = int(kwargs.pop("expectedSize"))

# No DROP is precious unless stated otherwise; used for replication
self._precious = self._popArg(kwargs, "precious", False)
# If DROP is precious, don't expire (delete) it.
if self._precious:
# No DROP should be persisted unless stated otherwise; used for replication
self._persist: bool = self._popArg(kwargs, "persist", False)
# If DROP should be persisted, don't expire (delete) it.
if self._persist:
self._expireAfterUse = False

# Useful to have access to all EAGLE parameters without a prior knowledge
Expand Down Expand Up @@ -798,11 +798,11 @@ def size(self, size):
self._size = size

@property
def precious(self):
def persist(self):
"""
Whether this DROP should be considered as 'precious' or not
Whether this DROP should be considered persisted after completion
"""
return self._precious
return self._persist

@property
def status(self):
Expand Down Expand Up @@ -1199,6 +1199,8 @@ def path(self) -> str:
# @param dataclass Data Class/my.awesome.data.Component/String/ComponentParameter/readonly//False/False/The python class that implements this data component
# @param data_volume Data volume/5/Float/ComponentParameter/readwrite//False/False/Estimated size of the data contained in this node
# @param group_end Group end/False/Boolean/ComponentParameter/readwrite//False/False/Is this node the end of a group?
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dummy dummy//Object/InputPort/readwrite//False/False/Dummy input port
# @param dummy dummy//Object/OutputPort/readwrite//False/False/Dummy output port
# @par EAGLE_END
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def copyDropContents(source: DataDROP, target: DataDROP, bufsize=65536):
return


def getUpstreamObjects(drop):
def getUpstreamObjects(drop: AbstractDROP):
"""
Returns a list of all direct "upstream" DROPs for the given+
DROP. An DROP A is "upstream" with respect to DROP B if
Expand All @@ -179,7 +179,7 @@ def getUpstreamObjects(drop):
In practice if A is an upstream DROP of B means that it must be moved
to the COMPLETED state before B can do so.
"""
upObjs = []
upObjs: list[AbstractDROP] = []
if isinstance(drop, AppDROP):
upObjs += drop.inputs
upObjs += drop.streamingInputs
Expand Down

0 comments on commit 14d65fb

Please sign in to comment.