Skip to content

Commit

Permalink
Merge b4340ab into db91b75
Browse files Browse the repository at this point in the history
  • Loading branch information
myxie committed Jun 13, 2024
2 parents db91b75 + b4340ab commit 9a9468b
Show file tree
Hide file tree
Showing 33 changed files with 86 additions and 124 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ jobs:
- name: Run pylint
run: |
pylint daliuge-common daliuge-translator daliuge-engine
pylint daliuge-common daliuge-translator daliuge-engine --fail-under=9 --fail-on=E
3 changes: 2 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
[Main]
disable=all
disable=C, R, W, no-name-in-module, c-extension-no-member, no-member, import-error, unsupported-membership-test
enable=logging-not-lazy,logging-format-interpolation

2 changes: 1 addition & 1 deletion daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self, init_dict=None):
self.update(init_dict)
if "oid" not in self:
self.update({"oid": None})
return super().__init_subclass__()
super().__init_subclass__()

def _addSomething(self, other, key, name=None):
if key not in self:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def accumulate_pgt_unroll_drop_data(drop: dict):
}
if drop["reprodata"].get("rmode") is None:
level = REPRO_DEFAULT
drop["reprodata"]["rmode"] = str(level.level)
drop["reprodata"]["rmode"] = str(level)
drop["reprodata"][level.name] = {}
else:
level = rflag_caster(drop["reprodata"]["rmode"])
Expand Down
2 changes: 1 addition & 1 deletion daliuge-common/dlg/restutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def __exit__(self, typ, value, traceback):

def _get_json(self, url):
ret = self._GET(url)
return json.load(ret) if ret else None
return json.load(ret) if ret else {}

def _post_form(self, url, content=None):
if content is not None:
Expand Down
14 changes: 7 additions & 7 deletions daliuge-engine/dlg/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ def _generateNamedPorts(self, ports):
named_ports: OrderedDict[str, DataDROP] = OrderedDict()
port_dict = self.__getattribute__(f"_{ports}")
if (
ports in self.parameters
and len(self.parameters[ports]) > 0
and isinstance(self.parameters[ports][0], dict)
ports in self.parameters
and len(self.parameters[ports]) > 0
and isinstance(self.parameters[ports][0], dict)
):
for i in range(len(port_dict)):
key = list(self.parameters[ports][i].values())[0]
Expand All @@ -217,9 +217,9 @@ def _generateNamedPorts(self, ports):
else:
named_ports[key] = [named_ports[key], value]
elif (
ports in self.parameters
and len(self.parameters[ports]) > 0
and isinstance(self.parameters[ports], list)
ports in self.parameters
and len(self.parameters[ports]) > 0
and isinstance(self.parameters[ports], list)
):
# This enables the gather to work
return {}
Expand Down Expand Up @@ -399,7 +399,7 @@ def dropCompleted(self, uid, drop_state):
# More effective inputs than inputs, this is a horror
if n_eff_inputs > n_inputs:
raise Exception(
"%r: More effective inputs (%d) than inputs (%d)"
"%r: More effective inputs (%s) than inputs (%d)"
% (self, self.n_effective_inputs, n_inputs)
)

Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/data/drops/ngas.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def setCompleted(self):

@property
def dataURL(self) -> str:
return "ngas://%s:%d/%s" % (self.ngasSrv, self.ngasPort, self.fileId)
return "ngas://%s:%s/%s" % (self.ngasSrv, self.ngasPort, self.fileId)

