Skip to content

Commit

Permalink
Refined meta definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
davepallot committed Jun 21, 2019
1 parent c666d91 commit 36a1043
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 21 deletions.
26 changes: 20 additions & 6 deletions dlg/apps/archiving.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from ..drop import BarrierAppDROP, ContainerDROP
from ..droputils import DROPFile
from ..io import NgasIO, OpenMode, NgasLiteIO
from ..meta import dlg_string_param, dlg_float_param, dlg_int_param, \
dlg_component, dlg_batch_input, dlg_batch_output, dlg_streaming_input


class ExternalStoreApp(BarrierAppDROP):
Expand All @@ -34,6 +36,12 @@ class ExternalStoreApp(BarrierAppDROP):
shouldn't contain any output, making it a leaf node of the physical graph
where it resides.
"""
compontent_meta = dlg_component('An application that takes its input DROP (which must be one, and only one) '
'and creates a copy of it in a completely external store, from the point '
'of view of the DALiuGE framework.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

def run(self):

Expand Down Expand Up @@ -62,23 +70,29 @@ class NgasArchivingApp(ExternalStoreApp):
new NGAS client process. This way we can read the different storage types
supported by the framework, and not only filesystem objects.
'''
compontent_meta = dlg_component('An ExternalStoreApp class that takes its input DROP and archives it in '
'an NGAS server. It currently deals with non-container DROPs only.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

ngasSrv = dlg_string_param('NGAS hostname', 'localhost')
ngasPort = dlg_int_param('NGAS Port', 7777)
ngasConnectTimeout = dlg_float_param('Connect Timeout', 2.)
ngasTimeout = dlg_float_param('Timeout', 2.)

def initialize(self, **kwargs):
super(NgasArchivingApp, self).initialize(**kwargs)
self._ngasSrv = self._getArg(kwargs, 'ngasSrv', 'localhost')
self._ngasPort = int(self._getArg(kwargs, 'ngasPort', 7777))
self._ngasTimeout = float(self._getArg(kwargs, 'ngasConnectTimeout', 2.))
self._ngasConnectTimeout = float(self._getArg(kwargs, 'ngasTimeout', 2.))

def store(self, inDrop):
if isinstance(inDrop, ContainerDROP):
raise Exception("ContainerDROPs are not supported as inputs for this application")

size = -1 if inDrop.size is None else inDrop.size
try:
ngasIO = NgasIO(self._ngasSrv, inDrop.uid, self._ngasPort, self._ngasConnectTimeout, self._ngasTimeout, size)
ngasIO = NgasIO(self.ngasSrv, inDrop.uid, self.ngasPort, self.ngasConnectTimeout, self.ngasTimeout, size)
except ImportError:
ngasIO = NgasLiteIO(self._ngasSrv, inDrop.uid, self._ngasPort, self._ngasConnectTimeout, self._ngasTimeout, size)
ngasIO = NgasLiteIO(self.ngasSrv, inDrop.uid, self.ngasPort, self.ngasConnectTimeout, self.ngasTimeout, size)

ngasIO.open(OpenMode.OPEN_WRITE)

Expand Down
25 changes: 23 additions & 2 deletions dlg/apps/bash_shell_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
import threading
import time
import types

import six

from .. import droputils, utils
from ..ddap_protocol import AppDROPStates, DROPStates
from ..drop import BarrierAppDROP, AppDROP
from ..exceptions import InvalidDropException
from ..param_types import dlg_string_param
from ..meta import dlg_string_param, dlg_component, dlg_batch_input, \
dlg_batch_output, dlg_streaming_input


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -147,6 +147,7 @@ class BashShellBase(object):
Common class for BashShell apps. It simply requires a command to be
specified.
"""

command = dlg_string_param('Bash command', None)

