Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in calculating state for non-gappy syncs #16942

Merged
merged 2 commits into from Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion changelog.d/16930.bugfix
@@ -1 +1 @@
Fix a long-standing bug which could cause state to be omitted from `/sync` responses when certain events are filtered out of the timeline.
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
2 changes: 1 addition & 1 deletion changelog.d/16932.bugfix
@@ -1 +1 @@
Fix a long-standing bug which could cause incorrect state to be returned from `/sync` for rooms where the user has left.
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
1 change: 1 addition & 0 deletions changelog.d/16942.bugfix
@@ -0,0 +1 @@
Fix various long-standing bugs which could cause incorrect state to be returned from `/sync` in certain situations.
91 changes: 37 additions & 54 deletions synapse/handlers/sync.py
Expand Up @@ -1195,6 +1195,8 @@ async def _compute_state_delta_for_full_sync(
)

if batch:
# Strictly speaking, this returns the state *after* the first event in the
# timeline, but that is good enough here.
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
Expand Down Expand Up @@ -1257,25 +1259,25 @@ async def _compute_state_delta_for_incremental_sync(
await_full_state = True
lazy_load_members = False

if batch.limited:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id,
stream_position=end_token,
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

if batch.limited:
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
Expand All @@ -1290,47 +1292,28 @@ async def _compute_state_delta_for_incremental_sync(
# about them).
state_filter = StateFilter.all()

state_at_previous_sync = await self.get_state_at(
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_previous_sync = await self.get_state_at(
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end=state_at_previous_sync,
lazy_load_members=lazy_load_members,
)

state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end=state_at_previous_sync,
lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
if lazy_load_members:
if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
# timeline here. The caller will then dedupe any redundant ones.

state_ids = await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member) for member in members_to_fetch
),
await_full_state=False,
)
return state_ids

async def _find_missing_partial_state_memberships(
Expand Down
105 changes: 105 additions & 0 deletions tests/handlers/test_sync.py
Expand Up @@ -435,6 +435,111 @@ def test_state_includes_changes_on_forks_when_events_excluded(self) -> None:
[s2_event],
)

def test_state_includes_changes_on_ungappy_syncs(self) -> None:
"""Test `state` where the sync is not gappy.

We start with a DAG like this:

E1
↗ ↖
| S2
|
--|---
|
E3

... and initialsync with `limit=1`, represented by the horizontal dashed line.
At this point, we do not expect S2 to appear in the response at all (since
it is excluded from the timeline by the `limit`, and the state is based on the
state after the most recent event before the sync token (E3), which doesn't
include S2.

Now more events arrive, and we do an incremental sync:

E1
↗ ↖
| S2
| ↑
E3 |
↑ |
--|------|----
| |
E4 |
↖ /
E5

This is the last chance for us to tell the client about S2, so it *must* be
included in the response.
"""
alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)

# Do an initial sync to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)

# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]

# Another initial sync, with limit=1
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs, {"room": {"timeline": {"limit": 1}}}
),
),
)
)
room_sync = initial_sync_result.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertNotIn(s2_event, [e.event_id for e in room_sync.state.values()])

# More events, E4 and E5
with self._patch_get_latest_events([e3_event]):
e4_event = self.helper.send(room_id, "e4", tok=alice_tok)["event_id"]
e5_event = self.helper.send(room_id, "e5", tok=alice_tok)["event_id"]

# Now incremental sync
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(alice),
since_token=initial_sync_result.next_batch,
)
)

# The state event should appear in the 'state' section of the response.
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertFalse(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e4_event, e5_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)

@parameterized.expand(
[
(False, False),
Expand Down