/
vistrails_module.py
1863 lines (1610 loc) · 72.3 KB
/
vistrails_module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
###############################################################################
##
## Copyright (C) 2014-2015, New York University.
## Copyright (C) 2011-2014, NYU-Poly.
## Copyright (C) 2006-2011, University of Utah.
## All rights reserved.
## Contact: contact@vistrails.org
##
## This file is part of VisTrails.
##
## "Redistribution and use in source and binary forms, with or without
## modification, are permitted provided that the following conditions are met:
##
## - Redistributions of source code must retain the above copyright notice,
## this list of conditions and the following disclaimer.
## - Redistributions in binary form must reproduce the above copyright
## notice, this list of conditions and the following disclaimer in the
## documentation and/or other materials provided with the distribution.
## - Neither the name of the New York University nor the names of its
## contributors may be used to endorse or promote products derived from
## this software without specific prior written permission.
##
## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
## AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
## THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
## PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
## CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
## EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
## PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
## OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
## WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
## OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
## ADVISED OF THE POSSIBILITY OF SUCH DAMAGE."
##
###############################################################################
from __future__ import division
from base64 import b16encode, b16decode
import copy
from itertools import izip, product
import json
import sys
import time
import traceback
import warnings
from vistrails.core.data_structures.bijectivedict import Bidict
from vistrails.core import debug
from vistrails.core.configuration import get_vistrails_configuration
from vistrails.core.modules.config import ModuleSettings, IPort, OPort
from vistrails.core.vistrail.module_control_param import ModuleControlParam
from vistrails.core.utils import VistrailsDeprecation, deprecated, \
xor, long2bytes
try:
import hashlib
sha1_hash = hashlib.sha1
except ImportError:
import sha
sha1_hash = sha.new
class NeedsInputPort(Exception):
    """Exception reporting that a module needs a given port.

    The offending object and the port are kept as the ``obj`` and ``port``
    attributes and are both included in the string form of the exception.
    """

    def __init__(self, obj, port):
        self.obj = obj
        self.port = port

    def __str__(self):
        details = (self.obj, self.port)
        return "Module %s needs port %s" % details
class IncompleteImplementation(Exception):
    """Exception whose string form is a fixed "incomplete implementation"
    message.
    """

    def __str__(self):
        message = "Module has incomplete implementation"
        return message
class ModuleBreakpoint(Exception):
    """Exception raised when execution hits a module marked as a breakpoint.

    The string form of the exception is a debugging report: it lists every
    instance attribute of the module, followed by the values returned by
    ``module.get_input_list()`` for each of the module's input ports.
    """

    def __init__(self, module):
        """Remember the module at which the breakpoint was hit.

        ``msg`` and ``errorTrace`` use the same attribute names as
        ModuleError.
        """
        self.module = module
        self.msg = "Hit breakpoint"
        self.errorTrace = ''

    def __str__(self):
        """Return a multi-line report of the module attributes and inputs."""
        # The 1-tuple keeps the formatting correct even if the module object
        # happens to be a tuple itself.
        parts = ["Encountered breakpoint at Module %s:\n" % (self.module,)]
        for key, value in self.module.__dict__.items():
            parts.append("\t%s = %s\n" % (key, value))

        parts.append("\nModule has inputs:\n")
        for port, values in self.examine_inputs().items():
            parts.append("\t%s = %s\n" % (port, values))

        return "".join(parts)

    def examine_inputs(self):
        """Return a dict mapping each input port name to its input list.

        The port names are read from the module's ``inputPorts`` instance
        attribute; a module without that attribute raises KeyError.
        """
        in_ports = self.module.__dict__["inputPorts"]
        return dict((port, self.module.get_input_list(port))
                    for port in in_ports)
class ModuleHadError(Exception):
    """Raised when a module that already failed is asked to update again.

    The interpreter catches this exception and does not log it. The module
    concerned is available as the ``module`` attribute.
    """

    def __init__(self, module):
        # Note: Exception.__init__ is intentionally not invoked here, exactly
        # as in the historical implementation, so ``args`` stays untouched.
        self.module = module
class ModuleWasSuspended(ModuleHadError):
    """Raised when a module that was previously suspended is asked to update
    again.
    """
class ModuleError(Exception):
    """Runtime error raised on behalf of a VisTrails module.

    The interpreter recognizes this exception, which lets it report the
    failure meaningfully to the user and to the logging mechanism.
    """

    def __init__(self, module, errormsg, abort=False, errorTrace=None):
        """Build the error.

        module     -- the module instance that signaled the error
        errormsg   -- the error message, as a string
        abort      -- force abort even if stopOnError is False
        errorTrace -- optional traceback text attached to the error
        """
        super(ModuleError, self).__init__(errormsg)
        self.module = module
        self.msg = errormsg
        self.abort = abort
        self.errorTrace = errorTrace
