Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't invalidate the entire event cache when we purge history #16905

Merged
merged 5 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16905.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Don't invalidate the entire event cache when we purge history.
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
deleted.
"""

self._invalidate_local_get_event_cache_all() # type: ignore[attr-defined]
self._invalidate_local_get_event_cache_room_id(room_id) # type: ignore[attr-defined]
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
Expand Down
14 changes: 8 additions & 6 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ def __init__(
] = AsyncLruCache(
cache_name="*getEvent*",
max_size=hs.config.caches.event_cache_size,
# `extra_index_cb` returns a tuple, as that is the cache's key type
extra_index_cb=lambda _, v: (v.event.room_id,),
)

# Map from event ID to a deferred that will result in a map from event
Expand Down Expand Up @@ -782,9 +784,9 @@ async def get_unredacted_events_from_cache_or_db(

if missing_events_ids:

async def get_missing_events_from_cache_or_db() -> Dict[
str, EventCacheEntry
]:
async def get_missing_events_from_cache_or_db() -> (
Dict[str, EventCacheEntry]
):
"""Fetches the events in `missing_event_ids` from the database.

Also creates entries in `self._current_event_fetches` to allow
Expand Down Expand Up @@ -910,12 +912,12 @@ def _invalidate_local_get_event_cache(self, event_id: str) -> None:
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _invalidate_local_get_event_cache_all(self) -> None:
"""Clears the in-memory get event caches.
def _invalidate_local_get_event_cache_room_id(self, room_id: str) -> None:
    """Clears the in-memory get event caches for a room.

    Used when we purge room history.

    Args:
        room_id: The room whose cached events should be dropped.
    """
    # The event cache keeps a secondary index keyed by room ID, so only
    # this room's entries are evicted from it.
    self._get_event_cache.invalidate_on_index_local((room_id,))
    # NOTE(review): these two maps are keyed by event ID with no room
    # index, so they are cleared wholesale — presumably an acceptable
    # cost when purging history; confirm this is intentional.
    self._event_ref.clear()
    self._current_event_fetches.clear()
self._current_event_fetches.clear()

Expand Down
57 changes: 57 additions & 0 deletions synapse/util/caches/lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
Iterable,
List,
Optional,
Set,
Tuple,
Type,
TypeVar,
Expand Down Expand Up @@ -386,6 +387,7 @@ def __init__(
apply_cache_factor_from_config: bool = True,
clock: Optional[Clock] = None,
prune_unread_entries: bool = True,
extra_index_cb: Optional[Callable[[KT, VT], KT]] = None,
):
"""
Args:
Expand Down Expand Up @@ -416,6 +418,20 @@ def __init__(
prune_unread_entries: If True, cache entries that haven't been read recently
will be evicted from the cache in the background. Set to False to
opt-out of this behaviour.

extra_index_cb: If provided, the cache keeps a second index from a
(different) key to a cache entry based on the return value of
the callback. This can then be used to invalidate entries based
on the second type of key.

For example, for the event cache this would be a callback that
maps an event to its room ID, allowing invalidation of all
events in a given room.

Note: Though the two types of key have the same type, they are
in different namespaces.

Note: The new key does not have to be unique.
"""
# Default `clock` to something sensible. Note that we rename it to
# `real_clock` so that mypy doesn't think its still `Optional`.
Expand Down Expand Up @@ -463,6 +479,8 @@ def __init__(

lock = threading.Lock()

extra_index: Dict[KT, Set[KT]] = {}

def evict() -> None:
while cache_len() > self.max_size:
# Get the last node in the list (i.e. the oldest node).
Expand Down Expand Up @@ -521,6 +539,11 @@ def add_node(
if size_callback:
cached_cache_len[0] += size_callback(node.value)

if extra_index_cb:
index_key = extra_index_cb(node.key, node.value)
mapped_keys = extra_index.setdefault(index_key, set())
mapped_keys.add(node.key)

if caches.TRACK_MEMORY_USAGE and metrics:
metrics.inc_memory_usage(node.memory)

Expand All @@ -537,6 +560,14 @@ def delete_node(node: _Node[KT, VT]) -> int:

node.run_and_clear_callbacks()

if extra_index_cb:
index_key = extra_index_cb(node.key, node.value)
mapped_keys = extra_index.get(index_key)
if mapped_keys is not None:
mapped_keys.discard(node.key)
if not mapped_keys:
extra_index.pop(index_key, None)

if caches.TRACK_MEMORY_USAGE and metrics:
metrics.dec_memory_usage(node.memory)

Expand Down Expand Up @@ -748,13 +779,35 @@ def cache_clear() -> None:
if size_callback:
cached_cache_len[0] = 0

extra_index.clear()

if caches.TRACK_MEMORY_USAGE and metrics:
metrics.clear_memory_usage()

@synchronized
def cache_contains(key: KT) -> bool:
    """Returns True if `key` currently has an entry in the backing cache dict."""
    return key in cache

@synchronized
def cache_invalidate_on_index(index_key: KT) -> None:
    """Drop every cache entry whose extra index maps to `index_key`.

    Only meaningful when an `extra_index_cb` was supplied at construction.
    """
    # Remove the whole index bucket up front; `delete_node` below will
    # then find nothing left to discard for this index key.
    matched_keys = extra_index.pop(index_key, None)
    if not matched_keys:
        return

    for primary_key in matched_keys:
        entry = cache.pop(primary_key, None)
        if not entry:
            continue

        freed_len = delete_node(entry)
        if metrics:
            metrics.inc_evictions(EvictionReason.invalidation, freed_len)

# make sure that we clear out any excess entries after we get resized.
self._on_resize = evict

Expand All @@ -771,6 +824,7 @@ def cache_contains(key: KT) -> bool:
self.len = synchronized(cache_len)
self.contains = cache_contains
self.clear = cache_clear
self.invalidate_on_index = cache_invalidate_on_index

def __getitem__(self, key: KT) -> VT:
result = self.get(key, _Sentinel.sentinel)
Expand Down Expand Up @@ -864,6 +918,9 @@ async def invalidate(self, key: KT) -> None:
# This method should invalidate any external cache and then invalidate the LruCache.
return self._lru_cache.invalidate(key)

def invalidate_on_index_local(self, index_key: KT) -> None:
    """Invalidate, in the local in-memory cache only, all entries whose
    extra index matches `index_key`.

    NOTE(review): unlike `invalidate`, this does not touch any external
    cache — presumably callers deal with that separately; confirm.
    """
    self._lru_cache.invalidate_on_index(index_key)

def invalidate_local(self, key: KT) -> None:
"""Remove an entry from the local cache

Expand Down
31 changes: 31 additions & 0 deletions tests/util/test_lrucache.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,34 @@ def test_evict_memory(self, jemalloc_interface: Mock) -> None:
# the items should still be in the cache
self.assertEqual(cache.get("key1"), 1)
self.assertEqual(cache.get("key2"), 2)


class ExtraIndexLruCacheTestCase(unittest.HomeserverTestCase):
    """Tests for `LruCache` invalidation via the extra (secondary) index."""

    def test_invalidate_simple(self) -> None:
        # The extra index maps each entry's stringified *value* back to it.
        lru: LruCache[str, int] = LruCache(10, extra_index_cb=lambda k, v: str(v))
        lru["key1"] = 1
        lru["key2"] = 2

        # A primary key is not an index key, so this must evict nothing.
        lru.invalidate_on_index("key1")
        self.assertEqual(lru.get("key1"), 1)
        self.assertEqual(lru.get("key2"), 2)

        # Invalidating on the indexed value evicts exactly the matching entry.
        lru.invalidate_on_index("1")
        self.assertEqual(lru.get("key1"), None)
        self.assertEqual(lru.get("key2"), 2)

    def test_invalidate_multi(self) -> None:
        # Two entries share the index key "1"; a third maps to "2".
        lru: LruCache[str, int] = LruCache(10, extra_index_cb=lambda k, v: str(v))
        lru["key1"] = 1
        lru["key2"] = 1
        lru["key3"] = 2

        # As above, invalidating on a primary key is a no-op.
        lru.invalidate_on_index("key1")
        self.assertEqual(lru.get("key1"), 1)
        self.assertEqual(lru.get("key2"), 1)
        self.assertEqual(lru.get("key3"), 2)

        # All entries sharing the index key are evicted together.
        lru.invalidate_on_index("1")
        self.assertEqual(lru.get("key1"), None)
        self.assertEqual(lru.get("key2"), None)
        self.assertEqual(lru.get("key3"), 2)