Skip to content

Commit

Permalink
Merge pull request #30 from ICRAR/YAN-678
Browse files Browse the repository at this point in the history
Yan 678
  • Loading branch information
davepallot committed Apr 27, 2021
2 parents ea54ebb + e0f7487 commit a6d98c2
Show file tree
Hide file tree
Showing 8 changed files with 1,188 additions and 1 deletion.
155 changes: 155 additions & 0 deletions daliuge-engine/dlg/apps/plasma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import os
import io
import numpy as np

from dlg.drop import BarrierAppDROP
from dlg.meta import dlg_string_param, dlg_component, dlg_batch_input, \
dlg_batch_output, dlg_streaming_input

from casacore import tables


class MSPlasmaReader(BarrierAppDROP):
    '''
    A BarrierAppDROP that reads a serialized CASA measurement set from its
    single input DROP (e.g. a plasma store) and writes it out as casacore
    tables at the path of its single output DROP.

    Example:
        a = FileDROP('a', 'a', filepath=in_file)
        b = MSPlasmaWriter('b', 'b')
        c = PlasmaDROP('c', 'c')
        d = MSPlasmaReader('d', 'd')
        e = FileDROP('e', 'e', filepath=out_file)
    '''
    # NOTE(review): the attribute name is spelled 'compontent_meta' here; it is
    # kept unchanged in case the framework looks it up under this spelling.
    compontent_meta = dlg_component('MSPlasmaReader', 'Measurement Set Plasma Reader.',
                                    [dlg_batch_input('binary/*', [])],
                                    [dlg_batch_output('binary/*', [])],
                                    [dlg_streaming_input('binary/*')])

    ms_output_path = dlg_string_param('ms_output_path', None)

    def initialize(self, **kwargs):
        super(MSPlasmaReader, self).initialize(**kwargs)

    def _put_rows(self, table_path, desc, rows, blank_empty_lists=False):
        '''
        Create a casacore table at `table_path` using the description `desc`
        and fill it with `rows`, one row per element.

        When `blank_empty_lists` is set, 'LOG' and 'SCHEDULE' values equal to
        an empty list are replaced by an empty string before the row is
        written (the row dictionaries are modified in place).
        '''
        with tables.table(table_path, desc, nrow=len(rows)) as t:
            with t.row() as r:
                for idx, val in enumerate(rows):
                    if blank_empty_lists:
                        if val.get('LOG', None) == []:
                            val['LOG'] = ''
                        if val.get('SCHEDULE', None) == []:
                            val['SCHEDULE'] = ''
                    r.put(idx, val)

    def _write_table(self, ms, path, delete=True):
        '''
        Write the measurement set dictionary `ms` to `path`.

        `ms` maps table names to `[description, rows]` pairs; the entry under
        '/' is the main table and every other entry is written as a
        sub-table directory underneath it. `ms` itself is not modified.

        :param ms: dictionary produced by deserializing the input stream
        :param path: destination path of the main table
        :param delete: if true, try to remove `path` first
        '''
        if delete:
            try:
                # NOTE(review): os.rmdir only removes *empty* directories, so
                # an existing non-empty measurement set is left in place and
                # the error is deliberately ignored -- confirm this
                # best-effort behaviour is what is wanted.
                os.rmdir(path)
            except OSError:
                pass

        # Resolving the whole path at once keeps a trailing slash from turning
        # the destination into the parent directory.
        root = os.path.abspath(path)

        main = ms['/']
        self._put_rows(root, main[0], main[1])

        for key, value in ms.items():
            if key == '/':
                continue
            self._put_rows(os.path.join(root, key), value[0], value[1],
                           blank_empty_lists=True)

    def _deserialize_table(self, in_stream, path):
        '''
        Rebuild the measurement set dictionary from the bytes in `in_stream`
        and write it to `path`.
        '''
        load_bytes = io.BytesIO(in_stream)
        # SECURITY: allow_pickle=True unpickles the payload, which can execute
        # arbitrary code. Only feed this application data from trusted
        # producers.
        ms = np.load(load_bytes, allow_pickle=True).flat[0]
        self._write_table(ms, path)

    def run(self, **kwargs):
        '''
        Read the serialized measurement set from the single input DROP and
        write it to the path of the single output DROP.

        :raises Exception: if there is not exactly one input and one output
        '''
        if len(self.inputs) != 1:
            raise Exception("This application read only from one DROP")
        if len(self.outputs) != 1:
            raise Exception("This application writes only one DROP")

        inp = self.inputs[0]
        out = self.outputs[0].path

        desc = inp.open()
        try:
            input_stream = inp.read(desc)
        finally:
            # Always release the descriptor, even if reading fails.
            inp.close(desc)
        self._deserialize_table(input_stream, out)


