-
Notifications
You must be signed in to change notification settings - Fork 23
/
shellcmd.py
executable file
·1722 lines (1497 loc) · 67.7 KB
/
shellcmd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#! /usr/bin/env python
# Copyright (C) 2009-2018 University of Zurich. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# make coding more python3-ish, must be the first statement
from __future__ import (absolute_import, division, print_function)
## module doc and other metadata
"""
Run applications as processes starting them from the shell.
"""
__docformat__ = 'reStructuredText'
## imports and other dependencies
# stdlib imports
from abc import ABCMeta, abstractmethod
from collections import defaultdict
import cPickle as pickle
from getpass import getuser
import os
import os.path
import posixpath
import time
from pkg_resources import Requirement, resource_filename
# GC3Pie imports
import gc3libs
import gc3libs.exceptions
import gc3libs.backends.transport
from gc3libs import log, Run
from gc3libs.utils import same_docstring_as, Struct, sh_quote_safe, sh_quote_unsafe
from gc3libs.backends import LRMS
from gc3libs.quantity import Duration, Memory, MB
## helper functions
#
# Mainly for parsing output of shell programs.
#
def _parse_process_status(pstat):
    """
    Map a `ps` process status letter to a `Run.State` label.

    Raise `KeyError` if the status letter is not a known one.
    """
    # Check manpage of ``ps`` both on Linux and MacOSX/BSD for the
    # meaning of each status letter.
    code = pstat[0]
    # letters denoting a live process, roughly sorted by likelihood
    # of a process being in that state (to minimize lookup time):
    # R: in run queue; S: interruptible sleep; D: uninterruptible
    # sleep (Linux); U: uninterruptible sleep (MacOSX); I: idle
    # (= sleeping > 20s, MacOSX); W: paging (Linux, no longer valid
    # since the 2.6.xx kernel); Z: "zombie" process
    if code in 'RSDUIWZ':
        return Run.State.RUNNING
    # T: stopped by job control signal; t: stopped by debugger during
    # the tracing; X: dead (should never be seen)
    if code in 'TtX':
        return Run.State.STOPPED
    raise KeyError("Unknown process status code `{0}`".format(code))
def _parse_time_duration(val):
    """
    Convert the output of common Linux/UNIX system utilities into a GC3Pie `Duration` object.

    Any of the time formats *DD-HH:MM:SS* (days, hours, minutes, seconds),
    *HH:MM:SS* (hours, minutes, seconds), or *MM:SS* (minutes, seconds), or
    even just the number of seconds are acceptable::

        >>> _parse_time_duration('25-00:31:05') == Duration('25d') + Duration('31m') + Duration('5s')
        True
        >>> _parse_time_duration('1:02:03') == Duration('1h') + Duration('2m') + Duration('3s')
        True
        >>> _parse_time_duration('01:02') == Duration('1m') + Duration('2s')
        True
        >>> _parse_time_duration('42') == Duration(42, unit=Duration.s)
        True

    The *seconds* portion of the time string can be followed by
    decimal digits for greater precision::

        >>> _parse_time_duration('0:00.00') == Duration(0, unit=Duration.s)
        True
        >>> _parse_time_duration('4.20') == Duration(4.20, unit=Duration.s)
        True

    When only the number of seconds is given, an optional trailing
    unit specified `s` is allowed::

        >>> _parse_time_duration('4.20s') == Duration(4.20, unit=Duration.s)
        True

    Among the programs whose output can be parsed by this function, there are:

    - GNU time's `%e` format specifier;
    - output of `ps -o etime=` (on both GNU/Linux and MacOSX)
    """
    colons = val.count(':')
    if colons == 2:
        if '-' in val:
            # *DD-HH:MM:SS* form: split off the days part
            days, timespan = val.split('-')
            return Duration(days + 'd') + Duration(timespan)
        # *HH:MM:SS*: `Duration`'s ctor can natively parse this
        return Duration(val)
    if colons == 1:
        # *MM:SS* is rejected as ambiguous by `Duration`'s built-in
        # parser; work around it by converting the fields separately
        minutes, seconds = val.split(':')
        return (Duration(int(minutes, 10), unit=Duration.m)
                + Duration(float(seconds), unit=Duration.s))
    if colons == 0:
        # plain number of seconds, with up to 2 decimal digits and an
        # optional trailing `s` unit specifier
        secs = val[:-1] if val.endswith('s') else val
        return Duration(float(secs), unit=Duration.s)
    raise ValueError(
        "Expecting duration in the form HH:MM:SS, MM:SS,"
        " or just number of seconds,"
        " got {val} instead".format(val=val))