class ModuleSuspended(ModuleError):
    """Exception signaling that a VisTrails module has been suspended.

    Raising ModuleSuspended flags that the module is not ready to finish yet
    and that the workflow should be executed again later. This is useful for
    external jobs, where VisTrails should not block while waiting for the
    job to complete.

    handle   -- object describing the pending job. The historical
                documentation calls it 'monitor' and states that it should
                provide a finished() method for checking whether the job
                has finished.
    children -- list of ModuleSuspended instances, used for nested modules
    queue    -- deprecated alias of 'handle'; only honored when 'handle' is
                None, and triggers a VistrailsDeprecation warning
    """

    def __init__(self, module, errormsg, handle=None, children=None,
                 queue=None):
        ModuleError.__init__(self, module, errormsg)
        self.handle = handle
        using_legacy_argument = handle is None and queue is not None
        if using_legacy_argument:
            # stacklevel=2 attributes the warning to the code constructing
            # the exception rather than to this constructor.
            warnings.warn("Use of deprecated argument 'queue' replaced by "
                          "'handle'",
                          category=VistrailsDeprecation,
                          stacklevel=2)
            self.handle = queue
        self.children = children
        self.name = None

    @property
    def queue(self):
        """Backward-compatible read-only alias of ``handle``."""
        return self.handle
class ModuleErrors(Exception):
    """A collection of VisTrails module runtime errors.

    The interpreter recognizes this exception, which lets it report the
    failures meaningfully to the user and to the logging mechanism. The
    exception message is the string form of the tuple of all ``msg``
    attributes.
    """

    def __init__(self, module_errors):
        """ModuleErrors should be passed a list of ModuleError objects"""
        messages = tuple(error.msg for error in module_errors)
        Exception.__init__(self, str(messages))
        self.module_errors = module_errors
class _InvalidOutput(object):
    """Marker type denoting an invalid result.

    The class object itself is the marker: code in this file compares
    output values against ``InvalidOutput`` using ``is``.
    """


# Public name of the marker (the class itself, not an instance).
InvalidOutput = _InvalidOutput
################################################################################
# DummyModuleLogging
class DummyModuleLogging(object):
    """Logger stand-in that accepts any method call and does nothing.

    Any attribute that is not found through normal lookup resolves to a
    no-op method.
    """

    def _dummy_method(self, *args, **kwargs):
        """Accept any arguments, do nothing."""
        return None

    def __getattr__(self, name):
        # Only reached for names that normal attribute lookup cannot find.
        return self._dummy_method


# Shared instance used as the default ``logging`` attribute of modules.
_dummy_logging = DummyModuleLogging()
################################################################################
# Module
class Module(object):
"""Module is the base module from which all module functionality
is derived in VisTrails. It defines a set of basic interfaces to
deal with data input/output (through ports, as will be explained
later), as well as a basic mechanism for dataflow-based updates.
*Execution Model*
VisTrails assumes fundamentally that a pipeline is a dataflow. This
means that pipeline cycles are disallowed, and that modules are
supposed to be free of side-effects. This is obviously not possible
in general, particularly for modules whose sole purpose is to
interact with operating system resources. In these cases, designing
a module is harder -- the side effects should ideally not be exposed
to the module interface. VisTrails provides some support for making
this easier, as will be discussed later.
VisTrails caches intermediate results to increase efficiency in
exploration. It does so by reusing pieces of pipelines in later
executions.
*Terminology*
Module Interface: The module interface is the set of input and
output ports a module exposes.
*Designing New Modules*
Designing new modules is essentially a matter of subclassing this
module class and overriding the compute() method. There is a
fully-documented example of this in the default package
'pythonCalc', available in the 'packages/pythonCalc' directory.
*Caching*
Caching affects the design of a new module. Most importantly,
users have to account for compute() being called more than
once. Even though compute() is only called once per individual
execution, new connections might mean that previously uncomputed
output must be made available.
Also, operating system side-effects must be carefully accounted
for. Some operations are fundamentally side-effectful (creating OS
output like uploading a file on the WWW or writing a file to a
local hard drive). These modules should probably not be cached at
all. VisTrails provides an easy way for modules to report that
they should not be cached: simply subclass from the NotCacheable
mixin provided in this python module. (NB: In order for the mixin
to work appropriately, NotCacheable must appear *BEFORE* any other
subclass in the class hierarchy declarations). These modules (and
anything that depends on their results) will then never be reused.
*Intermediate Files*
Many modules communicate through intermediate files. VisTrails
provides automatic filename and handle management to alleviate the
burden of determining tricky things (e.g. longevity) of these
files. Modules can request temporary file names through the file pool,
currently accessible through ``self.interpreter.filePool``.
The FilePool class is available in core/modules/module_utils.py -
consult its documentation for usage. Notably, using the file pool
will make temporary files work correctly with caching, and will
make sure the temporaries are correctly removed.
"""
_settings = ModuleSettings(is_root=True, abstract=True)
_output_ports = [OPort("self", "Module", optional=True)]
def __init__(self):
    """Initialize the per-instance state shared by all modules."""
    # --- port tables ------------------------------------------------------
    self.inputPorts = {}
    self.outputPorts = {}

    # --- execution status -------------------------------------------------
    self.upToDate = False
    self.had_error = False
    self.was_suspended = False
    self.is_while = False
    self.list_depth = 0
    self.logging = _dummy_logging

    # is_method records whether a given input port is a method. When it
    # is, the port is mapped to the order in which it was stored, so that
    # modules that care about method ordering can work correctly.
    self.is_method = Bidict()
    self._latest_method_order = 0

    # --- control parameters and port specifications -----------------------
    self.control_params = {}
    self.input_specs = {}
    self.output_specs = {}
    self.input_specs_order = []
    self.output_specs_order = []

    # --- looping / streaming bookkeeping ----------------------------------
    self.iterated_ports = []
    self.streamed_ports = {}

    self.in_pipeline = False

    # Every object can return itself through the "self" output port.
    self.set_output("self", self)

    # Pipeline information a module should know about. This is useful for
    # a spreadsheet cell to know where it comes from. It is also used for
    # talking back and forth between the spreadsheet and the builder,
    # besides Parameter Exploration.
    self.moduleInfo = {
        'locator': None,
        'controller': None,
        'vistrailName': 'Unknown',
        'version': -1,
        'pipeline': None,
        'moduleId': -1,
        'reason': 'Pipeline Execution',
        'actions': []
    }

    self.is_breakpoint = False

    # Whether the module was computed; used by the logging machinery.
    self.computed = False

    self.signature = None

    # Whether the output of the module should be annotated in the
    # execution log.
    self.annotate_output = False
