1 change: 1 addition & 0 deletions changelog.d/19211.misc
@@ -0,0 +1 @@
Expire sliding sync connections that are too old or have too much pending data.
59 changes: 57 additions & 2 deletions synapse/handlers/sliding_sync/room_lists.py
@@ -34,10 +34,12 @@
EventTypes,
Membership,
)
from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event
from synapse.logging.opentracing import start_active_span, trace
from synapse.storage.databases.main.sliding_sync import UPDATE_INTERVAL_LAST_USED_TS_MS
from synapse.storage.databases.main.state import (
ROOM_UNKNOWN_SENTINEL,
Sentinel as StateSentinel,
@@ -68,6 +70,7 @@
)
from synapse.types.state import StateFilter
from synapse.util import MutableOverlayMapping
from synapse.util.constants import MILLISECONDS_PER_SECOND, ONE_HOUR_SECONDS
from synapse.util.sentinel import Sentinel

if TYPE_CHECKING:
@@ -77,6 +80,27 @@
logger = logging.getLogger(__name__)


# Minimum time in milliseconds since the last sync before we consider expiring
# the connection due to too many rooms to send. This stops us from getting into
# tight loops with clients that request lots of data at once.
#
# c.f. `NUM_ROOMS_THRESHOLD`. These values are somewhat arbitrarily picked.
MINIMUM_NOT_USED_AGE_EXPIRY_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND

# How many rooms with updates we allow before we consider the connection expired
# due to too many rooms to send.
#
# c.f. `MINIMUM_NOT_USED_AGE_EXPIRY_MS`. These values are somewhat arbitrarily
# picked.
NUM_ROOMS_THRESHOLD = 100

# Sanity check that our minimum age is sensible compared to the update interval,
# i.e. if `MINIMUM_NOT_USED_AGE_EXPIRY_MS` is too small then we might expire the
# connection even if it is actively being used (and we're just not updating the
# DB frequently enough). We arbitrarily double the update interval to give some
# wiggle room.
assert 2 * UPDATE_INTERVAL_LAST_USED_TS_MS < MINIMUM_NOT_USED_AGE_EXPIRY_MS
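
For orientation, here is a small standalone sketch (not part of the diff) of the arithmetic behind that assertion, with the constant values reproduced from `synapse/util/constants.py`:

```python
# Illustrative only -- values mirror the constants referenced above.
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
MILLISECONDS_PER_SECOND = 1000

# `last_used_ts` is refreshed at most every ~5 minutes (see sliding_sync.py),
# while the expiry check requires the connection to look idle for an hour.
UPDATE_INTERVAL_LAST_USED_TS_MS = 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND  # 300_000
MINIMUM_NOT_USED_AGE_EXPIRY_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND         # 3_600_000

# Even if the stored timestamp is a full update interval stale, an actively used
# connection can never appear older than the expiry threshold.
assert 2 * UPDATE_INTERVAL_LAST_USED_TS_MS < MINIMUM_NOT_USED_AGE_EXPIRY_MS  # 600_000 < 3_600_000
```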

# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = RoomsForUserStateReset | RoomsForUser | RoomsForUserSlidingSync
@@ -176,6 +200,7 @@ def __init__(self, hs: "HomeServer"):
self.storage_controllers = hs.get_storage_controllers()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
self.is_mine_id = hs.is_mine_id
self._clock = hs.get_clock()

async def compute_interested_rooms(
self,
@@ -857,11 +882,41 @@ async def _filter_relevant_rooms_to_send(

# We only need to check for new events since any state changes
# will also come down as new events.
rooms_that_have_updates = (
self.store.get_rooms_that_might_have_updates(

rooms_that_have_updates = await (
self.store.get_rooms_that_have_updates_since_sliding_sync_table(
relevant_room_map.keys(), from_token.room_key
)
)

# Check if we have lots of updates to send; if so, it's
# better for us to tell the client to do a full resync
# instead (to try and avoid long SSS response times when
# there is new data).
#
# Due to the construction of the SSS API, the client is in
# charge of setting the range of rooms to request updates
# for. Generally, it will start with a small range and then
# expand (and occasionally it may contract the range again
# if it's been offline for a while). If we know there are a
# lot of updates, it's better to reset the connection and
# wait for the client to start again (with a much smaller
# range) than to try and send down a large number of updates
# (which can take a long time).
#
# We only do this if the last sync was more than
# `MINIMUM_NOT_USED_AGE_EXPIRY_MS` ago, to ensure we don't get
# into tight loops with clients that keep requesting large
# sliding sync windows.
if len(rooms_that_have_updates) > NUM_ROOMS_THRESHOLD:
last_sync_ts = previous_connection_state.last_used_ts
if (
last_sync_ts is not None
and (self._clock.time_msec() - last_sync_ts)
> MINIMUM_NOT_USED_AGE_EXPIRY_MS
):
raise SlidingSyncUnknownPosition()

rooms_should_send.update(rooms_that_have_updates)
relevant_rooms_to_send_map = {
room_id: room_sync_config
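To make the new branch in `_filter_relevant_rooms_to_send` easier to follow in isolation, here is a hypothetical pure-function restatement of the expiry decision (the function name and signature are made up for illustration; the real code raises `SlidingSyncUnknownPosition` inline as shown above):

```python
def should_expire_connection(
    num_rooms_with_updates: int,
    last_sync_ts_ms: int | None,
    now_ms: int,
    num_rooms_threshold: int = 100,        # NUM_ROOMS_THRESHOLD
    min_idle_ms: int = 60 * 60 * 1000,     # MINIMUM_NOT_USED_AGE_EXPIRY_MS
) -> bool:
    """Expire only when there are many rooms with pending updates AND the
    connection has been idle long enough that we won't end up in a tight loop
    with a client that simply requests a large sliding sync window."""
    if num_rooms_with_updates <= num_rooms_threshold:
        return False
    if last_sync_ts_ms is None:
        # No recorded last use (e.g. rows created before the migration): don't expire.
        return False
    return (now_ms - last_sync_ts_ms) > min_idle_ms
```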
2 changes: 1 addition & 1 deletion synapse/handlers/sliding_sync/store.py
@@ -75,7 +75,7 @@ async def get_and_clear_connection_positions(
"""
# If this is our first request, there is no previous connection state to fetch out of the database
if from_token is None or from_token.connection_position == 0:
return PerConnectionState()
return PerConnectionState(last_used_ts=None)

conn_id = sync_config.conn_id or ""

68 changes: 66 additions & 2 deletions synapse/storage/databases/main/sliding_sync.py
@@ -20,6 +20,7 @@

from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.logging.opentracing import log_kv
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
DatabasePool,
@@ -36,6 +37,12 @@
RoomSyncConfig,
)
from synapse.util.caches.descriptors import cached
from synapse.util.constants import (
MILLISECONDS_PER_SECOND,
ONE_DAY_SECONDS,
ONE_HOUR_SECONDS,
ONE_MINUTE_SECONDS,
)
from synapse.util.json import json_encoder

if TYPE_CHECKING:
@@ -45,6 +52,21 @@
logger = logging.getLogger(__name__)


# How often to update the `last_used_ts` column on
# `sliding_sync_connections` when the client uses a connection
# position. We don't want to update it on every use to avoid excessive
# writes, but we want it to be reasonably up-to-date to help with
# cleaning up old connection positions.
UPDATE_INTERVAL_LAST_USED_TS_MS = 5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND

# Time in milliseconds the connection hasn't been used before we consider it
# expired and delete it.
CONNECTION_EXPIRY_MS = 7 * ONE_DAY_SECONDS * MILLISECONDS_PER_SECOND

# How often we run the background process to delete old sliding sync connections.
CONNECTION_EXPIRY_FREQUENCY_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND
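
As a rough sketch (not part of the diff), the write-throttling pattern these constants drive can be expressed as a small helper; the real logic lives inline in `_get_and_clear_connection_positions_txn` below, and the helper name here is made up:

```python
def maybe_refresh_last_used_ts(
    stored_last_used_ts_ms: int | None,
    now_ms: int,
    interval_ms: int = 5 * 60 * 1000,  # UPDATE_INTERVAL_LAST_USED_TS_MS
) -> int | None:
    """Return the timestamp to write back, or None if the stored value is still
    fresh enough and no DB write is needed."""
    if stored_last_used_ts_ms is None or now_ms - stored_last_used_ts_ms > interval_ms:
        return now_ms
    return None
```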


class SlidingSyncStore(SQLBaseStore):
def __init__(
self,
@@ -76,6 +98,12 @@ def __init__(
replaces_index="sliding_sync_membership_snapshots_user_id",
)

if self.hs.config.worker.run_background_tasks:
self.clock.looping_call(
self.delete_old_sliding_sync_connections,
CONNECTION_EXPIRY_FREQUENCY_MS,
)

async def get_latest_bump_stamp_for_room(
self,
room_id: str,
@@ -202,6 +230,7 @@ def persist_per_connection_state_txn(
"effective_device_id": device_id,
"conn_id": conn_id,
"created_ts": self.clock.time_msec(),
"last_used_ts": self.clock.time_msec(),
},
returning=("connection_key",),
)
@@ -384,7 +413,7 @@ def _get_and_clear_connection_positions_txn(
# The `previous_connection_position` is a user-supplied value, so we
# need to make sure that the one they supplied is actually theirs.
sql = """
SELECT connection_key
SELECT connection_key, last_used_ts
FROM sliding_sync_connection_positions
INNER JOIN sliding_sync_connections USING (connection_key)
WHERE
@@ -396,7 +425,20 @@
if row is None:
raise SlidingSyncUnknownPosition()

(connection_key,) = row
(connection_key, last_used_ts) = row

# Update the `last_used_ts` if it's due to be updated. We don't update
# every time to avoid excessive writes.
now = self.clock.time_msec()
if last_used_ts is None or now - last_used_ts > UPDATE_INTERVAL_LAST_USED_TS_MS:
self.db_pool.simple_update_txn(
txn,
table="sliding_sync_connections",
keyvalues={
"connection_key": connection_key,
},
updatevalues={"last_used_ts": now},
)

# Now that we have seen the client has received and used the connection
# position, we can delete all the other connection positions.
@@ -480,12 +522,30 @@ def _get_and_clear_connection_positions_txn(
logger.warning("Unrecognized sliding sync stream in DB %r", stream)

return PerConnectionStateDB(
last_used_ts=last_used_ts,
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
room_configs=room_configs,
)

@wrap_as_background_process("delete_old_sliding_sync_connections")
async def delete_old_sliding_sync_connections(self) -> None:
"""Delete sliding sync connections that have not been used for a long time."""
cutoff_ts = self.clock.time_msec() - CONNECTION_EXPIRY_MS

def delete_old_sliding_sync_connections_txn(txn: LoggingTransaction) -> None:
sql = """
DELETE FROM sliding_sync_connections
WHERE last_used_ts IS NOT NULL AND last_used_ts < ?

Contributor: Feels like we should also have a background job that fills in last_used_ts for all of the existing rows so we can clean them all up at some point.

Could be split out to another PR.

Member Author: Yeah, I am a bit undecided whether next release we just delete all connections with a null last_used_ts and take care of the problem that way. So happy to punt to a separate PR.

"""
txn.execute(sql, (cutoff_ts,))

await self.db_pool.runInteraction(
"delete_old_sliding_sync_connections",
delete_old_sliding_sync_connections_txn,
)
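
For a sense of scale (illustrative arithmetic only, not from the PR): with the constants above, an unused connection is removed by the hourly background job once it has been idle for a week, so in the worst case it lingers for roughly seven days plus one hour.

```python
MILLISECONDS_PER_SECOND = 1000
ONE_HOUR_SECONDS = 60 * 60
ONE_DAY_SECONDS = 24 * ONE_HOUR_SECONDS

CONNECTION_EXPIRY_MS = 7 * ONE_DAY_SECONDS * MILLISECONDS_PER_SECOND          # 604_800_000 ms
CONNECTION_EXPIRY_FREQUENCY_MS = ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND   # 3_600_000 ms

worst_case_lingering_ms = CONNECTION_EXPIRY_MS + CONNECTION_EXPIRY_FREQUENCY_MS
print(worst_case_lingering_ms)  # 608_400_000 ms, i.e. about 7 days and 1 hour
```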


@attr.s(auto_attribs=True, frozen=True)
class PerConnectionStateDB:
@@ -498,6 +558,8 @@ class PerConnectionStateDB:
When persisting this *only* contains updates to the state.
"""

last_used_ts: int | None

rooms: "RoomStatusMap[str]"
receipts: "RoomStatusMap[str]"
account_data: "RoomStatusMap[str]"
@@ -553,6 +615,7 @@ async def from_state(
)

return PerConnectionStateDB(
last_used_ts=per_connection_state.last_used_ts,
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
@@ -596,6 +659,7 @@ async def to_state(self, store: "DataStore") -> "PerConnectionState":
}

return PerConnectionState(
last_used_ts=self.last_used_ts,
rooms=RoomStatusMap(rooms),
receipts=RoomStatusMap(receipts),
account_data=RoomStatusMap(account_data),
14 changes: 13 additions & 1 deletion synapse/storage/databases/main/stream.py
@@ -740,14 +740,26 @@ async def get_rooms_that_have_updates_since_sliding_sync_table(
from_key: RoomStreamToken,
) -> StrCollection:
"""Return the rooms that probably have had updates since the given
token (changes that are > `from_key`)."""
token (changes that are > `from_key`).

May return false positives, but must not return false negatives.

If `have_finished_sliding_sync_background_jobs` is False, then we return
all the room IDs, as we can't be sure that the sliding sync table is
fully populated.
"""
# If the stream change cache is valid for the stream token, we can just
# use the result of that.
if from_key.stream >= self._events_stream_cache.get_earliest_known_position():
return self._events_stream_cache.get_entities_changed(
room_ids, from_key.stream
)

if not self.have_finished_sliding_sync_background_jobs():
# If the table hasn't been populated yet, we have to assume all rooms
# have updates.
return room_ids

def get_rooms_that_have_updates_since_sliding_sync_table_txn(
txn: LoggingTransaction,
) -> StrCollection:
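As a hypothetical restatement (not from the diff) of the contract spelled out in the docstring above -- false positives are acceptable, false negatives are not -- the conservative shape of the check looks like this; the function and parameter names are made up for illustration:

```python
def rooms_that_might_have_updates(
    candidate_room_ids: set[str],
    changed_rooms: set[str] | None,
) -> set[str]:
    """`changed_rooms` is the set of rooms known to have changed since the
    token, or None when we cannot answer precisely (stream change cache window
    too small, or the sliding sync background jobs haven't finished)."""
    if changed_rooms is None:
        # When in doubt, return every candidate: extra rooms only cost work,
        # while missing rooms would silently lose updates.
        return candidate_room_ids
    return candidate_room_ids & changed_rooms
```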
27 changes: 27 additions & 0 deletions synapse/storage/schema/main/delta/93/03_sss_pos_last_used.sql
@@ -0,0 +1,27 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 Element Creations, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

-- Add a timestamp for when the sliding sync connection position was last used,
-- only updated at a coarse granularity.
--
-- This should be NOT NULL, but we need to consider existing rows. In future we
-- may want to either backfill this or delete all rows with a NULL value (and
-- then make it NOT NULL).
ALTER TABLE sliding_sync_connections ADD COLUMN last_used_ts BIGINT;

Contributor: Does last_used_ts need an index to be efficient with the delete query?

Member Author: Have added a comment:

-- Note: We don't add an index on this column to allow HOT updates on PostgreSQL
-- to reduce the cost of the updates to the column. c.f.
-- https://www.postgresql.org/docs/current/storage-hot.html
--
-- We do query this column directly to find expired connections, but we expect
-- that to be an infrequent operation and a sequential scan should be fine.
8 changes: 8 additions & 0 deletions synapse/types/handlers/sliding_sync.py
@@ -850,12 +850,16 @@ class PerConnectionState:
since the last time you made a sync request.

Attributes:
last_used_ts: The time this connection was last used, in milliseconds.
This is only accurate to `UPDATE_INTERVAL_LAST_USED_TS_MS`.
rooms: The status of each room for the events stream.
receipts: The status of each room for the receipts stream.
room_configs: Map from room_id to the `RoomSyncConfig` of all
rooms that we have previously sent down.
"""

last_used_ts: int | None = None

rooms: RoomStatusMap[RoomStreamToken] = attr.Factory(RoomStatusMap)
receipts: RoomStatusMap[MultiWriterStreamToken] = attr.Factory(RoomStatusMap)
account_data: RoomStatusMap[int] = attr.Factory(RoomStatusMap)
@@ -867,6 +871,7 @@ def get_mutable(self) -> "MutablePerConnectionState":
room_configs = cast(MutableMapping[str, RoomSyncConfig], self.room_configs)

return MutablePerConnectionState(
last_used_ts=self.last_used_ts,
rooms=self.rooms.get_mutable(),
receipts=self.receipts.get_mutable(),
account_data=self.account_data.get_mutable(),
@@ -875,6 +880,7 @@ def copy(self) -> "PerConnectionState":

def copy(self) -> "PerConnectionState":
return PerConnectionState(
last_used_ts=self.last_used_ts,
rooms=self.rooms.copy(),
receipts=self.receipts.copy(),
account_data=self.account_data.copy(),
@@ -889,6 +895,8 @@ def __len__(self) -> int:
class MutablePerConnectionState(PerConnectionState):
"""A mutable version of `PerConnectionState`"""

last_used_ts: int | None

rooms: MutableRoomStatusMap[RoomStreamToken]
receipts: MutableRoomStatusMap[MultiWriterStreamToken]
account_data: MutableRoomStatusMap[int]
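A brief hypothetical usage sketch (not part of the diff), based on the attrs definitions above: the new field should survive the mutable round trip used during request handling. The timestamp value here is arbitrary.

```python
from synapse.types.handlers.sliding_sync import PerConnectionState

state = PerConnectionState(last_used_ts=1_700_000_000_000)
mutable = state.get_mutable()  # last_used_ts is carried into the mutable copy
snapshot = mutable.copy()      # ...and back into an immutable PerConnectionState
assert snapshot.last_used_ts == state.last_used_ts
```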
1 change: 1 addition & 0 deletions synapse/util/constants.py
@@ -18,5 +18,6 @@
# readability and catching bugs.
ONE_MINUTE_SECONDS = 60
ONE_HOUR_SECONDS = 60 * ONE_MINUTE_SECONDS
ONE_DAY_SECONDS = 24 * ONE_HOUR_SECONDS

MILLISECONDS_PER_SECOND = 1000