Skip to content

Commit

Permalink
Merge 0d7b2f7 into ea54ebb
Browse files Browse the repository at this point in the history
  • Loading branch information
davepallot committed Apr 27, 2021
2 parents ea54ebb + 0d7b2f7 commit 84f51be
Show file tree
Hide file tree
Showing 8 changed files with 1,176 additions and 0 deletions.
155 changes: 155 additions & 0 deletions daliuge-engine/dlg/apps/plasma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import os
import io
import numpy as np

from dlg.drop import BarrierAppDROP
from dlg.meta import dlg_string_param, dlg_component, dlg_batch_input, \
dlg_batch_output, dlg_streaming_input

from casacore import tables


class MSPlasmaReader(BarrierAppDROP):
'''
A BarrierAppDROP that reads a CASA measurement from a plasma store and writes out to file.
Example:
a = FileDROP('a', 'a', filepath=in_file)
b = MSPlasmaWriter('b', 'b')
c = PlasmaDROP('c', 'c')
d = MSPlasmaReader('d', 'd')
e = FileDROP('e', 'e', filepath=out_file)
'''
compontent_meta = dlg_component('MSPlasmaWriter', 'Measurement Set Plasma Writer.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

ms_output_path = dlg_string_param('ms_output_path', None)

def initialize(self, **kwargs):
super(MSPlasmaReader, self).initialize(**kwargs)

def _write_table(self, ms, path, delete=True):
if delete is True:
try:
os.rmdir(path)
except OSError:
pass

abs_path = os.path.dirname(os.path.abspath(path))
filename = os.path.basename(path)

value = ms.pop('/')
with tables.table(abs_path + '/' + filename, value[0], nrow=len(value[1])) as t:
with t.row() as r:
for idx, val in enumerate(value[1]):
r.put(idx, val)

for key, value in ms.items():
name = abs_path + '/' + filename + '/' + key
with tables.table(name, value[0], nrow=len(value[1])) as t:
with t.row() as r:
for idx, val in enumerate(value[1]):
if val.get('LOG', None) == []:
val['LOG'] = ''
if val.get('SCHEDULE', None) == []:
val['SCHEDULE'] = ''
r.put(idx, val)

def _deserialize_table(self, in_stream, path):
load_bytes = io.BytesIO(in_stream)
ms = np.load(load_bytes, allow_pickle=True).flat[0]
self._write_table(ms, path)

def run(self, **kwargs):
if len(self.inputs) != 1:
raise Exception("This application read only from one DROP")
if len(self.outputs) != 1:
raise Exception("This application writes only one DROP")

inp = self.inputs[0]
out = self.outputs[0].path

desc = inp.open()
input_stream = inp.read(desc)
self._deserialize_table(input_stream, out)


class MSPlasmaWriter(BarrierAppDROP):
'''
A BarrierAppDROP that reads a CASA measurement set and writes it out to a plasma store.
Example:
a = FileDROP('a', 'a', filepath=in_file)
b = MSPlasmaWriter('b', 'b')
c = PlasmaDROP('c', 'c')
d = MSPlasmaReader('d', 'd')
e = FileDROP('e', 'e', filepath=out_file)
'''
compontent_meta = dlg_component('MSPlasmaWriter', 'Measurement Set Plasma Writer.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

ms_input_path = dlg_string_param('ms_input_path', None)

def initialize(self, **kwargs):
super(MSPlasmaWriter, self).initialize(**kwargs)

def _read_table(self, table_path, ms, table_name=None):
if not table_name:
table_name = os.path.basename(table_path)

ms[table_name] = []
with tables.table(table_path) as t:
ms[table_name].append(t.getdesc())
ms[table_name].append([])
for row in t:
ms[table_name][1].append(row)

def _serialize_table(self, path):
ms = {}
self._read_table(path, ms, table_name='/')

with tables.table(path) as t:
sub = t.getsubtables()
for i in sub:
self._read_table(i, ms)

out_stream = io.BytesIO()
np.save(out_stream, ms, allow_pickle=True)
return out_stream.getvalue()

def run(self, **kwargs):
if len(self.inputs) != 1:
raise Exception("This application read only from one DROP")
if len(self.outputs) != 1:
raise Exception("This application writes only one DROP")

inp = self.inputs[0].path
out = self.outputs[0]
out_bytes = self._serialize_table(inp)
out.write(out_bytes)

20 changes: 20 additions & 0 deletions daliuge-engine/dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
from .meta import dlg_float_param, dlg_int_param, dlg_list_param, \
dlg_string_param, dlg_bool_param, dlg_dict_param

import pyarrow.plasma as plasma

# Opt into using per-drop checksum calculation
checksum_disabled = 'DLG_DISABLE_CHECKSUM' in os.environ
try:
Expand Down Expand Up @@ -1715,6 +1717,24 @@ def initialize(self, **kwargs):
super(BarrierAppDROP, self).initialize(**kwargs)


class PlasmaDROP(AbstractDROP):
'''
A DROP that points to data stored in a Plasma Store
'''
object_id = dlg_string_param('object_id', plasma.ObjectID.from_random().binary())
plasma_link = dlg_string_param('plasma_link', '/tmp/plasma')

def initialize(self, **kwargs):
pass

def getIO(self):
return PlasmaIO(plasma.ObjectID(self.object_id), plasma_link)

@property
def dataURL(self):
return "plasma://%s:%d/%s" % (plasma.ObjectID(self.object_id))


# Dictionary mapping 1-to-many DROPLinkType constants to the corresponding methods
# used to append a a DROP into a relationship collection of another
# (e.g., one uses `addConsumer` to add a DROPLinkeType.CONSUMER DROP into
Expand Down
32 changes: 32 additions & 0 deletions daliuge-engine/dlg/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

from . import ngaslite

import pyarrow.plasma as plasma


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -391,3 +393,33 @@ def IOForURL(url):
logger.debug('I/O chosen for dataURL %s: %r', url, io)

return io


class PlasmaIO(DataIO):

def __init__(self, object_id, plasma_link='/tmp/plasma'):
super(PlasmaIO, self).__init__()
self._plasma_link = plasma_link
self._object_id = object_id

def _open(self, **kwargs):
return plasma.connect(self._plasma_link)

def _close(self, **kwargs):
pass

def _read(self, count, **kwargs):
data = self._desc.get(self._object_id)
return data

def _write(self, data, **kwargs):
self._desc.put(data, self._object_id)
return len(data)

def exists(self):
if self._object_id in self._desc.list():
return True
return False

def delete(self):
pass
2 changes: 2 additions & 0 deletions daliuge-engine/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def run(self):
"six>=1.10",
# 0.6 brings python3 support plus other fixes
"zerorpc >= 0.6",
"pyarrow",
"numpy"
]
# Keep alpha-sorted PLEASE!

Expand Down
Binary file added daliuge-engine/test/apps/data/test_ms.tar.gz
Binary file not shown.
79 changes: 79 additions & 0 deletions daliuge-engine/test/apps/test_plasma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#
# ICRAR - International Centre for Radio Astronomy Research
# (c) UWA - The University of Western Australia, 2015
# Copyright by UWA (in the framework of the ICRAR)
# All rights reserved
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
import unittest
import logging
import tarfile

from dlg.drop import FileDROP, PlasmaDROP
from dlg import droputils

casa_unavailable = True
try:
from dlg.apps.plasma import MSPlasmaWriter, MSPlasmaReader
from casacore import tables
casa_unavailable = False
except:
pass

logging.basicConfig()

@unittest.skipIf(casa_unavailable, "python-casacore not available")
class CRCAppTests(unittest.TestCase):

def compare_ms(self, in_file, out_file):
a = []
b = []
with tables.table(out_file) as t1:
for i in t1:
a.append(i['DATA'])

with tables.table(in_file) as t2:
for i in t2:
b.append(i['DATA'])

for i, j in enumerate(a):
comparison = j == b[i]
self.assertEqual(comparison.all(), True)

def test_plasma(self):
in_file = '/tmp/test.ms'
out_file = '/tmp/copy.ms'

with tarfile.open('./data/test_ms.tar.gz', 'r') as ref:
ref.extractall('/tmp/')

a = FileDROP('a', 'a', filepath=in_file)
b = MSPlasmaWriter('b', 'b')
c = PlasmaDROP('c', 'c')
d = MSPlasmaReader('d', 'd')
e = FileDROP('e', 'e', filepath=out_file)

b.addInput(a)
b.addOutput(c)
d.addInput(c)
d.addOutput(e)

# Check the MS DATA content is the same as original
with droputils.DROPWaiterCtx(self, e, 5):
a.setCompleted()

self.compare_ms(in_file, out_file)
Loading

0 comments on commit 84f51be

Please sign in to comment.