LIU-213 docker and plasma shared memory improvements #93

Merged: 4 commits, Jan 13, 2022
4 changes: 4 additions & 0 deletions daliuge-engine/dlg/apps/dockerapp.py
@@ -235,6 +235,9 @@ def initialize(self, **kwargs):
self._portMappings = self._getArg(kwargs, "portMappings", "")
logger.info(f"portMappings: {self._portMappings}")

self._shmSize = self._getArg(kwargs, "shmSize", "")
logger.info(f"shmSize: {self._shmSize}")

# Additional volume bindings can be specified for existing files/dirs
# on the host system. They are given either as a list or as a
# comma-separated string
@@ -428,6 +431,7 @@ def run(self):
environment=env,
working_dir=self.workdir,
init=True,
shm_size=self._shmSize,
# auto_remove=self._removeContainer,
detach=True,
)
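The new shmSize argument is forwarded as the shm_size keyword of docker-py's container run call, which sets the size of the container's /dev/shm. A minimal standalone sketch, assuming a hypothetical image name and the standard docker-py API; sizes use Docker's usual suffix syntax (e.g. "2g"):

```python
# Sketch only: assumes docker-py is installed and the image name is a placeholder.
import docker

client = docker.from_env()
container = client.containers.run(
    "daliuge/example-app",  # hypothetical image, for illustration
    detach=True,
    init=True,
    shm_size="2g",          # value carried over from the DockerApp "shmSize" parameter
)
```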
12 changes: 9 additions & 3 deletions daliuge-engine/dlg/drop.py
@@ -43,6 +43,7 @@
import sys
import inspect
import binascii
from typing import Union

import numpy as np

@@ -58,6 +59,7 @@
from .event import EventFirer
from .exceptions import InvalidDropException, InvalidRelationshipException
from .io import (
DataIO,
OpenMode,
FileIO,
MemoryIO,
@@ -524,7 +526,7 @@ def isBeingRead(self):
return self._refCount > 0

@track_current_drop
def write(self, data, **kwargs):
def write(self, data: Union[bytes, memoryview], **kwargs):
"""
Writes the given `data` into this DROP. This method is only meant
to be called while the DROP is in INITIALIZED or WRITING state;
@@ -595,7 +597,7 @@ def write(self, data, **kwargs):
return nbytes

@abstractmethod
def getIO(self):
def getIO(self) -> DataIO:
"""
Returns an instance of one of the `dlg.io.DataIO` instances that
handles the data contents of this DROP.
@@ -2083,6 +2085,7 @@ class PlasmaDROP(AbstractDROP):

object_id = dlg_string_param("object_id", None)
plasma_path = dlg_string_param("plasma_path", "/tmp/plasma")
use_staging = dlg_bool_param("use_staging", False)

def initialize(self, **kwargs):
object_id = self.uid
@@ -2092,7 +2095,10 @@ def initialize(self, **kwargs):
self.object_id = object_id

def getIO(self):
return PlasmaIO(plasma.ObjectID(self.object_id), self.plasma_path)
return PlasmaIO(plasma.ObjectID(self.object_id),
self.plasma_path,
expected_size=self._expectedSize,
use_staging=self.use_staging)

@property
def dataURL(self):
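For orientation, a rough sketch of how the new use_staging flag is expected to combine with expectedSize when constructing a PlasmaDROP. The keyword arguments mirror the test change further down; a plasma store listening on /tmp/plasma is assumed:

```python
# Sketch only: assumes a plasma store is running at /tmp/plasma.
from dlg.drop import PlasmaDROP

# Size known up front: getIO() forwards expected_size, so PlasmaIO can allocate
# a fixed-size plasma buffer and accept several writes up to that size.
a = PlasmaDROP("oid:A", "uid:A", expectedSize=64)

# Size unknown but multiple writes needed: stage in host memory first.
b = PlasmaDROP("oid:B", "uid:B", expectedSize=-1, use_staging=True)
```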
76 changes: 61 additions & 15 deletions daliuge-engine/dlg/io.py
@@ -78,7 +78,7 @@ def open(self, mode, **kwargs):
self._mode = mode
self._desc = self._open(**kwargs)

def write(self, data, **kwargs):
def write(self, data, **kwargs) -> int:
"""
Writes `data` into the storage
"""
@@ -148,8 +148,8 @@ def _read(self, count, **kwargs):
pass

@abstractmethod
def _write(self, data, **kwargs):
pass
def _write(self, data, **kwargs) -> int:
return 0

@abstractmethod
def _close(self, **kwargs):
@@ -611,20 +611,41 @@ def IOForURL(url):


class PlasmaIO(DataIO):
def __init__(self, object_id, plasma_path="/tmp/plasma"):
"""
A shared-memory IO reader/writer implemented using plasma store
memory buffers. Note: not compatible with PlasmaClient put()/get()
which performs data pickling before writing.
"""
_desc: plasma.PlasmaClient

def __init__(self, object_id: plasma.ObjectID, plasma_path="/tmp/plasma", expected_size: Optional[int] = None, use_staging=False):
"""Initializer
Args:
object_id (plasma.ObjectID): 20-byte unique object ID
plasma_path (str, optional): The socket file path visible to all shared processes. Defaults to "/tmp/plasma".
expected_size (Optional[int], optional): Total size of the data to allocate to the buffer, if known. Defaults to None.
use_staging (bool, optional): Whether to stream first to a resizable staging buffer. Defaults to False.
"""
super(PlasmaIO, self).__init__()
self._plasma_path = plasma_path
self._object_id = object_id
self._reader = None
self._writer = None
self._expected_size = expected_size if expected_size and expected_size > 0 else None
self.use_staging = use_staging

def _open(self, **kwargs):
return plasma.connect(self._plasma_path)

def _close(self, **kwargs):
if self._writer:
self._desc.put_raw_buffer(self._writer.getvalue(), self._object_id)
self._writer.close()
if self.use_staging:
self._desc.put_raw_buffer(self._writer.getbuffer(), self._object_id)
self._writer.close()
else:
self._desc.seal(self._object_id)
logger.debug("Plasma objects in store: %s", self._desc.list())
self._writer.close()
if self._reader:
self._reader.close()

@@ -634,10 +655,33 @@ def _read(self, count, **kwargs):
self._reader = pyarrow.BufferReader(data)
return self._reader.read1(count)

def _write(self, data, **kwargs):
if not self._writer:
# use client.create and FixedSizeBufferWriter
self._writer = pyarrow.BufferOutputStream()
def _write(self, data: Union[memoryview, bytes, bytearray, pyarrow.Buffer], **kwargs):
"""
Writes data into the PlasmaIO reserved buffer.
If use_staging is False and expected_size is None, only a single write may occur.
If use_staging is False and expected_size is > 0, multiple writes up to expected size may occur.
If use_staging is True, any number of writes may occur with small performance penalty.
"""

# NOTE: data must be a collection of bytes for len to represent the buffer bytesize
assert isinstance(data, (memoryview, bytes, bytearray, pyarrow.Buffer))
databytes = data.nbytes if isinstance(data, memoryview) else len(data)

if self.use_staging:
if not self._writer:
# write into a resizable staging buffer
self._writer = io.BytesIO()
else:
if not self._writer:
# write directly into fixed size plasma buffer
self._bufferbytes = self._expected_size if self._expected_size is not None else databytes
plasma_buffer = self._desc.create(self._object_id, self._bufferbytes)
self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
if self._writer.tell() + databytes > self._bufferbytes:
raise Exception("".join([f"attempted to write {self._writer.tell() + databytes} ",
f"bytes to plasma buffer of size {self._bufferbytes}, ",
"consider using staging or expected_size argument"]))

self._writer.write(data)
return len(data)

Expand All @@ -654,6 +698,8 @@ def buffer(self) -> memoryview:


class PlasmaFlightIO(DataIO):
_desc: PlasmaFlightClient

def __init__(
self,
object_id: plasma.ObjectID,
@@ -666,7 +712,7 @@ def __init__(
self._object_id = object_id
self._plasma_path = plasma_path
self._flight_path = flight_path
self._size = size
self._nbytes = size
self._reader = None
self._writer = None

@@ -676,11 +722,11 @@ def _open(self, **kwargs):

def _close(self, **kwargs):
if self._writer:
if self._size == -1:
if self._nbytes == -1:
self._desc.put(self._writer.getvalue(), self._object_id)
self._writer.close()
else:
assert self._size == self._writer.tell()
assert self._nbytes == self._writer.tell()
self._desc.seal(self._object_id)
if self._reader:
self._reader.close()
@@ -693,15 +739,15 @@ def _read(self, count, **kwargs):

def _write(self, data, **kwargs):
if not self._writer:
if self._size == -1:
if self._nbytes == -1:
# stream into resizeable buffer
logger.warning(
"Using dynamically sized Plasma buffer. Performance may be reduced."
)
self._writer = pyarrow.BufferOutputStream()
else:
# optimally write directly to fixed size plasma buffer
self._buffer = self._desc.create(self._object_id, self._size)
self._buffer = self._desc.create(self._object_id, self._nbytes)
self._writer = pyarrow.FixedSizeBufferWriter(self._buffer)
self._writer.write(data)
return len(data)
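The three write modes described in the PlasmaIO._write docstring can be exercised directly. A hedged sketch, assuming a plasma store at /tmp/plasma and that OpenMode.OPEN_WRITE is the write-mode constant in dlg.io:

```python
# Sketch only: object IDs and sizes are illustrative.
from pyarrow import plasma
from dlg.io import OpenMode, PlasmaIO

# 1. No staging, no expected_size: the first write sizes the plasma buffer,
#    so only a single write is safe.
single = PlasmaIO(plasma.ObjectID(b"A" * 20), "/tmp/plasma")

# 2. No staging with expected_size: several writes allowed, up to that many bytes.
sized = PlasmaIO(plasma.ObjectID(b"B" * 20), "/tmp/plasma", expected_size=8)
sized.open(OpenMode.OPEN_WRITE)
sized.write(b"1234")
sized.write(b"5678")
sized.close()  # seals the fixed-size buffer in the store

# 3. Staging: any number of writes, at the cost of an extra copy on close.
staged = PlasmaIO(plasma.ObjectID(b"C" * 20), "/tmp/plasma", use_staging=True)
```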
3 changes: 2 additions & 1 deletion daliuge-engine/test/test_drop.py
@@ -208,7 +208,8 @@ def _test_dynamic_write_withDropType(self, dropType):
without an expected drop size (for app compatibility and not
recommended in production)
"""
a = dropType("oid:A", "uid:A", expectedSize=-1)
# NOTE: use_staging required for multiple writes to plasma drops
a = dropType("oid:A", "uid:A", expectedSize=-1, use_staging=True)
b = SumupContainerChecksum("oid:B", "uid:B")
c = InMemoryDROP("oid:C", "uid:C")
b.addInput(a)
3 changes: 2 additions & 1 deletion daliuge-translator/dlg/dropmake/pg_generator.py
@@ -747,8 +747,9 @@ def _create_test_drop_spec(self, oid, rank, kwargs) -> dropdict:
kwargs["removeContainer"] = self.str_to_bool(
str(self.jd.get("removeContainer", "1"))
)
kwargs["portMappings"] = str(self.jd.get("portMappings", ""))
kwargs["additionalBindings"] = str(self.jd.get("additionalBindings", ""))
kwargs["portMappings"] = str(self.jd.get("portMappings", ""))
kwargs["shmSize"] = str(self.jd.get("shmSize",""))
drop_spec.update(kwargs)

elif drop_type == Categories.GROUP_BY:
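For completeness, an illustrative view of the node data the translator reads at this point; only the shmSize field is new here, and all values are placeholders:

```python
# Illustrative node fields (self.jd) for a Docker component; values are made up.
jd = {
    "image": "daliuge/example-app",
    "additionalBindings": "/data:/data",
    "portMappings": "8080:8080",
    "shmSize": "2g",
}

# Mirrors how the generator copies the fields into the drop spec kwargs.
kwargs = {
    "additionalBindings": str(jd.get("additionalBindings", "")),
    "portMappings": str(jd.get("portMappings", "")),
    "shmSize": str(jd.get("shmSize", "")),
}
```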