-
Notifications
You must be signed in to change notification settings - Fork 23
/
core.py
executable file
·2445 lines (2107 loc) · 95.1 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
"""
Top-level classes for task execution and control.
"""
# Copyright (C) 2009-2016 S3IT, Zentrale Informatik, University of Zurich. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
from collections import defaultdict
from fnmatch import fnmatch
import functools
import itertools
import os
import posix
import sys
import time
import tempfile
from warnings import warn
from dictproxyhack import dictproxy
import gc3libs
import gc3libs.debug
from gc3libs import Application, Run, Task
import gc3libs.exceptions
from gc3libs.quantity import Duration
import gc3libs.utils as utils
__docformat__ = 'reStructuredText'
class MatchMaker(object):
    """
    Choose and order the resources on which submission of a `Task` is tried.

    A match-making algorithm implements two methods:

    - `filter`: given a task and a list of resources, returns those
      resources the task could be submitted to;

    - `rank`: given a task and a list of resources, returns them in
      order of preference: submission is attempted to the first
      resource in the returned list, then to the second, and so on.

    This class provides GC3Pie's default match-making algorithm:

    - *filter phase:* when `task` has a `compatible_resources` method
      (as `Application`:class: instances do), only the resources it
      selects are kept; otherwise the input list is returned as-is.

    - *rank phase:* resources are ordered by the task's
      `rank_resources` method; when the task has no such method, the
      input order is kept.
    """

    # pylint: disable=no-self-use
    def filter(self, task, resources):
        """
        Return the resources among `resources` that can accept `task`.

        The result may be empty, meaning that no resource satisfies
        the task's requirements.

        Selection is delegated to ``task.compatible_resources(resources)``;
        if that raises `AttributeError` (e.g., because the task has no
        such method), `resources` itself is returned.
        """
        resource_names = ','.join(res.name for res in resources)
        gc3libs.log.debug(
            "Performing matching of resource(s) %s to task '%s' ...",
            resource_names, task)
        try:
            selected = task.compatible_resources(resources)
            gc3libs.log.debug(
                'Task compatiblity check returned %d matching resources',
                len(selected))
        except AttributeError:
            # XXX: should we require that len(resources) > 0?
            selected = resources
        return selected

    # pylint: disable=no-self-use
    def rank(self, task, resources):
        """
        Return `resources` in the preferred submission order for `task`.

        Ordering is delegated to ``task.rank_resources(resources)``;
        if that raises `AttributeError` (e.g., because the task has no
        such method), `resources` is returned unchanged.
        """
        # honor the task's own preference, if it states one
        try:
            return task.rank_resources(resources)
        except AttributeError:
            return resources
