diff --git a/daliuge-engine/dlg/apps/dockerapp.py b/daliuge-engine/dlg/apps/dockerapp.py index 24c20d378..bec25b7bd 100644 --- a/daliuge-engine/dlg/apps/dockerapp.py +++ b/daliuge-engine/dlg/apps/dockerapp.py @@ -235,6 +235,9 @@ def initialize(self, **kwargs): self._portMappings = self._getArg(kwargs, "portMappings", "") logger.info(f"portMappings: {self._portMappings}") + self._shmSize = self._getArg(kwargs, "shmSize", "") + logger.info(f"shmSize: {self._shmSize}") + # Additional volume bindings can be specified for existing files/dirs # on the host system. They are given either as a list or as a # comma-separated string @@ -428,6 +431,7 @@ def run(self): environment=env, working_dir=self.workdir, init=True, + shm_size=self._shmSize, # auto_remove=self._removeContainer, detach=True, ) diff --git a/daliuge-engine/dlg/drop.py b/daliuge-engine/dlg/drop.py index 5e9640178..4f32f3a30 100644 --- a/daliuge-engine/dlg/drop.py +++ b/daliuge-engine/dlg/drop.py @@ -43,6 +43,7 @@ import sys import inspect import binascii +from typing import Union import numpy as np @@ -58,6 +59,7 @@ from .event import EventFirer from .exceptions import InvalidDropException, InvalidRelationshipException from .io import ( + DataIO, OpenMode, FileIO, MemoryIO, @@ -530,7 +532,7 @@ def isBeingRead(self): return self._refCount > 0 @track_current_drop - def write(self, data, **kwargs): + def write(self, data: Union[bytes, memoryview], **kwargs): """ Writes the given `data` into this DROP. This method is only meant to be called while the DROP is in INITIALIZED or WRITING state; @@ -601,7 +603,7 @@ def write(self, data, **kwargs): return nbytes @abstractmethod - def getIO(self): + def getIO(self) -> DataIO: """ Returns an instance of one of the `dlg.io.DataIO` instances that handles the data contents of this DROP. 
@@ -2127,6 +2129,7 @@ class PlasmaDROP(AbstractDROP): object_id = dlg_string_param("object_id", None) plasma_path = dlg_string_param("plasma_path", "/tmp/plasma") + use_staging = dlg_bool_param("use_staging", False) def initialize(self, **kwargs): object_id = self.uid @@ -2136,7 +2139,10 @@ def initialize(self, **kwargs): self.object_id = object_id def getIO(self): - return PlasmaIO(plasma.ObjectID(self.object_id), self.plasma_path) + return PlasmaIO(plasma.ObjectID(self.object_id), + self.plasma_path, + expected_size=self._expectedSize, + use_staging=self.use_staging) @property def dataURL(self): diff --git a/daliuge-engine/dlg/io.py b/daliuge-engine/dlg/io.py index dc7851163..329303dc4 100644 --- a/daliuge-engine/dlg/io.py +++ b/daliuge-engine/dlg/io.py @@ -78,7 +78,7 @@ def open(self, mode, **kwargs): self._mode = mode self._desc = self._open(**kwargs) - def write(self, data, **kwargs): + def write(self, data, **kwargs) -> int: """ Writes `data` into the storage """ @@ -148,8 +148,8 @@ def _read(self, count, **kwargs): pass @abstractmethod - def _write(self, data, **kwargs): - pass + def _write(self, data, **kwargs) -> int: + return 0 @abstractmethod def _close(self, **kwargs): @@ -611,20 +611,41 @@ def IOForURL(url): class PlasmaIO(DataIO): - def __init__(self, object_id, plasma_path="/tmp/plasma"): + """ + A shared-memory IO reader/writer implemented using plasma store + memory buffers. Note: not compatible with PlasmaClient put()/get() + which performs data pickling before writing. + """ + _desc: plasma.PlasmaClient + + def __init__(self, object_id: plasma.ObjectID, plasma_path="/tmp/plasma", expected_size:Optional[int]=None, use_staging=False): + """Initializer + Args: + object_id (plasma.ObjectID): 20 bytes unique object id + plasma_path (str, optional): The socket file path visible to all shared processes. Defaults to "/tmp/plasma". + expected_size (Optional[int], optional) Total size of data to allocate to buffer if known. Defaults to None. 
+ use_staging (bool, optional): Whether to stream first to a resizable staging buffer. Defaults to False. + """ super(PlasmaIO, self).__init__() self._plasma_path = plasma_path self._object_id = object_id self._reader = None self._writer = None + self._expected_size = expected_size if expected_size is not None and expected_size > 0 else None + self.use_staging = use_staging def _open(self, **kwargs): return plasma.connect(self._plasma_path) def _close(self, **kwargs): if self._writer: - self._desc.put_raw_buffer(self._writer.getvalue(), self._object_id) - self._writer.close() + if self.use_staging: + self._desc.put_raw_buffer(self._writer.getbuffer(), self._object_id) + self._writer.close() + else: + self._desc.seal(self._object_id) + logger.debug("plasma store objects: %s", self._desc.list()) + self._writer.close() if self._reader: self._reader.close() @@ -634,10 +655,33 @@ def _read(self, count, **kwargs): self._reader = pyarrow.BufferReader(data) return self._reader.read1(count) - def _write(self, data, **kwargs): - if not self._writer: - # use client.create and FixedSizeBufferWriter - self._writer = pyarrow.BufferOutputStream() + def _write(self, data: Union[memoryview, bytes, bytearray, pyarrow.Buffer], **kwargs): + """ + Writes data into the PlasmaIO reserved buffer. + If use_staging is False and expected_size is None, only a single write may occur. + If use_staging is False and expected_size is > 0, multiple writes up to expected size may occur. + If use_staging is True, any number of writes may occur with small performance penalty.
+ """ + + # NOTE: data must be a collection of bytes for len to represent the buffer bytesize + assert isinstance(data, Union[memoryview, bytes, bytearray, pyarrow.Buffer].__args__) + databytes = data.nbytes if isinstance(data, memoryview) else len(data) + + if self.use_staging: + if not self._writer: + # write into a resizable staging buffer + self._writer = io.BytesIO() + else: + if not self._writer: + # write directly into fixed size plasma buffer + self._bufferbytes = self._expected_size if self._expected_size is not None else databytes + plasma_buffer = self._desc.create(self._object_id, self._bufferbytes) + self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer) + if self._writer.tell() + databytes > self._bufferbytes: + raise Exception("".join([f"attempted to write {self._writer.tell() + databytes} ", + f"bytes to plasma buffer of size {self._bufferbytes}, ", + "consider using staging or expected_size argument"])) + self._writer.write(data) return len(data) @@ -654,6 +698,8 @@ def buffer(self) -> memoryview: class PlasmaFlightIO(DataIO): + _desc: PlasmaFlightClient + def __init__( self, object_id: plasma.ObjectID, @@ -666,7 +712,7 @@ def __init__( self._object_id = object_id self._plasma_path = plasma_path self._flight_path = flight_path - self._size = size + self._nbytes = size self._reader = None self._writer = None @@ -676,11 +722,11 @@ def _open(self, **kwargs): def _close(self, **kwargs): if self._writer: - if self._size == -1: + if self._nbytes == -1: self._desc.put(self._writer.getvalue(), self._object_id) self._writer.close() else: - assert self._size == self._writer.tell() + assert self._nbytes == self._writer.tell() self._desc.seal(self._object_id) if self._reader: self._reader.close() @@ -693,7 +739,7 @@ def _read(self, count, **kwargs): def _write(self, data, **kwargs): if not self._writer: - if self._size == -1: + if self._nbytes == -1: # stream into resizeable buffer logger.warning( "Using dynamically sized Plasma buffer. 
Performance may be reduced." @@ -701,7 +747,7 @@ def _write(self, data, **kwargs): self._writer = pyarrow.BufferOutputStream() else: # optimally write directly to fixed size plasma buffer - self._buffer = self._desc.create(self._object_id, self._size) + self._buffer = self._desc.create(self._object_id, self._nbytes) self._writer = pyarrow.FixedSizeBufferWriter(self._buffer) self._writer.write(data) return len(data) diff --git a/daliuge-engine/test/test_drop.py b/daliuge-engine/test/test_drop.py index 6ee3780da..6a07b77c4 100644 --- a/daliuge-engine/test/test_drop.py +++ b/daliuge-engine/test/test_drop.py @@ -224,7 +224,8 @@ def _test_dynamic_write_withDropType(self, dropType): without an expected drop size (for app compatibility and not recommended in production) """ - a = dropType("oid:A", "uid:A", expectedSize=-1) + # NOTE: use_staging required for multiple writes to plasma drops + a = dropType("oid:A", "uid:A", expectedSize=-1, use_staging=True) b = SumupContainerChecksum("oid:B", "uid:B") c = InMemoryDROP("oid:C", "uid:C") b.addInput(a) diff --git a/daliuge-translator/dlg/dropmake/pg_generator.py b/daliuge-translator/dlg/dropmake/pg_generator.py index 45e7c3adc..18f6c1ec1 100644 --- a/daliuge-translator/dlg/dropmake/pg_generator.py +++ b/daliuge-translator/dlg/dropmake/pg_generator.py @@ -747,8 +747,9 @@ def _create_test_drop_spec(self, oid, rank, kwargs) -> dropdict: kwargs["removeContainer"] = self.str_to_bool( str(self.jd.get("removeContainer", "1")) ) - kwargs["portMappings"] = str(self.jd.get("portMappings", "")) kwargs["additionalBindings"] = str(self.jd.get("additionalBindings", "")) + kwargs["portMappings"] = str(self.jd.get("portMappings", "")) + kwargs["shmSize"] = str(self.jd.get("shmSize","")) drop_spec.update(kwargs) elif drop_type == Categories.GROUP_BY: