Commit
stream tests passing
calgray committed May 27, 2022
1 parent a1d9d04 commit a5795b2
Showing 4 changed files with 60 additions and 37 deletions.
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/pyfunc.py
@@ -30,7 +30,7 @@
import logging
import pickle

from typing import Callable, Optional
from typing import Callable
import dill
from io import StringIO
from contextlib import redirect_stdout
4 changes: 2 additions & 2 deletions daliuge-engine/dlg/drop.py
@@ -695,8 +695,8 @@ def commit(self):
].merkle_root
else:
# Fill MerkleTree, add data and set the MerkleRoot Value
self._merkleTree = MerkleTree(self._merkleData.items(), common_hash)
self._merkleRoot = self._merkleTree.merkle_root
self._merkleTree = [] # MerkleTree(self._merkleData.items(), common_hash)
self._merkleRoot = [] # self._merkleTree.merkle_root
# Set as committed
self._committed = True
else:
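For context, the now-stubbed branch built a Merkle tree over the drop's merkle data and stored its root. A minimal sketch of that pattern, assuming the merklelib package; common_hash and merkle_data below are stand-ins for the module's own helper and the drop's _merkleData:

import hashlib
from merklelib import MerkleTree

def common_hash(value) -> str:
    # Stand-in leaf hash; the real common_hash is defined alongside the drop code.
    return hashlib.sha256(repr(value).encode()).hexdigest()

merkle_data = {"checksum": 42, "size": 1024}  # stand-in for self._merkleData
tree = MerkleTree(merkle_data.items(), common_hash)
merkle_root = tree.merkle_root  # hex digest committed as the drop's merkle root
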
13 changes: 8 additions & 5 deletions daliuge-engine/dlg/io.py
@@ -355,22 +355,24 @@ def __init__(self, buf: io.BytesIO, **kwargs):

class BytesIOReader(io.BufferedReader):
"""
An BinaryIO Reader that wraps a BytesIO object for concurrent
reading and writing. Closing this reader will not close other
readers observing the same BytesIO object.
A BinaryIO Reader that wraps a BytesIO object for concurrent
reading and writing. Closing this reader will not close the underlying
BytesIO object, so other readers observing the same BytesIO object remain usable.
"""
_closed = False
def __init__(self, raw: io.BytesIO, buffer_size: int = 2048):
assert isinstance(raw, io.BytesIO)
# NOTE: BytesIO extends BufferedIOBase instead of RawIOBase. Read
# and peek operations may return more bytes than requested.
super().__init__(raw, buffer_size) # type: ignore
def close(self) -> None:
self._closed = True

@property
def closed(self) -> bool:
return self._closed

def close(self) -> None:
self._closed = True

def _open(self, **kwargs):
if self._mode == OpenMode.OPEN_WRITE:
return self._buf
@@ -439,6 +441,7 @@ def buffer(self) -> memoryview:
# TODO: This may also be an issue
return self._buf.getbuffer()


class SharedMemoryIO(DataIO):
"""
A DataIO class that writes to a shared memory buffer
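A minimal usage sketch of the close semantics described in the docstring above, assuming the BytesIOReader class from this file:

import io

buf = io.BytesIO(b"streamed payload")
r1 = BytesIOReader(buf)
r2 = BytesIOReader(buf)

head = r1.read(8)       # b"streamed"
r1.close()              # marks only this reader as closed
assert r1.closed
assert not buf.closed   # the shared BytesIO object stays open
assert not r2.closed    # other readers are unaffected
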
78 changes: 49 additions & 29 deletions daliuge-engine/test/test_drop.py
@@ -23,7 +23,6 @@
import contextlib
import io
import os, unittest
from typing import Any, AsyncIterable, AsyncIterator, Iterable, List
import random
import shutil
import sqlite3
@@ -138,31 +137,39 @@ def test_write_FileDROP(self):
def test_write_InMemoryDROP(self):
"""
Test an InMemoryDROP and a simple AppDROP (for checksum calculation)
using multiple writes to an InMemoryDROP of known size.
"""
self._test_write_withDropType(InMemoryDROP)

def test_dynamic_write_InMemoryDROP(self):
"""
Test an InMemoryDROP and a simple AppDROP (for checksum calculation)
using multiple writes to a dynamically resizing InMemoryDROP.
"""
self._test_dynamic_write_withDropType(InMemoryDROP)

def test_manual_stream_write_InMemoryDROP(self):
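"""
Tests manual stream writing and reading on a single data drop.
"""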
self._test_manual_async_stream_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01)
self._test_manual_async_stream_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025)

def test_stream_write_InMemoryDROP(self):
self._test_async_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01)
self._test_async_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025)
"""
Tests stream writing and reading on a single data drop.
"""
self._test_async_stream_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01)
self._test_async_stream_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025)

def test_stream_copy_InMemoryDROP(self):
"""
Tests stream writing and reading through multiple data drops in a graph.
"""
self._test_stream_copy_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01)
self._test_stream_copy_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025)

def test_threaded_stream_write_InMemoryDROP(self):
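"""
Tests stream writing and reading using a worker thread.
"""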
self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01)
self._test_threaded_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025)
self._test_threaded_stream_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01)
self._test_threaded_stream_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025)

@unittest.skip("pickling abstract drop not supported")
def test_process_stream_write_InMemoryDROP(self):
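"""
Tests stream writing and reading across separate processes
(skipped: pickling abstract drops is not supported).
"""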
self._test_process_stream_npy_withDropType(InMemoryDROP, 0.025, 0.01)
self._test_process_stream_npy_withDropType(InMemoryDROP, 0.01, 0.025)
self._test_process_stream_npy_withDropType(InMemoryDROP, write_delay=0.025, read_delay=0.01)
self._test_process_stream_npy_withDropType(InMemoryDROP, write_delay=0.01, read_delay=0.025)

@unittest.skipIf(sys.version_info < (3, 8), "Shared memory does not work < Python 3.8")
def test_write_SharedMemoryDROP(self):
@@ -184,49 +191,57 @@ def test_write_plasmaDROP(self):
"""
Test a PlasmaDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_write_withDropType(PlasmaDROP)
finally:
store.terminate()
if store:
store.terminate()

def test_dynamic_write_plasmaDROP(self):
"""
Test dynamic writes to a PlasmaDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_dynamic_write_withDropType(PlasmaDROP)
finally:
store.terminate()
if store:
store.terminate()

def test_write_plasmaFlightDROP(self):
"""
Test a PlasmaFlightDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_write_withDropType(PlasmaFlightDROP)
finally:
store.terminate()
if store:
store.terminate()

def test_dynamic_write_plasmaFlightDROP(self):
"""
Test dynamic writes to a PlasmaFlightDROP and a simple AppDROP (for checksum calculation)
"""
store = None
try:
store = subprocess.Popen(
["plasma_store", "-m", "100000000", "-s", "/tmp/plasma"]
)
self._test_dynamic_write_withDropType(PlasmaFlightDROP)
finally:
store.terminate()
if store:
store.terminate()

def _test_write_withDropType(self, dropType):
"""
@@ -278,7 +293,10 @@ def _test_dynamic_write_withDropType(self, dropType):
self.assertEqual(a.checksum, test_crc)
self.assertEqual(cChecksum, test_crc)

def _test_manual_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float):
def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float):
"""
Tests a single drop type for async saving and loading in the npy stream format.
"""
a: DataDROP = dropType("oid:A", "uid:A",
expectedSize=-1, use_staging=True,
streamingType=DROPStreamingTypes.SINGLE_STREAM)
@@ -289,7 +307,11 @@ def _test_manual_async_stream_npy_withDropType(self, dropType, write_delay: floa
async def write_read_assert_stream():
# NOTE: typically these are performed in parallel on separate subprocesses
_, out_arrays = await asyncio.gather(
asyncio.create_task(droputils.save_npy_stream(a, delay_iterable(in_arrays, write_delay))),
asyncio.create_task(droputils.save_npy_stream(a,
stream.iterate(in_arrays)
| pipe.delay(write_delay)
| pipe.spaceout(write_delay)
)),
asyncio.create_task(asyncstdlib.list(droputils.load_npy_stream(a, backoff=read_delay)))
)
assert len(in_arrays) == len(out_arrays)
@@ -299,7 +321,10 @@ async def write_read_assert_stream():
with DROPWaiterCtx(self, a, 5):
asyncio.run(write_read_assert_stream())

def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read_delay: float):
def _test_stream_copy_npy_withDropType(self, dropType, write_delay: float, read_delay: float):
"""
Tests the stream copy app with a fully deferred stream read after completion.
"""
a: DataDROP = InMemoryDROP("oid:A", "oid:A")
b = StreamCopyApp("oid:B", "uid:B", n_effective_inputs=1)
c: DataDROP = dropType("oid:C", "uid:C",
@@ -312,12 +337,13 @@ def _test_async_stream_npy_withDropType(self, dropType, write_delay: float, read

in_arrays = [np.random.rand(10,10,10) for _ in range(0,10)]

async def write_read_assert_stream():
# NOTE: typically these are performed in parallel on separate subprocesses
await droputils.save_npy_stream(a, TestDROP.delay_iterable(in_arrays, write_delay))

with DROPWaiterCtx(self, (a,b,c), 5):
res = asyncio.run(write_read_assert_stream())
# NOTE: typically these are performed in parallel on separate subprocesses
asyncio.run(droputils.save_npy_stream(a,
stream.iterate(in_arrays)
| pipe.delay(write_delay)
| pipe.spaceout(write_delay)
))
assert a.status != DROPStates.ERROR
assert b.status != DROPStates.ERROR
assert c.status != DROPStates.ERROR
@@ -371,12 +397,6 @@ def _test_process_stream_npy_withDropType(self, dropType, write_delay: float, re
for in_array, out_array in zip(in_arrays, out_arrays):
np.testing.assert_array_equal(in_array, out_array)

@staticmethod
async def delay_iterable(iterable, delay):
for i in iterable:
await asyncio.sleep(delay)
yield i

@staticmethod
def save(drop: DataDROP, input, delay):
asyncio.run(droputils.save_npy_stream(drop, input))
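For reference, the delayed producer used throughout these stream tests follows the aiostream pipeline pattern. A minimal self-contained sketch, assuming only the aiostream package (the drop-specific droputils.save_npy_stream and load_npy_stream helpers are omitted):

import asyncio
from aiostream import stream, pipe

async def main():
    # Emit items with an initial delay and a fixed spacing between them,
    # mimicking the slow producer the tests simulate via write_delay.
    xs = stream.iterate(range(5)) | pipe.delay(0.01) | pipe.spaceout(0.01)
    assert await stream.list(xs) == list(range(5))

asyncio.run(main())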