class Core(object):
    """
    Core operations: submit, update state, retrieve (a snapshot of) output,
    cancel job.

    Core operations are *blocking*, i.e., they return only after the
    operation has successfully completed, or an error has been detected.

    Operations are always performed by a `Core` object. `Core` implements
    an overlay Grid on the resources specified in the configuration file.

    Initialization of a `Core`:class: instance also initializes all
    resources in the passed `Configuration`:class: instance. By default,
    GC3Pie's `Core` objects will ignore errors in initializing resources,
    and only raise an exception if *no* resources can be initialized.
    This can be changed by either passing an optional argument
    ``resource_errors_are_fatal=True``, or by setting the environmental
    variable ``GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL`` to ``yes`` or ``1``.
    """

    def __init__(self, cfg, matchmaker=None,
                 resource_errors_are_fatal=None):
        """
        Create the resources described by configuration `cfg`.

        :param cfg:
          Configuration object; its `auto_enable_auth` attribute and its
          `make_resources()` method are used.
        :param matchmaker:
          Object providing the `filter` and `rank` methods used for
          brokering (see `MatchMaker`:class:). If ``None`` (default),
          a new `MatchMaker` instance is created for this `Core`.
        :param resource_errors_are_fatal:
          Whether errors in resource initialization are propagated.
          If ``None`` (default), the value is read from the environment
          variable ``GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL``.
        :raise gc3libs.exceptions.NoResources:
          If no resource could be initialized.
        """
        # propagate resource init errors?
        if resource_errors_are_fatal is None:
            # get result from the environment
            resource_errors_are_fatal = gc3libs.utils.string_to_boolean(
                os.environ.get('GC3PIE_RESOURCE_INIT_ERRORS_ARE_FATAL', 'no'))

        # init auths
        self.auto_enable_auth = cfg.auto_enable_auth

        # init backends
        self.resources = cfg.make_resources(
            ignore_errors=(not resource_errors_are_fatal))
        if len(self.resources) == 0:
            raise gc3libs.exceptions.NoResources(
                "No resources given to initialize `gc3libs.core.Core` object!")

        # init matchmaker; create it here rather than as a default
        # argument, so that `Core` instances do not share one object
        # created once at class-definition time
        if matchmaker is None:
            matchmaker = MatchMaker()
        self.matchmaker = matchmaker

    def get_backend(self, name):
        """
        Return the configured resource object called `name`.

        :raise gc3libs.exceptions.InvalidResourceName:
          If no resource by that name is configured.
        """
        try:
            return self.resources[name]
        except KeyError:
            raise gc3libs.exceptions.InvalidResourceName(
                "No configured resource by the name '%s'"
                % (name,))

    def select_resource(self, match):
        """
        Disable resources that *do not* satisfy predicate `match`.

        Return number of enabled resources.

        Argument `match` can be:

        - either a function (or a generic callable) that is passed
          each `Resource` object in turn, and should return a
          boolean indicating whether the resources should be kept
          (`True`) or not (`False`);

        - or it can be a string: only resources whose name matches
          (wildcards ``*`` and ``?`` are allowed) are retained.

        Resources are only ever disabled, never re-enabled; the
        returned count includes every resource still enabled after
        filtering.

        .. note::

          Calling this method modifies the configured list of
          resources in-place.
        """
        enabled = 0
        for lrms in self.resources.itervalues():
            # decide explicitly instead of catching `TypeError`, so
            # that errors raised *inside* a callable predicate reach
            # the caller unchanged
            if callable(match):
                keep = match(lrms)
            else:
                keep = fnmatch(lrms.name, match)
            if not keep:
                lrms.enabled = False
            if lrms.enabled:
                enabled += 1
        return enabled

    def free(self, app, **extra_args):
        """
        Free up any remote resources used for the execution of `app`.

        In particular, this should delete any remote directories and
        files.

        For `Application` objects, it is an error to call this method
        if `app.execution.state` is anything other than ``TERMINATING``
        or ``TERMINATED``: an `InvalidOperation` exception will be
        raised in this case. Generic `Task` objects are handled by
        their own `free` method.

        :raise: `gc3libs.exceptions.InvalidOperation` if `app` is an
                `Application` whose `app.execution.state` is neither
                `Run.State.TERMINATING` nor `Run.State.TERMINATED`.
        """
        assert isinstance(
            app, Task), "Core.free: passed an `app` argument which" \
            " is not a `Task` instance."
        if isinstance(app, Application):
            return self.__free_application(app, **extra_args)
        else:
            # must be a `Task` instance
            return self.__free_task(app, **extra_args)

    # pylint: disable=unused-argument
    def __free_application(self, app, **extra_args):
        """Implementation of `free` on `Application` objects."""
        if app.execution.state not in [
                Run.State.TERMINATING, Run.State.TERMINATED]:
            raise gc3libs.exceptions.InvalidOperation(
                "Attempting to free resources of job '%s',"
                " which is in non-terminal state." % app)

        # auto_enable_auth = extra_args.get(
        #     'auto_enable_auth', self.auto_enable_auth)

        try:
            lrms = self.get_backend(app.execution.resource_name)
            lrms.free(app)
        except AttributeError:
            # NOTE(review): this also swallows any `AttributeError`
            # raised from within `lrms.free()`, not only the missing
            # `execution.resource_name` case logged below.
            gc3libs.log.debug(
                "Core.__free_application():"
                " Application `%s` is missing the `execution.resource_name` attribute."
                " This should not happen. I'm assuming the application had been"
                " aborted before submission.",
                app)

    # pylint: disable=unused-argument
    def __free_task(self, task, **extra_args):
        """Implementation of `free` on generic `Task` objects."""
        return task.free(**extra_args)

    def submit(self, app, resubmit=False, targets=None, **extra_args):
        """Submit a job running an instance of the given task `app`.

        Upon successful submission, call the `submitted` method on the
        `app` object. If `targets` are given, submission of the task
        is attempted to the resources in the order given; the `submit`
        method returns after the first successful attempt. If
        `targets` is ``None`` (default), a brokering procedure is run
        to determine the best resource among the configured ones.

        At the beginning of the submission process, the
        `app.execution` state is reset to ``NEW``; if submission is
        successful, the task will be in ``SUBMITTED`` or ``RUNNING``
        state when this call returns.

        :raise: `gc3libs.exceptions.InputFileError` if an input file
                does not exist or cannot otherwise be read.

        :param Task app:
          A GC3Pie `Task`:class: instance to be submitted.

        :param resubmit:
          If ``True``, submit task regardless of its execution state;
          if ``False`` (default), submission is a no-op if task is not
          in ``NEW`` state.

        :param targets:
          A list of `Resource`s to submit the task to; resources are
          tried in the order given. If ``None`` (default), perform
          brokering among all the configured resources.
        """
        assert isinstance(
            app, Task), "Core.submit: passed an `app` argument" \
            " which is not a `Task` instance."
        if isinstance(app, Application):
            return self.__submit_application(
                app, resubmit, targets, **extra_args)
        else:
            # must be a `Task` instance
            return self.__submit_task(app, resubmit, targets, **extra_args)

    def __submit_application(self, app, resubmit, targets, **extra_args):
        """Implementation of `submit` on `Application` objects."""

        gc3libs.log.debug("Submitting %s ...", app)

        # auto_enable_auth = extra_args.get(
        #     'auto_enable_auth', self.auto_enable_auth)

        job = app.execution
        if resubmit:
            job.state = Run.State.NEW
        elif job.state != Run.State.NEW:
            return

        # Validate Application local input files
        for input_ref in app.inputs:
            if input_ref.scheme == 'file':
                # Local file, check existence before proceeding
                if not os.path.exists(input_ref.path):
                    raise gc3libs.exceptions.UnrecoverableDataStagingError(
                        "Input file '%s' does not exist" % input_ref.path,
                        do_log=True)

        if targets is not None:
            assert len(targets) > 0
        else:  # targets is None
            enabled_resources = [
                r for r in self.resources.itervalues() if r.enabled]
            if len(enabled_resources) == 0:
                raise gc3libs.exceptions.NoResources(
                    "Could not initialize any computational resource"
                    " - please check log and configuration file.")

            # decide which resource to use
            compatible_resources = self.matchmaker.filter(
                app, enabled_resources)
            if len(compatible_resources) == 0:
                raise gc3libs.exceptions.NoResources(
                    "No available resource can accomodate the application"
                    " requirements")
            gc3libs.log.debug(
                "Application compatibility check returned %d matching"
                " resources", len(compatible_resources))

            if len(compatible_resources) <= 1:
                # shortcut: no brokering to do, just use what we've got
                targets = compatible_resources
            else:
                # update status of selected resources
                self.update_resources(compatible_resources)
                updated_resources = [
                    r for r in compatible_resources if r.updated]
                if len(updated_resources) == 0:
                    raise gc3libs.exceptions.LRMSSubmitError(
                        "No computational resource found reachable during"
                        " update! Aborting submission of task '%s'" %
                        app)

                # sort resources according to Application's preferences
                targets = self.matchmaker.rank(app, updated_resources)

        exs = []
        # after brokering we have a sorted list of valid resource
        for resource in targets:
            gc3libs.log.debug("Attempting submission to resource '%s'...",
                              resource.name)
            try:
                job.timestamp[Run.State.NEW] = time.time()
                job.info = ("Submitting to '%s'" % (resource.name,))
                resource.submit_job(app)
            except gc3libs.exceptions.LRMSSkipSubmissionToNextIteration:
                gc3libs.log.info("Submission of job %s delayed", app)
                # Just raise the exception
                raise
            # pylint: disable=broad-except
            except Exception as ex:
                gc3libs.log.info(
                    "Error in submitting job to resource '%s': %s: %s",
                    resource.name, ex.__class__.__name__, str(ex),
                    exc_info=True)
                exs.append(ex)
                continue
            gc3libs.log.info("Successfully submitted %s to: %s",
                             str(app), resource.name)
            job.state = Run.State.SUBMITTED
            job.resource_name = resource.name
            job.info = ("Submitted to '%s'" % (job.resource_name,))
            app.changed = True
            app.submitted()
            # job submitted; return to caller
            return

        # if we get here, all submissions have failed; call the
        # appropriate handler method if defined
        ex = app.submit_error(exs)
        if isinstance(ex, Exception):
            app.execution.info = ("Submission failed: %s" % str(ex))
            raise ex
        else:
            return

    def __submit_task(self, task, resubmit, targets, **extra_args):
        """Implementation of `submit` on generic `Task` objects."""
        extra_args.setdefault('auto_enable_auth', self.auto_enable_auth)
        task.submit(resubmit, targets, **extra_args)

    def update_job_state(self, *apps, **extra_args):
        """
        Update state of all applications passed in as arguments.

        If keyword argument `update_on_error` is `False` (default),
        then application execution state is not changed in case a
        backend error happens; it is changed to `UNKNOWN` otherwise.

        Note that if state of a job changes, the `Run.state` calls the
        appropriate handler method on the application/task object.

        :raise: `gc3libs.exceptions.InvalidArgument` in case one of
                the passed `Application` or `Task` objects is
                invalid. This can stop updating the state of other
                objects in the argument list.

        :raise: `gc3libs.exceptions.ConfigurationError` if the
                configuration of this `Core` object is invalid or
                otherwise inconsistent (e.g., a resource references a
                non-existing auth section).
        """
        self.__update_application(
            (app for app in apps if isinstance(app, Application)),
            **extra_args)
        self.__update_task(
            (app for app in apps if not isinstance(app, Application)),
            **extra_args)

    def __update_application(self, apps, **extra_args):
        """Implementation of `update_job_state` on `Application` objects."""
        update_on_error = extra_args.get('update_on_error', False)
        # auto_enable_auth = extra_args.get(
        #     'auto_enable_auth', self.auto_enable_auth)

        for app in apps:
            state = app.execution.state
            old_state = state
            gc3libs.log.debug(
                "About to update state of application: %s (currently: %s)",
                app,
                state)
            try:
                # only applications that are not NEW, TERMINATING or
                # TERMINATED are queried on their backend
                if state not in [
                        Run.State.NEW,
                        Run.State.TERMINATING,
                        Run.State.TERMINATED,
                ]:
                    lrms = self.get_backend(app.execution.resource_name)
                    try:
                        state = lrms.update_job_state(app)
                    # pylint: disable=broad-except
                    except Exception as ex:
                        gc3libs.log.debug(
                            "Error getting status of application '%s': %s: %s",
                            app, ex.__class__.__name__, ex, exc_info=True)
                        state = Run.State.UNKNOWN
                        # run error handler if defined
                        ex = app.update_job_state_error(ex)
                        if isinstance(ex, Exception):
                            raise ex

                    if state != old_state:
                        app.changed = True
                        # set log information accordingly
                        #
                        # NOTE(review): this tests `app.execution.state`
                        # rather than the local `state`; presumably the
                        # backend's `update_job_state` already stored the
                        # new state on `app.execution` -- confirm.
                        if (app.execution.state == Run.State.TERMINATING
                                and app.execution.returncode is not None
                                and app.execution.returncode != 0):
                            # there was some error, try to explain
                            app.execution.info = (
                                "Execution failed on resource: %s" %
                                app.execution.resource_name)
                            signal = app.execution.signal
                            if signal in Run.Signals:
                                app.execution.info = (
                                    "Abnormal termination: %s" % signal)
                            else:
                                if os.WIFSIGNALED(app.execution.returncode):
                                    app.execution.info = (
                                        "Remote job terminated by signal %d" %
                                        signal)
                                else:
                                    app.execution.info = (
                                        "Remote job exited with code %d" %
                                        app.execution.exitcode)

                    # an UNKNOWN state is only recorded on request
                    if state != Run.State.UNKNOWN or update_on_error:
                        app.execution.state = state

            except (gc3libs.exceptions.InvalidArgument,
                    gc3libs.exceptions.ConfigurationError,
                    gc3libs.exceptions.UnrecoverableAuthError,
                    gc3libs.exceptions.FatalError):
                # Unrecoverable; no sense in continuing --
                # pass immediately on to client code and let
                # it handle this...
                raise

            except gc3libs.exceptions.UnknownJob:
                # information about the job is lost, mark it as failed
                app.execution.returncode = (Run.Signals.Lost, -1)
                app.execution.state = Run.State.TERMINATED
                app.changed = True
                continue

            except gc3libs.exceptions.InvalidResourceName:
                # could be the corresponding LRMS has been removed
                # because of an unrecoverable error mark application
                # as state UNKNOWN
                #
                # NOTE(review): the comment and the log message say
                # UNKNOWN, but the state is actually set to TERMINATED
                # below -- confirm which one is intended.
                gc3libs.log.warning(
                    "Cannot access computational resource '%s',"
                    " marking task '%s' as UNKNOWN.",
                    app.execution.resource_name, app)
                app.execution.state = Run.State.TERMINATED
                app.changed = True
                continue

            # This catch-all clause is needed otherwise the loop stops
            # at the first erroneous iteration
            #
            # pylint: disable=broad-except
            except Exception as ex:
                if gc3libs.error_ignored(
                        # context:
                        # - module
                        'core',
                        # - class
                        'Core',
                        # - method
                        'update_job_state',
                        # - actual error class
                        ex.__class__.__name__,
                        # - additional keywords
                        'update',
                ):
                    gc3libs.log.warning(
                        "Ignored error in Core.update_job_state(): %s", ex)
                    # print again with traceback at a higher log level
                    gc3libs.log.debug(
                        "(Original traceback follows.)", exc_info=True)
                    continue
                else:
                    # propagate generic exceptions for debugging purposes
                    raise

    # pylint: disable=no-self-use
    def __update_task(self, tasks, **extra_args):
        """Implementation of `update_job_state` on generic `Task` objects."""
        for task in tasks:
            assert isinstance(
                task, Task), "Core.update_job_state: passed an argument" \
                " which is not a `Task` instance."
            task.update_state()

    def fetch_output(self, app, download_dir=None,
                     overwrite=False, changed_only=True, **extra_args):
        """
        Retrieve output into local directory `app.output_dir`.

        If the task is not expected to produce any output (i.e.,
        `app.would_output == False`) then the only effect of this is
        to advance the state of ``TERMINATING`` tasks to
        ``TERMINATED``.

        Optional argument `download_dir` overrides the download location.

        The download directory is created if it does not exist. If it
        already exists, and the optional argument `overwrite` is
        ``False`` (default), it is renamed with a `.NUMBER` suffix and
        a new empty one is created in its place. Otherwise, if
        `overwrite` is ``True``, files are downloaded over the ones
        already present; in this case, the `changed_only` argument
        controls which files are overwritten:

        - if `changed_only` is ``True`` (default), then only files for
          which the source has a different size or has been modified
          more recently than the destination are copied;
        - if `changed_only` is ``False``, then *all* files in `source`
          will be copied into `destination`, unconditionally.

        Source files that do not exist at `destination` will be
        copied, independently of the `overwrite` and `changed_only`
        settings.

        If the task is in TERMINATING state, the state is changed to
        `TERMINATED`, attribute `output_dir`:attr: is set to the
        absolute path to the directory where files were downloaded,
        and the `terminated` transition method is called on the `app`
        object.

        Task output cannot be retrieved when `app.execution` is in one
        of the states `NEW` or `SUBMITTED`; an
        `OutputNotAvailableError` exception is thrown in these cases.

        :raise: `gc3libs.exceptions.OutputNotAvailableError` if no
                output can be fetched from the remote job (e.g., the
                Application/Task object is in `NEW` or `SUBMITTED`
                state, indicating the remote job has not started
                running).
        """
        assert isinstance(
            app, Task), "Core.fetch_output: passed an `app` argument " \
            "which is not a `Task` instance."
        if isinstance(app, Application):
            self.__fetch_output_application(
                app, download_dir, overwrite, changed_only, **extra_args)
        else:
            # generic `Task` object
            self.__fetch_output_task(
                app, download_dir, overwrite, changed_only, **extra_args)

    def __fetch_output_application(
            self, app, download_dir, overwrite, changed_only, **extra_args):
        """Implementation of `fetch_output` on `Application` objects."""
        job = app.execution
        if job.state in [Run.State.NEW, Run.State.SUBMITTED]:
            raise gc3libs.exceptions.OutputNotAvailableError(
                "Output not available: '%s' currently in state '%s'"
                % (app, app.execution.state))

        # auto_enable_auth = extra_args.get(
        #     'auto_enable_auth', self.auto_enable_auth)

        # determine download directory
        #
        # pylint: disable=protected-access
        download_dir = app._get_download_dir(download_dir)
        if download_dir is not None:
            # Prepare/Clean download dir
            try:
                if overwrite:
                    if not os.path.exists(download_dir):
                        os.makedirs(download_dir)
                else:
                    utils.mkdir_with_backup(download_dir)
            except Exception as ex:
                gc3libs.log.error(
                    "Failed creating download directory '%s': %s: %s",
                    download_dir,
                    ex.__class__.__name__,
                    str(ex))
                raise

        # download job output
        try:
            lrms = self.get_backend(job.resource_name)
            lrms.get_results(app, download_dir, overwrite, changed_only)
            # clear previous data staging errors
            if job.signal == Run.Signals.DataStagingFailure:
                job.signal = 0
        except gc3libs.exceptions.InvalidResourceName as err:
            ex = app.fetch_output_error(err)
            if isinstance(ex, Exception):
                job.info = ("No output could be retrieved: %s" % (ex,))
                raise ex
            else:
                return
        except gc3libs.exceptions.RecoverableDataStagingError as rex:
            job.info = ("Temporary failure when retrieving results: %s."
                        " Ignoring error, try again." % str(rex))
            return
        except gc3libs.exceptions.UnrecoverableDataStagingError as ex:
            # pylint: disable=redefined-variable-type
            job.signal = Run.Signals.DataStagingFailure
            ex = app.fetch_output_error(ex)
            if isinstance(ex, Exception):
                job.info = ("No output could be retrieved: %s" % str(ex))
                raise ex
        # pylint: disable=broad-except
        except Exception as ex:
            ex = app.fetch_output_error(ex)
            if isinstance(ex, Exception):
                raise ex

        # successfully downloaded results
        gc3libs.log.debug(
            "Downloaded output of '%s' (which is in state %s)",
            app, job.state)

        # `download_dir` may legitimately be ``None`` (see the check
        # above); `os.path.abspath(None)` would raise
        if download_dir is not None:
            app.output_dir = os.path.abspath(download_dir)
        app.changed = True

        if job.state == Run.State.TERMINATING:
            gc3libs.log.debug("Final output of '%s' retrieved", app)

        return Task.fetch_output(app, download_dir)

    def __fetch_output_task(
            self, task, download_dir, overwrite, changed_only, **extra_args):
        """Implementation of `fetch_output` on generic `Task` objects."""
        return task.fetch_output(
            download_dir, overwrite, changed_only, **extra_args)

    def get_resources(self, **extra_args):
        """
        Return list of resources configured into this `Core` instance.
        """
        return list(self.resources.itervalues())

    def kill(self, app, **extra_args):
        """
        Terminate a job.

        Terminating a job in RUNNING, SUBMITTED, or STOPPED state
        entails canceling the job with the remote execution system.

        For `Application` objects, cancellation through the backend is
        always attempted. A ``NEW`` application that lacks a
        `resource_name` is not cancelled remotely. If the backend cannot
        be found, a warning is logged. In every one of these cases the
        task is then marked ``TERMINATED`` with signal
        `Run.Signals.Cancelled`. Generic `Task` objects are handled by
        their own `kill` method.
        """
        assert isinstance(
            app, Task), "Core.kill: passed an `app` argument which is not"\
            " a `Task` instance."
        if isinstance(app, Application):
            self.__kill_application(app, **extra_args)
        else:
            self.__kill_task(app, **extra_args)

    def __kill_application(self, app, **extra_args):
        """Implementation of `kill` on `Application` objects."""

        job = app.execution
        # auto_enable_auth = extra_args.get(
        #     'auto_enable_auth', self.auto_enable_auth)
        try:
            lrms = self.get_backend(job.resource_name)
            lrms.cancel_job(app)
        except AttributeError:
            # A job in state NEW does not have a `resource_name`
            # attribute.
            if job.state != Run.State.NEW:
                raise
        except gc3libs.exceptions.InvalidResourceName:
            gc3libs.log.warning(
                "Cannot access computational resource '%s',"
                " but marking task '%s' as TERMINATED anyway.",
                app.execution.resource_name, app)

        gc3libs.log.debug(
            "Setting task '%s' status to TERMINATED"
            " and returncode to SIGCANCEL", app)
        app.changed = True
        # setting the state runs the state-transition handlers,
        # which may raise an error -- ignore them, but log nonetheless
        try:
            job.state = Run.State.TERMINATED
        # pylint: disable=broad-except
        except Exception as ex:
            if gc3libs.error_ignored(
                    # context:
                    # - module
                    'core',
                    # - class
                    'Core',
                    # - method
                    'kill',
                    # - actual error class
                    ex.__class__.__name__,
                    # - additional keywords
                    'state',
                    job.state,
                    'TERMINATED',
            ):
                gc3libs.log.info("Ignoring error in state transition"
                                 " since task is being killed: %s", ex)
            else:
                # propagate exception to caller
                raise
        job.signal = Run.Signals.Cancelled
        job.history.append("Cancelled")

    def __kill_task(self, task, **extra_args):
        """Implementation of `kill` on generic `Task` objects."""
        extra_args.setdefault('auto_enable_auth', self.auto_enable_auth)
        task.kill(**extra_args)

    def peek(self, app, what='stdout', offset=0, size=None, **extra_args):
        """
        Download `size` bytes (at `offset` bytes from the start) from
        the remote job standard output or error stream, and write them
        into a local file. Return file-like object from which the
        downloaded contents can be read.

        If `size` is `None` (default), then snarf all available
        contents of the remote stream from `offset` up to the end.

        The only allowed values for the `what` arguments are the
        strings `'stdout'` and `'stderr'`, indicating that the
        relevant section of the job's standard output resp. standard
        error should be downloaded.
        """
        assert isinstance(
            app, Task), "Core.peek: passed an `app` argument which is" \
            " not a `Task` instance."
        if isinstance(app, Application):
            return self.__peek_application(
                app, what, offset, size, **extra_args)
        else:
            return self.__peek_task(app, what, offset, size, **extra_args)

    def __peek_application(self, app, what, offset, size, **extra_args):
        """Implementation of `peek` on `Application` objects."""
        if what == 'stdout':
            remote_filename = app.stdout
        elif what == 'stderr':
            remote_filename = app.stderr
        else:
            raise gc3libs.exceptions.Error(
                "File name requested to `Core.peek` must be"
                " 'stdout' or 'stderr', not '%s'" % what)

        # Check if local data available
        job = app.execution
        if job.state == Run.State.TERMINATED:
            # FIXME: local data could be stale!!
            filename = os.path.join(app.output_dir, remote_filename)
            local_file = open(filename, 'r')
        else:
            # Get authN
            # auto_enable_auth = extra_args.get(
            #     'auto_enable_auth', self.auto_enable_auth)
            lrms = self.get_backend(job.resource_name)
            local_file = tempfile.NamedTemporaryFile(
                suffix='.tmp', prefix='gc3libs.')
            # close (and thus delete) the temporary file if the download
            # fails, instead of leaking it; try/finally keeps the
            # original exception intact on both Python 2 and 3
            peeked = False
            try:
                lrms.peek(app, remote_filename, local_file, offset, size)
                peeked = True
            finally:
                if not peeked:
                    local_file.close()
            local_file.flush()
            local_file.seek(0)

        return local_file

    def __peek_task(self, task, what, offset, size, **extra_args):
        """Implementation of `peek` on generic `Task` objects."""
        return task.peek(what, offset, size, **extra_args)

    def update_resources(self, resources=all, **extra_args):
        """
        Update the state of a given set of resources.

        Each enabled resource object in `resources` will have its
        `updated` attribute set to `True` if the update operation
        succeeded, or `False` if it failed; disabled resources are
        skipped and left untouched. A resource whose update raises an
        `UnrecoverableError` is also disabled.

        Optional argument `resources` should be a subset of the
        resources configured in this `Core` instance (the actual
        `Lrms`:class: objects, not the resource names). By default,
        all configured resources are updated.
        """
        if resources is all:
            resources = self.resources.values()

        # only query the requested resources, not every configured one
        for lrms in resources:
            try:
                if not lrms.enabled:
                    continue
                # auto_enable_auth = extra_args.get(
                #     'auto_enable_auth', self.auto_enable_auth)
                lrms.get_resource_status()
                lrms.updated = True
            except gc3libs.exceptions.UnrecoverableError as err:
                # disable resource -- there's no point in
                # trying it again at a later stage
                lrms.enabled = False
                lrms.updated = False
                gc3libs.log.error(
                    "Unrecoverable error updating status"
                    " of resource '%s': %s."
                    " Disabling resource.",
                    lrms.name, err)
                gc3libs.log.warning(
                    "Resource %s will be ignored from now on.",
                    lrms.name)
                gc3libs.log.debug(
                    "Got error '%s' in updating resource '%s';"
                    " printing full traceback.",
                    err.__class__.__name__, lrms.name,
                    exc_info=True)
            # pylint: disable=broad-except
            except Exception as err:
                gc3libs.log.error(
                    "Ignoring error updating resource '%s': %s.",
                    lrms.name, err)
                gc3libs.log.debug(
                    "Got error '%s' in updating resource '%s';"
                    " printing full traceback.",
                    err.__class__.__name__, lrms.name,
                    exc_info=True)
                lrms.updated = False

    def close(self):
        """
        Call `close()` on every configured resource (backend) object.
        """
        for lrms in self.resources.itervalues():
            lrms.close()

    # compatibility with the `Engine` interface

    def add(self, task):
        """
        This method is here just to allow `Core` and `Engine` objects
        to be used interchangeably. It's effectively a no-op, as it makes
        no sense in the synchronous/blocking semantics implemented by `Core`.
        """
        pass

    def remove(self, task):
        """
        This method is here just to allow `Core` and `Engine` objects
        to be used interchangeably. It's effectively a no-op, as it makes
        no sense in the synchronous/blocking semantics implemented by `Core`.
        """
        pass

    def _update_task_counts(self, task, state, increment):
        """
        No-op, implemented for compatibility with `Engine`.

        This method is here just to allow `Core` and `Engine` objects
        to be used interchangeably.
        """
        pass