class MSPlasmaWriter(BarrierAppDROP):
    '''
    A BarrierAppDROP that loads a CASA measurement set from the path of its
    single input DROP, serializes it and writes the resulting bytes to its
    single output DROP (e.g. a plasma store).

    Example:
        a = FileDROP('a', 'a', filepath=in_file)
        b = MSPlasmaWriter('b', 'b')
        c = PlasmaDROP('c', 'c')
        d = MSPlasmaReader('d', 'd')
        e = FileDROP('e', 'e', filepath=out_file)
    '''
    compontent_meta = dlg_component('MSPlasmaWriter', 'Measurement Set Plasma Writer.',
                                    [dlg_batch_input('binary/*', [])],
                                    [dlg_batch_output('binary/*', [])],
                                    [dlg_streaming_input('binary/*')])

    ms_input_path = dlg_string_param('ms_input_path', None)

    def initialize(self, **kwargs):
        super(MSPlasmaWriter, self).initialize(**kwargs)

    def _read_table(self, table_path, ms, table_name=None):
        '''
        Store the table found at `table_path` in `ms` as a two-element list
        `[description, rows]`, keyed by `table_name` (defaults to the last
        component of `table_path`).
        '''
        key = table_name if table_name else os.path.basename(table_path)

        # The entry is registered before the table is opened.
        entry = ms[key] = []
        with tables.table(table_path) as tab:
            entry.append(tab.getdesc())
            rows = []
            entry.append(rows)
            for record in tab:
                rows.append(record)

    def _serialize_table(self, path):
        '''
        Read the main table at `path` (stored under the key '/') together
        with all of its sub-tables and return them serialized as bytes in
        numpy's pickled `.npy` format.
        '''
        contents = {}
        self._read_table(path, contents, table_name='/')

        with tables.table(path) as main_table:
            for sub_path in main_table.getsubtables():
                self._read_table(sub_path, contents)

        buf = io.BytesIO()
        np.save(buf, contents, allow_pickle=True)
        return buf.getvalue()

    def run(self, **kwargs):
        '''
        Serialize the measurement set at the input DROP's path and write the
        bytes to the output DROP.

        :raises Exception: if there is not exactly one input and one output
        '''
        if len(self.inputs) != 1:
            raise Exception("This application read only from one DROP")
        if len(self.outputs) != 1:
            raise Exception("This application writes only one DROP")

        source_path = self.inputs[0].path
        target = self.outputs[0]
        target.write(self._serialize_table(source_path))

27 changes: 26 additions & 1 deletion daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@

import six
from six import BytesIO
import numpy as np

from .ddap_protocol import ExecutionMode, ChecksumTypes, AppDROPStates, \
DROPLinkType, DROPPhases, DROPStates, DROPRel
from .event import EventFirer
from .exceptions import InvalidDropException, InvalidRelationshipException
from .io import OpenMode, FileIO, MemoryIO, NgasIO, NgasLiteIO, ErrorIO, NullIO
from .io import OpenMode, FileIO, MemoryIO, NgasIO, NgasLiteIO, ErrorIO, NullIO, PlasmaIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
from .meta import dlg_float_param, dlg_int_param, dlg_list_param, \
dlg_string_param, dlg_bool_param, dlg_dict_param

import pyarrow.plasma as plasma

# Opt into using per-drop checksum calculation
checksum_disabled = 'DLG_DISABLE_CHECKSUM' in os.environ
try:
Expand Down Expand Up @@ -1715,6 +1718,28 @@ def initialize(self, **kwargs):
super(BarrierAppDROP, self).initialize(**kwargs)


class PlasmaDROP(AbstractDROP):
    '''
    A DROP that points to data stored in a Plasma Store.

    `object_id` identifies the object inside the store and `plasma_link` is
    the socket path used to connect to the store.
    '''
    object_id = dlg_string_param('object_id', None)
    plasma_link = dlg_string_param('plasma_link', '/tmp/plasma')

    def initialize(self, **kwargs):
        # Use the uid as object id when it is 20 characters long; otherwise
        # fall back to 20 random bytes. An explicitly given object_id wins.
        object_id = self.uid
        if len(self.uid) != 20:
            object_id = np.random.bytes(20)
        if self.object_id is None:
            self.object_id = object_id

    def _object_id_bytes(self):
        '''
        Return `object_id` as bytes. The id may be held either as bytes (the
        random fallback) or as text (taken from the uid or a parameter).
        '''
        oid = self.object_id
        if isinstance(oid, bytes):
            return oid
        return oid.encode('ascii')

    def getIO(self):
        return PlasmaIO(plasma.ObjectID(self._object_id_bytes()), self.plasma_link)

    @property
    def dataURL(self):
        # str.encode('hex') only exists in Python 2; binascii works on both
        # Python 2 and 3.
        import binascii
        hex_id = binascii.hexlify(self._object_id_bytes()).decode('ascii')
        return "plasma://%s" % (hex_id)


