fix(workflow_engine): Add a cache for Workflows to reduce DB load #106925
saponifi3d merged 35 commits into master
Conversation
| .distinct()
| )
|
| cache.set(cache_key, workflows, timeout=CACHE_TTL)
I'm curious what our observability of caching is here.
I know in traces one type of cache (django? is this django cache or only sometimes?) doesn't show up, and that's been a bit of a pain for debugging.
Also, it'd be nice if we could have counters for hit/miss so we can brag about how many queries we're avoiding.
yeah, i was kinda purposefully avoiding obs / counters thus far. 😅
did you have any specific obs in mind? i'm thinking a metric for cache hit / miss / invalidation.
🤔 maybe debug logs for cache miss and when we invalidate? (thinking a stack trace might be handy with signals. could at least see which models are causing invalidations etc)
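For illustration, a minimal sketch of what hit / miss counters on the read-through path could look like, assuming the standard django cache and sentry.utils.metrics; _query_workflows is a stand-in for the real DB query, and the key helper is the one from the PR:

from django.core.cache import cache

from sentry.utils import metrics

CACHE_TTL = 60
METRIC_PREFIX = "workflow_engine.cache.processing_workflow"


def get_processing_workflows(detector_id: int, env_id: int | None) -> set:
    cache_key = processing_workflow_cache_key(detector_id, env_id)  # existing key helper
    workflows = cache.get(cache_key)
    if workflows is not None:
        # cache hit: no DB query needed
        metrics.incr(f"{METRIC_PREFIX}.hit")
        return workflows

    # cache miss: fall through to the DB and repopulate
    metrics.incr(f"{METRIC_PREFIX}.miss")
    workflows = _query_workflows(detector_id, env_id)  # stand-in for the real query
    cache.set(cache_key, workflows, timeout=CACHE_TTL)
    return workflows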
| This method uses a read-through cache, and returns which workflows to evaluate.
| """
| env_id = environment.id if environment is not None else None
| cache_key = processing_workflow_cache_key(detector.id, env_id)
if you like barely justified abstractions, we have a CacheAccess[T] thing.
The idea is that you define a subclass like

class _ProcessingWorkflowCacheAccess(CacheAccess[set[Workflow]]):
    def __init__(self, ..., ttl=DEFAULT_TTL) -> None:
        # verify params, save key
    def key(self) -> str:
        return self._key
    ...

cache_access = _ProcessingWorkflowCacheAccess(detector, environment)
workflows = cache_access.get()
...
cache_access.set(workflows)

Not game changing, but this was after we were using the wrong key in one place and had some wrong type assumptions about cached values, so it seemed appropriate to try for an abstraction that ensures consistent key use and type safety.
(it doesn't have delete, but it should).
👍 -- i like it. i was thinking of something similar tbh 🤣 i always fear text based keys.
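For context, a rough sketch of what such a base class could look like; names and details here are illustrative, not the actual CacheAccess in the codebase:

from typing import Generic, TypeVar

from django.core.cache import cache

T = TypeVar("T")


class CacheAccess(Generic[T]):
    # Couples one cache key with the type stored under it.
    ttl: int = 60

    def key(self) -> str:
        raise NotImplementedError

    def get(self) -> T | None:
        return cache.get(self.key())

    def set(self, value: T) -> None:
        cache.set(self.key(), value, timeout=self.ttl)

    def delete(self) -> None:
        # the "it doesn't have delete, but it should" part
        cache.delete(self.key())

Subclasses then only define the key, so every read, write, and invalidation for a given cache agrees on both the key and the stored type.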
| from sentry.workflow_engine.models import Detector
|
| @receiver(post_save, sender=Detector)
Q: why did we end up going with post_save signals on detector? Is it because of the lack of SOPA?
we ended up needing to use receivers, because we aren't modifying these models only through the API / validators as initially thought (😢). to make this easier for now (and make sure we invalidate at any point needed), decided to use the monitor.
if we end up pulling workflow_engine into its own full service, then we can simplify to only using the API validators. (that would help perf, and simplify the logic flows)
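Roughly, the receiver shape in question; invalidate_processing_workflow_cache is a stand-in for whatever helper actually drops the keys:

from django.db.models.signals import post_save
from django.dispatch import receiver

from sentry.workflow_engine.models import Detector


@receiver(post_save, sender=Detector)
def invalidate_on_detector_save(sender, instance: Detector, **kwargs) -> None:
    # Any save may change which workflows the detector maps to, so drop
    # every cached entry for this detector across all environments.
    invalidate_processing_workflow_cache(detector_id=instance.id)  # stand-in helper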
| from sentry.workflow_engine.models import DetectorWorkflow
|
| @receiver(post_migrate, sender=DetectorWorkflow)
Q: why are these signals on post_migrate and pre_save for invalidation?
if we run a migration that affects these models, then we need to clear the cache. it likely mutated relationships, and pre_save is not triggered by a migration.
other pro-tip: these receivers run per test run, so it seems like i might not want to have this after all, just from a CI slowdown perspective. 🤔
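For reference, the migration case could look something like the sketch below; note that post_migrate's sender is an app config rather than a model, and clear_processing_workflow_cache is a stand-in:

from django.db.models.signals import post_migrate
from django.dispatch import receiver


@receiver(post_migrate)
def clear_cache_after_migrate(sender, **kwargs) -> None:
    # A data migration can rewrite detector <-> workflow rows without
    # firing pre_save / post_save, so the only safe option is a full clear.
    clear_processing_workflow_cache()  # hypothetical "wipe everything" helper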
| if detector_id is None:
|     detector_id = "*"
Q: This part is prob still in progress, but I'm confused on why we're putting a wildcard here, since it doesn't look like we ever set one in the initial cache population. Also we should prob add a comment or separate out the function so that both detector_id and env_id being None means "clear everything".
I guess, is it possible only one of the params would be None, and if so, when would that happen?
These wildcards allow us to invalidate the cache in different ways / at different times.
When we modify a detector, we need to invalidate all the environments for that detector id.
When we run a migration that affects the relationship between detectors and workflows, we need to invalidate the entire cache (we don't know which models were affected, but need to invalidate the cache because those models could be wrong).
Then in all the other cases, we know which specific cache is affected and we target it specifically. If you want to see all the cases, they're in sentry/workflow_engine/models/signals/.
All that said, wildcard invalidation like this is super sus; it's currently not implemented and doesn't work here (lol, i just add a * to the key).
so i'm also trying out a few other approaches right now:
- an approach where we keep a list of all these keys in another bit of redis, then look them up (sketched below) -- but then we have to manage the cache with something else and then 😵, caches on caches.
- also investigating if we could make a namespace for the cache, and a namespace for the detector_id. if we can do that, then we could just say something like
workflow_cache.clear(f"workflow-cache:{detector_id}") kind of thing.
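A sketch of that first key-tracking idea, assuming a raw redis client (Sentry's actual cluster helpers would differ) and illustrative key names:

import redis

r = redis.Redis()  # illustrative; not how the codebase gets a client

CACHE_TTL = 60


def track_key(detector_id: int, cache_key: str) -> None:
    # Record every env-specific key created for this detector.
    index_key = f"workflow-cache-index:{detector_id}"
    r.sadd(index_key, cache_key)
    r.expire(index_key, CACHE_TTL)


def invalidate_detector(detector_id: int) -> None:
    # Delete every tracked key, then the index itself.
    index_key = f"workflow-cache-index:{detector_id}"
    keys = r.smembers(index_key)
    if keys:
        r.delete(*keys)
    r.delete(index_key)

The caches-on-caches concern is real: if the index expires or is evicted before the entries do, invalidation silently misses them, which is why the TTLs have to stay in sync.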
| try:
|     # This lookup trade-off is okay, because we rarely update these relationships
|     # Most cases are delete / create new DetectorWorkflow relationships.
|     old_instance = DetectorWorkflow.objects.get(pk=instance.pk)
note to self for tomorrow: make sure to include the env_id in the .get lookup here.
nah, not any more -- surprised it didn't go away. Since this is using the primary key to do the lookup, we shouldn't need any additional filtering.
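Putting the thread together, the update path looks roughly like this; invalidate_processing_workflow_cache is again a stand-in helper:

from django.db.models.signals import pre_save
from django.dispatch import receiver

from sentry.workflow_engine.models import DetectorWorkflow


@receiver(pre_save, sender=DetectorWorkflow)
def invalidate_on_update(sender, instance: DetectorWorkflow, **kwargs) -> None:
    if instance.pk is None:
        return  # brand-new rows are handled by the create path instead
    try:
        # pk lookup only; no extra filtering needed since pk is unique
        old_instance = DetectorWorkflow.objects.get(pk=instance.pk)
    except DetectorWorkflow.DoesNotExist:
        return
    if old_instance.detector_id != instance.detector_id:
        # The row is moving between detectors: both caches are now stale.
        invalidate_processing_workflow_cache(old_instance.detector_id)
        invalidate_processing_workflow_cache(instance.detector_id)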
this addressed the mypy error as well. (commit: Address mypy errors in cache/test_workflow.py)
Let me know if this is too much to review at once, i couldn't decide if it'd be easier with all the context together or to split the caching and invalidation stuff into separate PRs. Also, planning on making some higher-level abstractions, but need to get another example in to think through. Seems like we could make an ABC like …
kcons left a comment
I want to look again at the invalidation path, but all seems reasonable to me, and we have tests and such so no need to block on it.
| Args:
|     detector_id: Detector ID to invalidate (required)
|     env_id: {int|None} - The environment the workflow is triggered on, if not set,
This is a slightly hard kind of interface to express in Python.
I might suggest a "class AllEnvs: pass" magic value over a string const, but I'm not sure enough it's better to suggest the change.
🤔 yeah, agree. the env_id interface was rough -- hence the default bit i tried to introduce as a way to get around None having a different meaning here.
Maybe we should extract it out to its own type in workflow_engine/types.py, or maybe even in the Environment model? then reuse that type everywhere we discuss Environment?
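One possible shape for that sentinel type (names here are illustrative):

from typing import Union


class AllEnvironments:
    # Sentinel meaning "every environment entry for this detector".
    pass


ALL_ENVS = AllEnvironments()

# None keeps its current meaning (the no-environment cache entry),
# while ALL_ENVS is an explicit "invalidate across all envs".
EnvSelector = Union[int, None, AllEnvironments]


def invalidate(detector_id: int, env: EnvSelector = ALL_ENVS) -> None:
    if isinstance(env, AllEnvironments):
        ...  # wildcard-style invalidation over every env key
    else:
        ...  # target the single (detector_id, env) key

This keeps "not passed" distinguishable from "explicitly None", which is the same problem the DEFAULT_VALUE literal is working around.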
| CACHE_TTL = 60  # TODO - Increase TTL once we confirm everything
| METRIC_PREFIX = "workflow_engine.cache.processing_workflow"
|
| DEFAULT_VALUE: Literal["default"] = "default"
| global_by_detector: dict[int, set[Workflow]] = {d_id: set() for d_id in detector_ids}
| env_by_detector: dict[int, set[Workflow]] = {d_id: set() for d_id in detector_ids}
nit: the convention is to not have inline typing unless needed since mypy can generally infer the type w/o
🤔 not sure i agree with this one tbh. while having inline types might be a little slower for mypy, it also gives things like type completion and the knowledge that all the values will be a Workflow -- fwiw, mypy did not correctly infer these types.
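A quick illustration of the inference issue (mypy's exact message may vary by version):

from sentry.workflow_engine.models import Workflow  # as in the PR

detector_ids = [1, 2, 3]  # illustrative

# Without an annotation, mypy has no element type for the empty set()
# and reports something like: Need type annotation for "inferred"
inferred = {d_id: set() for d_id in detector_ids}

# The inline annotation pins the value type up front, so later
# annotated[d_id].add(workflow) calls type-check correctly:
annotated: dict[int, set[Workflow]] = {d_id: set() for d_id in detector_ids}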
| env_result = _check_caches_for_detectors(detectors, env_id)
| workflows |= env_result.cached_workflows
|
| missed_detector_ids = set(global_result.missed_detector_ids)
should we be making missed_detector_ids a set in the dataclass?
planning to refactor this pretty heavily tbh, now that we have a couple examples of these caches we can make an abstraction to handle all this. (said abstraction exists on another branch of mine)
| workflows = _get_associated_workflows(event_detectors.detectors, environment)
|
| if workflows:
|     metrics_incr("process_workflows", len(workflows))
what's the purpose of this metric?
It's used to track how many workflows are being processed. metrics_incr includes a data context to grab things like detector type etc. and automatically decorate it too. so in the end we could use this to filter and see, for example, all the workflows being evaluated for metric issues. (jfyi, this is an existing metric, just moved to a shared part of the code)
Description
We select workflows from the DB very frequently. This has added substantial load to our DB, even though the query itself is fast and efficient.
This PR introduces a caching layer for this high-frequency DB query.