-
Notifications
You must be signed in to change notification settings - Fork 23
/
workflow.py
executable file
·1212 lines (1037 loc) · 45.6 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python
#
"""
Implementation of task collections.
Tasks can be grouped into collections, which are tasks themselves,
therefore can be controlled (started/stopped/cancelled) like a single
whole. Collection classes provided in this module implement the basic
patterns of job group execution; they can be combined to form more
complex workflows. Hook methods are provided so that derived classes
can implement problem-specific job control policies.
"""
# Copyright (C) 2009-2016 S3IT, Zentrale Informatik, University of Zurich. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
__docformat__ = 'reStructuredText'
import itertools
import os
from collections import defaultdict
from gc3libs.compat.toposort import toposort
from gc3libs import Run, Task
import gc3libs.exceptions
import gc3libs.utils
class TaskCollection(Task):
    """
    Base class for all task collections. A "task collection" is a
    group of tasks, that can be managed collectively as a single one.

    A task collection implements the same interface as the `Task`
    class, so you can use a `TaskCollection` everywhere a `Task` is
    required. A task collection has a `state` attribute, which is an
    instance of `gc3libs.Run.State`; each concrete collection class
    decides how to deduce a collective state based on the individual
    task states.
    """

    # a generic `TaskCollection` produces no output -- change in
    # subclasses or specific instance if needed
    would_output = False

    def __init__(self, tasks=None, **extra_args):
        """
        Create a collection managing the given list of `tasks`.

        :param list tasks: Initial list of tasks (an empty list is
            used when ``None``, avoiding a shared mutable default).
        """
        if tasks is None:
            self.tasks = []
        else:
            self.tasks = tasks
        Task.__init__(self, **extra_args)

    def iter_workflow(self):
        """
        Returns an iterator that will traverse the whole tree of tasks.

        Yields this collection first, then its non-collection
        subtasks, then recurses into subtasks that are themselves
        collections.
        """
        return itertools.chain(
            # this task collection
            itertools.repeat(self, 1),
            # iterator over non-collection subtasks
            (task for task in self.tasks
             if not isinstance(task, TaskCollection)),
            # recurse into collection subtasks
            *(coll.iter_workflow() for coll in self.tasks
              if isinstance(coll, TaskCollection)))

    def iter_tasks(self):
        """
        Iterate over this collection and its direct subtasks.

        In contrast to `iter_workflow`:meth:, this does *not* recurse:
        it yields this collection itself, followed by every member of
        `self.tasks` (whether or not it is a collection).
        """
        return itertools.chain(
            # this task collection
            itertools.repeat(self, 1),
            # iterator over direct subtasks (collections included)
            iter(self.tasks),
        )

    @gc3libs.utils.defproperty
    def changed():
        """
        Evaluates to `True` if this task or any of its subtasks has been
        modified and should be saved to persistent storage.
        """
        def fget(self):
            if self._changed:
                return True
            for task in self.tasks:
                # tasks are dict-like `Struct` objects; membership
                # tests for the presence of the `_changed` key
                if '_changed' in task:
                    if task._changed:
                        return True
            return False

        def fset(self, value):
            # setting only affects this collection's own flag
            self._changed = value
        return locals()

    # manipulate the "controller" interface used to control the associated
    # task

    def attach(self, controller):
        """
        Use the given Controller interface for operations on the job
        associated with this task.
        """
        raise NotImplementedError(
            "Called abstract method TaskCollection.attach() -"
            " this should be overridden in derived classes.")

    def detach(self):
        """
        Detach all subtasks, then this collection, from the controller.
        """
        for task in self.tasks:
            task.detach()
        Task.detach(self)

    def add(self, task):
        """
        Add a task to the collection.
        """
        raise NotImplementedError(
            "Called abstract method TaskCollection.add() -"
            " this should be overridden in derived classes.")

    def remove(self, task):
        """
        Remove a task from the collection.
        """
        self.tasks.remove(task)
        task.detach()

    # task execution manipulation -- these methods should be overriden
    # in derived classes, to implement the desired policy.

    def submit(self, resubmit=False, targets=None, **extra_args):
        raise NotImplementedError(
            "Called abstract method TaskCollection.submit() -"
            " this should be overridden in derived classes.")

    def update_state(self, **extra_args):
        """
        Update the running state of all managed tasks.

        Tasks in ``NEW`` or ``TERMINATED`` state cannot change state
        on their own, so they are skipped.
        """
        for task in self.tasks:
            if task.execution.state not in [Run.State.NEW,
                                            Run.State.TERMINATED]:
                self._controller.update_job_state(task, **extra_args)

    def kill(self, **extra_args):
        # XXX: provide default implementation that kills all jobs?
        raise NotImplementedError(
            "Called abstract method TaskCollection.kill() -"
            " this should be overridden in derived classes.")

    def _get_download_dir(self, download_dir):
        """
        Return the base directory path where to download this Task
        collection's output files.

        Works just like method `Task._get_download_dir`, the only
        difference being that here we cannot return ``None`` if this
        Task is not expected to produce any output, as the value will
        be used as a base path for all dependent tasks.
        """
        # determine download dir
        if download_dir is not None:
            return download_dir
        else:
            try:
                return self.output_dir
            except AttributeError:
                # fall back on the process' working directory, but
                # warn since this is rarely what the caller intended
                cwd = os.getcwd()
                gc3libs.log.warning(
                    "`TaskCollection._get_download_dir()` called"
                    " with no explicit download directory,"
                    " but object '%s' (%s) has no `output_dir`"
                    " attribute set either. Using `%s` as collection's"
                    " base output directory."
                    % (self, type(self), cwd))
                return cwd

    def fetch_output(self, output_dir=None,
                     overwrite=False, changed_only=True, **extra_args):
        """
        Fetch output of all ``TERMINATING`` subtasks into subdirectories
        of the collection's base output directory.

        :param str output_dir: Base directory where to download files;
            each task gets its own subdir based on its ``output_dir``
            (if set) or its ``persistent_id``.
        :return: The collection's base output directory path.
        """
        coll_output_dir = self._get_download_dir(output_dir)
        assert coll_output_dir is not None, \
            ("Unknown collection output directory!"
             " Task collection '%s' was not initialized with"
             " an explicit `output_dir=...` setting,"
             " but then `fetch_output()` was called without any"
             " explicit `output_dir=...` argument."
             % (self,))
        for task in self.tasks:
            if task.execution.state != Run.State.TERMINATING:
                continue
            if 'output_dir' in task:
                task_output_dir = task.output_dir
            else:
                task_output_dir = task.persistent_id
            # XXX: uses a feature from `os.path.join`: if the second
            # path is absolute, the first path is discarded and the
            # second one is returned unchanged
            task_output_dir = os.path.join(coll_output_dir, task_output_dir)
            self._controller.fetch_output(
                task,
                task_output_dir,
                overwrite,
                changed_only,
                **extra_args)
        # if any sub-task is not yet TERMINATED, return the base
        # output directory for the collection...
        for task in self.tasks:
            if task.execution.state != Run.State.TERMINATED:
                return coll_output_dir
        # ...otherwise set the state to TERMINATED
        self.execution.state = Run.State.TERMINATED
        self.changed = True
        return coll_output_dir

    def free(self):
        """
        This method just asks the Engine to free the contained tasks.
        """
        if self._attached:
            for task in self.tasks:
                self._controller.free(task)

    def peek(self, what, offset=0, size=None, **extra_args):
        """
        Raise a `gc3libs.exceptions.InvalidOperation` error, as there
        is no meaningful semantics that can be defined for `peek` into
        a generic collection of tasks.
        """
        # is there any sensible semantic here?
        raise gc3libs.exceptions.InvalidOperation(
            "Cannot `peek()` on a task collection.")

    def stats(self, only=None):
        """
        Return a dictionary mapping each state name into the count of
        tasks in that state. In addition, the following keys are defined:

        * `ok`: count of TERMINATED tasks with return code 0

        * `failed`: count of TERMINATED tasks with nonzero return code

        * `total`: count of managed tasks, whatever their state

        If the optional argument `only` is not None, tasks whose
        class is not contained in `only` are ignored.

        :param tuple only: Restrict counting to tasks of these classes.
        """
        result = defaultdict(int)
        for task in self.tasks:
            if only and not isinstance(task, only):
                continue
            state = task.execution.state
            result[state] += 1
            if state == Run.State.TERMINATED:
                if task.execution.returncode == 0:
                    result['ok'] += 1
                else:
                    result['failed'] += 1
        if only:
            result['total'] = sum(1 for task in self.tasks
                                  if isinstance(task, only))
        else:
            result['total'] = len(self.tasks)
        return result

    def terminated(self):
        """
        Called when the job state transitions to `TERMINATED`, i.e.,
        the job has finished execution (with whatever exit status, see
        `returncode`) and the final output has been retrieved.

        Default implementation for `TaskCollection` is to set the
        exitcode to the maximum of the exit codes of its tasks.
        If no tasks were run, the exitcode is set to 0.
        """
        if self.tasks:
            self.execution._exitcode = max(
                task.execution._exitcode for task in self.tasks
            )
        else:
            # a sequence with no tasks terminates successfully
            self.execution._exitcode = 0
