Liu 349 #226

Closed

wants to merge 29 commits

Changes shown from 20 of the 29 commits.

Commits
8138a4d
added a few more properties
awicenec Apr 17, 2023
a1a2ac7
Removed "nm", using "name" instead
awicenec Apr 17, 2023
325d9cb
Fixed environmentvars tests
awicenec Apr 17, 2023
c4b0a58
Fixed pg_gen test, removed comment lines
awicenec Apr 17, 2023
ab34e25
Changed loop_cxt to loop_ctx
awicenec Apr 17, 2023
e77e76e
Changed from "nm" to "name" on the node level
awicenec Apr 17, 2023
98507b0
replaced tw and dw with weight
awicenec Apr 17, 2023
e64bdfb
changed to drop_type and size from dt and sz
awicenec Apr 17, 2023
ad710e0
Changed from node.text to node.name everywhere
awicenec Apr 18, 2023
6768032
Fixed loop context
awicenec Apr 18, 2023
0773f4e
Fixed display of node name in DIM
awicenec Apr 18, 2023
1de826e
adjusting doxygen strings
awicenec Apr 18, 2023
8e109c7
Got node name and port names working
awicenec Apr 19, 2023
8fd50ed
Minor modifications
awicenec Apr 19, 2023
a3897e9
Adjusted tests
awicenec Apr 19, 2023
f7cba58
Fix python version
awicenec Apr 20, 2023
cce875d
More options to support key field names
awicenec Apr 20, 2023
01dc80f
name string for sankey; S3DROP; fix python version for tests
awicenec Apr 20, 2023
97399e7
name string for sankey; S3DROP; fix python version for tests
awicenec Apr 20, 2023
defb95c
Fixed issues with some example graphs
awicenec Apr 21, 2023
2acc47c
Cleanup of function args treatment
awicenec Apr 27, 2023
0b7368d
Fixed issues with named ports resolution
awicenec Apr 28, 2023
fc22834
Fixed test failures
awicenec Apr 28, 2023
6005d0a
added missing dependency.
awicenec May 1, 2023
f890125
Added netifaces to dependencies
awicenec May 1, 2023
0b05efd
Changed sequence for updating arguments
awicenec May 1, 2023
35a58a9
Calling correct prepareUsers function
awicenec May 1, 2023
3cf790c
Merge branch 'liu-348' into liu-349
awicenec May 2, 2023
412496b
Removed stray debug message
awicenec May 2, 2023
17 changes: 8 additions & 9 deletions .github/workflows/run-unit-tests.yml
@@ -3,24 +3,23 @@ name: Run unit tests
on: [push, pull_request]

jobs:

run_tests:
name: Run unit tests with python ${{matrix.python-version}} - ${{ matrix.desc }}
runs-on: ubuntu-20.04
strategy:
matrix:
include:
- python-version: '3.8'
- python-version: "3.8.10"
test_number: 0
engine: no
translator: yes
desc: "no engine"
- python-version: '3.8'
- python-version: "3.8.10"
test_number: 1
desc: "no translator"
engine: yes
translator: no
- python-version: '3.9'
- python-version: "3.9"
test_number: 2
desc: "full package"
engine: yes
@@ -81,8 +80,8 @@ jobs:
needs: run_tests
runs-on: ubuntu-20.04
steps:
- name: Coveralls Finished
uses: coverallsapp/github-action@master
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
parallel-finished: true
6 changes: 6 additions & 0 deletions daliuge-common/dlg/common/__init__.py
@@ -40,6 +40,7 @@ class DropType:
class CategoryType:
DATA = "Data"
APPLICATION = "Application"
CONSTRUCT = "Construct"
GROUP = "Group"
UNKNOWN = "Unknown"
SERVICE = "Service"
@@ -81,7 +82,12 @@ def _addSomething(self, other, key, IdText=None):
if key not in self:
self[key] = []
if other["oid"] not in self[key]:
# TODO: Returning just the other drop OID instead of the named
# port list is not a good solution. Required for the dask
# tests.
append = {other["oid"]: IdText} if IdText else other["oid"]
# if IdText is None:
# raise ValueError
self[key].append(append)

def addConsumer(self, other, IdText=None):
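For readers of the TODO above, a minimal standalone sketch (not part of the diff; OIDs and port names are made up) of the two entry shapes this change appends: a one-element {oid: port-name} dict when a named port is known, and a bare OID string otherwise.

def append_entry(entries, other_oid, id_text=None):
    # Mirrors the hunk above: store {oid: port-name} when a named port
    # is known, otherwise the bare OID string (the bare form is what
    # the dask tests currently rely on).
    entry = {other_oid: id_text} if id_text else other_oid
    entries.append(entry)
    return entries

consumers = []
append_entry(consumers, "oid_A", "input0")
append_entry(consumers, "oid_B")
print(consumers)  # [{'oid_A': 'input0'}, 'oid_B']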
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/app_base.py
@@ -452,7 +452,7 @@ def execute(self, _send_notifications=True):
return
tries += 1
logger.exception(
"Error while executing %r (try %d/%d)",
"Error while executing %r (try %s/%s)",
self,
tries,
self.n_tries,
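A small self-contained illustration of why %s is the safer conversion here, under the assumption that tries or n_tries can be a non-integer such as None: the logging module formats lazily, so a %d conversion fails at emit time and the message is reported as a logging error and dropped, while %s stringifies any value.

import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("demo")

tries, n_tries = 1, None  # hypothetical: retry limit not configured
log.info("try %s/%s", tries, n_tries)  # logs "try 1/None"
log.info("try %d/%d", tries, n_tries)  # emit-time TypeError; message lost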
11 changes: 9 additions & 2 deletions daliuge-engine/dlg/apps/pyfunc.py
@@ -501,7 +501,7 @@ def optionalEval(x):

# if we have named ports use the inputs with
# the correct UIDs
logger.debug(f"Parameters found: {self.parameters}")
# logger.debug(f"Parameters found: {self.parameters}")
posargs = self.arguments.args[: self.fn_npos]
keyargs = self.arguments.args[self.fn_npos :]
kwargs = {}
@@ -702,8 +702,15 @@ def optionalEval(x):
logger.debug(f"updating funcargs with {kwargs}")
funcargs.update(kwargs)
self._recompute_data["args"] = funcargs.copy()
logger.debug(f"Running {self.func_name} with *{pargs} **{funcargs}")

if (
self.func_name is not None
and self.func_name.split(".")[-1] in ["__init__", "__class__"]
and "self" in funcargs
):
# remove self if this is the initializer.
funcargs.pop("self")
logger.debug(f"Running {self.func_name} with *{pargs} **{funcargs}")
# we capture and log whatever is produced on STDOUT
capture = StringIO()
with redirect_stdout(capture):
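A standalone sketch of the initializer case the new hunk guards against, using a hypothetical class: when the wrapped callable is an unbound __init__ and the keyword arguments were collected by parameter name, the collected "self" entry has to be removed before the call, because instantiation supplies self implicitly.

class Scale:  # hypothetical component class
    def __init__(self, factor):
        self.factor = factor

func_name = "Scale.__init__"
funcargs = {"self": None, "factor": 3}  # kwargs collected by name

if func_name.split(".")[-1] in ("__init__", "__class__") and "self" in funcargs:
    funcargs.pop("self")  # Scale(**funcargs) provides self itself

obj = Scale(**funcargs)
assert obj.factor == 3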
180 changes: 96 additions & 84 deletions daliuge-engine/dlg/apps/simple.py

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions daliuge-engine/dlg/dask_emulation.py
@@ -111,11 +111,11 @@ def compute(value, **kwargs):
{
"categoryType": "Application",
# "categoryType": CategoryType.APPLICATION,
"Application": "dlg.dask_emulation.ResultTransmitter",
# "Application": "dlg.dask_emulation.ResultTransmitter",
"appclass": "dlg.dask_emulation.ResultTransmitter",
"oid": transmitter_oid,
"uid": transmitter_oid,
"port": port,
"nm": "result transmitter",
"name": "result transmitter",
}
)
@@ -286,7 +286,6 @@ def make_dropdict(self):
{
"categoryType": "Application",
"appclass": "dlg.dask_emulation._Listifier",
"nm": "listifier",
"name": "listifier",
}
)
@@ -322,7 +321,6 @@ def make_dropdict(self):
if self.fname is not None:
simple_fname = self.fname.split(".")[-1]
my_dropdict["func_name"] = self.fname
my_dropdict["nm"] = simple_fname
my_dropdict["name"] = simple_fname
if self.fcode is not None:
my_dropdict["func_code"] = utils.b2s(base64.b64encode(self.fcode))
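The effect of this rename on the generated drop dictionaries, as a minimal before/after sketch (field values are illustrative only):

# Before this PR the emulation layer emitted the short key:
old_dropdict = {"categoryType": "Application", "nm": "listifier"}
# After it, only the long form is used:
new_dropdict = {"categoryType": "Application", "name": "listifier"}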
1 change: 1 addition & 0 deletions daliuge-engine/dlg/data/drops/data_base.py
@@ -26,6 +26,7 @@
from typing import Union

from dlg.ddap_protocol import DROPStates

from dlg.drop import AbstractDROP, track_current_drop
from dlg.data.io import (
DataIO,
6 changes: 6 additions & 0 deletions daliuge-engine/dlg/data/drops/parset_drop.py
@@ -27,6 +27,10 @@
from dlg.data.io import MemoryIO
from dlg.meta import dlg_string_param

from logging import Logger

logger = Logger("__main__")


##
# @brief ParameterSet
@@ -40,6 +44,7 @@
# @param config_data ConfigData/""/String/ComponentParameter/readwrite//False/False/Additional configuration information to be mixed in with the initial data
# @param streaming Streaming/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component streams input and output data
# @param persist Persist/False/Boolean/ComponentParameter/readwrite//False/False/Specifies whether this data component contains data that should not be deleted after execution
# @param dataclass dataclass//dlg.data.drops.parset_drop.ParameterSetDROP//readonly//False/False/default class for this DROP
# @param Config ConfigFile//Object.File/OutputPort/readwrite//False/False/The output configuration file
# @par EAGLE_END
class ParameterSetDROP(DataDROP):
@@ -85,6 +90,7 @@ def initialize(self, **kwargs):
self.config_data = self.serialize_parameters(
self.filter_parameters(self.parameters, self.mode), self.mode
).encode("utf-8")
logger.debug(">>>> config_data: %s", self.config_data)

def getIO(self):
return MemoryIO(io.BytesIO(self.config_data))
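As an aside on the logger added at the top of this file: it instantiates logging.Logger directly, which creates a logger outside the usual hierarchy, so handlers configured on the root logger do not apply to it. The conventional module-level pattern, shown here only as a reference and not as what the PR does, is:

import logging

# Standard module logger: participates in the logging hierarchy and
# inherits handlers/levels configured on the root logger.
logger = logging.getLogger(__name__)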
1 change: 0 additions & 1 deletion daliuge-engine/dlg/data/drops/s3_drop.py
@@ -146,7 +146,6 @@ def __init__(
expectedSize=-1,
**kwargs,
):

super().__init__(**kwargs)

logger.debug(
60 changes: 42 additions & 18 deletions daliuge-engine/dlg/deploy/create_dlg_job.py
@@ -35,8 +35,13 @@
import time
import os

from dlg.deploy.configs import ConfigFactory # get all available configurations
from dlg.deploy.deployment_constants import DEFAULT_AWS_MON_PORT, DEFAULT_AWS_MON_HOST
from dlg.deploy.configs import (
ConfigFactory,
) # get all available configurations
from dlg.deploy.deployment_constants import (
DEFAULT_AWS_MON_PORT,
DEFAULT_AWS_MON_HOST,
)
from dlg.deploy.slurm_client import SlurmClient

FACILITIES = ConfigFactory.available()
@@ -50,7 +55,10 @@ def get_timestamp(line):
date_time = "{0}T{1}".format(split[0], split[1])
pattern = "%Y-%m-%dT%H:%M:%S,%f"
epoch = time.mktime(time.strptime(date_time, pattern))
return datetime.datetime.strptime(date_time, pattern).microsecond / 1e6 + epoch
return (
datetime.datetime.strptime(date_time, pattern).microsecond / 1e6
+ epoch
)


class LogEntryPair:
@@ -137,7 +145,8 @@ def build_nm_log_entry_pairs():
def construct_catchall_pattern(node_type):
pattern_strs = LogParser.kwords.get(node_type)
patterns = [
x.format(".*").replace("(", r"\(").replace(")", r"\)") for x in pattern_strs
x.format(".*").replace("(", r"\(").replace(")", r"\)")
for x in pattern_strs
]
catchall = "|".join(["(%s)" % (s,) for s in patterns])
catchall = ".*(%s).*" % (catchall,)
@@ -205,15 +214,20 @@ class LogParser:

kwords = dict()
kwords["dim"] = dim_kl
kwords["nm"] = nm_kl
kwords["name"] = nm_kl

def __init__(self, log_dir):
self._dim_log_f = None
if not self.check_log_dir(log_dir):
raise Exception("No DIM log found at: {0}".format(log_dir))
self._log_dir = log_dir
self._dim_catchall_pattern = construct_catchall_pattern(node_type="dim")
self._nm_catchall_pattern = construct_catchall_pattern(node_type="nm")
self._dim_catchall_pattern = construct_catchall_pattern(
node_type="dim"
)
# self._nm_catchall_pattern = construct_catchall_pattern(node_type="nm")
self._nm_catchall_pattern = construct_catchall_pattern(
node_type="name"
)

def parse(self, out_csv=None):
"""
@@ -271,11 +285,14 @@ def parse(self, out_csv=None):

num_dims = 0
for log_directory_file_name in os.listdir(self._log_dir):

# Check this is a dir and contains the NM log
if not os.path.isdir(os.path.join(self._log_dir, log_directory_file_name)):
if not os.path.isdir(
os.path.join(self._log_dir, log_directory_file_name)
):
continue
nm_logf = os.path.join(self._log_dir, log_directory_file_name, "dlgNM.log")
nm_logf = os.path.join(
self._log_dir, log_directory_file_name, "dlgNM.log"
)
nm_dim_logf = os.path.join(
self._log_dir, log_directory_file_name, "dlgDIM.log"
)
@@ -303,7 +320,6 @@

# Looking for the deployment times and counting for finished sessions
for lep in nm_log_pairs:

# Consider only valid durations
dur = lep.get_duration()
if dur is None:
@@ -339,7 +355,6 @@
# effect
max_exec_time = 0
for log_entry_pairs in nm_logs:

indexed_leps = {lep.name: lep for lep in log_entry_pairs}
deploy_time = indexed_leps["node_deploy_time"].get_duration()
if deploy_time is None: # since some node managers failed to start
@@ -366,7 +381,9 @@
git_commit,
]
ret = [str(x) for x in ret]
num_dims = num_dims if num_dims == 1 else num_dims - 1 # exclude master manager
num_dims = (
num_dims if num_dims == 1 else num_dims - 1
) # exclude master manager
add_line = ",".join(ret + temp_dim + temp_nm + [str(int(num_dims))])
if out_csv is not None:
with open(out_csv, "a") as out_file:
Expand All @@ -384,7 +401,9 @@ def check_log_dir(self, log_dir):
if os.path.exists(dim_log_f):
self._dim_log_f = [dim_log_f]
if dim_log_f == possible_logs[0]:
cluster_log = os.path.join(log_dir, "0", "start_dlg_cluster.log")
cluster_log = os.path.join(
log_dir, "0", "start_dlg_cluster.log"
)
if os.path.exists(cluster_log):
self._dim_log_f.append(cluster_log)
return True
@@ -585,7 +604,9 @@ def main():
if not (opts.action and opts.facility) and not opts.configs:
parser.error("Missing required parameters!")
if opts.facility not in FACILITIES:
parser.error(f"Unknown facility provided. Please choose from {FACILITIES}")
parser.error(
f"Unknown facility provided. Please choose from {FACILITIES}"
)

if opts.action == 2:
if opts.log_dir is None:
@@ -609,20 +630,23 @@ def main():
log_parser = LogParser(log_dir)
log_parser.parse(out_csv=opts.csv_output)
except Exception as exp:
print("Fail to parse {0}: {1}".format(log_dir, exp))
print(
"Fail to parse {0}: {1}".format(log_dir, exp)
)
else:
log_parser = LogParser(opts.log_dir)
log_parser.parse(out_csv=opts.csv_output)
elif opts.action == 1:

if opts.logical_graph and opts.physical_graph:
parser.error(
"Either a logical graph or physical graph filename must be specified"
)
for path_to_graph_file in (opts.logical_graph, opts.physical_graph):
if path_to_graph_file and not os.path.exists(path_to_graph_file):
parser.error(
"Cannot locate graph file at '{0}'".format(path_to_graph_file)
"Cannot locate graph file at '{0}'".format(
path_to_graph_file
)
)

client = SlurmClient(
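To make the reflowed get_timestamp from this file easy to check in isolation, here is a runnable copy with a sample log line; the line format is assumed from the pattern used in the diff.

import datetime
import time

def get_timestamp(line):
    # "2023-05-01 12:00:00,250 ..." -> float seconds since the epoch:
    # whole seconds come from time.mktime, the sub-second part from datetime.
    split = line.split()
    date_time = "{0}T{1}".format(split[0], split[1])
    pattern = "%Y-%m-%dT%H:%M:%S,%f"
    epoch = time.mktime(time.strptime(date_time, pattern))
    return (
        datetime.datetime.strptime(date_time, pattern).microsecond / 1e6
        + epoch
    )

print(get_timestamp("2023-05-01 12:00:00,250 INFO session started"))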