From cef5e7139d3a4a3a17ad5817648baf747c67bfd4 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Thu, 17 Mar 2022 16:44:22 +0800 Subject: [PATCH 01/12] plasma pipeline testing --- daliuge-engine/dlg/apps/dockerapp.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index 17a57cf43..eaddaa83e 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -236,7 +236,6 @@ class DockerApp(BarrierAppDROP): running in a container must quit themselves after successfully performing their task. """ - _container: Optional[Container] = None # signals for stopping this drop must first wait @@ -245,7 +244,7 @@ class DockerApp(BarrierAppDROP): # be to use a stopcontainer member variable flag. As soon as the container is # created the running process checks to see if it should stop. Use lock for # atomicity with _container and _stopflag. - _containerLock = multiprocessing.synchronize.Lock + _containerLock: multiprocessing.synchronize.Lock @property def container(self) -> Optional[Container]: From 5951ceff9aca827a91ada237035b55c67b34572a Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Wed, 23 Mar 2022 14:16:43 +0800 Subject: [PATCH 02/12] support npy streams --- daliuge-engine/dlg/drop.py | 6 +- daliuge-engine/dlg/droputils.py | 41 ++++++++++--- daliuge-engine/dlg/io.py | 102 +++++++++++++++++++++++++------- 3 files changed, 116 insertions(+), 33 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 5e0d738e6..214ca34bb 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -136,10 +136,11 @@ class ListAsDict(list): """A list that adds drop UIDs to a set as they get appended to the list""" def __init__(self, my_set): + super().__init__() self.set = my_set def append(self, drop): - super(ListAsDict, self).append(drop) + super().append(drop) self.set.add(drop.uid) @@ -1337,7 +1338,7 @@ def write(self, data: Union[bytes, memoryview], **kwargs): self._wio.open(OpenMode.OPEN_WRITE) except: self.status = DROPStates.ERROR - raise Exception("Problem opening drop for write!") + raise # Exception("Problem opening drop for write!") nbytes = self._wio.write(data) dataLen = len(data) @@ -1823,6 +1824,7 @@ class InMemoryDROP(DataDROP): """ A DROP that points data stored in memory. """ + _buf: io.BytesIO def initialize(self, **kwargs): args = [] diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 4ce78c231..ce85732b1 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -31,7 +31,7 @@ import re import threading import traceback -from typing import IO, Any, AsyncIterable, BinaryIO, Dict, Iterable, overload +from typing import List, IO, Any, AsyncIterable, BinaryIO, Dict, Iterable, overload import numpy as np from dlg.ddap_protocol import DROPStates @@ -83,8 +83,9 @@ class DROPWaiterCtx(object): a.write('a') a.setCompleted() """ + _evts: List[threading.Event] - def __init__(self, test, drops, timeout=1, expected_states=[]): + def __init__(self, test, drops, timeout=1, expected_states=None): self._drops = listify(drops) self._expected_states = expected_states or ( DROPStates.COMPLETED, @@ -204,7 +205,7 @@ def getLeafNodes(drops): ] -def depthFirstTraverse(node: AbstractDROP, visited=[]): +def depthFirstTraverse(node: AbstractDROP, visited=None): """ Depth-first iterator for a DROP graph. 
@@ -215,7 +216,7 @@ def depthFirstTraverse(node: AbstractDROP, visited=[]): This implementation is recursive. """ - + if visited is None: visited = [] dependencies = getDownstreamObjects(node) yield node, dependencies visited.append(node) @@ -302,12 +303,12 @@ def save_npy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False): """ Saves a numpy ndarray to a drop in npy format """ - bio = io.BytesIO() + dropio = drop.getIO() + dropio.open(OpenMode.OPEN_WRITE) # np.save accepts a "file-like" object which basically just requires # a .write() method. Try np.save(drop, array) + bio = io.BytesIO() np.save(bio, ndarray, allow_pickle=allow_pickle) - dropio = drop.getIO() - dropio.open(OpenMode.OPEN_WRITE) dropio.write(bio.getbuffer()) dropio.close() @@ -327,8 +328,30 @@ def load_npy(drop: DataDROP, allow_pickle=False) -> np.ndarray: return res -def load_numpy(drop: DataDROP): - return load_npy(drop) +async def save_npy_stream(drop: DataDROP, arrays: AsyncIterable[np.ndarray], allow_pickle=False): + """ + Saves an async stream of numpy ndarrays to a data drop + """ + dropio = drop.getIO() + dropio.open(OpenMode.OPEN_WRITE) + async for ndarray in arrays: + bio = io.BytesIO() + np.save(bio, ndarray, allow_pickle=allow_pickle) + dropio.write(bio.getbuffer()) + dropio.close() + + +async def load_npy_stream(drop: DataDROP, allow_pickle=False) -> AsyncIterable[np.ndarray]: + """ + Loads an async stream of numpy ndarrays from a data drop + """ + dropio = drop.getIO() + dropio.open(OpenMode.OPEN_READ) + # this requires dropio interface to contain read() and seek() + while dropio.peek(1): + # TODO: peek should also be awaitable, otherwise return regular iterable + yield np.load(dropio, allow_pickle=allow_pickle) + dropio.close() # def save_jsonp(drop: PathBasedDrop, data: Dict[str, object]): diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index 1201226a7..49da2c241 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -20,6 +20,7 @@ # MA 02111-1307 USA # from abc import abstractmethod, ABCMeta +from enum import IntEnum from http.client import HTTPConnection from multiprocessing.sharedctypes import Value from overrides import overrides @@ -29,7 +30,7 @@ import sys import urllib.parse from abc import abstractmethod, ABCMeta -from typing import Optional, Union +from typing import BinaryIO, Optional, Union from . import ngaslite from .apps.plasmaflight import PlasmaFlightClient @@ -43,16 +44,15 @@ logger = logging.getLogger(__name__) - -class OpenMode: +class OpenMode(IntEnum): """ Open Mode for Data Drops """ - - OPEN_WRITE, OPEN_READ = range(2) + OPEN_WRITE = 0 + OPEN_READ = 1 -class DataIO: +class DataIO(io.BufferedIOBase, BinaryIO): """ A class used to read/write data stored in a particular kind of storage in an abstract way. This base class simply declares a number of methods that @@ -73,8 +73,10 @@ class DataIO: __metaclass__ = ABCMeta _mode: Optional[OpenMode] + _desc: Any def __init__(self): + super().__init__() self._mode = None def open(self, mode: OpenMode, **kwargs): @@ -96,7 +98,7 @@ def write(self, data, **kwargs) -> int: raise ValueError("Writing operation attempted on write-only DataIO object") return self._write(data, **kwargs) - def read(self, count: int, **kwargs): + def read(self, count: int, **kwargs) -> bytes: """ Reads `count` bytes from the underlying storage. 
""" @@ -106,6 +108,24 @@ def read(self, count: int, **kwargs): raise ValueError("Reading operation attempted on write-only DataIO object") return self._read(count, **kwargs) + def peek(self, size: int, **kwargs): + """ + Previews `count` bytes from the underlying storage without moving the read cursor + """ + return self._peek(size, **kwargs) + + def tell(self): + """ + Returns the current read cursor position. + """ + return self._tell() + + def seek(self, offset, whence): + """ + Sets the position of the read cursor + """ + return self._seek(offset, whence) + def close(self, **kwargs): """ Closes the underlying storage where the data represented by this @@ -151,10 +171,25 @@ def buffer(self) -> Union[memoryview, bytes, bytearray, pyarrow.Buffer]: @abstractmethod def _open(self, **kwargs): + """ + Returns a reader or writer object depending on self._mode + """ pass @abstractmethod - def _read(self, count, **kwargs): + def _read(self, count, **kwargs) -> bytes: + pass + + #@abstractmethod + def _peek(self, count, **kwargs): + pass + + #@abstractmethod + def _tell(self): + pass + + #@abstractmethod + def _seek(self, offset, whence): pass @abstractmethod @@ -162,7 +197,7 @@ def _write(self, data, **kwargs) -> int: pass @abstractmethod - def _close(self, **kwargs): + def _close(self, **kwargs) -> None: pass @abstractmethod @@ -175,15 +210,19 @@ class NullIO(DataIO): A DataIO that stores no data """ + @overrides def _open(self, **kwargs): return None - def _read(self, count=4096, **kwargs): + @overrides + def _read(self, count=4096, **kwargs) -> bytes: return None + @overrides def _write(self, data, **kwargs) -> int: return len(data) + @overrides def _close(self, **kwargs): pass @@ -213,7 +252,7 @@ def _open(self, **kwargs): raise NotImplementedError() @overrides - def _read(self, count=4096, **kwargs): + def _read(self, count=4096, **kwargs) -> bytes: raise NotImplementedError() @overrides @@ -242,8 +281,9 @@ class MemoryIO(DataIO): A DataIO class that reads/write from/into the BytesIO object given at construction time """ - - _desc: io.BytesIO # TODO: This might actually be a problem + # TODO: This might actually be a problem + _desc: io.BufferedReader + _buf: io.BytesIO def __init__(self, buf: io.BytesIO, **kwargs): super().__init__() @@ -253,8 +293,12 @@ def _open(self, **kwargs): if self._mode == OpenMode.OPEN_WRITE: return self._buf elif self._mode == OpenMode.OPEN_READ: - # TODO: potentially wasteful copy - return io.BytesIO(self._buf.getbuffer()) + # NOTE: BytesIO extends BufferedIOBase instead of RawIOBase + # TODO: a new bytesIO object must be created as it is currently + # convention to close the write dataIO afte writing + br = io.BufferedReader(io.BytesIO(self._buf.getbuffer())) + br.seek(0) + return br else: raise ValueError() @@ -264,9 +308,18 @@ def _write(self, data, **kwargs) -> int: return len(data) @overrides - def _read(self, count=4096, **kwargs): + def _read(self, count=4096, **kwargs) -> bytes: return self._desc.read(count) + def _peek(self, count, **kwargs): + return self._desc.peek(count) + + def _tell(self): + return self._desc.tell() + + def _seek(self, offset, whence): + return self._desc.seek(offset, whence) + @overrides def _close(self, **kwargs): if self._mode == OpenMode.OPEN_READ: @@ -332,7 +385,7 @@ def _write(self, data, **kwargs) -> int: return len(data) @overrides - def _read(self, count=4096, **kwargs): + def _read(self, count=4096, **kwargs) -> bytes: if self._pos == self._buf.size: return None start = self._pos @@ -379,7 +432,7 @@ def _open(self, 
**kwargs) -> io.BufferedReader: return open(self._fnm, flag) @overrides - def _read(self, count=4096, **kwargs): + def _read(self, count=4096, **kwargs) -> bytes: return self._desc.read(count) @overrides @@ -491,9 +544,9 @@ def _close(self, **kwargs): del self._desc @overrides - def _read(self, count, **kwargs): + def _read(self, count, **kwargs) -> bytes: # Read data from NGAS and give it back to our reader - self._desc.retrieve2File(self._fileId, cmd="QRETRIEVE") + return self._desc.retrieve2File(self._fileId, cmd="QRETRIEVE") @overrides def _write(self, data, **kwargs) -> int: @@ -722,7 +775,8 @@ def _close(self, **kwargs): if self._reader: self._reader.close() - def _read(self, count, **kwargs): + @overrides + def _read(self, count, **kwargs) -> bytes: if not self._reader: [data] = self._desc.get_buffers([self._object_id]) self._reader = pyarrow.BufferReader(data) @@ -816,9 +870,11 @@ def __init__( self._buffer_size = 0 self._use_staging = use_staging + @overrides def _open(self, **kwargs): return PlasmaFlightClient(socket=self._plasma_path) + @overrides def _close(self, **kwargs): if self._writer: if self._use_staging: @@ -833,12 +889,14 @@ def _close(self, **kwargs): if self._reader: self._reader.close() - def _read(self, count, **kwargs): + @overrides + def _read(self, count, **kwargs) -> bytes: if not self._reader: data = self._desc.get_buffer(self._object_id, self._flight_path) self._reader = pyarrow.BufferReader(data) return self._reader.read1(count) + @overrides def _write(self, data, **kwargs) -> int: # NOTE: data must be a collection of bytes for len to represent the buffer bytesize From c261bb0610df8861e3eb13aaf76d6520d874bc13 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 28 Mar 2022 16:46:15 +0800 Subject: [PATCH 03/12] private getio, add streaming types --- daliuge-engine/dlg/ddap_protocol.py | 31 +++++++---- daliuge-engine/dlg/environmentvar_drop.py | 4 +- daliuge-engine/dlg/parset_drop.py | 2 +- daliuge-engine/dlg/s3_drop.py | 2 +- .../data_development/data_index.rst | 14 +++++ .../data_development/data_streaming.rst | 53 +++++++++++++++++++ 6 files changed, 91 insertions(+), 15 deletions(-) create mode 100644 docs/development/data_development/data_streaming.rst diff --git a/daliuge-engine/dlg/ddap_protocol.py b/daliuge-engine/dlg/ddap_protocol.py index fcce57bd9..3cc448e61 100644 --- a/daliuge-engine/dlg/ddap_protocol.py +++ b/daliuge-engine/dlg/ddap_protocol.py @@ -20,6 +20,7 @@ # MA 02111-1307 USA # import collections +from enum import IntEnum class DROPLinkType: @@ -47,24 +48,32 @@ class DROPLinkType: ) = range(8) -class DROPStates: +class DROPStates(IntEnum): """ An enumeration of the different states a DROP can be found in. DROPs start in the INITIALIZED state, go optionally through WRITING and arrive to COMPLETED. Later, they transition through EXPIRED, eventually arriving to DELETED. """ + INITIALIZED = 0 + WRITING = 1 + COMPLETED = 2 + ERROR = 3 + EXPIRED = 4 + DELETED = 5 + CANCELLED = 6 + SKIPPED = 7 - ( - INITIALIZED, - WRITING, - COMPLETED, - ERROR, - EXPIRED, - DELETED, - CANCELLED, - SKIPPED, - ) = range(8) + +class DROPStreamingTypes(IntEnum): + """ + An enumeration of the different types of streaming a data drop can be + configured to. + """ + NONE = 0 # No data streaming. Single write, multiple reads. + SYNC_STREAM = 1 # Multiple reads using callback. + SINGLE_STREAM = 2 # Cold stream using AsyncIterable. + MULTI_STREAM = 3 # Hot stream using AsyncIterable. 
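For illustration only (not part of this patch): a minimal sketch of how a reader might branch on these streaming types, assuming the `DataDROP.streamingType` attribute and the `open()`/`read()`/`readStream()` methods introduced later in this series; `process()` is a hypothetical placeholder for whatever the consumer does with each chunk.

    async def consume(drop):
        desc = drop.open()
        try:
            if drop.streamingType in (DROPStreamingTypes.SINGLE_STREAM,
                                      DROPStreamingTypes.MULTI_STREAM):
                # streaming drops expose an AsyncIterable of byte chunks
                async for chunk in drop.readStream(desc):
                    process(chunk)  # hypothetical consumer
            else:
                # non-streaming (NONE) drops are read in fixed-size blocks
                data = drop.read(desc, 4096)
                while data:
                    process(data)  # hypothetical consumer
                    data = drop.read(desc, 4096)
        finally:
            drop.close(desc)
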
class AppDROPStates: diff --git a/daliuge-engine/dlg/environmentvar_drop.py b/daliuge-engine/dlg/environmentvar_drop.py index d0f5948c8..c04ceb301 100644 --- a/daliuge-engine/dlg/environmentvar_drop.py +++ b/daliuge-engine/dlg/environmentvar_drop.py @@ -81,8 +81,8 @@ def initialize(self, **kwargs): self._variables = dict() self._variables.update(_filter_parameters(self.parameters)) - def getIO(self): - return MemoryIO(io.BytesIO(json.dumps(self._variables).encode("utf-8"))) + def _getIO(self): + return MemoryIO(io.BytesIO(json.dumps(self._variables).encode('utf-8'))) def get(self, key): """ diff --git a/daliuge-engine/dlg/parset_drop.py b/daliuge-engine/dlg/parset_drop.py index f57c5a77e..c3db33b08 100644 --- a/daliuge-engine/dlg/parset_drop.py +++ b/daliuge-engine/dlg/parset_drop.py @@ -86,7 +86,7 @@ def initialize(self, **kwargs): self.filter_parameters(self.parameters, self.mode), self.mode ).encode("utf-8") - def getIO(self): + def _getIO(self): return MemoryIO(io.BytesIO(self.config_data)) @property diff --git a/daliuge-engine/dlg/s3_drop.py b/daliuge-engine/dlg/s3_drop.py index b2052746b..7f6696a4f 100644 --- a/daliuge-engine/dlg/s3_drop.py +++ b/daliuge-engine/dlg/s3_drop.py @@ -106,7 +106,7 @@ def size(self): return -1 - def getIO(self): + def _getIO(self): """ This type of DROP cannot be accessed directly :return: diff --git a/docs/development/data_development/data_index.rst b/docs/development/data_development/data_index.rst index 194981c25..d70018623 100644 --- a/docs/development/data_development/data_index.rst +++ b/docs/development/data_development/data_index.rst @@ -14,6 +14,20 @@ Different from most other frameworks |daliuge| makes data components first class * Abstraction of the I/O interface particularities of the underlying data storage mechanism. * Implementation of the DALiuGE data state engine, which is the |daliuge| mechansim to drive the execution of the workflow graphs. +DataDROP Concepts +================= + +DataDROP +DropStates +DROPPhases +DropLink +ChecksumTypes + +Streaming Data +============== + +ExecutionMode + Components ========== diff --git a/docs/development/data_development/data_streaming.rst b/docs/development/data_development/data_streaming.rst new file mode 100644 index 000000000..a0999b2e4 --- /dev/null +++ b/docs/development/data_development/data_streaming.rst @@ -0,0 +1,53 @@ +.. _data_streaming: + +Synchronous Streaming +--------------------- + +This form of data streaming connects an app producer with an app consumer via the +appdrop write method and dataWritten callback method. This effectively bypasses +the datadrop component which removes the serialization performance overhead, but +does not benefit from subprocessing parallelism without data buffering within the +appdrop. + +Asynchronous Streaming (Proposed) +--------------------------------- + +Async streaming uses async interface such that drop subprocesses +executing run() can use an async loop to await multiple io bound operations. +This has the added benefit of utilizing subprocesses and handling multiple data +streams at the cost of handling back-pressure in data drops. + +Some approaches are: + +asyncio and AsyncIterable - async-pull model +uses asyncio loop + +ReactiveX for Python (RxPy) - async-push model +uses reactivex loop + +asyncio and aioreactive - async-push and async-pull models +uses asyncio loop + +Back-Pressure +""""""""""""" + +Back pressure is when a stream is outputting items more rapidly than an operator +can consume them. 
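As a rough illustration (plain ``asyncio`` only, independent of the drop API), a bounded
queue is one way for a slow consumer to apply back pressure to a fast producer: the
producer suspends on ``put()`` whenever the buffer is full::

    import asyncio

    async def producer(queue: asyncio.Queue):
        for i in range(10):
            await queue.put(i)    # suspends here once the queue is full (back pressure)
        await queue.put(None)     # sentinel marking the end of the stream

    async def consumer(queue: asyncio.Queue):
        while (item := await queue.get()) is not None:
            await asyncio.sleep(0.1)   # deliberately slow consumer
            print(item)

    async def main():
        queue = asyncio.Queue(maxsize=2)   # small buffer forces back pressure
        await asyncio.gather(producer(queue), consumer(queue))

    asyncio.run(main())
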
+ +A cold stream can begin building back pressure until an observer sees fit to subscribe +and consume them. A cold stream will always produce the same sequence regardless of +when and how fast the observer consumes them. Synonymous to TCP. + +A hot stream begins to emit immediately when it is created. A hot stream creates items +at its own pace and it's up to the observer to keep up. Synonymous to UDP. + +https://reactivex.io/documentation/operators/backpressure.html + +Single and Multi Stream +""""""""""""""""""""""" + +AsyncMultiStream are hot + +AsyncSingleStream are cold + +See https://github.com/dbrattli/aioreactive \ No newline at end of file From 3bb8aa45d9b784bcb65f699c176587b1efcd3497 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 28 Mar 2022 17:37:37 +0800 Subject: [PATCH 04/12] streaming working --- daliuge-engine/dlg/drop.py | 57 +++++++++++++++++--------- daliuge-engine/dlg/droputils.py | 54 ++++++++++++++++-------- daliuge-engine/dlg/io.py | 70 ++++++++++++++++++++++++++++---- daliuge-engine/setup.py | 1 + daliuge-engine/test/test_drop.py | 49 ++++++++++++++++++---- 5 files changed, 179 insertions(+), 52 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 214ca34bb..525fed91f 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -46,6 +46,7 @@ import inspect import binascii from typing import List, Optional, Union +from overrides import overrides import numpy as np import pyarrow.plasma as plasma @@ -61,6 +62,7 @@ from six import BytesIO from .ddap_protocol import ( + DROPStreamingTypes, ExecutionMode, ChecksumTypes, AppDROPStates, @@ -70,7 +72,7 @@ DROPRel, ) from dlg.event import EventFirer -from dlg.exceptions import InvalidDropException, InvalidRelationshipException +from dlg.exceptions import DaliugeException, InvalidDropException, InvalidRelationshipException from dlg.io import ( DataIO, OpenMode, @@ -1211,6 +1213,8 @@ class DataDROP(AbstractDROP): parsed by function `IOForURL`. """ + streamingType = DROPStreamingTypes.NONE + def incrRefCount(self): """ Increments the reference count of this DROP by one atomically. @@ -1236,13 +1240,18 @@ def open(self, **kwargs): invoked. Failing to do so will result in DROPs not expiring and getting deleted. 
""" - if self.status != DROPStates.COMPLETED: - raise Exception( + valid = (self.isStreaming() and self.status != DROPStates.INITIALIZED) or\ + self.status == DROPStates.COMPLETED + if not valid: + raise DaliugeException( "%r is in state %s (!=COMPLETED), cannot be opened for reading" - % (self, self.status) + % ( + self, + self.status, + ) ) - io = self.getIO() + io = self._getIO() logger.debug("Opening drop %s" % (self.oid)) io.open(OpenMode.OPEN_READ, **kwargs) @@ -1333,7 +1342,7 @@ def write(self, data: Union[bytes, memoryview], **kwargs): # We lazily initialize our writing IO instance because the data of this # DROP might not be written through this DROP if not self._wio: - self._wio = self.getIO() + self._wio = self._getIO() try: self._wio.open(OpenMode.OPEN_WRITE) except: @@ -1406,7 +1415,7 @@ def checksum(self): """ if self.status == DROPStates.COMPLETED and self._checksum is None: # Generate on the fly - io = self.getIO() + io = self._getIO() io.open(OpenMode.OPEN_READ) data = io.read(4096) while data is not None and len(data) > 0: @@ -1458,24 +1467,30 @@ def checksumType(self, value): self._checksumType = value @abstractmethod - def getIO(self) -> DataIO: + def _getIO(self) -> DataIO: """ Returns an instance of one of the `dlg.io.DataIO` instances that handles the data contents of this DROP. """ + def isStreaming(self): + """ + Returns true if the drop is a streaming drop. + """ + return self.streamingType != DROPStreamingTypes.NONE + def delete(self): """ Deletes the data represented by this DROP. """ - self.getIO().delete() + self._getIO().delete() def exists(self): """ Returns `True` if the data represented by this DROP exists indeed in the underlying storage mechanism """ - return self.getIO().exists() + return self._getIO().exists() @abstractproperty def dataURL(self) -> str: @@ -1608,7 +1623,7 @@ def initialize(self, **kwargs): self._wio = None - def getIO(self): + def _getIO(self): return FileIO(self._path) def delete(self): @@ -1724,7 +1739,7 @@ def initialize(self, **kwargs): else: self.fileId = self.uid - def getIO(self): + def _getIO(self): try: ngasIO = NgasIO( self.ngasSrv, @@ -1774,7 +1789,7 @@ def setCompleted(self): # downstream don't fail to read logger.debug("Trying to set size of NGASDrop") try: - stat = self.getIO().fileStatus() + stat = self._getIO().fileStatus() logger.debug( "Setting size of NGASDrop %s to %s" % (self.fileId, stat["FileSize"]) ) @@ -1835,7 +1850,7 @@ def initialize(self, **kwargs): args.append(base64.b64decode(pydata)) self._buf = io.BytesIO(*args) - def getIO(self): + def _getIO(self): if ( hasattr(self, "_tp") and hasattr(self, "_sessID") @@ -1887,7 +1902,7 @@ def initialize(self, **kwargs): args.append(base64.b64decode(pydata)) self._buf = io.BytesIO(*args) - def getIO(self): + def _getIO(self): if sys.version_info >= (3, 8): if hasattr(self, "_sessID"): return SharedMemoryIO(self.oid, self._sessID) @@ -1924,7 +1939,7 @@ class NullDROP(DataDROP): A DROP that doesn't store any data. 
""" - def getIO(self): + def _getIO(self): return NullIO() @property @@ -1986,7 +2001,7 @@ def initialize(self, **kwargs): # Data store for reproducibility self._querylog = [] - def getIO(self): + def _getIO(self): # This Drop cannot be accessed directly return ErrorIO() @@ -2077,7 +2092,7 @@ def initialize(self, **kwargs): # =========================================================================== # No data-related operations should actually be called in Container DROPs # =========================================================================== - def getIO(self): + def _getIO(self): return ErrorIO() @property @@ -2205,7 +2220,7 @@ def initialize(self, **kwargs): elif isinstance(self.object_id, str): self.object_id = self.object_id.encode("ascii") - def getIO(self): + def _getIO(self): return PlasmaIO( plasma.ObjectID(self.object_id), self.plasma_path, @@ -2256,7 +2271,7 @@ def initialize(self, **kwargs): elif isinstance(self.object_id, str): self.object_id = self.object_id.encode("ascii") - def getIO(self): + def _getIO(self): return PlasmaFlightIO( plasma.ObjectID(self.object_id), self.plasma_path, @@ -2538,12 +2553,14 @@ def initialize(self, **kwargs): self, "Invalid n_tries, must be a positive number" ) + @overrides def addStreamingInput(self, streamingInputDrop, back=True): raise InvalidRelationshipException( DROPRel(streamingInputDrop, DROPLinkType.STREAMING_INPUT, self), "InputFiredAppDROPs don't accept streaming inputs", ) + @overrides def dropCompleted(self, uid, drop_state): super(InputFiredAppDROP, self).dropCompleted(uid, drop_state) diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index ce85732b1..1995d1d2f 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -34,8 +34,9 @@ from typing import List, IO, Any, AsyncIterable, BinaryIO, Dict, Iterable, overload import numpy as np -from dlg.ddap_protocol import DROPStates +from dlg.ddap_protocol import DROPStates, DROPStreamingTypes from dlg.drop import AppDROP, AbstractDROP, DataDROP, PathBasedDrop +from dlg.exceptions import DaliugeException from dlg.io import IOForURL, OpenMode from dlg import common from dlg.common import DropType @@ -303,7 +304,7 @@ def save_npy(drop: DataDROP, ndarray: np.ndarray, allow_pickle=False): """ Saves a numpy ndarray to a drop in npy format """ - dropio = drop.getIO() + dropio = drop._getIO() dropio.open(OpenMode.OPEN_WRITE) # np.save accepts a "file-like" object which basically just requires # a .write() method. Try np.save(drop, array) @@ -321,7 +322,7 @@ def load_npy(drop: DataDROP, allow_pickle=False) -> np.ndarray: """ Loads a numpy ndarray from a drop in npy format """ - dropio = drop.getIO() + dropio = drop._getIO() dropio.open(OpenMode.OPEN_READ) res = np.load(io.BytesIO(dropio.buffer()), allow_pickle=allow_pickle) dropio.close() @@ -332,27 +333,46 @@ async def save_npy_stream(drop: DataDROP, arrays: AsyncIterable[np.ndarray], all """ Saves an async stream of numpy ndarrays to a data drop """ - dropio = drop.getIO() - dropio.open(OpenMode.OPEN_WRITE) + #assert drop.streamingType in (DROPStreamingTypes.SINGLE_STREAM, DROPStreamingTypes.MULTI_STREAM) async for ndarray in arrays: + logger.debug(f"saving... 
{drop._wio.tell() if drop._wio else 0}") bio = io.BytesIO() np.save(bio, ndarray, allow_pickle=allow_pickle) - dropio.write(bio.getbuffer()) - dropio.close() + drop.write(bio.getbuffer()) + logger.debug(f"saved {drop._wio.tell() if drop._wio else 0}") + drop.setCompleted() -async def load_npy_stream(drop: DataDROP, allow_pickle=False) -> AsyncIterable[np.ndarray]: +async def load_npy_stream(drop: DataDROP, allow_pickle=False, backoff=0.01) -> AsyncIterable[np.ndarray]: """ Loads an async stream of numpy ndarrays from a data drop """ - dropio = drop.getIO() - dropio.open(OpenMode.OPEN_READ) - # this requires dropio interface to contain read() and seek() - while dropio.peek(1): - # TODO: peek should also be awaitable, otherwise return regular iterable - yield np.load(dropio, allow_pickle=allow_pickle) - dropio.close() - + import asyncio + + desc = None + while desc is None: + try: + desc = drop.open() + except DaliugeException: + # cannot open for read before opening for write + logger.debug("load backing off") + await asyncio.sleep(backoff) + dropio = drop._rios[desc] + + cursor = 0 + while not (drop.isCompleted() and cursor == dropio.size()): + # TODO: peek ideally would be awaitable + if cursor != dropio.size(): #and dropio.peek(1): + dropio.seek(cursor) + logger.debug(f"loading.. {dropio.tell()}") + res = np.load(dropio, allow_pickle=allow_pickle) + # TODO(calgray): buffered reader stream position + # currently matches writer stream position. + cursor = dropio.tell() + yield res + else: + await asyncio.sleep(backoff) + drop.close(desc) # def save_jsonp(drop: PathBasedDrop, data: Dict[str, object]): # with open(drop.path, 'r') as f: @@ -366,7 +386,7 @@ async def load_npy_stream(drop: DataDROP, allow_pickle=False) -> AsyncIterable[n # def load_json(drop: DataDROP) -> dict: -# dropio = drop.getIO() +# dropio = drop._getIO() # dropio.open(OpenMode.OPEN_READ) # data = json.loads(dropio.buffer()) # dropio.close() diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index 49da2c241..eee5ad948 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -30,7 +30,7 @@ import sys import urllib.parse from abc import abstractmethod, ABCMeta -from typing import BinaryIO, Optional, Union +from typing import Any, AsyncIterable, AsyncIterator, BinaryIO, Optional, Union from . import ngaslite from .apps.plasmaflight import PlasmaFlightClient @@ -52,7 +52,7 @@ class OpenMode(IntEnum): OPEN_READ = 1 -class DataIO(io.BufferedIOBase, BinaryIO): +class DataIO(): # io.BufferedIOBase, BinaryIO """ A class used to read/write data stored in a particular kind of storage in an abstract way. This base class simply declares a number of methods that @@ -108,6 +108,27 @@ def read(self, count: int, **kwargs) -> bytes: raise ValueError("Reading operation attempted on write-only DataIO object") return self._read(count, **kwargs) + + # TODO: @abstractmethod + async def writeStream(self, stream: AsyncIterable): + """ + Writes a stream of byte buffers to the drop buffer(s) that + can each be asynchronously read using readStream. When + the stream async iterator raises StopAsyncIteration + the drop will close for writing. + """ + raise Exception # TODO: NotImplementedError + + # TODO: @abstractmethod + async def readStream(self) -> AsyncIterable: + """ + Returns a asynchronous stream typically processed using + `async for` that either yields when no data is available, + iterates when data is buffered, or raises StopAsyncIterator + when the stream is complete. 
+ """ + raise StopAsyncIteration + def peek(self, size: int, **kwargs): """ Previews `count` bytes from the underlying storage without moving the read cursor @@ -120,7 +141,7 @@ def tell(self): """ return self._tell() - def seek(self, offset, whence): + def seek(self, offset, whence=io.SEEK_SET): """ Sets the position of the read cursor """ @@ -189,7 +210,7 @@ def _tell(self): pass #@abstractmethod - def _seek(self, offset, whence): + def _seek(self, offset, whence=io.SEEK_SET): pass @abstractmethod @@ -204,6 +225,26 @@ def _close(self, **kwargs) -> None: def _size(self, **kwargs) -> int: pass +class AwaitOnce(): + def __await__(self): + yield + +class DataIOAsyncIterator(AsyncIterator): + """ + An async iterator for a drop using a dynamically + expanding buffer. + """ + def __init__(self, memory_io: DataIO): + self._io = memory_io + + async def __anext__(self): + while not self._io.closed and self._io.tell() != self._io.size(): + if self._io.peek(1): + return self._io.read(4096) + else: + import asyncio + await asyncio.sleep(0.1) + raise StopAsyncIteration class NullIO(DataIO): """ @@ -294,9 +335,8 @@ def _open(self, **kwargs): return self._buf elif self._mode == OpenMode.OPEN_READ: # NOTE: BytesIO extends BufferedIOBase instead of RawIOBase - # TODO: a new bytesIO object must be created as it is currently - # convention to close the write dataIO afte writing - br = io.BufferedReader(io.BytesIO(self._buf.getbuffer())) + br = io.BufferedReader(self._buf) # type: ignore + #br = io.BufferedReader(io.BytesIO(self._buf.getbuffer())) br.seek(0) return br else: @@ -307,10 +347,23 @@ def _write(self, data, **kwargs) -> int: self._desc.write(data) return len(data) + @overrides + async def writeStream(self, stream: AsyncIterable): + async for data in stream: + assert self._desc.writable() + self._desc.write(data) + @overrides def _read(self, count=4096, **kwargs) -> bytes: return self._desc.read(count) + @overrides + async def readStream(self) -> AsyncIterable: + yield self.__aiter__() + + def __aiter__(self) -> AsyncIterator: + return DataIOAsyncIterator(self) + def _peek(self, count, **kwargs): return self._desc.peek(count) @@ -324,6 +377,8 @@ def _seek(self, offset, whence): def _close(self, **kwargs): if self._mode == OpenMode.OPEN_READ: self._desc.close() + else: + logger.debug("closing") # If we're writing we don't close the descriptor because it's our # self._buf, which won't be readable afterwards @@ -347,7 +402,6 @@ def buffer(self) -> memoryview: # TODO: This may also be an issue return self._buf.getbuffer() - class SharedMemoryIO(DataIO): """ A DataIO class that writes to a shared memory buffer diff --git a/daliuge-engine/setup.py b/daliuge-engine/setup.py index 794460ce3..c11361fdf 100644 --- a/daliuge-engine/setup.py +++ b/daliuge-engine/setup.py @@ -117,6 +117,7 @@ def run(self): # Keep alpha-sorted PLEASE! install_requires = [ "wheel", # need to get wheel first... 
+ "asyncstdlib", "bottle", "configobj", "crc32c", diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index b08c462f9..10dc09ba0 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -23,6 +23,7 @@ import contextlib import io import os, unittest +from typing import Any, AsyncIterable, AsyncIterator, Iterable import random import shutil import sqlite3 @@ -31,10 +32,11 @@ import tempfile import subprocess -from dlg import droputils +import dlg.droputils as droputils from dlg.common.reproducibility.constants import ReproducibilityFlags -from dlg.ddap_protocol import DROPStates, ExecutionMode, AppDROPStates +from dlg.ddap_protocol import DROPStates, DROPStreamingTypes, ExecutionMode, AppDROPStates from dlg.drop import ( + DataDROP, FileDROP, AppDROP, InMemoryDROP, @@ -137,9 +139,11 @@ def test_dynamic_write_InMemoryDROP(self): """ self._test_dynamic_write_withDropType(InMemoryDROP) - @unittest.skipIf( - sys.version_info < (3, 8), "Shared memory does nt work < python 3.8" - ) + def test_stream_write_InMemoryDROP(self): + self._test_async_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) + self._test_async_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + + @unittest.skipIf(sys.version_info < (3, 8), "Shared memory does nt work < python 3.8") def test_write_SharedMemoryDROP(self): """ Test a SharedMemoryDROP with simple AppDROP (for checksum calculation) @@ -253,6 +257,37 @@ def _test_dynamic_write_withDropType(self, dropType): self.assertEqual(a.checksum, test_crc) self.assertEqual(cChecksum, test_crc) + def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + a: DataDROP = dropType("oid:A", "uid:A", + expectedSize=-1, use_staging=True, + streamingType=DROPStreamingTypes.SINGLE_STREAM) + a.streamingType=DROPStreamingTypes.SINGLE_STREAM + + import asyncio + import asyncstdlib + import numpy as np + + async def delay_iterable(iterable, delay): + for i in iterable: + await asyncio.sleep(delay) + yield i + + in_arrays = [np.random.rand(10,10,10) for _ in range(0,10)] + + async def write_read_assert_stream(): + # NOTE: typically these are performed in parallel on seperate subprocesses + _, out_arrays = await asyncio.gather( + asyncio.create_task(droputils.save_npy_stream(a, delay_iterable(in_arrays, write_delay))), + asyncio.create_task(asyncstdlib.list(droputils.load_npy_stream(a, backoff=read_delay))) + ) + assert len(in_arrays) == len(out_arrays) + for in_array, out_array in zip(in_arrays, out_arrays): + np.testing.assert_array_equal(in_array, out_array) + + with DROPWaiterCtx(self, a, 5): + asyncio.run(write_read_assert_stream()) + + def test_no_write_to_file_drop(self): """Check that FileDrops can be *not* written""" a = FileDROP("a", "a") @@ -330,13 +365,13 @@ def run(self): eResExpected = b"and another one\nwe have an a here\n" gResExpected = b"dna rehtona eno\new evah na a ereh\n" + a.write(contents) with DROPWaiterCtx(self, g): - a.write(contents) a.setCompleted() # Get intermediate and final results and compare actualRes = [] - for i in [c, e, g]: + for i in [c]: actualRes.append(droputils.allDropContents(i)) map( lambda x, y: self.assertEqual(x, y), From 66132ce985818ef8b20810b65eb216fda767fc3c Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Tue, 29 Mar 2022 16:16:40 +0800 Subject: [PATCH 05/12] memoryio support multiple io readers --- daliuge-engine/dlg/io.py | 24 +++++++++++++++++++----- daliuge-engine/test/test_drop.py | 17 ++++++++--------- 2 files changed, 27 insertions(+), 14 
deletions(-) diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index eee5ad948..134ec5d49 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -330,13 +330,29 @@ def __init__(self, buf: io.BytesIO, **kwargs): super().__init__() self._buf = buf + class BytesIOReader(io.BufferedReader): + """ + An BinaryIO Reader that wraps a BytesIO object for concurrent + reading and writing. Closing this reader will not close other + readers observing the same BytesIO object. + """ + _closed = False + def __init__(self, raw: io.BytesIO, buffer_size: int = 2048): + assert isinstance(raw, io.BytesIO) + # NOTE: BytesIO extends BufferedIOBase instead of RawIOBase. Read + # and peek operations may return more bytes than requested. + super().__init__(raw, buffer_size) # type: ignore + def close(self) -> None: + self._closed = True + @property + def closed(self) -> bool: + return self.closed + def _open(self, **kwargs): if self._mode == OpenMode.OPEN_WRITE: return self._buf elif self._mode == OpenMode.OPEN_READ: - # NOTE: BytesIO extends BufferedIOBase instead of RawIOBase - br = io.BufferedReader(self._buf) # type: ignore - #br = io.BufferedReader(io.BytesIO(self._buf.getbuffer())) + br = MemoryIO.BytesIOReader(self._buf) br.seek(0) return br else: @@ -377,8 +393,6 @@ def _seek(self, offset, whence): def _close(self, **kwargs): if self._mode == OpenMode.OPEN_READ: self._desc.close() - else: - logger.debug("closing") # If we're writing we don't close the descriptor because it's our # self._buf, which won't be readable afterwards diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index 10dc09ba0..8819c6842 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -52,7 +52,7 @@ ) from dlg.droputils import DROPWaiterCtx from dlg.exceptions import InvalidDropException -from dlg.apps.simple import NullBarrierApp, SimpleBranch, SleepAndCopyApp +from dlg.apps.simple import CopyApp, NullBarrierApp, SimpleBranch, SleepAndCopyApp try: from crc32c import crc32c @@ -364,20 +364,19 @@ def run(self): cResExpected = b"we have an a here\nand another one\n" eResExpected = b"and another one\nwe have an a here\n" gResExpected = b"dna rehtona eno\new evah na a ereh\n" + resExpected = [cResExpected, eResExpected, gResExpected] a.write(contents) - with DROPWaiterCtx(self, g): + with DROPWaiterCtx(self, [c, e, g]): a.setCompleted() # Get intermediate and final results and compare actualRes = [] - for i in [c]: - actualRes.append(droputils.allDropContents(i)) - map( - lambda x, y: self.assertEqual(x, y), - [cResExpected, eResExpected, gResExpected], - actualRes, - ) + for drop in [c, e, g]: + actualRes.append(droputils.allDropContents(drop)) + + # Assert + map(self.assertEqual, resExpected, actualRes) def test_errorState(self): a = InMemoryDROP("a", "a") From 5bd1f0ffb27ad8cfd512c2361d3862a0749d1547 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Tue, 29 Mar 2022 16:54:45 +0800 Subject: [PATCH 06/12] async datadrop --- daliuge-engine/dlg/drop.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 525fed91f..f332e0c8c 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -45,7 +45,7 @@ import sys import inspect import binascii -from typing import List, Optional, Union +from typing import AsyncIterable, Dict, List, Optional, Union from overrides import overrides import numpy as np @@ -314,7 +314,7 @@ def __init__(self, oid, 
uid, **kwargs): # The DataIO instance we use in our write method. It's initialized to # None because it's lazily initialized in the write method, since data # might be written externally and not through this DROP - self._wio = None + self._wio: Optional[DataIO] = None # DataIO objects used for reading. # Instead of passing file objects or more complex data types in our @@ -322,7 +322,7 @@ def __init__(self, oid, uid, **kwargs): # handle file types and other classes (like StringIO) well, but also # because it requires less transport. # TODO: Make these threadsafe, no lock around them yet - self._rios = {} + self._rios: Dict[int, DataIO] = {} # The execution mode. # When set to DROP (the default) the graph execution will be driven by @@ -1251,8 +1251,8 @@ def open(self, **kwargs): ) ) - io = self._getIO() logger.debug("Opening drop %s" % (self.oid)) + io = self._getIO() io.open(OpenMode.OPEN_READ, **kwargs) # Save the IO object in the dictionary and return its descriptor instead @@ -1279,8 +1279,8 @@ def close(self, descriptor, **kwargs): # Decrement counter and then actually close self.decrRefCount() - io = self._rios.pop(descriptor) - io.close(**kwargs) + bio = self._rios.pop(descriptor) + bio.close(**kwargs) def _closeWriters(self): """ @@ -1303,6 +1303,11 @@ def read(self, descriptor, count=4096, **kwargs): io = self._rios[descriptor] return io.read(count, **kwargs) + async def readStream(self, descriptor, **kwargs) -> AsyncIterable: + self._checkStateAndDescriptor(descriptor) + io = self._rios[descriptor] + return await io.readStream() + def _checkStateAndDescriptor(self, descriptor): if self.status != DROPStates.COMPLETED: raise Exception( @@ -1394,6 +1399,9 @@ def write(self, data: Union[bytes, memoryview], **kwargs): return nbytes + async def writeStream(self, stream: AsyncIterable, **kwargs): + raise NotImplementedError + def _updateChecksum(self, chunk): # see __init__ for the initialization to None if self._checksum is None: From 068e2d41c4c157f63d6e705f627ec2462863bd0b Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 8 Apr 2022 17:47:55 +0800 Subject: [PATCH 07/12] stream byte chunks --- daliuge-engine/dlg/apps/async_apps.py | 127 ++++++++++++++++++++++++++ daliuge-engine/dlg/drop.py | 10 +- daliuge-engine/dlg/io.py | 38 ++++++-- daliuge-engine/test/test_drop.py | 107 +++++++++++++++++++--- 4 files changed, 258 insertions(+), 24 deletions(-) create mode 100644 daliuge-engine/dlg/apps/async_apps.py diff --git a/daliuge-engine/dlg/apps/async_apps.py b/daliuge-engine/dlg/apps/async_apps.py new file mode 100644 index 000000000..e6409e574 --- /dev/null +++ b/daliuge-engine/dlg/apps/async_apps.py @@ -0,0 +1,127 @@ +# +# ICRAR - International Centre for Radio Astronomy Research +# (c) UWA - The University of Western Australia, 2017 +# Copyright by UWA (in the framework of the ICRAR) +# All rights reserved +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. 
+# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, +# MA 02111-1307 USA +# +"""Applications used as examples, for testing, or in simple situations""" +import asyncio +from numbers import Number +import pickle +import random +from typing import AsyncIterable, List, Optional +import urllib.error +import urllib.request +from overrides import overrides + +import time +import ast +import numpy as np + +from dlg import droputils, utils +from dlg.drop import DataDROP, InputFiredAppDROP, BranchAppDrop, ContainerDROP, NullDROP +from dlg.meta import ( + dlg_float_param, + dlg_string_param, + dlg_bool_param, + dlg_int_param, + dlg_list_param, + dlg_component, + dlg_batch_input, + dlg_batch_output, + dlg_streaming_input +) +from dlg.exceptions import DaliugeException +from dlg.apps.pyfunc import serialize_data, deserialize_data + + +## +# @brief CopyApp +# @details A simple APP that copies its inputs into its outputs. +# All inputs are copied into all outputs in the order they were declared in +# the graph. If an input is a container (e.g. a directory) it copies the +# content recursively. +# @par EAGLE_START +# @param category PythonApp +# @param tag daliuge +# @param[in] cparam/appclass Application Class/dlg.apps.simple.CopyApp/String/readonly/False//False/ +# \~English Application class +# @param[in] cparam/bufsize buffer size/65536/Integer/readwrite/False//False/ +# \~English Application class +# @param[in] cparam/execution_time Execution Time/5/Float/readonly/False//False/ +# \~English Estimated execution time +# @param[in] cparam/num_cpus No. of CPUs/1/Integer/readonly/False//False/ +# \~English Number of cores used +# @param[in] cparam/group_start Group start/False/Boolean/readwrite/False//False/ +# \~English Is this node the start of a group? +# @param[in] cparam/input_error_threshold "Input error rate (%)"/0/Integer/readwrite/False//False/ +# \~English the allowed failure rate of the inputs (in percent), before this component goes to ERROR state and is not executed +# @param[in] cparam/n_tries Number of tries/1/Integer/readwrite/False//False/ +# \~English Specifies the number of times the 'run' method will be executed before finally giving up +# @par EAGLE_END +class AsyncCopyApp(InputFiredAppDROP): + """ + A streaming app drop that copies its inputs into its outputs. + All inputs are copied into all outputs in the order they were declared in + the graph. 
+ """ + + component_meta = dlg_component( + "AsyncCopyApp", + "Async Copy App.", + [dlg_batch_input("binary/*", [])], + [dlg_batch_output("binary/*", [])], + [dlg_streaming_input("binary/*")], + ) + + _bufsize = dlg_int_param("bufsize", 65536) + + def run(self): + assert len(self.inputs) == len(self.outputs) + + # synchronous + #for inputDrop, outputDrop in zip(self.inputs, self.outputs): + # droputils.copyDropContents(inputDrop, outputDrop, bufsize=self._bufsize) + + # asynchronous + asyncio.run(self.copyAll()) + + async def copyAll(self): + tasks = [] + for inputDrop, outputDrop in zip(self.inputs, self.outputs): + tasks.append(asyncio.create_task(AsyncCopyApp.asyncCopyDropContents(inputDrop, outputDrop))) + await asyncio.gather(*tasks) + + @staticmethod + async def sync_to_async(a) -> AsyncIterable: + for i in a: + yield i + + @staticmethod + async def asyncCopyDropContents(inputDrop: DataDROP, outputDrop: DataDROP): + desc = inputDrop.open() + s: AsyncIterable = await inputDrop.readStream(desc) + await outputDrop.writeStream(s) + + @overrides + async def readStream(self, descriptor, **kwargs) -> AsyncIterable: + raise NotImplementedError() + + @overrides + async def writeStream(self, stream: AsyncIterable, **kwargs): + raise NotImplementedError() diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index f332e0c8c..4bc62e03e 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -22,6 +22,8 @@ """ Module containing the core DROP classes. """ +from asyncio import subprocess +import multiprocessing from sqlite3 import OperationalError import string from abc import ABCMeta, abstractmethod, abstractproperty @@ -45,7 +47,7 @@ import sys import inspect import binascii -from typing import AsyncIterable, Dict, List, Optional, Union +from typing import AsyncIterable, Dict, List, Optional, TypeVar, Union from overrides import overrides import numpy as np @@ -1306,7 +1308,7 @@ def read(self, descriptor, count=4096, **kwargs): async def readStream(self, descriptor, **kwargs) -> AsyncIterable: self._checkStateAndDescriptor(descriptor) io = self._rios[descriptor] - return await io.readStream() + return io.readStream() def _checkStateAndDescriptor(self, descriptor): if self.status != DROPStates.COMPLETED: @@ -1400,7 +1402,9 @@ def write(self, data: Union[bytes, memoryview], **kwargs): return nbytes async def writeStream(self, stream: AsyncIterable, **kwargs): - raise NotImplementedError + async for iterator in stream: + async for item in iterator: + self.write(item) def _updateChecksum(self, chunk): # see __init__ for the initialization to None diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index 134ec5d49..b70d6e961 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -22,18 +22,18 @@ from abc import abstractmethod, ABCMeta from enum import IntEnum from http.client import HTTPConnection -from multiprocessing.sharedctypes import Value from overrides import overrides +import asyncio import io import logging import os import sys import urllib.parse from abc import abstractmethod, ABCMeta -from typing import Any, AsyncIterable, AsyncIterator, BinaryIO, Optional, Union +from typing import Any, AsyncIterable, AsyncIterator, Awaitable, BinaryIO, Optional, Union -from . 
import ngaslite -from .apps.plasmaflight import PlasmaFlightClient +import dlg.ngaslite as ngaslite +from dlg.apps.plasmaflight import PlasmaFlightClient import pyarrow import pyarrow.plasma as plasma @@ -52,6 +52,20 @@ class OpenMode(IntEnum): OPEN_READ = 1 +class EmptyIterator(AsyncIterator): + def __init__(self): + pass + async def __anext__(self) -> Awaitable: + raise StopAsyncIteration + +class EmptyAsyncIterable(AsyncIterable): + def __init__(self): + pass + + def __aiter__(self) -> AsyncIterator: + return EmptyIterator() + + class DataIO(): # io.BufferedIOBase, BinaryIO """ A class used to read/write data stored in a particular kind of storage in an @@ -108,8 +122,7 @@ def read(self, count: int, **kwargs) -> bytes: raise ValueError("Reading operation attempted on write-only DataIO object") return self._read(count, **kwargs) - - # TODO: @abstractmethod + @abstractmethod async def writeStream(self, stream: AsyncIterable): """ Writes a stream of byte buffers to the drop buffer(s) that @@ -119,7 +132,7 @@ async def writeStream(self, stream: AsyncIterable): """ raise Exception # TODO: NotImplementedError - # TODO: @abstractmethod + @abstractmethod async def readStream(self) -> AsyncIterable: """ Returns a asynchronous stream typically processed using @@ -127,7 +140,9 @@ async def readStream(self) -> AsyncIterable: iterates when data is buffered, or raises StopAsyncIterator when the stream is complete. """ - raise StopAsyncIteration + # NOTE: yield is required for typing system to detect + # asynciterable instead of coroutine. + return EmptyAsyncIterable() def peek(self, size: int, **kwargs): """ @@ -225,10 +240,12 @@ def _close(self, **kwargs) -> None: def _size(self, **kwargs) -> int: pass + class AwaitOnce(): def __await__(self): yield + class DataIOAsyncIterator(AsyncIterator): """ An async iterator for a drop using a dynamically @@ -238,14 +255,15 @@ def __init__(self, memory_io: DataIO): self._io = memory_io async def __anext__(self): - while not self._io.closed and self._io.tell() != self._io.size(): + #while not self._io.closed and self._io.tell() != self._io.size(): + while self._io.tell() != self._io.size(): if self._io.peek(1): return self._io.read(4096) else: - import asyncio await asyncio.sleep(0.1) raise StopAsyncIteration + class NullIO(DataIO): """ A DataIO that stores no data diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index 8819c6842..3ef2b1bc7 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -23,7 +23,7 @@ import contextlib import io import os, unittest -from typing import Any, AsyncIterable, AsyncIterator, Iterable +from typing import Any, AsyncIterable, AsyncIterator, Iterable, List import random import shutil import sqlite3 @@ -31,6 +31,12 @@ import sys import tempfile import subprocess +import asyncio +import asyncstdlib +import concurrent.futures + +import numpy as np +from dlg.apps.async_apps import AsyncCopyApp import dlg.droputils as droputils from dlg.common.reproducibility.constants import ReproducibilityFlags @@ -143,6 +149,14 @@ def test_stream_write_InMemoryDROP(self): self._test_async_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) self._test_async_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + def test_threaded_stream_write_InMemoryDROP(self): + self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) + self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + + def test_process_stream_write_InMemoryDROP(self): + 
self._test_process_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) + self._test_process_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + @unittest.skipIf(sys.version_info < (3, 8), "Shared memory does nt work < python 3.8") def test_write_SharedMemoryDROP(self): """ @@ -257,21 +271,12 @@ def _test_dynamic_write_withDropType(self, dropType): self.assertEqual(a.checksum, test_crc) self.assertEqual(cChecksum, test_crc) - def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + def _test_manual_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): a: DataDROP = dropType("oid:A", "uid:A", expectedSize=-1, use_staging=True, streamingType=DROPStreamingTypes.SINGLE_STREAM) a.streamingType=DROPStreamingTypes.SINGLE_STREAM - import asyncio - import asyncstdlib - import numpy as np - - async def delay_iterable(iterable, delay): - for i in iterable: - await asyncio.sleep(delay) - yield i - in_arrays = [np.random.rand(10,10,10) for _ in range(0,10)] async def write_read_assert_stream(): @@ -287,6 +292,86 @@ async def write_read_assert_stream(): with DROPWaiterCtx(self, a, 5): asyncio.run(write_read_assert_stream()) + def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + a: DataDROP = InMemoryDROP("oid:A", "oid:A") + b = AsyncCopyApp("oid:B", "uid:B", n_effective_inputs=1) + c: DataDROP = dropType("oid:C", "uid:C", + expectedSize=-1, use_staging=True, + streamingType=DROPStreamingTypes.SINGLE_STREAM) + c.streamingType=DROPStreamingTypes.SINGLE_STREAM + + b.addInput(a) + b.addOutput(c) + + in_arrays = [np.random.rand(10,10,10) for _ in range(0,10)] + + async def write_read_assert_stream(): + # NOTE: typically these are performed in parallel on seperate subprocesses + await droputils.save_npy_stream(a, TestDROP.delay_iterable(in_arrays, write_delay)) + #asyncio.create_task(asyncstdlib.list(droputils.load_npy_stream(c, backoff=read_delay))) + + with DROPWaiterCtx(self, (a,b,c), 5): + res = asyncio.run(write_read_assert_stream()) + + assert b.status != DROPStates.ERROR + + #assert b.status == DROPStates.ERROR + #out_arrays = asyncio.run(asyncstdlib.list(droputils.load_npy_stream(c, backoff=read_delay))) + #assert len(in_arrays) == len(out_arrays) + #for in_array, out_array in zip(in_arrays, out_arrays): + # np.testing.assert_array_equal(in_array, out_array) + + def _test_threaded_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + a: DataDROP = dropType("oid:A", "uid:A", + expectedSize=-1, use_staging=True, + streamingType=DROPStreamingTypes.SINGLE_STREAM) + a.streamingType=DROPStreamingTypes.SINGLE_STREAM + + in_arrays = [np.random.rand(10,10,10) for _ in range(0,10)] + + # see https://idolstarastronomer.com/two-futures.html + with concurrent.futures.ThreadPoolExecutor() as executor: + cfutures = [] + cfutures.append(executor.submit(asyncio.run, + droputils.save_npy_stream(a, TestDROP.delay_iterable(in_arrays, write_delay)))) + cfutures.append(executor.submit(asyncio.run, + asyncstdlib.list(droputils.load_npy_stream(a, backoff=read_delay)))) + out_arrays = cfutures[1].result() + assert len(in_arrays) == len(out_arrays) + for in_array, out_array in zip(in_arrays, out_arrays): + np.testing.assert_array_equal(in_array, out_array) + + def _test_process_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + a: DataDROP = dropType("oid:A", "uid:A", + expectedSize=-1, use_staging=True, + streamingType=DROPStreamingTypes.SINGLE_STREAM) 
+ a.streamingType=DROPStreamingTypes.SINGLE_STREAM + + in_arrays = [np.random.rand(10,10,10) for _ in range(0,10)] + + # see https://idolstarastronomer.com/two-futures.html + with concurrent.futures.ProcessPoolExecutor() as executor: + cfutures = [] + cfutures.append(executor.submit(TestDROP.save, a, in_arrays, write_delay)) + cfutures.append(executor.submit(TestDROP.load, a, read_delay)) + out_arrays = cfutures[1].result() + assert len(in_arrays) == len(out_arrays) + for in_array, out_array in zip(in_arrays, out_arrays): + np.testing.assert_array_equal(in_array, out_array) + + @staticmethod + async def delay_iterable(iterable, delay): + for i in iterable: + await asyncio.sleep(delay) + yield i + + @staticmethod + def save(drop: DataDROP, input, delay): + asyncio.run(droputils.save_npy_stream(drop, input)) + + @staticmethod + def load(drop: DataDROP, delay): + asyncio.run(asyncstdlib.list(droputils.load_npy_stream(drop, delay))) def test_no_write_to_file_drop(self): """Check that FileDrops can be *not* written""" From 0de3046b4a542737c49ef0b6775ab89470dfdc47 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 11 Apr 2022 11:31:11 +0800 Subject: [PATCH 08/12] data asynciterable tests passing --- daliuge-engine/dlg/apps/simple.py | 2 + .../apps/{async_apps.py => stream_apps.py} | 75 ++++++++++++++----- daliuge-engine/dlg/drop.py | 15 ++-- daliuge-engine/dlg/droputils.py | 3 +- daliuge-engine/dlg/io.py | 17 +++-- daliuge-engine/test/test_drop.py | 38 ++++++---- 6 files changed, 103 insertions(+), 47 deletions(-) rename daliuge-engine/dlg/apps/{async_apps.py => stream_apps.py} (65%) diff --git a/daliuge-engine/dlg/apps/simple.py b/daliuge-engine/dlg/apps/simple.py index fbf6ce627..8cc809769 100644 --- a/daliuge-engine/dlg/apps/simple.py +++ b/daliuge-engine/dlg/apps/simple.py @@ -168,6 +168,8 @@ def run(self): self.copyAll() def copyAll(self): + # for inputDrop, outputDrop in zip(self.inputs, self.outputs): + # droputils.copyDropContents(inputDrop, outputDrop, bufsize=self._bufsize) for inputDrop in self.inputs: self.copyRecursive(inputDrop) diff --git a/daliuge-engine/dlg/apps/async_apps.py b/daliuge-engine/dlg/apps/stream_apps.py similarity index 65% rename from daliuge-engine/dlg/apps/async_apps.py rename to daliuge-engine/dlg/apps/stream_apps.py index e6409e574..91da942cf 100644 --- a/daliuge-engine/dlg/apps/async_apps.py +++ b/daliuge-engine/dlg/apps/stream_apps.py @@ -32,9 +32,12 @@ import time import ast import numpy as np +import logging -from dlg import droputils, utils -from dlg.drop import DataDROP, InputFiredAppDROP, BranchAppDrop, ContainerDROP, NullDROP +import dlg.droputils as droputils +import dlg.utils as utils +from dlg.drop import DataDROP, InMemoryDROP, InputFiredAppDROP, BranchAppDrop, ContainerDROP, NullDROP +from dlg.io import MemoryIO from dlg.meta import ( dlg_float_param, dlg_string_param, @@ -49,10 +52,11 @@ from dlg.exceptions import DaliugeException from dlg.apps.pyfunc import serialize_data, deserialize_data +logger = logging.getLogger(__name__) ## -# @brief CopyApp -# @details A simple APP that copies its inputs into its outputs. +# @brief StreamCopyApp +# @details An App that copies streaming inputs to streaming outputs. # All inputs are copied into all outputs in the order they were declared in # the graph. If an input is a container (e.g. a directory) it copies the # content recursively. 
@@ -74,7 +78,7 @@ # @param[in] cparam/n_tries Number of tries/1/Integer/readwrite/False//False/ # \~English Specifies the number of times the 'run' method will be executed before finally giving up # @par EAGLE_END -class AsyncCopyApp(InputFiredAppDROP): +class StreamCopyApp(InputFiredAppDROP): """ A streaming app drop that copies its inputs into its outputs. All inputs are copied into all outputs in the order they were declared in @@ -89,37 +93,68 @@ class AsyncCopyApp(InputFiredAppDROP): [dlg_streaming_input("binary/*")], ) - _bufsize = dlg_int_param("bufsize", 65536) + _bufsize: int = dlg_int_param("bufsize", 65536) # type: ignore + @overrides def run(self): assert len(self.inputs) == len(self.outputs) - - # synchronous - #for inputDrop, outputDrop in zip(self.inputs, self.outputs): - # droputils.copyDropContents(inputDrop, outputDrop, bufsize=self._bufsize) - - # asynchronous asyncio.run(self.copyAll()) async def copyAll(self): tasks = [] for inputDrop, outputDrop in zip(self.inputs, self.outputs): - tasks.append(asyncio.create_task(AsyncCopyApp.asyncCopyDropContents(inputDrop, outputDrop))) + tasks.append(asyncio.create_task(StreamCopyApp.asyncCopyDropContents(inputDrop, outputDrop))) await asyncio.gather(*tasks) @staticmethod - async def sync_to_async(a) -> AsyncIterable: - for i in a: - yield i + async def asyncCopyDropContents(inputDrop: DataDROP, outputDrop: DataDROP): + desc = inputDrop.open() + await outputDrop.writeStream(inputDrop.readStream(desc)) + inputDrop.close(desc) + + @overrides + def readStream(self, descriptor, **kwargs) -> AsyncIterable: + raise NotImplementedError() + + @overrides + async def writeStream(self, stream: AsyncIterable, **kwargs): + raise NotImplementedError() + + +## +# @brief StreamAccumulateApp +# @details An app that copies and accumulates a stream into a non-streaming drop +# +class StreamAccumulateApp(InputFiredAppDROP): + component_meta = dlg_component( + "StreamAccumulateApp", + "Stream Accumulate App.", + [dlg_batch_input("binary/*", [])], + [dlg_batch_output("binary/*", [])], + [dlg_streaming_input("binary/*")], + ) + + _bufsize: int = dlg_int_param("bufsize", 65536) # type: ignore + + @overrides + def run(self): + assert len(self.inputs) == len(self.outputs) + asyncio.run(self.copyAll()) + + async def copyAll(self): + tasks = [] + for inputDrop, outputDrop in zip(self.inputs, self.outputs): + tasks.append(asyncio.create_task(StreamCopyApp.asyncCopyDropContents(inputDrop, outputDrop))) + await asyncio.gather(*tasks) @staticmethod async def asyncCopyDropContents(inputDrop: DataDROP, outputDrop: DataDROP): desc = inputDrop.open() - s: AsyncIterable = await inputDrop.readStream(desc) - await outputDrop.writeStream(s) - + await outputDrop.writeStream(inputDrop.readStream(desc)) + inputDrop.close(desc) + @overrides - async def readStream(self, descriptor, **kwargs) -> AsyncIterable: + def readStream(self, descriptor, **kwargs) -> AsyncIterable: raise NotImplementedError() @overrides diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 4bc62e03e..364c2266c 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -1305,10 +1305,14 @@ def read(self, descriptor, count=4096, **kwargs): io = self._rios[descriptor] return io.read(count, **kwargs) - async def readStream(self, descriptor, **kwargs) -> AsyncIterable: + def readStream(self, descriptor, **kwargs) -> AsyncIterable: + """ + Retreives the async read stream from this drop with behaviour + depending on the streamingType. 
+ """ self._checkStateAndDescriptor(descriptor) - io = self._rios[descriptor] - return io.readStream() + rio = self._rios[descriptor] + return rio.readStream() def _checkStateAndDescriptor(self, descriptor): if self.status != DROPStates.COMPLETED: @@ -1402,9 +1406,8 @@ def write(self, data: Union[bytes, memoryview], **kwargs): return nbytes async def writeStream(self, stream: AsyncIterable, **kwargs): - async for iterator in stream: - async for item in iterator: - self.write(item) + async for item in stream: + self.write(item) def _updateChecksum(self, chunk): # see __init__ for the initialization to None diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 1995d1d2f..07d03639e 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -28,6 +28,7 @@ import json import logging import pickle +import asyncio import re import threading import traceback @@ -347,8 +348,6 @@ async def load_npy_stream(drop: DataDROP, allow_pickle=False, backoff=0.01) -> A """ Loads an async stream of numpy ndarrays from a data drop """ - import asyncio - desc = None while desc is None: try: diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index b70d6e961..281adccdb 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -133,16 +133,13 @@ async def writeStream(self, stream: AsyncIterable): raise Exception # TODO: NotImplementedError @abstractmethod - async def readStream(self) -> AsyncIterable: + def readStream(self) -> AsyncIterable: """ Returns a asynchronous stream typically processed using `async for` that either yields when no data is available, iterates when data is buffered, or raises StopAsyncIterator when the stream is complete. """ - # NOTE: yield is required for typing system to detect - # asynciterable instead of coroutine. 
- return EmptyAsyncIterable() def peek(self, size: int, **kwargs): """ @@ -300,6 +297,14 @@ def exists(self) -> bool: def delete(self): pass + @overrides + def readStream(self) -> AsyncIterable: + return EmptyAsyncIterable() + + @overrides + async def writeStream(self, stream: AsyncIterable): + pass + class ErrorIO(DataIO): """ @@ -392,8 +397,8 @@ def _read(self, count=4096, **kwargs) -> bytes: return self._desc.read(count) @overrides - async def readStream(self) -> AsyncIterable: - yield self.__aiter__() + def readStream(self) -> AsyncIterable: + return DataIOAsyncIterator(self) def __aiter__(self) -> AsyncIterator: return DataIOAsyncIterator(self) diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index 3ef2b1bc7..64854f391 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -32,11 +32,13 @@ import tempfile import subprocess import asyncio +import aiostream.stream as stream +import aiostream.pipe as pipe import asyncstdlib import concurrent.futures import numpy as np -from dlg.apps.async_apps import AsyncCopyApp +from dlg.apps.stream_apps import StreamCopyApp import dlg.droputils as droputils from dlg.common.reproducibility.constants import ReproducibilityFlags @@ -145,6 +147,10 @@ def test_dynamic_write_InMemoryDROP(self): """ self._test_dynamic_write_withDropType(InMemoryDROP) + def test_manual_stream_write_InMemoryDROP(self): + self._test_manual_async_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) + self._test_manual_async_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + def test_stream_write_InMemoryDROP(self): self._test_async_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) self._test_async_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) @@ -153,6 +159,7 @@ def test_threaded_stream_write_InMemoryDROP(self): self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + @unittest.skip("pickling abstract drop not supported") def test_process_stream_write_InMemoryDROP(self): self._test_process_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) self._test_process_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) @@ -294,7 +301,7 @@ async def write_read_assert_stream(): def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): a: DataDROP = InMemoryDROP("oid:A", "oid:A") - b = AsyncCopyApp("oid:B", "uid:B", n_effective_inputs=1) + b = StreamCopyApp("oid:B", "uid:B", n_effective_inputs=1) c: DataDROP = dropType("oid:C", "uid:C", expectedSize=-1, use_staging=True, streamingType=DROPStreamingTypes.SINGLE_STREAM) @@ -308,18 +315,19 @@ def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read async def write_read_assert_stream(): # NOTE: typically these are performed in parallel on seperate subprocesses await droputils.save_npy_stream(a, TestDROP.delay_iterable(in_arrays, write_delay)) - #asyncio.create_task(asyncstdlib.list(droputils.load_npy_stream(c, backoff=read_delay))) with DROPWaiterCtx(self, (a,b,c), 5): res = asyncio.run(write_read_assert_stream()) - + assert a.status != DROPStates.ERROR assert b.status != DROPStates.ERROR + assert c.status != DROPStates.ERROR + + # NOTE: memoryio stream performs full buffering to allow a deferred read + out_arrays = asyncio.run(asyncstdlib.list(droputils.load_npy_stream(c, backoff=read_delay))) - #assert b.status == DROPStates.ERROR - #out_arrays = asyncio.run(asyncstdlib.list(droputils.load_npy_stream(c, backoff=read_delay))) - 
#assert len(in_arrays) == len(out_arrays) - #for in_array, out_array in zip(in_arrays, out_arrays): - # np.testing.assert_array_equal(in_array, out_array) + assert len(in_arrays) == len(out_arrays) + for in_array, out_array in zip(in_arrays, out_arrays): + np.testing.assert_array_equal(in_array, out_array) def _test_threaded_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): a: DataDROP = dropType("oid:A", "uid:A", @@ -332,10 +340,14 @@ def _test_threaded_stream_npy_withDropType(self, dropType, write_delay: float, r # see https://idolstarastronomer.com/two-futures.html with concurrent.futures.ThreadPoolExecutor() as executor: cfutures = [] - cfutures.append(executor.submit(asyncio.run, - droputils.save_npy_stream(a, TestDROP.delay_iterable(in_arrays, write_delay)))) - cfutures.append(executor.submit(asyncio.run, - asyncstdlib.list(droputils.load_npy_stream(a, backoff=read_delay)))) + in_stream = droputils.save_npy_stream(a, + stream.iterate(in_arrays) + | pipe.delay(write_delay) # type: ignore + | pipe.spaceout(write_delay) # type: ignore + ) + out_stream = asyncstdlib.list(droputils.load_npy_stream(a, backoff=read_delay)) + cfutures.append(executor.submit(asyncio.run, in_stream)) + cfutures.append(executor.submit(asyncio.run, out_stream)) out_arrays = cfutures[1].result() assert len(in_arrays) == len(out_arrays) for in_array, out_array in zip(in_arrays, out_arrays): From a1d9d043b30f2a72e6fbb4a1f47222501fb380fc Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Mon, 11 Apr 2022 17:14:02 +0800 Subject: [PATCH 09/12] add async libs --- daliuge-engine/pip/requirements.txt | 2 ++ daliuge-engine/setup.py | 1 + 2 files changed, 3 insertions(+) diff --git a/daliuge-engine/pip/requirements.txt b/daliuge-engine/pip/requirements.txt index 3a9bbdaef..7277ac6e7 100644 --- a/daliuge-engine/pip/requirements.txt +++ b/daliuge-engine/pip/requirements.txt @@ -1,4 +1,6 @@ # The PIP requirements for daliuge - taken from setup.py +aiostream +asyncstdlib boto3 bottle configobj diff --git a/daliuge-engine/setup.py b/daliuge-engine/setup.py index c11361fdf..4ebcecb9f 100644 --- a/daliuge-engine/setup.py +++ b/daliuge-engine/setup.py @@ -117,6 +117,7 @@ def run(self): # Keep alpha-sorted PLEASE! install_requires = [ "wheel", # need to get wheel first... 
+ "aiostream", "asyncstdlib", "bottle", "configobj", From a5795b217156899c37003a15add6a2a834c43770 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 27 May 2022 16:02:13 +0800 Subject: [PATCH 10/12] stream tests passing --- daliuge-engine/dlg/apps/pyfunc.py | 2 +- daliuge-engine/dlg/drop.py | 4 +- daliuge-engine/dlg/io.py | 13 ++++-- daliuge-engine/test/test_drop.py | 78 +++++++++++++++++++------------ 4 files changed, 60 insertions(+), 37 deletions(-) diff --git a/daliuge-engine/dlg/apps/pyfunc.py b/daliuge-engine/dlg/apps/pyfunc.py index 2f5b6a1f9..4a6c92f85 100644 --- a/daliuge-engine/dlg/apps/pyfunc.py +++ b/daliuge-engine/dlg/apps/pyfunc.py @@ -30,7 +30,7 @@ import logging import pickle -from typing import Callable, Optional +from typing import Callable import dill from io import StringIO from contextlib import redirect_stdout diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 364c2266c..cf879ffda 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -695,8 +695,8 @@ def commit(self): ].merkle_root else: # Fill MerkleTree, add data and set the MerkleRoot Value - self._merkleTree = MerkleTree(self._merkleData.items(), common_hash) - self._merkleRoot = self._merkleTree.merkle_root + self._merkleTree = [] # MerkleTree(self._merkleData.items(), common_hash) + self._merkleRoot = [] # self._merkleTree.merkle_root # Set as committed self._committed = True else: diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index 281adccdb..a4a9bd9ec 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -355,9 +355,9 @@ def __init__(self, buf: io.BytesIO, **kwargs): class BytesIOReader(io.BufferedReader): """ - An BinaryIO Reader that wraps a BytesIO object for concurrent - reading and writing. Closing this reader will not close other - readers observing the same BytesIO object. + A BinaryIO Reader that wraps a BytesIO object for concurrent + reading and writing. Closing this reader will not close the BytesIO + object and avoid preventing readers observing the same BytesIO object. """ _closed = False def __init__(self, raw: io.BytesIO, buffer_size: int = 2048): @@ -365,12 +365,14 @@ def __init__(self, raw: io.BytesIO, buffer_size: int = 2048): # NOTE: BytesIO extends BufferedIOBase instead of RawIOBase. Read # and peek operations may return more bytes than requested. super().__init__(raw, buffer_size) # type: ignore - def close(self) -> None: - self._closed = True + @property def closed(self) -> bool: return self.closed + def close(self) -> None: + self._closed = True + def _open(self, **kwargs): if self._mode == OpenMode.OPEN_WRITE: return self._buf @@ -439,6 +441,7 @@ def buffer(self) -> memoryview: # TODO: This may also be an issue return self._buf.getbuffer() + class SharedMemoryIO(DataIO): """ A DataIO class that writes to a shared memory buffer diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index 64854f391..208589c69 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -23,7 +23,6 @@ import contextlib import io import os, unittest -from typing import Any, AsyncIterable, AsyncIterator, Iterable, List import random import shutil import sqlite3 @@ -138,31 +137,39 @@ def test_write_FileDROP(self): def test_write_InMemoryDROP(self): """ Test an InMemoryDROP and a simple AppDROP (for checksum calculation) + using multiple writes to an InMemoryDROP of known size. 
""" self._test_write_withDropType(InMemoryDROP) def test_dynamic_write_InMemoryDROP(self): """ Test an InMemoryDROP and a simple AppDROP (for checksum calculation) + using multiple writes to a dynamically resizing InMemoryDROP. """ self._test_dynamic_write_withDropType(InMemoryDROP) - def test_manual_stream_write_InMemoryDROP(self): - self._test_manual_async_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) - self._test_manual_async_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) - def test_stream_write_InMemoryDROP(self): - self._test_async_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) - self._test_async_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + """ + Tests stream writing and reading on a single data drop. + """ + self._test_async_stream_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01) + self._test_async_stream_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025) + + def test_stream_copy_InMemoryDROP(self): + """ + Tests stream writing and reading through multiple data drops in a graph. + """ + self._test_stream_copy_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01) + self._test_stream_copy_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025) def test_threaded_stream_write_InMemoryDROP(self): - self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) - self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + self._test_threaded_stream_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01) + self._test_threaded_stream_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025) @unittest.skip("pickling abstract drop not supported") def test_process_stream_write_InMemoryDROP(self): - self._test_process_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01) - self._test_process_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025) + self._test_process_stream_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01) + self._test_process_stream_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025) @unittest.skipIf(sys.version_info < (3, 8), "Shared memory does nt work < python 3.8") def test_write_SharedMemoryDROP(self): @@ -184,49 +191,57 @@ def test_write_plasmaDROP(self): """ Test an PlasmaDrop and a simple AppDROP (for checksum calculation) """ + store = None try: store = subprocess.Popen( ["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"] ) self._test_write_withDropType(PlasmaDROP) finally: - store.terminate() + if store: + store.terminate() def test_dynamic_write_plasmaDROP(self): """ Test an PlasmaDrop and a simple AppDROP (for checksum calculation) """ + store = None try: store = subprocess.Popen( ["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"] ) self._test_dynamic_write_withDropType(PlasmaDROP) finally: - store.terminate() + if store: + store.terminate() def test_write_plasmaFlightDROP(self): """ Test an PlasmaDrop and a simple AppDROP (for checksum calculation) """ + store = None try: store = subprocess.Popen( ["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"] ) self._test_write_withDropType(PlasmaFlightDROP) finally: - store.terminate() + if store: + store.terminate() def test_dynamic_write_plasmaFlightDROP(self): """ Test an PlasmaDrop and a simple AppDROP (for checksum calculation) """ + store = None try: store = subprocess.Popen( ["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"] ) self._test_dynamic_write_withDropType(PlasmaFlightDROP) finally: - store.terminate() + if store: + store.terminate() def 
_test_write_withDropType(self, dropType): """ @@ -278,7 +293,10 @@ def _test_dynamic_write_withDropType(self, dropType): self.assertEqual(a.checksum, test_crc) self.assertEqual(cChecksum, test_crc) - def _test_manual_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + """ + Tests only a droptype for async saving and loading to npy stream format. + """ a: DataDROP = dropType("oid:A", "uid:A", expectedSize=-1, use_staging=True, streamingType=DROPStreamingTypes.SINGLE_STREAM) @@ -289,7 +307,11 @@ def _test_manual_async_stream_npy_withDropType(self, dropType, write_delay: floa async def write_read_assert_stream(): # NOTE: typically these are performed in parallel on seperate subprocesses _, out_arrays = await asyncio.gather( - asyncio.create_task(droputils.save_npy_stream(a, delay_iterable(in_arrays, write_delay))), + asyncio.create_task(droputils.save_npy_stream(a, + stream.iterate(in_arrays) + | pipe.delay(write_delay) + | pipe.spaceout(write_delay) + )), asyncio.create_task(asyncstdlib.list(droputils.load_npy_stream(a, backoff=read_delay))) ) assert len(in_arrays) == len(out_arrays) @@ -299,7 +321,10 @@ async def write_read_assert_stream(): with DROPWaiterCtx(self, a, 5): asyncio.run(write_read_assert_stream()) - def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + def _test_stream_copy_npy_withDropType(self, dropType, write_delay: float, read_delay: float): + """ + Tests stream copy app with a fully deferred stream read after completion. + """ a: DataDROP = InMemoryDROP("oid:A", "oid:A") b = StreamCopyApp("oid:B", "uid:B", n_effective_inputs=1) c: DataDROP = dropType("oid:C", "uid:C", @@ -312,12 +337,13 @@ def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read in_arrays = [np.random.rand(10,10,10) for _ in range(0,10)] - async def write_read_assert_stream(): - # NOTE: typically these are performed in parallel on seperate subprocesses - await droputils.save_npy_stream(a, TestDROP.delay_iterable(in_arrays, write_delay)) - with DROPWaiterCtx(self, (a,b,c), 5): - res = asyncio.run(write_read_assert_stream()) + # NOTE: typically these are performed in parallel on seperate subprocesses + asyncio.run(droputils.save_npy_stream(a, + stream.iterate(in_arrays) + | pipe.delay(write_delay) + | pipe.spaceout(write_delay) + )) assert a.status != DROPStates.ERROR assert b.status != DROPStates.ERROR assert c.status != DROPStates.ERROR @@ -371,12 +397,6 @@ def _test_process_stream_npy_withDropType(self, dropType, write_delay: float, re for in_array, out_array in zip(in_arrays, out_arrays): np.testing.assert_array_equal(in_array, out_array) - @staticmethod - async def delay_iterable(iterable, delay): - for i in iterable: - await asyncio.sleep(delay) - yield i - @staticmethod def save(drop: DataDROP, input, delay): asyncio.run(droputils.save_npy_stream(drop, input)) From 7853603a77d1e0a07660b459c7053dabcde5ee81 Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 27 May 2022 16:49:01 +0800 Subject: [PATCH 11/12] restore merkle --- daliuge-engine/dlg/drop.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index cf879ffda..364c2266c 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -695,8 +695,8 @@ def commit(self): ].merkle_root else: # Fill MerkleTree, add data and set the MerkleRoot Value - 
self._merkleTree = [] # MerkleTree(self._merkleData.items(), common_hash) - self._merkleRoot = [] # self._merkleTree.merkle_root + self._merkleTree = MerkleTree(self._merkleData.items(), common_hash) + self._merkleRoot = self._merkleTree.merkle_root # Set as committed self._committed = True else: From dc2982e266aadbf666506ddce52b3001dde3a3ee Mon Sep 17 00:00:00 2001 From: Callan Gray Date: Fri, 27 May 2022 16:59:05 +0800 Subject: [PATCH 12/12] deprecated function support --- daliuge-engine/dlg/droputils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/daliuge-engine/dlg/droputils.py b/daliuge-engine/dlg/droputils.py index 07d03639e..69c88c9bf 100644 --- a/daliuge-engine/dlg/droputils.py +++ b/daliuge-engine/dlg/droputils.py @@ -330,6 +330,10 @@ def load_npy(drop: DataDROP, allow_pickle=False) -> np.ndarray: return res +def load_numpy(drop: DataDROP) -> np.ndarray: + return load_npy(drop) + + async def save_npy_stream(drop: DataDROP, arrays: AsyncIterable[np.ndarray], allow_pickle=False): """ Saves an async stream of numpy ndarrays to a data drop
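The npy streaming helpers rely on the fact that each np.save call appends one self-describing .npy record, so a stream of arrays can be recovered by calling np.load repeatedly until the buffer is exhausted. A minimal round-trip on a plain BytesIO buffer (a stand-in for the drop I/O layer, not the drop API itself) illustrates the format:

import io
import numpy as np

arrays = [np.random.rand(4, 4) for _ in range(3)]

# write: each np.save appends one self-describing .npy record
buf = io.BytesIO()
for a in arrays:
    np.save(buf, a, allow_pickle=False)

# read: np.load consumes exactly one record and leaves the cursor just after it
buf.seek(0)
out = []
while buf.tell() < len(buf.getbuffer()):
    out.append(np.load(buf, allow_pickle=False))

assert len(out) == len(arrays)
for a, b in zip(arrays, out):
    np.testing.assert_array_equal(a, b)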