Skip to content

Commit

Permalink
Merge 8bdd6ad into 322d297
Browse files Browse the repository at this point in the history
  • Loading branch information
calgray committed Aug 16, 2021
2 parents 322d297 + 8bdd6ad commit 2699842
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 25 deletions.
4 changes: 3 additions & 1 deletion daliuge-common/dlg/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Categories:
JSON = 'json'
S3 = 'S3'
PLASMA = 'Plasma'
PLASMAFLIGHT = 'PlasmaFlight'

MKN = 'MKN'
SCATTER = 'Scatter'
Expand Down Expand Up @@ -62,7 +63,8 @@ class Categories:
Categories.NGAS,
Categories.NULL,
Categories.JSON,
Categories.PLASMA
Categories.PLASMA,
Categories.PLASMAFLIGHT
}
APP_DROP_TYPES = [
Categories.COMPONENT,
Expand Down
20 changes: 20 additions & 0 deletions daliuge-engine/dlg/apps/dockerapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ def initialize(self, **kwargs):
# handle, but for the time being we do it here
self._removeContainer = self._getArg(kwargs, 'removeContainer', True)

# Ports - a comma-separated list of the host port mappings of the form:
# "hostport1:containerport1, hostport2:containerport2"
self._portMappings = self._getArg(kwargs, 'portMappings', "")
logger.info(f"portMappings: {self._portMappings}")

# Additional volume bindings can be specified for existing files/dirs
# on the host system. They are given either as a list or as a
# comma-separated string
Expand Down Expand Up @@ -315,6 +320,20 @@ def run(self):
binds = list(set(binds)) # make this a unique list else docker complains
logger.debug("Volume bindings: %r", binds)

portMappings = {} # = {'5005/tcp': 5005, '5006/tcp': 5006}
for mapping in self._portMappings.split(','):
if mapping:
if mapping.find(':') == -1:
host_port = container_port = mapping
else:
host_port, container_port = mapping.split(':')
if host_port not in portMappings:
logger.debug(f"mapping port {host_port} -> {container_port}")
portMappings[host_port] = int(container_port)
else:
raise Exception(f"Duplicate port {host_port} in container port mappings")
logger.debug(f"port mappings: {portMappings}")

# Wait until the DockerApps this application runtime depends on have
# started, and replace their IP placeholders by the real IPs
for waiter in self._waiters:
Expand Down Expand Up @@ -361,6 +380,7 @@ def rm(container):
self._image,
cmd,
volumes=binds,
ports=portMappings,
user=user,
environment=env,
working_dir=self.workdir
Expand Down
2 changes: 1 addition & 1 deletion daliuge-engine/dlg/apps/plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def run(self):

##
# @brief MSPlasmaReader\n
# @details Batch read entire Measurement Set from Plamsa.
# @details Batch read entire Measurement Set from Plasma.
# @par EAGLE_START
# @param category PythonApp
# @param[in] port/plasma_ms_input
Expand Down
95 changes: 95 additions & 0 deletions daliuge-engine/dlg/apps/plasmaflight.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import hashlib
from io import BytesIO
import logging
from typing import Optional

import pyarrow
import pyarrow.flight as paf
import pyarrow.plasma as plasma

logger = logging.getLogger(__name__)


class PlasmaFlightClient():
    """
    Client pairing a local Plasma store connection with Arrow Flight lookups.

    Objects are looked up in the local Plasma store first. When an object is
    absent locally and an owner location is supplied, it is fetched from that
    location over Arrow Flight and then stored in the local Plasma store.
    """

    def __init__(self, socket: str, scheme: str = "grpc+tcp", connection_args=None):
        """
        Args:
            socket (str): The socket of the local plasma store
            scheme (str, optional): URI scheme prefixed to every Flight
                location. Defaults to "grpc+tcp".
            connection_args (dict, optional): Extra keyword arguments passed
                to every ``paf.FlightClient`` this object constructs.
                Defaults to None, meaning no extra arguments.
        """
        self.plasma_client = plasma.connect(socket)
        self._scheme = scheme
        # A ``{}`` default would be one dict shared by every instance, so the
        # default is None and a fresh dict is created per instance instead.
        self._connection_args = {} if connection_args is None else connection_args

    @staticmethod
    def _descriptor(object_id: plasma.ObjectID):
        """Build the Flight path descriptor (hex of the binary id) for object_id."""
        return paf.FlightDescriptor.for_path(object_id.binary().hex().encode('utf-8'))

    def _flight_client(self, location: str):
        """Create a Flight client for ``<scheme>://<location>``."""
        return paf.FlightClient(f"{self._scheme}://{location}", **self._connection_args)

    def list_flights(self, location: str):
        """Return the result of ``list_flights()`` on the Flight server at location."""
        return self._flight_client(location).list_flights()

    def get_flight(self, object_id: plasma.ObjectID, location: Optional[str]) -> paf.FlightStreamReader:
        """
        Open a Flight stream for object_id served from location.

        Raises:
            Exception: if location is None.
            KeyError: if the flight info for the object lists no endpoints.
        """
        if location is None:
            raise Exception("Location required")
        descriptor = self._descriptor(object_id)
        logger.debug(f"connecting to {self._scheme}://{location} with descriptor {descriptor}")
        flight_client = self._flight_client(location)
        info = flight_client.get_flight_info(descriptor)
        # Only the first endpoint is used; the loop returns on its first pass.
        for endpoint in info.endpoints:
            logger.debug(f"using endpoint locations {endpoint.locations}")
            return flight_client.do_get(endpoint.ticket)
        # Without this, an empty endpoint list made the method return None and
        # callers failed later with an unrelated AttributeError.
        raise KeyError("No flight endpoints for ObjectID", object_id)

    def put(self, data: memoryview, object_id: plasma.ObjectID):
        """Store data in the local Plasma store under object_id."""
        self.plasma_client.put_raw_buffer(data, object_id)

    def create(self, object_id: plasma.ObjectID, size: int):
        """Forward to the local Plasma client's ``create(object_id, size)``."""
        return self.plasma_client.create(object_id, size)

    def seal(self, object_id: plasma.ObjectID):
        """Forward to the local Plasma client's ``seal(object_id)``."""
        self.plasma_client.seal(object_id)

    def get(self, object_id: plasma.ObjectID, owner: Optional[str] = None) -> memoryview:
        """
        Return the payload of object_id as a memoryview.

        The local Plasma store is consulted first. If the object is missing
        and owner is given, it is fetched from owner over Flight and put into
        the local store before being returned.

        Raises:
            KeyError: if the object is not local and no owner was given.
        """
        logger.debug(f"PlasmaFlightClient Get {object_id}")
        if self.plasma_client.contains(object_id):
            # first check if the local store contains the object
            [buf] = self.plasma_client.get_buffers([object_id])
            return memoryview(buf)
        if owner is None:
            raise KeyError("ObjectID not found", object_id)
        # fetch from the specified owner
        reader = self.get_flight(object_id, owner)
        table = reader.read_all()
        # The payload is read from the first cell of the first column.
        output = BytesIO(table.column(0)[0].as_py()).getbuffer()
        # cache output
        self.put(output, object_id)
        return output

    def exists(self, object_id: plasma.ObjectID, owner: Optional[str] = None) -> bool:
        """
        Return True if object_id is in the local store or known to owner.

        Any ordinary error while querying the remote owner is treated as
        "does not exist" (best effort).
        """
        # check cache
        if self.plasma_client.contains(object_id):
            return True
        # check remote
        if owner is None:
            return False
        client = self._flight_client(owner)
        try:
            client.get_flight_info(self._descriptor(object_id))
        except Exception:
            # Narrower than a bare ``except:`` so KeyboardInterrupt and
            # SystemExit still propagate.
            return False
        return True
34 changes: 32 additions & 2 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
DROPLinkType, DROPPhases, DROPStates, DROPRel
from .event import EventFirer
from .exceptions import InvalidDropException, InvalidRelationshipException
from .io import OpenMode, FileIO, MemoryIO, NgasIO, NgasLiteIO, ErrorIO, NullIO, PlasmaIO
from .io import OpenMode, FileIO, MemoryIO, NgasIO, NgasLiteIO, ErrorIO, NullIO, PlasmaIO, PlasmaFlightIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
from .meta import dlg_float_param, dlg_int_param, dlg_list_param, \
dlg_string_param, dlg_bool_param, dlg_dict_param
Expand Down Expand Up @@ -460,7 +460,7 @@ def write(self, data, **kwargs):
if self.status not in [DROPStates.INITIALIZED, DROPStates.WRITING]:
raise Exception("No more writing expected")

if not isinstance(data, bytes):
if not isinstance(data, (bytes, memoryview)):
raise Exception("Data type not of binary type: %s", type(data).__name__)

# We lazily initialize our writing IO instance because the data of this
Expand Down Expand Up @@ -1779,6 +1779,7 @@ def execute(self, _send_notifications=True):
self.outputs[1 if self.condition() else 0].skip()
self._notifyAppIsFinished()


class PlasmaDROP(AbstractDROP):
'''
A DROP that points to data stored in a Plasma Store
Expand All @@ -1801,6 +1802,35 @@ def dataURL(self):
return "plasma://%s" % (binascii.hexlify(self.object_id).decode('ascii'))


class PlasmaFlightDROP(AbstractDROP):
    '''
    A DROP that points to data stored in a Plasma Store and accessed through
    a PlasmaFlightIO, which is additionally given this DROP's `flight_path`.
    '''
    # Identifier of the object in the Plasma store (str or 20 bytes)
    object_id = dlg_string_param('object_id', None)
    # Path handed to PlasmaFlightIO as the plasma store location
    plasma_path = dlg_string_param('plasma_path', '/tmp/plasma')
    # Handed to PlasmaFlightIO as `flight_path`; None when not set
    flight_path = dlg_string_param('flight_path', None)

    def initialize(self, **kwargs):
        '''
        Choose a default object id when none was given: the DROP's uid if it
        is exactly 20 long, otherwise 20 random bytes.
        '''
        if self.object_id is None:
            self.object_id = self.uid if len(self.uid) == 20 else np.random.bytes(20)

    def _object_id_bytes(self):
        '''
        Return `object_id` as bytes (str values are ASCII-encoded).

        Raises an Exception if `object_id` is neither str nor bytes.
        '''
        if isinstance(self.object_id, str):
            return self.object_id.encode('ascii')
        if isinstance(self.object_id, bytes):
            return self.object_id
        raise Exception("Invalid argument " + str(self.object_id) + " expected str, got " + str(type(self.object_id)))

    def getIO(self):
        '''Create the PlasmaFlightIO used to read and write this DROP's data.'''
        object_id = plasma.ObjectID(self._object_id_bytes())
        return PlasmaFlightIO(object_id, self.plasma_path, flight_path=self.flight_path, size=self._expectedSize)

    @property
    def dataURL(self):
        '''The `plasmaflight://<hex object id>` URL of this DROP.'''
        # hexlify only accepts bytes-like input, so a str object_id must be
        # encoded first (the same conversion getIO applies).
        return "plasmaflight://%s" % (binascii.hexlify(self._object_id_bytes()).decode('ascii'))


# Dictionary mapping 1-to-many DROPLinkType constants to the corresponding methods
# used to append a DROP into a relationship collection of another
# (e.g., one uses `addConsumer` to add a DROPLinkType.CONSUMER DROP into
Expand Down
11 changes: 6 additions & 5 deletions daliuge-engine/dlg/graph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,20 @@
from .ddap_protocol import DROPRel, DROPLinkType
from .drop import ContainerDROP, InMemoryDROP, \
FileDROP, NgasDROP, LINKTYPE_NTO1_PROPERTY, \
LINKTYPE_1TON_APPEND_METHOD, NullDROP, PlasmaDROP
LINKTYPE_1TON_APPEND_METHOD, NullDROP, PlasmaDROP, PlasmaFlightDROP
from .exceptions import InvalidGraphException
from .json_drop import JsonDROP
from .common import Categories


STORAGE_TYPES = {
Categories.MEMORY: InMemoryDROP,
Categories.FILE : FileDROP,
Categories.NGAS : NgasDROP,
Categories.NULL : NullDROP,
Categories.JSON : JsonDROP,
Categories.FILE: FileDROP,
Categories.NGAS: NgasDROP,
Categories.NULL: NullDROP,
Categories.JSON: JsonDROP,
Categories.PLASMA: PlasmaDROP,
Categories.PLASMAFLIGHT: PlasmaFlightDROP
}

try:
Expand Down
84 changes: 77 additions & 7 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@
import os
import urllib.parse

from typing import Optional

from . import ngaslite
from .apps.plasmaflight import PlasmaFlightClient

import pyarrow
import pyarrow.plasma as plasma


Expand Down Expand Up @@ -178,7 +182,7 @@ class MemoryIO(DataIO):
construction time
"""

def __init__(self, buf, **kwargs):
def __init__(self, buf: io.BytesIO, **kwargs):
self._buf = buf

def _open(self, **kwargs):
Expand Down Expand Up @@ -424,25 +428,91 @@ def __init__(self, object_id, plasma_path='/tmp/plasma'):
super(PlasmaIO, self).__init__()
self._plasma_path = plasma_path
self._object_id = object_id
self._reader = None
self._writer = None

def _open(self, **kwargs):
return plasma.connect(self._plasma_path)

def _close(self, **kwargs):
if self._writer:
self._desc.put_raw_buffer(self._writer.getvalue(), self._object_id)
self._writer.close()
if self._reader:
self._reader.close()

def _read(self, count, **kwargs):
if not self._reader:
[data] = self._desc.get_buffers([self._object_id])
self._reader = pyarrow.BufferReader(data)
return self._reader.read1(count)

def _write(self, data, **kwargs):
if not self._writer:
# use client.create and FixedSizeBufferWriter
self._writer = pyarrow.BufferOutputStream()
self._writer.write(data)
return len(data)

def exists(self):
return self._object_id in self._desc.list()

def delete(self):
pass


class PlasmaFlightIO(DataIO):
    """
    DataIO implementation backed by a PlasmaFlightClient.

    Reads go through the client's ``get`` (passing ``flight_path`` as the
    owner). Writes are either streamed into a growable buffer and stored on
    close (``size == -1``) or written straight into a Plasma buffer of
    ``size`` bytes that is sealed on close.
    """

    def __init__(
            self,
            object_id: plasma.ObjectID,
            plasma_path='/tmp/plasma',
            flight_path: Optional[str] = None,
            size: int = -1):
        """
        :param object_id: id of the Plasma object to read or write
        :param plasma_path: socket path used to build the PlasmaFlightClient
        :param flight_path: passed to the client as the object's owner; may be None
        :param size: exact number of bytes that will be written, or -1 if unknown
        """
        super(PlasmaFlightIO, self).__init__()
        assert size >= -1
        self._object_id = object_id
        self._plasma_path = plasma_path
        self._flight_path = flight_path
        self._size = size
        self._reader = None
        self._writer = None

    def _open(self, **kwargs):
        self._done = False
        return PlasmaFlightClient(socket=self._plasma_path)

    def _close(self, **kwargs):
        if self._writer:
            if self._size == -1:
                # Dynamic buffer: the collected bytes are stored only now.
                self._desc.put(self._writer.getvalue(), self._object_id)
                self._writer.close()
            else:
                assert self._size == self._writer.tell()
                # PlasmaFlightClient has no seal(); the Plasma store
                # connection is its `plasma_client` attribute.
                self._desc.plasma_client.seal(self._object_id)
        if self._reader:
            self._reader.close()

    def _read(self, count, **kwargs):
        if not self._reader:
            # Lazily fetch the whole object once, then serve reads from it.
            data = self._desc.get(self._object_id, self._flight_path)
            self._reader = pyarrow.BufferReader(data)
        return self._reader.read1(count)

    def _write(self, data, **kwargs):
        if not self._writer:
            if self._size == -1:
                # stream into resizeable buffer
                logger.warning("Using dynamically sized Plasma buffer. Performance may be reduced.")
                self._writer = pyarrow.BufferOutputStream()
            else:
                # optimally write directly to fixed size plasma buffer;
                # PlasmaFlightClient has no create(), so the buffer is
                # allocated through its `plasma_client` attribute.
                self._buffer = self._desc.plasma_client.create(self._object_id, self._size)
                self._writer = pyarrow.FixedSizeBufferWriter(self._buffer)
        self._writer.write(data)
        return len(data)

    def exists(self):
        return self._desc.exists(self._object_id, self._flight_path)

    def delete(self):
        pass
2 changes: 1 addition & 1 deletion daliuge-engine/test/manager/test_dm.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def _test_runGraphInTwoNMs(
drops.update(dm2._sessions[sessionId].drops)

leaf_drop = drops[leaf_oid]
with droputils.DROPWaiterCtx(self, leaf_drop, 1):
with droputils.DROPWaiterCtx(self, leaf_drop, 5):
for oid in root_oids:
drop = drops[oid]
drop.write(root_data)
Expand Down
Loading

0 comments on commit 2699842

Please sign in to comment.