class SequentialTaskCollection(TaskCollection):
    """
    A `SequentialTaskCollection` runs its tasks one at a time.

    After a task has completed, the `next` method is called with the
    index of the finished task in the `self.tasks` list; the return
    value of the `next` method is then made the collection
    `execution.state`. If the returned state is `RUNNING`, then the
    subsequent task is started, otherwise no action is performed.

    The default `next` implementation just runs the tasks in the order
    they were given to the constructor, and sets the state to
    `TERMINATED` when all tasks have been run.
    """

    def __init__(self, tasks, **extra_args):
        # XXX: check that `tasks` is a sequence type
        TaskCollection.__init__(self, tasks, **extra_args)
        # index (into `self.tasks`) of the task currently being run,
        # or ``None`` if the sequence has not started yet
        self._current_task = None

    def add(self, task):
        """
        Append `task` to the sequence; it is detached from any
        controller until its turn to run comes.
        """
        task.detach()
        self.tasks.append(task)

    def attach(self, controller):
        """
        Use the given Controller interface for operations on the job
        associated with this task.
        """
        # only the currently-running stage needs a controller
        if self._current_task is not None:
            self.tasks[self._current_task].attach(controller)
        Task.attach(self, controller)

    def kill(self, **extra_args):
        """
        Stop execution of this sequence. Kill currently-running task
        (if any), then set collection state to TERMINATED.
        """
        if self._current_task is not None:
            self.tasks[self._current_task].kill(**extra_args)
            # tasks that never got a chance to run are terminated too
            for i in range(self._current_task + 1, len(self.tasks)):
                self.tasks[i].execution.state = Run.State.TERMINATED
        # fixed: dropped a stray `self.execution.returned = ...`
        # assignment here -- `returned` is not a `Run` attribute read
        # anywhere; `returncode` (set below) is the real one
        self.execution.state = Run.State.TERMINATED
        self.execution.returncode = (Run.Signals.Cancelled, -1)
        self.changed = True

    def next(self, done):
        """
        Return collection state or task to run after step number `done`
        is terminated.

        This method is called when a task is finished; the `done`
        argument contains the index number of the just-finished task
        into the `self.tasks` list. In other words, the task that
        just completed is available as `self.tasks[done]`.

        The return value from `next` can be either a task state (i.e.,
        an instance of `Run.State`), or a valid index number for
        `self.tasks`. In the first case:

        - if the return value is `Run.State.TERMINATED`,
          then no other jobs will be run;
        - otherwise, the return value is assigned to `execution.state`
          and the next job in the `self.tasks` list is executed.

        If instead the return value is a (nonnegative) number, then
        tasks in the sequence will be re-run starting from that index.

        The default implementation runs tasks in the order they were
        given to the constructor, and sets the state to TERMINATED
        when all tasks have been run. This method can (and should) be
        overridden in derived classes to implement policies for serial
        job execution.
        """
        if done == len(self.tasks) - 1:
            return Run.State.TERMINATED
        else:
            return Run.State.RUNNING

    def progress(self):
        # advance the currently-running stage, then this collection
        assert self._attached
        task = self.stage()
        if task is not None:
            task.progress()
        super(SequentialTaskCollection, self).progress()

    def redo(self, from_stage=0, *args, **kwargs):
        """
        Rewind the sequence to a given stage and reset its state to ``NEW``.
        """
        if len(self.tasks) > 0:
            super(SequentialTaskCollection, self).redo(*args, **kwargs)
            self._current_task = from_stage
            task = self.stage()
            if task is not None:
                task.redo(*args, **kwargs)
            # All other tasks should be put in NEW again
            for i in range(from_stage + 1, len(self.tasks)):
                self.tasks[i].redo(*args, **kwargs)

    def submit(self, resubmit=False, targets=None, **extra_args):
        """
        Start the current task in the collection.
        """
        if self.tasks:
            if self._current_task is None:
                self._current_task = 0
            task = self.tasks[self._current_task]
            task.attach(self._controller)
            task.submit(resubmit, targets, **extra_args)
            if task.execution.state == Run.State.NEW:
                # submission failed, state unchanged
                self.execution.state = Run.State.NEW
            elif task.execution.state == Run.State.SUBMITTED:
                self.execution.state = Run.State.SUBMITTED
            else:
                self.execution.state = Run.State.RUNNING
        else:
            # no tasks to run, sequence is already finished
            self.execution.state = Run.State.TERMINATED
        self.changed = True
        return self.execution.state

    def stage(self):
        """
        Return the `Task` that is currently executing, or ``None``
        (if finished or not yet started).
        """
        if self._current_task is None:
            return None
        else:
            return self.tasks[self._current_task]

    def update_state(self, **extra_args):
        """
        Update state of the collection, based on the jobs' statuses.
        """
        if self._current_task is None:
            # state is either NEW or TERMINATED, no update
            assert self.execution.state in [
                Run.State.NEW, Run.State.TERMINATED]
            return self.execution.state
        # update state of current task
        task = self.tasks[self._current_task]
        if task.execution.state not in [Run.State.NEW,
                                        Run.State.TERMINATED]:
            task.update_state(**extra_args)
        gc3libs.log.debug("Task `%s` in state %s", task, task.execution.state)
        # now set state based on the state of current task:
        #
        # 1. first task ever gets special treatment
        if (self._current_task == 0
                and task.execution.state in [
                    Run.State.NEW,
                    Run.State.SUBMITTED,
                ]):
            # avoid state flapping back to NEW if it's already SUBMITTED
            if self.execution.state == Run.State.NEW:
                self.execution.state = task.execution.state
            return self.execution.state
        # 2. if current task is terminated, advance to next one
        if (task.execution.state == Run.State.TERMINATED):
            nxt = self.next(self._current_task)
            if nxt in Run.State:
                self.execution.state = nxt
                collection_state_already_set = True
                if self.execution.state not in [
                        Run.State.STOPPED,
                        Run.State.TERMINATED,
                        Run.State.TERMINATING,
                ]:
                    self._current_task += 1
            else:
                # `nxt` must be a valid index into `self.tasks`
                assert 0 <= nxt < len(self.tasks)
                self._current_task = nxt
                collection_state_already_set = False
            # submit next task, unless we're TERMINATED or STOPPED
            if self.execution.state not in [
                    Run.State.STOPPED,
                    Run.State.TERMINATED,
                    Run.State.TERMINATING,
            ]:
                next_task = self.tasks[self._current_task]
                next_task.attach(self._controller)
                # a task that already ran must be *re*-submitted
                resubmit_task = (next_task.execution.state != Run.State.NEW)
                next_task.submit(resubmit=resubmit_task)
                if not collection_state_already_set:
                    self.execution.state = Run.State.RUNNING
            self.changed = True
            return self.execution.state
        # 3. if task stopped, stop the sequence too
        if (task.execution.state == Run.State.STOPPED):
            self.execution.state = Run.State.STOPPED
            return self.execution.state
        # 4. if task is running or terminating, keep on running
        if task.execution.state in [
                Run.State.NEW,
                Run.State.SUBMITTED,
                Run.State.RUNNING,
                Run.State.TERMINATING,
                Run.State.UNKNOWN,
        ]:
            self.execution.state = Run.State.RUNNING
            return self.execution.state
        # 5. this shouldn't happen!
        raise gc3libs.exceptions.InternalError(
            "Unhandled task state `{0}`"
            " in SequentialTaskCollection.update_state()"
            .format(task.execution.state))
