Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

/sync: fix bug in calculating state response #16930

Merged
merged 2 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16930.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug which could cause state to be omitted from `/sync` responses when certain events are filtered out of the timeline.
54 changes: 13 additions & 41 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1187,15 +1187,14 @@ async def _compute_state_delta_for_full_sync(
await_full_state = True
lazy_load_members = False

if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
Expand All @@ -1204,13 +1203,6 @@ async def _compute_state_delta_for_full_sync(
)
)
else:
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_at_timeline_start = state_at_timeline_end

state_ids = _calculate_state(
Expand Down Expand Up @@ -1305,23 +1297,12 @@ async def _compute_state_delta_for_incremental_sync(
await_full_state=await_full_state,
)

if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_ids = _calculate_state(
timeline_contains=timeline_state,
Expand Down Expand Up @@ -2847,15 +2828,6 @@ def _calculate_state(
# timeline. We have no way to represent that in the /sync response, and we don't
# even try; it is ether omitted or plonked into `state` as if it were at the start
# of the timeline, depending on what else is in the timeline.)
#
# ----------
#
# Aside 2: it's worth noting that `timeline_end`, as provided to us, is actually
# the state *before* the final event in the timeline. In other words: if the final
# event in the timeline is a state event, it won't be included in `timeline_end`.
# However, that doesn't matter here, because the only difference can be in that
# one piece of state, and by definition that event is in the timeline, so we
# don't need to include it in the `state` section.

state_ids = (
(timeline_end_ids | timeline_start_ids)
Expand Down
80 changes: 80 additions & 0 deletions tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,86 @@ def test_state_includes_changes_on_forks(self) -> None:
[s2_event],
)

def test_state_includes_changes_on_forks_when_events_excluded(self) -> None:
"""A variation on the previous test, but where one event is filtered

The DAG is the same as the previous test, but E4 is excluded by the filter.

E1
↗ ↖
| S2
| ↑
--|------|----
| |
E3 |
↖ /
(E4)

"""

alice = self.register_user("alice", "password")
alice_tok = self.login(alice, "password")
alice_requester = create_requester(alice)
room_id = self.helper.create_room_as(alice, is_public=True, tok=alice_tok)

# Do an initial sync as Alice to get a known starting point.
initial_sync_result = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester, generate_sync_config(alice)
)
)
last_room_creation_event_id = (
initial_sync_result.joined[0].timeline.events[-1].event_id
)

# Send a state event, and a regular event, both using the same prev ID
with self._patch_get_latest_events([last_room_creation_event_id]):
s2_event = self.helper.send_state(room_id, "s2", {}, tok=alice_tok)[
"event_id"
]
e3_event = self.helper.send(room_id, "e3", tok=alice_tok)["event_id"]

# Send a final event, joining the two branches of the dag
self.helper.send(room_id, "e4", type="not_a_normal_message", tok=alice_tok)[
"event_id"
]

# do an incremental sync, with a filter that will only return E3, excluding S2
# and E4.
incremental_sync = self.get_success(
self.sync_handler.wait_for_sync_for_user(
alice_requester,
generate_sync_config(
alice,
filter_collection=FilterCollection(
self.hs,
{
"room": {
"timeline": {
"limit": 1,
"not_types": ["not_a_normal_message"],
}
}
},
),
),
since_token=initial_sync_result.next_batch,
)
)

# The state event should appear in the 'state' section of the response.
room_sync = incremental_sync.joined[0]
self.assertEqual(room_sync.room_id, room_id)
self.assertTrue(room_sync.timeline.limited)
self.assertEqual(
[e.event_id for e in room_sync.timeline.events],
[e3_event],
)
self.assertEqual(
[e.event_id for e in room_sync.state.values()],
[s2_event],
)

@parameterized.expand(
[
(False, False),
Expand Down