From 4a1b127fb7502a8b2526415204cdd2120d2cc3ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Nov 2025 14:47:41 +0000 Subject: [PATCH 01/16] Track when connection was last used --- synapse/handlers/sliding_sync/store.py | 2 +- .../storage/databases/main/sliding_sync.py | 30 +++++++++++++++++-- .../main/delta/93/03_sss_pos_last_used.sql | 16 ++++++++++ synapse/types/handlers/sliding_sync.py | 8 +++++ 4 files changed, 53 insertions(+), 3 deletions(-) create mode 100644 synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py index 7bcd5f27eae..d01fab271f6 100644 --- a/synapse/handlers/sliding_sync/store.py +++ b/synapse/handlers/sliding_sync/store.py @@ -75,7 +75,7 @@ async def get_and_clear_connection_positions( """ # If this is our first request, there is no previous connection state to fetch out of the database if from_token is None or from_token.connection_position == 0: - return PerConnectionState() + return PerConnectionState(last_used_ts=None) conn_id = sync_config.conn_id or "" diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 2b67e75ac47..d62f42b7780 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -45,6 +45,14 @@ logger = logging.getLogger(__name__) +# How often to update the `last_used_ts` column on +# `sliding_sync_connection_positions` when the client uses a connection +# position. We don't want to update it on every use to avoid excessive +# writes, but we want it to be reasonably up-to-date to help with +# cleaning up old connection positions. 
+UPDATE_INTERVAL_LAST_USED_TS_MS = 5 * 60_000 + + class SlidingSyncStore(SQLBaseStore): def __init__( self, @@ -384,7 +392,7 @@ def _get_and_clear_connection_positions_txn( # The `previous_connection_position` is a user-supplied value, so we # need to make sure that the one they supplied is actually theirs. sql = """ - SELECT connection_key + SELECT connection_key, last_used_ts FROM sliding_sync_connection_positions INNER JOIN sliding_sync_connections USING (connection_key) WHERE @@ -396,7 +404,20 @@ def _get_and_clear_connection_positions_txn( if row is None: raise SlidingSyncUnknownPosition() - (connection_key,) = row + (connection_key, last_used_ts) = row + + # Update the `last_used_ts` if it's due to be updated. We don't update + # every time to avoid excessive writes. + now = self.clock.time_msec() + if last_used_ts is None or now - last_used_ts > UPDATE_INTERVAL_LAST_USED_TS_MS: + self.db_pool.simple_update_txn( + txn, + table="sliding_sync_connections", + keyvalues={ + "connection_key": connection_key, + }, + updatevalues={"last_used_ts": now}, + ) # Now that we have seen the client has received and used the connection # position, we can delete all the other connection positions. @@ -480,6 +501,7 @@ def _get_and_clear_connection_positions_txn( logger.warning("Unrecognized sliding sync stream in DB %r", stream) return PerConnectionStateDB( + last_used_ts=last_used_ts, rooms=RoomStatusMap(rooms), receipts=RoomStatusMap(receipts), account_data=RoomStatusMap(account_data), @@ -498,6 +520,8 @@ class PerConnectionStateDB: When persisting this *only* contains updates to the state. 
""" + last_used_ts: int | None + rooms: "RoomStatusMap[str]" receipts: "RoomStatusMap[str]" account_data: "RoomStatusMap[str]" @@ -553,6 +577,7 @@ async def from_state( ) return PerConnectionStateDB( + last_used_ts=per_connection_state.last_used_ts, rooms=RoomStatusMap(rooms), receipts=RoomStatusMap(receipts), account_data=RoomStatusMap(account_data), @@ -596,6 +621,7 @@ async def to_state(self, store: "DataStore") -> "PerConnectionState": } return PerConnectionState( + last_used_ts=self.last_used_ts, rooms=RoomStatusMap(rooms), receipts=RoomStatusMap(receipts), account_data=RoomStatusMap(account_data), diff --git a/synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql b/synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql new file mode 100644 index 00000000000..f972331fc37 --- /dev/null +++ b/synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql @@ -0,0 +1,16 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 Element Creations, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +-- Add a timestamp for when the sliding sync connection position was last used, +-- only updated with a small granularity. +ALTER TABLE sliding_sync_connections ADD COLUMN last_used_ts BIGINT; diff --git a/synapse/types/handlers/sliding_sync.py b/synapse/types/handlers/sliding_sync.py index 494e3570d05..03b3bcb3caf 100644 --- a/synapse/types/handlers/sliding_sync.py +++ b/synapse/types/handlers/sliding_sync.py @@ -850,12 +850,16 @@ class PerConnectionState: since the last time you made a sync request. Attributes: + last_used_ts: The time this connection was last used, in milliseconds. 
+            This is only accurate to `UPDATE_INTERVAL_LAST_USED_TS_MS`. rooms: The status of each room for the events stream. receipts: The status of each room for the receipts stream. room_configs: Map from room_id to the `RoomSyncConfig` of all rooms that we have previously sent down. """ + + last_used_ts: int | None = None + rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap) receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap) account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap) @@ -867,6 +871,7 @@ def get_mutable(self) -> "MutablePerConnectionState": room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs) return MutablePerConnectionState( + last_used_ts=self.last_used_ts, rooms=self.rooms.get_mutable(), receipts=self.receipts.get_mutable(), account_data=self.account_data.get_mutable(), @@ -875,6 +880,7 @@ def copy(self) -> "PerConnectionState": def copy(self) -> "PerConnectionState": return PerConnectionState( + last_used_ts=self.last_used_ts, rooms=self.rooms.copy(), receipts=self.receipts.copy(), account_data=self.account_data.copy(), @@ -889,6 +895,8 @@ def __len__(self) -> int: class MutablePerConnectionState(PerConnectionState): """A mutable version of `PerConnectionState`""" + last_used_ts: int | None + rooms: MutableRoomStatusMap[RoomStreamToken] receipts: MutableRoomStatusMap[MultiWriterStreamToken] account_data: MutableRoomStatusMap[int] From 5fde889621f859c8df6a961473b04b563965fce9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Nov 2025 15:10:02 +0000 Subject: [PATCH 02/16] Better checking of rooms to send When checking which rooms might have had updates, instead of just checking the in-memory cache of recent data also check the DB tables. This gives a more accurate view of how much data there is to send down when either a) the position is old or b) the server has recently restarted and so has a smaller cache. 
--- synapse/handlers/sliding_sync/room_lists.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 3d119022367..a6effd6683e 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -857,8 +857,9 @@ async def _filter_relevant_rooms_to_send( # We only need to check for new events since any state changes # will also come down as new events. - rooms_that_have_updates = ( - self.store.get_rooms_that_might_have_updates( + + rooms_that_have_updates = await ( + self.store.get_rooms_that_have_updates_since_sliding_sync_table( relevant_room_map.keys(), from_token.room_key ) ) From e5a8061652d70c16d475125786aee0428f083a31 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Nov 2025 15:12:32 +0000 Subject: [PATCH 03/16] Expire older connections with lots of updates It's generally faster for clients to reset the sliding sync connection than to try and send lots of data down on the existing connection. As a heuristic we arbitrarily choose to reset the connection if a) there are more than 100 rooms with updates and b) the connection hasn't been used for at least an hour. 
--- synapse/handlers/sliding_sync/room_lists.py | 29 +++++++++++++++++++ .../storage/databases/main/sliding_sync.py | 1 + 2 files changed, 30 insertions(+) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index a6effd6683e..71009d687da 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -34,6 +34,7 @@ EventTypes, Membership, ) +from synapse.api.errors import SlidingSyncUnknownPosition from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import StrippedStateEvent from synapse.events.utils import parse_stripped_state_event @@ -77,6 +78,15 @@ logger = logging.getLogger(__name__) +# Minimum time in milliseconds since the last sync before we consider expiring +# the connection due to too many rooms to send. This stops from getting into +# tight loops with clients that request lots of data at once. +MINIMUM_NOT_USED_AGE_EXPIRY_MS = 60 * 60 * 1000 # 1 hour + +# How many rooms with updates we allow before we consider the connection +# expired due to too many rooms to send. +NUM_ROOMS_THRESHOLD = 100 + # Helper definition for the types that we might return. We do this to avoid # copying data between types (which can be expensive for many rooms). RoomsForUserType = RoomsForUserStateReset | RoomsForUser | RoomsForUserSlidingSync @@ -176,6 +186,7 @@ def __init__(self, hs: "HomeServer"): self.storage_controllers = hs.get_storage_controllers() self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync self.is_mine_id = hs.is_mine_id + self._clock = hs.get_clock() async def compute_interested_rooms( self, @@ -863,6 +874,24 @@ async def _filter_relevant_rooms_to_send( relevant_room_map.keys(), from_token.room_key ) ) + + # Check if we have lots of updates to send, if so then its + # better for us to tell the client to do a full resync + # instead. 
+ # + # We only do this if the last sync was over + # `MINIMUM_NOT_USED_AGE_EXPIRY_MS` to ensure we don't get + # into tight loops with clients that keep requesting large + # sliding sync windows. + if len(rooms_that_have_updates) > NUM_ROOMS_THRESHOLD: + last_sync_ts = previous_connection_state.last_used_ts + if ( + last_sync_ts is not None + and (self._clock.time_msec() - last_sync_ts) + > MINIMUM_NOT_USED_AGE_EXPIRY_MS + ): + raise SlidingSyncUnknownPosition() + rooms_should_send.update(rooms_that_have_updates) relevant_rooms_to_send_map = { room_id: room_sync_config diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index d62f42b7780..7d4a017101d 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -210,6 +210,7 @@ def persist_per_connection_state_txn( "effective_device_id": device_id, "conn_id": conn_id, "created_ts": self.clock.time_msec(), + "last_used_ts": self.clock.time_msec(), }, returning=("connection_key",), ) From 41dfbd2ef362b9ddbe46b00e557e30f94e13d189 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Nov 2025 15:41:21 +0000 Subject: [PATCH 04/16] Expire connections older than a week --- .../storage/databases/main/sliding_sync.py | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 7d4a017101d..c8e8d4b7b43 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -20,6 +20,7 @@ from synapse.api.errors import SlidingSyncUnknownPosition from synapse.logging.opentracing import log_kv +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( DatabasePool, @@ -53,6 +54,11 @@ UPDATE_INTERVAL_LAST_USED_TS_MS = 5 * 60_000 +# Time in 
milliseconds the connection hasn't been used before we consider it +# expired and delete it. +CONNECTION_EXPIRY_MS = 7 * 24 * 60 * 60 * 1000 # 7 days + + class SlidingSyncStore(SQLBaseStore): def __init__( self, @@ -84,6 +90,12 @@ def __init__( replaces_index="sliding_sync_membership_snapshots_user_id", ) + if self.hs.config.worker.run_background_tasks: + self.clock.looping_call( + self.delete_old_sliding_sync_connections, + 1 * 60 * 60 * 1000, # every hour + ) + async def get_latest_bump_stamp_for_room( self, room_id: str, @@ -170,6 +182,11 @@ def persist_per_connection_state_txn( if previous_connection_position is not None: # The `previous_connection_position` is a user-supplied value, so we # need to make sure that the one they supplied is actually theirs. + # + # We take out a `FOR UPDATE` lock on the row to prevent races with + # the connection deletion. If the connection gets deleted underneath + # then the query will return no rows and we raise + # `SlidingSyncUnknownPosition` exception. sql = """ SELECT connection_key FROM sliding_sync_connection_positions @@ -177,6 +194,7 @@ def persist_per_connection_state_txn( WHERE connection_position = ? AND user_id = ? AND effective_device_id = ? AND conn_id = ? + FOR UPDATE """ txn.execute( sql, (previous_connection_position, user_id, device_id, conn_id) @@ -509,6 +527,23 @@ def _get_and_clear_connection_positions_txn( room_configs=room_configs, ) + @wrap_as_background_process("delete_old_sliding_sync_connections") + async def delete_old_sliding_sync_connections(self) -> None: + """Delete sliding sync connections that have not been used for a long time.""" + cutoff_ts = self.clock.time_msec() - CONNECTION_EXPIRY_MS + + def delete_old_sliding_sync_connections_txn(txn: LoggingTransaction) -> None: + sql = """ + DELETE FROM sliding_sync_connections + WHERE last_used_ts IS NOT NULL AND last_used_ts < ? 
+ """ + txn.execute(sql, (cutoff_ts,)) + + await self.db_pool.runInteraction( + "delete_old_sliding_sync_connections", + delete_old_sliding_sync_connections_txn, + ) + @attr.s(auto_attribs=True, frozen=True) class PerConnectionStateDB: From 321465b3df3a2b409cd1300c5cb4804c385d1685 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Nov 2025 15:31:47 +0000 Subject: [PATCH 05/16] Add tests --- .../sliding_sync/test_connection_tracking.py | 102 ++++++++++++++++++ .../client/sliding_sync/test_sliding_sync.py | 28 +++-- 2 files changed, 124 insertions(+), 6 deletions(-) diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 16d13fcc860..e0537fb5e3c 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -12,6 +12,7 @@ # . # import logging +from unittest.mock import patch from parameterized import parameterized, parameterized_class @@ -19,8 +20,11 @@ import synapse.rest.admin from synapse.api.constants import EventTypes +from synapse.api.errors import Codes +from synapse.handlers.sliding_sync import room_lists from synapse.rest.client import login, room, sync from synapse.server import HomeServer +from synapse.storage.databases.main.sliding_sync import CONNECTION_EXPIRY_MS from synapse.util.clock import Clock from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase @@ -395,3 +399,101 @@ def test_rooms_timeline_incremental_sync_NEVER(self) -> None: ) self.assertEqual(response_body["rooms"][room_id1]["limited"], True) self.assertEqual(response_body["rooms"][room_id1]["initial"], True) + + @patch("synapse.handlers.sliding_sync.room_lists.NUM_ROOMS_THRESHOLD", new=5) + def test_sliding_sync_connection_expires_with_too_much_data(self) -> None: + """ + Test that if we have too much data to send down for incremental sync, + we expire the connection and ask the client to do a full 
resync. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create enough rooms that we can later trigger the too much data case + room_ids = [] + for _ in range(room_lists.NUM_ROOMS_THRESHOLD + 2): + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id, user1_id, tok=user1_tok) + room_ids.append(room_id) + + # Make sure we don't hit ratelimits + self.reactor.advance(60 * 1000) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1000]], + "required_state": [], + "timeline_limit": 1, + } + } + } + + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + self.assertIn(room_id, response_body["rooms"]) + + # Send a lot of events to cause the connection to expire + for room_id in room_ids: + self.helper.send(room_id, "msg", tok=user2_tok) + + # If we don't advance the clock then we won't expire the connection. + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Send some more events. + for room_id in room_ids: + self.helper.send(room_id, "msg", tok=user2_tok) + + # Advance the clock to ensure that the last_used_ts is old enough + self.reactor.advance(2 * room_lists.MINIMUM_NOT_USED_AGE_EXPIRY_MS / 1000) + + # This sync should now raise SlidingSyncUnknownPosition + channel = self.make_sync_request(sync_body, since=from_token, tok=user1_tok) + self.assertEqual(channel.code, 400) + self.assertEqual(channel.json_body["errcode"], Codes.UNKNOWN_POS) + + def test_sliding_sync_connection_expires_after_time(self) -> None: + """ + Test that if we don't use a sliding sync connection for a long time, + we expire the connection and ask the client to do a full resync. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1000]], + "required_state": [], + "timeline_limit": 1, + } + } + } + + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # We can keep syncing so long as the interval between requests is less + # than CONNECTION_EXPIRY_MS + for _ in range(5): + self.reactor.advance(0.5 * CONNECTION_EXPIRY_MS / 1000) + + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # ... but if we wait too long, the connection expires + self.reactor.advance(CONNECTION_EXPIRY_MS / 1000) + + # This sync should now raise SlidingSyncUnknownPosition + channel = self.make_sync_request(sync_body, since=from_token, tok=user1_tok) + self.assertEqual(channel.code, 400) + self.assertEqual(channel.json_body["errcode"], Codes.UNKNOWN_POS) diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index c27a712088d..bcd22d15ca6 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -46,7 +46,7 @@ from synapse.util.stringutils import random_string from tests import unittest -from tests.server import TimedOutException +from tests.server import FakeChannel, TimedOutException from tests.test_utils.event_injection import create_event logger = logging.getLogger(__name__) @@ -80,12 +80,10 @@ def default_config(self) -> JsonDict: config["experimental_features"] = {"msc3575_enabled": True} return config - def do_sync( + def make_sync_request( self, sync_body: JsonDict, *, since: str | None = None, tok: str - ) -> tuple[JsonDict, str]: - """Do a sliding sync request with 
given body. - - Asserts the request was successful. + ) -> FakeChannel: + """Make a sliding sync request with given body. Attributes: sync_body: The full request body to use @@ -106,6 +104,24 @@ def do_sync( content=sync_body, access_token=tok, ) + return channel + + def do_sync( + self, sync_body: JsonDict, *, since: str | None = None, tok: str + ) -> tuple[JsonDict, str]: + """Do a sliding sync request with given body. + + Asserts the request was successful. + + Attributes: + sync_body: The full request body to use + since: Optional since token + tok: Access token to use + + Returns: + A tuple of the response body and the `pos` field. + """ + channel = self.make_sync_request(sync_body, since=since, tok=tok) self.assertEqual(channel.code, 200, channel.json_body) return channel.json_body, channel.json_body["pos"] From a3d6299994076e4c46f9941334261fb30eb6bd86 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Nov 2025 15:48:54 +0000 Subject: [PATCH 06/16] Newsfile --- changelog.d/19211.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19211.misc diff --git a/changelog.d/19211.misc b/changelog.d/19211.misc new file mode 100644 index 00000000000..d8a4a44662e --- /dev/null +++ b/changelog.d/19211.misc @@ -0,0 +1 @@ +Expire sliding sync connections that are too old or have too much pending data. From 6b0de5f43baf663418c0192ceceb7a67bb6e0351 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Nov 2025 10:59:06 +0000 Subject: [PATCH 07/16] Remove unneeded `FOR UPDATE` lock. This doesn't work on SQLite, causing the tests to fail. I don't think we actually need this now I think about it more. Any DELETE on the rows will already conflict with the SELECT, so we don't need to acquire stronger row locks. 
--- synapse/storage/databases/main/sliding_sync.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index c8e8d4b7b43..488375d6d5d 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -182,11 +182,6 @@ def persist_per_connection_state_txn( if previous_connection_position is not None: # The `previous_connection_position` is a user-supplied value, so we # need to make sure that the one they supplied is actually theirs. - # - # We take out a `FOR UPDATE` lock on the row to prevent races with - # the connection deletion. If the connection gets deleted underneath - # then the query will return no rows and we raise - # `SlidingSyncUnknownPosition` exception. sql = """ SELECT connection_key FROM sliding_sync_connection_positions @@ -194,7 +189,6 @@ def persist_per_connection_state_txn( WHERE connection_position = ? AND user_id = ? AND effective_device_id = ? AND conn_id = ? 
- FOR UPDATE """ txn.execute( sql, (previous_connection_position, user_id, device_id, conn_id) From 482c596146f2a9d6ab9e67c52c027bc1c7ff428b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Nov 2025 11:01:40 +0000 Subject: [PATCH 08/16] Tests: check all room IDs are received --- tests/rest/client/sliding_sync/test_connection_tracking.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index e0537fb5e3c..09b40211b76 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -435,7 +435,9 @@ def test_sliding_sync_connection_expires_with_too_much_data(self) -> None: response_body, from_token = self.do_sync(sync_body, tok=user1_tok) - self.assertIn(room_id, response_body["rooms"]) + # Check we got all the rooms down + for room_id in room_ids: + self.assertIn(room_id, response_body["rooms"]) # Send a lot of events to cause the connection to expire for room_id in room_ids: From 2308b04335358dedfb8ac0171ef5fa97bc2ed6df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Nov 2025 11:12:11 +0000 Subject: [PATCH 09/16] Use duration constants --- synapse/handlers/sliding_sync/room_lists.py | 3 ++- synapse/storage/databases/main/sliding_sync.py | 16 ++++++++++++---- synapse/util/constants.py | 1 + 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 71009d687da..faf2abdd148 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -69,6 +69,7 @@ ) from synapse.types.state import StateFilter from synapse.util import MutableOverlayMapping +from synapse.util.constants import MILLISECONDS_PER_SECOND, ONE_HOUR_SECONDS from synapse.util.sentinel import Sentinel if TYPE_CHECKING: @@ -81,7 +82,7 
@@ # Minimum time in milliseconds since the last sync before we consider expiring # the connection due to too many rooms to send. This stops from getting into # tight loops with clients that request lots of data at once. -MINIMUM_NOT_USED_AGE_EXPIRY_MS = 60 * 60 * 1000 # 1 hour +MINIMUM_NOT_USED_AGE_EXPIRY_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND # How many rooms with updates we allow before we consider the connection # expired due to too many rooms to send. diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 488375d6d5d..8cd3de8f40c 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -37,6 +37,12 @@ RoomSyncConfig, ) from synapse.util.caches.descriptors import cached +from synapse.util.constants import ( + MILLISECONDS_PER_SECOND, + ONE_DAY_SECONDS, + ONE_HOUR_SECONDS, + ONE_MINUTE_SECONDS, +) from synapse.util.json import json_encoder if TYPE_CHECKING: @@ -51,12 +57,14 @@ # position. We don't want to update it on every use to avoid excessive # writes, but we want it to be reasonably up-to-date to help with # cleaning up old connection positions. -UPDATE_INTERVAL_LAST_USED_TS_MS = 5 * 60_000 - +UPDATE_INTERVAL_LAST_USED_TS_MS = 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND # Time in milliseconds the connection hasn't been used before we consider it # expired and delete it. -CONNECTION_EXPIRY_MS = 7 * 24 * 60 * 60 * 1000 # 7 days +CONNECTION_EXPIRY_MS = 7 * ONE_DAY_SECONDS * MILLISECONDS_PER_SECOND + +# How often we run the background process to delete old sliding sync connections. 
+CONNECTION_EXPIRY_FREQUENCY_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND class SlidingSyncStore(SQLBaseStore): @@ -93,7 +101,7 @@ def __init__( if self.hs.config.worker.run_background_tasks: self.clock.looping_call( self.delete_old_sliding_sync_connections, - 1 * 60 * 60 * 1000, # every hour + CONNECTION_EXPIRY_FREQUENCY_MS, ) async def get_latest_bump_stamp_for_room( diff --git a/synapse/util/constants.py b/synapse/util/constants.py index 7a3d073df55..f4491b58856 100644 --- a/synapse/util/constants.py +++ b/synapse/util/constants.py @@ -18,5 +18,6 @@ # readability and catching bugs. ONE_MINUTE_SECONDS = 60 ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS +ONE_DAY_SECONDS = 24 * ONE_HOUR_SECONDS MILLISECONDS_PER_SECOND = 1000 From 5951328ad175328b1df8ba686dc61cf1c3d173e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Nov 2025 12:29:33 +0000 Subject: [PATCH 10/16] Correctly handle sliding sync tables not being populated --- synapse/handlers/sliding_sync/room_lists.py | 9 +++++++-- synapse/storage/databases/main/stream.py | 14 +++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index faf2abdd148..b21516805fb 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -82,10 +82,15 @@ # Minimum time in milliseconds since the last sync before we consider expiring # the connection due to too many rooms to send. This stops from getting into # tight loops with clients that request lots of data at once. +# +# c.f. `NUM_ROOMS_THRESHOLD`. These values are somewhat arbitrary picked. MINIMUM_NOT_USED_AGE_EXPIRY_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND -# How many rooms with updates we allow before we consider the connection -# expired due to too many rooms to send. +# How many rooms with updates we allow before we consider the connection expired +# due to too many rooms to send. +# +# c.f. 
`MINIMUM_NOT_USED_AGE_EXPIRY_MS`. These values are somewhat arbitrary +# picked. NUM_ROOMS_THRESHOLD = 100 # Helper definition for the types that we might return. We do this to avoid diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8644ff412ec..8fa1e2e5a97 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -740,7 +740,14 @@ async def get_rooms_that_have_updates_since_sliding_sync_table( from_key: RoomStreamToken, ) -> StrCollection: """Return the rooms that probably have had updates since the given - token (changes that are > `from_key`).""" + token (changes that are > `from_key`). + + May return false positives, but must not return false negatives. + + If `have_finished_sliding_sync_background_jobs` is False, then we return + all the room IDs, as we can't be sure that the sliding sync table is + fully populated. + """ # If the stream change cache is valid for the stream token, we can just # use the result of that. if from_key.stream >= self._events_stream_cache.get_earliest_known_position(): @@ -748,6 +755,11 @@ async def get_rooms_that_have_updates_since_sliding_sync_table( room_ids, from_key.stream ) + if not self.have_finished_sliding_sync_background_jobs(): + # If the table hasn't been populated yet, we have to assume all rooms + # have updates. 
+ return room_ids + def get_rooms_that_have_updates_since_sliding_sync_table_txn( txn: LoggingTransaction, ) -> StrCollection: From 375ab970e066cbc4dadad08cc8a2caada6b866b7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Nov 2025 12:33:08 +0000 Subject: [PATCH 11/16] Expand comment about why we're doing this --- synapse/handlers/sliding_sync/room_lists.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index b21516805fb..1112f0dff47 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -883,7 +883,18 @@ async def _filter_relevant_rooms_to_send( # Check if we have lots of updates to send, if so then its # better for us to tell the client to do a full resync - # instead. + # instead (to try and avoid long SSS response times when + # there is new data). + # + # Due to the construction of the SSS API the client is in + # charge of setting the range of rooms to request updates + # for. Generally, it will start with a small range and then + # expand (and occasionally it may contract the range again + # if its been offline for a while). If we know there are a + # lot of updates, it's better to reset the connection and + # wait for the client to start again (with a much smaller + # range) than to try and send down a large number of updates + # (which can take a long time). 
# # We only do this if the last sync was over # `MINIMUM_NOT_USED_AGE_EXPIRY_MS` to ensure we don't get From 76a7af456cef9999d33490dbb6e503f00e815c9c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Nov 2025 12:36:46 +0000 Subject: [PATCH 12/16] Add assertion that constants make sense --- synapse/handlers/sliding_sync/room_lists.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 1112f0dff47..50976aa54cf 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -39,6 +39,7 @@ from synapse.events import StrippedStateEvent from synapse.events.utils import parse_stripped_state_event from synapse.logging.opentracing import start_active_span, trace +from synapse.storage.databases.main.sliding_sync import UPDATE_INTERVAL_LAST_USED_TS_MS from synapse.storage.databases.main.state import ( ROOM_UNKNOWN_SENTINEL, Sentinel as StateSentinel, @@ -93,6 +94,13 @@ # picked. NUM_ROOMS_THRESHOLD = 100 +# Sanity check that our minimum age is sensible compared to the update interval, +# i.e. if `MINIMUM_NOT_USED_AGE_EXPIRY_MS` is too small then we might expire the +# connection even if it is actively being used (and we're just not updating the +# DB frequently enough). We arbitrarily double the update interval to give some +# wiggle room. +assert 2 * UPDATE_INTERVAL_LAST_USED_TS_MS < MINIMUM_NOT_USED_AGE_EXPIRY_MS + # Helper definition for the types that we might return. We do this to avoid # copying data between types (which can be expensive for many rooms). 
RoomsForUserType = RoomsForUserStateReset | RoomsForUser | RoomsForUserSlidingSync

From ff4fde69455eda2a64a1ba41e12ce4f4ea509696 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 24 Nov 2025 12:40:52 +0000
Subject: [PATCH 13/16] Expand comments on new column

---
 .../schema/main/delta/93/03_sss_pos_last_used.sql     | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql b/synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql
index f972331fc37..747ba7a144b 100644
--- a/synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql
+++ b/synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql
@@ -13,4 +13,15 @@
 -- Add a timestamp for when the sliding sync connection position was last used,
 -- only updated with a small granularity.
+--
+-- This should be NOT NULL, but we need to consider existing rows. In future we
+-- may want to either backfill this or delete all rows with a NULL value (and
+-- then make it NOT NULL).
 ALTER TABLE sliding_sync_connections ADD COLUMN last_used_ts BIGINT;
+
+-- Note: We don't add an index on this column to allow HOT updates on PostgreSQL
+-- to reduce the cost of the updates to the column. cf.
+-- https://www.postgresql.org/docs/current/storage-hot.html
+--
+-- We do query this column directly to find expired connections, but we expect
+-- that to be an infrequent operation and a sequential scan should be fine.
From 1e74b34f6237adf060da77c8b41aabfd11999b39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Nov 2025 12:57:55 +0000 Subject: [PATCH 14/16] Fix flakey test --- tests/rest/client/sliding_sync/test_connection_tracking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 09b40211b76..17bf2ade23f 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -493,7 +493,7 @@ def test_sliding_sync_connection_expires_after_time(self) -> None: _, from_token = self.do_sync(sync_body, tok=user1_tok) # ... but if we wait too long, the connection expires - self.reactor.advance(CONNECTION_EXPIRY_MS / 1000) + self.reactor.advance(1 + CONNECTION_EXPIRY_MS / 1000) # This sync should now raise SlidingSyncUnknownPosition channel = self.make_sync_request(sync_body, since=from_token, tok=user1_tok) From aaf541ca6bb8d267e8c25e9a756a2de783e49f74 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Nov 2025 09:49:31 +0000 Subject: [PATCH 15/16] Update synapse/handlers/sliding_sync/room_lists.py Co-authored-by: Eric Eastwood --- synapse/handlers/sliding_sync/room_lists.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 50976aa54cf..fa4ff22b645 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -894,7 +894,7 @@ async def _filter_relevant_rooms_to_send( # instead (to try and avoid long SSS response times when # there is new data). # - # Due to the construction of the SSS API the client is in + # Due to the construction of the SSS API, the client is in # charge of setting the range of rooms to request updates # for. 
Generally, it will start with a small range and then # expand (and occasionally it may contract the range again From b139c38b3c60d7110aaac736cdba5e237b18fffb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Nov 2025 09:50:32 +0000 Subject: [PATCH 16/16] Expand test comment --- tests/rest/client/sliding_sync/test_connection_tracking.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 17bf2ade23f..cdf63317e39 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -405,6 +405,10 @@ def test_sliding_sync_connection_expires_with_too_much_data(self) -> None: """ Test that if we have too much data to send down for incremental sync, we expire the connection and ask the client to do a full resync. + + Connections are only expired if they have not been used for a minimum + amount of time (MINIMUM_NOT_USED_AGE_EXPIRY_MS) to avoid expiring + connections that are actively being used. """ user1_id = self.register_user("user1", "pass")