class _OnError(object):
    """
    Mix-in for `SequentialTaskCollection`:class: subclasses that must
    switch to a fixed state as soon as one of their tasks fails.

    The target state is copied from the `_on_error_state` class
    attribute; it only makes sense for it to be a state that halts
    further processing by GC3Pie -- such as ``STOPPED`` (see
    `StopOnError`:class:) or ``TERMINATED`` (see
    `AbortOnError`:class:).

    Mixing this class in also makes the collection's
    `self.execution.returncode` attribute mirror the return code of
    the most recently finished task.

    .. seealso:: `StopOnError`:class:, `AbortOnError`:class:
    """

    # subclasses are expected to override this with a "final" state
    _on_error_state = Run.State.UNKNOWN

    def complete(self):
        """
        Return ``True`` if the last executed task is the final task of
        the sequence.

        `_OnError.next`:meth: uses this to decide whether a failure of
        the task that just ran really ends the whole collection (so
        the final state is TERMINATED even on failure), or whether
        more tasks might still be appended (so the final state is
        `_on_error_state` instead).

        The default implementation always returns ``True``, which
        makes the mix-in classes `AbortOnError`:class: and
        `StopOnError`:class: work out-of-the-box with
        `StagedTaskCollection`:class: and
        `DependentTaskCollection`:task:.

        .. versionadded:: 2.5
        """
        return True

    def next(self, done):
        """
        Return collection state after step number `done` is terminated.

        The task collection must end up ``TERMINATED`` after its last
        task has run, whether that task succeeded or not.  There are,
        however, two distinct contexts in which `next` can be invoked
        on a `SequentialTaskCollection`:

        1. The whole sequence is already built, i.e. `self.tasks[-1]`
           really is the last task to execute -- as with the stock
           `StagedTaskCollection`:class: and
           `DependentTaskCollection`:class:.  Here `next` can safely
           mark the collection ``TERMINATED`` once the last task ran.

        2. The sequence is still being built: a failed job must only
           move the collection into `_on_error_state`, since more
           tasks may yet be appended.

        Method `complete`:meth: tells `next` which case applies:
        ``self.complete() == True`` selects the behavior of case 1.,
        ``False`` selects case 2.  See `GitHub issue #512`_ for a
        lengthier discussion.

        .. seealso: :meth:`SequentialTaskCollection.next`, `GitHub issue #512`_

        .. _`GitHub issue #512`: https://github.com/uzh/gc3pie/issues/512
        """
        rc = self.tasks[done].execution.returncode
        # mirror the last finished task's return code
        self.execution.returncode = rc
        at_final_task = (done == len(self.tasks) - 1)
        if self.complete() and at_final_task:
            # sequence is over, regardless of success or failure
            return Run.State.TERMINATED
        if rc != 0:
            # a task failed while more might follow: freeze in the
            # configured error state
            return self._on_error_state
        return Run.State.RUNNING