def transfer_attrs(self, module):
    """Copy execution-relevant attributes from a pipeline module.

    ``module`` is the object providing ``cache``, ``list_depth``,
    ``is_breakpoint``, ``control_parameters``, ``destinationPorts()`` and
    ``sourcePorts()``. When ``module.cache`` is not 1, caching is disabled
    on this instance by overriding ``is_cacheable``.
    """
    caching_enabled = module.cache == 1
    if not caching_enabled:
        self.is_cacheable = lambda *args: False

    self.list_depth = module.list_depth
    self.is_breakpoint = module.is_breakpoint

    self.control_params.update(
        (param.name, param.value) for param in module.control_parameters)

    # Specs indexed by port name, followed by the port names in spec order.
    self.input_specs = dict(
        (spec.name, spec) for spec in module.destinationPorts())
    self.output_specs = dict(
        (spec.name, spec) for spec in module.sourcePorts())
    self.input_specs_order = [
        spec.name for spec in module.destinationPorts()]
    self.output_specs_order = [
        spec.name for spec in module.sourcePorts()]
def __copy__(self):
    """Shallow-copy the module, giving the clone its own port tables.

    The input/output port dictionaries and the control parameters are
    copied, so changing them on the clone does not affect the original.
    The port specifications (and their orderings) are shared between the
    original and the clone.
    """
    parent = super(Module, self)
    if not hasattr(parent, '__copy__'):
        clone = object.__new__(self.__class__)
        clone.__dict__ = self.__dict__.copy()
    else:
        clone = parent.__copy__()

    clone.inputPorts = copy.copy(self.inputPorts)
    clone.outputPorts = copy.copy(self.outputPorts)
    # The "self" output must designate the clone, not the original.
    clone.outputPorts['self'] = clone

    clone.control_params = self.control_params.copy()

    # Shared, not copied.
    clone.input_specs = self.input_specs
    clone.output_specs = self.output_specs
    clone.input_specs_order = self.input_specs_order
    clone.output_specs_order = self.output_specs_order

    return clone
def clear(self):
    """clear(self) -> None.
    Drops all references held by the module so it can be deleted.

    Every input connector is cleared first; the port tables, the logger
    and the method-order bookkeeping are then reset.
    """
    for connectors in self.inputPorts.itervalues():
        for conn in connectors:
            conn.clear()

    self.inputPorts = {}
    self.outputPorts = {}
    self.logging = _dummy_logging
    self.is_method = Bidict()
    self._latest_method_order = 0
def is_cacheable(self):
    """is_cacheable() -> bool.
    Tell whether this module can be reused across executions.

    A Module may safely return different values on different occasions;
    in other words, whether a module is cacheable may depend on its
    execution context. The base implementation always answers True.
    """
    cacheable = True
    return cacheable
def update_upstream_port(self, port_name):
    """Updates upstream of a single port instead of all ports.

    Nothing happens when ``port_name`` has no connectors. Otherwise every
    upstream module connected to the port is updated, then connectors
    whose upstream output is InvalidOutput are removed.
    """
    if port_name not in self.inputPorts:
        return

    for conn in self.inputPorts[port_name]:
        conn.obj.update()  # Might raise

    # Iterate over a copy of the connector list while connectors are
    # being removed.
    for conn in copy.copy(self.inputPorts[port_name]):
        if conn.obj.get_output(conn.port) is InvalidOutput:
            self.remove_input_connector(port_name, conn)
