Skip to content

Commit

Permalink
component cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Dec 6, 2021
1 parent d5fc1e6 commit 55ad472
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 24 deletions.
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def run(self):
outputDrop.write(str(crc).encode("utf8"))


#
##
# @brief CRCStreamApp
# @details Calculate CRC in the streaming mode
# i.e. A "streamingConsumer" of its predecessor in the graph
Expand Down
8 changes: 4 additions & 4 deletions daliuge-engine/dlg/apps/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from ..remote import copyTo, copyFrom
from ..drop import (
from dlg.remote import copyTo, copyFrom
from dlg.drop import (
BarrierAppDROP,
NgasDROP,
InMemoryDROP,
NullDROP,
RDBMSDrop,
ContainerDROP,
)
from ..meta import (
from dlg.meta import (
dlg_string_param,
dlg_float_param,
dlg_component,
Expand All @@ -38,7 +38,7 @@
)


#
##
# @brief ScpApp
# @details A BarrierAppDROP that copies the content of its single input onto its
# single output via SSH's scp protocol.
Expand Down
12 changes: 6 additions & 6 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import time
import numpy as np

from .. import droputils, utils
from ..drop import BarrierAppDROP, BranchAppDrop, ContainerDROP
from ..meta import dlg_float_param, dlg_string_param
from ..meta import dlg_bool_param, dlg_int_param
from ..meta import dlg_component, dlg_batch_input
from ..meta import dlg_batch_output, dlg_streaming_input
from dlg import droputils, utils
from dlg.drop import BarrierAppDROP, BranchAppDrop, ContainerDROP
from dlg.meta import dlg_float_param, dlg_string_param
from dlg.meta import dlg_bool_param, dlg_int_param
from dlg.meta import dlg_component, dlg_batch_input
from dlg.meta import dlg_batch_output, dlg_streaming_input

from dlg.apps.pyfunc import serialize_data, deserialize_data

Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/socket_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
logger = logging.getLogger(__name__)


#
##
# @brief SocketListenerApp
# @details A BarrierAppDROP that listens on a socket for data. The server-side
# socket expects only one client, and assumes that the client will close the
Expand Down
9 changes: 7 additions & 2 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"""

from abc import ABCMeta, abstractmethod
import ast
import base64
import collections
import contextlib
Expand Down Expand Up @@ -357,6 +358,8 @@ def getmembers(object, predicate=None):
value = str(value)
elif isinstance(obj, dlg_list_param):
value = kwargs.get(attr_name, obj.default_value)
if isinstance(value, str):
value = ast.literal_eval(value)
if value is not None and not isinstance(value, list):
raise Exception(
"dlg_list_param {} is not a list. It is a {}".format(
Expand All @@ -365,6 +368,8 @@ def getmembers(object, predicate=None):
)
elif isinstance(obj, dlg_dict_param):
value = kwargs.get(attr_name, obj.default_value)
if isinstance(value, str):
value = ast.literal_eval(value)
if value is not None and not isinstance(value, dict):
raise Exception(
"dlg_dict_param {} is not a dict. It is a {}".format(
Expand Down Expand Up @@ -2052,13 +2057,13 @@ class PlasmaDROP(AbstractDROP):
"""

object_id = dlg_string_param("object_id", None)
plasma_path = dlg_string_param("plasma_path", "/tmp/plasma")
plasma_path = dlg_string_param("plasma_path", "/tmp/.dlg/workspace/plasma")

def initialize(self, **kwargs):
object_id = self.uid
if len(self.uid) != 20:
object_id = np.random.bytes(20)
if self.object_id is None:
if not self.object_id:
self.object_id = object_id

def getIO(self):
Expand Down
18 changes: 8 additions & 10 deletions daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,9 @@ def allDropContents(drop, bufsize=4096):
"""
buf = io.BytesIO()
desc = drop.open()
read = drop.read

while True:
data = read(desc, bufsize)
data = drop.read(desc, bufsize)
if not data:
break
buf.write(data)
Expand All @@ -135,22 +134,21 @@ def copyDropContents(source, target, bufsize=4096):
"""
Manually copies data from one DROP into another, in bufsize steps
"""
logger.debug("Copying from %r to %r" % (source, target))
logger.debug(f"Copying from {repr(source)} to {repr(target)}")
desc = source.open()
read = source.read
buf = read(desc, bufsize)
logger.debug("Read %d bytes from %r" % (len(buf), source))
buf = source.read(desc, bufsize)
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
while buf:
target.write(buf)
logger.debug("Wrote %d bytes to %r" % (len(buf), target))
buf = read(desc, bufsize)
logger.debug("Read %d bytes from %r" % (len(buf), source))
logger.debug(f"Wrote {len(buf)} bytes to {repr(target)}")
buf = source.read(desc, bufsize)
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
source.close(desc)


def getUpstreamObjects(drop):
"""
Returns a list of all direct "upstream" DROPs for the given
Returns a list of all direct "upstream" DROPs for the given+
DROP. An DROP A is "upstream" with respect to DROP B if
any of the following conditions are true:
Expand Down

0 comments on commit 55ad472

Please sign in to comment.