class Scheduler(object):
"""
Instances of the `Scheduler` class are used in
`Engine.progress`:meth: to determine what tasks (among those in
`Run.State.NEW` state) are to be submitted.
A `Scheduler` object must implement *both* the context_ protocol
*and* the iterator_ protocol.
.. _context: http://goo.gl/SvWWyw
.. _iterator: http://goo.gl/ue2zje
The way a `Scheduler` instance is actually used within `Engine` is
as follows:
0. A `Scheduler` instance is created, passing it two arguments: a
list of tasks in ``NEW`` state, and a dictionary of configured
resources (keys are resource names, values are actual resource
objects).
1. When a new submission cycle starts, the `__enter__`:meth:
method is called.
2. The `Engine` iterates by repeatedly calling the `next`:meth:
method to receive tasks to be submitted. The `send`:meth: and
`throw`:meth: methods are used to notify the scheduler of the
outcome of the submission attempt.
3. When the submission cycle ends, the `__exit__`:meth: method is called.
The `Scheduler.schedule` generator is the heart of the submission
process and has basically complete control over it. It is
initialized with the list of tasks in ``NEW`` state, and the list
of configured resources. The `next`:meth: method should yield
pairs *(task index, resource name)*, where the *task index* is the
position of the task to be submitted next in the given list, and
--similarly-- the *resource name* is the name of the resource to
which the task should be submitted.
For each pair yielded, submission of that task to the selected
resource is attempted; the state of the task object after
submission is sent back (via the `send`:meth: method) to the
`Scheduler` instance; if an exception is raised, that exception is
thrown (via the `throw`:meth: method) into the scheduler object
instead. Submission stops when the `next()` call raises a
`StopIteration` exception.
"""
def __init__(self, tasks, resources):
self.tasks = tasks
self.resources = resources
def __enter__(self):
"""Called at the start of a scheduling cycle.
Implementation of this method should follow Python's context_
protocol; in particular, this method must return a reference
to a valid context object.
By default, just returns a reference to ``self``.
"""
return self
# pylint: disable=missing-docstring
def next(self):
raise NotImplementedError(
"Method `next` of class `%s` has not been implemented."
% self.__class__.__name__)
# pylint: disable=missing-docstring
def send(self, result):
raise NotImplementedError(
"Method `send` of class `%s` has not been implemented."
% self.__class__.__name__)
# pylint: disable=missing-docstring
def throw(self, *excinfo):
raise NotImplementedError(
"Method `throw` of class `%s` has not been implemented."
% self.__class__.__name__)