[13.0] Support multi-nodes with lock on jobrunner #256

@@ -18,6 +18,35 @@
* It does not run jobs itself, but asks Odoo to run them through an
  anonymous ``/queue_job/runjob`` HTTP request. [1]_

How do concurrent job runners work?
-----------------------------------

If several job runner nodes are started (on different hosts or not), a
shared lock ensures that only one job runner works on a given database
at a time. Keep the following rules in mind:

* The identifier of the shared lock is based on the list of databases
  provided: either ``--database``/``db_name``, or all the databases in
  PostgreSQL.
* When two job runners are started with the exact same list of databases,
  only the first one will work. The second one will wait and take over
  if the first one is stopped.

Caveats:

* If two job runners have a database in common but different lists (e.g.
  ``db_name=project1,project2`` and ``db_name=project2,project3``), both
  job runners will work and listen to ``project2``, which will lead to
  unexpected behavior.
* The same applies when no database is specified and all the cluster's
  databases are used: if a job runner is started on the cluster's
  databases, then a new database is created and a second job runner is
  started, both runners will work on the same set of databases, with
  unexpected behavior.
* PostgreSQL advisory locks are based on an integer: the list of database
  names is sorted, hashed and converted to an int64 (as sketched below),
  so information is lost in the identifier and a low risk of collision
  exists. If a collision ever happens, an option for a custom lock
  identifier should be added.
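
For illustration, here is a minimal sketch (not part of the module) of how
such an identifier can be derived; it mirrors the hashing done by
``SharedLockDatabase.name_to_int64`` and the lock name built by
``_lock_ident`` in the diff below, with made-up database names::

    import hashlib

    def lock_ident(db_names):
        # sort first, so that the same set of databases always yields
        # the same lock name, regardless of the order given
        lock_name = "qj:{}".format("-".join(sorted(db_names)))
        digest = hashlib.sha256(lock_name.encode("utf-8")).digest()
        # advisory locks take a 64-bit signed integer: keep 8 bytes only
        return int.from_bytes(digest[:8], byteorder="big", signed=True)

    # two overlapping but different lists produce two different
    # identifiers, hence two active runners (the first caveat above)
    print(lock_ident(["project1", "project2"]))
    print(lock_ident(["project2", "project3"]))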

Review comment: Could it be a solution to try to lock the … instead of
going for the integer lock? That could solve both of the issues above.

Reply: Long locks on a table will lead to vacuum issues (and probably
replication issues as well).
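
As a side note, which node currently holds the shared advisory lock can be
checked from the standard ``pg_locks`` catalog; a minimal sketch, assuming
a reachable ``postgres`` database:

    import psycopg2

    conn = psycopg2.connect(dbname="postgres")  # adjust connection settings
    with conn.cursor() as cr:
        # session-level advisory locks show up with locktype 'advisory';
        # classid/objid hold the two 32-bit halves of the 64-bit identifier
        cr.execute(
            "SELECT classid, objid, pid, granted"
            " FROM pg_locks WHERE locktype = 'advisory';"
        )
        for row in cr.fetchall():
            print(row)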

How to use it?
--------------

@@ -134,6 +163,7 @@
"""

import datetime
import hashlib
import logging
import os
import select

@@ -152,6 +182,8 @@
from .channels import ENQUEUED, NOT_DONE, PENDING, ChannelManager

SELECT_TIMEOUT = 60
TRY_ACQUIRE_INTERVAL = 30  # seconds
SHARED_LOCK_KEEP_ALIVE = 60  # seconds
ERROR_RECOVERY_DELAY = 5

_logger = logging.getLogger(__name__)

@@ -251,7 +283,63 @@ def urlopen():
        thread.start()

-class Database(object):
+class SharedLockDatabase(object):
    def __init__(self, db_name, lock_name):
        self.db_name = db_name
        self.lock_ident = self.name_to_int64(lock_name)
        connection_info = _connection_info_for(db_name)
        self.conn = psycopg2.connect(**connection_info)
        self.acquired = False
        self._keep_alive_cursor = None
        self.try_acquire()

    @staticmethod
    def name_to_int64(lock_name):
        hasher = hashlib.sha256()
        hasher.update(lock_name.encode("utf-8"))
        # pg_try_advisory_lock is limited to an 8-byte (64-bit) signed integer
        return int.from_bytes(hasher.digest()[:8], byteorder="big", signed=True)

    def try_acquire(self):
        self.acquired = self._acquire()
        if self.acquired:
            # We open a transaction that we will never commit.
            # At most every SHARED_LOCK_KEEP_ALIVE seconds, we keep it
            # alive with a simple SELECT 1 query. If the process crashes
            # or the connection is cut, the pg server will terminate
            # self.conn after 2 * SHARED_LOCK_KEEP_ALIVE seconds, which
            # will free the advisory lock and let another node take over.
            self._keep_alive_cursor = self.conn.cursor()
            self._keep_alive_cursor.execute(
                "SET idle_in_transaction_session_timeout = %s;",
                (SHARED_LOCK_KEEP_ALIVE * 1000 * 2,),
            )

    def _acquire(self):
        with closing(self.conn.cursor()) as cr:
            # session-level lock
            cr.execute("SELECT pg_try_advisory_lock(%s);", (self.lock_ident,))
            acquired = cr.fetchone()[0]
        return acquired

    def keep_alive(self):
        query = "SELECT 1"
        self._keep_alive_cursor.execute(query)

    def close(self):
        # pylint: disable=except-pass
        # If close fails, either the connection is already closed (and we
        # don't care), or it failed for another reason and will be closed
        # on del anyway.
        try:
            self.conn.close()
        except Exception:
            pass
        self.conn = None
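
A quick way to see this class in action (only a sketch: it assumes a
reachable ``postgres`` database, the usual psycopg2 connection settings,
and a made-up lock name):

    # the first "node" wins the race for the advisory lock...
    db1 = SharedLockDatabase("postgres", "qj:project1")
    assert db1.acquired

    # ...so a concurrent "node" using the same lock name loses it
    db2 = SharedLockDatabase("postgres", "qj:project1")
    assert not db2.acquired

    # closing the first session releases the lock; the second node
    # can then take over on its next try_acquire()
    db1.close()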

class QueueDatabase(object):
    def __init__(self, db_name):
        self.db_name = db_name
        connection_info = _connection_info_for(db_name)

@@ -355,6 +443,11 @@ def __init__(
        if channel_config_string is None:
            channel_config_string = _channels()
        self.channel_manager.simple_configure(channel_config_string)

        self.shared_lock_db = None
        # TODO: how to detect new databases, or databases on which
        # queue_job is installed after server start?
        self.list_db_names = self.get_db_names()
        self.db_by_name = {}
        self._stop = False
        self._stop_pipe = os.pipe()

@@ -404,11 +497,22 @@ def close_databases(self, remove_jobs=True):
                db.close()
            except Exception:
                _logger.warning("error closing database %s", db_name, exc_info=True)

        self.db_by_name = {}

        if self.shared_lock_db:
            try:
                self.shared_lock_db.close()
            except Exception:
                _logger.warning(
                    "error closing database %s",
                    self.shared_lock_db.db_name,
                    exc_info=True,
                )

    def initialize_databases(self):
-        for db_name in self.get_db_names():
-            db = Database(db_name)
+        for db_name in self.list_db_names:
+            db = QueueDatabase(db_name)
            if db.has_queue_job:
                self.db_by_name[db_name] = db
                with db.select_jobs("state in %s", (NOT_DONE,)) as cr:

@@ -484,6 +588,12 @@ def wait_notification(self):
                for conn in conns:
                    conn.poll()

    def keep_alive_shared_lock(self):
        self.shared_lock_db.keep_alive()

    def _lock_ident(self):
        return "qj:{}".format("-".join(sorted(self.list_db_names)))

    def stop(self):
        _logger.info("graceful stop requested")
        self._stop = True

@@ -495,16 +605,42 @@ def run(self):
        while not self._stop:
            # outer loop does exception recovery
            try:
                # When concurrent jobrunners are started, the first to win
                # the race acquires an advisory lock on PostgreSQL and gets
                # to work. When a jobrunner is stopped, the lock is released
                # and another node can take over.
                self.shared_lock_db = SharedLockDatabase(
                    "postgres", self._lock_ident()
                )
                if not self.shared_lock_db.acquired:
                    self.close_databases()
                    _logger.info("already started on another node")
                    # no database to work with... retry later, in case a
                    # concurrent node is stopped
                    time.sleep(TRY_ACQUIRE_INTERVAL)
                    continue

                _logger.info("initializing database connections")
                # TODO: how to detect new databases, or databases on which
                # queue_job is installed after server start?
                self.initialize_databases()
                _logger.info("database connections ready")

                last_keep_alive = None

                # inner loop does the normal processing
                while not self._stop:
                    self.process_notifications()
                    self.run_jobs()
                    self.wait_notification()

Review comment (on the inner loop): I'm not 100% sure what the exact
problem is that @amigrave has with locking vs. replication, but if it is
the fact that the advisory lock is held for a long time, maybe a solution
could be to always try to re-acquire the lock at the beginning of this
loop and let it go at the end? Then the lock would not be held for so
long. If, by chance, another instance of the job runner were first to
acquire the lock, the master job runner would simply change to that one,
but there would still only be one running at a time.

                    if (
                        not last_keep_alive
                        or time.time() >= last_keep_alive + SHARED_LOCK_KEEP_ALIVE
                    ):
                        last_keep_alive = time.time()
                        # send a keepalive on the shared lock connection at
                        # most every SHARED_LOCK_KEEP_ALIVE seconds
                        self.keep_alive_shared_lock()
                    # TODO: when we have no "db_name", we could list the
                    # databases again and, if the list changed, try to
                    # acquire a new lock

            except KeyboardInterrupt:
                self.stop()
            except InterruptedError:

Review comment: Perhaps hold a lock for each of the database names
separately?
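
A rough sketch of that suggestion (hypothetical, not part of this PR):
take one advisory lock per database name and let each runner work only on
the names it actually won, which would also defuse the overlapping-lists
caveat documented above:

    from contextlib import closing

    def try_acquire_per_db(conn, db_names):
        """Try one advisory lock per database; return the names won."""
        won = []
        with closing(conn.cursor()) as cr:
            for db_name in sorted(db_names):
                ident = SharedLockDatabase.name_to_int64("qj:%s" % db_name)
                cr.execute("SELECT pg_try_advisory_lock(%s);", (ident,))
                if cr.fetchone()[0]:
                    won.append(db_name)
        return won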