class AbortOnError(_OnError):
    """
    Mix-in class that drives a `SequentialTaskCollection`:class: into
    the ``TERMINATED`` state as soon as any of its tasks fails.

    As a side effect of the `_OnError`:class: machinery, the
    collection's `self.execution.returncode` mirrors the return code
    of the most recently finished task.

    .. note::

      For the mix-in to take effect, this class should be listed
      *before* the base task collection class, e.g.::

        # this works
        class MyTaskCollection(AbortOnError, SequentialTaskCollection):
          pass

        # this *does not* work
        class MyOtherTaskCollection(SequentialTaskCollection, AbortOnError):
          pass

    See :meth:`SequentialTaskCollection.next` and `GitHub issue #512`_
    for some caveats on applying this to dynamically-built task
    collections.

    .. _`GitHub issue #512`: https://github.com/uzh/gc3pie/issues/512
    """
    # a failed task terminates the whole sequence
    _on_error_state = Run.State.TERMINATED
class StopOnError(_OnError):
    """
    Mix-in class that drives a `SequentialTaskCollection`:class: into
    the ``STOPPED`` state as soon as any of its tasks fails.

    As a side effect of the `_OnError`:class: machinery, the
    collection's `self.execution.returncode` mirrors the return code
    of the most recently finished task.

    .. note::

      For the mix-in to take effect, this class should be listed
      *before* the base task collection class, e.g.::

        # this works
        class MyTaskCollection(StopOnError, SequentialTaskCollection):
          pass

        # this *does not* work
        class MyOtherTaskCollection(SequentialTaskCollection, StopOnError):
          pass

    See :meth:`SequentialTaskCollection.next` and `GitHub issue #512`_
    for some caveats on applying this to dynamically-built task
    collections.

    .. _`GitHub issue #512`: https://github.com/uzh/gc3pie/issues/512
    """
    # a failed task pauses the sequence for operator intervention
    _on_error_state = Run.State.STOPPED
