Merge branch 'master' into job-cache
rexissimus committed May 26, 2014
2 parents 28dbb06 + 518606c commit efc7625
Showing 74 changed files with 42,098 additions and 385 deletions.
2 changes: 2 additions & 0 deletions doc/usersguide/example.rst
@@ -50,6 +50,8 @@ Intermediate Concepts and VisTrails Packages

    controlflow
    cfassistant
+   list_handling
+   streaming
    parallelflow
    database
    example_webservices
94 changes: 94 additions & 0 deletions doc/usersguide/list_handling.rst
@@ -0,0 +1,94 @@
.. _chap-list_handling:

**************************
List Handling in VisTrails
**************************

VisTrails supports passing typed lists between modules. Each port on a module
has a depth parameter specifying the list depth it expects: 0 means no list, 1
means a list, 2 means a list of lists, and so on. Port depth can be specified
either by module creators or in a ``PythonSource`` or similar module.
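
As a rough illustration of the depth convention, the sketch below computes the nesting depth of a plain Python value. The helper ``list_depth`` is hypothetical and not part of the VisTrails API; it assumes lists are uniformly nested and only inspects the first element of each level.

```python
def list_depth(value):
    """Return how many list levels wrap `value` (hypothetical helper)."""
    depth = 0
    # Descend through the first element for as long as we keep finding lists.
    while isinstance(value, list):
        depth += 1
        if not value:
            break  # an empty list still counts as one level
        value = value[0]
    return depth


print(list_depth('A'))                   # 0: no list
print(list_depth(['A', 'B']))            # 1: a list
print(list_depth([['A'], ['B', 'C']]))   # 2: a list of lists
```

A port declared with depth 1 would therefore expect values like the second example.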

Iterating over lists
====================

Passing a list to a module that does not support lists will cause that module
to be executed once for each element in the list. When lists are passed on
multiple input ports, the inputs are combined. The default combination is the
cartesian product, where each element in a list is combined with each element
in another list. This combination can be changed by selecting "Looping Options"
in the module menu. The options are ``Cartesian``, ``Pairwise`` (where the
elements are combined pairwise), and ``Custom``. ``Custom`` gives you complete
control over how inputs are combined: you can mix pairwise and cartesian
combiners and reorder them. The output of an iterated module is an ordered
list with the individual results of the module executions. This causes modules
downstream to be iterated over as well, unless they accept a list as input.
Iterated modules have duplicated connections to show that they are being
iterated over. A list of lists will have the connection triplicated, and so
on.
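
The difference between the two basic combiners can be sketched in plain Python. This is only an analogy using ``itertools.product`` and ``zip``, not the VisTrails implementation; the variable names are made up for illustration.

```python
from itertools import product

letters = ['A', 'B', 'C']
digits = ['1', '2', '3']

# Cartesian: every element of one list meets every element of the other.
cartesian = [a + b for a, b in product(letters, digits)]

# Pairwise: elements are matched by position.
pairwise = [a + b for a, b in zip(letters, digits)]

print(cartesian)  # ['A1', 'A2', 'A3', 'B1', 'B2', 'B3', 'C1', 'C2', 'C3']
print(pairwise)   # ['A1', 'B2', 'C3']
```

Either way the result is an ordered list with one entry per execution, which is what a downstream module would then iterate over.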

.. topic:: Try it Now!

   Let's create a simple example showing how to combine strings. First we will
   create a module that generates lists of strings. Create a new workflow and
   add a ``PythonSource`` module. Give it three output ports named ``s1``,
   ``s2``, ``s3`` of type ``String`` and set their list depth to 1. Enter this
   code:

   .. code-block:: python

      s1 = ['A', 'B', 'C']
      s2 = ['+', '-', '*']
      s3 = ['1', '2', '3']

.. topic:: Next Step!

   Add a ``ConcatenateString`` module and connect ``s1->str1``, ``s2->str2``,
   ``s3->str3``. Notice how the connections going into ``ConcatenateString``
   are duplicated. This indicates that ``ConcatenateString`` will iterate over
   the list inputs. Add a ``StandardOutput`` module and connect
   ``ConcatenateString.value`` to ``StandardOutput.value``. This connection
   will be duplicated at both ends, indicating that both modules will be
   iterated over. Your workflow should now look like Figure
   :ref:`fig-list_handling-list-combine-workflow`.

.. _fig-list_handling-list-combine-workflow:

.. figure:: figures/list_handling/list-combine-workflow.png
   :align: center
   :width: 1.8in

   The complete workflow

.. topic:: Next Step!

   By default ``ConcatenateString`` will combine the inputs using the cartesian
   product: ``["A+1", "A+2", "A+3", "A-1", ...]``. Let's change this. Go to
   ``Module Menu->Looping Options`` and click ``Custom``. Right-click in the
   port list and add a pairwise product. Rearrange the ports so that it looks
   like Figure :ref:`fig-list_handling-list-combine`.

.. _fig-list_handling-list-combine:

.. figure:: figures/list_handling/list-combine.png
   :align: center

   A custom list combiner

.. topic:: Finally:

   ``str2`` and ``str3`` will now be combined pairwise and then combined with
   ``str1`` using the cartesian product. Open the VisTrails console and execute
   the workflow. You should see this output:
   :vtl:`(Open result) <list-handling.vt>`

   .. code-block:: python

      A+1
      A-2
      A*3
      B+1
      B-2
      B*3
      C+1
      C-2
      C*3
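
The custom combination described in this example (pairwise over ``str2`` and ``str3``, then cartesian with ``str1``) can be mimicked in plain Python. This is a minimal sketch of the combination order only, assuming the same three lists as the ``PythonSource`` above; it does not use any VisTrails API.

```python
from itertools import product

s1 = ['A', 'B', 'C']
s2 = ['+', '-', '*']
s3 = ['1', '2', '3']

# Pairwise step: match s2 and s3 by position.
paired = list(zip(s2, s3))

# Cartesian step: combine every element of s1 with every (operator, digit) pair.
combined = [a + op + num for a, (op, num) in product(s1, paired)]

for item in combined:
    print(item)
```

Because the pairwise step runs first, each letter is joined with only three operator/digit pairs, giving nine results instead of the 27 a full cartesian product would produce.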
94 changes: 94 additions & 0 deletions doc/usersguide/streaming.rst
@@ -0,0 +1,94 @@
.. _chap-streaming:

**********************
Streaming in VisTrails
**********************

Streaming data may be useful for a number of reasons, such as incrementally
updating a visualization or processing more data than fits into memory.
VisTrails supports streaming data through the workflow. When modules are
implemented to support streaming, data items are passed through the whole
workflow one at a time.

Using Streaming
===============

Streaming is similar to list handling (see Chapter
:ref:`chap-list_handling`). Modules that create streams should have an output
port with list depth 1. Downstream modules that do not accept lists will be
executed once for each item in the stream. Modules with multiple input streams
will combine them pairwise. For this reason the input streams should contain
the same number of items (or be unlimited).

Modules that accept a type with list depth 1 but do not support streaming
will convert input streams to lists and execute after the streaming has ended.
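
These rules have a loose analogy in plain Python generators. The sketch below is an assumption-laden illustration, not VisTrails code: ``squares`` stands in for a module executed once per item, ``total`` for a depth-1 module without streaming support, and ``zip`` for pairwise combination of streams.

```python
from itertools import count


def squares(stream):
    """Process items one at a time, like a module without list support."""
    for item in stream:
        yield item * item


def total(items):
    """Needs the whole list, like a depth-1 module without streaming support."""
    return sum(items)


# Two streams combined pairwise; zip stops when the finite one ends, so
# pairing a finite stream with an unlimited one (count) works.
pairs = list(zip(range(3), count(10)))

# The non-streaming consumer only runs once the stream has been collected.
result = total(list(squares(range(4))))

print(pairs)   # [(0, 10), (1, 11), (2, 12)]
print(result)  # 14
```

How VisTrails itself behaves when two finite streams differ in length is not stated here, which is why the text asks for streams of equal length.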

.. topic:: Try it Now!

   Let's use PythonSources to create a simple example that incrementally sums
   up a sequence of numbers. First we will create a module that streams the
   natural numbers up to some value. Create a new workflow and add a
   ``PythonSource`` module. Give it an input port named ``inputs`` of type
   ``Integer``, which will specify the upper bound (exclusive) of the numbers
   to stream, and an output port named ``out`` of type ``Integer`` with list
   depth 1, which will be the output stream. An output stream can be created
   by using ``self.set_streaming_output``, which takes the port name, an
   iterator object, and optionally the number of items in the stream. To
   create an integer iterator we can use ``xrange``. Add this to the
   PythonSource:

   .. code-block:: python

      self.set_streaming_output('out',
                                xrange(inputs).__iter__(),
                                inputs)

.. topic:: Next Step!

   Now let's create a module that captures the items in the stream. Add a
   second ``PythonSource`` module below the first one. Give it an input port
   named ``integerStream`` of type ``Integer`` and list depth 1 that will be
   our input stream. An input stream can be captured by adding the magic
   string ``#STREAMING`` to the PythonSource code and calling
   ``self.set_streaming`` with a generator method as argument. The generator
   method should take the module as an input. It should first initialize its
   value, in our case by setting ``intsum=0``. Then it should receive the
   inputs in a loop, with each iteration ending in ``yield``. In each
   iteration the module will be updated to contain a new input from the
   stream. Similar to a normal module, the loop should:

   1. get inputs
   2. compute outputs
   3. set outputs
   4. call ``yield``

   Below is the complete example. Add it to the PythonSource.

   .. code-block:: python

      #STREAMING - This tag is magic, do not change.
      def generator(module):
          intsum = 0
          while 1:
              i = module.get_input('integerStream')
              intsum += i
              print "Sum so far:", intsum
              yield
      self.set_streaming(generator)

.. topic:: Finally:

   Connect the two PythonSources, set ``inputs`` to 100 in the first
   PythonSource, open the VisTrails console and execute. See how the output is
   printed to the console while the stream runs and how the progress of the
   modules increases. The output should look like this:
   :vtl:`(open in vistrails) <streaming.vt>`

   .. code-block:: python

      Sum so far: 0
      Sum so far: 1
      Sum so far: 3
      ...
      Sum so far: 4851
      Sum so far: 4950
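
To see why the generator is written with a bare ``yield``, it can help to step the same pattern by hand outside VisTrails. The sketch below uses Python 3 syntax, unlike the Python 2 ``PythonSource`` code above. ``FakeModule`` and the driver loop are hypothetical stand-ins: they only assume that something updates the module's input and then resumes the generator once per item, which may differ from what the interpreter actually does.

```python
class FakeModule:
    """Hypothetical stand-in for a module; holds one current input value."""

    def __init__(self):
        self.current = None
        self.sums = []

    def get_input(self, port):
        # A real module would look up the port; here there is only one input.
        return self.current


def generator(module):
    intsum = 0
    while True:
        intsum += module.get_input('integerStream')  # 1. get inputs, 2. compute
        module.sums.append(intsum)                   # 3. record the result
        yield                                        # 4. hand control back


module = FakeModule()
gen = generator(module)
for item in range(100):
    module.current = item  # the driver places the next stream item on the module
    next(gen)              # then resumes the generator for exactly one step

print(module.sums[:3], module.sums[-2:])  # [0, 1, 3] [4851, 4950]
```

The state in ``intsum`` survives between steps because the generator is suspended at ``yield`` rather than restarted, which is what makes an incremental sum possible.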
1 change: 1 addition & 0 deletions doc/usersguide/vtl/list-handling.vtl
@@ -0,0 +1 @@
<vtlink filename="@examples/list-handling.vt" version="combine lists"/>
1 change: 1 addition & 0 deletions doc/usersguide/vtl/streaming.vtl
@@ -0,0 +1 @@
<vtlink filename="@examples/streaming.vt" version="add numbers"/>
Binary file added examples/list-handling.vt
Binary file not shown.
Binary file added examples/streaming.vt
Binary file not shown.
13 changes: 12 additions & 1 deletion vistrails/core/cache/hasher.py
@@ -74,6 +74,14 @@ def function_signature(function, constant_hasher_map={}):
                     constant_hasher_map))
         return hasher.digest()

+    @staticmethod
+    def control_param_signature(control_param, constant_hasher_map={}):
+        hasher = sha_hash()
+        u = hasher.update
+        u(control_param.name)
+        u(control_param.value)
+        return hasher.digest()
+
     @staticmethod
     def connection_signature(c):
         hasher = sha_hash()
@@ -104,7 +112,10 @@ def module_signature(obj, constant_hasher_map={}):
         u(obj.module_descriptor.namespace or '')
         u(obj.module_descriptor.package_version or '')
         u(obj.module_descriptor.version or '')
-        u(hash_list(obj.functions, Hasher.function_signature, constant_hasher_map))
+        u(hash_list(obj.functions, Hasher.function_signature,
+                    constant_hasher_map))
+        u(hash_list(obj.control_parameters, Hasher.control_param_signature,
+                    constant_hasher_map))
         return hasher.digest()

     @staticmethod
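
The effect of folding control parameters into the module signature can be sketched independently of VisTrails. The snippet below is an illustration under assumptions: it uses ``hashlib.sha1`` directly (the real ``sha_hash`` helper is not shown in this diff), encodes strings as UTF-8 for Python 3, and the parameter name ``loop_type`` is made up.

```python
import hashlib


def control_param_signature(name, value):
    """Hash a control parameter's name and value (SHA-1 is an assumption)."""
    hasher = hashlib.sha1()
    hasher.update(name.encode('utf-8'))
    hasher.update(value.encode('utf-8'))
    return hasher.digest()


def module_signature(module_name, control_params):
    """Combine the module name with the signature of each control parameter."""
    hasher = hashlib.sha1()
    hasher.update(module_name.encode('utf-8'))
    for name, value in control_params:
        hasher.update(control_param_signature(name, value))
    return hasher.digest()


cartesian = module_signature('ConcatenateString', [('loop_type', 'cartesian')])
pairwise = module_signature('ConcatenateString', [('loop_type', 'pairwise')])
same = module_signature('ConcatenateString', [('loop_type', 'cartesian')])

print(cartesian != pairwise)  # True: changing a control parameter changes the hash
print(cartesian == same)      # True: identical inputs hash identically
```

Presumably this is the point of the change: two modules that differ only in a control parameter no longer share a signature.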
22 changes: 13 additions & 9 deletions vistrails/core/db/io.py
@@ -125,12 +125,14 @@ def get_workflow_diff(vt_pair_1, vt_pair_2):
     """

     from vistrails.core.vistrail.pipeline import Pipeline
-    (v1, v2, pairs, heuristic_pairs, v1_only, v2_only, param_changes, \
-     _, _, _, _) = \
-        vistrails.db.services.vistrail.getWorkflowDiff(vt_pair_1, vt_pair_2, True)
+    (v1, v2, pairs, heuristic_pairs, v1_only, v2_only, param_changes,
+     cparam_changes, annot_changes, _, _, _, _) = \
+        vistrails.db.services.vistrail.getWorkflowDiff(vt_pair_1, vt_pair_2,
+                                                       True)
     Pipeline.convert(v1)
     Pipeline.convert(v2)
-    return (v1, v2, pairs, heuristic_pairs, v1_only, v2_only, param_changes)
+    return (v1, v2, pairs, heuristic_pairs, v1_only, v2_only, param_changes,
+            cparam_changes, annot_changes)

 def get_workflow_diff_with_connections(vt_pair_1, vt_pair_2):
     """get_workflow_diff_with_connections
@@ -139,13 +141,15 @@ def get_workflow_diff_with_connections(vt_pair_1, vt_pair_2):
     """

     from vistrails.core.vistrail.pipeline import Pipeline
-    (v1, v2, m_pairs, m_heuristic, v1_only, v2_only, param_changes, \
-     c_pairs, c_heuristic, c1_only, c2_only) = \
-        vistrails.db.services.vistrail.getWorkflowDiff(vt_pair_1, vt_pair_2, False)
+    (v1, v2, m_pairs, m_heuristic, v1_only, v2_only, param_changes,
+     cparam_changes, annot_changes, c_pairs, c_heuristic, c1_only, c2_only) =\
+        vistrails.db.services.vistrail.getWorkflowDiff(vt_pair_1, vt_pair_2,
+                                                       False)
     Pipeline.convert(v1)
     Pipeline.convert(v2)
-    return (v1, v2, m_pairs, m_heustric, v1_only, v2_only, param_changes,
-            c_pairs, c_heuristic, c1_only, c2_only)
+    return (v1, v2, m_pairs, m_heuristic, v1_only, v2_only, param_changes,
+            cparam_changes, annot_changes, c_pairs, c_heuristic, c1_only,
+            c2_only)

 def getPathAsAction(vt, v1, v2, do_copy=False):
     a = vistrails.db.services.vistrail.getPathAsAction(vt, v1, v2, do_copy)
53 changes: 49 additions & 4 deletions vistrails/core/interpreter/cached.py
@@ -46,7 +46,8 @@
 from vistrails.core.interpreter.job import JobMonitor
 import vistrails.core.interpreter.utils
 from vistrails.core.log.controller import DummyLogController
-from vistrails.core.modules.basic_modules import identifier as basic_pkg
+from vistrails.core.modules.basic_modules import identifier as basic_pkg, \
+    Iterator
 from vistrails.core.modules.module_registry import get_module_registry
 from vistrails.core.modules.vistrails_module import ModuleBreakpoint, \
     ModuleConnector, ModuleError, ModuleErrors, ModuleHadError, \
@@ -134,7 +135,7 @@ def _handle_suspended(self, obj, error):
         error.name = name
         # if signature is not set we use the module identifier
         if not error.signature:
-            error.signature = i
+            error.signature = obj.signature
         jm.addParent(error)

     def end_update(self, obj, error=None, errorTrace=None,
@@ -162,7 +163,8 @@ def end_update(self, obj, error=None, errorTrace=None,
         if i in self.ids:
             self.ids.remove(i)
             self.view.set_execution_progress(
-                1.0 - (len(self.ids) * 1.0 / self.nb_modules))
+                1.0 - ((len(self.ids) + len(Iterator.generators)) * 1.0 /
+                       (self.nb_modules + len(Iterator.generators))))

         msg = '' if error is None else error.msg
         self.log.finish_execution(obj, msg, errorTrace,
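
The revised progress expression counts every registered generator as one extra unit of pending work, in both the numerator and the denominator. A standalone sketch of the arithmetic follows; the function name and arguments are hypothetical, chosen to mirror ``self.ids``, ``self.nb_modules`` and ``Iterator.generators`` from the hunk above.

```python
def execution_progress(pending_ids, nb_modules, nb_generators):
    """Fraction of work done, counting each generator as pending work."""
    remaining = len(pending_ids) + nb_generators
    total = nb_modules + nb_generators
    return 1.0 - remaining * 1.0 / total


# One of four modules still pending, no streams: three quarters done.
print(execution_progress({7}, 4, 0))    # 0.75

# All four modules finished but two streams still registered: roughly 0.667.
print(execution_progress(set(), 4, 2))
```

With this formula the progress bar cannot reach 1.0 while any generator is still registered, which matches the idea that streams keep running after the modules themselves have been updated.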
@@ -210,6 +212,7 @@ def create(self):
         self._objects = {}
         self._executed = {}
         self.filePool = self._file_pool
+        self._streams = []

     def clear(self):
         self._file_pool.cleanup()
@@ -340,6 +343,7 @@ def create_constant(param, module):
             persistent_id = tmp_to_persistent_module_map[i]
             module = self._persistent_pipeline.modules[persistent_id]
             obj = self._objects[persistent_id] = module.summon()
+            obj.list_depth = module.list_depth
             obj.interpreter = self
             obj.id = persistent_id
             obj.is_breakpoint = module.is_breakpoint
@@ -487,6 +491,9 @@ def make_change_parameter(obj):
         persistent_sinks = [tmp_id_to_module_map[sink]
                             for sink in pipeline.graph.sinks()]

+        self._streams.append(Iterator.generators)
+        Iterator.generators = []
+
         # Update new sinks
         for obj in persistent_sinks:
             abort = False
@@ -519,6 +526,44 @@ def make_change_parameter(obj):
             if stop_on_error or abort:
                 break

+        # execute all generators until inputs are exhausted
+        # this makes sure branching and multiple sinks are executed correctly
+        if not logging_obj.errors and not logging_obj.suspended and \
+                Iterator.generators:
+            result = True
+            while result is not None:
+                for g in Iterator.generators:
+                    abort = False
+                    try:
+                        result = g.next()
+                        continue
+                    except ModuleWasSuspended:
+                        continue
+                    except ModuleHadError:
+                        pass
+                    except AbortExecution:
+                        break
+                    except ModuleSuspended, ms:
+                        ms.module.logging.end_update(ms.module, ms,
+                                                     was_suspended=True)
+                        continue
+                    except ModuleErrors, mes:
+                        for me in mes.module_errors:
+                            me.module.logging.end_update(me.module, me)
+                            logging_obj.signalError(me.module, me)
+                            abort = abort or me.abort
+                    except ModuleError, me:
+                        me.module.logging.end_update(me.module, me, me.errorTrace)
+                        logging_obj.signalError(me.module, me)
+                        abort = me.abort
+                    except ModuleBreakpoint, mb:
+                        mb.module.logging.end_update(mb.module)
+                        logging_obj.signalError(mb.module, mb)
+                        abort = True
+                    if stop_on_error or abort:
+                        break
+        Iterator.generators = self._streams.pop()
+
         if self.done_update_hook:
             self.done_update_hook(self._persistent_pipeline, self._objects)
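
The added loop advances every registered generator in turn until the streams are exhausted. The sketch below shows the general round-robin idea in Python 3. It is not the commit's code: it detects exhaustion with a sentinel passed to ``next`` rather than the ``None`` result convention used in the hunk, and it omits the error and suspension handling entirely. ``drain_round_robin`` and ``labelled`` are hypothetical names.

```python
def drain_round_robin(generators):
    """Advance every generator one step per pass until all are exhausted."""
    done = object()  # sentinel returned by next() for an exhausted generator
    active = list(generators)
    trace = []
    while active:
        still_running = []
        for gen in active:
            value = next(gen, done)
            if value is not done:
                trace.append(value)
                still_running.append(gen)
        active = still_running
    return trace


def labelled(label, n):
    """Yield n labelled items, e.g. 'a0', 'a1', ..."""
    for i in range(n):
        yield '%s%d' % (label, i)


print(drain_round_robin([labelled('a', 2), labelled('b', 3)]))
# ['a0', 'b0', 'a1', 'b1', 'b2']
```

Stepping the generators in lockstep, instead of draining one completely before starting the next, is what lets several sinks or branches fed by the same stream make progress together.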

@@ -644,9 +689,9 @@ def fetch(name, default):
if len(kwargs) > 0:
raise VistrailsInternalError('Wrong parameters passed '
'to execute: %s' % kwargs)

self.clean_non_cacheable_modules()


# if controller is not None:
# vistrail = controller.vistrail
# (pipeline, module_remap) = \