def useJobCache(self):
    """ useJobCache() -> pipeline module/False
    Checks whether the job cache is enabled for this module.

    Returns the corresponding pipeline module when its job-cache control
    parameter is set to 'true' (case-insensitive); returns False when no
    pipeline is known or the parameter is missing or not 'true'.
    """
    if not self.moduleInfo.get('pipeline', None):
        return False

    pipeline_modules = self.moduleInfo['pipeline'].modules
    p_module = pipeline_modules[self.moduleInfo['moduleId']]

    cache_key = ModuleControlParam.JOB_CACHE_KEY
    if not p_module.has_control_parameter_with_name(cache_key):
        return False

    flag = p_module.get_control_parameter_by_name(cache_key).value
    if flag and flag.lower() == 'true':
        return p_module
    return False
def setJobCache(self):
    """ setJobCache() -> Boolean
    Restores the module outputs from the job cache when possible.

    Returns True when a cache entry exists for this module's signature:
    the cached parameters are then converted back to Python values, set
    as outputs, and the module is flagged up to date. Returns False when
    the job cache is not enabled or holds no entry. Raises ModuleError if
    a cached output port is not of a Constant type.
    """
    p_module = self.useJobCache()
    if not p_module:
        return False

    monitor = self.job_monitor()
    port_specs = p_module.sourcePorts()
    if not monitor.getCache(self.signature):
        return False

    self.cache = monitor.getCache(self.signature)
    from vistrails.core.modules.basic_modules import Constant
    cached_params = monitor.getCache(self.signature).parameters
    for param, value in cached_params.iteritems():
        # Look up the type of the output port carrying this parameter.
        spec = [s for s in port_specs if s.name == param][0]
        module = spec.descriptors()[0].module
        if not issubclass(module, Constant):
            raise ModuleError(self, "Trying to use a non-constant type a cache: %s" % spec.name)
        self.set_output(param, module.translate_to_python(value))

    self.upToDate = True
    return True
def addJobCache(self):
    """ addJobCache() -> False/None
    Stores the module outputs in the job cache.

    Returns False when the job cache is not enabled for this module, and
    None otherwise. Nothing is stored when the cache already holds an
    entry for this module's signature. The 'self' output port is never
    stored. Raises ModuleError if an output port is not of a Constant
    type.
    """
    p_module = self.useJobCache()
    if not p_module:
        return False

    monitor = self.job_monitor()
    port_specs = p_module.sourcePorts()
    params = {}
    if monitor.getCache(self.signature):
        return

    from vistrails.core.modules.basic_modules import Constant
    for spec in port_specs:
        if spec.name != 'self':
            # The port type provides the string serialization.
            module = spec.descriptors()[0].module
            if not issubclass(module, Constant):
                raise ModuleError(self, "Trying to cache a non-constant type: %s" % spec.name)
            params[spec.name] = module.translate_to_string(self.get_output(spec.name))
    monitor.setCache(self.signature, params, p_module.name)
def update_upstream(self):
    """ update_upstream() -> None
    Go upstream from the current module, update its upstream modules and
    then check the input connections based on the upstream results.

    All upstream modules are updated even if some of them get suspended;
    the suspension is only reported once every connector has been visited:

    * exactly one ModuleSuspended: it is re-raised as is;
    * several ModuleSuspended: a new ModuleSuspended is raised with the
      collected exceptions as ``children``;
    * otherwise, if a ModuleWasSuspended was seen, the last one is
      re-raised.

    Finally, connectors whose upstream output is InvalidOutput are removed
    from the input ports.
    """
    suspended = []
    was_suspended = None
    for connectorList in self.inputPorts.values():
        for connector in connectorList:
            try:
                connector.obj.update()
            except ModuleWasSuspended as e:
                was_suspended = e
            except ModuleSuspended as e:
                suspended.append(e)
            # Here we keep going even if one of the module suspended, but
            # we'll stop right after the loop
    if len(suspended) == 1:
        raise suspended[0]
    elif suspended:
        raise ModuleSuspended(
            self,
            "multiple suspended upstream modules",
            children=suspended)
    elif was_suspended is not None:
        raise was_suspended

    # Iterate over snapshots of both the port table and each connector
    # list, so that removing a connector cannot disturb the iteration and
    # cause a later invalid connector to be skipped.
    # update_upstream_port() takes the same precaution.
    for iport, connectorList in list(self.inputPorts.items()):
        for connector in list(connectorList):
            if connector.obj.get_output(connector.port) is InvalidOutput:
                self.remove_input_connector(iport, connector)