def initialize(self, **kwargs):
Expand Down Expand Up @@ -287,6 +288,11 @@ class BashShellApp(BashShellBase, BarrierAppDROP):
its inputs are COMPLETED. It also *doesn't* output a stream of data; see
StreamingOutputBashApp for those cases.
"""
compontent_meta = dlg_component('An app that runs a bash command in batch mode',
[dlg_batch_input('text/*', [])],
[dlg_batch_output('text/*', [])],
[dlg_streaming_input('text/*')])

def run(self):
self._run_bash(self._inputs, self._outputs)

Expand All @@ -295,6 +301,11 @@ class StreamingOutputBashApp(BashShellBase, BarrierAppDROP):
Like BashShellApp, but its stdout is a stream of data that is fed into the
next application.
"""
compontent_meta = dlg_component('Like BashShellApp, but its stdout is a stream '
'of data that is fed into the next application.',
[dlg_batch_input('text/*', [])],
[dlg_batch_output('text/*', [])],
[dlg_streaming_input('text/*')])
def run(self):
with contextlib.closing(prepare_output_channel(self.node, self.outputs[0])) as outchan:
self._run_bash(self._inputs, {}, stdout=outchan)
Expand All @@ -309,6 +320,11 @@ class StreamingInputBashApp(StreamingInputBashAppBase):
to establish the streaming channel. This information is also used to kick
this application off.
"""
compontent_meta = dlg_component('An app that runs a bash command that consumes data from stdin.',
[dlg_batch_input('text/*', [])],
[dlg_batch_output('text/*', [])],
[dlg_streaming_input('text/*')])

def run(self, data):
with contextlib.closing(prepare_input_channel(data)) as inchan:
self._run_bash({}, self._outputs, stdin=inchan)
Expand All @@ -319,6 +335,11 @@ class StreamingInputOutputBashApp(StreamingInputBashAppBase):
Like StreamingInputBashApp, but its stdout is also a stream of data that is
fed into the next application.
"""
compontent_meta = dlg_component('Like StreamingInputBashApp, but its stdout is also a '
'stream of data that is fed into the next application.',
[dlg_batch_input('text/*', [])],
[dlg_batch_output('text/*', [])],
[dlg_streaming_input('text/*')])
def run(self, data):
with contextlib.closing(prepare_input_channel(data)) as inchan:
with contextlib.closing(prepare_output_channel(self.node, self.outputs[0])) as outchan:
Expand Down
14 changes: 13 additions & 1 deletion dlg/apps/crc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,26 @@

from ..drop import BarrierAppDROP, AppDROP
from dlg.ddap_protocol import AppDROPStates

from ..meta import dlg_component, dlg_batch_input, dlg_batch_output, dlg_streaming_input

try:
from crc32c import crc32 # @UnusedImport
except:
from binascii import crc32 # @Reimport