def _parse_percentage(val):
"""
Convert a percentage string into a Python float.
The percent sign at the end is optional::
>>> _parse_percentage('10')
10.0
>>> _parse_percentage('10%')
10.0
>>> _parse_percentage('0.25%')
0.25
"""
return float(val[:-1]) if val.endswith('%') else float(val)
def _parse_returncode_string(val):
    """Convert a shell exit-status string into a GC3Pie return code."""
    shellexit = int(val)
    return Run.shellexit_to_returncode(shellexit)
## interface to different OS
#
# Linux and MacOSX require slightly different command incantations to
# achieve the same task. The `Machine` class abstract this into a
# uniform interface.
#
class _Machine(object):
"""Base class for OS-specific shell services."""
__metaclass__ = ABCMeta
def __init__(self, transport):
self.transport = transport
@staticmethod
def detect(transport):
"""Factory method to create a `_Machine` instance based on the running kernel."""
exit_code, stdout, stderr = transport.execute_command('uname -s')
running_kernel = stdout.strip()
if running_kernel == 'Linux':
return _LinuxMachine(transport)
elif running_kernel == 'Darwin':
return _MacOSXMachine(transport)
else:
raise RuntimeError(
"Unexpected kernel name: got {0},"
" expecting one of 'Linux', 'Darwin'"
.format(running_kernel))
def _run_command(self, cmd):
"""
Run a command and return its STDOUT, or raise exception if it errors out.
This is like `subprocess.check_call`, with the following differences:
* the slave command is executed through `Transport.execute_command`
so it need not be locally executed.
* a `RuntimeError` exception is raised if either the exit code from the
process is non-zero or STDERR contains any text.
"""
exit_code, stdout, stderr = self.transport.execute_command(cmd)
if exit_code != 0 or stderr:
raise RuntimeError("Got error running command `{0}` (exit code {1}): {2}"
.format(cmd, exit_code, stderr.strip()))
return stdout
def get_architecture(self):
cmd = 'uname -m'
stdout = self._run_command(cmd)
return gc3libs.config._parse_architecture(stdout)
def get_process_state(self, pid):
"""
Return the 1-letter state of process PID.
(See the ``ps`` man page for a list of possible codes and explanations.)
Raise ``LookupError`` if no process is identified by the given PID.
"""
cmd = 'ps -p {0} -o state='.format(pid)
rc, stdout, stderr = self.transport.execute_command(cmd)
if rc == 1: # FIXME: same return code on MacOSX?
raise LookupError('No process with PID {0}'.format(pid))
elif rc == 0:
return stdout.strip()
else:
raise RuntimeError("Got error running command `{0}` (exit code {1}): {2}"
.format(cmd, exit_code, stderr.strip()))
def get_process_running_time(self, pid):
"""
Return elapsed time since start of process identified by PID.
Raise ``LookupError`` if no process is identified by the given PID.
"""
cmd = 'ps -p {0} -o etime='.format(pid)
rc, stdout, stderr = self.transport.execute_command(cmd)
if rc == 1: # FIXME: same return code on MacOSX?
raise LookupError('No process with PID {0}'.format(pid))
elif rc == 0:
etime = stdout.strip()
return _parse_time_duration(etime)
else:
raise RuntimeError("Got error running command `{0}` (exit code {1}): {2}"
.format(cmd, exit_code, stderr.strip()))
def get_total_cores(self):
"""Return total nr. of CPU cores."""
cmd = self._get_total_cores_command()
stdout = self._run_command(cmd)
try:
return int(stdout)
except (ValueError, TypeError) as err:
raise RuntimeError("Cannot parse output `{0}` of command `{1}`"
" as total number of CPU cores: {2}"
.format(stdout.strip(), cmd, err))
# This could be a property of the class, but then we won't be able to
# use the `@abstractmethod` constructor to enforce that derived classes
# provide an implementation
@abstractmethod
def _get_total_cores_command(self):
"""Command to run to print the nr. of CPU cores to STDOUT."""
pass
def get_total_memory(self):
"""
Return amount of total amount of RAM as a `gc3libs.quantity.Memory` object.
"""
parts, index, unit = self._get_total_memory_impl()
try:
qty = parts[index]
amount = int(qty)
return amount*unit
except KeyError: # index out of bounds
raise AssertionError(
"Call to {0} returned out-of-bounds index {1} into sequence {2}"
.format(self._get_total_memory_impl, index, parts))
except (ValueError, TypeError) as err:
raise RuntimeError("Cannot `{0}` as a memory amount: {1}"
.format(qty, err))
@abstractmethod
def _get_total_memory_impl(self):
"""Machine-specific part of `get_total_memory`."""
pass
def list_process_tree(self, root_pid="1"):
"""
Return list of PIDs of children of the given process.
The returned list is empty if no process whose PID is
`root_pid` can be found.
Otherwise, the list is composed by walking the tree
breadth-first, so it always starts with `root_pid` and ends
with leaf processes (i.e., those which have no children).
"""
ps_output = self._run_command(self._list_pids_and_ppids_command())
children = defaultdict(list)
for line in ps_output.split('\n'):
line = line.strip()
if not line:
continue
pid, ppid = line.split()
children[str(ppid)].append(str(pid))
if root_pid not in children:
return []
result = []
queue = [root_pid]
while queue:
node = queue.pop() # dequeue
result.append(node)
for child in children[node]:
queue.insert(0, child) # enqueue
return result
class _LinuxMachine(_Machine):
    """Shell commands and parsing specific to Linux hosts."""

    def _get_total_cores_command(self):
        """Command printing the nr. of CPU cores (``nproc``)."""
        return 'nproc'

    def _get_total_memory_impl(self):
        """Read total RAM from the ``MemTotal`` line of ``/proc/meminfo`` (value in KiB)."""
        with self.transport.open('/proc/meminfo', 'r') as meminfo:
            for row in meminfo:
                if row.startswith('MemTotal'):
                    return (row.split(), 1, Memory.KiB)

    def _list_pids_and_ppids_command(self):
        """Command listing PID/PPID pairs for all processes."""
        return 'ps --no-header -o pid,ppid'
