Skip to content
This repository has been archived by the owner on Mar 26, 2024. It is now read-only.

Commit

Permalink
Beeper user notification counts (#50)
Browse files Browse the repository at this point in the history
* Implement notification count table insert and aggregate

* Add Beeper notification count table migration

* Appease sqlite

* Fix aggregation query

* Add experimental flag to enable user notification counts

* More explicit column name

* Offset recent notification counts by 100k most recent events

Super arbitrary but this only exists as a temporary change, idea here
is that the very latest counts are much more likely to be cleared by
live receipts and we want to minimize serialization retries on the
receipt transaction.

* Don't allow local users to send receipts for unknown events

This isn't firing in production but now must be checked to prevent issues
with notification counts.

* Clear notification counts on receipt

* Fix starting value insert in migration

* Use real rooms/events in receipt tests
  • Loading branch information
Fizzadar committed Jan 16, 2023
1 parent 09f7874 commit 2672e3b
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 11 deletions.
5 changes: 5 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,8 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
# Enable room version (and thus applicable push rules from MSC3931/3932)
version_id = RoomVersions.MSC1767v10.identifier
KNOWN_ROOM_VERSIONS[version_id] = RoomVersions.MSC1767v10

self.beeper_user_notification_counts_enabled = experimental.get(
"beeper_user_notification_counts_enabled",
False,
)
161 changes: 159 additions & 2 deletions synapse/storage/databases/main/beeper.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,46 @@
# Beep beep!

import logging
from typing import Optional, Tuple, cast
from typing import TYPE_CHECKING, List, Optional, Tuple, cast

from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.types import RoomStreamToken
from synapse.util.caches.descriptors import cached

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class BeeperStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self.user_notification_counts_enabled: bool = (
hs.config.experimental.beeper_user_notification_counts_enabled
)

if (
self.user_notification_counts_enabled
and hs.config.worker.run_background_tasks
):
self.aggregate_notification_counts_loop = self._clock.looping_call(
self.beeper_aggregate_notification_counts, 30 * 1000
)
self.is_aggregating_notification_counts = False

@cached(max_entries=50000, num_args=2, tree=True)
async def beeper_preview_event_for_room_id_and_user_id(
self, room_id: str, user_id: str, to_key: RoomStreamToken
Expand Down Expand Up @@ -82,3 +111,131 @@ def beeper_cleanup_tombstoned_room_txn(txn: LoggingTransaction) -> None:
"beeper_cleanup_tombstoned_room",
beeper_cleanup_tombstoned_room_txn,
)

def beeper_add_notification_counts_txn(
self,
txn: LoggingTransaction,
notifiable_events: List[EventBase],
) -> None:
if not self.user_notification_counts_enabled:
return

sql = """
INSERT INTO beeper_user_notification_counts (
room_id, event_stream_ordering,
user_id, thread_id, notifs, unreads, highlights
)
SELECT ?, ?, user_id, thread_id, notif, unread, highlight
FROM event_push_actions_staging
WHERE event_id = ?
"""

txn.execute_batch(
sql,
(
(
event.room_id,
event.internal_metadata.stream_ordering,
event.event_id,
)
for event in notifiable_events
),
)

def beeper_clear_notification_counts_txn(
self,
txn: LoggingTransaction,
user_id: str,
room_id: str,
stream_ordering: int,
) -> None:
if not self.user_notification_counts_enabled:
return

sql = """
DELETE FROM beeper_user_notification_counts
WHERE
user_id = ?
AND room_id = ?
AND event_stream_ordering <= ?
"""

txn.execute(sql, (user_id, room_id, stream_ordering))

async def beeper_aggregate_notification_counts(self) -> None:
if not self.user_notification_counts_enabled:
return

def aggregate_txn(txn: LoggingTransaction) -> None:
sql = """
WITH recent_rows AS ( -- Aggregate the tables, flag aggregated rows for deletion
SELECT
user_id,
room_id
FROM
beeper_user_notification_counts
WHERE
event_stream_ordering > (
SELECT event_stream_ordering FROM beeper_user_notification_counts_stream_ordering
)
-- Arbitrary 100k offset for now
AND event_stream_ordering < SELECT MAX(stream_ordering) - 100000 FROM events
)
UPDATE
beeper_user_notification_counts AS epc
SET
unreads = CASE WHEN epc.event_stream_ordering = agg.max_eso THEN agg.unreads ELSE 0 END,
notifs = CASE WHEN epc.event_stream_ordering = agg.max_eso THEN agg.notifs ELSE 0 END,
highlights = CASE WHEN epc.event_stream_ordering = agg.max_eso THEN agg.highlights ELSE 0 END,
aggregated = epc.event_stream_ordering != agg.max_eso
FROM (
SELECT
user_id,
room_id,
SUM(unreads) AS unreads,
SUM(notifs) AS notifs,
SUM(highlights) AS highlights,
MAX(event_stream_ordering) AS max_eso
FROM
beeper_user_notification_counts
WHERE
user_id IN(SELECT user_id FROM recent_rows)
AND room_id IN(SELECT room_id FROM recent_rows)
GROUP BY
user_id,
room_id
) AS agg
WHERE
epc.room_id = agg.room_id
AND epc.user_id = agg.user_id
RETURNING
event_stream_ordering;
"""

txn.execute(sql)
orders = list(txn)
if not orders:
return

max_stream_ordering = max(orders)
txn.execute(
"UPDATE beeper_user_notification_counts_stream_ordering SET stream_ordering = ?",
max_stream_ordering,
)

logger.info(f"Aggregated {len(orders)} notification count rows")

if self.is_aggregating_notification_counts:
return

self.is_aggregating_notification_counts = True

try:
logger.debug("Aggregating notification counts")

await self.db_pool.runInteraction(
"beeper_aggregate_notification_counts",
aggregate_txn,
)
finally:
self.is_aggregating_notification_counts = False
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2234,6 +2234,8 @@ def _set_push_actions_for_event_and_users_txn(
),
)

