From ea6c5871e2381a257758b684d2e33fa8ef91ff26 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 26 Aug 2025 17:56:26 -0500 Subject: [PATCH 01/33] Remove sentinel context from `start_reactor` (`daemonize_process`/`run`) Relevant comment introductions: - https://github.com/matrix-org/synapse/pull/5609 - https://github.com/element-hq/synapse/commit/067b00d49d25d6994e851fc362328a164ace85b2 --- synapse/app/_base.py | 27 +++++++++------------------ synapse/util/daemonize.py | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 48989540bb2..348d13d26a1 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -72,7 +72,6 @@ from synapse.events.presence_router import load_legacy_presence_router from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseSite -from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import init_tracer from synapse.metrics import install_gc_manager, register_threadpool from synapse.metrics.background_process_metrics import run_as_background_process @@ -185,23 +184,15 @@ def run() -> None: install_gc_manager() run_command() - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - # - # We also need to drop the logcontext before forking if we're daemonizing, - # otherwise the cputime metrics get confused about the per-thread resource usage - # appearing to go backwards. - with PreserveLoggingContext(): - if daemonize: - assert pid_file is not None - - if print_pidfile: - print(pid_file) - - daemonize_process(pid_file, logger) - run() + if daemonize: + assert pid_file is not None + + if print_pidfile: + print(pid_file) + + daemonize_process(pid_file, logger) + + run() def quit_with_error(error_string: str) -> NoReturn: diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py index 9fdefc5a763..bb4d35182c0 100644 --- a/synapse/util/daemonize.py +++ b/synapse/util/daemonize.py @@ -29,6 +29,11 @@ from types import FrameType, TracebackType from typing import NoReturn, Optional, Type +from synapse.logging.context import ( + SENTINEL_CONTEXT, + set_current_context, +) + def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -> None: """daemonize the current process @@ -64,9 +69,18 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") - pid_fh.write(old_pid) sys.exit(1) + # Stop the existing context *before* we fork the process. Otherwise the cputime + # metrics get confused about the per-thread resource usage appearing to go backwards + # because we're comparing the resource usage from the original process to the forked + # process. + calling_context = set_current_context(SENTINEL_CONTEXT) + # Fork, creating a new process for the child. process_id = os.fork() + # Restart the logging context *after* forking + set_current_context(calling_context) + if process_id != 0: # parent process: exit. 
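To see why the calling context has to be stopped *before* the fork, here is a minimal, self-contained sketch. The `SENTINEL`/`set_current_context` names below are toy stand-ins that only mimic `synapse.logging.context` (this is not the real implementation), and it assumes a Linux host for `resource.RUSAGE_THREAD`. A context left running across `os.fork()` would compute its next per-thread rusage delta against a snapshot taken in a different process, so its CPU accounting could appear to go backwards.

```python
import os
import resource
import sys

SENTINEL = None  # toy stand-in for synapse's SENTINEL_CONTEXT
_current_context = SENTINEL


def set_current_context(new_context):
    # Toy stand-in for synapse.logging.context.set_current_context: swapping
    # contexts snapshots per-thread CPU usage as the old context "stops".
    global _current_context
    old_context = _current_context
    if old_context is not SENTINEL:
        # The real code records a rusage delta here; taken across a fork, the
        # delta would span two processes and could come out negative.
        old_context["snapshots"].append(resource.getrusage(resource.RUSAGE_THREAD))
    _current_context = new_context
    return old_context


set_current_context({"name": "main", "snapshots": []})

# The pattern from the patch: stop the context *before* forking...
calling_context = set_current_context(SENTINEL)
process_id = os.fork()
# ...and restore it *after*, in both the parent and the child.
set_current_context(calling_context)

if process_id != 0:
    os.waitpid(process_id, 0)
print("pid=%d context=%s" % (os.getpid(), calling_context["name"]))
sys.exit(0)
```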
From bf21b4ab8b2bd1458011e0dc8437873358a9cebb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 26 Aug 2025 21:22:59 -0500 Subject: [PATCH 02/33] Simplify stop/start context pattern in `daemonize_process`

---
 synapse/util/daemonize.py | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py
index bb4d35182c0..ddf98d37a8d 100644
--- a/synapse/util/daemonize.py
+++ b/synapse/util/daemonize.py
@@ -31,7 +31,7 @@

 from synapse.logging.context import (
     SENTINEL_CONTEXT,
-    set_current_context,
+    PreserveLoggingContext,
 )


@@ -72,14 +72,11 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -
     # Stop the existing context *before* we fork the process. Otherwise the cputime
     # metrics get confused about the per-thread resource usage appearing to go backwards
     # because we're comparing the resource usage from the original process to the forked
-    # process.
-    calling_context = set_current_context(SENTINEL_CONTEXT)
-
-    # Fork, creating a new process for the child.
-    process_id = os.fork()
-
-    # Restart the logging context *after* forking
-    set_current_context(calling_context)
+    # process. `PreserveLoggingContext` already takes care of restoring the original
+    # context *after* the block.
+    with PreserveLoggingContext(SENTINEL_CONTEXT):
+        # Fork, creating a new process for the child.
+        process_id = os.fork()

     if process_id != 0:
         # parent process: exit.

From a677d9b3191c6d9b55d9913fef1df2c2bf43afcb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 26 Aug 2025 18:44:00 -0500 Subject: [PATCH 03/33] Remove sentinel context from `atexit` daemonize logs

```
2025-08-26 18:40:27,996 - my.synapse.linux.server - synapse.app.homeserver - 187 - WARNING - main - Starting daemon.
2025-08-26 18:40:27,996 - my.synapse.linux.server - synapse.app.homeserver - 181 - WARNING - atexit - Stopping daemon.
```
---
 synapse/util/daemonize.py | 29 ++++++++++++++++++++++++++---
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py
index ddf98d37a8d..21ad8c6ce1f 100644
--- a/synapse/util/daemonize.py
+++ b/synapse/util/daemonize.py
@@ -31,6 +31,7 @@

 from synapse.logging.context import (
     SENTINEL_CONTEXT,
+    LoggingContext,
     PreserveLoggingContext,
 )

@@ -149,11 +150,33 @@ def sigterm(signum: int, frame: Optional[FrameType]) -> NoReturn:

     signal.signal(signal.SIGTERM, sigterm)

+    # Create a logging context that we can use later as these `atexit` handlers will run
+    # after the `with LoggingContext("main")` context manager finishes and we still want
+    # some context here to know which server is logging.
+    #
+    # We're using `PreserveLoggingContext(SENTINEL_CONTEXT)` so our new `LoggingContext`
+    # ends up with `LoggingContext.previous_context = SENTINEL_CONTEXT` so that when the
+    # `LoggingContext` exits and restores the previous context, we don't leak some
+    # context into the reactor that would erroneously be picked up by something else
+    # down the line.
+    with PreserveLoggingContext(SENTINEL_CONTEXT):
+        exit_logging_context = LoggingContext(
+            "atexit",
+            # TODO: In the future, we will want
+            # `server_name=calling_context.server_name` so we know which server this log
+            # pertains to, https://github.com/element-hq/synapse/pull/18868
+            #
+            # No parent_context as we don't want to attribute the metrics/traces to the
+            # calling context. `atexit` is completely out-of-band from our application
+            # so it doesn't make sense to associate it back.
+ ) + # Cleanup pid file at exit. def exit() -> None: - logger.warning("Stopping daemon.") - os.remove(pid_file) - sys.exit(0) + with PreserveLoggingContext(exit_logging_context): + logger.warning("Stopping daemon.") + os.remove(pid_file) + sys.exit(0) atexit.register(exit) From fce59b2284283a1b8a0fdbf9cdbf793515bcd736 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 01:56:07 -0500 Subject: [PATCH 04/33] DEBUG: Logging contexts getting lost Running a normal server (`daemonize: false`): ``` poetry run synapse_homeserver --config-path homeserver.yaml ``` Bad logs being seen: ``` PreserveLoggingContext: Expected logging context sentinel but found main ``` ``` LoggingContext: Expected logging context main was lost ``` --- synapse/app/_base.py | 5 +++ synapse/app/homeserver.py | 18 ++++---- synapse/handlers/presence.py | 16 ++++---- synapse/logging/context.py | 41 +++++++++++++++++-- synapse/metrics/background_process_metrics.py | 6 +++ synapse/server.py | 4 +- synapse/storage/database.py | 33 +++++++++++---- synapse/storage/databases/main/cache.py | 25 +++++++---- synapse/storage/databases/main/lock.py | 10 ++--- synapse/util/__init__.py | 9 +++- 10 files changed, 122 insertions(+), 45 deletions(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 348d13d26a1..78660f9f33c 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -72,6 +72,7 @@ from synapse.events.presence_router import load_legacy_presence_router from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseSite +from synapse.logging.context import PreserveLoggingContext, current_context from synapse.logging.opentracing import init_tracer from synapse.metrics import install_gc_manager, register_threadpool from synapse.metrics.background_process_metrics import run_as_background_process @@ -182,7 +183,9 @@ def run() -> None: if gc_thresholds: gc.set_threshold(*gc_thresholds) install_gc_manager() + logger.info("run-asdf1") run_command() + logger.info("run-asdf2") if daemonize: assert pid_file is not None @@ -192,7 +195,9 @@ def run() -> None: daemonize_process(pid_file, logger) + logger.info("asdf1") run() + logger.info("asdf2") def quit_with_error(error_string: str) -> NoReturn: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e027b5eaea2..7725909cb3b 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -376,18 +376,18 @@ def setup(config_options: List[str]) -> SynapseHomeServer: except Exception as e: handle_startup_exception(e) - async def start() -> None: - # Load the OIDC provider metadatas, if OIDC is enabled. - if hs.config.oidc.oidc_enabled: - oidc = hs.get_oidc_handler() - # Loading the provider metadata also ensures the provider config is valid. - await oidc.load_metadata() + # async def start() -> None: + # # Load the OIDC provider metadatas, if OIDC is enabled. + # if hs.config.oidc.oidc_enabled: + # oidc = hs.get_oidc_handler() + # # Loading the provider metadata also ensures the provider config is valid. 
+ # await oidc.load_metadata() - await _base.start(hs) + # await _base.start(hs) - hs.get_datastores().main.db_pool.updates.start_doing_background_updates() + # hs.get_datastores().main.db_pool.updates.start_doing_background_updates() - register_start(start) + # register_start(start) return hs diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b253117498a..2b72ab6fa58 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -529,14 +529,14 @@ def __init__(self, hs: "HomeServer"): self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) - hs.get_reactor().addSystemEventTrigger( - "before", - "shutdown", - run_as_background_process, - "generic_presence.on_shutdown", - self.server_name, - self._on_shutdown, - ) + # hs.get_reactor().addSystemEventTrigger( + # "before", + # "shutdown", + # run_as_background_process, + # "generic_presence.on_shutdown", + # self.server_name, + # self._on_shutdown, + # ) async def _on_shutdown(self) -> None: if self._track_presence: diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 3ef97f23c96..6e21ea82018 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -411,10 +411,13 @@ def __exit__( current = set_current_context(self.previous_context) if current is not self: if current is SENTINEL_CONTEXT: - logcontext_error("Expected logging context %s was lost" % (self,)) + logcontext_error( + "LoggingContext: Expected logging context %s was lost" % (self,) + ) else: logcontext_error( - "Expected logging context %s but found %s" % (self, current) + "LoggingContext: Expected logging context %s but found %s" + % (self, current) ) # the fact that we are here suggests that the caller thinks that everything @@ -636,6 +639,16 @@ def __init__( def __enter__(self) -> None: self._old_context = set_current_context(self._new_context) + import traceback + + logger.info( + "asdf PreserveLoggingContext enter new=%s old=%s %s", + self._new_context, + self._old_context, + traceback.format_stack()[-2].replace( + "/home/eric/Documents/github/element/synapse/", "" + ), + ) def __exit__( self, @@ -643,16 +656,27 @@ def __exit__( value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: + import traceback + + logger.info( + "asdf PreserveLoggingContext exit new=%s old=%s %s", + self._new_context, + self._old_context, + traceback.format_stack()[-2].replace( + "/home/eric/Documents/github/element/synapse/", "" + ), + ) context = set_current_context(self._old_context) if context != self._new_context: if not context: logcontext_error( - "Expected logging context %s was lost" % (self._new_context,) + "PreserveLoggingContext: Expected logging context %s was lost" + % (self._new_context,) ) else: logcontext_error( - "Expected logging context %s but found %s" + "PreserveLoggingContext: Expected logging context %s but found %s" % ( self._new_context, context, @@ -684,6 +708,15 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe current = current_context() + # import traceback + + logger.info( + "asdf set_current_context new=%s old=%s", + context, + current, + # traceback.format_stack(), + ) + if current is not context: rusage = get_thread_resource_usage() current.stop(rusage) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index f7f2d88885e..601201746ec 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -260,6 +260,12 @@ async def run() 
-> Optional[R]: with BackgroundProcessLoggingContext( name=desc, server_name=server_name, instance_id=count ) as context: + logger.info( + "asdf run_as_background_process %s %s previous_context=%s", + desc, + context, + context.previous_context, + ) try: if bg_start_span: ctx = start_active_span( diff --git a/synapse/server.py b/synapse/server.py index bf82f79bec9..9aca5d81b39 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -366,8 +366,8 @@ def setup(self) -> None: # Register background tasks required by this server. This must be done # somewhat manually due to the background tasks not being registered # unless handlers are instantiated. - if self.config.worker.run_background_tasks: - self.setup_background_tasks() + # if self.config.worker.run_background_tasks: + # self.setup_background_tasks() def start_listening(self) -> None: # noqa: B027 (no-op by design) """Start the HTTP, manhole, metrics, etc listeners diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f7aec16c969..807460ea582 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -52,6 +52,7 @@ from twisted.enterprise import adbapi from twisted.internet.interfaces import IReactorCore +from twisted.internet import defer from synapse.api.errors import StoreError from synapse.config.database import DatabaseConnectionConfig @@ -642,13 +643,31 @@ def __init__( # Check ASAP (and then later, every 1s) to see if we have finished # background updates of tables that aren't safe to update. - self._clock.call_later( - 0.0, - run_as_background_process, - "upsert_safety_check", - self.server_name, - self._check_safe_to_upsert, - ) + # self._clock.call_later( + # 0.0, + # run_as_background_process, + # "upsert_safety_check", + # self.server_name, + # self._check_safe_to_upsert, + # ) + + # self._clock.call_later( + # 0.0, + # run_as_background_process, + # "asdf_call_later", + # self.server_name, + # self.asdf, + # ) + run_as_background_process("asdf_call_later", self.server_name, self.asdf) + run_as_background_process("qwer_call_later", self.server_name, self.qwer) + + async def asdf(self) -> None: + # Await some dummy value + await self._clock.sleep(1.0) + + async def qwer(self) -> None: + # Await some dummy value + await self._clock.sleep(1.0) def name(self) -> str: "Return the name of this database" diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 77949268125..e1bf2f3505b 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -26,7 +26,10 @@ from synapse.api.constants import EventTypes from synapse.config._base import Config -from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.metrics.background_process_metrics import ( + wrap_as_background_process, + run_as_background_process, +) from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, @@ -128,13 +131,19 @@ def __init__( # old rows. # This is only applicable when Postgres is in use; this table is unused # and not populated at all when SQLite is the active database engine. 
- if hs.config.worker.run_background_tasks and isinstance( - self.database_engine, PostgresEngine - ): - self.hs.get_clock().call_later( - CATCH_UP_CLEANUP_INTERVAL_MS / 1000, - self._clean_up_cache_invalidation_wrapper, - ) + # if hs.config.worker.run_background_tasks and isinstance( + # self.database_engine, PostgresEngine + # ): + # self.hs.get_clock().call_later( + # CATCH_UP_CLEANUP_INTERVAL_MS / 1000, + # self._clean_up_cache_invalidation_wrapper, + # ) + + # run_as_background_process("qwer_call_later", self.server_name, self.qwer) + + # async def qwer(self) -> None: + # # Await some dummy value + # await self._clock.sleep(1.0) async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index e733f65cb1e..d683cbc6718 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -99,11 +99,11 @@ def __init__( # lead to a race, as we may drop the lock while we are still processing. # However, a) it should be a small window, b) the lock is best effort # anyway and c) we want to really avoid leaking locks when we restart. - hs.get_reactor().addSystemEventTrigger( - "before", - "shutdown", - self._on_shutdown, - ) + # hs.get_reactor().addSystemEventTrigger( + # "before", + # "shutdown", + # self._on_shutdown, + # ) self._acquiring_locks: Set[Tuple[str, str]] = set() diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 36129c3a67b..8e76e82a127 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -124,6 +124,7 @@ async def sleep(self, seconds: float) -> None: with context.PreserveLoggingContext(): self._reactor.callLater(seconds, d.callback, seconds) await d + logger.info("asdf sleep done") def time(self) -> float: """Returns the current system time in seconds since epoch.""" @@ -158,6 +159,8 @@ def looping_call( *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ + # TODO: remove + return return self._looping_call_common(f, msec, False, *args, **kwargs) def looping_call_now( @@ -213,12 +216,14 @@ def call_later( **kwargs: Key arguments to pass to function. """ - def wrapped_callback(*args: Any, **kwargs: Any) -> None: + def wrapped_call_later_callback(*args: Any, **kwargs: Any) -> None: with context.PreserveLoggingContext(): callback(*args, **kwargs) with context.PreserveLoggingContext(): - return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) + return self._reactor.callLater( + delay, wrapped_call_later_callback, *args, **kwargs + ) def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: try: From af1944bec45e977a946eda271f7c2197df8c4b99 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 02:12:24 -0500 Subject: [PATCH 05/33] Revert "DEBUG: Logging contexts getting lost" This reverts commit fce59b2284283a1b8a0fdbf9cdbf793515bcd736. 
--- synapse/app/_base.py | 5 --- synapse/app/homeserver.py | 18 ++++---- synapse/handlers/presence.py | 16 ++++---- synapse/logging/context.py | 41 ++----------------- synapse/metrics/background_process_metrics.py | 6 --- synapse/server.py | 4 +- synapse/storage/database.py | 33 ++++----------- synapse/storage/databases/main/cache.py | 25 ++++------- synapse/storage/databases/main/lock.py | 10 ++--- synapse/util/__init__.py | 9 +--- 10 files changed, 45 insertions(+), 122 deletions(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 78660f9f33c..348d13d26a1 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -72,7 +72,6 @@ from synapse.events.presence_router import load_legacy_presence_router from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseSite -from synapse.logging.context import PreserveLoggingContext, current_context from synapse.logging.opentracing import init_tracer from synapse.metrics import install_gc_manager, register_threadpool from synapse.metrics.background_process_metrics import run_as_background_process @@ -183,9 +182,7 @@ def run() -> None: if gc_thresholds: gc.set_threshold(*gc_thresholds) install_gc_manager() - logger.info("run-asdf1") run_command() - logger.info("run-asdf2") if daemonize: assert pid_file is not None @@ -195,9 +192,7 @@ def run() -> None: daemonize_process(pid_file, logger) - logger.info("asdf1") run() - logger.info("asdf2") def quit_with_error(error_string: str) -> NoReturn: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7725909cb3b..e027b5eaea2 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -376,18 +376,18 @@ def setup(config_options: List[str]) -> SynapseHomeServer: except Exception as e: handle_startup_exception(e) - # async def start() -> None: - # # Load the OIDC provider metadatas, if OIDC is enabled. - # if hs.config.oidc.oidc_enabled: - # oidc = hs.get_oidc_handler() - # # Loading the provider metadata also ensures the provider config is valid. - # await oidc.load_metadata() + async def start() -> None: + # Load the OIDC provider metadatas, if OIDC is enabled. + if hs.config.oidc.oidc_enabled: + oidc = hs.get_oidc_handler() + # Loading the provider metadata also ensures the provider config is valid. 
+ await oidc.load_metadata() - # await _base.start(hs) + await _base.start(hs) - # hs.get_datastores().main.db_pool.updates.start_doing_background_updates() + hs.get_datastores().main.db_pool.updates.start_doing_background_updates() - # register_start(start) + register_start(start) return hs diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2b72ab6fa58..b253117498a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -529,14 +529,14 @@ def __init__(self, hs: "HomeServer"): self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) - # hs.get_reactor().addSystemEventTrigger( - # "before", - # "shutdown", - # run_as_background_process, - # "generic_presence.on_shutdown", - # self.server_name, - # self._on_shutdown, - # ) + hs.get_reactor().addSystemEventTrigger( + "before", + "shutdown", + run_as_background_process, + "generic_presence.on_shutdown", + self.server_name, + self._on_shutdown, + ) async def _on_shutdown(self) -> None: if self._track_presence: diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 6e21ea82018..3ef97f23c96 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -411,13 +411,10 @@ def __exit__( current = set_current_context(self.previous_context) if current is not self: if current is SENTINEL_CONTEXT: - logcontext_error( - "LoggingContext: Expected logging context %s was lost" % (self,) - ) + logcontext_error("Expected logging context %s was lost" % (self,)) else: logcontext_error( - "LoggingContext: Expected logging context %s but found %s" - % (self, current) + "Expected logging context %s but found %s" % (self, current) ) # the fact that we are here suggests that the caller thinks that everything @@ -639,16 +636,6 @@ def __init__( def __enter__(self) -> None: self._old_context = set_current_context(self._new_context) - import traceback - - logger.info( - "asdf PreserveLoggingContext enter new=%s old=%s %s", - self._new_context, - self._old_context, - traceback.format_stack()[-2].replace( - "/home/eric/Documents/github/element/synapse/", "" - ), - ) def __exit__( self, @@ -656,27 +643,16 @@ def __exit__( value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: - import traceback - - logger.info( - "asdf PreserveLoggingContext exit new=%s old=%s %s", - self._new_context, - self._old_context, - traceback.format_stack()[-2].replace( - "/home/eric/Documents/github/element/synapse/", "" - ), - ) context = set_current_context(self._old_context) if context != self._new_context: if not context: logcontext_error( - "PreserveLoggingContext: Expected logging context %s was lost" - % (self._new_context,) + "Expected logging context %s was lost" % (self._new_context,) ) else: logcontext_error( - "PreserveLoggingContext: Expected logging context %s but found %s" + "Expected logging context %s but found %s" % ( self._new_context, context, @@ -708,15 +684,6 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe current = current_context() - # import traceback - - logger.info( - "asdf set_current_context new=%s old=%s", - context, - current, - # traceback.format_stack(), - ) - if current is not context: rusage = get_thread_resource_usage() current.stop(rusage) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 601201746ec..f7f2d88885e 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -260,12 +260,6 @@ async def run() -> 
Optional[R]: with BackgroundProcessLoggingContext( name=desc, server_name=server_name, instance_id=count ) as context: - logger.info( - "asdf run_as_background_process %s %s previous_context=%s", - desc, - context, - context.previous_context, - ) try: if bg_start_span: ctx = start_active_span( diff --git a/synapse/server.py b/synapse/server.py index 9aca5d81b39..bf82f79bec9 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -366,8 +366,8 @@ def setup(self) -> None: # Register background tasks required by this server. This must be done # somewhat manually due to the background tasks not being registered # unless handlers are instantiated. - # if self.config.worker.run_background_tasks: - # self.setup_background_tasks() + if self.config.worker.run_background_tasks: + self.setup_background_tasks() def start_listening(self) -> None: # noqa: B027 (no-op by design) """Start the HTTP, manhole, metrics, etc listeners diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 807460ea582..f7aec16c969 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -52,7 +52,6 @@ from twisted.enterprise import adbapi from twisted.internet.interfaces import IReactorCore -from twisted.internet import defer from synapse.api.errors import StoreError from synapse.config.database import DatabaseConnectionConfig @@ -643,31 +642,13 @@ def __init__( # Check ASAP (and then later, every 1s) to see if we have finished # background updates of tables that aren't safe to update. - # self._clock.call_later( - # 0.0, - # run_as_background_process, - # "upsert_safety_check", - # self.server_name, - # self._check_safe_to_upsert, - # ) - - # self._clock.call_later( - # 0.0, - # run_as_background_process, - # "asdf_call_later", - # self.server_name, - # self.asdf, - # ) - run_as_background_process("asdf_call_later", self.server_name, self.asdf) - run_as_background_process("qwer_call_later", self.server_name, self.qwer) - - async def asdf(self) -> None: - # Await some dummy value - await self._clock.sleep(1.0) - - async def qwer(self) -> None: - # Await some dummy value - await self._clock.sleep(1.0) + self._clock.call_later( + 0.0, + run_as_background_process, + "upsert_safety_check", + self.server_name, + self._check_safe_to_upsert, + ) def name(self) -> str: "Return the name of this database" diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index e1bf2f3505b..77949268125 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -26,10 +26,7 @@ from synapse.api.constants import EventTypes from synapse.config._base import Config -from synapse.metrics.background_process_metrics import ( - wrap_as_background_process, - run_as_background_process, -) +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, @@ -131,19 +128,13 @@ def __init__( # old rows. # This is only applicable when Postgres is in use; this table is unused # and not populated at all when SQLite is the active database engine. 
- # if hs.config.worker.run_background_tasks and isinstance( - # self.database_engine, PostgresEngine - # ): - # self.hs.get_clock().call_later( - # CATCH_UP_CLEANUP_INTERVAL_MS / 1000, - # self._clean_up_cache_invalidation_wrapper, - # ) - - # run_as_background_process("qwer_call_later", self.server_name, self.qwer) - - # async def qwer(self) -> None: - # # Await some dummy value - # await self._clock.sleep(1.0) + if hs.config.worker.run_background_tasks and isinstance( + self.database_engine, PostgresEngine + ): + self.hs.get_clock().call_later( + CATCH_UP_CLEANUP_INTERVAL_MS / 1000, + self._clean_up_cache_invalidation_wrapper, + ) async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index d683cbc6718..e733f65cb1e 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -99,11 +99,11 @@ def __init__( # lead to a race, as we may drop the lock while we are still processing. # However, a) it should be a small window, b) the lock is best effort # anyway and c) we want to really avoid leaking locks when we restart. - # hs.get_reactor().addSystemEventTrigger( - # "before", - # "shutdown", - # self._on_shutdown, - # ) + hs.get_reactor().addSystemEventTrigger( + "before", + "shutdown", + self._on_shutdown, + ) self._acquiring_locks: Set[Tuple[str, str]] = set() diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 8e76e82a127..36129c3a67b 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -124,7 +124,6 @@ async def sleep(self, seconds: float) -> None: with context.PreserveLoggingContext(): self._reactor.callLater(seconds, d.callback, seconds) await d - logger.info("asdf sleep done") def time(self) -> float: """Returns the current system time in seconds since epoch.""" @@ -159,8 +158,6 @@ def looping_call( *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - # TODO: remove - return return self._looping_call_common(f, msec, False, *args, **kwargs) def looping_call_now( @@ -216,14 +213,12 @@ def call_later( **kwargs: Key arguments to pass to function. 
""" - def wrapped_call_later_callback(*args: Any, **kwargs: Any) -> None: + def wrapped_callback(*args: Any, **kwargs: Any) -> None: with context.PreserveLoggingContext(): callback(*args, **kwargs) with context.PreserveLoggingContext(): - return self._reactor.callLater( - delay, wrapped_call_later_callback, *args, **kwargs - ) + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: try: From 675d94a0ec09b66121cae28cf45b45663604ed59 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 02:17:24 -0500 Subject: [PATCH 06/33] Fix `run_as_background_process` not following Synapse log context rules Resulting in bad logs being seen: ``` PreserveLoggingContext: Expected logging context sentinel but found main ``` ``` LoggingContext: Expected logging context main was lost ``` --- synapse/metrics/background_process_metrics.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index f7f2d88885e..ffa69e82103 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -47,7 +47,7 @@ from synapse.logging.context import ( ContextResourceUsage, LoggingContext, - PreserveLoggingContext, + make_deferred_yieldable, ) from synapse.logging.opentracing import SynapseTags, start_active_span from synapse.metrics import SERVER_NAME_LABEL @@ -223,10 +223,9 @@ def run_as_background_process( This should be used to wrap processes which are fired off to run in the background, instead of being associated with a particular request. - It returns a Deferred which completes when the function completes, but it doesn't - follow the synapse logcontext rules, which makes it appropriate for passing to - clock.looping_call and friends (or for firing-and-forgetting in the middle of a - normal synapse async function). + It returns a Deferred which completes when the function completes, which makes it + appropriate for passing to clock.looping_call and friends (or for + firing-and-forgetting in the middle of a normal synapse async function). Args: desc: a description for this background process type @@ -241,8 +240,6 @@ def run_as_background_process( Returns: Deferred which returns the result of func, or `None` if func raises. - Note that the returned Deferred does not follow the synapse logcontext - rules. """ async def run() -> Optional[R]: @@ -280,10 +277,9 @@ async def run() -> Optional[R]: name=desc, **{SERVER_NAME_LABEL: server_name} ).dec() - with PreserveLoggingContext(): # Note that we return a Deferred here so that it can be used in a # looping_call and other places that expect a Deferred. - return defer.ensureDeferred(run()) + return make_deferred_yieldable(defer.ensureDeferred(run())) P = ParamSpec("P") From 7938e8cef4041cb754f8b21088e04c4ea1934994 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 19:50:33 -0500 Subject: [PATCH 07/33] DEBUG: lost `main` context after request Reproduction instructions: 1. `poetry run synapse_homeserver --config-path homeserver.yaml` 1. `curl http://localhost:8008/_matrix/client/versions` 1. 
Stop Synapse (`Ctrl + c`) Notice the bad log: ``` synapse.logging.context - WARNING - sentinel - LoggingContext: Expected logging context main was lost ``` --- synapse/app/homeserver.py | 1 + synapse/http/site.py | 7 ++++++ synapse/logging/context.py | 49 ++++++++++++++++++++++++++++++++++---- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e027b5eaea2..019641aaa82 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -415,6 +415,7 @@ def main() -> None: redirect_stdio_to_logs() run(hs) + logger.info("asdf outside main") if __name__ == "__main__": diff --git a/synapse/http/site.py b/synapse/http/site.py index 55088fc190e..4cd7d0a59d1 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -299,6 +299,7 @@ def get_authenticated_entity(self) -> Tuple[Optional[str], Optional[str]]: return None, None def render(self, resrc: Resource) -> None: + logger.info("asdf->SynapseRequest.render") # this is called once a Resource has been found to serve the request; in our # case the Resource in question will normally be a JsonResource. @@ -319,6 +320,11 @@ def render(self, resrc: Resource) -> None: user_agent=get_request_user_agent(self), ), ) + logger.info( + "asdf request logcontext=%s logcontext.previous_context=%s", + self.logcontext, + self.logcontext.previous_context, + ) # override the Server header which is set by twisted self.setHeader("Server", self.synapse_site.server_version_string) @@ -342,6 +348,7 @@ def render(self, resrc: Resource) -> None: servlet=self.request_metrics.name, **{SERVER_NAME_LABEL: self.our_server_name}, ).inc() + logger.info("asdf->SynapseRequest.render done") @contextlib.contextmanager def processing(self) -> Generator[None, None, None]: diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 7ea3f3d726d..f1d459b2fbf 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -51,6 +51,7 @@ import attr from typing_extensions import ParamSpec +from traceback import format_stack from twisted.internet import defer, threads from twisted.python.threadpool import ThreadPool @@ -379,6 +380,14 @@ def set_current_context( def __enter__(self) -> "LoggingContext": """Enters this logging context into thread local storage""" + logger.debug( + "LoggingContext(%s) enter (previous_context=%s) (source: %s)", + self, + self.previous_context, + format_stack()[-2] + .replace("/home/eric/Documents/github/element/synapse/", "") + .replace("\n", "->"), + ) old_context = set_current_context(self) if self.previous_context != old_context: logcontext_error( @@ -401,13 +410,24 @@ def __exit__( Returns: None to avoid suppressing any exceptions that were thrown. 
""" + logger.debug( + "LoggingContext(%s) exit: returning to previous_context=%s (source: %s)", + self, + self.previous_context, + format_stack()[-2] + .replace("/home/eric/Documents/github/element/synapse/", "") + .replace("\n", "->"), + ) current = set_current_context(self.previous_context) if current is not self: if current is SENTINEL_CONTEXT: - logcontext_error("Expected logging context %s was lost" % (self,)) + logcontext_error( + "LoggingContext: Expected logging context %s was lost" % (self,) + ) else: logcontext_error( - "Expected logging context %s but found %s" % (self, current) + "LoggingContext: Expected logging context %s but found %s" + % (self, current) ) # the fact that we are here suggests that the caller thinks that everything @@ -628,6 +648,14 @@ def __init__( self._new_context = new_context def __enter__(self) -> None: + logger.debug( + "PreserveLoggingContext(%s) enter (old_context=%s) (source: %s)", + self._new_context, + current_context(), + format_stack()[-2] + .replace("/home/eric/Documents/github/element/synapse/", "") + .replace("\n", "->"), + ) self._old_context = set_current_context(self._new_context) def __exit__( @@ -636,16 +664,25 @@ def __exit__( value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: + logger.debug( + "PreserveLoggingContext(%s) exit returning to old_context=%s (source: %s)", + self._new_context, + self._old_context, + format_stack()[-2] + .replace("/home/eric/Documents/github/element/synapse/", "") + .replace("\n", "->"), + ) context = set_current_context(self._old_context) if context != self._new_context: if not context: logcontext_error( - "Expected logging context %s was lost" % (self._new_context,) + "PreserveLoggingContext: Expected logging context %s was lost" + % (self._new_context,) ) else: logcontext_error( - "Expected logging context %s but found %s" + "PreserveLoggingContext: Expected logging context %s but found %s" % ( self._new_context, context, @@ -677,6 +714,10 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe current = current_context() + logger.debug( + "set_current_context(%s) old_context=%s - %s", context, current, format_stack() + ) + if current is not context: rusage = get_thread_resource_usage() current.stop(rusage) From 0a865fd2ab6b4608ac9e141b92b8dabf4c913236 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 4 Sep 2025 12:20:06 -0500 Subject: [PATCH 08/33] Revert "Fix `run_as_background_process` not following Synapse log context rules" This reverts commit 675d94a0ec09b66121cae28cf45b45663604ed59. Things get stuck with this, see https://github.com/element-hq/synapse/pull/18870#discussion_r2322891926 --- synapse/metrics/background_process_metrics.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index ffa69e82103..f7f2d88885e 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -47,7 +47,7 @@ from synapse.logging.context import ( ContextResourceUsage, LoggingContext, - make_deferred_yieldable, + PreserveLoggingContext, ) from synapse.logging.opentracing import SynapseTags, start_active_span from synapse.metrics import SERVER_NAME_LABEL @@ -223,9 +223,10 @@ def run_as_background_process( This should be used to wrap processes which are fired off to run in the background, instead of being associated with a particular request. 
- It returns a Deferred which completes when the function completes, which makes it - appropriate for passing to clock.looping_call and friends (or for - firing-and-forgetting in the middle of a normal synapse async function). + It returns a Deferred which completes when the function completes, but it doesn't + follow the synapse logcontext rules, which makes it appropriate for passing to + clock.looping_call and friends (or for firing-and-forgetting in the middle of a + normal synapse async function). Args: desc: a description for this background process type @@ -240,6 +241,8 @@ def run_as_background_process( Returns: Deferred which returns the result of func, or `None` if func raises. + Note that the returned Deferred does not follow the synapse logcontext + rules. """ async def run() -> Optional[R]: @@ -277,9 +280,10 @@ async def run() -> Optional[R]: name=desc, **{SERVER_NAME_LABEL: server_name} ).dec() + with PreserveLoggingContext(): # Note that we return a Deferred here so that it can be used in a # looping_call and other places that expect a Deferred. - return make_deferred_yieldable(defer.ensureDeferred(run())) + return defer.ensureDeferred(run()) P = ParamSpec("P") From b97b2ddac6e63b23e1a31779d491b808c3417036 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 5 Sep 2025 18:50:50 -0500 Subject: [PATCH 09/33] Debug: Eliminate variables Reproduction: 1. `poetry run synapse_homeserver --config-path homeserver.yaml` 1. Ctrl + C to stop the server 1. Notice `LoggingContext: Expected logging context main was lost` in the logs --- synapse/handlers/federation.py | 17 ++- synapse/logging/context.py | 144 +++++++++++++++++- synapse/metrics/background_process_metrics.py | 8 +- synapse/storage/database.py | 2 +- synapse/storage/databases/main/room.py | 40 ++--- 5 files changed, 175 insertions(+), 36 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 34aae7ef3ce..e9854bdd0f5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -188,7 +188,8 @@ def __init__(self, hs: "HomeServer"): # were shut down. 
if not hs.config.worker.worker_app: run_as_background_process( - "resume_sync_partial_state_room", + # "resume_sync_partial_state_room", + "rspsr", self.server_name, self._resume_partial_state_room_sync, ) @@ -1788,15 +1789,17 @@ async def get_room_complexity( async def _resume_partial_state_room_sync(self) -> None: """Resumes resyncing of all partial-state rooms after a restart.""" + logger.info("asdf->_resume_partial_state_room_sync") assert not self.config.worker.worker_app partial_state_rooms = await self.store.get_partial_state_room_resync_info() - for room_id, resync_info in partial_state_rooms.items(): - self._start_partial_state_room_sync( - initial_destination=resync_info.joined_via, - other_destinations=resync_info.servers_in_room, - room_id=room_id, - ) + # for room_id, resync_info in partial_state_rooms.items(): + # self._start_partial_state_room_sync( + # initial_destination=resync_info.joined_via, + # other_destinations=resync_info.servers_in_room, + # room_id=room_id, + # ) + logger.info("asdf->_resume_partial_state_room_sync done") def _start_partial_state_room_sync( self, diff --git a/synapse/logging/context.py b/synapse/logging/context.py index f1d459b2fbf..b67adf31dab 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -49,6 +49,8 @@ overload, ) +import secrets +import string import attr from typing_extensions import ParamSpec from traceback import format_stack @@ -386,6 +388,10 @@ def __enter__(self) -> "LoggingContext": self.previous_context, format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") + .replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) .replace("\n", "->"), ) old_context = set_current_context(self) @@ -416,6 +422,10 @@ def __exit__( self.previous_context, format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") + .replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) .replace("\n", "->"), ) current = set_current_context(self.previous_context) @@ -635,25 +645,39 @@ def filter(self, record: logging.LogRecord) -> Literal[True]: return True +def random_string(length: int) -> str: + """Generate a cryptographically secure string of random letters. 
+ + Drawn from the characters: `a-z` and `A-Z` + """ + return "".join(secrets.choice(string.ascii_letters) for _ in range(length)) + + class PreserveLoggingContext: """Context manager which replaces the logging context The previous logging context is restored on exit.""" - __slots__ = ["_old_context", "_new_context"] + __slots__ = ["_old_context", "_new_context", "_instance_id"] def __init__( self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT ) -> None: self._new_context = new_context + self._instance_id = random_string(5) def __enter__(self) -> None: logger.debug( - "PreserveLoggingContext(%s) enter (old_context=%s) (source: %s)", + "PreserveLoggingContext(%s) %s enter (old_context=%s) (source: %s)", self._new_context, + self._instance_id, current_context(), format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") + .replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) .replace("\n", "->"), ) self._old_context = set_current_context(self._new_context) @@ -665,11 +689,16 @@ def __exit__( traceback: Optional[TracebackType], ) -> None: logger.debug( - "PreserveLoggingContext(%s) exit returning to old_context=%s (source: %s)", + "PreserveLoggingContext(%s) %s exit returning to old_context=%s (source: %s)", self._new_context, + self._instance_id, self._old_context, format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") + .replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) .replace("\n", "->"), ) context = set_current_context(self._old_context) @@ -715,7 +744,16 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe current = current_context() logger.debug( - "set_current_context(%s) old_context=%s - %s", context, current, format_stack() + "| set_current_context(%s) old_context=%s - %s", + context, + current, + [ + x.replace("/home/eric/Documents/github/element/synapse/", "").replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) + for x in format_stack() + ], ) if current is not context: @@ -837,7 +875,25 @@ def run_in_background( CRITICAL error about an unhandled error will be logged without much indication about where it came from. """ + instance_id = random_string(5) + stack = ( + format_stack()[-2] + .replace("/home/eric/Documents/github/element/synapse/", "") + .replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) + .replace("\n", "->") + ) + current = current_context() + logger.info( + "asdf run_in_background1 %s start context=%s - %s", + instance_id, + current, + stack, + ) + try: res = f(*args, **kwargs) except Exception: @@ -868,6 +924,12 @@ def run_in_background( # The function may have reset the context before returning, so # we need to restore it now. + logger.info( + "asdf run_in_background2 %s restore log context=%s - %s", + instance_id, + current, + stack, + ) ctx = set_current_context(current) # The original context will be restored when the deferred @@ -882,7 +944,17 @@ def run_in_background( # which is supposed to have a single entry and exit point. But # by spawning off another deferred, we are effectively # adding a new exit point.) 
- d.addBoth(_set_context_cb, ctx) + + def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: + logger.info( + "asdf run_in_background3 %s reset log context at end ctx=%s - %s", + instance_id, + ctx, + stack, + ) + return _set_context_cb(result, context) + + d.addBoth(_asdf, ctx) return d @@ -901,6 +973,15 @@ def run_coroutine_in_background( do not run until called, and so calling an async function without awaiting cannot change the log contexts. """ + stack = ( + format_stack()[-2] + .replace("/home/eric/Documents/github/element/synapse/", "") + .replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) + .replace("\n", "->") + ) current = current_context() d = defer.ensureDeferred(coroutine) @@ -921,7 +1002,15 @@ def run_coroutine_in_background( # which is supposed to have a single entry and exit point. But # by spawning off another deferred, we are effectively # adding a new exit point.) - d.addBoth(_set_context_cb, ctx) + def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: + logger.info( + "asdf run_coroutine_in_background reset log context at end ctx=%s - %s", + ctx, + stack, + ) + return _set_context_cb(result, context) + + d.addBoth(_asdf, ctx) return d @@ -940,6 +1029,30 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T] (This is more-or-less the opposite operation to run_in_background.) """ + instance_id = random_string(5) + stack = ( + format_stack()[-2] + .replace("/home/eric/Documents/github/element/synapse/", "") + .replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) + .replace("\n", "->") + ) + full_stack = [ + x.replace("/home/eric/Documents/github/element/synapse/", "").replace( + "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", + "", + ) + for x in format_stack() + ] + logger.info( + "asdf make_deferred_yieldable1 %s start - %s - full stack=%s", + instance_id, + stack, + full_stack, + ) + if deferred.called and not deferred.paused: # it looks like this deferred is ready to run any callbacks we give it # immediately. We may as well optimise out the logcontext faffery. @@ -947,8 +1060,25 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T] # ok, we can't be sure that a yield won't block, so let's reset the # logcontext, and add a callback to the deferred to restore it. 
+ logger.info( + "asdf make_deferred_yieldable2 %s reset log context - %s - full stack=%s", + instance_id, + stack, + full_stack, + ) prev_context = set_current_context(SENTINEL_CONTEXT) - deferred.addBoth(_set_context_cb, prev_context) + + def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: + logger.info( + "asdf make_deferred_yieldable3 %s restore log context at end ctx=%s - %s - full stack=%s", + instance_id, + prev_context, + stack, + full_stack, + ) + return _set_context_cb(result, context) + + deferred.addBoth(_asdf, prev_context) return deferred diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index f7f2d88885e..b2f65928808 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -48,6 +48,7 @@ ContextResourceUsage, LoggingContext, PreserveLoggingContext, + run_in_background, ) from synapse.logging.opentracing import SynapseTags, start_active_span from synapse.metrics import SERVER_NAME_LABEL @@ -244,6 +245,9 @@ def run_as_background_process( Note that the returned Deferred does not follow the synapse logcontext rules. """ + # TODO: Remove + if desc != "rspsr": + return async def run() -> Optional[R]: with _bg_metrics_lock: @@ -258,7 +262,7 @@ async def run() -> Optional[R]: ).inc() with BackgroundProcessLoggingContext( - name=desc, server_name=server_name, instance_id=count + name="bg-" + desc, server_name=server_name, instance_id=count ) as context: try: if bg_start_span: @@ -285,6 +289,8 @@ async def run() -> Optional[R]: # looping_call and other places that expect a Deferred. return defer.ensureDeferred(run()) + # return run_in_background(run) + P = ParamSpec("P") diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f7aec16c969..63cda70b9ba 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1049,7 +1049,7 @@ def inner_func(conn: _PoolConnection, *args: P.args, **kwargs: P.kwargs) -> R: assert not self.engine.in_transaction(conn) with LoggingContext( - str(curr_context), parent_context=parent_context + "db-" + str(curr_context), parent_context=parent_context ) as context: with opentracing.start_active_span( operation_name="db.connection", diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 6ffc3aed34b..bb2e7eaa5b3 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1479,26 +1479,26 @@ async def get_partial_state_room_resync_info( for room_id, joined_via in rows: room_servers[room_id] = PartialStateResyncInfo(joined_via=joined_via) - rows = cast( - List[Tuple[str, str]], - await self.db_pool.simple_select_list( - "partial_state_rooms_servers", - keyvalues=None, - retcols=("room_id", "server_name"), - desc="get_partial_state_rooms", - ), - ) - - for room_id, server_name in rows: - entry = room_servers.get(room_id) - if entry is None: - # There is a foreign key constraint which enforces that every room_id in - # partial_state_rooms_servers appears in partial_state_rooms. So we - # expect `entry` to be non-null. (This reasoning fails if we've - # partial-joined between the two SELECTs, but this is unlikely to happen - # in practice.) 
- continue - entry.servers_in_room.add(server_name) + # rows = cast( + # List[Tuple[str, str]], + # await self.db_pool.simple_select_list( + # "partial_state_rooms_servers", + # keyvalues=None, + # retcols=("room_id", "server_name"), + # desc="get_partial_state_rooms", + # ), + # ) + + # for room_id, server_name in rows: + # entry = room_servers.get(room_id) + # if entry is None: + # # There is a foreign key constraint which enforces that every room_id in + # # partial_state_rooms_servers appears in partial_state_rooms. So we + # # expect `entry` to be non-null. (This reasoning fails if we've + # # partial-joined between the two SELECTs, but this is unlikely to happen + # # in practice.) + # continue + # entry.servers_in_room.add(server_name) return room_servers From 301a71449c87027847e627b2c516700be7515627 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 5 Sep 2025 21:59:12 -0500 Subject: [PATCH 10/33] DEBUG: test --- synapse/handlers/federation.py | 14 ++++- synapse/metrics/background_process_metrics.py | 6 ++- tests/util/test_logcontext.py | 52 ++++++++++++++++++- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e9854bdd0f5..4708b087e4c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -39,6 +39,7 @@ Union, ) +from twisted.internet import defer import attr from prometheus_client import Histogram from signedjson.key import decode_verify_key_bytes @@ -69,7 +70,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME from synapse.http.servlet import assert_params_in_dict -from synapse.logging.context import nested_logging_context +from synapse.logging.context import nested_logging_context, make_deferred_yieldable from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace from synapse.metrics import SERVER_NAME_LABEL from synapse.metrics.background_process_metrics import run_as_background_process @@ -1792,7 +1793,16 @@ async def _resume_partial_state_room_sync(self) -> None: logger.info("asdf->_resume_partial_state_room_sync") assert not self.config.worker.worker_app - partial_state_rooms = await self.store.get_partial_state_room_resync_info() + # a function which returns an incomplete deferred, but doesn't follow + # the synapse rules. + def blocking_function() -> defer.Deferred: + d: defer.Deferred = defer.Deferred() + self.hs.get_reactor().callLater(0, d.callback, None) + return d + + await make_deferred_yieldable(blocking_function()) + + # partial_state_rooms = await self.store.get_partial_state_room_resync_info() # for room_id, resync_info in partial_state_rooms.items(): # self._start_partial_state_room_sync( # initial_destination=resync_info.joined_via, diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index b2f65928808..aa62778252a 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -49,6 +49,7 @@ LoggingContext, PreserveLoggingContext, run_in_background, + make_deferred_yieldable, ) from synapse.logging.opentracing import SynapseTags, start_active_span from synapse.metrics import SERVER_NAME_LABEL @@ -246,8 +247,8 @@ def run_as_background_process( rules. 
""" # TODO: Remove - if desc != "rspsr": - return + # if desc != "rspsr": + # return async def run() -> Optional[R]: with _bg_metrics_lock: @@ -289,6 +290,7 @@ async def run() -> Optional[R]: # looping_call and other places that expect a Deferred. return defer.ensureDeferred(run()) + # return make_deferred_yieldable(defer.ensureDeferred(run())) # return run_in_background(run) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index af36e685d7c..b6e25a4971a 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -27,12 +27,14 @@ from synapse.logging.context import ( SENTINEL_CONTEXT, LoggingContext, + _Sentinel, PreserveLoggingContext, current_context, make_deferred_yieldable, nested_logging_context, run_in_background, ) +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ISynapseReactor from synapse.util import Clock @@ -44,8 +46,12 @@ class LoggingContextTestCase(unittest.TestCase): def _check_test_key(self, value: str) -> None: context = current_context() - assert isinstance(context, LoggingContext) - self.assertEqual(context.name, value) + assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), ( + f"Expected LoggingContext({value}) but saw {context}" + ) + self.assertEqual( + str(context), value, f"Expected LoggingContext({value}) but saw {context}" + ) def test_with_context(self) -> None: with LoggingContext("test"): @@ -187,6 +193,48 @@ def test_nested_logging_context(self) -> None: nested_context = nested_logging_context(suffix="bar") self.assertEqual(nested_context.name, "foo-bar") + async def test_asdf(self) -> defer.Deferred: + sentinel_context = current_context() + callback_completed = False + + async def testfunc() -> None: + print("testfunc1=%s", current_context()) + + # a function which returns an incomplete deferred, but doesn't follow + # the synapse rules. + def blocking_function() -> defer.Deferred: + d: defer.Deferred = defer.Deferred() + reactor.callLater(0, d.callback, None) + return d + + await make_deferred_yieldable(blocking_function()) + print("testfunc2=%s", current_context()) + callback_completed = True + + print("1=%s", current_context()) + self._check_test_key("sentinel") + with LoggingContext("main"): + print("2=%s", current_context()) + self._check_test_key("main") + + bg_process_d = run_as_background_process( + "bg_process", + server_name="test_server", + func=testfunc, + ) + + print("3=%s", current_context()) + self._check_test_key("main") + + # Wait for callback_completed + await bg_process_d + + print("4=%s", current_context()) + self._check_test_key("main") + + # Test is done when the deferred finishes. 
+ return bg_process_d + # a function which returns a deferred which has been "called", but # which had a function which returned another incomplete deferred on From c1996b62cfcd72badcec146bece8ca4236a9bf19 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 10:59:25 -0500 Subject: [PATCH 11/33] WIP: Pair with @erikjohnston --- synapse/app/_base.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 348d13d26a1..68622f7ac64 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -80,6 +80,7 @@ from synapse.module_api.callbacks.third_party_event_rules_callbacks import ( load_legacy_third_party_event_rules, ) +from synapse.logging.context import PreserveLoggingContext from synapse.types import ISynapseReactor, StrCollection from synapse.util import SYNAPSE_VERSION from synapse.util.caches.lrucache import setup_expire_lru_cache_entries @@ -182,7 +183,13 @@ def run() -> None: if gc_thresholds: gc.set_threshold(*gc_thresholds) install_gc_manager() - run_command() + + # Reset the logging context when we start the reactor (whenever we yield control + # to the reactor, the `sentinel` logging context needs to be set so we don't + # leak the current logging context and erroneously apply it to the next task the + # reactor event loop picks up) + with PreserveLoggingContext(): + run_command() if daemonize: assert pid_file is not None @@ -233,6 +240,7 @@ def redirect_stdio_to_logs() -> None: print("Redirected stdout/stderr to logs") +# TODO: Re-establish log context at this point def register_start( cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs ) -> None: From d8f68d09dd48fffa70c5a5b6f7075a37d5ff171a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 12:34:42 -0500 Subject: [PATCH 12/33] Explain the `sentinel` context better --- docs/log_contexts.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/docs/log_contexts.md b/docs/log_contexts.md index 9d087d11ef2..0403b274a30 100644 --- a/docs/log_contexts.md +++ b/docs/log_contexts.md @@ -59,6 +59,23 @@ def do_request_handling(): logger.debug("phew") ``` +### The `sentinel` context + +The default context is `synapse.logging.context.SENTINEL_CONTEXT`, which is an empty +sentinel value to represent the root context. This is what is used when there is no +other context set. The phrase "clear the logcontext" means to set the current context to +the `sentinel` context. + +No CPU/database usage metrics are recorded against the `sentinel` context. + +Ideally, nothing from the Synapse homeserver would be logged against the `sentinel` +context as we want to know which server the logs came from. In practice, this is not +always the case yet especially outside of request handling. + +Whenever yielding control back to the reactor (event loop), the `sentinel` logcontext is +used to avoid leaking the current logcontext into the other reactor which would +erroneously get attached to the next operation picked up by the event loop. 
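A minimal sketch of the rule described above, mirroring the `run_command()` change from the earlier patch in this series (`setup_homeserver` is an illustrative placeholder, not a real Synapse function):

```
from twisted.internet import reactor

from synapse.logging.context import LoggingContext, PreserveLoggingContext


def setup_homeserver() -> None:
    ...  # placeholder for the real setup work


def start() -> None:
    with LoggingContext("main"):
        setup_homeserver()  # logs/CPU time here are attributed to "main"

        # Clear back to the sentinel before handing control to the reactor,
        # so that "main" does not leak into the next task the event loop runs.
        with PreserveLoggingContext():
            reactor.run()
```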
+ ## Using logcontexts with awaitables Awaitables break the linear flow of code so that there is no longer a single entry point From 71cd3c43cea1fb04b1ada13c1abc572f1e760543 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 12:35:22 -0500 Subject: [PATCH 13/33] Clarify when to use the `sentinel` context --- docs/log_contexts.md | 2 +- synapse/logging/context.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/docs/log_contexts.md b/docs/log_contexts.md index 0403b274a30..ae3c3fee70a 100644 --- a/docs/log_contexts.md +++ b/docs/log_contexts.md @@ -73,7 +73,7 @@ context as we want to know which server the logs came from. In practice, this is always the case yet especially outside of request handling. Whenever yielding control back to the reactor (event loop), the `sentinel` logcontext is -used to avoid leaking the current logcontext into the other reactor which would +set to avoid leaking the current logcontext into the other reactor which would erroneously get attached to the next operation picked up by the event loop. ## Using logcontexts with awaitables diff --git a/synapse/logging/context.py b/synapse/logging/context.py index b67adf31dab..10085c17f87 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -230,7 +230,16 @@ class ContextRequest: class _Sentinel: - """Sentinel to represent the root context""" + """ + Sentinel to represent the root context + + This should only be used for tasks outside of Synapse like when we yield control + back to the Twisted reactor (event loop) so we don't leak the current logging + context to other tasks that are scheduled next in the event loop. + + Nothing from the Synapse homeserver should be logged with the sentinel context. i.e. + we should always know which server the logs are coming from. + """ __slots__ = ["previous_context", "finished", "request", "tag"] From 0e990e67e6e161578b68727a74b4f68c42e7d25c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 13:18:54 -0500 Subject: [PATCH 14/33] Explain logging context in `run_as_background_process` --- synapse/handlers/delayed_events.py | 9 +++++++++ synapse/metrics/background_process_metrics.py | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index a6749801a50..c84e640a30c 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -328,6 +328,9 @@ async def cancel(self, requester: Requester, delay_id: str) -> None: requester, (requester.user.to_string(), requester.device_id), ) + # TODO: Because the deferred returned by `run_as_background_process` does not + # follow the synapse logcontext rules, this probably messes with log contexts. + # Use `make_deferred_yieldable` await self._initialized_from_db next_send_ts = await self._store.cancel_delayed_event( @@ -354,6 +357,9 @@ async def restart(self, requester: Requester, delay_id: str) -> None: requester, (requester.user.to_string(), requester.device_id), ) + # TODO: Because the deferred returned by `run_as_background_process` does not + # follow the synapse logcontext rules, this probably messes with log contexts. 
+ # Use `make_deferred_yieldable` await self._initialized_from_db next_send_ts = await self._store.restart_delayed_event( @@ -380,6 +386,9 @@ async def send(self, requester: Requester, delay_id: str) -> None: # Use standard request limiter for sending delayed events on-demand, # as an on-demand send is similar to sending a regular event. await self._request_ratelimiter.ratelimit(requester) + # TODO: Because the deferred returned by `run_as_background_process` does not + # follow the synapse logcontext rules, this probably messes with log contexts. + # Use `make_deferred_yieldable` await self._initialized_from_db event, next_send_ts = await self._store.process_target_delayed_event( diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index aa62778252a..8d5d391c3c0 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -230,6 +230,11 @@ def run_as_background_process( clock.looping_call and friends (or for firing-and-forgetting in the middle of a normal synapse async function). + Because the returned Deferred does not follow the synapse logcontext rules, awaiting + the result of this function will result in the log context being cleared. In order + to properly await the result of this function and maintain the current log context, + use `make_deferred_yieldable`. + Args: desc: a description for this background process type server_name: The homeserver name that this background process is being run for @@ -285,7 +290,20 @@ async def run() -> Optional[R]: name=desc, **{SERVER_NAME_LABEL: server_name} ).dec() + # To explain how the log contexts work here: + # - When this function is called, the current context is stored (using + # `PreserveLoggingContext`), we kick off the background task, and we restore the + # original context before returning (also part of `PreserveLoggingContext`). + # - When the background task finishes, we don't want to leak our background context + # into the reactor which would erroneously get attached to the next operation + # picked up by the event loop. We use `PreserveLoggingContext` to set the + # `sentinel` context and means the new `BackgroundProcessLoggingContext` will + # remember the `sentinel` context as its previous context to return to when it + # exits and yields control back to the reactor. with PreserveLoggingContext(): + # The async `run` task is wrapped in a deferred, which will have the side effect + # of executing the coroutine. + # # Note that we return a Deferred here so that it can be used in a # looping_call and other places that expect a Deferred. return defer.ensureDeferred(run()) From 6a98096139367a9b5c5e8ee760151371b349a625 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 13:21:20 -0500 Subject: [PATCH 15/33] Add potential future simplication of `run_as_background_process` --- synapse/metrics/background_process_metrics.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 8d5d391c3c0..e5d68f5447b 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -300,6 +300,9 @@ async def run() -> Optional[R]: # `sentinel` context and means the new `BackgroundProcessLoggingContext` will # remember the `sentinel` context as its previous context to return to when it # exits and yields control back to the reactor. 
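The flip side of the comment above, for callers: since the returned deferred finishes on the `sentinel` context, a caller that awaits it should rewrap it first. A sketch of that calling pattern (the `desc` and `server_name` values here are placeholders):

```
from synapse.logging.context import make_deferred_yieldable
from synapse.metrics.background_process_metrics import run_as_background_process


async def do_work() -> None:
    ...  # the actual background work


async def caller() -> None:
    d = run_as_background_process(
        "example_process", server_name="example.com", func=do_work
    )
    # Awaiting `d` directly would leave the caller on the sentinel context
    # when it fires; make_deferred_yieldable restores the caller's context.
    await make_deferred_yieldable(d)
```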
+ # + # TODO: I think we could simplify this whole block by using `return run_in_background(run)` + # which appears to give equivalent behaviour. with PreserveLoggingContext(): # The async `run` task is wrapped in a deferred, which will have the side effect # of executing the coroutine. From ea5a841678d201ab508e0c8de9af1dfac1800d88 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 13:34:52 -0500 Subject: [PATCH 16/33] Add log context explanation to `run_in_background` --- synapse/logging/context.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 10085c17f87..d2a5e6fb18d 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -872,6 +872,14 @@ def run_in_background( return from the function, and that the sentinel context is set once the deferred returned by the function completes. + To explain how the log contexts work here: + - When this function is called, the current context is stored ("original"), we kick + off the background task, and we restore that original context before returning + - When the background task finishes, we don't want to leak our context into the + reactor which would erroneously get attached to the next operation picked up by + the event loop. We add a callback to the deferred which will clear the logging + context after it finishes and yields control back to the reactor. + Useful for wrapping functions that return a deferred or coroutine, which you don't yield or await on (for instance because you want to pass it to deferred.gatherResults()). @@ -963,6 +971,8 @@ def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: ) return _set_context_cb(result, context) + # TODO: Why aren't we using `sentinel` context here. We can't guarantee that `ctx` is + # `sentinel` here? d.addBoth(_asdf, ctx) return d From 1f4b3919e77b20d44f4a5d7eec36ec95cf852ea4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:03 -0500 Subject: [PATCH 17/33] Revert "Add log context explanation to `run_in_background`" This reverts commit ea5a841678d201ab508e0c8de9af1dfac1800d88. --- synapse/logging/context.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index d2a5e6fb18d..10085c17f87 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -872,14 +872,6 @@ def run_in_background( return from the function, and that the sentinel context is set once the deferred returned by the function completes. - To explain how the log contexts work here: - - When this function is called, the current context is stored ("original"), we kick - off the background task, and we restore that original context before returning - - When the background task finishes, we don't want to leak our context into the - reactor which would erroneously get attached to the next operation picked up by - the event loop. We add a callback to the deferred which will clear the logging - context after it finishes and yields control back to the reactor. - Useful for wrapping functions that return a deferred or coroutine, which you don't yield or await on (for instance because you want to pass it to deferred.gatherResults()). @@ -971,8 +963,6 @@ def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: ) return _set_context_cb(result, context) - # TODO: Why aren't we using `sentinel` context here. We can't guarantee that `ctx` is - # `sentinel` here? 
d.addBoth(_asdf, ctx) return d From bbe1ee7b91108a763c031825e1c2574aecd494a0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:07 -0500 Subject: [PATCH 18/33] Revert "Add potential future simplication of `run_as_background_process`" This reverts commit 6a98096139367a9b5c5e8ee760151371b349a625. --- synapse/metrics/background_process_metrics.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index e5d68f5447b..8d5d391c3c0 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -300,9 +300,6 @@ async def run() -> Optional[R]: # `sentinel` context and means the new `BackgroundProcessLoggingContext` will # remember the `sentinel` context as its previous context to return to when it # exits and yields control back to the reactor. - # - # TODO: I think we could simplify this whole block by using `return run_in_background(run)` - # which appears to give equivalent behaviour. with PreserveLoggingContext(): # The async `run` task is wrapped in a deferred, which will have the side effect # of executing the coroutine. From b5ec2daba822cd7d2918bddc170ec9cddd5bc3ce Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:09 -0500 Subject: [PATCH 19/33] Revert "Explain logging context in `run_as_background_process`" This reverts commit 0e990e67e6e161578b68727a74b4f68c42e7d25c. --- synapse/handlers/delayed_events.py | 9 --------- synapse/metrics/background_process_metrics.py | 18 ------------------ 2 files changed, 27 deletions(-) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index c84e640a30c..a6749801a50 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -328,9 +328,6 @@ async def cancel(self, requester: Requester, delay_id: str) -> None: requester, (requester.user.to_string(), requester.device_id), ) - # TODO: Because the deferred returned by `run_as_background_process` does not - # follow the synapse logcontext rules, this probably messes with log contexts. - # Use `make_deferred_yieldable` await self._initialized_from_db next_send_ts = await self._store.cancel_delayed_event( @@ -357,9 +354,6 @@ async def restart(self, requester: Requester, delay_id: str) -> None: requester, (requester.user.to_string(), requester.device_id), ) - # TODO: Because the deferred returned by `run_as_background_process` does not - # follow the synapse logcontext rules, this probably messes with log contexts. - # Use `make_deferred_yieldable` await self._initialized_from_db next_send_ts = await self._store.restart_delayed_event( @@ -386,9 +380,6 @@ async def send(self, requester: Requester, delay_id: str) -> None: # Use standard request limiter for sending delayed events on-demand, # as an on-demand send is similar to sending a regular event. await self._request_ratelimiter.ratelimit(requester) - # TODO: Because the deferred returned by `run_as_background_process` does not - # follow the synapse logcontext rules, this probably messes with log contexts. 
- # Use `make_deferred_yieldable` await self._initialized_from_db event, next_send_ts = await self._store.process_target_delayed_event( diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 8d5d391c3c0..aa62778252a 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -230,11 +230,6 @@ def run_as_background_process( clock.looping_call and friends (or for firing-and-forgetting in the middle of a normal synapse async function). - Because the returned Deferred does not follow the synapse logcontext rules, awaiting - the result of this function will result in the log context being cleared. In order - to properly await the result of this function and maintain the current log context, - use `make_deferred_yieldable`. - Args: desc: a description for this background process type server_name: The homeserver name that this background process is being run for @@ -290,20 +285,7 @@ async def run() -> Optional[R]: name=desc, **{SERVER_NAME_LABEL: server_name} ).dec() - # To explain how the log contexts work here: - # - When this function is called, the current context is stored (using - # `PreserveLoggingContext`), we kick off the background task, and we restore the - # original context before returning (also part of `PreserveLoggingContext`). - # - When the background task finishes, we don't want to leak our background context - # into the reactor which would erroneously get attached to the next operation - # picked up by the event loop. We use `PreserveLoggingContext` to set the - # `sentinel` context and means the new `BackgroundProcessLoggingContext` will - # remember the `sentinel` context as its previous context to return to when it - # exits and yields control back to the reactor. with PreserveLoggingContext(): - # The async `run` task is wrapped in a deferred, which will have the side effect - # of executing the coroutine. - # # Note that we return a Deferred here so that it can be used in a # looping_call and other places that expect a Deferred. return defer.ensureDeferred(run()) From 4aa0aa0aa5d0fe52b6a5cd2280e47305d90583e8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:10 -0500 Subject: [PATCH 20/33] Revert "Clarify when to use the `sentinel` context" This reverts commit 71cd3c43cea1fb04b1ada13c1abc572f1e760543. --- docs/log_contexts.md | 2 +- synapse/logging/context.py | 11 +---------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/docs/log_contexts.md b/docs/log_contexts.md index ae3c3fee70a..0403b274a30 100644 --- a/docs/log_contexts.md +++ b/docs/log_contexts.md @@ -73,7 +73,7 @@ context as we want to know which server the logs came from. In practice, this is always the case yet especially outside of request handling. Whenever yielding control back to the reactor (event loop), the `sentinel` logcontext is -set to avoid leaking the current logcontext into the other reactor which would +used to avoid leaking the current logcontext into the other reactor which would erroneously get attached to the next operation picked up by the event loop. 
## Using logcontexts with awaitables diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 10085c17f87..b67adf31dab 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -230,16 +230,7 @@ class ContextRequest: class _Sentinel: - """ - Sentinel to represent the root context - - This should only be used for tasks outside of Synapse like when we yield control - back to the Twisted reactor (event loop) so we don't leak the current logging - context to other tasks that are scheduled next in the event loop. - - Nothing from the Synapse homeserver should be logged with the sentinel context. i.e. - we should always know which server the logs are coming from. - """ + """Sentinel to represent the root context""" __slots__ = ["previous_context", "finished", "request", "tag"] From 37a388c767b600dd7172b787d4992531320babe7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:11 -0500 Subject: [PATCH 21/33] Revert "Explain the `sentinel` context better" This reverts commit d8f68d09dd48fffa70c5a5b6f7075a37d5ff171a. --- docs/log_contexts.md | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/docs/log_contexts.md b/docs/log_contexts.md index 0403b274a30..9d087d11ef2 100644 --- a/docs/log_contexts.md +++ b/docs/log_contexts.md @@ -59,23 +59,6 @@ def do_request_handling(): logger.debug("phew") ``` -### The `sentinel` context - -The default context is `synapse.logging.context.SENTINEL_CONTEXT`, which is an empty -sentinel value to represent the root context. This is what is used when there is no -other context set. The phrase "clear the logcontext" means to set the current context to -the `sentinel` context. - -No CPU/database usage metrics are recorded against the `sentinel` context. - -Ideally, nothing from the Synapse homeserver would be logged against the `sentinel` -context as we want to know which server the logs came from. In practice, this is not -always the case yet especially outside of request handling. - -Whenever yielding control back to the reactor (event loop), the `sentinel` logcontext is -used to avoid leaking the current logcontext into the other reactor which would -erroneously get attached to the next operation picked up by the event loop. - ## Using logcontexts with awaitables Awaitables break the linear flow of code so that there is no longer a single entry point From 4b61570cd134c310a8a31134440fd75c6c19fa24 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:23 -0500 Subject: [PATCH 22/33] Revert "DEBUG: test" This reverts commit 301a71449c87027847e627b2c516700be7515627. 
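For reference, the debug test being reverted exercised the canonical pattern for awaiting a deferred that does not follow the logcontext rules. A condensed sketch of the code being removed (`reactor` is the global Twisted reactor):

```
from twisted.internet import defer, reactor

from synapse.logging.context import make_deferred_yieldable


def blocking_function() -> defer.Deferred:
    # Returns an incomplete deferred and does NOT follow the synapse
    # logcontext rules: nothing restores a context when it fires.
    d: defer.Deferred = defer.Deferred()
    reactor.callLater(0, d.callback, None)
    return d


async def testfunc() -> None:
    # make_deferred_yieldable resets to the sentinel while we wait, then
    # restores our context when the deferred fires.
    await make_deferred_yieldable(blocking_function())
```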
--- synapse/handlers/federation.py | 14 +---- synapse/metrics/background_process_metrics.py | 6 +-- tests/util/test_logcontext.py | 52 +------------------ 3 files changed, 6 insertions(+), 66 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4708b087e4c..e9854bdd0f5 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -39,7 +39,6 @@ Union, ) -from twisted.internet import defer import attr from prometheus_client import Histogram from signedjson.key import decode_verify_key_bytes @@ -70,7 +69,7 @@ from synapse.federation.federation_client import InvalidResponseError from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME from synapse.http.servlet import assert_params_in_dict -from synapse.logging.context import nested_logging_context, make_deferred_yieldable +from synapse.logging.context import nested_logging_context from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace from synapse.metrics import SERVER_NAME_LABEL from synapse.metrics.background_process_metrics import run_as_background_process @@ -1793,16 +1792,7 @@ async def _resume_partial_state_room_sync(self) -> None: logger.info("asdf->_resume_partial_state_room_sync") assert not self.config.worker.worker_app - # a function which returns an incomplete deferred, but doesn't follow - # the synapse rules. - def blocking_function() -> defer.Deferred: - d: defer.Deferred = defer.Deferred() - self.hs.get_reactor().callLater(0, d.callback, None) - return d - - await make_deferred_yieldable(blocking_function()) - - # partial_state_rooms = await self.store.get_partial_state_room_resync_info() + partial_state_rooms = await self.store.get_partial_state_room_resync_info() # for room_id, resync_info in partial_state_rooms.items(): # self._start_partial_state_room_sync( # initial_destination=resync_info.joined_via, diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index aa62778252a..b2f65928808 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -49,7 +49,6 @@ LoggingContext, PreserveLoggingContext, run_in_background, - make_deferred_yieldable, ) from synapse.logging.opentracing import SynapseTags, start_active_span from synapse.metrics import SERVER_NAME_LABEL @@ -247,8 +246,8 @@ def run_as_background_process( rules. """ # TODO: Remove - # if desc != "rspsr": - # return + if desc != "rspsr": + return async def run() -> Optional[R]: with _bg_metrics_lock: @@ -290,7 +289,6 @@ async def run() -> Optional[R]: # looping_call and other places that expect a Deferred. 
return defer.ensureDeferred(run()) - # return make_deferred_yieldable(defer.ensureDeferred(run())) # return run_in_background(run) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index b6e25a4971a..af36e685d7c 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -27,14 +27,12 @@ from synapse.logging.context import ( SENTINEL_CONTEXT, LoggingContext, - _Sentinel, PreserveLoggingContext, current_context, make_deferred_yieldable, nested_logging_context, run_in_background, ) -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ISynapseReactor from synapse.util import Clock @@ -46,12 +44,8 @@ class LoggingContextTestCase(unittest.TestCase): def _check_test_key(self, value: str) -> None: context = current_context() - assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), ( - f"Expected LoggingContext({value}) but saw {context}" - ) - self.assertEqual( - str(context), value, f"Expected LoggingContext({value}) but saw {context}" - ) + assert isinstance(context, LoggingContext) + self.assertEqual(context.name, value) def test_with_context(self) -> None: with LoggingContext("test"): @@ -193,48 +187,6 @@ def test_nested_logging_context(self) -> None: nested_context = nested_logging_context(suffix="bar") self.assertEqual(nested_context.name, "foo-bar") - async def test_asdf(self) -> defer.Deferred: - sentinel_context = current_context() - callback_completed = False - - async def testfunc() -> None: - print("testfunc1=%s", current_context()) - - # a function which returns an incomplete deferred, but doesn't follow - # the synapse rules. - def blocking_function() -> defer.Deferred: - d: defer.Deferred = defer.Deferred() - reactor.callLater(0, d.callback, None) - return d - - await make_deferred_yieldable(blocking_function()) - print("testfunc2=%s", current_context()) - callback_completed = True - - print("1=%s", current_context()) - self._check_test_key("sentinel") - with LoggingContext("main"): - print("2=%s", current_context()) - self._check_test_key("main") - - bg_process_d = run_as_background_process( - "bg_process", - server_name="test_server", - func=testfunc, - ) - - print("3=%s", current_context()) - self._check_test_key("main") - - # Wait for callback_completed - await bg_process_d - - print("4=%s", current_context()) - self._check_test_key("main") - - # Test is done when the deferred finishes. - return bg_process_d - # a function which returns a deferred which has been "called", but # which had a function which returned another incomplete deferred on From ae8055a313b452f077d5b08f2ad91c741914427e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:29 -0500 Subject: [PATCH 23/33] Revert "Debug: Eliminate variables" This reverts commit b97b2ddac6e63b23e1a31779d491b808c3417036. --- synapse/handlers/federation.py | 17 +-- synapse/logging/context.py | 144 +----------------- synapse/metrics/background_process_metrics.py | 8 +- synapse/storage/database.py | 2 +- synapse/storage/databases/main/room.py | 40 ++--- 5 files changed, 36 insertions(+), 175 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e9854bdd0f5..34aae7ef3ce 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -188,8 +188,7 @@ def __init__(self, hs: "HomeServer"): # were shut down. 
if not hs.config.worker.worker_app: run_as_background_process( - # "resume_sync_partial_state_room", - "rspsr", + "resume_sync_partial_state_room", self.server_name, self._resume_partial_state_room_sync, ) @@ -1789,17 +1788,15 @@ async def get_room_complexity( async def _resume_partial_state_room_sync(self) -> None: """Resumes resyncing of all partial-state rooms after a restart.""" - logger.info("asdf->_resume_partial_state_room_sync") assert not self.config.worker.worker_app partial_state_rooms = await self.store.get_partial_state_room_resync_info() - # for room_id, resync_info in partial_state_rooms.items(): - # self._start_partial_state_room_sync( - # initial_destination=resync_info.joined_via, - # other_destinations=resync_info.servers_in_room, - # room_id=room_id, - # ) - logger.info("asdf->_resume_partial_state_room_sync done") + for room_id, resync_info in partial_state_rooms.items(): + self._start_partial_state_room_sync( + initial_destination=resync_info.joined_via, + other_destinations=resync_info.servers_in_room, + room_id=room_id, + ) def _start_partial_state_room_sync( self, diff --git a/synapse/logging/context.py b/synapse/logging/context.py index b67adf31dab..f1d459b2fbf 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -49,8 +49,6 @@ overload, ) -import secrets -import string import attr from typing_extensions import ParamSpec from traceback import format_stack @@ -388,10 +386,6 @@ def __enter__(self) -> "LoggingContext": self.previous_context, format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") - .replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) .replace("\n", "->"), ) old_context = set_current_context(self) @@ -422,10 +416,6 @@ def __exit__( self.previous_context, format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") - .replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) .replace("\n", "->"), ) current = set_current_context(self.previous_context) @@ -645,39 +635,25 @@ def filter(self, record: logging.LogRecord) -> Literal[True]: return True -def random_string(length: int) -> str: - """Generate a cryptographically secure string of random letters. 
- - Drawn from the characters: `a-z` and `A-Z` - """ - return "".join(secrets.choice(string.ascii_letters) for _ in range(length)) - - class PreserveLoggingContext: """Context manager which replaces the logging context The previous logging context is restored on exit.""" - __slots__ = ["_old_context", "_new_context", "_instance_id"] + __slots__ = ["_old_context", "_new_context"] def __init__( self, new_context: LoggingContextOrSentinel = SENTINEL_CONTEXT ) -> None: self._new_context = new_context - self._instance_id = random_string(5) def __enter__(self) -> None: logger.debug( - "PreserveLoggingContext(%s) %s enter (old_context=%s) (source: %s)", + "PreserveLoggingContext(%s) enter (old_context=%s) (source: %s)", self._new_context, - self._instance_id, current_context(), format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") - .replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) .replace("\n", "->"), ) self._old_context = set_current_context(self._new_context) @@ -689,16 +665,11 @@ def __exit__( traceback: Optional[TracebackType], ) -> None: logger.debug( - "PreserveLoggingContext(%s) %s exit returning to old_context=%s (source: %s)", + "PreserveLoggingContext(%s) exit returning to old_context=%s (source: %s)", self._new_context, - self._instance_id, self._old_context, format_stack()[-2] .replace("/home/eric/Documents/github/element/synapse/", "") - .replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) .replace("\n", "->"), ) context = set_current_context(self._old_context) @@ -744,16 +715,7 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe current = current_context() logger.debug( - "| set_current_context(%s) old_context=%s - %s", - context, - current, - [ - x.replace("/home/eric/Documents/github/element/synapse/", "").replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) - for x in format_stack() - ], + "set_current_context(%s) old_context=%s - %s", context, current, format_stack() ) if current is not context: @@ -875,25 +837,7 @@ def run_in_background( CRITICAL error about an unhandled error will be logged without much indication about where it came from. """ - instance_id = random_string(5) - stack = ( - format_stack()[-2] - .replace("/home/eric/Documents/github/element/synapse/", "") - .replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) - .replace("\n", "->") - ) - current = current_context() - logger.info( - "asdf run_in_background1 %s start context=%s - %s", - instance_id, - current, - stack, - ) - try: res = f(*args, **kwargs) except Exception: @@ -924,12 +868,6 @@ def run_in_background( # The function may have reset the context before returning, so # we need to restore it now. - logger.info( - "asdf run_in_background2 %s restore log context=%s - %s", - instance_id, - current, - stack, - ) ctx = set_current_context(current) # The original context will be restored when the deferred @@ -944,17 +882,7 @@ def run_in_background( # which is supposed to have a single entry and exit point. But # by spawning off another deferred, we are effectively # adding a new exit point.) 
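Stripped of the debug logging being removed here, the mechanism those comments describe reduces to roughly the following (a sketch of the core contract only, not the full implementation with its coroutine and error handling):

```
from twisted.internet import defer

from synapse.logging.context import current_context, set_current_context


def _set_context_cb(result, context):
    # Restore `context` and pass the result through unchanged.
    set_current_context(context)
    return result


def run_in_background_sketch(f):
    calling_context = current_context()
    d = defer.maybeDeferred(f)
    # `f` may have reset the context before returning; put the caller's
    # context back, remembering whatever `f` left us with...
    ctx = set_current_context(calling_context)
    # ...and restore that when the deferred completes, so the background
    # work's context does not leak into the reactor.
    d.addBoth(_set_context_cb, ctx)
    return d
```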
- - def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: - logger.info( - "asdf run_in_background3 %s reset log context at end ctx=%s - %s", - instance_id, - ctx, - stack, - ) - return _set_context_cb(result, context) - - d.addBoth(_asdf, ctx) + d.addBoth(_set_context_cb, ctx) return d @@ -973,15 +901,6 @@ def run_coroutine_in_background( do not run until called, and so calling an async function without awaiting cannot change the log contexts. """ - stack = ( - format_stack()[-2] - .replace("/home/eric/Documents/github/element/synapse/", "") - .replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) - .replace("\n", "->") - ) current = current_context() d = defer.ensureDeferred(coroutine) @@ -1002,15 +921,7 @@ def run_coroutine_in_background( # which is supposed to have a single entry and exit point. But # by spawning off another deferred, we are effectively # adding a new exit point.) - def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: - logger.info( - "asdf run_coroutine_in_background reset log context at end ctx=%s - %s", - ctx, - stack, - ) - return _set_context_cb(result, context) - - d.addBoth(_asdf, ctx) + d.addBoth(_set_context_cb, ctx) return d @@ -1029,30 +940,6 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T] (This is more-or-less the opposite operation to run_in_background.) """ - instance_id = random_string(5) - stack = ( - format_stack()[-2] - .replace("/home/eric/Documents/github/element/synapse/", "") - .replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) - .replace("\n", "->") - ) - full_stack = [ - x.replace("/home/eric/Documents/github/element/synapse/", "").replace( - "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.13/lib/", - "", - ) - for x in format_stack() - ] - logger.info( - "asdf make_deferred_yieldable1 %s start - %s - full stack=%s", - instance_id, - stack, - full_stack, - ) - if deferred.called and not deferred.paused: # it looks like this deferred is ready to run any callbacks we give it # immediately. We may as well optimise out the logcontext faffery. @@ -1060,25 +947,8 @@ def make_deferred_yieldable(deferred: "defer.Deferred[T]") -> "defer.Deferred[T] # ok, we can't be sure that a yield won't block, so let's reset the # logcontext, and add a callback to the deferred to restore it. 
- logger.info( - "asdf make_deferred_yieldable2 %s reset log context - %s - full stack=%s", - instance_id, - stack, - full_stack, - ) prev_context = set_current_context(SENTINEL_CONTEXT) - - def _asdf(result: ResultT, context: LoggingContextOrSentinel) -> ResultT: - logger.info( - "asdf make_deferred_yieldable3 %s restore log context at end ctx=%s - %s - full stack=%s", - instance_id, - prev_context, - stack, - full_stack, - ) - return _set_context_cb(result, context) - - deferred.addBoth(_asdf, prev_context) + deferred.addBoth(_set_context_cb, prev_context) return deferred diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index b2f65928808..f7f2d88885e 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -48,7 +48,6 @@ ContextResourceUsage, LoggingContext, PreserveLoggingContext, - run_in_background, ) from synapse.logging.opentracing import SynapseTags, start_active_span from synapse.metrics import SERVER_NAME_LABEL @@ -245,9 +244,6 @@ def run_as_background_process( Note that the returned Deferred does not follow the synapse logcontext rules. """ - # TODO: Remove - if desc != "rspsr": - return async def run() -> Optional[R]: with _bg_metrics_lock: @@ -262,7 +258,7 @@ async def run() -> Optional[R]: ).inc() with BackgroundProcessLoggingContext( - name="bg-" + desc, server_name=server_name, instance_id=count + name=desc, server_name=server_name, instance_id=count ) as context: try: if bg_start_span: @@ -289,8 +285,6 @@ async def run() -> Optional[R]: # looping_call and other places that expect a Deferred. return defer.ensureDeferred(run()) - # return run_in_background(run) - P = ParamSpec("P") diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 63cda70b9ba..f7aec16c969 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1049,7 +1049,7 @@ def inner_func(conn: _PoolConnection, *args: P.args, **kwargs: P.kwargs) -> R: assert not self.engine.in_transaction(conn) with LoggingContext( - "db-" + str(curr_context), parent_context=parent_context + str(curr_context), parent_context=parent_context ) as context: with opentracing.start_active_span( operation_name="db.connection", diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index bb2e7eaa5b3..6ffc3aed34b 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1479,26 +1479,26 @@ async def get_partial_state_room_resync_info( for room_id, joined_via in rows: room_servers[room_id] = PartialStateResyncInfo(joined_via=joined_via) - # rows = cast( - # List[Tuple[str, str]], - # await self.db_pool.simple_select_list( - # "partial_state_rooms_servers", - # keyvalues=None, - # retcols=("room_id", "server_name"), - # desc="get_partial_state_rooms", - # ), - # ) - - # for room_id, server_name in rows: - # entry = room_servers.get(room_id) - # if entry is None: - # # There is a foreign key constraint which enforces that every room_id in - # # partial_state_rooms_servers appears in partial_state_rooms. So we - # # expect `entry` to be non-null. (This reasoning fails if we've - # # partial-joined between the two SELECTs, but this is unlikely to happen - # # in practice.) 
- # continue - # entry.servers_in_room.add(server_name) + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_list( + "partial_state_rooms_servers", + keyvalues=None, + retcols=("room_id", "server_name"), + desc="get_partial_state_rooms", + ), + ) + + for room_id, server_name in rows: + entry = room_servers.get(room_id) + if entry is None: + # There is a foreign key constraint which enforces that every room_id in + # partial_state_rooms_servers appears in partial_state_rooms. So we + # expect `entry` to be non-null. (This reasoning fails if we've + # partial-joined between the two SELECTs, but this is unlikely to happen + # in practice.) + continue + entry.servers_in_room.add(server_name) return room_servers From 5c05a0b3099e977d2d031d0a43c57d998cef811f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:12:48 -0500 Subject: [PATCH 24/33] Revert "DEBUG: lost `main` context after request" This reverts commit 7938e8cef4041cb754f8b21088e04c4ea1934994. --- synapse/app/homeserver.py | 1 - synapse/http/site.py | 7 ------ synapse/logging/context.py | 49 ++++---------------------------------- 3 files changed, 4 insertions(+), 53 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 019641aaa82..e027b5eaea2 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -415,7 +415,6 @@ def main() -> None: redirect_stdio_to_logs() run(hs) - logger.info("asdf outside main") if __name__ == "__main__": diff --git a/synapse/http/site.py b/synapse/http/site.py index 4cd7d0a59d1..55088fc190e 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -299,7 +299,6 @@ def get_authenticated_entity(self) -> Tuple[Optional[str], Optional[str]]: return None, None def render(self, resrc: Resource) -> None: - logger.info("asdf->SynapseRequest.render") # this is called once a Resource has been found to serve the request; in our # case the Resource in question will normally be a JsonResource. 
@@ -320,11 +319,6 @@ def render(self, resrc: Resource) -> None: user_agent=get_request_user_agent(self), ), ) - logger.info( - "asdf request logcontext=%s logcontext.previous_context=%s", - self.logcontext, - self.logcontext.previous_context, - ) # override the Server header which is set by twisted self.setHeader("Server", self.synapse_site.server_version_string) @@ -348,7 +342,6 @@ def render(self, resrc: Resource) -> None: servlet=self.request_metrics.name, **{SERVER_NAME_LABEL: self.our_server_name}, ).inc() - logger.info("asdf->SynapseRequest.render done") @contextlib.contextmanager def processing(self) -> Generator[None, None, None]: diff --git a/synapse/logging/context.py b/synapse/logging/context.py index f1d459b2fbf..7ea3f3d726d 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -51,7 +51,6 @@ import attr from typing_extensions import ParamSpec -from traceback import format_stack from twisted.internet import defer, threads from twisted.python.threadpool import ThreadPool @@ -380,14 +379,6 @@ def set_current_context( def __enter__(self) -> "LoggingContext": """Enters this logging context into thread local storage""" - logger.debug( - "LoggingContext(%s) enter (previous_context=%s) (source: %s)", - self, - self.previous_context, - format_stack()[-2] - .replace("/home/eric/Documents/github/element/synapse/", "") - .replace("\n", "->"), - ) old_context = set_current_context(self) if self.previous_context != old_context: logcontext_error( @@ -410,24 +401,13 @@ def __exit__( Returns: None to avoid suppressing any exceptions that were thrown. """ - logger.debug( - "LoggingContext(%s) exit: returning to previous_context=%s (source: %s)", - self, - self.previous_context, - format_stack()[-2] - .replace("/home/eric/Documents/github/element/synapse/", "") - .replace("\n", "->"), - ) current = set_current_context(self.previous_context) if current is not self: if current is SENTINEL_CONTEXT: - logcontext_error( - "LoggingContext: Expected logging context %s was lost" % (self,) - ) + logcontext_error("Expected logging context %s was lost" % (self,)) else: logcontext_error( - "LoggingContext: Expected logging context %s but found %s" - % (self, current) + "Expected logging context %s but found %s" % (self, current) ) # the fact that we are here suggests that the caller thinks that everything @@ -648,14 +628,6 @@ def __init__( self._new_context = new_context def __enter__(self) -> None: - logger.debug( - "PreserveLoggingContext(%s) enter (old_context=%s) (source: %s)", - self._new_context, - current_context(), - format_stack()[-2] - .replace("/home/eric/Documents/github/element/synapse/", "") - .replace("\n", "->"), - ) self._old_context = set_current_context(self._new_context) def __exit__( @@ -664,25 +636,16 @@ def __exit__( value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: - logger.debug( - "PreserveLoggingContext(%s) exit returning to old_context=%s (source: %s)", - self._new_context, - self._old_context, - format_stack()[-2] - .replace("/home/eric/Documents/github/element/synapse/", "") - .replace("\n", "->"), - ) context = set_current_context(self._old_context) if context != self._new_context: if not context: logcontext_error( - "PreserveLoggingContext: Expected logging context %s was lost" - % (self._new_context,) + "Expected logging context %s was lost" % (self._new_context,) ) else: logcontext_error( - "PreserveLoggingContext: Expected logging context %s but found %s" + "Expected logging context %s but found %s" % ( self._new_context, 
context, @@ -714,10 +677,6 @@ def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSe current = current_context() - logger.debug( - "set_current_context(%s) old_context=%s - %s", context, current, format_stack() - ) - if current is not context: rusage = get_thread_resource_usage() current.stop(rusage) From b8c0857263f01a1d55e4f896a6b9c9c9c3e8f63f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:52:47 -0500 Subject: [PATCH 25/33] Re-establish logcontext at `start` We lose the `main` context once we start the reactor. We need to re-establish the logcontext at the reactor callback entrypoints. --- synapse/app/_base.py | 1 - synapse/app/generic_worker.py | 7 ++++++- synapse/app/homeserver.py | 16 +++++++++------- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 68622f7ac64..22cf76216d1 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -240,7 +240,6 @@ def redirect_stdio_to_logs() -> None: print("Redirected stdout/stderr to logs") -# TODO: Re-establish log context at this point def register_start( cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs ) -> None: diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 4f5bea6bd67..543b26d8ba8 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -355,7 +355,12 @@ def start(config_options: List[str]) -> None: except Exception as e: handle_startup_exception(e) - register_start(_base.start, hs) + async def start() -> None: + # Re-establish log context now that we're back from the reactor + with LoggingContext("start"): + await _base.start(hs) + + register_start(start) # redirect stdio to the logs, if configured. if not hs.config.logging.no_redirect_stdio: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e027b5eaea2..dfc4a007197 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -377,15 +377,17 @@ def setup(config_options: List[str]) -> SynapseHomeServer: handle_startup_exception(e) async def start() -> None: - # Load the OIDC provider metadatas, if OIDC is enabled. - if hs.config.oidc.oidc_enabled: - oidc = hs.get_oidc_handler() - # Loading the provider metadata also ensures the provider config is valid. - await oidc.load_metadata() + # Re-establish log context now that we're back from the reactor + with LoggingContext("start"): + # Load the OIDC provider metadatas, if OIDC is enabled. + if hs.config.oidc.oidc_enabled: + oidc = hs.get_oidc_handler() + # Loading the provider metadata also ensures the provider config is valid. + await oidc.load_metadata() - await _base.start(hs) + await _base.start(hs) - hs.get_datastores().main.db_pool.updates.start_doing_background_updates() + hs.get_datastores().main.db_pool.updates.start_doing_background_updates() register_start(start) From 74ab47f97b4a1c7903874685a2f23af7d6311fae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:58:02 -0500 Subject: [PATCH 26/33] Add context for "Shutting down..." 
log --- synapse/app/_base.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 22cf76216d1..56282e5d1c8 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -80,7 +80,7 @@ from synapse.module_api.callbacks.third_party_event_rules_callbacks import ( load_legacy_third_party_event_rules, ) -from synapse.logging.context import PreserveLoggingContext +from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.types import ISynapseReactor, StrCollection from synapse.util import SYNAPSE_VERSION from synapse.util.caches.lrucache import setup_expire_lru_cache_entries @@ -599,10 +599,12 @@ def run_sighup(*args: Any, **kwargs: Any) -> None: hs.get_datastores().main.db_pool.start_profiling() hs.get_pusherpool().start() + def log_shutdown() -> None: + with LoggingContext("log_shutdown"): + logger.info("Shutting down...") + # Log when we start the shut down process. - hs.get_reactor().addSystemEventTrigger( - "before", "shutdown", logger.info, "Shutting down..." - ) + hs.get_reactor().addSystemEventTrigger("before", "shutdown", log_shutdown) setup_sentry(hs) setup_sdnotify(hs) From e741daf24e22995dbd68e128081b11cf00e7ce86 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 21:59:15 -0500 Subject: [PATCH 27/33] Simplify `atexit` logcontext --- synapse/util/daemonize.py | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py index 21ad8c6ce1f..76712a73629 100644 --- a/synapse/util/daemonize.py +++ b/synapse/util/daemonize.py @@ -150,30 +150,9 @@ def sigterm(signum: int, frame: Optional[FrameType]) -> NoReturn: signal.signal(signal.SIGTERM, sigterm) - # Create a logging context that we can use later as these `atexit` handlers will run - # after the `with LoggingContext("main")` context manager finishes and we still want - # some context here to know which server is logging. - # - # We're using `PreserveLoggingContext(SENTINEL_CONTEXT)` so our new `LoggingContext` - # ends up with `LoggingContext.previous_context = SENTINEL_CONTEXT` so that when the - # `LoggingContext` exits and restores the previous context, we don't leak some - # context into the reactor that would be erroneously be picked up by something else - # down the line. - with PreserveLoggingContext(SENTINEL_CONTEXT): - exit_logging_context = LoggingContext( - "atexit", - # TODO: In the future, we will want - # `server_name=calling_context.server_name` so we know which server this log - # pertains to, https://github.com/element-hq/synapse/pull/18868 - # - # No parent_context as we don't want to attribute the metrics/traces to the - # calling context. `atexit` is completely out-of-band from our application - # so it doesn't make sense to associate it back. - ) - # Cleanup pid file at exit. 
def exit() -> None: - with PreserveLoggingContext(exit_logging_context): + with LoggingContext("atexit"): logger.warning("Stopping daemon.") os.remove(pid_file) sys.exit(0) From fab546d6180ebc30879272a04488b52c85cc8a87 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 22:04:57 -0500 Subject: [PATCH 28/33] Add logcontext to database profiling loop --- synapse/storage/database.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f7aec16c969..068f2acba9f 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -697,22 +697,23 @@ def start_profiling(self) -> None: self._previous_loop_ts = monotonic_time() def loop() -> None: - curr = self._current_txn_total_time - prev = self._previous_txn_total_time - self._previous_txn_total_time = curr + with LoggingContext("db.profiling"): + curr = self._current_txn_total_time + prev = self._previous_txn_total_time + self._previous_txn_total_time = curr - time_now = monotonic_time() - time_then = self._previous_loop_ts - self._previous_loop_ts = time_now + time_now = monotonic_time() + time_then = self._previous_loop_ts + self._previous_loop_ts = time_now - duration = time_now - time_then - ratio = (curr - prev) / duration + duration = time_now - time_then + ratio = (curr - prev) / duration - top_three_counters = self._txn_perf_counters.interval(duration, limit=3) + top_three_counters = self._txn_perf_counters.interval(duration, limit=3) - perf_logger.debug( - "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters - ) + perf_logger.debug( + "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters + ) self._clock.looping_call(loop, 10000) From e626f093ad6cc9774c7b061ee72ef5554c294a4a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 9 Sep 2025 00:15:03 -0500 Subject: [PATCH 29/33] Add changelog --- changelog.d/18870.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/18870.misc diff --git a/changelog.d/18870.misc b/changelog.d/18870.misc new file mode 100644 index 00000000000..e4c9d41c828 --- /dev/null +++ b/changelog.d/18870.misc @@ -0,0 +1 @@ +Refactor logging to use specific logging contexts, reserving the `sentinel` context only for Twisted reactor bookkeeping. From 6bbd0bf6866f36bb8e32c06effd95ce47d404d9f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 15:14:24 -0500 Subject: [PATCH 30/33] Revise changelog --- changelog.d/18870.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/18870.misc b/changelog.d/18870.misc index e4c9d41c828..e54ba4f37ae 100644 --- a/changelog.d/18870.misc +++ b/changelog.d/18870.misc @@ -1 +1 @@ -Refactor logging to use specific logging contexts, reserving the `sentinel` context only for Twisted reactor bookkeeping. +Remove `sentinel` logcontext usage where we log in `setup`, `start` and exit. 
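Stepping back, patches 25 through 28 apply the same remedy at each reactor-driven entrypoint (`start`, the shutdown trigger, `atexit`, the profiling loop): establish a fresh `LoggingContext` on entry so nothing is logged on the sentinel. The shared shape, as a sketch (the wrapper name is illustrative, not something these patches introduce):

```
from synapse.logging.context import LoggingContext


def with_logcontext(name, fn):
    # Wrap a callback that the reactor (or atexit) will invoke from the
    # sentinel context, so its logs/CPU time are attributed to `name`.
    def wrapper(*args, **kwargs):
        with LoggingContext(name):
            return fn(*args, **kwargs)

    return wrapper


# e.g.:
#   hs.get_reactor().addSystemEventTrigger(
#       "before", "shutdown", with_logcontext("log_shutdown", log_shutdown)
#   )
```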
From 9c74f6919e01264b3049a60a8a9ec5bc69b1dd54 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 15:14:53 -0500 Subject: [PATCH 31/33] Fix lints --- synapse/app/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/app/_base.py b/synapse/app/_base.py index ec40d2e8f53..cf3d260e65e 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -72,6 +72,7 @@ from synapse.events.presence_router import load_legacy_presence_router from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.logging.opentracing import init_tracer from synapse.metrics import install_gc_manager, register_threadpool from synapse.metrics.background_process_metrics import run_as_background_process @@ -80,7 +81,6 @@ from synapse.module_api.callbacks.third_party_event_rules_callbacks import ( load_legacy_third_party_event_rules, ) -from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.types import ISynapseReactor, StrCollection from synapse.util import SYNAPSE_VERSION from synapse.util.caches.lrucache import setup_expire_lru_cache_entries From e6685cc6ec925a487b8aaf59fff8399e90a4814f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 15:23:31 -0500 Subject: [PATCH 32/33] Remove db profiling `looping_call` change See https://github.com/element-hq/synapse/pull/18870#discussion_r2337807988 --- synapse/storage/database.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 72863c37741..cfec36e0fa1 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -691,23 +691,22 @@ def start_profiling(self) -> None: self._previous_loop_ts = monotonic_time() def loop() -> None: - with LoggingContext("db.profiling"): - curr = self._current_txn_total_time - prev = self._previous_txn_total_time - self._previous_txn_total_time = curr + curr = self._current_txn_total_time + prev = self._previous_txn_total_time + self._previous_txn_total_time = curr - time_now = monotonic_time() - time_then = self._previous_loop_ts - self._previous_loop_ts = time_now + time_now = monotonic_time() + time_then = self._previous_loop_ts + self._previous_loop_ts = time_now - duration = time_now - time_then - ratio = (curr - prev) / duration + duration = time_now - time_then + ratio = (curr - prev) / duration - top_three_counters = self._txn_perf_counters.interval(duration, limit=3) + top_three_counters = self._txn_perf_counters.interval(duration, limit=3) - perf_logger.debug( - "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters - ) + perf_logger.debug( + "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters + ) self._clock.looping_call(loop, 10000) From 1f95a3b8674c2c2db2f6a1b9d93e155cad29ac69 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 15:28:37 -0500 Subject: [PATCH 33/33] Prefer `PreserveLoggingContext()` defaults See https://github.com/element-hq/synapse/pull/18870#discussion_r2337820664 --- synapse/util/daemonize.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py index 76712a73629..e653abff97b 100644 --- a/synapse/util/daemonize.py +++ b/synapse/util/daemonize.py @@ -30,7 +30,6 @@ from typing import NoReturn, Optional, Type from synapse.logging.context import ( - 
SENTINEL_CONTEXT, LoggingContext, PreserveLoggingContext, ) @@ -75,7 +74,7 @@ def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") - # because we're comparing the resource usage from the original process to the forked # process. `PreserveLoggingContext` already takes care of restarting the original # context *after* the block. - with PreserveLoggingContext(SENTINEL_CONTEXT): + with PreserveLoggingContext(): # Fork, creating a new process for the child. process_id = os.fork()
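With patch 33 applied, the logcontext-relevant skeleton of `daemonize_process` ends up as follows (a condensed sketch; PID-file checks, the signal handlers, and the final `sys.exit(0)` in the cleanup handler are omitted):

```
import atexit
import logging
import os
import sys

from synapse.logging.context import LoggingContext, PreserveLoggingContext


def daemonize_sketch(pid_file: str, logger: logging.Logger) -> None:
    # Drop to the sentinel around the fork, otherwise the cputime metrics
    # would compare per-thread usage across the parent/child boundary;
    # PreserveLoggingContext restores the caller's context after the block.
    with PreserveLoggingContext():
        process_id = os.fork()

    if process_id != 0:
        # Parent process: exit, leaving the daemonized child running.
        sys.exit(0)

    def exit_handler() -> None:
        # atexit handlers run after `with LoggingContext("main")` has
        # exited, so establish a fresh context for these final logs.
        with LoggingContext("atexit"):
            logger.warning("Stopping daemon.")
            os.remove(pid_file)

    atexit.register(exit_handler)
```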