From 285f19295df3a0fadf17a1a0e5e594b2a522cae3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 5 Aug 2025 10:28:55 -0500 Subject: [PATCH 01/21] Fix `LaterGauge` metrics to collect from all servers (#18751) Fix `LaterGauge` metrics to collect from all servers Follow-up to https://github.com/element-hq/synapse/pull/18714 Previously, our `LaterGauge` metrics did include the `server_name` label as expected but we were only seeing the last server being reported in some cases. Any `LaterGauge` that we were creating multiple times was only reporting the last instance. This PR updates all `LaterGauge` to be created once and then we use `LaterGauge.register_hook(...)` to add in the metric callback as before. This works now because we store a list of callbacks instead of just one. I noticed this problem thanks to some [tests in the Synapse Pro for Small Hosts](https://github.com/element-hq/synapse-small-hosts/pull/173) repo that sanity check all metrics to ensure that we can see each metric includes data from multiple servers. ### Testing strategy 1. This is only noticeable when you run multiple Synapse instances in the same process. 1. 
TODO (see test that was added) ### Dev notes Previous non-global `LaterGauge`: ``` synapse_federation_send_queue_xxx synapse_federation_transaction_queue_pending_destinations synapse_federation_transaction_queue_pending_pdus synapse_federation_transaction_queue_pending_edus synapse_handlers_presence_user_to_current_state_size synapse_handlers_presence_wheel_timer_size synapse_notifier_listeners synapse_notifier_rooms synapse_notifier_users synapse_replication_tcp_resource_total_connections synapse_replication_tcp_command_queue synapse_background_update_status synapse_federation_known_servers synapse_scheduler_running_tasks ``` ### Pull Request Checklist * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. 
* [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --- changelog.d/18751.misc | 1 + synapse/federation/send_queue.py | 42 +++++++----- synapse/federation/sender/__init__.py | 46 +++++++------ synapse/handlers/presence.py | 26 +++++--- synapse/http/request_metrics.py | 4 +- synapse/metrics/__init__.py | 68 +++++++++----------- synapse/notifier.py | 42 ++++++------ synapse/replication/tcp/handler.py | 28 ++++---- synapse/replication/tcp/protocol.py | 24 ++++--- synapse/storage/database.py | 13 ++-- synapse/storage/databases/main/roommember.py | 14 ++-- synapse/util/ratelimitutils.py | 16 +++-- synapse/util/task_scheduler.py | 14 ++-- tests/metrics/test_metrics.py | 44 ++++++++++++- 14 files changed, 241 insertions(+), 141 deletions(-) create mode 100644 changelog.d/18751.misc diff --git a/changelog.d/18751.misc b/changelog.d/18751.misc new file mode 100644 index 00000000000..6ecd4982867 --- /dev/null +++ b/changelog.d/18751.misc @@ -0,0 +1 @@ +Fix `LaterGauge` metrics to collect from all servers. 
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 7f511d570c0..1e9722e0d41 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -37,6 +37,7 @@ """ import logging +from enum import Enum from typing import ( TYPE_CHECKING, Dict, @@ -67,6 +68,25 @@ logger = logging.getLogger(__name__) +class QueueNames(str, Enum): + PRESENCE_MAP = "presence_map" + KEYED_EDU = "keyed_edu" + KEYED_EDU_CHANGED = "keyed_edu_changed" + EDUS = "edus" + POS_TIME = "pos_time" + PRESENCE_DESTINATIONS = "presence_destinations" + + +queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {} + +for queue_name in QueueNames: + queue_name_to_gauge_map[queue_name] = LaterGauge( + name=f"synapse_federation_send_queue_{queue_name.value}_size", + desc="", + labelnames=[SERVER_NAME_LABEL], + ) + + class FederationRemoteSendQueue(AbstractFederationSender): """A drop in replacement for FederationSender""" @@ -111,23 +131,15 @@ def __init__(self, hs: "HomeServer"): # we make a new function, so we need to make a new function so the inner # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. 
- def register(name: str, queue: Sized) -> None: - LaterGauge( - name="synapse_federation_send_queue_%s_size" % (queue_name,), - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(queue)}, + def register(queue_name: QueueNames, queue: Sized) -> None: + queue_name_to_gauge_map[queue_name].register_hook( + lambda: {(self.server_name,): len(queue)} ) - for queue_name in [ - "presence_map", - "keyed_edu", - "keyed_edu_changed", - "edus", - "pos_time", - "presence_destinations", - ]: - register(queue_name, getattr(self, queue_name)) + for queue_name in QueueNames: + queue = getattr(self, queue_name.value) + assert isinstance(queue, Sized) + register(queue_name, queue=queue) self.clock.looping_call(self._clear_queue, 30 * 1000) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 8befbe37222..21af1235432 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -199,6 +199,24 @@ labelnames=[SERVER_NAME_LABEL], ) +transaction_queue_pending_destinations_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_destinations", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +transaction_queue_pending_pdus_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_pdus", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +transaction_queue_pending_edus_gauge = LaterGauge( + name="synapse_federation_transaction_queue_pending_edus", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + # Time (in s) to wait before trying to wake up destinations that have # catch-up outstanding. 
# Please note that rate limiting still applies, so while the loop is @@ -398,38 +416,28 @@ def __init__(self, hs: "HomeServer"): # map from destination to PerDestinationQueue self._per_destination_queues: Dict[str, PerDestinationQueue] = {} - LaterGauge( - name="synapse_federation_transaction_queue_pending_destinations", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_destinations_gauge.register_hook( + lambda: { (self.server_name,): sum( 1 for d in self._per_destination_queues.values() if d.transmission_loop_running ) - }, + } ) - - LaterGauge( - name="synapse_federation_transaction_queue_pending_pdus", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_pdus_gauge.register_hook( + lambda: { (self.server_name,): sum( d.pending_pdu_count() for d in self._per_destination_queues.values() ) - }, + } ) - LaterGauge( - name="synapse_federation_transaction_queue_pending_edus", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + transaction_queue_pending_edus_gauge.register_hook( + lambda: { (self.server_name,): sum( d.pending_edu_count() for d in self._per_destination_queues.values() ) - }, + } ) self._is_processing = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b253117498a..fb9f962672f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -173,6 +173,18 @@ labelnames=["locality", "from", "to", SERVER_NAME_LABEL], ) +presence_user_to_current_state_size_gauge = LaterGauge( + name="synapse_handlers_presence_user_to_current_state_size", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +presence_wheel_timer_size_gauge = LaterGauge( + name="synapse_handlers_presence_wheel_timer_size", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" LAST_ACTIVE_GRANULARITY = 60 * 1000 @@ -779,11 +791,8 @@ def __init__(self, hs: 
"HomeServer"): EduTypes.PRESENCE, self.incoming_presence ) - LaterGauge( - name="synapse_handlers_presence_user_to_current_state_size", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.user_to_current_state)}, + presence_user_to_current_state_size_gauge.register_hook( + lambda: {(self.server_name,): len(self.user_to_current_state)} ) # The per-device presence state, maps user to devices to per-device presence state. @@ -882,11 +891,8 @@ def __init__(self, hs: "HomeServer"): 60 * 1000, ) - LaterGauge( - name="synapse_handlers_presence_wheel_timer_size", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.wheel_timer)}, + presence_wheel_timer_size_gauge.register_hook( + lambda: {(self.server_name,): len(self.wheel_timer)} ) # Used to handle sending of presence to newly joined users/servers diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index a9b049f9043..c5274c758b9 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]: return counts -LaterGauge( +in_flight_requests = LaterGauge( name="synapse_http_server_in_flight_requests_count", desc="", labelnames=["method", "servlet", SERVER_NAME_LABEL], - caller=_get_in_flight_counts, ) +in_flight_requests.register_hook(_get_in_flight_counts) class RequestMetrics: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 11e2551a168..8c99d3c7700 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -31,6 +31,7 @@ Dict, Generic, Iterable, + List, Mapping, Optional, Sequence, @@ -73,8 +74,6 @@ METRICS_PREFIX = "/_synapse/metrics" -all_gauges: Dict[str, Collector] = {} - HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") SERVER_NAME_LABEL = "server_name" @@ -163,42 +162,47 @@ class LaterGauge(Collector): name: str desc: str labelnames: Optional[StrSequence] = 
attr.ib(hash=False) - # callback: should either return a value (if there are no labels for this metric), - # or dict mapping from a label tuple to a value - caller: Callable[ - [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] - ] + # List of callbacks: each callback should either return a value (if there are no + # labels for this metric), or dict mapping from a label tuple to a value + _hooks: List[ + Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ] + ] = attr.ib(factory=list, hash=False) def collect(self) -> Iterable[Metric]: # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself # (we don't enforce it here, one level up). g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] - try: - calls = self.caller() - except Exception: - logger.exception("Exception running callback for LaterGauge(%s)", self.name) - yield g - return + for hook in self._hooks: + try: + hook_result = hook() + except Exception: + logger.exception( + "Exception running callback for LaterGauge(%s)", self.name + ) + yield g + return + + if isinstance(hook_result, (int, float)): + g.add_metric([], hook_result) + else: + for k, v in hook_result.items(): + g.add_metric(k, v) - if isinstance(calls, (int, float)): - g.add_metric([], calls) - else: - for k, v in calls.items(): - g.add_metric(k, v) + yield g - yield g + def register_hook( + self, + hook: Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ], + ) -> None: + self._hooks.append(hook) def __attrs_post_init__(self) -> None: - self._register() - - def _register(self) -> None: - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering", self.name) - REGISTRY.unregister(all_gauges.pop(self.name)) - REGISTRY.register(self) - all_gauges[self.name] = self # `MetricsEntry` only makes sense when it is a `Protocol`, @@ -250,7 +254,7 @@ 
def __init__( # Protects access to _registrations self._lock = threading.Lock() - self._register_with_collector() + REGISTRY.register(self) def register( self, @@ -341,14 +345,6 @@ def collect(self) -> Iterable[Metric]: gauge.add_metric(labels=key, value=getattr(metrics, name)) yield gauge - def _register_with_collector(self) -> None: - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering", self.name) - REGISTRY.unregister(all_gauges.pop(self.name)) - - REGISTRY.register(self) - all_gauges[self.name] = self - class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily): """ diff --git a/synapse/notifier.py b/synapse/notifier.py index 448a715e2a8..d56a7b26bbb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -86,6 +86,24 @@ labelnames=["stream", SERVER_NAME_LABEL], ) + +notifier_listeners_gauge = LaterGauge( + name="synapse_notifier_listeners", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +notifier_rooms_gauge = LaterGauge( + name="synapse_notifier_rooms", + desc="", + labelnames=[SERVER_NAME_LABEL], +) +notifier_users_gauge = LaterGauge( + name="synapse_notifier_users", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + T = TypeVar("T") @@ -281,28 +299,16 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]: ) } - LaterGauge( - name="synapse_notifier_listeners", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=count_listeners, - ) - - LaterGauge( - name="synapse_notifier_rooms", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: { + notifier_listeners_gauge.register_hook(count_listeners) + notifier_rooms_gauge.register_hook( + lambda: { (self.server_name,): count( bool, list(self.room_to_user_streams.values()) ) - }, + } ) - LaterGauge( - name="synapse_notifier_users", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self.user_to_user_stream)}, + notifier_users_gauge.register_hook( + lambda: {(self.server_name,): len(self.user_to_user_stream)} 
) def add_replication_callback(self, cb: Callable[[], None]) -> None: diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 0f14c7e3804..f033eaaeb57 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -106,6 +106,18 @@ "synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL] ) +tcp_resource_total_connections_gauge = LaterGauge( + name="synapse_replication_tcp_resource_total_connections", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + +tcp_command_queue_gauge = LaterGauge( + name="synapse_replication_tcp_command_queue", + desc="Number of inbound RDATA/POSITION commands queued for processing", + labelnames=["stream_name", SERVER_NAME_LABEL], +) + # the type of the entries in _command_queues_by_stream _StreamCommandQueue = Deque[ @@ -243,11 +255,8 @@ def __init__(self, hs: "HomeServer"): # outgoing replication commands to.) self._connections: List[IReplicationConnection] = [] - LaterGauge( - name="synapse_replication_tcp_resource_total_connections", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self._connections)}, + tcp_resource_total_connections_gauge.register_hook( + lambda: {(self.server_name,): len(self._connections)} ) # When POSITION or RDATA commands arrive, we stick them in a queue and process @@ -266,14 +275,11 @@ def __init__(self, hs: "HomeServer"): # from that connection. 
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} - LaterGauge( - name="synapse_replication_tcp_command_queue", - desc="Number of inbound RDATA/POSITION commands queued for processing", - labelnames=["stream_name", SERVER_NAME_LABEL], - caller=lambda: { + tcp_command_queue_gauge.register_hook( + lambda: { (stream_name, self.server_name): len(queue) for stream_name, queue in self._command_queues_by_stream.items() - }, + } ) self._is_master = hs.config.worker.worker_app is None diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 969f0303e05..4d8381646aa 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -527,9 +527,11 @@ def replicate(self) -> None: name="synapse_replication_tcp_protocol_pending_commands", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +pending_commands.register_hook( + lambda: { (p.name, p.server_name): len(p.pending_commands) for p in connected_connections - }, + } ) @@ -544,9 +546,11 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int: name="synapse_replication_tcp_protocol_transport_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +transport_send_buffer.register_hook( + lambda: { (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections - }, + } ) @@ -571,10 +575,12 @@ def transport_kernel_read_buffer_size( name="synapse_replication_tcp_protocol_transport_kernel_send_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) +tcp_transport_kernel_send_buffer.register_hook( + lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, False) for p in connected_connections - }, + } ) @@ -582,8 +588,10 @@ def transport_kernel_read_buffer_size( name="synapse_replication_tcp_protocol_transport_kernel_read_buffer", desc="", labelnames=["name", SERVER_NAME_LABEL], - caller=lambda: { +) 
+tcp_transport_kernel_read_buffer.register_hook( + lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, True) for p in connected_connections - }, + } ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index f7aec16c969..bbdc5b9d278 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -100,6 +100,12 @@ labelnames=["desc", SERVER_NAME_LABEL], ) +background_update_status = LaterGauge( + name="synapse_background_update_status", + desc="Background update status", + labelnames=[SERVER_NAME_LABEL], +) + # Unique indexes which have been added in background updates. Maps from table name # to the name of the background update which added the unique index to that table. @@ -611,11 +617,8 @@ def __init__( ) self.updates = BackgroundUpdater(hs, self) - LaterGauge( - name="synapse_background_update_status", - desc="Background update status", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): self.updates.get_status()}, + background_update_status.register_hook( + lambda: {(self.server_name,): self.updates.get_status()}, ) self._previous_txn_total_time = 0.0 diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 654250fadc5..94a1274edb0 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -84,6 +84,13 @@ _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE = 1000 +federation_known_servers_gauge = LaterGauge( + name="synapse_federation_known_servers", + desc="", + labelnames=[SERVER_NAME_LABEL], +) + + @attr.s(frozen=True, slots=True, auto_attribs=True) class EventIdMembership: """Returned by `get_membership_from_event_ids`""" @@ -116,11 +123,8 @@ def __init__( 1, self._count_known_servers, ) - LaterGauge( - name="synapse_federation_known_servers", - desc="", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): self._known_servers_count}, + 
federation_known_servers_gauge.register_hook( + lambda: {(self.server_name,): self._known_servers_count} ) @wrap_as_background_process("_count_known_servers") diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index f5e592d80ee..b3c65676c6d 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -131,27 +131,31 @@ def _get_counts_from_rate_limiter_instance( # We track the number of affected hosts per time-period so we can # differentiate one really noisy homeserver from a general # ratelimit tuning problem across the federation. -LaterGauge( +sleep_affected_hosts_gauge = LaterGauge( name="synapse_rate_limit_sleep_affected_hosts", desc="Number of hosts that had requests put to sleep", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], - caller=lambda: _get_counts_from_rate_limiter_instance( +) +sleep_affected_hosts_gauge.register_hook( + lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_sleep() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ), + ) ) -LaterGauge( +reject_affected_hosts_gauge = LaterGauge( name="synapse_rate_limit_reject_affected_hosts", desc="Number of hosts that had requests rejected", labelnames=["rate_limiter_name", SERVER_NAME_LABEL], - caller=lambda: _get_counts_from_rate_limiter_instance( +) +reject_affected_hosts_gauge.register_hook( + lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_reject() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ), + ) ) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index fdcacdf1289..904f99fa426 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -44,6 +44,13 @@ logger = logging.getLogger(__name__) +running_tasks_gauge = LaterGauge( + name="synapse_scheduler_running_tasks", + desc="The number of concurrent running tasks handled by the TaskScheduler", + 
labelnames=[SERVER_NAME_LABEL], +) + + class TaskScheduler: """ This is a simple task scheduler designed for resumable tasks. Normally, @@ -130,11 +137,8 @@ def __init__(self, hs: "HomeServer"): TaskScheduler.SCHEDULE_INTERVAL_MS, ) - LaterGauge( - name="synapse_scheduler_running_tasks", - desc="The number of concurrent running tasks handled by the TaskScheduler", - labelnames=[SERVER_NAME_LABEL], - caller=lambda: {(self.server_name,): len(self._running_tasks)}, + running_tasks_gauge.register_hook( + lambda: {(self.server_name,): len(self._running_tasks)} ) def register_action( diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 61874564a6b..5a3c3c1c4ed 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -22,7 +22,13 @@ from prometheus_client.core import Sample -from synapse.metrics import REGISTRY, InFlightGauge, generate_latest +from synapse.metrics import ( + REGISTRY, + SERVER_NAME_LABEL, + InFlightGauge, + LaterGauge, + generate_latest, +) from synapse.util.caches.deferred_cache import DeferredCache from tests import unittest @@ -285,6 +291,42 @@ def test_cache_metric_multiple_servers(self) -> None: self.assertEqual(hs2_cache_max_size_metric_value, "777.0") +class LaterGaugeTests(unittest.HomeserverTestCase): + def test_later_gauge_multiple_servers(self) -> None: + """ + Test that LaterGauge metrics are reported correctly across multiple servers. We + will have an metrics entry for each homeserver that is labeled with the + `server_name` label. 
+ """ + later_gauge = LaterGauge( + name="foo", + desc="", + labelnames=[SERVER_NAME_LABEL], + ) + later_gauge.register_hook(lambda: {("hs1",): 1}) + later_gauge.register_hook(lambda: {("hs2",): 2}) + + metrics_map = get_latest_metrics() + + # Find the metrics for the caches from both homeservers + hs1_metric = 'foo{server_name="hs1"}' + hs1_metric_value = metrics_map.get(hs1_metric) + self.assertIsNotNone( + hs1_metric_value, + f"Missing metric {hs1_metric} in cache metrics {metrics_map}", + ) + hs2_metric = 'foo{server_name="hs2"}' + hs2_metric_value = metrics_map.get(hs2_metric) + self.assertIsNotNone( + hs2_metric_value, + f"Missing metric {hs2_metric} in cache metrics {metrics_map}", + ) + + # Sanity check the metric values + self.assertEqual(hs1_metric_value, "1.0") + self.assertEqual(hs2_metric_value, "2.0") + + def get_latest_metrics() -> Dict[str, str]: """ Collect the latest metrics from the registry and parse them into an easy to use map. From 4dda2b12ed0267f0539485ff2369eee247c5e3fd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 16:10:41 -0500 Subject: [PATCH 02/21] First pass: keep track of hooks per server --- synapse/metrics/__init__.py | 63 ++++++++++++++++++++++++++++++++----- tests/unittest.py | 6 ++++ 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 8c99d3c7700..21562cb7bb0 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -31,7 +31,6 @@ Dict, Generic, Iterable, - List, Mapping, Optional, Sequence, @@ -162,20 +161,23 @@ class LaterGauge(Collector): name: str desc: str labelnames: Optional[StrSequence] = attr.ib(hash=False) - # List of callbacks: each callback should either return a value (if there are no - # labels for this metric), or dict mapping from a label tuple to a value - _hooks: List[ + _server_name_to_hook_map: Dict[ + str, # server_name Callable[ [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, 
float]] - ] - ] = attr.ib(factory=list, hash=False) + ], + ] = attr.ib(factory=dict, hash=False) + """ + Map from server_name to a callback. Each callback should either return a value (if + there are no labels for this metric), or dict mapping from a label tuple to a value + """ def collect(self) -> Iterable[Metric]: # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself # (we don't enforce it here, one level up). g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] - for hook in self._hooks: + for hook in self._server_name_to_hook_map.values(): try: hook_result = hook() except Exception: @@ -195,15 +197,60 @@ def collect(self) -> Iterable[Metric]: def register_hook( self, + server_name: str, hook: Callable[ [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] ], ) -> None: - self._hooks.append(hook) + """ + Register a callback/hook that will be called to generate a metric samples for + the gauge. + + Args: + server_name: The homeserver name (`hs.hostname`) this hook is associated + with. This can be used later to lookup all hooks associated with a given + server name in order to unregister them. + hook: A callback that should either return a value (if there are no + labels for this metric), or dict mapping from a label tuple to a value + """ + # We shouldn't have multiple hooks registered for the same `server_name`. + existing_hook = self._server_name_to_hook_map.get(server_name) + assert existing_hook is None, ( + f"LaterGauge(name={self.name}) hook already registered for server_name={server_name}. " + "This is likely a Synapse bug and you forgot to unregister the previous hooks for " + "the server (especially in tests)." + ) + + self._server_name_to_hook_map[server_name] = hook + + def unregister_hooks_for_server_name(self, server_name: str) -> None: + """ + Unregister all hooks associated with the given `server_name`. 
This should be + called when a homeserver is shutdown to avoid extra hooks sitting around. + + Args: + server_name: The homeserver name to unregister hooks for (`hs.hostname`). + """ + if server_name in self._server_name_to_hook_map: + del self._server_name_to_hook_map[server_name] def __attrs_post_init__(self) -> None: REGISTRY.register(self) + # We shouldn't have multiple metrics with the same name. Typically, metrics + # should be created globally so you shouldn't be running into this. + existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name) + assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. " + + # Keep track of the gauge so we can clean it up later. + all_later_gauges_to_clean_up_on_shutdown[self.name] = self + + +all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {} +""" +Keep track of all `LaterGauge` that we should look through when we shutdown a homeserver. +""" + # `MetricsEntry` only makes sense when it is a `Protocol`, # but `Protocol` can't be used as a `TypeVar` bound. diff --git a/tests/unittest.py b/tests/unittest.py index 5e6957dc6d2..a3ce6519640 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -77,6 +77,7 @@ current_context, set_current_context, ) +from synapse.metrics import all_later_gauges_to_clean_up_on_shutdown from synapse.rest import RegisterServletsFunc from synapse.server import HomeServer from synapse.storage.keys import FetchKeyResult @@ -471,6 +472,11 @@ async def get_requester(*args: Any, **kwargs: Any) -> Requester: self.prepare(self.reactor, self.clock, self.hs) def tearDown(self) -> None: + for later_gauge in all_later_gauges_to_clean_up_on_shutdown: + later_gauge.unregister_hooks_for_server_name( + self.hs.config.server.server_name + ) + # Reset to not use frozen dicts. 
events.USE_FROZEN_DICTS = False From 505263df43923e47d0f8b1c698e79b5202d28a69 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 16:26:13 -0500 Subject: [PATCH 03/21] Some fix-ups --- synapse/metrics/__init__.py | 19 ++++++++++++------- tests/unittest.py | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 21562cb7bb0..a45ff77f233 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -162,7 +162,7 @@ class LaterGauge(Collector): desc: str labelnames: Optional[StrSequence] = attr.ib(hash=False) _server_name_to_hook_map: Dict[ - str, # server_name + Optional[str], # server_name Callable[ [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] ], @@ -197,7 +197,8 @@ def collect(self) -> Iterable[Metric]: def register_hook( self, - server_name: str, + *, + server_name: Optional[str], hook: Callable[ [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] ], @@ -209,7 +210,8 @@ def register_hook( Args: server_name: The homeserver name (`hs.hostname`) this hook is associated with. This can be used later to lookup all hooks associated with a given - server name in order to unregister them. + server name in order to unregister them. This should only be omitted + for global hooks that work across all homeservers. hook: A callback that should either return a value (if there are no labels for this metric), or dict mapping from a label tuple to a value """ @@ -231,14 +233,16 @@ def unregister_hooks_for_server_name(self, server_name: str) -> None: Args: server_name: The homeserver name to unregister hooks for (`hs.hostname`). """ - if server_name in self._server_name_to_hook_map: - del self._server_name_to_hook_map[server_name] + self._server_name_to_hook_map.pop(server_name, None) def __attrs_post_init__(self) -> None: REGISTRY.register(self) # We shouldn't have multiple metrics with the same name. 
Typically, metrics - # should be created globally so you shouldn't be running into this. + # should be created globally so you shouldn't be running into this and this will + # catch any stupid mistakes. The `REGISTRY.register(self)` call above will also + # raise an error if the metric already exists but to make things explicit, we'll + # also check here. existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name) assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. " @@ -248,7 +252,8 @@ def __attrs_post_init__(self) -> None: all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {} """ -Keep track of all `LaterGauge` that we should look through when we shutdown a homeserver. +Track all `LaterGauge` instances so we can remove any associated hooks during homeserver +shutdown. """ diff --git a/tests/unittest.py b/tests/unittest.py index a3ce6519640..6988004578c 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -472,7 +472,7 @@ async def get_requester(*args: Any, **kwargs: Any) -> Requester: self.prepare(self.reactor, self.clock, self.hs) def tearDown(self) -> None: - for later_gauge in all_later_gauges_to_clean_up_on_shutdown: + for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values(): later_gauge.unregister_hooks_for_server_name( self.hs.config.server.server_name ) From 46e4053cee0cbcf2d078d51dcd5b2d8af4df961f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 16:26:26 -0500 Subject: [PATCH 04/21] Add `server_name` to `register_hook` --- synapse/federation/send_queue.py | 3 ++- synapse/federation/sender/__init__.py | 15 +++++++++------ synapse/handlers/presence.py | 6 ++++-- synapse/http/request_metrics.py | 2 +- synapse/notifier.py | 12 ++++++++---- synapse/replication/tcp/handler.py | 8 +++++--- synapse/replication/tcp/protocol.py | 20 ++++++++++++-------- synapse/storage/database.py | 3 ++- synapse/storage/databases/main/roommember.py | 3 ++- synapse/util/ratelimitutils.py | 10 
++++++---- synapse/util/task_scheduler.py | 3 ++- tests/metrics/test_metrics.py | 4 ++-- 12 files changed, 55 insertions(+), 34 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 1e9722e0d41..6ca24e3bc6a 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -133,7 +133,8 @@ def __init__(self, hs: "HomeServer"): # changes. ARGH. def register(queue_name: QueueNames, queue: Sized) -> None: queue_name_to_gauge_map[queue_name].register_hook( - lambda: {(self.server_name,): len(queue)} + server_name=self.server_name, + hook=lambda: {(self.server_name,): len(queue)}, ) for queue_name in QueueNames: diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 21af1235432..11da53d1278 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -417,27 +417,30 @@ def __init__(self, hs: "HomeServer"): self._per_destination_queues: Dict[str, PerDestinationQueue] = {} transaction_queue_pending_destinations_gauge.register_hook( - lambda: { + server_name=self.server_name, + hook=lambda: { (self.server_name,): sum( 1 for d in self._per_destination_queues.values() if d.transmission_loop_running ) - } + }, ) transaction_queue_pending_pdus_gauge.register_hook( - lambda: { + server_name=self.server_name, + hook=lambda: { (self.server_name,): sum( d.pending_pdu_count() for d in self._per_destination_queues.values() ) - } + }, ) transaction_queue_pending_edus_gauge.register_hook( - lambda: { + server_name=self.server_name, + hook=lambda: { (self.server_name,): sum( d.pending_edu_count() for d in self._per_destination_queues.values() ) - } + }, ) self._is_processing = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fb9f962672f..ab2531984eb 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -792,7 +792,8 @@ def __init__(self, hs: "HomeServer"): ) 
presence_user_to_current_state_size_gauge.register_hook( - lambda: {(self.server_name,): len(self.user_to_current_state)} + server_name=self.server_name, + hook=lambda: {(self.server_name,): len(self.user_to_current_state)}, ) # The per-device presence state, maps user to devices to per-device presence state. @@ -892,7 +893,8 @@ def __init__(self, hs: "HomeServer"): ) presence_wheel_timer_size_gauge.register_hook( - lambda: {(self.server_name,): len(self.wheel_timer)} + server_name=self.server_name, + hook=lambda: {(self.server_name,): len(self.wheel_timer)}, ) # Used to handle sending of presence to newly joined users/servers diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index c5274c758b9..2e040f92b70 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -169,7 +169,7 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]: desc="", labelnames=["method", "servlet", SERVER_NAME_LABEL], ) -in_flight_requests.register_hook(_get_in_flight_counts) +in_flight_requests.register_hook(server_name=None, hook=_get_in_flight_counts) class RequestMetrics: diff --git a/synapse/notifier.py b/synapse/notifier.py index d56a7b26bbb..458392377a3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -299,16 +299,20 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]: ) } - notifier_listeners_gauge.register_hook(count_listeners) + notifier_listeners_gauge.register_hook( + server_name=self.server_name, hook=count_listeners + ) notifier_rooms_gauge.register_hook( - lambda: { + server_name=self.server_name, + hook=lambda: { (self.server_name,): count( bool, list(self.room_to_user_streams.values()) ) - } + }, ) notifier_users_gauge.register_hook( - lambda: {(self.server_name,): len(self.user_to_user_stream)} + server_name=self.server_name, + hook=lambda: {(self.server_name,): len(self.user_to_user_stream)}, ) def add_replication_callback(self, cb: Callable[[], None]) -> None: diff --git 
a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index f033eaaeb57..17753426ef3 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -256,7 +256,8 @@ def __init__(self, hs: "HomeServer"): self._connections: List[IReplicationConnection] = [] tcp_resource_total_connections_gauge.register_hook( - lambda: {(self.server_name,): len(self._connections)} + server_name=self.server_name, + hook=lambda: {(self.server_name,): len(self._connections)}, ) # When POSITION or RDATA commands arrive, we stick them in a queue and process @@ -276,10 +277,11 @@ def __init__(self, hs: "HomeServer"): self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} tcp_command_queue_gauge.register_hook( - lambda: { + server_name=self.server_name, + hook=lambda: { (stream_name, self.server_name): len(queue) for stream_name, queue in self._command_queues_by_stream.items() - } + }, ) self._is_master = hs.config.worker.worker_app is None diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 4d8381646aa..c287a4cf696 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -529,9 +529,10 @@ def replicate(self) -> None: labelnames=["name", SERVER_NAME_LABEL], ) pending_commands.register_hook( - lambda: { + server_name=None, + hook=lambda: { (p.name, p.server_name): len(p.pending_commands) for p in connected_connections - } + }, ) @@ -548,9 +549,10 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int: labelnames=["name", SERVER_NAME_LABEL], ) transport_send_buffer.register_hook( - lambda: { + server_name=None, + hook=lambda: { (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections - } + }, ) @@ -577,10 +579,11 @@ def transport_kernel_read_buffer_size( labelnames=["name", SERVER_NAME_LABEL], ) tcp_transport_kernel_send_buffer.register_hook( - lambda: { + server_name=None, + hook=lambda: { (p.name, 
p.server_name): transport_kernel_read_buffer_size(p, False) for p in connected_connections - } + }, ) @@ -590,8 +593,9 @@ def transport_kernel_read_buffer_size( labelnames=["name", SERVER_NAME_LABEL], ) tcp_transport_kernel_read_buffer.register_hook( - lambda: { + server_name=None, + hook=lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, True) for p in connected_connections - } + }, ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index bbdc5b9d278..29c6abb56e0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -618,7 +618,8 @@ def __init__( self.updates = BackgroundUpdater(hs, self) background_update_status.register_hook( - lambda: {(self.server_name,): self.updates.get_status()}, + server_name=self.server_name, + hook=lambda: {(self.server_name,): self.updates.get_status()}, ) self._previous_txn_total_time = 0.0 diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 94a1274edb0..83c9f1973e9 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -124,7 +124,8 @@ def __init__( self._count_known_servers, ) federation_known_servers_gauge.register_hook( - lambda: {(self.server_name,): self._known_servers_count} + server_name=self.server_name, + hook=lambda: {(self.server_name,): self._known_servers_count}, ) @wrap_as_background_process("_count_known_servers") diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index b3c65676c6d..2bc46ede62e 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -137,12 +137,13 @@ def _get_counts_from_rate_limiter_instance( labelnames=["rate_limiter_name", SERVER_NAME_LABEL], ) sleep_affected_hosts_gauge.register_hook( - lambda: _get_counts_from_rate_limiter_instance( + server_name=None, + hook=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( 
ratelimiter.should_sleep() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ) + ), ) reject_affected_hosts_gauge = LaterGauge( name="synapse_rate_limit_reject_affected_hosts", @@ -150,12 +151,13 @@ def _get_counts_from_rate_limiter_instance( labelnames=["rate_limiter_name", SERVER_NAME_LABEL], ) reject_affected_hosts_gauge.register_hook( - lambda: _get_counts_from_rate_limiter_instance( + server_name=None, + hook=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_reject() for ratelimiter in rate_limiter_instance.ratelimiters.values() ) - ) + ), ) diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 904f99fa426..515ee553f86 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -138,7 +138,8 @@ def __init__(self, hs: "HomeServer"): ) running_tasks_gauge.register_hook( - lambda: {(self.server_name,): len(self._running_tasks)} + server_name=self.server_name, + hook=lambda: {(self.server_name,): len(self._running_tasks)}, ) def register_action( diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 5a3c3c1c4ed..5942eda3ac0 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -303,8 +303,8 @@ def test_later_gauge_multiple_servers(self) -> None: desc="", labelnames=[SERVER_NAME_LABEL], ) - later_gauge.register_hook(lambda: {("hs1",): 1}) - later_gauge.register_hook(lambda: {("hs2",): 2}) + later_gauge.register_hook(server_name="hs1", hook=lambda: {("hs1",): 1}) + later_gauge.register_hook(server_name="hs2", hook=lambda: {("hs2",): 2}) metrics_map = get_latest_metrics() From 2e5ac4404e6ed1add456b9dbd792d011d9dc329a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 16:47:19 -0500 Subject: [PATCH 05/21] Move metric clean-up to catch all servers --- tests/server.py | 8 ++++++++ tests/unittest.py | 6 ------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git 
a/tests/server.py b/tests/server.py index 3a81a4c6d9b..8b4f1ac0093 100644 --- a/tests/server.py +++ b/tests/server.py @@ -92,6 +92,7 @@ from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseRequest from synapse.logging.context import ContextResourceUsage +from synapse.metrics import all_later_gauges_to_clean_up_on_shutdown from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers from synapse.module_api.callbacks.third_party_event_rules_callbacks import ( load_legacy_third_party_event_rules, @@ -1215,6 +1216,13 @@ def cleanup() -> None: # Register the cleanup hook cleanup_func(cleanup) + def cleanup_metrics() -> None: + for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values(): + later_gauge.unregister_hooks_for_server_name(hs.config.server.server_name) + + # Register the cleanup hook for metrics + cleanup_func(cleanup_metrics) + # bcrypt is far too slow to be doing in unit tests # Need to let the HS build an auth handler and then mess with it # because AuthHandler's constructor requires the HS, so we can't make one diff --git a/tests/unittest.py b/tests/unittest.py index 6988004578c..5e6957dc6d2 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -77,7 +77,6 @@ current_context, set_current_context, ) -from synapse.metrics import all_later_gauges_to_clean_up_on_shutdown from synapse.rest import RegisterServletsFunc from synapse.server import HomeServer from synapse.storage.keys import FetchKeyResult @@ -472,11 +471,6 @@ async def get_requester(*args: Any, **kwargs: Any) -> Requester: self.prepare(self.reactor, self.clock, self.hs) def tearDown(self) -> None: - for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values(): - later_gauge.unregister_hooks_for_server_name( - self.hs.config.server.server_name - ) - # Reset to not use frozen dicts. 
events.USE_FROZEN_DICTS = False From 2f25722fb61cd779a636c07b7ef1b7dbced38e2d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 16:50:07 -0500 Subject: [PATCH 06/21] Update changelog number --- changelog.d/{18751.misc => 18791.misc} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{18751.misc => 18791.misc} (100%) diff --git a/changelog.d/18751.misc b/changelog.d/18791.misc similarity index 100% rename from changelog.d/18751.misc rename to changelog.d/18791.misc From a34122f3fcd2b8f3a8b59d888cc890f1714f1fe5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 16:59:26 -0500 Subject: [PATCH 07/21] Try fix `synapse/_scripts/synapse_port_db.py` script --- synapse/_scripts/synapse_port_db.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 0f54cfc64af..f4fdb8f0741 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -353,6 +353,8 @@ def __init__( self.batch_size = batch_size self.hs_config = hs_config + self.mock_hs = MockHomeserver(self.hs_config) + async def setup_table(self, table: str) -> Tuple[str, int, int, int, int]: if table in APPEND_ONLY_TABLES: # It's safe to just carry on inserting. @@ -671,8 +673,7 @@ def build_db_store( engine = create_engine(db_config.config) - hs = MockHomeserver(self.hs_config) - server_name = hs.hostname + server_name = self.mock_hs.hostname with make_conn( db_config=db_config, @@ -685,7 +686,9 @@ def build_db_store( ) prepare_database(db_conn, engine, config=self.hs_config) # Type safety: ignore that we're using Mock homeservers here. 
- store = Store(DatabasePool(hs, db_config, engine), db_conn, hs) # type: ignore[arg-type] + store = Store( + DatabasePool(self.mock_hs, db_config, engine), db_conn, self.mock_hs + ) # type: ignore[arg-type] db_conn.commit() return store From b226f63fcc223ab4d2927c896b23e2435f853645 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 17:12:59 -0500 Subject: [PATCH 08/21] Cleanup homeservers when we `synapse/_scripts/generate_workers_map.py` --- synapse/_scripts/generate_workers_map.py | 8 ++++++-- synapse/server.py | 16 ++++++++++++++++ tests/server.py | 11 +++-------- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/synapse/_scripts/generate_workers_map.py b/synapse/_scripts/generate_workers_map.py index 09feb8cf30f..dd8ce8dec66 100755 --- a/synapse/_scripts/generate_workers_map.py +++ b/synapse/_scripts/generate_workers_map.py @@ -153,9 +153,13 @@ def get_registered_paths_for_default( """ hs = MockHomeserver(base_config, worker_app) + # TODO We only do this to avoid an error, but don't need the database etc - hs.setup() - return get_registered_paths_for_hs(hs) + registered_paths = get_registered_paths_for_hs(hs) + + hs.cleanup() + + return registered_paths def elide_http_methods_if_unconflicting( diff --git a/synapse/server.py b/synapse/server.py index bf82f79bec9..59a9fcb4454 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -89,6 +89,7 @@ PresenceHandler, WorkerPresenceHandler, ) +from synapse.metrics import all_later_gauges_to_clean_up_on_shutdown from synapse.handlers.profile import ProfileHandler from synapse.handlers.push_rules import PushRulesHandler from synapse.handlers.read_marker import ReadMarkerHandler @@ -369,6 +370,21 @@ def setup(self) -> None: if self.config.worker.run_background_tasks: self.setup_background_tasks() + def cleanup(self) -> None: + """ + WIP: Clean-up any references to the homeserver and stop any running related + processes, timers, loops, replication stream, etc. 
+ """ + logger.info("Received cleanup request for %s.", self.hostname) + + # TODO: Stop background processes, timers, loops, replication stream, etc. + + # Cleanup metrics associated with the homeserver + for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values(): + later_gauge.unregister_hooks_for_server_name(self.config.server.server_name) + + logger.info("Cleanup complete for %s.", self.hostname) + def start_listening(self) -> None: # noqa: B027 (no-op by design) """Start the HTTP, manhole, metrics, etc listeners diff --git a/tests/server.py b/tests/server.py index 8b4f1ac0093..ebff8b04b34 100644 --- a/tests/server.py +++ b/tests/server.py @@ -92,7 +92,6 @@ from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.http.site import SynapseRequest from synapse.logging.context import ContextResourceUsage -from synapse.metrics import all_later_gauges_to_clean_up_on_shutdown from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers from synapse.module_api.callbacks.third_party_event_rules_callbacks import ( load_legacy_third_party_event_rules, @@ -1146,6 +1145,9 @@ def setup_test_homeserver( reactor=reactor, ) + # Register the cleanup hook + cleanup_func(hs.cleanup) + # Install @cache_in_self attributes for key, val in kwargs.items(): setattr(hs, "_" + key, val) @@ -1216,13 +1218,6 @@ def cleanup() -> None: # Register the cleanup hook cleanup_func(cleanup) - def cleanup_metrics() -> None: - for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values(): - later_gauge.unregister_hooks_for_server_name(hs.config.server.server_name) - - # Register the cleanup hook for metrics - cleanup_func(cleanup_metrics) - # bcrypt is far too slow to be doing in unit tests # Need to let the HS build an auth handler and then mess with it # because AuthHandler's constructor requires the HS, so we can't make one From 75e74634a86643e96a91a937d870c8b451e17bdd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 
Aug 2025 17:21:26 -0500 Subject: [PATCH 09/21] Fix lints --- synapse/_scripts/synapse_port_db.py | 10 ++++++++-- synapse/server.py | 6 ++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index f4fdb8f0741..358741b60a0 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -687,8 +687,14 @@ def build_db_store( prepare_database(db_conn, engine, config=self.hs_config) # Type safety: ignore that we're using Mock homeservers here. store = Store( - DatabasePool(self.mock_hs, db_config, engine), db_conn, self.mock_hs - ) # type: ignore[arg-type] + DatabasePool( + self.mock_hs, # type: ignore[arg-type] + db_config, + engine, + ), + db_conn, + self.mock_hs, # type: ignore[arg-type] + ) db_conn.commit() return store diff --git a/synapse/server.py b/synapse/server.py index 59a9fcb4454..d51867bd57f 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -89,7 +89,6 @@ PresenceHandler, WorkerPresenceHandler, ) -from synapse.metrics import all_later_gauges_to_clean_up_on_shutdown from synapse.handlers.profile import ProfileHandler from synapse.handlers.push_rules import PushRulesHandler from synapse.handlers.read_marker import ReadMarkerHandler @@ -130,7 +129,10 @@ ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.media.media_repository import MediaRepository -from synapse.metrics import register_threadpool +from synapse.metrics import ( + all_later_gauges_to_clean_up_on_shutdown, + register_threadpool, +) from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager from synapse.module_api import ModuleApi from synapse.module_api.callbacks import ModuleApiCallbacks From 68bb036f1e35efa625c403c46bb1fd5483dc4410 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 18:12:39 -0500 Subject: [PATCH 10/21] Use `instance_id` instead of `server_name` to track metrics We use `instance_id` 
instead of `server_name` because it's possible to have multiple workers running in the same process with the same `server_name`. --- synapse/_scripts/synapse_port_db.py | 13 +++--- synapse/federation/send_queue.py | 2 +- synapse/federation/sender/__init__.py | 6 +-- synapse/handlers/presence.py | 4 +- synapse/http/request_metrics.py | 4 +- synapse/metrics/__init__.py | 44 ++++++++++++-------- synapse/notifier.py | 6 +-- synapse/replication/tcp/handler.py | 4 +- synapse/replication/tcp/protocol.py | 8 ++-- synapse/server.py | 4 +- synapse/storage/database.py | 2 +- synapse/storage/databases/main/roommember.py | 2 +- synapse/util/ratelimitutils.py | 4 +- synapse/util/task_scheduler.py | 2 +- tests/metrics/test_metrics.py | 8 +++- 15 files changed, 66 insertions(+), 47 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 358741b60a0..15bb93ad9ec 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -99,6 +99,7 @@ from synapse.storage.prepare_database import prepare_database from synapse.types import ISynapseReactor from synapse.util import SYNAPSE_VERSION, Clock +from synapse.util.stringutils import random_string # Cast safety: Twisted does some naughty magic which replaces the # twisted.internet.reactor module with a Reactor instance at runtime. @@ -330,6 +331,9 @@ def get_clock(self) -> Clock: def get_reactor(self) -> ISynapseReactor: return reactor + def get_instance_id(self) -> Optional[str]: + return random_string(5) + def get_instance_name(self) -> str: return "master" @@ -353,8 +357,6 @@ def __init__( self.batch_size = batch_size self.hs_config = hs_config - self.mock_hs = MockHomeserver(self.hs_config) - async def setup_table(self, table: str) -> Tuple[str, int, int, int, int]: if table in APPEND_ONLY_TABLES: # It's safe to just carry on inserting. 
@@ -673,7 +675,8 @@ def build_db_store( engine = create_engine(db_config.config) - server_name = self.mock_hs.hostname + hs = MockHomeserver(self.hs_config) + server_name = hs.hostname with make_conn( db_config=db_config, @@ -688,12 +691,12 @@ def build_db_store( # Type safety: ignore that we're using Mock homeservers here. store = Store( DatabasePool( - self.mock_hs, # type: ignore[arg-type] + hs, # type: ignore[arg-type] db_config, engine, ), db_conn, - self.mock_hs, # type: ignore[arg-type] + hs, # type: ignore[arg-type] ) db_conn.commit() diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6ca24e3bc6a..2fdee9ac549 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -133,7 +133,7 @@ def __init__(self, hs: "HomeServer"): # changes. ARGH. def register(queue_name: QueueNames, queue: Sized) -> None: queue_name_to_gauge_map[queue_name].register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): len(queue)}, ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 11da53d1278..278a9573310 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -417,7 +417,7 @@ def __init__(self, hs: "HomeServer"): self._per_destination_queues: Dict[str, PerDestinationQueue] = {} transaction_queue_pending_destinations_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: { (self.server_name,): sum( 1 @@ -427,7 +427,7 @@ def __init__(self, hs: "HomeServer"): }, ) transaction_queue_pending_pdus_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: { (self.server_name,): sum( d.pending_pdu_count() for d in self._per_destination_queues.values() @@ -435,7 +435,7 @@ def __init__(self, hs: "HomeServer"): }, ) 
transaction_queue_pending_edus_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: { (self.server_name,): sum( d.pending_edu_count() for d in self._per_destination_queues.values() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ab2531984eb..d7de20f8841 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -792,7 +792,7 @@ def __init__(self, hs: "HomeServer"): ) presence_user_to_current_state_size_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): len(self.user_to_current_state)}, ) @@ -893,7 +893,7 @@ def __init__(self, hs: "HomeServer"): ) presence_wheel_timer_size_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): len(self.wheel_timer)}, ) diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 2e040f92b70..83f52edb7c7 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -169,7 +169,9 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]: desc="", labelnames=["method", "servlet", SERVER_NAME_LABEL], ) -in_flight_requests.register_hook(server_name=None, hook=_get_in_flight_counts) +in_flight_requests.register_hook( + homeserver_instance_id=None, hook=_get_in_flight_counts +) class RequestMetrics: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a45ff77f233..3c7823da37c 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -161,15 +161,19 @@ class LaterGauge(Collector): name: str desc: str labelnames: Optional[StrSequence] = attr.ib(hash=False) - _server_name_to_hook_map: Dict[ - Optional[str], # server_name + _instance_id_to_hook_map: Dict[ + Optional[str], # instance_id Callable[ [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] ], ] = 
attr.ib(factory=dict, hash=False) """ - Map from server_name to a callback. Each callback should either return a value (if - there are no labels for this metric), or dict mapping from a label tuple to a value + Map from homeserver instance_id to a callback. Each callback should either return a + value (if there are no labels for this metric), or dict mapping from a label tuple + to a value. + + We use `instance_id` instead of `server_name` because it's possible to have multiple + workers running in the same process with the same `server_name`. """ def collect(self) -> Iterable[Metric]: @@ -177,7 +181,7 @@ def collect(self) -> Iterable[Metric]: # (we don't enforce it here, one level up). g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] - for hook in self._server_name_to_hook_map.values(): + for hook in self._instance_id_to_hook_map.values(): try: hook_result = hook() except Exception: @@ -198,7 +202,7 @@ def collect(self) -> Iterable[Metric]: def register_hook( self, *, - server_name: Optional[str], + homeserver_instance_id: Optional[str], hook: Callable[ [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] ], @@ -208,32 +212,36 @@ def register_hook( the gauge. Args: - server_name: The homeserver name (`hs.hostname`) this hook is associated - with. This can be used later to lookup all hooks associated with a given - server name in order to unregister them. This should only be omitted - for global hooks that work across all homeservers. + homeserver_instance_id: The unique ID for this Synapse process instance + (`hs.get_instance_id()`) that this hook is associated with. This can be used + later to lookup all hooks associated with a given server name in order to + unregister them. This should only be omitted for global hooks that work + across all homeservers. 
hook: A callback that should either return a value (if there are no labels for this metric), or dict mapping from a label tuple to a value """ - # We shouldn't have multiple hooks registered for the same `server_name`. - existing_hook = self._server_name_to_hook_map.get(server_name) + # We shouldn't have multiple hooks registered for the same homeserver `instance_id`. + existing_hook = self._instance_id_to_hook_map.get(homeserver_instance_id) assert existing_hook is None, ( - f"LaterGauge(name={self.name}) hook already registered for server_name={server_name}. " + f"LaterGauge(name={self.name}) hook already registered for homeserver_instance_id={homeserver_instance_id}. " "This is likely a Synapse bug and you forgot to unregister the previous hooks for " "the server (especially in tests)." ) - self._server_name_to_hook_map[server_name] = hook + self._instance_id_to_hook_map[homeserver_instance_id] = hook - def unregister_hooks_for_server_name(self, server_name: str) -> None: + def unregister_hooks_for_homeserver_instance_id( + self, homeserver_instance_id: str + ) -> None: """ - Unregister all hooks associated with the given `server_name`. This should be + Unregister all hooks associated with the given homeserver `instance_id`. This should be called when a homeserver is shutdown to avoid extra hooks sitting around. Args: - server_name: The homeserver name to unregister hooks for (`hs.hostname`). + homeserver_instance_id: The unique ID for this Synapse process instance to + unregister hooks for (`hs.get_instance_id()`). 
""" - self._server_name_to_hook_map.pop(server_name, None) + self._instance_id_to_hook_map.pop(homeserver_instance_id, None) def __attrs_post_init__(self) -> None: REGISTRY.register(self) diff --git a/synapse/notifier.py b/synapse/notifier.py index 458392377a3..7782c9ca659 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -300,10 +300,10 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]: } notifier_listeners_gauge.register_hook( - server_name=self.server_name, hook=count_listeners + homeserver_instance_id=hs.get_instance_id(), hook=count_listeners ) notifier_rooms_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: { (self.server_name,): count( bool, list(self.room_to_user_streams.values()) @@ -311,7 +311,7 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]: }, ) notifier_users_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): len(self.user_to_user_stream)}, ) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 17753426ef3..dd7e38dd781 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -256,7 +256,7 @@ def __init__(self, hs: "HomeServer"): self._connections: List[IReplicationConnection] = [] tcp_resource_total_connections_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): len(self._connections)}, ) @@ -277,7 +277,7 @@ def __init__(self, hs: "HomeServer"): self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {} tcp_command_queue_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: { (stream_name, self.server_name): len(queue) for stream_name, queue in self._command_queues_by_stream.items() diff --git a/synapse/replication/tcp/protocol.py 
b/synapse/replication/tcp/protocol.py index c287a4cf696..2ec25bf43da 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -529,7 +529,7 @@ def replicate(self) -> None: labelnames=["name", SERVER_NAME_LABEL], ) pending_commands.register_hook( - server_name=None, + homeserver_instance_id=None, hook=lambda: { (p.name, p.server_name): len(p.pending_commands) for p in connected_connections }, @@ -549,7 +549,7 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int: labelnames=["name", SERVER_NAME_LABEL], ) transport_send_buffer.register_hook( - server_name=None, + homeserver_instance_id=None, hook=lambda: { (p.name, p.server_name): transport_buffer_size(p) for p in connected_connections }, @@ -579,7 +579,7 @@ def transport_kernel_read_buffer_size( labelnames=["name", SERVER_NAME_LABEL], ) tcp_transport_kernel_send_buffer.register_hook( - server_name=None, + homeserver_instance_id=None, hook=lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, False) for p in connected_connections @@ -593,7 +593,7 @@ def transport_kernel_read_buffer_size( labelnames=["name", SERVER_NAME_LABEL], ) tcp_transport_kernel_read_buffer.register_hook( - server_name=None, + homeserver_instance_id=None, hook=lambda: { (p.name, p.server_name): transport_kernel_read_buffer_size(p, True) for p in connected_connections diff --git a/synapse/server.py b/synapse/server.py index d51867bd57f..dd1cac405c4 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -383,7 +383,9 @@ def cleanup(self) -> None: # Cleanup metrics associated with the homeserver for later_gauge in all_later_gauges_to_clean_up_on_shutdown.values(): - later_gauge.unregister_hooks_for_server_name(self.config.server.server_name) + later_gauge.unregister_hooks_for_homeserver_instance_id( + self.get_instance_id() + ) logger.info("Cleanup complete for %s.", self.hostname) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 
29c6abb56e0..78f228e2fbc 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -618,7 +618,7 @@ def __init__( self.updates = BackgroundUpdater(hs, self) background_update_status.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): self.updates.get_status()}, ) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 83c9f1973e9..1eb1983198d 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -124,7 +124,7 @@ def __init__( self._count_known_servers, ) federation_known_servers_gauge.register_hook( - server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): self._known_servers_count}, ) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 2bc46ede62e..88edc071611 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -137,7 +137,7 @@ def _get_counts_from_rate_limiter_instance( labelnames=["rate_limiter_name", SERVER_NAME_LABEL], ) sleep_affected_hosts_gauge.register_hook( - server_name=None, + homeserver_instance_id=None, hook=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_sleep() @@ -151,7 +151,7 @@ def _get_counts_from_rate_limiter_instance( labelnames=["rate_limiter_name", SERVER_NAME_LABEL], ) reject_affected_hosts_gauge.register_hook( - server_name=None, + homeserver_instance_id=None, hook=lambda: _get_counts_from_rate_limiter_instance( lambda rate_limiter_instance: sum( ratelimiter.should_reject() diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 515ee553f86..0539989320f 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -138,7 +138,7 @@ def __init__(self, hs: "HomeServer"): ) running_tasks_gauge.register_hook( - 
server_name=self.server_name, + homeserver_instance_id=hs.get_instance_id(), hook=lambda: {(self.server_name,): len(self._running_tasks)}, ) diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index 5942eda3ac0..ecf9d58379e 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -303,8 +303,12 @@ def test_later_gauge_multiple_servers(self) -> None: desc="", labelnames=[SERVER_NAME_LABEL], ) - later_gauge.register_hook(server_name="hs1", hook=lambda: {("hs1",): 1}) - later_gauge.register_hook(server_name="hs2", hook=lambda: {("hs2",): 2}) + later_gauge.register_hook( + homeserver_instance_id="123", hook=lambda: {("hs1",): 1} + ) + later_gauge.register_hook( + homeserver_instance_id="456", hook=lambda: {("hs2",): 2} + ) metrics_map = get_latest_metrics() From 87cc52f0d0c042823184d2d1e49ebeaf97638bb5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 18:45:25 -0500 Subject: [PATCH 11/21] Fix `BaseStreamTestCase` Multiple `ReplicationCommandHandler` were getting created for the same HS registering the hook twice ``` 2025-08-07 18:38:14-0500 [-] synapse.replication.tcp.handler - 386 - ERROR - process-replication-data-11 - Failed to handle command Traceback (most recent call last): File "synapse/replication/tcp/handler.py", line 384, in _unsafe_process_queue await self._process_command(cmd, conn, stream_name) File "synapse/replication/tcp/handler.py", line 397, in _process_command await self._process_position(stream_name, conn, cmd) File "synapse/replication/tcp/handler.py", line 721, in _process_position (updates, current_token, missing_updates) = await stream.get_updates_since( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cmd.instance_name, current_token, cmd.new_token ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ) ^ File "synapse/replication/tcp/streams/_base.py", line 213, in get_updates_since updates, upto_token, limited = await self.update_function( ^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...<4 lines>... 
) ^ File "synapse/replication/tcp/streams/_base.py", line 269, in update_function result = await client( ^^^^^^^^^^^^^ ...<4 lines>... ) ^ File "synapse/logging/opentracing.py", line 929, in _wrapper return await func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "synapse/replication/http/_base.py", line 232, in send_request streams = hs.get_replication_command_handler().get_streams_to_replicate() ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^ File "synapse/server.py", line 232, in _get dep = builder(self) File "synapse/server.py", line 756, in get_replication_command_handler return ReplicationCommandHandler(self) File "synapse/replication/tcp/handler.py", line 258, in __init__ tcp_resource_total_connections_gauge.register_hook( ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^ homeserver_instance_id=hs.get_instance_id(), ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ hook=lambda: {(self.server_name,): len(self._connections)}, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ) ^ File "synapse/metrics/__init__.py", line 225, in register_hook assert existing_hook is None, ( ^^^^^^^^^^^^^^^^^^^^^ AssertionError: LaterGauge(name=synapse_replication_tcp_resource_total_connections) hook already registered for homeserver_instance_id=gZBmA. This is likely a Synapse bug and you forgot to unregister the previous hooks for the server (especially in tests). 
``` --- tests/replication/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 453eb7750b3..0229aeba2bc 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -97,7 +97,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.test_handler = self._build_replication_data_handler() self.worker_hs._replication_data_handler = self.test_handler # type: ignore[attr-defined] - repl_handler = ReplicationCommandHandler(self.worker_hs) + repl_handler = self.worker_hs.get_replication_command_handler() self.client = ClientReplicationStreamProtocol( self.worker_hs, "client", From c7d1a78653cf0dfe086a91ea92129b688cfd8bc6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 18:56:15 -0500 Subject: [PATCH 12/21] Fix lints --- tests/replication/_base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 0229aeba2bc..e756021937d 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -32,7 +32,6 @@ from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource from synapse.replication.tcp.client import ReplicationDataHandler -from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ( ClientReplicationStreamProtocol, ServerReplicationStreamProtocol, From 14aee2fe33482ed5dcec7445c5d7bebd6ef45ada Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 7 Aug 2025 19:34:19 -0500 Subject: [PATCH 13/21] Fix multiple databases registering metrics See https://github.com/element-hq/synapse/pull/18791#discussion_r2261684250 This also fixes a long-standing problem where we would only track metrics for the last database listed. 
--- synapse/storage/database.py | 12 +----------- synapse/storage/databases/__init__.py | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 78f228e2fbc..cfec36e0fa1 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -61,7 +61,7 @@ current_context, make_deferred_yieldable, ) -from synapse.metrics import SERVER_NAME_LABEL, LaterGauge, register_threadpool +from synapse.metrics import SERVER_NAME_LABEL, register_threadpool from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine @@ -100,12 +100,6 @@ labelnames=["desc", SERVER_NAME_LABEL], ) -background_update_status = LaterGauge( - name="synapse_background_update_status", - desc="Background update status", - labelnames=[SERVER_NAME_LABEL], -) - # Unique indexes which have been added in background updates. Maps from table name # to the name of the background update which added the unique index to that table. 
@@ -617,10 +611,6 @@ def __init__( ) self.updates = BackgroundUpdater(hs, self) - background_update_status.register_hook( - homeserver_instance_id=hs.get_instance_id(), - hook=lambda: {(self.server_name,): self.updates.get_status()}, - ) self._previous_txn_total_time = 0.0 self._current_txn_total_time = 0.0 diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 6442ab6c7a3..a4aba96686f 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -22,6 +22,7 @@ import logging from typing import TYPE_CHECKING, Generic, List, Optional, Type, TypeVar +from synapse.metrics import SERVER_NAME_LABEL, LaterGauge from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.events import PersistEventsStore @@ -40,6 +41,13 @@ DataStoreT = TypeVar("DataStoreT", bound=SQLBaseStore, covariant=True) +background_update_status = LaterGauge( + name="synapse_background_update_status", + desc="Background update status", + labelnames=["database_name", SERVER_NAME_LABEL], +) + + class Databases(Generic[DataStoreT]): """The various databases. @@ -143,6 +151,15 @@ def __init__(self, main_store_class: Type[DataStoreT], hs: "HomeServer"): db_conn.close() + # Track the background update status for each database + background_update_status.register_hook( + homeserver_instance_id=hs.get_instance_id(), + hook=lambda: { + (database.name(), server_name): database.updates.get_status() + for database in self.databases + }, + ) + # Sanity check that we have actually configured all the required stores. 
if not main: raise Exception("No 'main' database configured") From 59701ed0ee4c7c739a3a8a34adfe4132731b0c74 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 13:34:09 -0500 Subject: [PATCH 14/21] Create consistent `instance_id` in `MockHomeserver` See https://github.com/element-hq/synapse/pull/18791#discussion_r2303738808 --- synapse/_scripts/synapse_port_db.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 15bb93ad9ec..a81db3cfbfb 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -324,6 +324,7 @@ def __init__(self, config: HomeServerConfig): self.config = config self.hostname = config.server.server_name self.version_string = SYNAPSE_VERSION + self.instance_id = random_string(5) def get_clock(self) -> Clock: return self.clock @@ -331,8 +332,8 @@ def get_clock(self) -> Clock: def get_reactor(self) -> ISynapseReactor: return reactor - def get_instance_id(self) -> Optional[str]: - return random_string(5) + def get_instance_id(self) -> str: + return self.instance_id def get_instance_name(self) -> str: return "master" From 309a72d4d3f4d20ed81ebebbd51cfe6fde106228 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 13:39:44 -0500 Subject: [PATCH 15/21] Note when `cleanup` should be called See https://github.com/element-hq/synapse/pull/18791#discussion_r2303749582 --- synapse/server.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/server.py b/synapse/server.py index dd1cac405c4..23d6c5409f0 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -376,6 +376,10 @@ def cleanup(self) -> None: """ WIP: Clean-up any references to the homeserver and stop any running related processes, timers, loops, replication stream, etc. + + This should be called wherever you care about the HomeServer being completely + garbage collected like in tests. 
It's not necessary to call if you plan to just + shut down the whole Python process anyway. """ logger.info("Received cleanup request for %s.", self.hostname) From 371aa5154e063fb62f7a317ccd4bec00b7b3baf2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 13:58:49 -0500 Subject: [PATCH 16/21] `cleanup` when homeserver is garbage collected See https://github.com/element-hq/synapse/pull/18791#discussion_r2303750494 --- synapse/server.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/synapse/server.py b/synapse/server.py index 23d6c5409f0..3eac271c907 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -372,6 +372,14 @@ def setup(self) -> None: if self.config.worker.run_background_tasks: self.setup_background_tasks() + def __del__(self) -> None: + """ + Called when the homeserver is garbage collected. + + Make sure we actually do some clean-up, rather than leak data. + """ + self.cleanup() + def cleanup(self) -> None: """ WIP: Clean-up any references to the homeserver and stop any running related @@ -380,6 +388,8 @@ def cleanup(self) -> None: This should be called wherever you care about the HomeServer being completely garbage collected like in tests. It's not necessary to call if you plan to just shut down the whole Python process anyway. + + Can be called multiple times. """ logger.info("Received cleanup request for %s.", self.hostname) From 08755ae4557dd9b587271314bd7d6cd9d69bde57 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 14:05:27 -0500 Subject: [PATCH 17/21] Only yield the metric once when we `collect` Previously, we were yielding `g` multiple times while repeatedly adding extra metrics to the same `g` instance.
See https://github.com/element-hq/synapse/pull/18791#discussion_r2303754088 --- synapse/metrics/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 3c7823da37c..4cb67ba9aa9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -197,7 +197,7 @@ def collect(self) -> Iterable[Metric]: for k, v in hook_result.items(): g.add_metric(k, v) - yield g + yield g def register_hook( self, From 31ad15acd8c6099be35e0febf2925a0c1786f7c4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 14:10:52 -0500 Subject: [PATCH 18/21] Continue to return metrics that aren't broken See https://github.com/element-hq/synapse/pull/18791#discussion_r2303757012 --- synapse/metrics/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 4cb67ba9aa9..81a67e0b4e8 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -189,7 +189,8 @@ def collect(self) -> Iterable[Metric]: "Exception running callback for LaterGauge(%s)", self.name ) yield g - return + # Continue to return metrics that aren't broken + continue if isinstance(hook_result, (int, float)): g.add_metric([], hook_result) From b4f06b23fe4d968221da08cb29cce099e2f0cd22 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 27 Aug 2025 14:35:51 -0500 Subject: [PATCH 19/21] Add tests to ensure we get all metrics even if one hook throws exception --- synapse/metrics/__init__.py | 8 ++-- tests/metrics/test_metrics.py | 76 +++++++++++++++++++++++++++++------ 2 files changed, 68 insertions(+), 16 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 81a67e0b4e8..5525f39a26b 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -181,15 +181,17 @@ def collect(self) -> Iterable[Metric]: # (we don't enforce it here, one level up). 
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label] - for hook in self._instance_id_to_hook_map.values(): + for homeserver_instance_id, hook in self._instance_id_to_hook_map.items(): try: hook_result = hook() except Exception: logger.exception( - "Exception running callback for LaterGauge(%s)", self.name + "Exception running callback for LaterGauge(%s) for homeserver_instance_id=%s", + self.name, + homeserver_instance_id, ) yield g - # Continue to return metrics that aren't broken + # Continue to return the rest of the metrics that aren't broken continue if isinstance(hook_result, (int, float)): diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py index ecf9d58379e..832e9917305 100644 --- a/tests/metrics/test_metrics.py +++ b/tests/metrics/test_metrics.py @@ -18,7 +18,7 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import Dict, Protocol, Tuple +from typing import Dict, NoReturn, Protocol, Tuple from prometheus_client.core import Sample @@ -27,6 +27,7 @@ SERVER_NAME_LABEL, InFlightGauge, LaterGauge, + all_later_gauges_to_clean_up_on_shutdown, generate_latest, ) from synapse.util.caches.deferred_cache import DeferredCache @@ -292,42 +293,91 @@ def test_cache_metric_multiple_servers(self) -> None: class LaterGaugeTests(unittest.HomeserverTestCase): + def setUp(self) -> None: + super().setUp() + self.later_gauge = LaterGauge( + name="foo", + desc="", + labelnames=[SERVER_NAME_LABEL], + ) + + def tearDown(self) -> None: + super().tearDown() + + REGISTRY.unregister(self.later_gauge) + all_later_gauges_to_clean_up_on_shutdown.pop(self.later_gauge.name, None) + def test_later_gauge_multiple_servers(self) -> None: """ Test that LaterGauge metrics are reported correctly across multiple servers. We will have an metrics entry for each homeserver that is labeled with the `server_name` label. 
""" - later_gauge = LaterGauge( - name="foo", - desc="", - labelnames=[SERVER_NAME_LABEL], - ) - later_gauge.register_hook( + self.later_gauge.register_hook( homeserver_instance_id="123", hook=lambda: {("hs1",): 1} ) - later_gauge.register_hook( + self.later_gauge.register_hook( homeserver_instance_id="456", hook=lambda: {("hs2",): 2} ) metrics_map = get_latest_metrics() - # Find the metrics for the caches from both homeservers + # Find the metrics from both homeservers hs1_metric = 'foo{server_name="hs1"}' hs1_metric_value = metrics_map.get(hs1_metric) self.assertIsNotNone( hs1_metric_value, - f"Missing metric {hs1_metric} in cache metrics {metrics_map}", + f"Missing metric {hs1_metric} in metrics {metrics_map}", ) + self.assertEqual(hs1_metric_value, "1.0") + hs2_metric = 'foo{server_name="hs2"}' hs2_metric_value = metrics_map.get(hs2_metric) self.assertIsNotNone( hs2_metric_value, - f"Missing metric {hs2_metric} in cache metrics {metrics_map}", + f"Missing metric {hs2_metric} in metrics {metrics_map}", ) + self.assertEqual(hs2_metric_value, "2.0") - # Sanity check the metric values - self.assertEqual(hs1_metric_value, "1.0") + def test_later_gauge_hook_exception(self) -> None: + """ + Test that LaterGauge metrics are collected across multiple servers even if one + hooks is throwing an exception. + """ + + def raise_exception() -> NoReturn: + raise Exception("fake error generating data") + + # Make the hook for hs1 throw an exception + self.later_gauge.register_hook( + homeserver_instance_id="123", hook=raise_exception + ) + # Metrics from hs2 still work fine + self.later_gauge.register_hook( + homeserver_instance_id="456", hook=lambda: {("hs2",): 2} + ) + + metrics_map = get_latest_metrics() + + # Since we encountered an exception while trying to collect metrics from hs1, we + # don't expect to see it here. 
+ hs1_metric = 'foo{server_name="hs1"}' + hs1_metric_value = metrics_map.get(hs1_metric) + self.assertIsNone( + hs1_metric_value, + ( + "Since we encountered an exception while trying to collect metrics from hs1" + f"we don't expect to see it the metrics_map {metrics_map}" + ), + ) + + # We should still see metrics from hs2 though + hs2_metric = 'foo{server_name="hs2"}' + hs2_metric_value = metrics_map.get(hs2_metric) + self.assertIsNotNone( + hs2_metric_value, + f"Missing metric {hs2_metric} in cache metrics {metrics_map}", + ) self.assertEqual(hs2_metric_value, "2.0") From 1c1d6c28032c1465eae22205e5389014e2802b92 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Sep 2025 09:59:57 -0500 Subject: [PATCH 20/21] Don't double yield the same gauge See https://github.com/element-hq/synapse/pull/18791#discussion_r2314059513 --- synapse/metrics/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 5525f39a26b..5b291aa893b 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -190,7 +190,6 @@ def collect(self) -> Iterable[Metric]: self.name, homeserver_instance_id, ) - yield g # Continue to return the rest of the metrics that aren't broken continue From 595c174854c88a5fcfa2330150317032cfed55eb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 2 Sep 2025 11:27:46 -0500 Subject: [PATCH 21/21] Fix `generate_workers_map` script erroring out I think this was accidentally removed before. 
Testing strategy: ``` poetry run generate_workers_map --config-path homeserver.yaml ``` --- synapse/_scripts/generate_workers_map.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/_scripts/generate_workers_map.py b/synapse/_scripts/generate_workers_map.py index dd8ce8dec66..8878e364e2e 100755 --- a/synapse/_scripts/generate_workers_map.py +++ b/synapse/_scripts/generate_workers_map.py @@ -155,8 +155,8 @@ def get_registered_paths_for_default( hs = MockHomeserver(base_config, worker_app) # TODO We only do this to avoid an error, but don't need the database etc + hs.setup() registered_paths = get_registered_paths_for_hs(hs) - hs.cleanup() return registered_paths