def set_iterated_ports(self):
    """ set_iterated_ports() -> None
    Calculates which inputs needs to be iterated over

    For every input connector, the depth of the incoming value is compared
    with the depth declared by the destination port spec. Ports receiving
    a value deeper than they accept are recorded in ``self.iterated_ports``
    as ``(port_name, depth, raw_value)`` tuples, ordered according to
    ``self.input_specs_order``. When several connectors feed the same
    port, the one with the greatest depth difference is kept.
    """
    # Best candidate found so far for each port name.
    iports = {}
    from vistrails.core.modules.basic_modules import List, Variant
    for port_name, connectorList in self.inputPorts.iteritems():
        for connector in connectorList:
            src_depth = connector.depth()
            if not self.input_specs:
                # cannot do depth wrapping
                continue
            # Give List an additional depth
            dest_descs = self.input_specs[port_name].descriptors()
            dest_depth = self.input_specs[port_name].depth
            if len(dest_descs) == 1 and dest_descs[0].module == List:
                dest_depth += 1
            if connector.spec:
                src_descs = connector.spec.descriptors()
                if len(src_descs) == 1 and src_descs[0].module == List and \
                   len(dest_descs) == 1 and dest_descs[0].module == Variant:
                    # special case - Treat Variant as list
                    src_depth -= 1
                if len(src_descs) == 1 and src_descs[0].module == Variant and \
                   len(dest_descs) == 1 and dest_descs[0].module == List:
                    # special case - Treat Variant as list
                    dest_depth -= 1
            # store connector with greatest depth
            # if value depth > port depth
            depth = src_depth - dest_depth
            if depth > 0 and (port_name not in iports or
                              iports[port_name][1] < depth):
                # keep largest
                # raw connector only use by streaming and then use only
                # one connector
                iports[port_name] = (port_name, depth, connector.get_raw())
    # Ports absent from input_specs_order are dropped here.
    self.iterated_ports = [iports[p] for p in self.input_specs_order
                           if p in iports]
def set_streamed_ports(self):
    """ set_streamed_ports() -> None
    Calculates which inputs will be streamed

    ``self.streamed_ports`` is rebuilt from scratch: it maps each input
    port name to the Generator found among the raw values of its
    connectors. If several connectors of a port carry a Generator, the
    last one visited is kept.
    """
    self.streamed_ports = {}
    from vistrails.core.modules.basic_modules import Generator
    for port_name, connectors in self.inputPorts.items():
        for conn in connectors:
            raw_value = conn.get_raw()
            if isinstance(raw_value, Generator):
                self.streamed_ports[port_name] = raw_value
def update(self):
    """ update() -> None
    Check if the module is up-to-date then update the
    modules. Report to the logger if available

    Outline:

    1. A module that previously failed or was suspended raises
       ModuleHadError / ModuleWasSuspended right away; a module that was
       already computed returns immediately.
    2. Outputs are restored from the job cache if possible; otherwise the
       upstream modules are updated.
    3. If the module is up to date, the cached result is reported and
       nothing is computed.
    4. Otherwise the module is computed through the appropriate strategy
       (streaming, list iteration, while loop or plain compute()).

    Raises ModuleSuspended, ModuleError, ModuleErrors or ModuleBreakpoint.
    Any other exception raised by the computation is logged and converted
    into a ModuleError carrying the formatted traceback.
    """
    if self.had_error:
        raise ModuleHadError(self)
    elif self.was_suspended:
        raise ModuleWasSuspended(self)
    elif self.computed:
        return
    self.logging.begin_update(self)
    # setJobCache() restores the outputs itself when it returns True, so
    # the upstream modules only need updating when it returns False.
    if not self.setJobCache():
        self.update_upstream()
    if self.upToDate:
        if not self.computed:
            self.logging.update_cached(self)
            self.computed = True
        return
    self.had_error = True  # Unset later in this method
    self.logging.begin_compute(self)
    try:
        if self.is_breakpoint:
            raise ModuleBreakpoint(self)
        self.set_iterated_ports()
        self.set_streamed_ports()
        if self.streamed_ports:
            self.build_stream()
        elif self.list_depth > 0:
            self.compute_all()
        elif (self.in_pipeline and
              not self.is_while and
              (ModuleControlParam.WHILE_COND_KEY in self.control_params or
               ModuleControlParam.WHILE_MAX_KEY in self.control_params)):
            self.is_while = True
            self.compute_while()
        else:
            self.compute()
            self.addJobCache()
        self.computed = True
    except ModuleSuspended:
        # Must come before ModuleError, which is its base class.
        self.had_error, self.was_suspended = False, True
        raise
    except ModuleError as me:
        if hasattr(me.module, 'interpreter'):
            if me.errorTrace is None:
                me.errorTrace = traceback.format_exc()
            raise
        else:
            msg = "A dynamic module raised an exception: '%s'" % me
            raise ModuleError(self, msg, errorTrace=me.errorTrace)
    except ModuleErrors:
        raise
    except KeyboardInterrupt:
        raise ModuleError(self, 'Interrupted by user')
    except ModuleBreakpoint:
        raise
    except Exception as e:
        # NOTE: ``traceback`` is the module imported at file level. It must
        # not be re-imported inside this function: a function-local import
        # would make the name local to the whole function and break the
        # ModuleError handler above with an UnboundLocalError.
        traceback.print_exc()
        debug.unexpected_exception(e)
        raise ModuleError(
            self,
            "Uncaught exception: %s" % debug.format_exception(e),
            errorTrace=traceback.format_exc())
    if self.annotate_output:
        self.annotate_output_values()
    self.upToDate = True
    self.had_error = False
    self.logging.end_update(self)
    self.logging.signalSuccess(self)
