From c10b78469f84aa67828b6c6160ed4724405db615 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Dec 2025 14:22:00 +0000 Subject: [PATCH 1/7] Prune sliding_sync_connection_required_state --- .../storage/databases/main/sliding_sync.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index c66002dae47..3c32550e971 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -450,6 +450,9 @@ def _get_and_clear_connection_positions_txn( # Now that we have seen the client has received and used the connection # position, we can delete all the other connection positions. + # + # Note: the rest of the code here assumes this is the only remaining + # connection position. sql = """ DELETE FROM sliding_sync_connection_positions WHERE connection_key = ? AND connection_position != ? @@ -515,6 +518,41 @@ def _get_and_clear_connection_positions_txn( required_state_map=required_state_map[required_state_id], ) + # Clean up any required state IDs that are no longer used by any + # connection position on this connection. + # + # We store the required state config per-connection per-room. Since this + # can be a lot of data, we deduplicate the required state JSON and store + # it separately, with multiple rooms referencing the same required state + # ID. Over time as the required state configs change, some required + # state IDs may no longer be referenced by any room config, so we need + # to clean them up. + # + # We do this by noting that we have pulled out *all* rows from + # `sliding_sync_connection_required_state` for this connection above. We + # have also pulled out all referenced required state IDs for *this* + # connection position, which is the only connection position that + # remains (we deleted the others above). 
+ # + # Thus we can compute the unused required state IDs by looking for any + # required state IDs that are not referenced by the remaining connection + # position. + used_required_state_ids = { + required_state_id for _, _, required_state_id in room_config_rows + } + + unused_required_state_ids = required_state_map.keys() - used_required_state_ids + if unused_required_state_ids: + self.db_pool.simple_delete_many_batch_txn( + txn, + table="sliding_sync_connection_required_state", + keys=("connection_key", "required_state_id"), + values=[ + (connection_key, required_state_id) + for required_state_id in unused_required_state_ids + ], + ) + # Now look up the per-room stream data. rooms: dict[str, HaveSentRoom[str]] = {} receipts: dict[str, HaveSentRoom[str]] = {} From 80b73196c3cc2b49c5967674a10a597edd657892 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Dec 2025 10:41:57 +0000 Subject: [PATCH 2/7] Newsfile --- changelog.d/19306.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19306.misc diff --git a/changelog.d/19306.misc b/changelog.d/19306.misc new file mode 100644 index 00000000000..463f87eac3a --- /dev/null +++ b/changelog.d/19306.misc @@ -0,0 +1 @@ +Prune stale entries from `sliding_sync_connection_required_state` table. 
From 9887b8e75460291af2fecccd4d2e1e3d08b3868c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Jan 2026 11:07:37 +0000 Subject: [PATCH 3/7] s/required_state_map/stored_required_state_id_maps/ --- synapse/storage/databases/main/sliding_sync.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 3c32550e971..4bd4ed23272 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -488,9 +488,10 @@ def _get_and_clear_connection_positions_txn( ), ) - required_state_map: dict[int, dict[str, set[str]]] = {} + # Map from required_state_id -> event type -> set of state keys. + stored_required_state_id_maps: dict[int, dict[str, set[str]]] = {} for row in rows: - state = required_state_map[row[0]] = {} + state = stored_required_state_id_maps[row[0]] = {} for event_type, state_key in db_to_json(row[1]): state.setdefault(event_type, set()).add(state_key) @@ -515,7 +516,7 @@ def _get_and_clear_connection_positions_txn( ) in room_config_rows: room_configs[room_id] = RoomSyncConfig( timeline_limit=timeline_limit, - required_state_map=required_state_map[required_state_id], + required_state_map=stored_required_state_id_maps[required_state_id], ) # Clean up any required state IDs that are no longer used by any @@ -541,7 +542,9 @@ def _get_and_clear_connection_positions_txn( required_state_id for _, _, required_state_id in room_config_rows } - unused_required_state_ids = required_state_map.keys() - used_required_state_ids + unused_required_state_ids = ( + stored_required_state_id_maps.keys() - used_required_state_ids + ) if unused_required_state_ids: self.db_pool.simple_delete_many_batch_txn( txn, From 752c4a1ed1484080c8efc893a80ec6fa37ac982e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 21 Jan 2026 11:32:09 +0000 Subject: [PATCH 4/7] Add test --- 
tests/storage/test_sliding_sync_tables.py | 105 ++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index cb9be29c5d7..0b48ceaedec 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -3120,6 +3120,111 @@ def test_lazy_loading_room_members_last_seen_ts(self) -> None: # The timestamp for user1 should be updated. self.assertGreater(lazy_member_entries[user1_id], prev_timestamp) + def test_pruning_sliding_sync_connection_required_state(self) -> None: + """Test that we prune old entries from + `sliding_sync_connection_required_state`. + """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + self.helper.send_state( + room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok + ) + + # Do an initial sync, this will pull down the above room and thus cause + # us to store a single required state entry for the room. 
+ sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Check that we have an entry in sliding_sync_connection_required_state + connection_pos1 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + + connection_key = self.get_success( + self.store.db_pool.simple_select_one_onecol( + table="sliding_sync_connection_positions", + keyvalues={"connection_position": connection_pos1}, + retcol="connection_key", + ) + ) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect a single entry here for the one room ID. + self.assertEqual(len(required_state_entries), 1) + first_required_state_id = required_state_entries[0][0] + + # Update the sync body to request more required state, so that we get + # another entry in the table. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Name, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + + # We need to send a message to cause the room to come down the next + # sync. This shouldn't be necessary, but we don't currently implement + # immediately sending down the room when required_state is updated. 
+ self.helper.send(room_id, "msg1", tok=user1_tok) + + _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect two entries here, one for old state and one for new state. + # The old entry doesn't get pruned yet as the previous from_token could + # still be used. + self.assertEqual(len(required_state_entries), 2) + + # Sync again with the latest token. This time we expect the old + # entry to be pruned. + self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + self.assertEqual(len(required_state_entries), 1) + + # Double check that we have pruned the old entry. + self.assertNotEqual(required_state_entries[0][0], first_required_state_id) + class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): """ From c16ed7ff56096ef4105d33d58fb1241a98fb9bfb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jan 2026 11:26:45 +0000 Subject: [PATCH 5/7] Update tests/storage/test_sliding_sync_tables.py Co-authored-by: Eric Eastwood --- tests/storage/test_sliding_sync_tables.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index 0b48ceaedec..aef0649775e 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -3190,7 +3190,8 @@ def test_pruning_sliding_sync_connection_required_state(self) -> None: # We need to send a message to cause the room to come down the next # sync. 
This shouldn't be necessary, but we don't currently implement - # immediately sending down the room when required_state is updated. + # immediately sending down the room when required_state is updated, + # see https://github.com/element-hq/synapse/issues/18844 self.helper.send(room_id, "msg1", tok=user1_tok) _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok) From 67aadb8117c4f3ce9a572135ab07467dd9717a49 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jan 2026 11:28:07 +0000 Subject: [PATCH 6/7] Update synapse/storage/databases/main/sliding_sync.py Co-authored-by: Eric Eastwood --- synapse/storage/databases/main/sliding_sync.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/storage/databases/main/sliding_sync.py b/synapse/storage/databases/main/sliding_sync.py index 4bd4ed23272..9a09c0f9b54 100644 --- a/synapse/storage/databases/main/sliding_sync.py +++ b/synapse/storage/databases/main/sliding_sync.py @@ -519,24 +519,24 @@ def _get_and_clear_connection_positions_txn( required_state_map=stored_required_state_id_maps[required_state_id], ) - # Clean up any required state IDs that are no longer used by any + # Clean up any `required_state_id`s that are no longer used by any # connection position on this connection. # # We store the required state config per-connection per-room. Since this # can be a lot of data, we deduplicate the required state JSON and store - # it separately, with multiple rooms referencing the same required state - # ID. Over time as the required state configs change, some required - # state IDs may no longer be referenced by any room config, so we need + # it separately, with multiple rooms referencing the same `required_state_id`. + # Over time as the required state configs change, some `required_state_id`s + # may no longer be referenced by any room config, so we need # to clean them up. 
# # We do this by noting that we have pulled out *all* rows from # `sliding_sync_connection_required_state` for this connection above. We - # have also pulled out all referenced required state IDs for *this* + # have also pulled out all referenced `required_state_id`s for *this* # connection position, which is the only connection position that # remains (we deleted the others above). # - # Thus we can compute the unused required state IDs by looking for any - # required state IDs that are not referenced by the remaining connection + # Thus we can compute the unused `required_state_id`s by looking for any + # `required_state_id`s that are not referenced by the remaining connection # position. used_required_state_ids = { required_state_id for _, _, required_state_id in room_config_rows From edb467fa4aeaff0f5fa11109d0d80bc76bf29b85 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jan 2026 11:41:40 +0000 Subject: [PATCH 7/7] Add test for forked positions --- tests/storage/test_sliding_sync_tables.py | 136 ++++++++++++++++++++++ 1 file changed, 136 insertions(+) diff --git a/tests/storage/test_sliding_sync_tables.py b/tests/storage/test_sliding_sync_tables.py index aef0649775e..f5bbd496631 100644 --- a/tests/storage/test_sliding_sync_tables.py +++ b/tests/storage/test_sliding_sync_tables.py @@ -3226,6 +3226,142 @@ def test_pruning_sliding_sync_connection_required_state(self) -> None: # Double check that we have pruned the old entry. self.assertNotEqual(required_state_entries[0][0], first_required_state_id) + def test_pruning_sliding_sync_connection_required_state_forks(self) -> None: + """Test that we prune entries in + `sliding_sync_connection_required_state` for forked positions. 
+ """ + + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + self.helper.send_state( + room_id, EventTypes.Name, {"name": "A room"}, tok=user1_tok + ) + + # Do an initial sync, this will pull down the above room and thus cause + # us to store a single required state entry for the room. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + # Check that we have an entry in sliding_sync_connection_required_state + connection_pos1 = self.get_success( + SlidingSyncStreamToken.from_string(self.store, from_token) + ).connection_position + + connection_key = self.get_success( + self.store.db_pool.simple_select_one_onecol( + table="sliding_sync_connection_positions", + keyvalues={"connection_position": connection_pos1}, + retcol="connection_key", + ) + ) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect a single entry here for the one room ID. + self.assertEqual(len(required_state_entries), 1) + first_required_state_id = required_state_entries[0][0] + + # Update the sync body to request more required state, so that we get + # another entry in the table. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Name, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + + # We need to send a message to cause the room to come down the next + # sync. 
This shouldn't be necessary, but we don't currently implement + # immediately sending down the room when required_state is updated, + # see https://github.com/element-hq/synapse/issues/18844 + self.helper.send(room_id, "msg1", tok=user1_tok) + + _, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + # We expect two entries here, one for old state and one for new state. + # The old entry doesn't get pruned yet as the previous from_token could + # still be used. + self.assertEqual(len(required_state_entries), 2) + second_required_state_id = sorted(required_state_entries)[1][0] + + # We sync again, but with the old token, creating a fork in the + # connection positions. We change the sync body again so that the + # `required_state` doesn't get deduplicated. + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + [EventTypes.Topic, ""], + [EventTypes.Member, StateValues.LAZY], + ], + "timeline_limit": 1, + } + } + } + _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + # There should now be three entries, one for each of the required_state. + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + self.assertEqual(len(required_state_entries), 3) + + # Sync again with the latest token. This should prune all except the + # latest entry in `sliding_sync_connection_required_state`. 
+ _, from_token = self.do_sync(sync_body, since=from_token, tok=user1_tok) + + required_state_entries = self.get_success( + self.store.db_pool.simple_select_list( + table="sliding_sync_connection_required_state", + keyvalues={"connection_key": connection_key}, + retcols=("required_state_id", "required_state"), + ) + ) + + self.assertEqual(len(required_state_entries), 1) + + # Double check that we have pruned both old entries. + self.assertNotEqual(required_state_entries[0][0], first_required_state_id) + self.assertNotEqual(required_state_entries[0][0], second_required_state_id) + class SlidingSyncTablesBackgroundUpdatesTestCase(SlidingSyncTablesTestCaseBase): """