class _MacOSXMachine(_Machine):
    """Shell commands and parsing specific to MacOSX hosts."""

    def _get_total_cores_command(self):
        """Command printing the nr. of CPU cores (``sysctl -n hw.ncpu``)."""
        return 'sysctl -n hw.ncpu'

    def _get_total_memory_impl(self):
        """Read total RAM (in bytes) from ``sysctl hw.memsize``."""
        output = self._run_command('sysctl hw.memsize')
        return (output.split(':'), 1, Memory.B)

    def _list_pids_and_ppids_command(self):
        """Command listing PID/PPID pairs for all processes."""
        return 'ps -o pid=,ppid='
## the main LRMS class
#
#
#
class ShellcmdLrms(LRMS):
"""
Execute an `Application`:class: instance through the shell.
Construction of an instance of `ShellcmdLrms` takes the following
optional parameters (in addition to any parameters taken by the
base class `LRMS`:class:):
:param str time_cmd:
Path to the GNU ``time`` command. Default is
`/usr/bin/time`:file: which is correct on all known Linux
distributions.
This backend uses many of the
extended features of GNU ``time``, so the shell-builtins or the
BSD ``time`` will not work.
:param str spooldir:
Path to a filesystem location where to create
temporary working directories for processes executed through
this backend. The default value `None` means to use ``$TMPDIR``
or `/var/tmp`:file: (see `tempfile.mkftemp` for details).
:param str resourcedir:
Path to a filesystem location where to create a temporary
directory that will contain information on the jobs running on
the machine. The default value `None` means to use
``$HOME/.gc3/shellcmd.d``.
:param str transport:
Transport to use to connect to the resource. Valid values are
``'ssh'`` or ``'local'``.
:param str frontend:
If `transport` is ``'ssh'``, then `frontend` is the hostname of the
remote machine where the jobs will be executed.
:param bool ignore_ssh_host_key:
When connecting to a remote resource using the ``'ssh'`` transport the
server's SSH public key is usually checked against a database of known
hosts, and if the key is found but it does not match with the one saved
in the database, the connection will fail. Setting `ignore_ssh_host_key`
to `True` will disable this check, thus introducing a potential security
issue but allowing connection even though the database contains
old/invalid keys. (The main use case is when connecting to VMs on a IaaS
cloud, since the IP is usually reused and therefore the ssh key is
recreated.)
:param bool override:
`ShellcmdLrms` by default will try to gather information on the
machine the resource is running on, including the number of
cores and the available memory. These values may be different
from the values stored in the configuration file. If `override`
is ``True``, then the values automatically discovered will be used
instead of the ones in the configuration file. If `override` is
False, instead, the values in the configuration file will be
used.
:param int ssh_timeout:
If `transport` is ``'ssh'``, this value will be used as timeout (in
seconds) for connecting to the SSH TCP socket.
:param gc3libs.quantity.Memory large_file_threshold:
Copy files below this size in one single SFTP GET operation;
see `SshTransport.get`:meth: for more information.
Only used if `transport` is ``'ssh'``.
:param gc3libs.quantity.Memory large_file_chunk_size:
Copy files that are over the above-mentioned threshold by
sequentially transferring chunks of this size.
see `SshTransport.get`:meth: for more information.
Only used if `transport` is ``'ssh'``.
"""
TIMEFMT = '\n'.join([
'WallTime=%es',
'KernelTime=%Ss',
'UserTime=%Us',
'CPUUsage=%P',
'MaxResidentMemory=%MkB',
'AverageResidentMemory=%tkB',
'AverageTotalMemory=%KkB',
'AverageUnsharedMemory=%DkB',
'AverageUnsharedStack=%pkB',
'AverageSharedMemory=%XkB',
'PageSize=%ZB',
'MajorPageFaults=%F',
'MinorPageFaults=%R',
'Swaps=%W',
'ForcedSwitches=%c',
'WaitSwitches=%w',
'Inputs=%I',
'Outputs=%O',
'SocketReceived=%r',
'SocketSent=%s',
'Signals=%k',
'ReturnCode=%x',
])
"""
Format string for running commands with ``/usr/bin/time``.
It is used by GC3Pie to capture resource usage data for commands
executed through the shell.
The value used here lists all the resource usage values that *GNU
time* can capture, with the same names used by the ARC Resource
Manager (for historical reasons).
"""
# how to translate GNU time output into GC3Pie execution metrics
TIMEFMT_CONV = {
# GNU time output key .execution attr converter function
# | | |
# v v v
'WallTime': ('duration', _parse_time_duration),
'KernelTime': ('shellcmd_kernel_time', Duration),
'UserTime': ('shellcmd_user_time', Duration),
'CPUUsage': ('shellcmd_cpu_usage', _parse_percentage),
'MaxResidentMemory': ('max_used_memory', Memory),
'AverageResidentMemory': ('shellcmd_average_resident_memory', Memory),
'AverageTotalMemory': ('shellcmd_average_total_memory', Memory),
'AverageUnsharedMemory': ('shellcmd_average_unshared_memory', Memory),
'AverageUnsharedStack': ('shellcmd_average_unshared_stack', Memory),
'AverageSharedMemory': ('shellcmd_average_shared_memory', Memory),
'PageSize': ('shellcmd_page_size', Memory),
'MajorPageFaults': ('shellcmd_major_page_faults', int),
'MinorPageFaults': ('shellcmd_minor_page_faults', int),
'Swaps': ('shellcmd_swapped', int),
'ForcedSwitches': ('shellcmd_involuntary_context_switches', int),
'WaitSwitches': ('shellcmd_voluntary_context_switches', int),
'Inputs': ('shellcmd_filesystem_inputs', int),
'Outputs': ('shellcmd_filesystem_outputs', int),
'SocketReceived': ('shellcmd_socket_received', int),
'SocketSent': ('shellcmd_socket_sent', int),
'Signals': ('shellcmd_signals_delivered', int),
'ReturnCode': ('returncode', _parse_returncode_string),
}
"""
How to translate *GNU time* output into values stored in the ``.execution`` attribute.
The dictionary maps key names (as used in the `TIMEFMT` string) to
a pair *(attribute name, converter function)* consisting of the
name of an attribute that will be set on a task's ``.execution``
object, and a function to convert the (string) value gotten from
*GNU time* output into the actual Python value written.
"""
PRIVATE_DIR = '.gc3pie_shellcmd'
"""
Subdirectory of a tasks's execution directory reserved for storing
`ShellcmdLrms`:class: files.
"""
WRAPPER_SCRIPT = 'wrapper_script.sh'
"""
Name of the task launcher script (within `PRIVATE_DIR`).
The `ShellcmdLrms`:class: writes here that wrap an application's
payload script, to collect resource usage or download/upload
result files, etc.
"""
WRAPPER_OUTPUT_FILENAME = 'resource_usage.txt'
"""
Name of the file where resource usage is written to.
(Relative to `PRIVATE_DIR`.)
"""
WRAPPER_PID = 'wrapper.pid'
"""
Name of the file where the wrapper script's PID is stored.
(Relative to `PRIVATE_DIR`).
"""
MOVER_SCRIPT = 'mover.py'
"""
Name of the data uploader/downloader script (within `PRIVATE_DIR`).
"""
RESOURCE_DIR = '$HOME/.gc3/shellcmd.d'
"""
Path to the directory where bookkeeping files are stored.
(This is on the target machine where `ShellcmdLrms`:class:
executes commands.)
It may contain environmental variable references, which are
expanded through the (remote) shell.
"""
    def __init__(self, name,
                 # these parameters are inherited from the `LRMS` class
                 architecture, max_cores, max_cores_per_job,
                 max_memory_per_core, max_walltime,
                 auth=None,
                 # these are specific to `ShellcmdLrms`
                 frontend='localhost', transport='local',
                 time_cmd=None,
                 override='False',
                 spooldir=None,
                 resourcedir=None,
                 # SSH-related options; ignored if `transport` is 'local'
                 ssh_config=None,
                 keyfile=None,
                 ignore_ssh_host_keys=False,
                 ssh_timeout=None,
                 large_file_threshold=None,
                 large_file_chunk_size=None,
                 **extra_args):
        """
        Create a `ShellcmdLrms` backend instance.

        See the class docstring for the meaning of the construction
        parameters.  Note that `override` arrives as a *string* from
        the configuration file and is converted to `bool` here.
        """
        # init base class
        LRMS.__init__(
            self, name,
            architecture, max_cores, max_cores_per_job,
            max_memory_per_core, max_walltime, auth, **extra_args)
        # whether actual machine params (cores, memory) should be
        # auto-detected on first use
        self.override = gc3libs.utils.string_to_boolean(override)
        # default is to use $TMPDIR or '/var/tmp' (see
        # `tempfile.mkftemp`), but we delay the determination of the
        # correct dir to the submit_job, so that we don't have to use
        # `transport` right now.
        self.spooldir = spooldir
        # Configure transport
        if transport == 'local':
            self.transport = gc3libs.backends.transport.LocalTransport()
            self._username = getuser()
            self.frontend = 'localhost'
        elif transport == 'ssh':
            auth = self._auth_fn()
            self._username = auth.username
            # explicit construction parameters take precedence over
            # the corresponding values from the `Auth` object
            self.transport = gc3libs.backends.transport.SshTransport(
                frontend,
                ignore_ssh_host_keys=ignore_ssh_host_keys,
                ssh_config=(ssh_config or auth.ssh_config),
                username=self._username,
                port=auth.port,
                keyfile=(keyfile or auth.keyfile),
                timeout=(ssh_timeout or auth.timeout),
                large_file_threshold=large_file_threshold,
                large_file_chunk_size=large_file_chunk_size,
            )
            self.frontend = frontend
        else:
            raise AssertionError("Unknown transport '{0}'" .format(transport))
        # Init bookkeeping
        self.updated = False  # data may not reflect actual state
        self.free_slots = self.max_cores
        self.user_run = 0
        self.user_queued = 0
        self.queued = 0
        # NOTE(review): total/available memory start out equal to
        # `max_memory_per_core`; presumably corrected later by
        # `_init_total_memory` when `override` is set -- confirm
        self.total_memory = max_memory_per_core
        self.available_memory = self.total_memory
        self._job_infos = {}
        # Some init parameters can only be discovered / checked when a
        # connection to the target resource is up. We want to delay
        # this until the time they are actually used, to avoid opening
        # up a network connection when the backend is initialized
        # (could be e.g. a `ginfo -n` call that has no need to operate
        # on remote objects).
        self._resourcedir_raw = resourcedir or ShellcmdLrms.RESOURCE_DIR
        self._time_cmd = time_cmd
        self._time_cmd_ok = False  # check on first use
    @property
    def frontend(self):
        # host name where commands are executed ('localhost' for the
        # 'local' transport)
        return self._frontend

    @frontend.setter
    def frontend(self, value):
        # keep the transport's connection parameters in sync with the
        # new frontend host name
        self._frontend = value
        self.transport.set_connection_params(value)
    @property
    def resource_dir(self):
        """Directory holding per-job bookkeeping files (lazily initialized)."""
        try:
            return self._resource_dir
        except AttributeError:
            # Since RESOURCE_DIR contains the `$HOME` variable, we
            # have to expand it by connecting to the remote
            # host. However, we don't want to do that during
            # initialization, so we do it the first time this is
            # actually needed.
            self._init_resource_dir()
            return self._resource_dir

    @resource_dir.setter
    def resource_dir(self, value):
        self._resource_dir = value
