Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CalcJob: move job resource validation to the Scheduler class #4192

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pylintrc
Expand Up @@ -59,7 +59,8 @@ disable=bad-continuation,
no-else-raise,
import-outside-toplevel,
cyclic-import,
duplicate-code
duplicate-code,
too-few-public-methods

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
Expand Down
63 changes: 35 additions & 28 deletions aiida/engine/processes/calcjobs/calcjob.py
Expand Up @@ -26,7 +26,7 @@
__all__ = ('CalcJob',)


def validate_calc_job(inputs, ctx):
def validate_calc_job(inputs, ctx): # pylint: disable=inconsistent-return-statements,too-many-return-statements
"""Validate the entire set of inputs passed to the `CalcJob` constructor.

Reasons that will cause this validation to raise an `InputValidationError`:
Expand All @@ -35,7 +35,7 @@ def validate_calc_job(inputs, ctx):
* The specified computer is not stored
* The `Computer` specified in `metadata.computer` is not the same as that of the specified `Code`

:raises `~aiida.common.exceptions.InputValidationError`: if inputs are invalid
:return: string with error message in case the inputs are invalid
"""
try:
ctx.get_port('code')
Expand All @@ -49,48 +49,57 @@ def validate_calc_job(inputs, ctx):
computer_from_metadata = inputs.get('metadata', {}).get('computer', None)

if not computer_from_code and not computer_from_metadata:
raise exceptions.InputValidationError('no computer has been specified in `metadata.computer` nor via `code`.')
return 'no computer has been specified in `metadata.computer` nor via `code`.'

if computer_from_code and not computer_from_code.is_stored:
raise exceptions.InputValidationError('the Computer<{}> is not stored'.format(computer_from_code))
return 'the Computer<{}> is not stored'.format(computer_from_code)

if computer_from_metadata and not computer_from_metadata.is_stored:
raise exceptions.InputValidationError('the Computer<{}> is not stored'.format(computer_from_metadata))
return 'the Computer<{}> is not stored'.format(computer_from_metadata)

if computer_from_code and computer_from_metadata and computer_from_code.uuid != computer_from_metadata.uuid:
raise exceptions.InputValidationError(
'Computer<{}> explicitly defined in `metadata.computer is different from '
'Computer<{}> which is the computer of Code<{}> defined as the `code` input.'.format(
return (
'Computer<{}> explicitly defined in `metadata.computer` is different from Computer<{}> which is the '
'computer of Code<{}> defined as the `code` input.'.format(
computer_from_metadata, computer_from_code, code
)
)

try:
resources_port = ctx.get_port('metadata.options.resources')
except ValueError:
return

# If the resources port exists but is not required, we don't need to validate it against the computer's scheduler
if not resources_port.required:
return

computer = computer_from_code or computer_from_metadata
scheduler = computer.get_scheduler()
try:
resources = inputs['metadata']['options']['resources']
except KeyError:
return 'input `metadata.options.resources` is required but is not specified'

def validate_parser(parser_name, ctx): # pylint: disable=unused-argument
try:
scheduler.preprocess_resources(resources, computer.get_default_mpiprocs_per_machine())
scheduler.validate_resources(**resources)
except (ValueError, TypeError) as exception:
return 'input `metadata.options.resources` is not valid for the {} scheduler: {}'.format(scheduler, exception)


def validate_parser(parser_name, _): # pylint: disable=inconsistent-return-statements
"""Validate the parser.

:raises InputValidationError: if the parser name does not correspond to a loadable `Parser` class.
:return: string with error message in case the inputs are invalid
"""
from aiida.plugins import ParserFactory

if parser_name is not plumpy.UNSPECIFIED:
try:
ParserFactory(parser_name)
except exceptions.EntryPointError as exception:
raise exceptions.InputValidationError('invalid parser specified: {}'.format(exception))


def validate_resources(resources, ctx): # pylint: disable=unused-argument
"""Validate the resources.

:raises InputValidationError: if `num_machines` is not specified or is not an integer.
"""
if resources is not plumpy.UNSPECIFIED:
if 'num_machines' not in resources:
raise exceptions.InputValidationError('the `resources` input has to at least include `num_machines`.')

if not isinstance(resources['num_machines'], int):
raise exceptions.InputValidationError('the input `resources.num_machines` shoud be an integer.')
return 'invalid parser specified: {}'.format(exception)


class CalcJob(Process):
Expand Down Expand Up @@ -137,7 +146,7 @@ def define(cls, spec: CalcJobProcessSpec):
help='Filename to which the content of stdout of the scheduler is written.')
spec.input('metadata.options.scheduler_stderr', valid_type=str, default='_scheduler-stderr.txt',
help='Filename to which the content of stderr of the scheduler is written.')
spec.input('metadata.options.resources', valid_type=dict, required=True, validator=validate_resources,
spec.input('metadata.options.resources', valid_type=dict, required=True,
help='Set the dictionary of resources to be used by the scheduler plugin, like the number of nodes, '
'cpus etc. This dictionary is scheduler-plugin dependent. Look at the documentation of the '
'scheduler for more details.')
Expand Down Expand Up @@ -389,9 +398,7 @@ def presubmit(self, folder):

# Set resources, also with get_default_mpiprocs_per_machine
resources = self.node.get_option('resources')
def_cpus_machine = computer.get_default_mpiprocs_per_machine()
if def_cpus_machine is not None:
resources['default_mpiprocs_per_machine'] = def_cpus_machine
scheduler.preprocess_resources(resources, computer.get_default_mpiprocs_per_machine())
job_tmpl.job_resource = scheduler.create_job_resource(**resources)

subst_dict = {'tot_num_mpiprocs': job_tmpl.job_resource.get_tot_num_mpiprocs()}
Expand Down