Skip to content

Commit

Permalink
Implement CalcJob process class
Browse files Browse the repository at this point in the history
This commit can be summarized in three steps:

 * Reimplementation of a job calculation as `Process` called `CalcJob`
 * Changing job calculation to be purely informational
 * Remove the old job calculation mechanics and business logic

The old way of creating a job calculation, was to subclass the
`JobCalculation` class and override the `_use_methods` class method to
define the input nodes and the `_prepare_for_submission` to setup the
input files for the calculation. The problem was that these methods were
implemented on the `Node` class, thus mixing the responsabilities of
running and introspecting the results of a completed calculation.

Here we define the `CalcJob` class, a subclass of `Process`. This class
replaces the old `JobCalculation` and allows a user to defined the
inputs and outputs through the `ProcessSpec`, just as they would do for
a `WorkChain`. Except, instead of defining an `outline`, one should
implement the `prepare_for_submission`, which fulfills the exact same
function as before, only it is now a public method of the `CalcJob`
process class.

Finally, the role of the job calculation state, stored as an attribute
with the key `state` on the `CalcJobNode` has changed significantly.
The original job calculations had a calculation state that controlled
the logic during its lifetime. This was already superceded a long time
ago by the process wrapper that now fully governs the progression of the
calculation. Despite the calculation state no longer being authoritative
during the calculation's lifetime, it was still present. Here we finally
fully remove and only leave a stripped down version. The remaining state
is stored as an attribute and is a sub state while the `CalcJob` process
is in an active state and serves as a more granual state that can be
queried for. This is useful, because the process status, which also
keeps similar information is human readable and doesn't allow for easy
querying.
  • Loading branch information
sphuber committed Jan 15, 2019
1 parent a2456ab commit fa3652c
Show file tree
Hide file tree
Showing 98 changed files with 1,719 additions and 3,744 deletions.
84 changes: 9 additions & 75 deletions .ci/test_daemon.py
Expand Up @@ -177,49 +177,6 @@ def validate_cached(cached_calcs):
return valid


def create_calculation(code, counter, inputval, use_cache=False):
parameters = ParameterData(dict={'value': inputval})
template = ParameterData(dict={
# The following line adds a significant sleep time.
# I set it to 1 second to speed up tests
# I keep it to a non-zero value because I want
# To test the case when AiiDA finds some calcs
# in a queued state
# 'cmdline_params': ["{}".format(counter % 3)], # Sleep time
'cmdline_params': ["1"],
'input_file_template': "{value}", # File just contains the value to double
'input_file_name': 'value_to_double.txt',
'output_file_name': 'output.txt',
'retrieve_temporary_files': ['triple_value.tmp']
})
calc = code.new_calc()
calc.set_option('max_wallclock_seconds', 5 * 60) # 5 min
calc.set_option('resources', {"num_machines": 1})
calc.set_option('withmpi', False)
calc.set_option('parser_name', 'templatereplacer.doubler')

calc.use_parameters(parameters)
calc.use_template(template)
calc.store_all(use_cache=use_cache)
expected_result = {
'value': 2 * inputval,
'retrieved_temporary_files': {
'triple_value.tmp': str(inputval * 3)
}
}
print("[{}] created calculation {}, pk={}".format(counter, calc.uuid, calc.pk))
return calc, expected_result


def submit_calculation(code, counter, inputval):
calc, expected_result = create_calculation(
code=code, counter=counter, inputval=inputval
)
calc.submit()
print("[{}] calculation submitted.".format(counter))
return calc, expected_result


def launch_calculation(code, counter, inputval):
"""
Launch calculations to the daemon through the Process layer
Expand All @@ -245,8 +202,6 @@ def create_calculation_process(code, inputval):
Create the process and inputs for a submitting / running a calculation.
"""
TemplatereplacerCalculation = CalculationFactory('templatereplacer')
process = TemplatereplacerCalculation.process()