# Override
def generate_reproduce_data(self):
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/data/drops/s3_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def getIO(self) -> DataIO:
# self.mapped_inputs = identify_named_ports(
# self._producers, {}, self.keyargs, mode="inputs"
# )
logger.debug("Parameters found: {}", self.parameters)
logger.debug("Parameters found: %s", self.parameters)
return S3IO(
self.aws_access_key_id,
self.aws_secret_access_key,
Expand Down Expand Up @@ -369,7 +369,7 @@ def _exists(self) -> Tuple[bool, bool]:
logger.info("Object: %s does not exist", self._key)
return True, False
else:
raise ErrorIO()
raise RuntimeError("Error occured in Client: %s", e.response)

@overrides
def exists(self) -> bool:
Expand Down
6 changes: 4 additions & 2 deletions daliuge-engine/dlg/data/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def buffer(self) -> memoryview:
return self._buf.getbuffer()


# pylint: disable=possibly-used-before-assignment
class SharedMemoryIO(DataIO):
"""
A DataIO class that writes to a shared memory buffer
Expand All @@ -321,10 +322,10 @@ def _write(self, data, **kwargs) -> int:
total_size = len(data) + self._written
if total_size > self._buf.size:
self._buf.resize(total_size)
self._buf.buf[self._written : total_size] = data
self._buf.buf[self._written: total_size] = data
self._written = total_size
else:
self._buf.buf[self._written : total_size] = data
self._buf.buf[self._written: total_size] = data
self._written = total_size
self._buf.resize(total_size)
# It may be inefficient to resize many times, but assuming data is written 'once' this is
Expand Down Expand Up @@ -361,6 +362,7 @@ def exists(self) -> bool:
def delete(self):
self._close()

# pylint: enable=possibly-used-before-assignment

class FileIO(DataIO):
"""
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/dlg/deploy/configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# MA 02111-1307 USA
#
import os, string
from abc import abstractmethod

__sub_tpl_str = """#!/bin/bash --login
Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(self):
self.setpar("modules", l[2].strip())
self.setpar("venv", l[3].strip())

@abstractmethod
def init_list(self):
pass

Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/deploy/create_dlg_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ def main():
all_nics=opts.all_nics,
check_with_session=opts.check_with_session,
logical_graph=opts.logical_graph,
physical_graph=opts.physical_graph,
physical_graph_template_data=opts.physical_graph,
submit=opts.submit in ["True", "true"],
)
client._visualise_graph = opts.visualise_graph
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/dlg/deploy/deployment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def finish_element(sub_values, range_start):
finish_element(sub_values, range_start)
return values
if token == ListTokens.MULTICASE_START:
prefix = ""
if values:
prefix = values.pop()
sub_values = _parse_list_tokens(token_iter)
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/deploy/helm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,8 @@ def submit_and_monitor_pgt(self):
"""
Combines submission and monitoring steps of a pgt.
"""
session_id = self.submit_pgt()
monitoring_thread = self._monitor(session_id)
self.submit_pgt()
monitoring_thread = self._monitor()
monitoring_thread.join()

def submit_pg(self):
Expand All @@ -408,6 +408,6 @@ def submit_and_monitor_pg(self):
"""
Combines submission and monitoring steps of a pg.
"""
session_id = self.submit_pg()
monitoring_thread = self._monitor(session_id)
self.submit_pg()
monitoring_thread = self._monitor()
monitoring_thread.join()
8 changes: 3 additions & 5 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ def get_param_value(attr_name, default_value):
has_app_param = (
"applicationArgs" in kwargs and attr_name in kwargs["applicationArgs"]
)

param = default_value
if has_component_param and has_app_param:
logger.warning(
f"Drop has both component and app param {attr_name}. Using component param."
Expand All @@ -428,8 +428,6 @@ def get_param_value(attr_name, default_value):
pass
else:
param = kwargs["applicationArgs"].get(attr_name).value
else:
param = default_value
return param

# Take a class dlg defined parameter class attribute and create an instanced attribute on object
Expand Down Expand Up @@ -496,8 +494,8 @@ def __hash__(self):
def __repr__(self):
return "<%s oid=%s, uid=%s>" % (
self.__class__.__name__,
self.oid,
self.uid,
"self.oid",
"self.uid",
)

def initialize(self, **kwargs):
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from dlg import common
from dlg.apps.app_base import AppDROP

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
from dlg.drop import AbstractDROP
Expand Down Expand Up @@ -190,7 +190,7 @@ def getUpstreamObjects(drop: "AbstractDROP"):
In practice if A is an upstream DROP of B means that it must be moved
to the COMPLETED state before B can do so.
"""
upObjs: list[AbstractDROP] = []
upObjs: List[AbstractDROP] = []
if isinstance(drop, AppDROP):
upObjs += drop.inputs
upObjs += drop.streamingInputs
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from collections import defaultdict
import logging
from abc import ABC, abstractmethod
from typing import Optional, Union
from typing import Optional, Union, List, DefaultDict

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,8 +69,8 @@ class EventFirer(object):

def __init__(self):
# Union string key with object to handle __ALL_EVENTS above
self._listeners: defaultdict[
Union[str, object], list[EventHandler]
self._listeners: DefaultDict[
Union[str, object], List[EventHandler]
] = defaultdict(list)

def subscribe(
Expand Down Expand Up @@ -116,7 +116,7 @@ def _fireEvent(self, eventType: str, **attrs):
"""

# Which listeners should we call?
listeners: list[EventHandler] = []
listeners: List[EventHandler] = []
if eventType in self._listeners:
listeners += self._listeners[eventType]
if EventFirer.__ALL_EVENTS in self._listeners:
Expand Down
11 changes: 5 additions & 6 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import importlib
import logging

from typing import List
from dlg.common.reproducibility.constants import ReproducibilityFlags

from . import droputils
Expand Down Expand Up @@ -308,7 +309,7 @@ def createGraphFromDropSpecList(dropSpecList, session=None):

# We're done! Return the roots of the graph to the caller
logger.info("Calculating graph roots")
roots: list[AbstractDROP] = []
roots: List[AbstractDROP] = []
for drop in drops.values():
if not droputils.getUpstreamObjects(drop):
roots.append(drop)
Expand Down Expand Up @@ -398,11 +399,9 @@ def _createSocket(dropSpec, dryRun=False, session_id=None):
def _createApp(dropSpec, dryRun=False, session_id=None):
oid, uid = _getIds(dropSpec)
kwargs = _getKwargs(dropSpec)

if "dropclass" in dropSpec:
appName = dropSpec["dropclass"]
elif "Application" in dropSpec:
appName = dropSpec["Application"]
appName = dropSpec.get("dropclass", "")
if not appName:
dropSpec.get("Application", "")
parts = appName.split(".")

# Support old "dfms..." package names (pre-Oct2017)
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/dlg/lifecycle/dlm.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import threading
import time

from typing import Dict
from . import registry
from .hsm import manager
from .hsm.store import AbstractStore
Expand Down Expand Up @@ -226,7 +227,7 @@ def __init__(
# instead of _drops.itervalues() to get a full, thread-safe copy of the
# dictionary values. Maybe there's a better approach for thread-safety
# here
self._drops: dict[str, AbstractDROP] = {}
self._drops: Dict[str, AbstractDROP] = {}

self._check_period = check_period
self._cleanup_period = cleanup_period
Expand Down
4 changes: 3 additions & 1 deletion daliuge-engine/dlg/manager/node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import collections
import copy
import logging
from typing import Optional

from psutil import cpu_count
import os
import queue
Expand Down Expand Up @@ -124,7 +126,7 @@ def close(self):
class NodeManagerThreadDropRunner(NodeManagerDropRunner):
def __init__(self, max_workers: int):
self._max_workers = max_workers
self._thread_pool: ThreadPoolExecutor | None = None
self._thread_pool: Optional[ThreadPoolExecutor] = None

def start(self, _rpc_endpoint):
logger.info("Initializing thread pool with %d workers", self._max_workers)
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/named_port_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def serialize_applicationArgs(applicationArgs, prefix="--", separator=" "):
the final command line.
"""
applicationArgs = clean_applicationArgs(
applicationArgs, prefix=prefix, separator=separator
applicationArgs
)
pargs = []
kwargs = {}
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/apps/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def test_multi_listappendthrashing(self, size=1000, parallel=True):
max_threads = cpu_count(logical=False)
drop_ids = [chr(97 + x) for x in range(max_threads)]
threadpool = ThreadPool(processes=max_threads)
memory_manager = DlgSharedMemoryManager()
memory_manager = DlgSharedMemoryManager() # pylint: disable=possibly-used-before-assignment
session_id = 1
memory_manager.register_session(session_id)
S = InMemoryDROP("S", "S")
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/test/deploy/test_helm_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
"""
Module tests the helm chart translation and deployment functionality.
"""
# pylint: disable=possibly-used-before-assignment

import json
import os
import sys
Expand Down
2 changes: 2 additions & 0 deletions daliuge-engine/test/integrate/example_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
To run it standalone, change the directories, which are now hardcoded
"""

# pylint: skip-file

import datetime
import os
import time
Expand Down
4 changes: 2 additions & 2 deletions daliuge-engine/test/manager/test_smm.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_register_session(self):
"""
SMM should successfully register a session with no drops
"""
manager = DlgSharedMemoryManager()
manager = DlgSharedMemoryManager() # pylint: disable=possibly-used-before-assignment
manager.register_session("session1")
self.assertTrue(len(manager.drop_names), 1)
manager.shutdown_all()
Expand Down Expand Up @@ -90,7 +90,7 @@ def test_shutdown_all(self):
"""
SMM should be able to remove all sessions and drop references when shutdown
"""
manager = DlgSharedMemoryManager()
manager = DlgSharedMemoryManager() # pylint: disable=possibly-used-before-assignment
manager.register_session("session1")
manager.register_session("session2")
self.assertEqual(len(manager.drop_names.keys()), 2)
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/test/memoryUsage.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def measure(n, droptype):
mem1 = p.memory_info()[0]
uTime1, sTime1 = p.cpu_times()
drops = []
for i in xrange(n):
for i in range(n):
uid = str(i)
drops.append(droptype(uid, uid))
mem2 = p.memory_info()[0]
Expand Down
1 change: 1 addition & 0 deletions daliuge-engine/test/test_S3Drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
from unittest.case import skipIf

# pylint: disable=possibly-used-before-assignment
"""
Test the S3 Drop
"""
Expand Down
Loading

0 comments on commit 9a9468b

Please sign in to comment.