-
Notifications
You must be signed in to change notification settings - Fork 187
/
scheduler.py
384 lines (291 loc) · 15.8 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Implementation of `Scheduler` base class."""
import abc
from aiida.common import exceptions, log
from aiida.common.escaping import escape_for_bash
from aiida.common.lang import classproperty
from aiida.schedulers.datastructures import JobResource, JobTemplate
__all__ = ('Scheduler', 'SchedulerError', 'SchedulerParsingError')
class SchedulerError(exceptions.AiidaException):
    """Base exception raised for errors related to a scheduler."""
class SchedulerParsingError(SchedulerError):
    """Specialization of `SchedulerError`; judging by its name, meant for failures while parsing scheduler output."""
class Scheduler(metaclass=abc.ABCMeta):
    """Base class for a job scheduler.

    Concrete plugins have to set the class attributes `_features` and `_job_resource_class` and implement the abstract
    methods that build the scheduler commands and parse their output. The commands themselves are executed through the
    transport that is set with :meth:`set_transport`.
    """

    _logger = log.AIIDA_LOGGER.getChild('scheduler')

    # A list of features
    # Features that should be defined in the plugins:
    # 'can_query_by_user': True if I can pass the 'user' argument to
    # get_joblist_command (and in this case, no 'jobs' should be given).
    # Otherwise, if False, a list of jobs is passed, and no 'user' is given.
    _features = {}

    # The class to be used for the job resource.
    _job_resource_class = None

    def __str__(self):
        """Return the name of the scheduler class."""
        return self.__class__.__name__

    @classmethod
    def preprocess_resources(cls, resources, default_mpiprocs_per_machine=None):
        """Pre process the resources.

        Add the `num_mpiprocs_per_machine` key to the `resources` if it is not already defined and it cannot be deduced
        from the `num_machines` and `tot_num_mpiprocs` being defined. The value is also not added if the job resource
        class of this scheduler does not accept the `num_mpiprocs_per_machine` keyword. Note that the changes are made
        in place to the `resources` argument passed.

        :param resources: dictionary with the job resources, modified in place.
        :param default_mpiprocs_per_machine: the value to assign to `num_mpiprocs_per_machine` when it is added. Note
            that it is assigned as is, also when it is `None`.
        """
        num_machines = resources.get('num_machines')
        tot_num_mpiprocs = resources.get('tot_num_mpiprocs')
        num_mpiprocs_per_machine = resources.get('num_mpiprocs_per_machine')

        # The order of the conditions matters: the job resource class is only queried if the key is not yet defined.
        if (
            num_mpiprocs_per_machine is None
            and cls.job_resource_class.accepts_default_mpiprocs_per_machine()  # pylint: disable=no-member
            and (num_machines is None or tot_num_mpiprocs is None)
        ):
            resources['num_mpiprocs_per_machine'] = default_mpiprocs_per_machine

    @classmethod
    def validate_resources(cls, **resources):
        """Validate the resources against the job resource class of this scheduler.

        :param resources: keyword arguments to define the job resources
        :raises ValueError: if the resources are invalid or incomplete
        """
        cls._job_resource_class.validate_resources(**resources)

    def __init__(self):
        """Construct a scheduler instance without a transport.

        :raises RuntimeError: if the class attribute `_job_resource_class` is not a subclass of `JobResource`.
        """
        self._transport = None

        resource_class = self._job_resource_class

        # `issubclass` raises a `TypeError` for anything that is not a class, such as the default `None`, so guard
        # with `isinstance` first to make sure that the intended `RuntimeError` is raised in that case as well.
        if not (isinstance(resource_class, type) and issubclass(resource_class, JobResource)):
            raise RuntimeError('the class attribute `_job_resource_class` is not a subclass of `JobResource`.')

    @classmethod
    def get_short_doc(cls):
        """Return the first non-empty line of the class docstring, if available."""
        docstring = cls.__doc__

        if not docstring:
            return 'No documentation available'

        # Take the first line that contains more than just whitespace
        for line in docstring.splitlines():
            stripped = line.strip()
            if stripped:
                return stripped

        return 'No documentation available'

    def get_feature(self, feature_name):
        """Return the value of the feature with the given name as defined in the `_features` class attribute.

        :param feature_name: the name of the feature, i.e., a key of the `_features` dictionary.
        :return: the value stored for the feature.
        :raises NotImplementedError: if the feature is not defined for this scheduler.
        """
        try:
            return self._features[feature_name]
        except KeyError as exception:
            raise NotImplementedError(f'Feature {feature_name} not implemented for this scheduler') from exception

    @property
    def logger(self):
        """Return the internal logger.

        :raises aiida.common.exceptions.InternalError: if no `_logger` attribute is available.
        """
        try:
            return self._logger
        except AttributeError as exception:
            raise exceptions.InternalError(
                f'No self._logger configured for {self.__class__.__name__}!'
            ) from exception

    @classproperty
    def job_resource_class(cls):  # pylint: disable=no-self-argument
        """Return the class that is used for the job resources of this scheduler."""
        return cls._job_resource_class

    @classmethod
    def create_job_resource(cls, **kwargs):
        """Create a suitable job resource from the kwargs specified."""
        # pylint: disable=not-callable
        return cls._job_resource_class(**kwargs)

    def get_submit_script(self, job_tmpl):
        """Return the submit script as a string.

        :parameter job_tmpl: a `aiida.schedulers.datastructures.JobTemplate` instance.

        The script consists of the following parts, in this order::

            #!/bin/bash                               <- the shebang, configurable through `job_tmpl.shebang`
            output of `_get_submit_script_header`     <- scheduler dependent directives
            `job_tmpl.prepend_text`                   <- only if set
            output of `_get_run_line`
            `job_tmpl.append_text`                    <- only if set
            output of `_get_submit_script_footer`     <- only if not empty

        :return: the content of the submit script.
        :raises aiida.common.exceptions.InternalError: if `job_tmpl` is not a `JobTemplate` instance.
        :raises ValueError: if the shebang of the job template is invalid.
        """
        if not isinstance(job_tmpl, JobTemplate):
            raise exceptions.InternalError('job_tmpl should be of type JobTemplate')

        empty_line = ''

        # The list is filled with the lines, which are finally joined and returned
        script_lines = []

        shebang = job_tmpl.shebang

        if shebang is None:
            # No shebang was specified, so fall back on the default
            script_lines.append('#!/bin/bash')
        elif shebang or shebang == '':
            # An explicitly set shebang is used as is. This includes the empty string, in which case the first line
            # of the script is empty, if that's what the user wants
            script_lines.append(shebang)
        else:
            raise ValueError(f'Invalid shebang set: {shebang}')

        script_lines.append(self._get_submit_script_header(job_tmpl))
        script_lines.append(empty_line)

        if job_tmpl.prepend_text:
            script_lines.append(job_tmpl.prepend_text)
            script_lines.append(empty_line)

        script_lines.append(self._get_run_line(job_tmpl.codes_info, job_tmpl.codes_run_mode))
        script_lines.append(empty_line)

        if job_tmpl.append_text:
            script_lines.append(job_tmpl.append_text)
            script_lines.append(empty_line)

        footer = self._get_submit_script_footer(job_tmpl)  # pylint: disable=assignment-from-none
        if footer:
            script_lines.append(footer)
            script_lines.append(empty_line)

        return '\n'.join(script_lines)

    @abc.abstractmethod
    def _get_submit_script_header(self, job_tmpl):
        """Return the submit script header, using the parameters from the job template.

        :param job_tmpl: a `JobTemplate` instance with relevant parameters set.
        """

    def _get_submit_script_footer(self, job_tmpl):
        """Return the submit script final part, using the parameters from the job template.

        The base implementation returns `None`, in which case no footer is added to the submit script.

        :param job_tmpl: a `JobTemplate` instance with relevant parameters set.
        """
        # pylint: disable=no-self-use,unused-argument
        return None

    def _get_run_line(self, codes_info, codes_run_mode):
        """Return a string with the line to execute a specific code with specific arguments.

        :parameter codes_info: a list of `aiida.common.datastructures.CodeInfo` objects. Each contains the information
            needed to run the code. I.e. `cmdline_params`, `stdin_name`, `stdout_name`, `stderr_name`, `join_files`. See
            the documentation of `JobTemplate` and `CodeInfo`.
        :parameter codes_run_mode: instance of `aiida.common.datastructures.CodeRunMode` contains the information on how
            to launch the multiple codes.
        :return: string with format: [executable] [args] {[ < stdin ]} {[ > stdout ]} {[2>&1 | 2> stderr]}
        :raises NotImplementedError: if `codes_run_mode` is neither `CodeRunMode.PARALLEL` nor `CodeRunMode.SERIAL`.
        """
        from aiida.common.datastructures import CodeRunMode

        list_of_runlines = []

        for code_info in codes_info:
            command_to_exec = ' '.join(escape_for_bash(arg) for arg in code_info.cmdline_params)

            stdin_str = f'< {escape_for_bash(code_info.stdin_name)}' if code_info.stdin_name else ''
            stdout_str = f'> {escape_for_bash(code_info.stdout_name)}' if code_info.stdout_name else ''

            if code_info.join_files:
                # The stderr is redirected to wherever the stdout goes
                stderr_str = '2>&1'
            else:
                stderr_str = f'2> {escape_for_bash(code_info.stderr_name)}' if code_info.stderr_name else ''

            # The spacing is kept exactly as is, even for empty redirections, to not change the generated scripts
            list_of_runlines.append(f'{command_to_exec} {stdin_str} {stdout_str} {stderr_str}')

        self.logger.debug('_get_run_line output: %s', list_of_runlines)

        if codes_run_mode == CodeRunMode.PARALLEL:
            # Each run line is terminated with `&` and a final `wait` is added after the last one
            list_of_runlines.append('wait\n')
            return ' &\n\n'.join(list_of_runlines)

        if codes_run_mode == CodeRunMode.SERIAL:
            return '\n\n'.join(list_of_runlines)

        raise NotImplementedError('Unrecognized code run mode')

    @abc.abstractmethod
    def _get_joblist_command(self, jobs=None, user=None):
        """Return the command to get the most complete description possible of currently active jobs.

        .. note::

            Typically one can pass only either jobs or user, depending on the specific plugin. The choice can be done
            according to the value returned by `self.get_feature('can_query_by_user')`

        :param jobs: either None to get a list of all jobs in the machine, or a list of jobs.
        :param user: either None, or a string with the username (to show only jobs of the specific user).
        """

    def _get_detailed_job_info_command(self, job_id):
        """Return the command to run to get detailed information for a given job.

        This is typically called after the job has finished, to retrieve the most detailed information possible about
        the job. This is done because most schedulers just make finished jobs disappear from the `qstat` command, and
        instead sometimes it is useful to know some more detailed information about the job exit status, etc.

        :param job_id: the job identifier
        :raises: :class:`aiida.common.exceptions.FeatureNotAvailable`
        """
        # pylint: disable=no-self-use,not-callable,unused-argument
        raise exceptions.FeatureNotAvailable('Cannot get detailed job info')

    def get_detailed_job_info(self, job_id):
        """Return the detailed job info.

        This will be a dictionary with the return value, stderr and stdout content returned by calling the command that
        is returned by `_get_detailed_job_info_command`.

        :param job_id: the job identifier
        :return: dictionary with `retval`, `stdout` and `stderr`.
        """
        command = self._get_detailed_job_info_command(job_id)  # pylint: disable=assignment-from-no-return

        with self.transport:
            retval, stdout, stderr = self.transport.exec_command_wait(command)

        return {
            'retval': retval,
            'stdout': stdout,
            'stderr': stderr,
        }

    @abc.abstractmethod
    def _parse_joblist_output(self, retval, stdout, stderr):
        """Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method.

        :return: list of `JobInfo` objects, one of each job each with at least its default params implemented.
        """

    def get_jobs(self, jobs=None, user=None, as_dict=False):
        """Return the list of currently active jobs.

        .. note:: typically, only either jobs or user can be specified. See also comments in `_get_joblist_command`.

        :param list jobs: a list of jobs to check; only these are checked
        :param str user: a string with a user: only jobs of this user are checked
        :param bool as_dict: if False (default), a list of JobInfo objects is returned. If True, a dictionary is
            returned, having as key the job_id and as value the JobInfo object.
        :return: list of active jobs, or a dictionary of them if `as_dict` is True
        :raises SchedulerError: if `as_dict` is True and at least one of the jobs has `None` as its job id.
        """
        with self.transport:
            retval, stdout, stderr = self.transport.exec_command_wait(self._get_joblist_command(jobs=jobs, user=user))

        joblist = self._parse_joblist_output(retval, stdout, stderr)

        if not as_dict:
            return joblist

        jobdict = {job.job_id: job for job in joblist}

        if None in jobdict:
            raise SchedulerError('Found at least one job without jobid')

        return jobdict

    @property
    def transport(self):
        """Return the transport set for this scheduler.

        :raises SchedulerError: if no transport has been set yet.
        """
        if self._transport is None:
            raise SchedulerError('Use the set_transport function to set the transport for the scheduler first.')

        return self._transport

    def set_transport(self, transport):
        """Set the transport to be used to query the machine or to submit scripts.

        This class assumes that the transport is open and active.

        :param transport: the transport to use.
        """
        self._transport = transport

    @abc.abstractmethod
    def _get_submit_command(self, submit_script):
        """Return the string to execute to submit a given script.

        .. warning:: the `submit_script` should already have been bash-escaped

        :param submit_script: the path of the submit script relative to the working directory.
        :return: the string to execute to submit a given script.
        """

    @abc.abstractmethod
    def _parse_submit_output(self, retval, stdout, stderr):
        """Parse the output of the submit command returned by calling the `_get_submit_command` command.

        :return: a string with the job ID.
        """

    def submit_from_script(self, working_directory, submit_script):
        """Submit the submission script to the scheduler.

        :param working_directory: the directory to change into, through the transport, before submitting.
        :param submit_script: the path of the submit script, which is bash-escaped before it is passed on to
            `_get_submit_command`.
        :return: return a string with the job ID in a valid format to be used for querying.
        """
        self.transport.chdir(working_directory)
        submit_command = self._get_submit_command(escape_for_bash(submit_script))
        result = self.transport.exec_command_wait(submit_command)
        return self._parse_submit_output(*result)

    def kill(self, jobid):
        """Kill a remote job and parse the return value of the scheduler to check if the command succeeded.

        .. note::

            On some schedulers, even if the command is accepted, it may take some seconds for the job to actually
            disappear from the queue.

        :param jobid: the job ID to be killed
        :return: True if everything seems ok, False otherwise.
        """
        retval, stdout, stderr = self.transport.exec_command_wait(self._get_kill_command(jobid))
        return self._parse_kill_output(retval, stdout, stderr)

    @abc.abstractmethod
    def _get_kill_command(self, jobid):
        """Return the command to kill the job with specified jobid."""

    @abc.abstractmethod
    def _parse_kill_output(self, retval, stdout, stderr):
        """Parse the output of the kill command.

        :return: True if everything seems ok, False otherwise.
        """

    def parse_output(self, detailed_job_info, stdout, stderr):
        """Parse the output of the scheduler.

        The base implementation does not parse anything and always raises.

        :param detailed_job_info: dictionary with the output returned by the `Scheduler.get_detailed_job_info` command.
            This should contain the keys `retval`, `stdout` and `stderr` corresponding to the return value, stdout and
            stderr returned by the accounting command executed for a specific job id.
        :param stdout: string with the output written by the scheduler to stdout
        :param stderr: string with the output written by the scheduler to stderr
        :return: None or an instance of `aiida.engine.processes.exit_code.ExitCode`
        :raises aiida.common.exceptions.FeatureNotAvailable: always, in this base implementation.
        """
        raise exceptions.FeatureNotAvailable(f'output parsing is not available for `{self.__class__.__name__}`')