diff --git a/trytond/trytond/application.py b/trytond/trytond/application.py index e28500e91bd..1f68a409cf4 100644 --- a/trytond/trytond/application.py +++ b/trytond/trytond/application.py @@ -5,6 +5,8 @@ import logging.config import os import threading +import datetime +import uwsgidecorators from io import StringIO __all__ = ['app', 'application'] @@ -36,11 +38,13 @@ Pool.start() # TRYTOND_CONFIG it's managed by importing config db_names = os.environ.get('TRYTOND_DATABASE_NAMES') +db_list = [] if db_names: # Read with csv so database name can include special chars reader = csv.reader(StringIO(db_names)) threads = [] for name in next(reader): + db_list.append(name) thread = threading.Thread(target=lambda: Pool(name).init()) thread.start() threads.append(thread) @@ -65,3 +69,21 @@ Pool.app_initialization_completed() assert len(threads := threading.enumerate()) == 1, f"len({threads}) != 1" + + +@uwsgidecorators.postfork +def preload(): + from trytond.transaction import Transaction + from trytond.cache import Cache + pid = os.getpid() + for db_name in db_list: + if (pid, db_name) not in Cache._listener: + if not Cache._clean_last: + Cache._clean_last = datetime.date.min + with Transaction().start(db_name, 0, readonly=True): + # Starting a transaction will trigger `Cache.sync`, which + # should spawn a thread to listen for cache invalidation and + # pool refresh events + pass + if (pid, db_name) not in Cache._listener: + raise AssertionError diff --git a/trytond/trytond/cache.py b/trytond/trytond/cache.py index 744dea10033..a2cd69e8a65 100644 --- a/trytond/trytond/cache.py +++ b/trytond/trytond/cache.py @@ -8,6 +8,7 @@ import threading from collections import OrderedDict, defaultdict from copy import deepcopy +from uuid import uuid4 from weakref import WeakKeyDictionary from sql import Table @@ -160,6 +161,8 @@ class MemoryCache(BaseCache): _default_lower = Transaction.monotonic_time() _listener = {} _listener_lock = defaultdict(threading.Lock) + _listener_id = None + _listener_id_lock = threading.Lock() _table = 'ir_cache' _channel = _table @@ -375,10 +378,15 @@ def refresh_pool(cls, transaction): if not _clear_timeout and database.has_channel(): database = backend.Database(dbname) conn = database.get_connection() + with cls._listener_id_lock: + if cls._listener_id: + process_id = cls._listener_id + else: + cls._listener_id = process_id = uuid4() + payload = json.dumps(('refresh_pool', str(process_id))) try: cursor = conn.cursor() - cursor.execute( - 'NOTIFY "%s", %%s' % cls._channel, ('refresh pool',)) + cursor.execute(f'NOTIFY "{cls._channel}", %s', (payload,)) conn.commit() finally: database.put_connection(conn) @@ -417,13 +425,19 @@ def _listen(cls, dbname): pool = Pool(dbname) callbacks = pool._notification_callbacks.get(dbname, {}) notification = conn.notifies.pop() - if notification.payload == 'refresh pool': - Pool.refresh(dbname, _get_modules(cursor)) - elif notification.payload in callbacks: - callbacks[notification.payload](pool) - elif notification.payload: - reset = json.loads(notification.payload) - for name in reset: + payload = json.loads(notification.payload) + if payload and payload[0] == 'refresh_pool': + with cls._listener_id_lock: + if cls._listener_id: + process_id = cls._listener_id + else: + cls._listener_id = process_id = uuid4() + if payload[1] != str(process_id): + Pool.refresh(dbname, _get_modules(cursor)) + elif isinstance(payload, str) and payload in callbacks: + callbacks[payload](pool) + elif payload: + for name in payload: # XUNG # Name not in instances when control_vesion_upgrade # table is locked because another process is diff --git a/trytond/trytond/ir/module.py b/trytond/trytond/ir/module.py index 6f4a1b15399..d04add57ead 100644 --- a/trytond/trytond/ir/module.py +++ b/trytond/trytond/ir/module.py @@ -5,7 +5,6 @@ from sql.operators import NotIn -from trytond.cache import Cache from trytond.exceptions import UserError from trytond.i18n import gettext from trytond.model import ModelSQL, ModelView, Unique, fields, sequence_ordered @@ -503,7 +502,6 @@ def transition_upgrade(self): lang = [x.code for x in langs] if update: pool.init(update=update, lang=lang) - Cache.refresh_pool(transaction) return 'done' def transition_next_(self): diff --git a/trytond/trytond/pool.py b/trytond/trytond/pool.py index abc9542569b..dbb55e0cea0 100644 --- a/trytond/trytond/pool.py +++ b/trytond/trytond/pool.py @@ -165,7 +165,6 @@ def stop(cls, database_name): ''' with cls._lock: cls._pools.pop(database_name, None) - cls._pool_instances.clear() @classmethod def database_list(cls): @@ -203,6 +202,7 @@ def init(self, update=None, lang=None, activatedeps=False, indexes=None): self._pools[self.database_name] = self._pool self._pool_modules[self.database_name] = self._modules self._pool_instances.clear() + self._pool_instances.add(self) if restart: self.init()