class CRCApp(BarrierAppDROP):
'''
An BarrierAppDROP that calculates the CRC of the single DROP it
consumes. It assumes the DROP being consumed is not a container.
This is a simple example of an BarrierAppDROP being implemented, and
not something really intended to be used in a production system
'''
compontent_meta = dlg_component('A BarrierAppDROP that calculates the '
'CRC of the single DROP it consumes',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

def run(self):
if len(self.inputs) != 1:
Expand All @@ -64,11 +70,17 @@ def run(self):
# for storing our data
outputDrop.write(six.b(str(crc)))


class CRCStreamApp(AppDROP):
"""
Calculate CRC in the streaming mode
i.e. A "streamingConsumer" of its predecessor in the graph
"""
compontent_meta = dlg_component('Calculate CRC in the streaming mode.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

def initialize(self, **kwargs):
super(CRCStreamApp, self).initialize(**kwargs)
self._crc = 0
Expand Down
11 changes: 10 additions & 1 deletion dlg/apps/fileimport.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

from ..drop import ContainerDROP
from ..drop import FileDROP
from dlg.param_types import dlg_string_param, dlg_list_param
from dlg.meta import dlg_string_param, dlg_list_param, dlg_component, \
dlg_batch_input, dlg_batch_output, dlg_streaming_input


class FileImportApp(ContainerDROP):
Expand All @@ -35,6 +36,14 @@ class FileImportApp(ContainerDROP):
is created which contains the path to the file. The FileDROP is then added
to the FileImportApp (ContainerDROP)
"""
compontent_meta = dlg_component('Recursively scans a directory (dirname) and checks for files with '
'a particular extension (ext). If a match is made then a FileDROP '
'is created which contains the path to the file. The FileDROP is then added '
'to the FileImportApp (ContainerDROP)',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

dirname = dlg_string_param('dirname', None)
ext = dlg_list_param('ext', [])

Expand Down
24 changes: 17 additions & 7 deletions dlg/apps/scp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston,
# MA 02111-1307 USA
#
from .. import remote
from ..drop import BarrierAppDROP
from ..param_types import dlg_string_param
from ..remote import copyTo, copyFrom
from ..drop import BarrierAppDROP, ShoreDROP, NgasDROP, InMemoryDROP, \
NullDROP, RDBMSDrop, ContainerDROP
from ..meta import dlg_string_param, dlg_component, dlg_batch_input, \
dlg_batch_output, dlg_streaming_input


class ScpApp(BarrierAppDROP):
Expand All @@ -38,6 +40,14 @@ class ScpApp(BarrierAppDROP):
TO other host. This application's node must thus coincide with one of the
two I/O DROPs.
"""
compontent_meta = dlg_component('A BarrierAppDROP that copies the content of its single '
'input onto its single output via SSHs scp protocol.',
[dlg_batch_input('binary/*', [ShoreDROP, NgasDROP, InMemoryDROP,
NullDROP, RDBMSDrop, ContainerDROP])],
[dlg_batch_output('binary/*', [ShoreDROP, NgasDROP, InMemoryDROP,
NullDROP, RDBMSDrop, ContainerDROP])],
[dlg_streaming_input('binary/*')])

remoteUser = dlg_string_param('remoteUser', None)
pkeyPath = dlg_string_param('pkeyPath', None)
timeout = dlg_string_param('timeout', None)
Expand Down Expand Up @@ -82,8 +92,8 @@ def run(self):
# recursive = isinstance(inp, DirectoryContainer)
recursive = hasattr(inp, 'children')
if self.node == inp.node:
remote.copyTo(out.node, inp.path, remotePath=out.path, recursive=recursive,
username=self.remoteUser, pkeyPath=self.pkeyPath, timeout=self.timeout)
copyTo(out.node, inp.path, remotePath=out.path, recursive=recursive,
username=self.remoteUser, pkeyPath=self.pkeyPath, timeout=self.timeout)
else:
remote.copyFrom(inp.node, inp.path, localPath=out.path, recursive=recursive,
username=self.remoteUser, pkeyPath=self.pkeyPath, timeout=self.timeout)
copyFrom(inp.node, inp.path, localPath=out.path, recursive=recursive,
username=self.remoteUser, pkeyPath=self.pkeyPath, timeout=self.timeout)
18 changes: 17 additions & 1 deletion dlg/apps/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,28 @@

from .. import droputils
from ..drop import BarrierAppDROP, ContainerDROP
from ..param_types import dlg_float_param
from ..meta import dlg_float_param, dlg_component, dlg_batch_input, \
dlg_batch_output, dlg_streaming_input


class NullBarrierApp(BarrierAppDROP):
compontent_meta = dlg_component('Null Barrier.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

"""A BarrierAppDrop that doesn't perform any work"""
def run(self):
pass


class SleepApp(BarrierAppDROP):
"""A BarrierAppDrop that sleeps the specified amount of time (0 by default)"""
compontent_meta = dlg_component('Sleep App.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

sleepTime = dlg_float_param('sleep time', 0)

def initialize(self, **kwargs):
Expand All @@ -51,6 +62,11 @@ class CopyApp(BarrierAppDROP):
All inputs are copied into all outputs in the order they were declared in
the graph.
"""
compontent_meta = dlg_component('Copy App.',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

def run(self):
self.copyAll()

Expand Down
8 changes: 7 additions & 1 deletion dlg/apps/socket_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
from ..ddap_protocol import DROPRel, DROPLinkType
from ..drop import BarrierAppDROP
from ..exceptions import InvalidRelationshipException
from ..param_types import dlg_string_param, dlg_int_param, dlg_bool_param
from ..meta import dlg_string_param, dlg_int_param, dlg_bool_param, \
dlg_component, dlg_batch_output, dlg_batch_input, dlg_streaming_input


logger = logging.getLogger(__name__)
Expand All @@ -51,6 +52,11 @@ class SocketListenerApp(BarrierAppDROP):

_dryRun = False

compontent_meta = dlg_component('A BarrierAppDROP that listens on a socket for data',
[dlg_batch_input('binary/*', [])],
[dlg_batch_output('binary/*', [])],
[dlg_streaming_input('binary/*')])

host = dlg_string_param('host', '127.0.0.1')
port = dlg_int_param('port', 1111)
bufsize = dlg_int_param('bufsize', 4096)
Expand Down
2 changes: 1 addition & 1 deletion dlg/drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from .exceptions import InvalidDropException, InvalidRelationshipException
from .io import OpenMode, FileIO, MemoryIO, NgasIO, ErrorIO, NullIO, ShoreIO
from .utils import prepare_sql, createDirIfMissing, isabs, object_tracking
from .param_types import dlg_float_param, dlg_int_param, dlg_list_param, \
from .meta import dlg_float_param, dlg_int_param, dlg_list_param, \
dlg_string_param, dlg_bool_param, dlg_dict_param

# Opt into using per-drop checksum calculation
Expand Down
26 changes: 25 additions & 1 deletion dlg/param_types.py → dlg/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,34 @@

import collections

dlg_component = collections.namedtuple('dlg_component', 'description inputs outputs')
dlg_bool_param = collections.namedtuple('dlg_bool_param', 'description default_value')
dlg_int_param = collections.namedtuple('dlg_int_param', 'description default_value')
dlg_float_param = collections.namedtuple('dlg_float_param', 'description default_value')
dlg_string_param = collections.namedtuple('dlg_string_param', 'description default_value')
dlg_list_param = collections.namedtuple('dlg_list_param', 'description default_value')
dlg_dict_param = collections.namedtuple('dlg_dict_param', 'description default_value')


class dlg_batch_input(object):
def __init__(self, mime_type, input_drop_class):
self.input_drop_class = input_drop_class
self.mime_type = mime_type


class dlg_batch_output(object):
def __init__(self, mime_type, output_drop_class):
self.output_drop_class = output_drop_class
self.mime_type = mime_type


class dlg_streaming_input(object):
def __init__(self, mime_type):
self.mime_type = mime_type


class dlg_component(object):
def __init__(self, description, batch_inputs, batch_outputs, streaming_inputs):
self.description = description
self.batch_inputs = batch_inputs
self.batch_outputs = batch_outputs
self.streaming_inputs = streaming_inputs

0 comments on commit 36a1043

Please sign in to comment.