Skip to content

Commit

Permalink
Merge f73b02a into c18936e
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Jan 11, 2022
2 parents c18936e + f73b02a commit fa99797
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 19 deletions.
4 changes: 4 additions & 0 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ def initialize(self, **kwargs):
self._portMappings = self._getArg(kwargs, "portMappings", "")
logger.info(f"portMappings: {self._portMappings}")

self._shmSize = self._getArg(kwargs, "shmSize", "")
logger.info(f"shmSize: {self._shmSize}")

# Additional volume bindings can be specified for existing files/dirs
# on the host system. They are given either as a list or as a
# comma-separated string
Expand Down Expand Up @@ -428,6 +431,7 @@ def run(self):
environment=env,
working_dir=self.workdir,
init=True,
shm_size=self._shmSize,
# auto_remove=self._removeContainer,
detach=True,
)
Expand Down
8 changes: 5 additions & 3 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import sys
import inspect
import binascii
from typing import Union

import numpy as np

Expand All @@ -58,6 +59,7 @@
from .event import EventFirer
from .exceptions import InvalidDropException, InvalidRelationshipException
from .io import (
DataIO,
OpenMode,
FileIO,
MemoryIO,
Expand Down Expand Up @@ -530,7 +532,7 @@ def isBeingRead(self):
return self._refCount > 0

@track_current_drop
def write(self, data, **kwargs):
def write(self, data: Union[bytes, memoryview], **kwargs):
"""
Writes the given `data` into this DROP. This method is only meant
to be called while the DROP is in INITIALIZED or WRITING state;
Expand Down Expand Up @@ -601,7 +603,7 @@ def write(self, data, **kwargs):
return nbytes

@abstractmethod
def getIO(self):
def getIO(self) -> DataIO:
"""
Returns an instance of one of the `dlg.io.DataIO` instances that
handles the data contents of this DROP.
Expand Down Expand Up @@ -2100,7 +2102,7 @@ def initialize(self, **kwargs):
self.object_id = object_id

def getIO(self):
return PlasmaIO(plasma.ObjectID(self.object_id), self.plasma_path)
return PlasmaIO(plasma.ObjectID(self.object_id), self.plasma_path, False)

@property
def dataURL(self):
Expand Down
66 changes: 51 additions & 15 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def open(self, mode, **kwargs):
self._mode = mode
self._desc = self._open(**kwargs)

def write(self, data, **kwargs):
def write(self, data, **kwargs) -> int:
"""
Writes `data` into the storage
"""
Expand Down Expand Up @@ -148,8 +148,8 @@ def _read(self, count, **kwargs):
pass

@abstractmethod
def _write(self, data, **kwargs):
pass
def _write(self, data, **kwargs) -> int:
return 0

@abstractmethod
def _close(self, **kwargs):
Expand Down Expand Up @@ -611,20 +611,39 @@ def IOForURL(url):


class PlasmaIO(DataIO):
def __init__(self, object_id, plasma_path="/tmp/plasma"):
"""
A shared-memory IO reader/writer implemented using plasma store
memory buffers. Note: not compatible with plasmaClient.put/get,
which perform data pickling.
"""
_desc: plasma.PlasmaClient

def __init__(self, object_id: plasma.ObjectID, plasma_path="/tmp/plasma", use_staging=False):
"""Initializer
Args:
object_id (plasma.ObjectID): 20 bytes unique object id
plasma_path (str, optional): The socket file path visible to all shared processes. Defaults to "/tmp/plasma".
use_staging (bool, optional): Whether to stream first to a resizable staging buffer. Defaults to False.
"""
super(PlasmaIO, self).__init__()
self._plasma_path = plasma_path
self._object_id = object_id
self._reader = None
self._writer = None
self.use_staging = use_staging

def _open(self, **kwargs):
return plasma.connect(self._plasma_path)

def _close(self, **kwargs):
if self._writer:
self._desc.put_raw_buffer(self._writer.getvalue(), self._object_id)
self._writer.close()
if self.use_staging:
self._desc.put_raw_buffer(self._writer.getbuffer(), self._object_id)
self._writer.close()
else:
self._desc.seal(self._object_id)
print(self._desc.list())
self._writer.close()
if self._reader:
self._reader.close()

Expand All @@ -634,10 +653,22 @@ def _read(self, count, **kwargs):
self._reader = pyarrow.BufferReader(data)
return self._reader.read1(count)

def _write(self, data, **kwargs):
if not self._writer:
# use client.create and FixedSizeBufferWriter
self._writer = pyarrow.BufferOutputStream()
def _write(self, data: Union[memoryview, bytes, bytearray, pyarrow.Buffer], **kwargs):
# NOTE: data must be an array of bytes for len to represent the buffer bytesize
assert isinstance(data, Union[memoryview, bytes, bytearray, pyarrow.Buffer].__args__)
nbytes = data.nbytes if isinstance(data, memoryview) else len(data)

if self.use_staging:
if not self._writer:
# write into a resizable staging buffer
self._writer = io.BytesIO()
else:
if not self._writer:
# write directly into fixed size plasma buffer
plasma_buffer = self._desc.create(self._object_id, nbytes)
self._writer = pyarrow.FixedSizeBufferWriter(plasma_buffer)
else:
raise Exception("plasmaIO supports only a single write when not using staging buffer")
self._writer.write(data)
return len(data)

Expand All @@ -654,6 +685,11 @@ def buffer(self) -> memoryview:


class PlasmaFlightIO(DataIO):
"""
A plasma store IO reader/writer that accesses objects through a
PlasmaFlightClient.
"""
_desc: PlasmaFlightClient

def __init__(
self,
object_id: plasma.ObjectID,
Expand All @@ -666,7 +702,7 @@ def __init__(
self._object_id = object_id
self._plasma_path = plasma_path
self._flight_path = flight_path
self._size = size
self._nbytes = size
self._reader = None
self._writer = None

Expand All @@ -676,11 +712,11 @@ def _open(self, **kwargs):

def _close(self, **kwargs):
if self._writer:
if self._size == -1:
if self._nbytes == -1:
self._desc.put(self._writer.getvalue(), self._object_id)
self._writer.close()
else:
assert self._size == self._writer.tell()
assert self._nbytes == self._writer.tell()
self._desc.seal(self._object_id)
if self._reader:
self._reader.close()
Expand All @@ -693,15 +729,15 @@ def _read(self, count, **kwargs):

def _write(self, data, **kwargs):
if not self._writer:
if self._size == -1:
if self._nbytes == -1:
# stream into resizeable buffer
logger.warning(
"Using dynamically sized Plasma buffer. Performance may be reduced."
)
self._writer = pyarrow.BufferOutputStream()
else:
# optimally write directly to fixed size plasma buffer
self._buffer = self._desc.create(self._object_id, self._size)
self._buffer = self._desc.create(self._object_id, self._nbytes)
self._writer = pyarrow.FixedSizeBufferWriter(self._buffer)
self._writer.write(data)
return len(data)
Expand Down
3 changes: 2 additions & 1 deletion daliuge-translator/dlg/dropmake/pg_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,9 @@ def _create_test_drop_spec(self, oid, rank, kwargs) -> dropdict:
kwargs["removeContainer"] = self.str_to_bool(
str(self.jd.get("removeContainer", "1"))
)
kwargs["portMappings"] = str(self.jd.get("portMappings", ""))
kwargs["additionalBindings"] = str(self.jd.get("additionalBindings", ""))
kwargs["portMappings"] = str(self.jd.get("portMappings", ""))
kwargs["shmSize"] = str(self.jd.get("shmSize",""))
drop_spec.update(kwargs)

elif drop_type == Categories.GROUP_BY:
Expand Down

0 comments on commit fa99797

Please sign in to comment.