def _init_resource_dir(self):
self.transport.connect()
# expand env variables in the `resource_dir` setting
exit_code, stdout, stderr = self.transport.execute_command(
'echo %s' % sh_quote_unsafe(self._resourcedir_raw))
self.resource_dir = stdout.strip()
if not self.transport.exists(self.resource_dir):
try:
log.info("Creating resource directory: '%s' ...", self.resource_dir)
self.transport.makedirs(self.resource_dir)
except Exception as ex:
log.error("Failed creating resource directory '%s': %s: %s",
self.resource_dir, type(ex), ex)
# cannot continue
raise
    @property
    def spooldir(self):
        """
        Root folder for all working directories of GC3Pie tasks.

        When this backend executes a task, it first creates a temporary
        subdirectory of this folder, then launches commands in there.

        If not explicitly set (e.g. at construction time), the "spool
        directory" will be given a default value according to the logic of
        :meth:`_init_spooldir`:

        * If the remote environment variable ``TMPDIR`` is set and points to an
          existing directory, that value is used;
        * otherwise, the hard-coded default ``/var/tmp`` is used instead.
        """
        if not self._spooldir:
            # computed lazily, so no connection is opened before needed
            self._init_spooldir()
        return self._spooldir

    @spooldir.setter
    def spooldir(self, value):
        self._spooldir = value
