Skip to content

Commit

Permalink
Faster partial join to room with complex auth graph (#7)
Browse files Browse the repository at this point in the history
Instead of persisting outliers in a bunch of batches, let's just do them
all at once.

This is fine because all `_auth_and_persist_outliers_inner` is doing is
checking the auth rules for each event, which requires the events to be
topologically sorted by the auth graph.
  • Loading branch information
erikjohnston committed Jan 10, 2024
1 parent a0f0fdf commit c3f2f0f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 49 deletions.
1 change: 1 addition & 0 deletions changelog.d/7.misc
@@ -0,0 +1 @@
Faster partial join to room with complex auth graph.
79 changes: 30 additions & 49 deletions synapse/handlers/federation_event.py
Expand Up @@ -94,7 +94,7 @@
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
from synapse.util.iterutils import batch_iter, partition, sorted_topologically
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr

Expand Down Expand Up @@ -1678,65 +1678,44 @@ async def _auth_and_persist_outliers(

# We need to persist an event's auth events before the event.
auth_graph = {
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
ev.event_id: [e_id for e_id in ev.auth_event_ids() if e_id in event_map]
for ev in event_map.values()
}
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
# somebody has managed to create a loop (which requires finding a
# hash collision in room v2 and later).
logger.warning(
"Loop found in auth events while fetching missing state/auth "
"events: %s",
shortstr(event_map.keys()),
)
return

logger.info(
"Persisting %i of %i remaining outliers: %s",
len(roots),
len(event_map),
shortstr(e.event_id for e in roots),
)

await self._auth_and_persist_outliers_inner(room_id, roots)

async def _auth_and_persist_outliers_inner(
self, room_id: str, fetched_events: Collection[EventBase]
) -> None:
"""Helper for _auth_and_persist_outliers
Persists a batch of events where we have (theoretically) already persisted all
of their auth events.
Marks the events as outliers, auths them, persists them to the database, and,
where appropriate (eg, an invite), awakes the notifier.
sorted_auth_event_ids = sorted_topologically(event_map.keys(), auth_graph)
sorted_auth_events = [event_map[e_id] for e_id in sorted_auth_event_ids]
logger.info(
"Persisting %i remaining outliers: %s",
len(sorted_auth_events),
shortstr(e.event_id for e in sorted_auth_events),
)

Params:
origin: where the events came from
room_id: the room that the events are meant to be in (though this has
not yet been checked)
fetched_events: the events to persist
"""
# get all the auth events for all the events in this batch. By now, they should
# have been persisted.
auth_events = {
aid for event in fetched_events for aid in event.auth_event_ids()
auth_event_ids = {
aid for event in sorted_auth_events for aid in event.auth_event_ids()
}
auth_map = {
ev.event_id: ev
for ev in sorted_auth_events
if ev.event_id in auth_event_ids
}
persisted_events = await self._store.get_events(
auth_events,
allow_rejected=True,
)

missing_events = auth_event_ids.difference(auth_map)
if missing_events:
persisted_events = await self._store.get_events(
missing_events,
allow_rejected=True,
redact_behaviour=EventRedactBehaviour.as_is,
)
auth_map.update(persisted_events)

events_and_contexts_to_persist: List[Tuple[EventBase, EventContext]] = []

async def prep(event: EventBase) -> None:
with nested_logging_context(suffix=event.event_id):
auth = []
for auth_event_id in event.auth_event_ids():
ae = persisted_events.get(auth_event_id)
ae = auth_map.get(auth_event_id)
if not ae:
# the fact we can't find the auth event doesn't mean it doesn't
# exist, which means it is premature to reject `event`. Instead we
Expand All @@ -1755,7 +1734,9 @@ async def prep(event: EventBase) -> None:
context = EventContext.for_outlier(self._storage_controllers)
try:
validate_event_for_room_version(event)
await check_state_independent_auth_rules(self._store, event)
await check_state_independent_auth_rules(
self._store, event, batched_auth_events=auth_map
)
check_state_dependent_auth_rules(event, auth)
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
Expand All @@ -1772,7 +1753,7 @@ async def prep(event: EventBase) -> None:

events_and_contexts_to_persist.append((event, context))

for event in fetched_events:
for event in sorted_auth_events:
await prep(event)

await self.persist_events_and_notify(
Expand Down

0 comments on commit c3f2f0f

Please sign in to comment.