Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions trytond/trytond/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import logging.config
import os
import threading
import datetime
import uwsgidecorators
from io import StringIO

__all__ = ['app', 'application']
Expand Down Expand Up @@ -36,11 +38,13 @@
Pool.start()
# TRYTOND_CONFIG it's managed by importing config
db_names = os.environ.get('TRYTOND_DATABASE_NAMES')
db_list = []
if db_names:
# Read with csv so database name can include special chars
reader = csv.reader(StringIO(db_names))
threads = []
for name in next(reader):
db_list.append(name)
thread = threading.Thread(target=lambda: Pool(name).init())
thread.start()
threads.append(thread)
Expand All @@ -65,3 +69,21 @@

Pool.app_initialization_completed()
assert len(threads := threading.enumerate()) == 1, f"len({threads}) != 1"


@uwsgidecorators.postfork
def preload():
from trytond.transaction import Transaction
from trytond.cache import Cache
pid = os.getpid()
for db_name in db_list:
if (pid, db_name) not in Cache._listener:
if not Cache._clean_last:
Cache._clean_last = datetime.date.min
with Transaction().start(db_name, 0, readonly=True):
# Starting a transaction will trigger `Cache.sync`, which
# should spawn a thread to listen for cache invalidation and
# pool refresh events
pass
if (pid, db_name) not in Cache._listener:
raise AssertionError
32 changes: 23 additions & 9 deletions trytond/trytond/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import threading
from collections import OrderedDict, defaultdict
from copy import deepcopy
from uuid import uuid4
from weakref import WeakKeyDictionary

from sql import Table
Expand Down Expand Up @@ -160,6 +161,8 @@ class MemoryCache(BaseCache):
_default_lower = Transaction.monotonic_time()
_listener = {}
_listener_lock = defaultdict(threading.Lock)
_listener_id = None
_listener_id_lock = threading.Lock()
_table = 'ir_cache'
_channel = _table

Expand Down Expand Up @@ -375,10 +378,15 @@ def refresh_pool(cls, transaction):
if not _clear_timeout and database.has_channel():
database = backend.Database(dbname)
conn = database.get_connection()
with cls._listener_id_lock:
if cls._listener_id:
process_id = cls._listener_id
else:
cls._listener_id = process_id = uuid4()
payload = json.dumps(('refresh_pool', str(process_id)))
try:
cursor = conn.cursor()
cursor.execute(
'NOTIFY "%s", %%s' % cls._channel, ('refresh pool',))
cursor.execute(f'NOTIFY "{cls._channel}", %s', (payload,))
conn.commit()
finally:
database.put_connection(conn)
Expand Down Expand Up @@ -417,13 +425,19 @@ def _listen(cls, dbname):
pool = Pool(dbname)
callbacks = pool._notification_callbacks.get(dbname, {})
notification = conn.notifies.pop()
if notification.payload == 'refresh pool':
Pool.refresh(dbname, _get_modules(cursor))
elif notification.payload in callbacks:
callbacks[notification.payload](pool)
elif notification.payload:
reset = json.loads(notification.payload)
for name in reset:
payload = json.loads(notification.payload)
if payload and payload[0] == 'refresh_pool':
with cls._listener_id_lock:
if cls._listener_id:
process_id = cls._listener_id
else:
cls._listener_id = process_id = uuid4()
if payload[1] != str(process_id):
Pool.refresh(dbname, _get_modules(cursor))
elif isinstance(payload, str) and payload in callbacks:
callbacks[payload](pool)
elif payload:
for name in payload:
# XUNG
# Name not in instances when control_vesion_upgrade
# table is locked because another process is
Expand Down
2 changes: 0 additions & 2 deletions trytond/trytond/ir/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from sql.operators import NotIn

from trytond.cache import Cache
from trytond.exceptions import UserError
from trytond.i18n import gettext
from trytond.model import ModelSQL, ModelView, Unique, fields, sequence_ordered
Expand Down Expand Up @@ -503,7 +502,6 @@ def transition_upgrade(self):
lang = [x.code for x in langs]
if update:
pool.init(update=update, lang=lang)
Cache.refresh_pool(transaction)
return 'done'

def transition_next_(self):
Expand Down
2 changes: 1 addition & 1 deletion trytond/trytond/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ def stop(cls, database_name):
'''
with cls._lock:
cls._pools.pop(database_name, None)
cls._pool_instances.clear()

@classmethod
def database_list(cls):
Expand Down Expand Up @@ -203,6 +202,7 @@ def init(self, update=None, lang=None, activatedeps=False, indexes=None):
self._pools[self.database_name] = self._pool
self._pool_modules[self.database_name] = self._modules
self._pool_instances.clear()
self._pool_instances.add(self)
if restart:
self.init()

Expand Down