class StagedTaskCollection(SequentialTaskCollection):
    """
    Simplified interface for creating a sequence of Tasks.
    This can be used when the number of Tasks to run is
    fixed and known at program writing time.

    A `StagedTaskCollection` subclass should define methods `stage0`,
    `stage1`, ... up to `stageN` (for some arbitrary value of N positive
    integer). Each of these `stageN` must return a `Task`:class:
    instance; the task returned by the `stage0` method will be executed
    first, followed by the task returned by `stage1`, and so on.
    The sequence stops at the first N such that `stageN` is not defined.

    The exit status of the whole sequence is the exit status of the
    last `Task` instance run. However, if any of the `stageN` methods
    returns an integer value instead of a `Task` instance, then the
    sequence stops and that number is used as the sequence exit
    code.
    """

    # fixed: `long` does not exist on Python 3 (PEP 237 merged it into
    # `int`), so compute the acceptable "exit code" types once here;
    # behavior on Python 2 is unchanged
    try:
        _EXITCODE_TYPES = (int, long, tuple)
    except NameError:
        _EXITCODE_TYPES = (int, tuple)

    def __init__(self, **extra_args):
        try:
            first_stage = self.stage0()
            if isinstance(first_stage, Task):
                # init parent class with the initial task
                SequentialTaskCollection.__init__(
                    self, [first_stage], **extra_args)
            elif isinstance(first_stage, self._EXITCODE_TYPES):
                # init parent class with no tasks,
                # and immediately set the exitcode
                SequentialTaskCollection.__init__(self, [], **extra_args)
                self.execution.returncode = first_stage
                self.execution.state = Run.State.TERMINATED
            else:
                raise AssertionError(
                    "Invalid return value from method `stage0()` of"
                    " `StagedTaskCollection` object %r:"
                    " must return `Task` instance or integer exit code" % self)
        except AttributeError as ex:
            raise AssertionError(
                "Invalid `StagedTaskCollection` instance %r: %s"
                % (self, str(ex)))

    def next(self, done):
        # get next stage (1); if none exists, log it and exit
        try:
            next_stage_fn = getattr(self, "stage%d" % (done + 1))
        except AttributeError:
            gc3libs.log.debug("StagedTaskCollection '%s' has no stage%d,"
                              " ending sequence now.", self, (done + 1))
            self.execution.returncode = self.tasks[done].execution.returncode
            return Run.State.TERMINATED
        # get next stage (2); if we get an error here, something is wrong in
        # the code
        try:
            next_stage = next_stage_fn()
        except AttributeError as err:
            raise AssertionError(
                "Invalid `StagedTaskCollection` instance %r: %s"
                % (self, str(err)))
        # add next stage to the collection, or end graciously
        if isinstance(next_stage, Task):
            self.add(next_stage)
            return Run.State.RUNNING
        elif isinstance(next_stage, self._EXITCODE_TYPES):
            self.execution.returncode = next_stage
            return Run.State.TERMINATED
        else:
            raise AssertionError(
                "Invalid return value from method `stage%d()` of"
                " `StagedTaskCollection` object %r: must return `Task`"
                " instance or number" % (done + 1, self))