parameters = ParameterData(dict={'value': inputval})
template = ParameterData(dict={
# The following line adds a significant sleep time.
Expand Down Expand Up @@ -281,35 +236,20 @@ def create_calculation_process(code, inputval):
'code': code,
'parameters': parameters,
'template': template,
'options': options,
'metadata': {
'options': options,
}
}
return process, inputs, expected_result


def create_cache_calc(code, counter, inputval):
calc, expected_result = create_calculation(
code=code, counter=counter, inputval=inputval, use_cache=True
)
print("[{}] created cached calculation.".format(counter))
return calc, expected_result
return TemplatereplacerCalculation, inputs, expected_result


def main():
expected_results_calculations = {}
expected_results_workchains = {}
code = Code.get_from_string(codename)

# Submitting the Calculations the old way, creating and storing a JobCalc first and submitting it
print("Submitting {} old style calculations to the daemon".format(number_calculations))
for counter in range(1, number_calculations + 1):
inputval = counter
calc, expected_result = submit_calculation(
code=code, counter=counter, inputval=inputval
)
expected_results_calculations[calc.pk] = expected_result

# Submitting the Calculations the new way directly through the launchers
print("Submitting {} new style calculations to the daemon".format(number_calculations))
print("Submitting {} calculations to the daemon".format(number_calculations))
for counter in range(1, number_calculations + 1):
inputval = counter
calc, expected_result = launch_calculation(
Expand Down Expand Up @@ -416,18 +356,12 @@ def main():
print("Timeout!! Calculation did not complete after {} seconds".format(timeout_secs))
sys.exit(2)
else:
# create cached calculations -- these should be FINISHED immediately
# Launch the same calculations but with caching enabled -- these should be FINISHED immediately
cached_calcs = []
for counter in range(1, number_calculations + 1):
calc, expected_result = create_cache_calc(
code=code, counter=counter, inputval=counter
)
cached_calcs.append(calc)
expected_results_calculations[calc.pk] = expected_result
# new style cached calculations, with 'run'
with enable_caching():
with enable_caching(node_class=CalcJobNode):
for counter in range(1, number_calculations + 1):
calc, expected_result = run_calculation(code=code, counter=counter, inputval=counter)
inputval = counter
calc, expected_result = run_calculation(code=code, counter=counter, inputval=inputval)
cached_calcs.append(calc)
expected_results_calculations[calc.pk] = expected_result

Expand Down
3 changes: 0 additions & 3 deletions .pre-commit-config.yaml
Expand Up @@ -150,9 +150,6 @@
aiida/orm/autogroup.py|
aiida/orm/backend.py|
aiida/orm/querybuilder.py|
aiida/orm/calculation/__init__.py|
aiida/orm/calculation/inline.py|
aiida/orm/calculation/job/__init__.py|
aiida/orm/code.py|
aiida/orm/data/array/bands.py|
aiida/orm/data/array/kpoints.py|
Expand Down
4 changes: 2 additions & 2 deletions .pylintrc
Expand Up @@ -136,7 +136,7 @@ function-name-hint=(([a-z][a-z0-9_]{2,40})|(_[a-z0-9_]*))$
function-rgx=(([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$

# Good variable names which should always be accepted, separated by a comma
good-names=i,j,k,ex,Run,_, _INPUT_FILE_NAME, _OUTPUT_FILE_NAME, pk
good-names=i,j,k,ex,Run,_,pk

# Include a hint for the correct naming format with invalid-name
include-naming-hint=no
Expand Down Expand Up @@ -409,7 +409,7 @@ defining-attr-methods=__init__,__new__,setUp

# List of member names, which should be excluded from the protected access
# warning.
exclude-protected=_asdict,_fields,_replace,_source,_make,_get_linkname_retrieved
exclude-protected=_asdict,_fields,_replace,_source,_make

# List of valid names for the first argument in a class method.
valid-classmethod-first-arg=cls
Expand Down
2 changes: 0 additions & 2 deletions aiida/backends/djsite/db/__init__.py
Expand Up @@ -7,5 +7,3 @@
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################


Expand Up @@ -22,6 +22,7 @@
# Currently valid hash key
_HASH_EXTRA_KEY = '_aiida_hash'


def notify_user(apps, schema_editor):
echo_warning("Invalidating all the hashes of all the nodes. Please run verdi rehash", bold=True)

Expand All @@ -37,7 +38,6 @@ class Migration(migrations.Migration):
migrations.RunPython(notify_user, reverse_code=notify_user),
migrations.RunSQL(
""" DELETE FROM db_dbextra WHERE key='""" + _HASH_EXTRA_KEY + """';""",
reverse_sql=""" DELETE FROM db_dbextra
WHERE key='""" + _HASH_EXTRA_KEY + """';"""),
reverse_sql=""" DELETE FROM db_dbextra WHERE key='""" + _HASH_EXTRA_KEY + """';"""),
upgrade_schema_version(REVISION, DOWN_REVISION)
]
@@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
# pylint: disable=invalid-name,too-few-public-methods
"""Migration of CalcJobNode attributes for metadata options whose key changed."""
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import absolute_import

# Remove when https://github.com/PyCQA/pylint/issues/1931 is fixed
# pylint: disable=no-name-in-module,import-error
from django.db import migrations

from aiida.backends.djsite.db.migrations import upgrade_schema_version

REVISION = '1.0.23'
DOWN_REVISION = '1.0.22'


class Migration(migrations.Migration):
"""Migration of CalcJobNode attributes for metadata options whose key changed.
Renamed attribute keys:
* `custom_environment_variables` -> `environment_variables`
* `jobresource_params` -> `resources`
* `_process_label` -> `process_label`
* `parser` -> `parser_name`
Deleted attributes:
* `linkname_retrieved` (We do not actually delete it just in case some relies on it)
"""

dependencies = [
('db', '0022_dbgroup_type_string_change_content'),
]

operations = [
migrations.RunSQL(
sql="""
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^custom_environment_variables', 'environment_variables')
FROM db_dbnode AS node
WHERE
attribute.key like 'custom_environment_variables%' AND
node.type = 'node.process.calculation.calcjob.CalcJobNode.';
-- custom_environment_variables -> environment_variables
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^jobresource_params', 'resources')
FROM db_dbnode AS node
WHERE
attribute.key like 'jobresource_params%' AND
node.type = 'node.process.calculation.calcjob.CalcJobNode.';
-- jobresource_params -> resources
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^_process_label', 'process_label')
FROM db_dbnode AS node
WHERE
attribute.key like '_process_label' AND
node.type like 'node.process.%';
-- _process_label -> process_label
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^parser', 'parser_name')
FROM db_dbnode AS node
WHERE
attribute.key like 'parser%' AND
node.type = 'node.process.calculation.calcjob.CalcJobNode.';
-- parser -> parser_name
""",
reverse_sql="""
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^environment_variables', 'custom_environment_variables')
FROM db_dbnode AS node
WHERE
attribute.key like 'environment_variables%' AND
node.type = 'node.process.calculation.calcjob.CalcJobNode.';
-- environment_variables -> custom_environment_variables
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^resources', 'jobresource_params')
FROM db_dbnode AS node
WHERE
attribute.key like 'resources%' AND
node.type = 'node.process.calculation.calcjob.CalcJobNode.';
-- resources -> jobresource_params
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^process_label', '_process_label')
FROM db_dbnode AS node
WHERE
attribute.key like 'process_label%' AND
node.type like 'node.process.%';
-- process_label -> _process_label
UPDATE db_dbattribute AS attribute
SET key = regexp_replace(attribute.key, '^parser_name', 'parser')
FROM db_dbnode AS node
WHERE
attribute.key like 'parser_name%' AND
node.type = 'node.process.calculation.calcjob.CalcJobNode.';
-- parser_name -> parser
"""),
upgrade_schema_version(REVISION, DOWN_REVISION)
]
5 changes: 3 additions & 2 deletions aiida/backends/djsite/db/migrations/__init__.py
Expand Up @@ -11,7 +11,8 @@
from __future__ import print_function
from __future__ import absolute_import

LATEST_MIGRATION = '0022_dbgroup_type_string_change_content'
LATEST_MIGRATION = '0023_calc_job_option_attribute_keys'


def _update_schema_version(version, apps, schema_editor):
from aiida.backends.djsite.utils import set_db_schema_version
Expand All @@ -31,7 +32,7 @@ def current_schema_version():
# Have to use this ugly way of importing because the django migration
# files start with numbers which are not a valid package name
latest_migration = __import__(
"aiida.backends.djsite.db.migrations.{}".format(LATEST_MIGRATION),
'aiida.backends.djsite.db.migrations.{}'.format(LATEST_MIGRATION),
fromlist=['REVISION']
)
return latest_migration.REVISION
3 changes: 1 addition & 2 deletions aiida/backends/djsite/db/models.py
Expand Up @@ -841,8 +841,7 @@ def create_value(cls, key, value, subspecifier_value=None,
try:
jsondata = json.dumps(value)
except TypeError:
raise ValueError("Unable to store the value: it must be "
"either a basic datatype, or json-serializable")
raise ValueError("Unable to store the value: it must be either a basic datatype, or json-serializable: {}".format(value))

new_entry.datatype = 'json'
new_entry.tval = jsondata
Expand Down
17 changes: 6 additions & 11 deletions aiida/backends/djsite/db/subtests/generic.py
Expand Up @@ -10,14 +10,13 @@
"""
Generic tests that need the use of the DB
"""

from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from aiida.backends.testbase import AiidaTestCase
from aiida.common import exceptions
from aiida.orm import Node, Data

from aiida import orm
from aiida.backends.testbase import AiidaTestCase
from aiida.orm import Data


class TestComputer(AiidaTestCase):
Expand All @@ -36,13 +35,9 @@ def test_deletion(self):
# # This should be possible, because nothing is using this computer
orm.Computer.objects.delete(newcomputer.id)

calc_params = {
'computer': self.computer,
'resources': {'num_machines': 1,
'num_mpiprocs_per_machine': 1}
}

_ = CalcJobNode(**calc_params).store()
calc = CalcJobNode(computer=self.computer)
calc.set_option('resources', {'num_machines': 1, 'num_mpiprocs_per_machine': 1})
calc.store()

# This should fail, because there is at least a calculation
# using this computer (the one created just above)
Expand Down

0 comments on commit fa3652c

Please sign in to comment.