def do_combine(self, combine_type, inputs, port_name_order):
    """Combine the values of several ports into a list of tuples.

    combine_type    -- "pairwise" (values are zipped together) or
                       "cartesian" (cartesian product of the values)
    inputs          -- dict mapping a port name to its iterable of values
    port_name_order -- sequence whose items are either a port name
                       (string) or a nested sequence of the form
                       ``[sub_combine_type, item, ...]`` that is combined
                       recursively

    Returns ``(elements, port_names)`` where each element is a flat tuple
    holding one value per port and ``port_names`` gives the matching port
    order. Raises ValueError for an unknown combine type.
    """
    columns = []
    flat_names = []
    for entry in port_name_order:
        if isinstance(entry, basestring):
            # Wrap each value in a 1-tuple so that plain ports and nested
            # combinations can be flattened the same way below.
            columns.append((item,) for item in inputs[entry])
            flat_names.append(entry)
        else:
            # this is how the (brittle) recursion is accomplished
            nested_values, nested_names = self.do_combine(entry[0],
                                                          inputs,
                                                          entry[1:])
            columns.append(nested_values)
            flat_names.extend(nested_names)

    if combine_type == "pairwise":
        groups = izip(*columns)
    elif combine_type == "cartesian":
        groups = product(*columns)
    else:
        raise ValueError('Unknown combine type "%s"' % combine_type)

    elements = [tuple(value for part in group for value in part)
                for group in groups]
    return elements, flat_names
def get_combine_type(self, default="cartesian"):
    """Return the combine type stored in the loop control parameter.

    ``default`` is returned when the module has no such control
    parameter.
    """
    return self.control_params.get(ModuleControlParam.LOOP_KEY, default)
def compute_all(self):
    """This method executes the module once for each input.
    Similarly to controlflow's fold.

    The values of the iterated ports whose depth equals
    ``self.list_depth`` are combined (pairwise, cartesian, or following a
    custom JSON-encoded order), and a shallow copy of the module is
    updated once per combination. The outputs of all iterations are
    gathered in lists and set as the outputs of this module.

    Raises ModuleError if ``list_depth`` is lower than 1, and
    ModuleSuspended (with the suspended iterations as children) if at
    least one iteration was suspended.
    """
    from vistrails.core.modules.sub_module import InputPort
    if isinstance(self, InputPort):
        # InputPort modules are computed directly, without iterating.
        return self.compute()
    if self.list_depth < 1:
        raise ModuleError(self, "List compute has wrong depth: %s" %
                                self.list_depth)
    combine_type = self.get_combine_type('cartesian')
    suspended = []
    inputs = {} # dict of port_name: value
    port_names = []
    for port_name, depth, _ in self.iterated_ports:
        # only iterate max depth and leave the others for the
        # next iteration
        if depth != self.list_depth:
            continue
        inputs[port_name] = self.get_input(port_name)
        port_names.append(port_name)
    if combine_type not in ['pairwise', 'cartesian']:
        # Any other value is parsed as a JSON list: the combine type
        # followed by the (possibly nested) port order.
        custom_order = json.loads(combine_type)
        combine_type = custom_order[0]
        port_names = custom_order[1:]
    elements, port_names = self.do_combine(combine_type, inputs, port_names)
    num_inputs = len(elements)
    loop = self.logging.begin_loop_execution(self, num_inputs)
    ## Update everything for each value inside the list
    outputs = {}
    for i in xrange(num_inputs):
        self.logging.update_progress(self, float(i)/num_inputs)
        # Each iteration runs on its own shallow copy, one level shallower.
        module = copy.copy(self)
        module.list_depth = self.list_depth - 1
        module.had_error = False
        module.was_suspended = False
        if not self.upToDate: # pragma: no partial
            ## Type checking if first iteration and last iteration level
            if i == 0 and self.list_depth == 1:
                self.typeChecking(module, port_names, elements)
            module.upToDate = False
            module.computed = False
            self.setInputValues(module, port_names, elements[i], i)
        loop.begin_iteration(module, i)
        try:
            module.update()
        except ModuleSuspended, e:
            # Remember the suspension and keep iterating; it is reported
            # after the loop.
            e.loop_iteration = i
            module.logging.end_update(module, e, was_suspended=True)
            suspended.append(e)
            loop.end_iteration(module)
            continue
        loop.end_iteration(module)
        ## Getting the result from the output port
        for nameOutput in module.outputPorts:
            if nameOutput not in outputs:
                outputs[nameOutput] = []
            output = module.get_output(nameOutput)
            outputs[nameOutput].append(output)
        self.logging.update_progress(self, i * 1.0 / num_inputs)
    if suspended:
        raise ModuleSuspended(
            self,
            "function module suspended in %d/%d iterations" % (
                len(suspended), num_inputs),
            children=suspended)
    # set final outputs
    for nameOutput in outputs:
        self.set_output(nameOutput, outputs[nameOutput])
    loop.end_loop_execution()
