Respect the minimum polling interval for scheduler updates (#3096)
The `aiida.engine.processes.calcjobs.manager.JobsList` container was introduced for two purposes, both related to managing running calculation jobs in high-throughput mode:

* It bundles the scheduler update calls for all active calculation jobs for a given authentication info (the combination of a computer and a user).
* It ensures that consecutive scheduler update calls are separated by at least the time returned by `get_minimum_job_poll_interval`, as defined by the computer of the authentication info.

However, the second requirement was not being respected. The problem was twofold. First, the internal attribute `_last_updated`, which records the timestamp of the last update, was not being updated after the scheduler was queried for a status update. Second, and more seriously, the `RefObjectStore` was used to create and delete the `JobsList` instances. The reference store would delete a `JobsList` instance as soon as nobody held a reference to it any more, since the instances are created through the `request_job_info_update` context manager of the `JobManager` that each runner has. As soon as no requests were active, the object was deleted, and with it the record of the last time the scheduler was queried. The next time a request came in, a new `JobsList` would be created that would immediately call the scheduler, since it had no record of when, if ever, the scheduler was last called for the given authinfo. As a result, the minimum poll interval was essentially never respected.

Fundamentally, the problem is that the data that needs to be persistent, the "last updated" timestamp, was being stored in the `JobsList` container, which by implementation was non-persistent. The solution is simply to make the `JobsList` instances live for as long as the Python interpreter is alive. Each daemon runner creates a single `JobManager` instance on start-up. This instance will now create a new `JobsList` only once for each authinfo and will keep returning the same instance for the rest of its lifetime.

Co-Authored-By: Leopold Talirz <leopold.talirz@gmail.com>
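
The fix described above can be illustrated with a minimal, self-contained sketch. It does not use AiiDA, and it is not the actual implementation: `ThrottledJobsList`, `PersistentJobManager`, `FakeClock`, and the string key standing in for an authinfo are all hypothetical names chosen for illustration. The sketch only shows the two ideas from the message: the timestamp is recorded after every scheduler query, and the manager keeps one list per authinfo alive for its whole lifetime.

```python
import time


class ThrottledJobsList:
    """Hypothetical stand-in for `JobsList`: remembers when the scheduler was last queried."""

    def __init__(self, minimum_interval, clock=time.monotonic):
        self._minimum_interval = minimum_interval
        self._clock = clock
        self._last_updated = None  # no scheduler query has happened yet
        self.query_count = 0  # number of simulated scheduler queries

    def seconds_until_next_update(self):
        """Return how long to wait before the scheduler may be queried again."""
        if self._last_updated is None:
            return 0.0
        elapsed = self._clock() - self._last_updated
        return max(0.0, self._minimum_interval - elapsed)

    def update(self):
        """Query the (simulated) scheduler if the interval has passed; return True if queried."""
        if self.seconds_until_next_update() > 0:
            return False
        self.query_count += 1
        # Record the timestamp after the query, the step that was missing before the fix.
        self._last_updated = self._clock()
        return True


class PersistentJobManager:
    """Hypothetical stand-in for `JobManager`: one long-lived list per authinfo key."""

    def __init__(self, minimum_interval, clock=time.monotonic):
        self._minimum_interval = minimum_interval
        self._clock = clock
        self._jobs_lists = {}  # entries are never deleted, so timestamps survive

    def get_jobs_list(self, authinfo_key):
        """Create the list on first request, then keep returning the same instance."""
        if authinfo_key not in self._jobs_lists:
            self._jobs_lists[authinfo_key] = ThrottledJobsList(self._minimum_interval, self._clock)
        return self._jobs_lists[authinfo_key]


class FakeClock:
    """Manually advanced clock, so the example does not need to sleep."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


clock = FakeClock()
manager = PersistentJobManager(minimum_interval=10.0, clock=clock)

first = manager.get_jobs_list('computer-user')
first.update()  # first request: the scheduler is queried immediately

clock.advance(5.0)
second = manager.get_jobs_list('computer-user')  # same instance as `first`
second.update()  # only 5 s elapsed: the query is skipped

clock.advance(5.0)
second.update()  # 10 s elapsed: the scheduler is queried again
```

Had the manager built a fresh list for every request, as effectively happened with the reference store, `_last_updated` would start at `None` each time and every request would query the scheduler immediately.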
Showing 7 changed files with 203 additions and 223 deletions.
@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""Tests for the classes in `aiida.engine.processes.calcjobs.manager`."""
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import time

import tornado

from aiida.orm import AuthInfo, User
from aiida.backends.testbase import AiidaTestCase
from aiida.engine.processes.calcjobs.manager import JobManager, JobsList
from aiida.engine.transports import TransportQueue


class TestJobManager(AiidaTestCase):
    """Test the `aiida.engine.processes.calcjobs.manager.JobManager` class."""

    def setUp(self):
        super(TestJobManager, self).setUp()
        self.loop = tornado.ioloop.IOLoop()
        self.transport_queue = TransportQueue(self.loop)
        self.user = User.objects.get_default()
        self.auth_info = AuthInfo(self.computer, self.user).store()
        self.manager = JobManager(self.transport_queue)

    def tearDown(self):
        super(TestJobManager, self).tearDown()
        AuthInfo.objects.delete(self.auth_info.pk)

    def test_get_jobs_list(self):
        """Test the `JobManager.get_jobs_list` method."""
        jobs_list = self.manager.get_jobs_list(self.auth_info)
        self.assertIsInstance(jobs_list, JobsList)

        # Calling the method again, should return the exact same instance of `JobsList`
        self.assertEqual(self.manager.get_jobs_list(self.auth_info), jobs_list)

    def test_request_job_info_update(self):
        """Test the `JobManager.request_job_info_update` method."""
        with self.manager.request_job_info_update(self.auth_info, job_id=1) as request:
            self.assertIsInstance(request, tornado.concurrent.Future)


class TestJobsList(AiidaTestCase):
    """Test the `aiida.engine.processes.calcjobs.manager.JobsList` class."""

    def setUp(self):
        super(TestJobsList, self).setUp()
        self.loop = tornado.ioloop.IOLoop()
        self.transport_queue = TransportQueue(self.loop)
        self.user = User.objects.get_default()
        self.auth_info = AuthInfo(self.computer, self.user).store()
        self.jobs_list = JobsList(self.auth_info, self.transport_queue)

    def tearDown(self):
        super(TestJobsList, self).tearDown()
        AuthInfo.objects.delete(self.auth_info.pk)

    def test_get_minimum_update_interval(self):
        """Test the `JobsList.get_minimum_update_interval` method."""
        minimum_poll_interval = self.auth_info.computer.get_minimum_job_poll_interval()
        self.assertEqual(self.jobs_list.get_minimum_update_interval(), minimum_poll_interval)

    def test_last_updated(self):
        """Test the `JobsList.last_updated` method."""
        jobs_list = JobsList(self.auth_info, self.transport_queue)
        self.assertEqual(jobs_list.last_updated, None)

        last_updated = time.time()
        jobs_list = JobsList(self.auth_info, self.transport_queue, last_updated=last_updated)
        self.assertEqual(jobs_list.last_updated, last_updated)