def _init_spooldir(self):
"""Set `self.spooldir` to a sensible value."""
rc, stdout, stderr = self.transport.execute_command(
'cd "$TMPDIR" && pwd')
if (rc != 0 or stdout.strip() == '' or stdout[0] != '/'):
log.debug(
"Unable to recover a valid absolute path for `spooldir`"
" on resource `%s`. Using `/var/tmp`.", self.name)
self.spooldir = '/var/tmp'
else:
self.spooldir = stdout.strip()
    @property
    def time_cmd(self):
        """Path of the GNU `time` command on the target (located and validated on first use)."""
        if not self._time_cmd_ok:
            # locate (and validate) the GNU `time` binary only once
            self._time_cmd = self._locate_gnu_time()
            self._time_cmd_ok = True
        return self._time_cmd
    def _gather_machine_specs(self):
        """
        Gather information about target machine and update config.

        The following attributes are set (or reset) as an effect
        of calling this method:

        - ``_machine``: Set to an appropriate instance of
          `_Machine`:class:, detected through connection via
          `self.transport`.

        - ``max_cores``: If ``self.override`` is true, set to the
          number of processors on the target.

        - ``total_memory``: If ``self.override`` is true, set to total
          amount of memory on the target.
        """
        self.transport.connect()
        # pick Linux- or MacOSX-specific helpers based on `uname`
        self._machine = _Machine.detect(self.transport)
        self._init_arch()
        if self.override:
            self._init_max_cores()
            self._init_total_memory()
        self._update_resource_usage_info()