def build_stream(self):
    """Determines and builds correct generator type.

    The strategy is selected in this order:

    1. any streamed input is flagged ``accumulated``: compute once the
       streaming is finished;
    2. ``list_depth`` is positive: compute once per streamed value;
    3. the module is a Streaming module, or a PythonSource whose source
       carries the streaming pragma: let it build its own generator;
    4. otherwise: accumulate the stream for the module.
    """
    from vistrails.core.modules.basic_modules import PythonSource

    accumulated_flags = [stream.accumulated
                         for stream in self.streamed_ports.values()]
    if True in accumulated_flags:
        # the module can only compute once the streaming is finished
        self.compute_after_streaming()
        return

    if self.list_depth > 0:
        # iterate the module for each value in the stream
        self.compute_streaming()
        return

    # Magic tag: "# pragma: streaming"
    if isinstance(self, Streaming) or \
            (isinstance(self, PythonSource) and
             '%23%20pragma%3A%20streaming' in self.get_input('source')):
        # the module creates its own generator object
        self.compute()
        return

    # the module cannot handle generator so we accumulate the stream
    self.compute_accumulate()
def compute_streaming(self):
    """This method creates a generator object and sets the outputs as
    generators.

    A shallow copy of the module (one list level shallower) is computed
    once per set of values read from the iterated input ports. Every
    output port of this module is set to a Generator wrapping the same
    underlying Python generator.

    Raises ModuleError if the loop control parameter asks for a cartesian
    product, which is not supported while streaming.
    """
    from vistrails.core.modules.basic_modules import Generator
    type = self.control_params.get(ModuleControlParam.LOOP_KEY, 'pairwise')
    if type == 'cartesian':
        raise ModuleError(self,
                          'Cannot use cartesian product while streaming!')
    # Filled by the generator below with the suspended iterations.
    suspended = []
    # only iterate the max depth and leave others for the next iteration
    ports = [port for port, depth, value in self.iterated_ports
             if depth == self.list_depth]
    # The stream size is taken from the first iterated port.
    num_inputs = self.iterated_ports[0][2].size
    # the generator will read next from each iterated input port and
    # compute the module again
    module = copy.copy(self)
    module.list_depth = self.list_depth - 1
    if num_inputs:
        # Iteration indices at which progress is reported (every ~10%).
        milestones = [i*num_inputs//10 for i in xrange(1, 11)]
    def generator(self):
        # Yields True after each computed element and None once one of
        # the inputs reports the end of its stream (a None value).
        self.logging.begin_compute(module)
        i = 0
        while 1:
            iter_dict = dict([(port, (depth, value))
                              for port, depth, value in
                              self.iterated_ports])
            elements = [iter_dict[port][1].next() for port in ports]
            if None in elements:
                for name_output in module.outputPorts:
                    module.set_output(name_output, None)
                if suspended:
                    raise ModuleSuspended(
                        self,
                        ("function module suspended after streaming "
                         "%d/%d iterations") % (
                            len(suspended), num_inputs),
                        children=suspended)
                self.logging.update_progress(module, 1.0)
                self.logging.end_update(module)
                yield None
                # NOTE(review): if the generator is resumed after this
                # point, execution falls through to the code below.
            if num_inputs:
                if i in milestones:
                    self.logging.update_progress(module, float(i)/num_inputs)
            else:
                # Unknown size: report a constant progress value.
                self.logging.update_progress(module, 0.5)
            module.had_error = False
            ## Type checking
            if i == 0:
                self.typeChecking(module, ports, [elements])
            module.upToDate = False
            module.computed = False
            self.setInputValues(module, ports, elements, i)
            try:
                module.compute()
            except ModuleSuspended, e:
                # Reported when the end of the stream is reached.
                e.loop_iteration = i
                suspended.append(e)
            except Exception, e:
                raise ModuleError(module, str(e))
            i += 1
            yield True
    _generator = generator(self)
    # set streaming outputs
    for name_output in self.outputPorts:
        iterator = Generator(size=num_inputs,
                             module=module,
                             generator=_generator,
                             port=name_output)
        self.set_output(name_output, iterator)
def compute_accumulate(self):
    """This method creates a generator object that converts all
    streaming inputs to list inputs for modules that does not explicitly
    support streaming.

    The values read from the streamed ports are collected into one list
    per port. When one of the streams reports its end (a None value), a
    shallow copy of the module is computed once with the collected lists
    as inputs. Every output port of this module is set to a Generator
    flagged ``accumulated=True``.
    """
    from vistrails.core.modules.basic_modules import Generator
    # NOTE(review): nothing in this method appends to this list, so the
    # ModuleSuspended branch below looks unreachable - confirm.
    suspended = []
    # max depth should be one
    ports = self.streamed_ports.keys()
    # The stream size is taken from the first streamed port.
    num_inputs = self.streamed_ports[ports[0]].size
    # the generator will read next from each iterated input port and
    # compute the module again
    module = copy.copy(self)
    module.had_error = False
    module.upToDate = False
    module.computed = False
    # Values accumulated so far, per streamed port.
    inputs = dict([(port, []) for port in ports])
    def generator(self):
        # Yields True after each accumulated element and None once the
        # module has been computed on the accumulated lists.
        self.logging.begin_update(module)
        i = 0
        while 1:
            elements = [self.streamed_ports[port].next() for port in ports]
            if None in elements:
                self.logging.begin_compute(module)
                # assembled all inputs so do the actual computation
                elements = [inputs[port] for port in ports]
                ## Type checking
                self.typeChecking(module, ports, zip(*elements))
                self.setInputValues(module, ports, elements, i)
                try:
                    module.compute()
                except Exception, e:
                    raise ModuleError(module, str(e))
                if suspended:
                    raise ModuleSuspended(
                        self,
                        ("function module suspended after streaming "
                         "%d/%d iterations") % (
                            len(suspended), num_inputs),
                        children=suspended)
                self.logging.end_update(module)
                yield None
            for port, value in zip(ports, elements):
                inputs[port].append(value)
            # Outputs stay unset (None) while values are still accumulating.
            for name_output in module.outputPorts:
                module.set_output(name_output, None)
            i += 1
            yield True
    _generator = generator(self)
    # set streaming outputs
    for name_output in self.outputPorts:
        iterator = Generator(size=num_inputs,
                             module=module,
                             generator=_generator,
                             port=name_output,
                             accumulated=True)
        self.set_output(name_output, iterator)
def compute_after_streaming(self):
    """This method creates a generator object that computes when the
    streaming is finished.

    The streamed inputs are read repeatedly; as soon as none of the
    values read is None, a shallow copy of the module is computed with
    those values. Every output port of this module is set to a Generator
    flagged ``accumulated=True``.
    """
    from vistrails.core.modules.basic_modules import Generator
    # NOTE(review): nothing in this method appends to this list, so the
    # ModuleSuspended branch below looks unreachable - confirm.
    suspended = []
    # max depth should be one
    ports = self.streamed_ports.keys()
    # The stream size is taken from the first streamed port.
    num_inputs = self.streamed_ports[ports[0]].size
    # the generator will read next from each iterated input port and
    # compute the module again
    module = copy.copy(self)
    module.had_error = False
    module.upToDate = False
    module.computed = False
    def generator(self):
        # Yields True while at least one input value is still None, and
        # None once the module has been computed.
        self.logging.begin_update(module)
        i = 0
        # Outputs stay unset (None) until the computation takes place.
        for name_output in module.outputPorts:
            module.set_output(name_output, None)
        while 1:
            elements = [self.streamed_ports[port].next() for port in ports]
            if None not in elements:
                self.logging.begin_compute(module)
                ## Type checking
                self.typeChecking(module, ports, [elements])
                self.setInputValues(module, ports, elements, i)
                try:
                    module.compute()
                except Exception, e:
                    raise ModuleError(module, str(e))
                if suspended:
                    raise ModuleSuspended(
                        self,
                        ("function module suspended after streaming "
                         "%d/%d iterations") % (
                            len(suspended), num_inputs),
                        children=suspended)
                self.logging.update_progress(self, 1.0)
                self.logging.end_update(module)
                yield None
            i += 1
            yield True
    _generator = generator(self)
    # set streaming outputs
    for name_output in self.outputPorts:
        iterator = Generator(size=num_inputs,
                             module=module,
                             generator=_generator,
                             port=name_output,
                             accumulated=True)
        self.set_output(name_output, iterator)
def compute_while(self):
"""This method executes the module once for each module.
Similarly to fold.
"""
name_condition = self.control_params.get(
ModuleControlParam.WHILE_COND_KEY, None)
max_iterations = int(self.control_params.get(
ModuleControlParam.WHILE_MAX_KEY, 20))
delay = float(self.control_params.get(
ModuleControlParam.WHILE_DELAY_KEY, 0.0))
# todo only one state port supported right now
name_state_input = self.control_params.get(
ModuleControlParam.WHILE_INPUT_KEY, None)
name_state_input = [name_state_input] if name_state_input else None
name_state_output = self.control_params.get(
ModuleControlParam.WHILE_OUTPUT_KEY, None)
name_state_output = [name_state_output] if name_state_output else None
from vistrails.core.modules.basic_modules import create_constant
if name_state_input or name_state_output:
if not name_state_input or not name_state_output:
raise ModuleError(self,
"Passing state between iterations requires "
"BOTH StateInputPorts and StateOutputPorts "
"to be set")
if len(name_state_input) != len(name_state_output):
raise ModuleError(self,
"StateInputPorts and StateOutputPorts need "
"to have the same number of ports "
"(got %d and %d)" %(len(name_state_input),
len(name_state_output)))
module = copy.copy(self)
module.had_error = False
module.is_while = True
state = None
loop = self.logging.begin_loop_execution(self, max_iterations)
for i in xrange(max_iterations):
if not self.upToDate:
module.upToDate = False
module.computed = False
# Set state on input ports
if i > 0 and name_state_input:
for value, input_port, output_port \
in izip(state, name_state_input, name_state_output):
if input_port in module.inputPorts:
del module.inputPorts[input_port]
new_connector = ModuleConnector(
create_constant(value), 'value',
module.output_specs.get(output_port, None))
module.set_input_port(input_port, new_connector)
loop.begin_iteration(module, i)
try: