Skip to content

Commit

Permalink
Merge 310b431 into ca64b91
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed Nov 4, 2021
2 parents ca64b91 + 310b431 commit c4b7145
Show file tree
Hide file tree
Showing 21 changed files with 1,448 additions and 95 deletions.
6 changes: 3 additions & 3 deletions daliuge-engine/dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from ..meta import dlg_component, dlg_batch_input, dlg_batch_output, dlg_streaming_input

try:
from crc32c import crc32c # @UnusedImport
from crc32c import crc32c as crc32 # @UnusedImport
except:
from binascii import crc32 # @Reimport

Expand Down Expand Up @@ -60,7 +60,7 @@ def run(self):
buf = inputDrop.read(desc, bufsize)
crc = 0
while buf:
crc = crc32c(buf, crc)
crc = crc32(buf, crc)
buf = inputDrop.read(desc, bufsize)
inputDrop.close(desc)

Expand All @@ -85,7 +85,7 @@ def initialize(self, **kwargs):

def dataWritten(self, uid, data):
self.execStatus = AppDROPStates.RUNNING
self._crc = crc32c(data, self._crc)
self._crc = crc32(data, self._crc)

def dropCompleted(self, uid, status):
outputDrop = self.outputs[0]
Expand Down
85 changes: 79 additions & 6 deletions daliuge-engine/dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#
"""Applications used as examples, for testing, or in simple situations"""
import pickle
import random
import urllib.error
import urllib.request

Expand Down Expand Up @@ -235,7 +236,10 @@ class AverageArraysApp(BarrierAppDROP):
# default values
methods = ['mean', 'median']
method = dlg_string_param('method', methods[0])
marray = []

def __init__(self, oid, uid, **kwargs):
    """Create the app and give the instance its own, empty input-array list.

    :param oid: object identifier, forwarded to the parent constructor.
    :param uid: unique identifier, forwarded to the parent constructor.
    :param kwargs: remaining keyword arguments, forwarded unchanged.
    """
    # Forward uid and unpack kwargs: the previous call passed the kwargs
    # dict positionally (in the slot where uid belongs) and dropped uid.
    # NOTE(review): assumes the parent constructor takes (oid, uid, **kwargs)
    # like this one does -- confirm against the base class.
    super().__init__(oid, uid, **kwargs)
    # Per-instance list, so arrays are not shared between app instances.
    self.marray = []

def initialize(self, **kwargs):
super(AverageArraysApp, self).initialize(**kwargs)
Expand All @@ -248,9 +252,9 @@ def run(self):
raise Exception(
'At least one output should have been added to %r' % self)
self.getInputArrays()
avg = self.averageArray()
self._avg = self.averageArray()
for o in outs:
d = pickle.dumps(avg)
d = pickle.dumps(self._avg)
o.len = len(d)
o.write(d) # average across inputs

Expand All @@ -264,13 +268,18 @@ def getInputArrays(self):
if len(ins) < 1:
raise Exception(
'At least one input should have been added to %r' % self)

marray = [pickle.loads(droputils.allDropContents(inp)) for inp in ins]
marray = []
for inp in ins:
sarray = droputils.allDropContents(inp)
if len(sarray) == 0:
print(f"Input does not contain data!")
else:
sarray = pickle.loads(sarray)
marray.extend(sarray)
self.marray = marray


def averageArray(self):
    """Combine the collected input arrays with the configured numpy function.

    The function is looked up on ``np`` by the name held in ``self.method``
    and applied to ``self.marray`` along axis 0.

    :return: whatever the selected numpy function returns.
    """
    reducer = getattr(np, self.method)
    combined = reducer(self.marray, axis=0)
    return combined

Expand Down Expand Up @@ -417,6 +426,7 @@ def run(self):
o.len = len(d)
o.write(d) # average across inputs


class SimpleBranch(BranchAppDrop, NullBarrierApp):
"""Simple branch app that is told the result of its condition"""

Expand All @@ -429,3 +439,66 @@ def run(self):

def condition(self):
    """Return the value held in ``self.result`` as the branch condition."""
    outcome = self.result
    return outcome



##
# @brief ListAppendThrashingApp\n
# @details A testing APP that appends random floats to a list: the inner append loop runs size times and is itself repeated size times.
# This is a CPU intensive operation and can thus be used to provide a test for application threading
# since this operation will not yield.
# The resulting array will be sent to all connected output apps.
# @par EAGLE_START
# @param gitrepo $(GIT_REPO)
# @param version $(PROJECT_VERSION)
# @param category PythonApp
# @param[in] param/size/100/Integer/readwrite
# \~English the size of the array\n
# @param[in] param/appclass/dlg.apps.simple.ListAppendThrashingApp/String/readonly
# \~English Application class\n
# @param[out] port/array
# \~English Port carrying the random array.
# @par EAGLE_END
class ListAppendThrashingApp(BarrierAppDROP):
    """
    A BarrierAppDrop that builds a list by appending random floats
    (``random.random()``) one at a time. It does not read any inputs and
    writes the pickled list to all of its outputs.

    The list is deliberately rebuilt ``size`` times so that the total work
    grows with ``size * size``; only the last list (``size`` elements) is kept.

    Keywords:
    size: int, number of array elements
    """
    # NOTE(review): the attribute name is spelled "compontent_meta"; it is
    # left unchanged here in case other code looks it up by this exact name.
    compontent_meta = dlg_component('ListAppendThrashingApp', 'List Append Thrashing',
                                    [dlg_batch_input('binary/*', [])],
                                    [dlg_batch_output('binary/*', [])],
                                    [dlg_streaming_input('binary/*')])

    def initialize(self, **kwargs):
        """Read the ``size`` keyword (default 100) and reset the result list."""
        self.size = self._getArg(kwargs, 'size', 100)
        self.marray = []
        super(ListAppendThrashingApp, self).initialize(**kwargs)

    def run(self):
        """Generate the list and write its pickled form to every output.

        :raises Exception: if no output has been added to this app.
        """
        # At least one output should have been added
        outs = self.outputs
        if len(outs) < 1:
            raise Exception(
                'At least one output should have been added to %r' % self)
        self.marray = self.generateArray()
        # Serialise once: every output receives the same bytes, so there is
        # no need to pickle the list again for each output (or twice per
        # output, as was done before).
        d = pickle.dumps(self.marray)
        for o in outs:
            o.len = len(d)
            o.write(d)

    def generateArray(self):
        """Return a list of ``int(self.size)`` random floats.

        The list is thrown away and rebuilt on every outer iteration; this
        is intentional busy work, not an oversight.
        """
        # This operation is wasteful to simulate an N^2 operation.
        size = int(self.size)
        marray = []
        for _ in range(size):
            marray = []
            for _ in range(size):
                marray.append(random.random())
        return marray

    def _getArray(self):
        """Return the list produced by the most recent ``run()``."""
        return self.marray
32 changes: 25 additions & 7 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"""
Module containing the core DROP classes.
"""

from abc import ABCMeta, abstractmethod
import base64
import collections
Expand All @@ -39,6 +38,7 @@
import threading
import time
import re
import sys
import inspect
import binascii

Expand All @@ -49,10 +49,12 @@
from .event import EventFirer
from .exceptions import InvalidDropException, InvalidRelationshipException
from .io import OpenMode, FileIO, MemoryIO, NgasIO, NgasLiteIO, ErrorIO, NullIO, PlasmaIO, PlasmaFlightIO
if sys.version_info >= (3, 8):
from .io import SharedMemoryIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
from .meta import dlg_float_param, dlg_int_param, dlg_list_param, \
dlg_string_param, dlg_bool_param, dlg_dict_param

from dlg.process import DlgProcess
import pyarrow.plasma as plasma

# Opt into using per-drop checksum calculation
Expand Down Expand Up @@ -465,7 +467,6 @@ def write(self, data, **kwargs):
The underlying storage mechanism is responsible for implementing the
final writing logic via the `self.writeMeta()` method.
'''

if self.status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception("No more writing expected")

Expand All @@ -482,7 +483,6 @@ def write(self, data, **kwargs):
self.status = DROPStates.ERROR
raise Exception("Problem opening drop for write!")
nbytes = self._wio.write(data)

dataLen = len(data)
if nbytes != dataLen:
# TODO: Maybe this should be an actual error?
Expand Down Expand Up @@ -515,7 +515,6 @@ def write(self, data, **kwargs):
self.setCompleted()
else:
self.status = DROPStates.WRITING

return nbytes

@abstractmethod
Expand Down Expand Up @@ -564,6 +563,15 @@ def checksum(self):
:see: `self.checksumType`
"""
if self.status == DROPStates.COMPLETED and self._checksum is None:
# Generate on the fly
io = self.getIO()
io.open(OpenMode.OPEN_READ)
data = io.read(4096)
while data is not None and len(data) > 0:
self._updateChecksum(data)
data = io.read(4096)
io.close()
return self._checksum

@checksum.setter
Expand Down Expand Up @@ -1284,7 +1292,10 @@ def initialize(self, **kwargs):
self._buf = io.BytesIO(*args)

def getIO(self):
return MemoryIO(self._buf)
if hasattr(self, '_tp') and hasattr(self, '_sessID') and sys.version_info >= (3, 8):
return SharedMemoryIO(self.oid, self._sessID)
else:
return MemoryIO(self._buf)

@property
def dataURL(self):
Expand Down Expand Up @@ -1800,7 +1811,14 @@ def execute(self, _send_notifications=True):
self.execStatus = AppDROPStates.RUNNING
while tries < self.n_tries:
try:
self.run()
if hasattr(self, '_tp'):
proc = DlgProcess(target=self.run, daemon=True)
proc.start()
proc.join()
if proc.exception:
raise proc.exception
else:
self.run()
if self.execStatus == AppDROPStates.CANCELLED:
return
self.execStatus = AppDROPStates.FINISHED
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/dlg/droputils.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ def copyDropContents(source, target, bufsize=4096):
target.write(buf)
logger.debug("Wrote %d bytes to %r" % (len(buf), target))
buf = read(desc, bufsize)
logger.debug("Read %d bytes from %r" % (len(buf), source))
if buf is not None:
logger.debug("Read %d bytes from %r" % (len(buf), source))
source.close(desc)

def getUpstreamObjects(drop):
Expand Down
Loading

0 comments on commit c4b7145

Please sign in to comment.