Skip to content

Commit

Permalink
Pull out fewer receipts from DB when doing push (#17049)
Browse files Browse the repository at this point in the history
Before we were pulling out *all* read receipts for a user for every
event we pushed. Instead let's only pull out the relevant receipts.

This also pulled out the event rows for each receipt, causing load on
the events table.
  • Loading branch information
erikjohnston committed Apr 5, 2024
1 parent 0e68e9b commit 5360bae
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 22 deletions.
1 change: 1 addition & 0 deletions changelog.d/17049.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve database performance by reducing number of receipts fetched when sending push notifications.
124 changes: 102 additions & 22 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

Expand Down Expand Up @@ -859,37 +859,86 @@ def f(txn: LoggingTransaction) -> List[str]:

return await self.db_pool.runInteraction("get_push_action_users_in_range", f)

def _get_receipts_by_room_txn(
self, txn: LoggingTransaction, user_id: str
def _get_receipts_for_room_and_threads_txn(
self,
txn: LoggingTransaction,
user_id: str,
room_ids: StrCollection,
thread_ids: StrCollection,
) -> Dict[str, _RoomReceipt]:
"""
Generate a map of room ID to the latest stream ordering that has been
read by the given user.
Get (private) read receipts for a user in each of the given room IDs
and thread IDs.
Args:
txn:
user_id: The user to fetch receipts for.
Note: The corresponding room ID for each thread must appear in
`room_ids` arg.
Returns:
A map including all rooms the user is in with a receipt. It maps
room IDs to _RoomReceipt instances
"""
receipt_types_clause, args = make_in_list_sql_clause(

receipt_types_clause, receipts_args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)

thread_ids_clause, thread_ids_args = make_in_list_sql_clause(
self.database_engine,
"thread_id",
thread_ids,
)

room_ids_clause, room_ids_args = make_in_list_sql_clause(
self.database_engine,
"room_id",
room_ids,
)

# We use the union of two (almost identical) queries here, the first to
# fetch the specific thread receipts and the second to fetch the
# unthreaded receipts.
#
# This SQL is optimized to use the indices we have on
# `receipts_linearized`.
#
# We compare room ID and thread IDs independently due to the above,
# which means that this query might return more rows than we need if the
# same thread ID appears across different rooms (e.g. 'main' thread ID).
# This doesn't cause any logic issues, and isn't a performance concern
# given this function generally gets called with only one room and
# thread ID.
sql = f"""
SELECT room_id, thread_id, MAX(stream_ordering)
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE {receipt_types_clause}
AND {thread_ids_clause}
AND {room_ids_clause}
AND user_id = ?
GROUP BY room_id, thread_id
UNION ALL
SELECT room_id, thread_id, MAX(stream_ordering)
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE {receipt_types_clause}
AND {room_ids_clause}
AND thread_id IS NULL
AND user_id = ?
GROUP BY room_id, thread_id
"""

args.extend((user_id,))
args = list(receipts_args)
args.extend(thread_ids_args)
args.extend(room_ids_args)
args.append(user_id)
args.extend(receipts_args)
args.extend(room_ids_args)
args.append(user_id)

txn.execute(sql, args)

result: Dict[str, _RoomReceipt] = {}
Expand Down Expand Up @@ -925,12 +974,6 @@ async def get_unread_push_actions_for_user_in_range_for_http(
The list will have between 0~limit entries.
"""

receipts_by_room = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_http_receipts",
self._get_receipts_by_room_txn,
user_id=user_id,
)

def get_push_actions_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int, str, bool]]:
Expand All @@ -952,6 +995,27 @@ def get_push_actions_txn(
"get_unread_push_actions_for_user_in_range_http", get_push_actions_txn
)

room_ids = set()
thread_ids = []
for (
_,
room_id,
thread_id,
_,
_,
_,
) in push_actions:
room_ids.add(room_id)
thread_ids.append(thread_id)

receipts_by_room = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_http_receipts",
self._get_receipts_for_room_and_threads_txn,
user_id=user_id,
room_ids=room_ids,
thread_ids=thread_ids,
)

notifs = [
HttpPushAction(
event_id=event_id,
Expand Down Expand Up @@ -998,12 +1062,6 @@ async def get_unread_push_actions_for_user_in_range_for_email(
The list will have between 0~limit entries.
"""

receipts_by_room = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_email_receipts",
self._get_receipts_by_room_txn,
user_id=user_id,
)

def get_push_actions_txn(
txn: LoggingTransaction,
) -> List[Tuple[str, str, str, int, str, bool, int]]:
Expand All @@ -1026,6 +1084,28 @@ def get_push_actions_txn(
"get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
)

room_ids = set()
thread_ids = []
for (
_,
room_id,
thread_id,
_,
_,
_,
_,
) in push_actions:
room_ids.add(room_id)
thread_ids.append(thread_id)

receipts_by_room = await self.db_pool.runInteraction(
"get_unread_push_actions_for_user_in_range_email_receipts",
self._get_receipts_for_room_and_threads_txn,
user_id=user_id,
room_ids=room_ids,
thread_ids=thread_ids,
)

# Make a list of dicts from the two sets of results.
notifs = [
EmailPushAction(
Expand Down

0 comments on commit 5360bae

Please sign in to comment.