# Dictionary mapping 1-to-many DROPLinkType constants to the corresponding methods
# used to append a DROP into a relationship collection of another
# (e.g., one uses `addConsumer` to add a DROPLinkType.CONSUMER DROP into
Expand Down
32 changes: 32 additions & 0 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

from . import ngaslite

import pyarrow.plasma as plasma


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -391,3 +393,33 @@ def IOForURL(url):
logger.debug('I/O chosen for dataURL %s: %r', url, io)

return io


class PlasmaIO(DataIO):
    '''
    A DataIO that reads and writes a single object, identified by
    `object_id`, in the Plasma store reachable through `plasma_link`.
    '''

    def __init__(self, object_id, plasma_link='/tmp/plasma'):
        super(PlasmaIO, self).__init__()
        self._plasma_link = plasma_link
        self._object_id = object_id

    def _open(self, **kwargs):
        return plasma.connect(self._plasma_link)

    def _close(self, **kwargs):
        pass

    def _read(self, count, **kwargs):
        # `count` is ignored: the whole object is returned in one go.
        data = self._desc.get(self._object_id)
        return data

    def _write(self, data, **kwargs):
        self._desc.put(data, self._object_id)
        return len(data)

    def exists(self):
        '''
        Return True if the object is present in the store.

        Works on an instance that has not been opened by using a temporary
        connection, which is released again before returning.
        '''
        client = getattr(self, '_desc', None)
        if client is not None:
            return bool(client.contains(self._object_id))

        client = plasma.connect(self._plasma_link)
        try:
            return bool(client.contains(self._object_id))
        finally:
            client.disconnect()

    def delete(self):
        pass
2 changes: 2 additions & 0 deletions daliuge-engine/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def run(self):
"six>=1.10",
# 0.6 brings python3 support plus other fixes
"zerorpc >= 0.6",
"pyarrow",
"numpy"
]
# Keep alpha-sorted PLEASE!

Expand Down
Binary file added daliuge-engine/test/apps/data/test_ms.tar.gz
Binary file not shown.
85 changes: 85 additions & 0 deletions daliuge-engine/test/apps/test_plasma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import unittest
import logging
import tarfile

from dlg.drop import FileDROP, PlasmaDROP
from dlg import droputils

# Probe for the optional dependencies needed by the tests in this module;
# the flag stays True when any of them cannot be imported.
casa_unavailable = True
try:
    import pyarrow.plasma as plasma
    from dlg.apps.plasma import MSPlasmaWriter, MSPlasmaReader
    from casacore import tables
    casa_unavailable = False
except ImportError:
    # Only a missing dependency should cause the tests to be skipped; a bare
    # except would also swallow KeyboardInterrupt and SystemExit.
    pass

logging.basicConfig()

@unittest.skipIf(casa_unavailable, "python-casacore not available")
class CRCAppTests(unittest.TestCase):
    '''
    Round-trips a measurement set through a plasma store using
    MSPlasmaWriter and MSPlasmaReader and checks the copy against the
    original.
    '''

    def compare_ms(self, in_file, out_file):
        '''
        Assert that both measurement sets have the same number of rows and
        that the DATA column of every row is identical.
        '''
        with tables.table(out_file) as t1:
            copied = [row['DATA'] for row in t1]

        with tables.table(in_file) as t2:
            original = [row['DATA'] for row in t2]

        # Without this check a truncated copy would pass unnoticed.
        self.assertEqual(len(copied), len(original))
        for got, expected in zip(copied, original):
            self.assertTrue((got == expected).all())

    def test_plasma(self):
        import binascii
        import os

        in_file = '/tmp/test.ms'
        out_file = '/tmp/copy.ms'

        # Locate the reference data relative to this file so the test does
        # not depend on the current working directory.
        here = os.path.dirname(os.path.abspath(__file__))
        with tarfile.open(os.path.join(here, 'data', 'test_ms.tar.gz'), 'r') as ref:
            ref.extractall('/tmp/')

        a = FileDROP('a', 'a', filepath=in_file)
        b = MSPlasmaWriter('b', 'b')
        c = PlasmaDROP('c', 'c')
        d = MSPlasmaReader('d', 'd')
        e = FileDROP('e', 'e', filepath=out_file)

        b.addInput(a)
        b.addOutput(c)
        d.addInput(c)
        d.addOutput(e)

        # Check the MS DATA content is the same as original
        with droputils.DROPWaiterCtx(self, e, 5):
            a.setCompleted()

        self.compare_ms(in_file, out_file)

        # check we can go from dataURL to plasma ID
        client = plasma.connect("/tmp/plasma")
        # str.decode("hex") only exists in Python 2; binascii works on both.
        object_id = binascii.unhexlify(c.dataURL.split('//')[1])
        client.get(plasma.ObjectID(object_id))
Loading

0 comments on commit a6d98c2

Please sign in to comment.