self.store.beeper_add_notification_counts_txn(txn, notifiable_events)

# Now we delete the staging area for *all* events that were being
# persisted.
txn.execute_batch(
Expand Down
12 changes: 8 additions & 4 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,10 +634,9 @@ def _insert_linearized_receipt_txn(
)

if res is None and self.hs.is_mine_id(user_id):
logger.warning(
"Sending local user receipt for unknown event, roomID=%s, eventID=%s",
room_id,
event_id,
raise ValueError(
"Local users cannot send receipts for unknown event, "
f"roomID={room_id}, eventID={event_id}",
)

stream_ordering = int(res["stream_ordering"]) if res else None
Expand Down Expand Up @@ -710,6 +709,11 @@ def _insert_linearized_receipt_txn(
where_clause=where_clause,
)

if self.hs.is_mine_id(user_id):
self.beeper_clear_notification_counts_txn( # type: ignore[attr-defined]
txn, user_id, room_id, stream_ordering
)

return rx_ts

def _graph_to_linear(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
CREATE TABLE beeper_user_notification_counts (
user_id TEXT,
room_id TEXT,
thread_id TEXT,
event_stream_ordering BIGINT,
notifs BIGINT,
unreads BIGINT,
highlights BIGINT,
aggregated BOOLEAN,
UNIQUE (user_id, room_id, thread_id, event_stream_ordering)
);

CREATE TABLE beeper_user_notification_counts_stream_ordering (
lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
event_stream_ordering BIGINT NOT NULL,
CHECK (lock='X')
);

INSERT INTO beeper_user_notification_counts_stream_ordering (event_stream_ordering) VALUES (0);
10 changes: 7 additions & 3 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ def test_sending_read_receipt_batches_to_application_services(self):
],
ApplicationService.NS_ROOMS: [
{
"regex": "!fakeroom_.*",
"regex": ".*",
"exclusive": True,
}
],
Expand All @@ -594,15 +594,19 @@ def test_sending_read_receipt_batches_to_application_services(self):
# Now, pretend that we receive a large burst of read receipts (300 total) that
# all come in at once.
for i in range(300):
room_id = self.helper.create_room_as(self.local_user, tok=self.local_user_token)
resp = self.helper.send(room_id, tok=self.local_user_token)
event_id = resp["event_id"]

self.get_success(
# Insert a fake read receipt into the database
self.hs.get_datastores().main.insert_receipt(
# We have to use unique room ID + user ID combinations here, as the db query
# is an upsert.
room_id=f"!fakeroom_{i}:test",
room_id=room_id,
receipt_type="m.read",
user_id=self.local_user,
event_ids=[f"$eventid_{i}"],
event_ids=[event_id],
thread_id=None,
data={},
)
Expand Down
9 changes: 7 additions & 2 deletions tests/rest/client/test_receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.rest.client import login, receipts, register
from synapse.rest.client import login, receipts, register, room
from synapse.server import HomeServer
from synapse.util import Clock

Expand All @@ -26,6 +26,7 @@ class ReceiptsTestCase(unittest.HomeserverTestCase):
login.register_servlets,
register.register_servlets,
receipts.register_servlets,
room.register_servlets,
synapse.rest.admin.register_servlets,
]

Expand All @@ -34,9 +35,13 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.owner_tok = self.login("owner", "pass")

def test_send_receipt(self) -> None:
room_id = self.helper.create_room_as(self.owner, tok=self.owner_tok)
resp = self.helper.send(room_id, tok=self.owner_tok)
event_id = resp["event_id"]

channel = self.make_request(
"POST",
"/rooms/!abc:beep/receipt/m.read/$def",
f"/rooms/!abc:beep/receipt/m.read/{event_id}",
content={},
access_token=self.owner_tok,
)
Expand Down

0 comments on commit 2672e3b

Please sign in to comment.