Skip to content

Commit

Permalink
Respect the minimum polling interval for scheduler updates (#3096)
Browse files Browse the repository at this point in the history
The `aiida.engine.processes.calcjobs.manager.JobsList` container was
introduced for two purposes, related to managing running calculation
jobs in high-throughput mode:

 * It bundles the scheduler update calls for all active calculation jobs
   for given authentication info (the comination of a computer and user)
 * It ensures that consecutive scheduler update calls are separated at
   least by a time that is given by the `get_minimum_job_poll_interval`
   as defined by the computer of the authentication info.

However, the final requirement was not being respected. The problem was
twofold. The internal attribute `_last_updated` that records the
timestamp of the last update was not being updated after the scheduler
was queried for a status update.

The use of the `RefObjectStore` to create and delete these `JobsLists`
instances, however, was an even bigger problem. The reference store
would delete the `JobsLists` instance as soon as no-one held a reference
to it any more, since they are created through the
`request_job_info_request` context manager of the `JobManager` that each
runner has. As soon as no requests were active, the object was deleted
and with it the record of the last time the scheduler was queried. The
next time a request comes in a new `JobsList` would be created that
straight away call the scheduler as it has no recollection the last time
it was called for the given authinfo, if at all. This would result in
the minimum poll interval essentially never being respected.

Fundamentally, the problem lies in the fact that the data that needs to
be persistent, the "last updated" timestamp, was being stored in the
container `JobsList` that by implementation was made non-persistent.
The solution is to simply make the `JobsList` instances live for as long
as the python interpreter is alive. Each daemon runner creates a single
`JobManager` instance on start up and this will now only create a new
`JobsList` once for each authinfo and will keep returning the same
instance for the rest of its lifetime.

Co-Authored-By: Leopold Talirz <leopold.talirz@gmail.com>
  • Loading branch information
sphuber and ltalirz committed Jul 1, 2019
1 parent 1578227 commit a5b6477
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 223 deletions.
1 change: 1 addition & 0 deletions aiida/backends/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
'engine.daemon': ['aiida.backends.tests.engine.test_daemon'],
'engine.futures': ['aiida.backends.tests.engine.test_futures'],
'engine.launch': ['aiida.backends.tests.engine.test_launch'],
'engine.manager': ['aiida.backends.tests.engine.test_manager'],
'engine.persistence': ['aiida.backends.tests.engine.test_persistence'],
'engine.ports': ['aiida.backends.tests.engine.test_ports'],
'engine.process': ['aiida.backends.tests.engine.test_process'],
Expand Down
81 changes: 81 additions & 0 deletions aiida/backends/tests/engine/test_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for the classes in `aiida.engine.processes.calcjobs.manager`."""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import time

import tornado

from aiida.orm import AuthInfo, User
from aiida.backends.testbase import AiidaTestCase
from aiida.engine.processes.calcjobs.manager import JobManager, JobsList
from aiida.engine.transports import TransportQueue


class TestJobManager(AiidaTestCase):
"""Test the `aiida.engine.processes.calcjobs.manager.JobManager` class."""

def setUp(self):
super(TestJobManager, self).setUp()
self.loop = tornado.ioloop.IOLoop()
self.transport_queue = TransportQueue(self.loop)
self.user = User.objects.get_default()
self.auth_info = AuthInfo(self.computer, self.user).store()
self.manager = JobManager(self.transport_queue)

def tearDown(self):
super(TestJobManager, self).tearDown()
AuthInfo.objects.delete(self.auth_info.pk)

def test_get_jobs_list(self):
"""Test the `JobManager.get_jobs_list` method."""
jobs_list = self.manager.get_jobs_list(self.auth_info)
self.assertIsInstance(jobs_list, JobsList)

# Calling the method again, should return the exact same instance of `JobsList`
self.assertEqual(self.manager.get_jobs_list(self.auth_info), jobs_list)

def test_request_job_info_update(self):
"""Test the `JobManager.request_job_info_update` method."""
with self.manager.request_job_info_update(self.auth_info, job_id=1) as request:
self.assertIsInstance(request, tornado.concurrent.Future)


class TestJobsList(AiidaTestCase):
"""Test the `aiida.engine.processes.calcjobs.manager.JobsList` class."""

def setUp(self):
super(TestJobsList, self).setUp()
self.loop = tornado.ioloop.IOLoop()
self.transport_queue = TransportQueue(self.loop)
self.user = User.objects.get_default()
self.auth_info = AuthInfo(self.computer, self.user).store()
self.jobs_list = JobsList(self.auth_info, self.transport_queue)

def tearDown(self):
super(TestJobsList, self).tearDown()
AuthInfo.objects.delete(self.auth_info.pk)

def test_get_minimum_update_interval(self):
"""Test the `JobsList.get_minimum_update_interval` method."""
minimum_poll_interval = self.auth_info.computer.get_minimum_job_poll_interval()
self.assertEqual(self.jobs_list.get_minimum_update_interval(), minimum_poll_interval)

def test_last_updated(self):
"""Test the `JobsList.last_updated` method."""
jobs_list = JobsList(self.auth_info, self.transport_queue)
self.assertEqual(jobs_list.last_updated, None)

last_updated = time.time()
jobs_list = JobsList(self.auth_info, self.transport_queue, last_updated=last_updated)
self.assertEqual(jobs_list.last_updated, last_updated)
68 changes: 1 addition & 67 deletions aiida/backends/tests/engine/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
from __future__ import absolute_import
from __future__ import print_function

import unittest
from tornado.ioloop import IOLoop
from tornado.gen import coroutine

from aiida import orm
from aiida.backends.testbase import AiidaTestCase
from aiida.engine.utils import exponential_backoff_retry, RefObjectStore
from aiida.engine.utils import exponential_backoff_retry

ITERATION = 0
MAX_ITERATIONS = 3
Expand Down Expand Up @@ -65,68 +64,3 @@ def coro():
max_attempts = MAX_ITERATIONS - 1
with self.assertRaises(RuntimeError):
loop.run_sync(lambda: exponential_backoff_retry(coro, initial_interval=0.1, max_attempts=max_attempts))


class RefObjectsStore(unittest.TestCase):

def test_simple(self):
""" Test the reference counting works """
IDENTIFIER = 'a'
OBJECT = 'my string'
obj_store = RefObjectStore()

with obj_store.get(IDENTIFIER, lambda: OBJECT) as obj:
# Make sure we got back the same object
self.assertIs(OBJECT, obj)

# Now check that the reference has the correct information
ref = obj_store._objects['a']
self.assertEqual(OBJECT, ref._obj)
self.assertEqual(1, ref.count)

# Now request the object again
with obj_store.get(IDENTIFIER) as obj2:
# ...and check the reference has had it's count upped
self.assertEqual(OBJECT, obj2)
self.assertEqual(2, ref.count)

# Now it should have been reduced
self.assertEqual(1, ref.count)

# Finally the store should be empty (there are no more references)
self.assertEqual(0, len(obj_store._objects))

def test_get_no_constructor(self):
"""
Test that trying to get an object that does exists and providing
no means to construct it fails
"""
obj_store = RefObjectStore()
with self.assertRaises(ValueError):
with obj_store.get('a'):
pass

def test_construct(self):
""" Test that construction only gets called when used """
IDENTIFIER = 'a'
OBJECT = 'my string'

# Use a list for a single number so we can get references to it
times_constructed = [
0,
]

def construct():
times_constructed[0] += 1
return OBJECT

obj_store = RefObjectStore()
with obj_store.get(IDENTIFIER, construct):
self.assertEqual(1, times_constructed[0])
with obj_store.get(IDENTIFIER, construct):
self.assertEqual(1, times_constructed[0])

# Now the object should be removed and so another call to get
# should create
with obj_store.get(IDENTIFIER, construct):
self.assertEqual(2, times_constructed[0])
2 changes: 2 additions & 0 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,15 @@ def submit_calculation(calculation, transport, calc_info, script_filename):
:param transport: an already opened transport to use to submit the calculation.
:param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit`
:param script_filename: the job launch script returned by `CalcJobNode._presubmit`
:return: the job id as returned by the scheduler `submit_from_script` call
"""
scheduler = calculation.computer.get_scheduler()
scheduler.set_transport(transport)

workdir = calculation.get_remote_workdir()
job_id = scheduler.submit_from_script(workdir, script_filename)
calculation.set_job_id(job_id)
return job_id


def retrieve_calculation(calculation, transport, retrieved_temporary_folder):
Expand Down
Loading

0 comments on commit a5b6477

Please sign in to comment.