Skip to content

Commit

Permalink
Merge 7ede09b into 7fa67ff
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed Dec 9, 2021
2 parents 7fa67ff + 7ede09b commit 454ce9b
Show file tree
Hide file tree
Showing 20 changed files with 1,406 additions and 58 deletions.
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from ..meta import dlg_component, dlg_batch_input, dlg_batch_output, dlg_streaming_input

try:
from crc32c import crc32c # @UnusedImport
from crc32c import crc32c as crc32 # @UnusedImport
except:
from binascii import crc32 # @Reimport

Expand Down Expand Up @@ -63,7 +63,7 @@ def run(self):
buf = inputDrop.read(desc, bufsize)
crc = 0
while buf:
crc = crc32c(buf, crc)
crc = crc32(buf, crc)
buf = inputDrop.read(desc, bufsize)
inputDrop.close(desc)

Expand Down Expand Up @@ -102,7 +102,7 @@ def initialize(self, **kwargs):

def dataWritten(self, uid, data):
self.execStatus = AppDROPStates.RUNNING
self._crc = crc32c(data, self._crc)
self._crc = crc32(data, self._crc)

def dropCompleted(self, uid, status):
outputDrop = self.outputs[0]
Expand Down
83 changes: 78 additions & 5 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
"""Applications used as examples, for testing, or in simple situations"""
import pickle
import random
import urllib.error
import urllib.request

Expand Down Expand Up @@ -256,7 +257,10 @@ class AverageArraysApp(BarrierAppDROP):
# default values
methods = ["mean", "median"]
method = dlg_string_param("method", methods[0])
marray = []

def __init__(self, oid, uid, **kwargs):
super().__init__(oid, kwargs)
self.marray = []

def initialize(self, **kwargs):
super(AverageArraysApp, self).initialize(**kwargs)
Expand All @@ -268,9 +272,9 @@ def run(self):
if len(outs) < 1:
raise Exception("At least one output should have been added to %r" % self)
self.getInputArrays()
avg = self.averageArray()
self._avg = self.averageArray()
for o in outs:
d = pickle.dumps(avg)
d = pickle.dumps(self._avg)
o.len = len(d)
o.write(d) # average across inputs

Expand All @@ -283,8 +287,14 @@ def getInputArrays(self):
ins = self.inputs
if len(ins) < 1:
raise Exception("At least one input should have been added to %r" % self)

marray = [pickle.loads(droputils.allDropContents(inp)) for inp in ins]
marray = []
for inp in ins:
sarray = droputils.allDropContents(inp)
if len(sarray) == 0:
print(f"Input does not contain data!")
else:
sarray = pickle.loads(sarray)
marray.extend(sarray)
self.marray = marray

def averageArray(self):
Expand Down Expand Up @@ -462,3 +472,66 @@ def run(self):

def condition(self):
return self.result