def _init_arch(self):
arch = self._machine.get_architecture()
if not (arch <= self.architecture):
if self.override:
log.info(
"Mismatch of value `architecture` on resource %s:"
" configuration file says `achitecture=%s`"
" but GC3Pie detected `%s`. Updating current value.",
self.name,
','.join(self.architecture),
','.join(arch))
self.architecture = arch
else:
raise gc3libs.exceptions.ConfigurationError(
"Invalid architecture: configuration file says `%s` but "
"it actually is `%s`" % (str.join(', ', self.architecture),
str.join(', ', arch)))
def _init_max_cores(self):
max_cores = self._machine.get_total_cores()
if max_cores != self.max_cores:
log.info(
"Mismatch of value `max_cores` on resource '%s':"
" configuration file says `max_cores=%d` while it's actually `%d`."
" Updating current value.",
self.name, self.max_cores, max_cores)
self.max_cores = max_cores
def _init_total_memory(self):
self.total_memory = self._machine.get_total_memory()
if self.total_memory != self.max_memory_per_core:
log.info(
"Mismatch of value `max_memory_per_core` on resource %s:"
" configuration file says `max_memory_per_core=%s` while it's"
" actually `%s`. Updating current value.",
self.name,
self.max_memory_per_core,
self.total_memory.to_str('%g%s', unit=Memory.MB))
self.max_memory_per_core = self.total_memory
def _locate_gnu_time(self):
"""
Return the command path to run the GNU `time` binary.
:raise ConfigurationError:
if no GNU ``time`` executable can be located.
"""
candidates = [
'time', # default on Linux systems
'gtime', # MacOSX with Homebrew or MacPorts
]
if self._time_cmd:
# try this first
candidates.insert(0, self._time_cmd)
for time_cmd in candidates:
gc3libs.log.debug(
"Checking if GNU time is available as command `%s`", time_cmd)
# We use `command` in order to force the shell to execute
# the binary and not the shell builtin (cf. the POSIX
# standard). However note that the wrapper script will
# execute `exec time_cmd` in order to replace the current
# shell, but `exec` will never run the builtin.
exit_code, stdout, stderr = self.transport.execute_command(
'command %s --version 2>&1 | grep GNU' % time_cmd)
if exit_code == 0:
# command is GNU! Good!
return time_cmd
raise gc3libs.exceptions.ConfigurationError(
"Unable to find GNU `time` on resource `{name}`."
" Please, install GNU time and set the `time_cmd`"
" configuration option in gc3pie.conf."
.format(name=self.name))
## Bookkeeping
#
# The following methods deal with internal book-keeping: how much
# of the target's configured resources has been used by GC3Pie.
# Presently, book-keeping is so complicated (and requires
# recomputing at each invocation) because GC3Pie does makes the
# assumption that the target resource is *shared*, i.e., other
# GC3Pie processes run concurrently by the user may compete for
# the same resources.
#
def count_running_tasks(self):
"""
Returns number of currently running tasks.
.. note::
1. The count of running tasks includes also tasks that may
have been started by another GC3Pie process so this count
can be positive when the resource has just been opened.
2. The count is updated every time the resource is updated,
so the returned number can be stale if the
`ShellcmdLrms.get_resource_status()` has not been called
for a while.
"""
return sum(1 for info in self._job_infos.values()
if not info['terminated'])
def count_used_cores(self):
"""
Return total nr. of cores used by running tasks.
Similar caveats as in `ShellcmdLrms.count_running_tasks`:meth:
apply here.
"""
return sum(info['requested_cores']
for info in self._job_infos.values()
if not info['terminated'])
def count_used_memory(self):
"""
Return total amount of memory used by running tasks.
Similar caveats as in `ShellcmdLrms.count_running_tasks`:meth:
apply here.
"""
return sum(
# FIXME: if `requested_memory==None` then just do not
# account a task's memory usage. This is of course
# incorrect and leads to situations where a single task
# can wreak havoc on an entire compute node, but is
# consistent with what we do during scheduling /
# requirements check. (OTOH, it's *not* consistent with
# what other backends do: SLURM, for example, takes the
# pessimistic stance that a job with no memory
# requirements is using (DefMemPerCPU * NumCPUs)
((info['requested_memory'] or 0*MB)
for info in self._job_infos.values()
if not info['terminated']), 0*MB)
def _get_persisted_job_info(self):
"""
Get information on total resources from the files stored in
`self.resource_dir`. It then returns a dictionary {PID: {key:
values}} with informations for each job which is associated to
a running process.
"""
self.transport.connect()
job_infos = {}
pidfiles = self.transport.listdir(self.resource_dir)
if pidfiles:
log.debug("Checking status of the following PIDs: %s",
str.join(", ", pidfiles))
for pid in pidfiles:
job = self._read_job_info_file(pid)
if job:
job_infos[str(pid)] = job
else:
# Process not found, ignore it
continue
return job_infos
def _read_job_info_file(self, pid):
"""
Get resource information on job with pid `pid`, if it
exists. Returns None if it does not exist.
"""
self.transport.connect()
log.debug("Reading job info file for pid %r", pid)
jobinfo = None
path = posixpath.join(self.resource_dir, str(pid))
with self.transport.open(path, 'rb') as fp:
try:
jobinfo = pickle.load(fp)
except Exception as ex:
log.error("Unable to read remote job info file %s: %s",
path, ex)
raise
return jobinfo
def _write_job_info_file(self, pid, resources):
"""
Update file in `self.resource_dir/PID` with `resources`.
"""
self.transport.connect()
# XXX: We should check for exceptions!
log.debug("Updating job info file for pid %s", pid)
with self.transport.open(
posixpath.join(self.resource_dir, str(pid)), 'wb') as fp:
pickle.dump(resources, fp, -1)
def _delete_job_info_file(self, pid):
"""
Delete `self.resource_dir/PID` file
"""
self.transport.connect()
log.debug("Deleting job info file for pid %s ...", pid)
pidfile = posixpath.join(self.resource_dir, pid)
try:
self.transport.remove(pidfile)
except Exception as err:
msg = str(err)
if 'OSError: [Errno 2]' not in msg:
log.debug(
"Ignored error deleting file `%s`: %s: %s",
pidfile, err.__class__.__name__, err)
## Backend interface implementation
#
# These methods provide what is expected of any LRMS class.
#
def _connect(self):
"""Ensure transport to remote resource works."""
try:
self.transport.connect()
except gc3libs.exceptions.TransportError as err:
log.error("Unable to connect to host `%s`: %s", self.frontend, err)
raise
try:
self._machine
except AttributeError:
self._gather_machine_specs()
def cancel_job(self, app):
"""
Kill all children processes of the given task `app`.
The PID of the wrapper script (which is the root of the PID
tree we are going to send a "TERM" signal) must have been
stored (by `submit_job`:meth:) as `app.execution.lrms_jobid`.
"""
try:
root_pid = app.execution.lrms_jobid
except ValueError:
raise gc3libs.exceptions.InvalidArgument(
"Invalid field `lrms_jobid` in Task '{0}':"
" expected a number, got '{{2}) instead"
.format(app, app.execution.lrms_jobid,
type(app.execution.lrms_jobid)))