Skip to content

Commit

Permalink
Merge pull request #93 from ICRAR/LIU-213
Browse files Browse the repository at this point in the history
LIU-213 docker and plasma shared memory improvements
  • Loading branch information
awicenec committed Jan 13, 2022
2 parents c27a821 + 4b888fb commit cc15b7d
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 20 deletions.
4 changes: 4 additions & 0 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ def initialize(self, **kwargs):
self._portMappings = self._getArg(kwargs, "portMappings", "")
logger.info(f"portMappings: {self._portMappings}")

self._shmSize = self._getArg(kwargs, "shmSize", "")
logger.info(f"shmSize: {self._shmSize}")

# Additional volume bindings can be specified for existing files/dirs
# on the host system. They are given either as a list or as a
# comma-separated string
Expand Down Expand Up @@ -428,6 +431,7 @@ def run(self):
environment=env,
working_dir=self.workdir,
init=True,
shm_size=self._shmSize,
# auto_remove=self._removeContainer,
detach=True,
)
Expand Down
12 changes: 9 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import sys
import inspect
import binascii
from typing import Union

import numpy as np

Expand All @@ -58,6 +59,7 @@
from .event import EventFirer
from .exceptions import InvalidDropException, InvalidRelationshipException
from .io import (
DataIO,
OpenMode,
FileIO,
MemoryIO,
Expand Down Expand Up @@ -530,7 +532,7 @@ def isBeingRead(self):
return self._refCount > 0

@track_current_drop
def write(self, data, **kwargs):
def write(self, data: Union[bytes, memoryview], **kwargs):
"""
Writes the given `data` into this DROP. This method is only meant
to be called while the DROP is in INITIALIZED or WRITING state;
Expand Down Expand Up @@ -601,7 +603,7 @@ def write(self, data, **kwargs):
return nbytes

@abstractmethod
def getIO(self):
def getIO(self) -> DataIO:
"""
Returns an instance of one of the `dlg.io.DataIO` instances that
handles the data contents of this DROP.
Expand Down Expand Up @@ -2127,6 +2129,7 @@ class PlasmaDROP(AbstractDROP):

object_id = dlg_string_param("object_id", None)
plasma_path = dlg_string_param("plasma_path", "/tmp/plasma")
use_staging = dlg_bool_param("use_staging", False)

def initialize(self, **kwargs):
object_id = self.uid
Expand All @@ -2136,7 +2139,10 @@ def initialize(self, **kwargs):
self.object_id = object_id

def getIO(self):
return PlasmaIO(plasma.ObjectID(self.object_id), self.plasma_path)
return PlasmaIO(plasma.ObjectID(self.object_id),
self.plasma_path,
expected_size=self._expectedSize,
use_staging=self.use_staging)

@property
def dataURL(self):
Expand Down
76 changes: 61 additions & 15 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def open(self, mode, **kwargs):
self._mode = mode
self._desc = self._open(**kwargs)

def write(self, data, **kwargs):
def write(self, data, **kwargs) -> int:
"""
Writes `data` into the storage
"""
Expand Down Expand Up @@ -148,8 +148,8 @@ def _read(self, count, **kwargs):
pass

@abstractmethod
def _write(self, data, **kwargs):
pass
def _write(self, data, **kwargs) -> int:
return 0

@abstractmethod
def _close(self, **kwargs):
Expand Down Expand Up @@ -611,20 +611,41 @@ def IOForURL(url):


class PlasmaIO(DataIO):
def __init__(self, object_id, plasma_path="/tmp/plasma"):
"""
A shared-memory IO reader/writer implemented using plasma store
memory buffers. Note: not compatible with PlasmaClient put()/get()
which performs data pickling before writing.
"""
_desc: plasma.PlasmaClient

def __init__(
    self,
    object_id: plasma.ObjectID,
    plasma_path="/tmp/plasma",
    expected_size: Optional[int] = None,
    use_staging=False,
):
    """Initializer

    Args:
        object_id (plasma.ObjectID): 20 bytes unique object id
        plasma_path (str, optional): The socket file path visible to all
            shared processes. Defaults to "/tmp/plasma".
        expected_size (Optional[int], optional): Total size of data to
            allocate to the buffer if known. ``None`` or any non-positive
            value is treated as "size unknown". Defaults to None.
        use_staging (bool, optional): Whether to stream first to a
            resizable staging buffer. Defaults to False.
    """
    super(PlasmaIO, self).__init__()
    self._plasma_path = plasma_path
    self._object_id = object_id
    self._reader = None
    self._writer = None
    # Guard against None before comparing: `None > 0` raises TypeError, and
    # None is the documented default for callers that do not know the size.
    if expected_size is not None and expected_size > 0:
        self._expected_size = expected_size
    else:
        self._expected_size = None
    self.use_staging = use_staging

def _open(self, **kwargs):
return plasma.connect(self._plasma_path)

def _close(self, **kwargs):
    """
    Finalises any pending write and releases the reader/writer objects.

    With staging enabled the staged bytes are copied into the plasma store
    under this IO's object id; otherwise the buffer that was written in
    place is sealed.
    """
    if self._writer:
        if self.use_staging:
            # Release the exported view explicitly: a BytesIO cannot be
            # closed while a view obtained from getbuffer() is still alive.
            with self._writer.getbuffer() as staged:
                self._desc.put_raw_buffer(staged, self._object_id)
        else:
            self._desc.seal(self._object_id)
        self._writer.close()
    if self._reader:
        self._reader.close()

Expand All @@ -634,10 +655,33 @@ def _read(self, count, **kwargs):
self._reader = pyarrow.BufferReader(data)
return self._reader.read1(count)

def _write(self, data, **kwargs):
if not self._writer:
# use client.create and FixedSizeBufferWriter
self._writer = pyarrow.BufferOutputStream()
def _write(self, data: Union[memoryview, bytes, bytearray, pyarrow.Buffer], **kwargs) -> int:
    """
    Writes data into the PlasmaIO reserved buffer.
    If use_staging is False and expected_size is None, only a single write may occur.
    If use_staging is False and expected_size is > 0, multiple writes up to expected size may occur.
    If use_staging is True, any number of writes may occur with small performance penalty.

    Returns the number of bytes written.
    Raises TypeError if `data` is not a bytes-like buffer, and Exception if
    the write would overflow the fixed size plasma buffer.
    """

    # NOTE: data must be a collection of bytes for len to represent the buffer bytesize.
    # An explicit check is used instead of `assert` so it survives `python -O`.
    if not isinstance(data, (memoryview, bytes, bytearray, pyarrow.Buffer)):
        raise TypeError(
            f"data must be memoryview, bytes, bytearray or pyarrow.Buffer, not {type(data).__name__}"
        )
    # len() of a memoryview counts elements, not bytes, so use nbytes there
    databytes = data.nbytes if isinstance(data, memoryview) else len(data)

    if self.use_staging:
        if not self._writer:
            # write into a resizable staging buffer
            self._writer = io.BytesIO()
    else:
        if not self._writer:
            # write directly into fixed size plasma buffer
            self._bufferbytes = self._expected_size if self._expected_size is not None else databytes
            plasma_buffer = self._desc.create(self._object_id, self._bufferbytes)
            self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
        # checked on every write, since the plasma buffer cannot grow
        required = self._writer.tell() + databytes
        if required > self._bufferbytes:
            raise Exception(
                f"attempted to write {required} "
                f"bytes to plasma buffer of size {self._bufferbytes}, "
                "consider using staging or expected_size argument"
            )

    self._writer.write(data)
    # report bytes, consistent with the size accounting above
    return databytes

Expand All @@ -654,6 +698,8 @@ def buffer(self) -> memoryview:


class PlasmaFlightIO(DataIO):
_desc: PlasmaFlightClient

def __init__(
self,
object_id: plasma.ObjectID,
Expand All @@ -666,7 +712,7 @@ def __init__(
self._object_id = object_id
self._plasma_path = plasma_path
self._flight_path = flight_path
self._size = size
self._nbytes = size
self._reader = None
self._writer = None

Expand All @@ -676,11 +722,11 @@ def _open(self, **kwargs):

def _close(self, **kwargs):
if self._writer:
if self._size == -1:
if self._nbytes == -1:
self._desc.put(self._writer.getvalue(), self._object_id)
self._writer.close()
else:
assert self._size == self._writer.tell()
assert self._nbytes == self._writer.tell()
self._desc.seal(self._object_id)
if self._reader:
self._reader.close()
Expand All @@ -693,15 +739,15 @@ def _read(self, count, **kwargs):

def _write(self, data, **kwargs) -> int:
    """
    Writes `data` to the plasma store and returns the number of bytes written.

    The writer is created lazily on the first call: a resizable stream when
    the size given at construction is -1, otherwise a fixed size plasma
    buffer of that size which is written in place.
    """
    if not self._writer:
        if self._nbytes == -1:
            # stream into resizeable buffer
            logger.warning(
                "Using dynamically sized Plasma buffer. Performance may be reduced."
            )
            self._writer = pyarrow.BufferOutputStream()
        else:
            # optimally write directly to fixed size plasma buffer
            self._buffer = self._desc.create(self._object_id, self._nbytes)
            self._writer = pyarrow.FixedSizeBufferWriter(self._buffer)
    self._writer.write(data)
    # len() of a memoryview counts elements, not bytes, so use nbytes there
    return data.nbytes if isinstance(data, memoryview) else len(data)
Expand Down
3 changes: 2 additions & 1 deletion daliuge-engine/test/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ def _test_dynamic_write_withDropType(self, dropType):
without an expected drop size (for app compatibility and not
recommended in production)
"""
a = dropType("oid:A", "uid:A", expectedSize=-1)
# NOTE: use_staging required for multiple writes to plasma drops
a = dropType("oid:A", "uid:A", expectedSize=-1, use_staging=True)
b = SumupContainerChecksum("oid:B", "uid:B")
c = InMemoryDROP("oid:C", "uid:C")
b.addInput(a)
Expand Down
3 changes: 2 additions & 1 deletion daliuge-translator/dlg/dropmake/pg_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,9 @@ def _create_test_drop_spec(self, oid, rank, kwargs) -> dropdict:
kwargs["removeContainer"] = self.str_to_bool(
str(self.jd.get("removeContainer", "1"))
)
kwargs["portMappings"] = str(self.jd.get("portMappings", ""))
kwargs["additionalBindings"] = str(self.jd.get("additionalBindings", ""))
kwargs["portMappings"] = str(self.jd.get("portMappings", ""))
kwargs["shmSize"] = str(self.jd.get("shmSize",""))
drop_spec.update(kwargs)

elif drop_type == Categories.GROUP_BY:
Expand Down

0 comments on commit cc15b7d

Please sign in to comment.