class ParallelTaskCollection(TaskCollection):
    """
    A `ParallelTaskCollection` runs all of its tasks concurrently.

    The collection state is set to `TERMINATED` once all tasks have
    reached the same terminal status.
    """

    def __init__(self, tasks=None, **extra_args):
        TaskCollection.__init__(self, tasks, **extra_args)

    def _state(self):
        """
        Return the state of the collection.

        For a `ParallelTaskCollection`, the state of dependent jobs is
        computed by looping across the states STOPPED, RUNNING,
        SUBMITTED, TERMINATING, UNKNOWN, TERMINATED, NEW in the order
        given: the first state for which there is at least one job in
        that state is returned as the overall collection state. As an
        exception, if the collection is a mixture of NEW and
        TERMINATED jobs, then the global state is RUNNING (presuming
        we're in the middle of a computation).
        """
        stats = self.stats()
        if (stats[Run.State.NEW] > 0
                and stats[Run.State.TERMINATED] > 0
                and stats[Run.State.NEW] + stats[Run.State.TERMINATED] ==
                len(self.tasks)):
            # we're in the middle of a computation (there's a mixture
            # of unsubmitted and finished tasks), so let's chalk this
            # up to ``RUNNING`` state
            return Run.State.RUNNING
        for state in [Run.State.STOPPED,
                      Run.State.RUNNING,
                      Run.State.SUBMITTED,
                      Run.State.UNKNOWN,
                      Run.State.TERMINATING,
                      # if we get here, then all jobs are TERMINATED or all
                      # NEW
                      Run.State.TERMINATED,
                      Run.State.NEW,
                      ]:
            if stats[state] > 0:
                return state
        return Run.State.UNKNOWN

    def add(self, task):
        """
        Add a task to the collection.
        """
        task.detach()
        self.tasks.append(task)
        # tasks added to an already-attached collection start managed
        if self._attached:
            task.attach(self._controller)

    def attach(self, controller):
        """
        Use the given Controller interface for operations on the job
        associated with this task.
        """
        for task in self.tasks:
            if not task._attached:
                task.attach(controller)
        Task.attach(self, controller)

    def kill(self, **extra_args):
        """
        Terminate all tasks in the collection, and set collection
        state to `TERMINATED`.
        """
        for task in self.tasks:
            task.kill(**extra_args)
        self.execution.state = Run.State.TERMINATED
        self.execution.returncode = (Run.Signals.Cancelled, -1)
        self.changed = True

    def progress(self):
        """
        Try to advance all jobs in the collection to the next state in
        a normal lifecycle.
        """
        for task in self.tasks:
            task.progress()
        super(ParallelTaskCollection, self).progress()

    def redo(self, *args, **kwargs):
        """
        Reset collection and all included tasks to state ``NEW``.

        If not all included tasks are in a terminal state or
        ``NEW``, an `AssertionError` exception will be thrown.
        See also `Task.redo`:meth: for a listing of allowed run states
        when ``redo()`` is called.
        """
        for task in self.tasks:
            task.redo(*args, **kwargs)
        super(ParallelTaskCollection, self).redo(*args, **kwargs)

    def submit(self, resubmit=False, targets=None, **extra_args):
        """
        Start all tasks in the collection.
        """
        for task in self.tasks:
            task.submit(resubmit, targets, **extra_args)
        self.execution.state = self._state()

    def update_state(self, **extra_args):
        """
        Update state of all tasks in the collection.
        """
        for task in self.tasks:
            # gc3libs.log.debug("Updating state of %s in collection %s ..."
            #                  % (task, self))
            if task.execution.state not in [Run.State.NEW,
                                            Run.State.TERMINATED]:
                task.update_state(**extra_args)
        old_state = self.execution.state
        self.execution.state = self._state()
        # fixed: the returncode computation and the `changed` flag used
        # to be (re)set on *every* call once the collection was
        # TERMINATED (see old FIXME); now they run only on the actual
        # transition into TERMINATED
        if (self.execution.state == Run.State.TERMINATED
                and old_state != Run.State.TERMINATED):
            self.execution.returncode = (0, 0)
            # set exitcode based on returncode of sub-tasks
            for task in self.tasks:
                if task.execution.returncode != 0:
                    self.execution.exitcode = 1
                    break  # one failed sub-task is enough
            self.changed = True
class ChunkedParameterSweep(ParallelTaskCollection):
    # A parameter sweep that lazily materializes its member tasks in
    # chunks, so very large sweeps do not create all jobs up front.

    def __init__(self, min_value, max_value, step, chunk_size, **extra_args):
        """
        Like `ParallelTaskCollection`, but generate a sequence of jobs
        with a parameter varying from `min_value` to `max_value` in
        steps of `step`. Only `chunk_size` jobs are generated at a
        time, to distribute the burden of job creation along the whole run.

        :param int min_value: First parameter value in the sweep.
        :param int max_value: Upper bound (exclusive) of the sweep.
        :param int step: Increment between consecutive parameter values.
        :param int chunk_size: Number of tasks created per batch.
        """
        self.min_value = min_value
        self.max_value = max_value
        self.step = step
        self.chunk_size = chunk_size
        # `_floor` is the first parameter value *not yet* turned into a
        # task; it advances by one chunk at a time, capped at `max_value`
        self._floor = min(min_value + (chunk_size * step), max_value)
        initial = list()
        for param in range(min_value, self._floor, step):
            initial.append(self.new_task(param))
        # start with the initial chunk of jobs
        ParallelTaskCollection.__init__(self, initial, **extra_args)

    def new_task(self, param, **extra_args):
        """
        Return the `Task` corresponding to the parameter value `param`.

        This method *must* be overridden in subclasses to generate tasks.
        """
        raise NotImplementedError(
            "Abstract method `ChunkedParameterSweep.new_task()` called -"
            " this should have been defined in a derived class.")

    # this is called at every cycle
    def update_state(self, **extra_args):
        """
        Like `ParallelTaskCollection.update_state()`,
        but also creates new tasks if less than
        `chunk_size` are running.
        """
        ParallelTaskCollection.update_state(self, **extra_args)
        # XXX: proposal, reset chunk_size from self._controller.max_in_flight
        # this is the way to pass new 'max-running' value to the class
        # this creates though, a tight coupling with 'controller' and maybe
        # limits the flexibility of the class.
        # In this way we obsolete 'chunked_size' as part of the __init__ args
        # if self._controller:
        #     gc3libs.log.info("Updating %s chunk_size from %d to %d" %
        #                      (self.__class__,
        #                       self.chunk_size,
        #                       self._controller.max_in_flight))
        #     self.chunk_size =  self._controller.
        # XXX: shall we also count jobs in Run.State.STOPPED ?
        num_running = len([task for task in self.tasks if
                           task.execution.state in [Run.State.NEW,
                                                    Run.State.SUBMITTED,
                                                    Run.State.RUNNING]])
        # Run.State.UNKNOWN ]])
        # add more jobs if we're close to the end
        # XXX: why using 2*self.chunk_size as threshold ?
        # is the idea to submit more jobs once we reach at least 50%
        # of completion ?
        # if num_running < 2*self.chunk_size and self._floor < self.max_value:
        if 2 * num_running < self.chunk_size and self._floor < self.max_value:
            # generate more tasks
            top = min(
                self._floor + (self.chunk_size * self.step), self.max_value)
            for param in range(self._floor, top, self.step):
                self.add(self.new_task(param, **extra_args))
            self._floor = top
            self.execution.state = self._state()
            self.changed = True
        return self.execution.state
