Skip to content

Commit

Permalink
fix dynamic plasmadrop tests
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Jan 12, 2022
1 parent f73b02a commit 87b2223
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
6 changes: 5 additions & 1 deletion daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2085,6 +2085,7 @@ class PlasmaDROP(AbstractDROP):

object_id = dlg_string_param("object_id", None)
plasma_path = dlg_string_param("plasma_path", "/tmp/plasma")
use_staging = dlg_bool_param("use_staging", False)

def initialize(self, **kwargs):
object_id = self.uid
Expand All @@ -2094,7 +2095,10 @@ def initialize(self, **kwargs):
self.object_id = object_id

def getIO(self):
return PlasmaIO(plasma.ObjectID(self.object_id), self.plasma_path, False)
return PlasmaIO(plasma.ObjectID(self.object_id),
self.plasma_path,
expected_size=self._expectedSize,
use_staging=self.use_staging)

@property
def dataURL(self):
Expand Down
24 changes: 18 additions & 6 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ class PlasmaIO(DataIO):
"""
_desc: plasma.PlasmaClient

def __init__(self, object_id: plasma.ObjectID, plasma_path="/tmp/plasma", use_staging=False):
def __init__(self, object_id: plasma.ObjectID, plasma_path="/tmp/plasma", expected_size:int=-1, use_staging=False):
"""Initializer
Args:
object_id (plasma.ObjectID): 20 bytes unique object id
Expand All @@ -630,6 +630,8 @@ def __init__(self, object_id: plasma.ObjectID, plasma_path="/tmp/plasma", use_st
self._object_id = object_id
self._reader = None
self._writer = None
# TODO: could support multiple writes without staging if size is known
self._expected_size = expected_size if expected_size > 0 else None
self.use_staging = use_staging

def _open(self, **kwargs):
Expand All @@ -654,9 +656,15 @@ def _read(self, count, **kwargs):
return self._reader.read1(count)

def _write(self, data: Union[memoryview, bytes, bytearray, pyarrow.Buffer], **kwargs):
# NOTE: data must be an array of bytes for len to represent the buffer bytesize
"""
Writes data into the PlasmaIO reserved buffer.
If staging is False and expected_size is None, only a single write may occur
If staging is Flase and expected_size is positive, multiple writes
"""

# NOTE: data must be a collection of bytes for len to represent the buffer bytesize
assert isinstance(data, Union[memoryview, bytes, bytearray, pyarrow.Buffer].__args__)
nbytes = data.nbytes if isinstance(data, memoryview) else len(data)
databytes = data.nbytes if isinstance(data, memoryview) else len(data)

if self.use_staging:
if not self._writer:
Expand All @@ -665,10 +673,14 @@ def _write(self, data: Union[memoryview, bytes, bytearray, pyarrow.Buffer], **kw
else:
if not self._writer:
# write directly into fixed size plasma buffer
plasma_buffer = self._desc.create(self._object_id, nbytes)
self._bufferbytes = self._expected_size if self._expected_size is not None else databytes
plasma_buffer = self._desc.create(self._object_id, self._bufferbytes)
self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
else:
raise Exception("plasmaIO supports only a single write when not using staging buffer")
if self._writer.tell() + databytes > self._bufferbytes:
raise Exception("".join([f"attempted to write {self._writer.tell() + databytes} ",
f"bytes to plasma buffer of size {self._bufferbytes}, ",
"consider using staging or expected_size argument"]))

self._writer.write(data)
return len(data)

Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ def _test_dynamic_write_withDropType(self, dropType):
without an expected drop size (for app compatibility and not
recommended in production)
"""
a = dropType("oid:A", "uid:A", expectedSize=-1)
# NOTE: use_staging required for multiple writes to plasma drops
a = dropType("oid:A", "uid:A", expectedSize=-1, use_staging=True)
b = SumupContainerChecksum("oid:B", "uid:B")
c = InMemoryDROP("oid:C", "uid:C")
b.addInput(a)
Expand Down

0 comments on commit 87b2223

Please sign in to comment.