##
# @brief ListAppendThrashingApp\n
# @details A testing APP that appends a random integer to a list num times.
# This is a CPU intensive operation and can thus be used to provide a test for application threading
# since this operation will not yield.
# The resulting array will be sent to all connected output apps.
# @par EAGLE_START
# @param gitrepo $(GIT_REPO)
# @param version $(PROJECT_VERSION)
# @param category PythonApp
# @param[in] param/size/100/Integer/readwrite
# \~English the size of the array\n
# @param[in] param/appclass/dlg.apps.simple.ListAppendThrashingApp/String/readonly
# \~English Application class\n
# @param[out] port/array
# \~English Port carrying the random array.
# @par EAGLE_END
class ListAppendThrashingApp(BarrierAppDROP):
"""
A BarrierAppDrop that appends random integers to a list N times. It does
not require any inputs and writes the generated array to all of its
outputs.
Keywords:
size: int, number of array elements
"""
compontent_meta = dlg_component('ListAppendThrashingApp', 'List Append Thrashing',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

def initialize(self, **kwargs):
self.size = self._getArg(kwargs, 'size', 100)
self.marray = []
super(ListAppendThrashingApp, self).initialize(**kwargs)

def run(self):
# At least one output should have been added
outs = self.outputs
if len(outs) < 1:
raise Exception(
'At least one output should have been added to %r' % self)
self.marray = self.generateArray()
for o in outs:
d = pickle.dumps(self.marray)
o.len = len(d)
o.write(pickle.dumps(self.marray))

def generateArray(self):
# This operation is wasteful to simulate an N^2 operation.
marray = []
for _ in range(int(self.size)):
marray = []
for i in range(int(self.size)):
marray.append(random.random())
return marray

def _getArray(self):
return self.marray
29 changes: 27 additions & 2 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import threading
import time
import re
import sys
import inspect
import binascii

Expand Down Expand Up @@ -67,7 +68,12 @@
PlasmaIO,
PlasmaFlightIO,
)
if sys.version_info >= (3, 8):
from .io import SharedMemoryIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
from .meta import dlg_float_param, dlg_int_param, dlg_list_param, \
dlg_string_param, dlg_bool_param, dlg_dict_param
from dlg.process import DlgProcess
from .meta import (
dlg_float_param,
dlg_int_param,
Expand Down Expand Up @@ -634,6 +640,15 @@ def checksum(self):
:see: `self.checksumType`
"""
if self.status == DROPStates.COMPLETED and self._checksum is None:
# Generate on the fly
io = self.getIO()
io.open(OpenMode.OPEN_READ)
data = io.read(4096)
while data is not None and len(data) > 0:
self._updateChecksum(data)
data = io.read(4096)
io.close()
return self._checksum

@checksum.setter
Expand Down Expand Up @@ -1416,7 +1431,10 @@ def initialize(self, **kwargs):
self._buf = io.BytesIO(*args)

def getIO(self):
return MemoryIO(self._buf)
if hasattr(self, '_tp') and hasattr(self, '_sessID') and sys.version_info >= (3, 8):
return SharedMemoryIO(self.oid, self._sessID)
else:
return MemoryIO(self._buf)

@property
def dataURL(self):
Expand Down Expand Up @@ -1986,7 +2004,14 @@ def execute(self, _send_notifications=True):
self.execStatus = AppDROPStates.RUNNING
while tries < self.n_tries:
try:
self.run()
if hasattr(self, '_tp'):
proc = DlgProcess(target=self.run, daemon=True)
proc.start()
proc.join()
if proc.exception:
raise proc.exception
else:
self.run()
if self.execStatus == AppDROPStates.CANCELLED:
return
self.execStatus = AppDROPStates.FINISHED
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def copyDropContents(source, target, bufsize=4096):
target.write(buf)
logger.debug(f"Wrote {len(buf)} bytes to {repr(target)}")
buf = source.read(desc, bufsize)
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
if buf is not None:
logger.debug(f"Read {len(buf)} bytes from {repr(source)}")
source.close(desc)


Expand Down
69 changes: 67 additions & 2 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import io
import logging
import os
import sys
import urllib.parse

from abc import abstractmethod, ABCMeta
from typing import Optional, Union

from . import ngaslite
from .apps.plasmaflight import PlasmaFlightClient

import pyarrow
import pyarrow.plasma as plasma
if sys.version_info >= (3, 8):
from dlg.shared_memory import DlgSharedMemory


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -219,9 +222,10 @@ class MemoryIO(DataIO):
A DataIO class that reads/write from/into the BytesIO object given at
construction time
"""
_desc: io.BytesIO
_desc: io.BytesIO # TODO: This might actually be a problem

def __init__(self, buf: io.BytesIO, **kwargs):
super().__init__()
self._buf = buf

def _open(self, **kwargs):
Expand Down Expand Up @@ -257,9 +261,70 @@ def delete(self):

@overrides
def buffer(self) -> memoryview:
# TODO: This may also be an issue
return self._open().getbuffer()



class SharedMemoryIO(DataIO):
"""
A DataIO class that writes to a shared memory buffer
"""

def __init__(self, uid, session_id, **kwargs):
super().__init__()
self._name = f'{session_id}_{uid}'
self._written = 0
self._pos = 0
self._buf = None

def _open(self, **kwargs):
self._pos = 0
if self._buf is None:
if self._mode == OpenMode.OPEN_WRITE:
self._written = 0
self._buf = DlgSharedMemory(self._name)
return self._buf

def _write(self, data, **kwargs):
total_size = len(data) + self._written
if total_size > self._buf.size:
self._buf.resize(total_size)
self._buf.buf[self._written:total_size] = data
self._written = total_size
else:
self._buf.buf[self._written:total_size] = data
self._written = total_size
self._buf.resize(total_size)
"""
It may be inefficient to resize many times, but assuming data is written 'once' this is
might be tolerable and guarantees that the size of the underlying buffer is tight.
"""
return len(data)

def _read(self, count=4096, **kwargs):
if self._pos == self._buf.size:
return None
start = self._pos
end = self._pos + count
end = min(end, self._buf.size)
out = self._buf.buf[start:end]
self._pos = end
return out

def _close(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
self._buf.resize(self._written)
self._buf.close()
self._buf = None

def exists(self):
return self._buf is not None

def delete(self):
self._close()


class FileIO(DataIO):
_desc: io.BufferedReader
def __init__(self, filename, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/manager/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def dlgNM(parser, args):
action="store",
type="int",
dest="max_threads",
help="Max thread pool size used for executing drops. 0 (default) means no pool.",
help="Max thread pool size used for executing drops. -1 means use all CPUs. 0 (default) means no threads.",
default=0,
)
(options, args) = parser.parse_args(args)
Expand Down
Loading

0 comments on commit 454ce9b

Please sign in to comment.