class RetryableTask(Task):
"""
Wrap a `Task` instance and re-submit it until a specified
termination condition is met.
By default, the re-submission upon failure happens iff execution
terminated with nonzero return code; the failed task is retried up
to `self.max_retries` times (indefinitely if `self.max_retries` is 0).
Override the `retry` method to implement a different retryal policy.
*Note:* The resubmission code is implemented in the
`terminated`:meth:, so be sure to call it if you override in
derived classes.
"""
def __init__(self, task, max_retries=0, **extra_args):
"""
Wrap `task` and resubmit it until `self.retry()` returns `False`.
:param Task task: A `Task` instance that should be retried.
:param int max_retries: Maximum number of times `task` should be
re-submitted; use 0 for 'no limit'.
"""
self.max_retries = max_retries
self.retried = 0
self.task = task
self.would_output = self.task.would_output
Task.__init__(self, **extra_args)
    @gc3libs.utils.defproperty
    def changed():
        """
        Evaluates to `True` if this task or any of its subtasks has been
        modified and should be saved to persistent storage.
        """
        def fget(self):
            # dirty if either the wrapper itself or the wrapped task changed
            return self._changed or self.task.changed
        def fset(self, value):
            # setting only touches the wrapper's own flag; the wrapped
            # task tracks its own `changed` state
            self._changed = value
        # `defproperty` builds the property object from the local
        # fget/fset functions returned here
        return locals()
def __getattr__(self, name):
"""Proxy public attributes of the wrapped task."""
if name.startswith('_'):