feat(personhog): add caller tag attribution to Django client#60369
feat(personhog): add caller tag attribution to Django client#60369nickbest-ph wants to merge 4 commits into
Conversation
Add caller-tag attribution to the Django gRPC client so each request carries an x-caller-tag header identifying the code path that made it. - ContextVar-based _caller_tag with default "unknown" - personhog_caller_tag() context manager for manual tagging - set_caller_tag() / get_caller_tag() helpers - CallerTagInterceptor injecting x-caller-tag metadata - caller_tag label added to client-side request duration and count metrics - Wired into interceptor chain in PersonHogClient
Add PersonHogCallerTagMiddleware that derives an x-caller-tag from the resolved Django URL name. Covers persons, cohorts, feature flags, groups, insights, dashboards, events, query, and more. Unmapped API routes fall back to "api/other", non-API routes to "web/other".
Add task_prerun/task_postrun signal handling that derives a caller tag from the Celery task name (e.g. "celery/calculate_cohort_ch") and sets the ContextVar so downstream personhog gRPC calls carry the tag.
Wrap key personhog call sites with personhog_caller_tag() for finer attribution beyond what the middleware and Celery auto-tagging provide: - insights/actor-query: actor retrieval in get_people() and get_groups() - session-recordings/persons: person lookup in recording list - feature-flags/local-evaluation: group type mapping bulk load - admin/team-delete: bulk person/group/mapping deletion
|
🎭 Playwright didn't run on this PR — your changes touch code that could affect E2E behavior, but Playwright is opt-in via label now to keep CI cost down. Add the Most PRs don't need this. Real regressions still get caught on master and fix-forward. |
Prompt To Fix All With AIFix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
posthog/personhog_client/middleware.py:68-69
The fallback uses `"api/" in request.path`, which is a substring check rather than a prefix check. Any path that contains `api/` somewhere in the middle (e.g. `/analytics/v2/api/results/`) would be tagged `api/other` instead of `web/other`. Using `startswith` on the normalized path is more precise.
```suggestion
if request.path.startswith("/api/"):
return "api/other"
```
### Issue 2 of 2
posthog/celery.py:237-246
**Private ContextVar imported directly for reset**
Both `celery.py` (here) and `middleware.py` (line 90 in its `finally` block) reach into `interceptor.py` and import the private `_caller_tag` symbol in order to call `.reset(token)`. The public surface already has `set_caller_tag` and `get_caller_tag`; adding a `reset_caller_tag(token)` function would let both callers stay on the public API and avoid coupling to the private name.
Reviews (1): Last reviewed commit: "feat(personhog): add manual caller tags ..." | Re-trigger Greptile |
| if "api/" in request.path: | ||
| return "api/other" |
There was a problem hiding this comment.
The fallback uses
"api/" in request.path, which is a substring check rather than a prefix check. Any path that contains api/ somewhere in the middle (e.g. /analytics/v2/api/results/) would be tagged api/other instead of web/other. Using startswith on the normalized path is more precise.
| if "api/" in request.path: | |
| return "api/other" | |
| if request.path.startswith("/api/"): | |
| return "api/other" |
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/personhog_client/middleware.py
Line: 68-69
Comment:
The fallback uses `"api/" in request.path`, which is a substring check rather than a prefix check. Any path that contains `api/` somewhere in the middle (e.g. `/analytics/v2/api/results/`) would be tagged `api/other` instead of `web/other`. Using `startswith` on the normalized path is more precise.
```suggestion
if request.path.startswith("/api/"):
return "api/other"
```
How can I resolve this? If you propose a fix, please make it concise.| from posthog.personhog_client.interceptor import _caller_tag | ||
|
|
||
| if task_id in task_timings: | ||
| start_time = task_timings.pop(task_id, None) | ||
| if start_time: | ||
| CELERY_TASK_DURATION_HISTOGRAM.labels(task_name=task.name).observe(time.time() - start_time) | ||
|
|
||
| token = _caller_tag_tokens.pop(task_id, None) | ||
| if token is not None: | ||
| _caller_tag.reset(token) |
There was a problem hiding this comment.
Private ContextVar imported directly for reset
Both celery.py (here) and middleware.py (line 90 in its finally block) reach into interceptor.py and import the private _caller_tag symbol in order to call .reset(token). The public surface already has set_caller_tag and get_caller_tag; adding a reset_caller_tag(token) function would let both callers stay on the public API and avoid coupling to the private name.
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/celery.py
Line: 237-246
Comment:
**Private ContextVar imported directly for reset**
Both `celery.py` (here) and `middleware.py` (line 90 in its `finally` block) reach into `interceptor.py` and import the private `_caller_tag` symbol in order to call `.reset(token)`. The public surface already has `set_caller_tag` and `get_caller_tag`; adding a `reset_caller_tag(token)` function would let both callers stay on the public API and avoid coupling to the private name.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
haacked
left a comment
There was a problem hiding this comment.
Nice! No blocking issues but a few suggestions that should help perf and solidify this.
|
|
||
|
|
||
| def get_caller_tag() -> str: | ||
| return _caller_tag.get() |
There was a problem hiding this comment.
suggestion: Building on @greptile-apps's reset-helper note: the codebase has prior art for exactly this shape in posthog/clickhouse/query_tagging.py, which exposes tag_queries(**kwargs) and reset_query_tags() and keeps the ContextVar private. The concrete fix here is one helper next to set_caller_tag:
| return _caller_tag.get() | |
| def set_caller_tag(tag: str) -> contextvars.Token[str]: | |
| return _caller_tag.set(tag) | |
| def reset_caller_tag(token: contextvars.Token[str]) -> None: | |
| _caller_tag.reset(token) | |
| def get_caller_tag() -> str: | |
| return _caller_tag.get() |
Then posthog/celery.py:237 drops from posthog.personhog_client.interceptor import _caller_tag in favor of reset_caller_tag, the awkward inline-import inside the middleware.py:90 finally block disappears, and _caller_tag stays actually private. test_interceptor.py's test_set_caller_tag_function (which currently reaches into _caller_tag for the same reason) collapses too.
There was a problem hiding this comment.
The suggestion is correct — adding reset_caller_tag is the right fix here. celery.py is already importing _caller_tag privately just to call .reset(token), which is the exact leak this helper closes.
The full cleanup needed alongside the interceptor change:
celery.py: swapfrom posthog.personhog_client.interceptor import _caller_tag→reset_caller_tag, and replace_caller_tag.reset(token)withreset_caller_tag(token)middleware.py:90: same — remove the inline import of_caller_tagin thefinallyblock and usereset_caller_taginsteadtest_interceptor.py(test_set_caller_tag_function): replace the direct_caller_tagaccess withreset_caller_tag+get_caller_tagto stay within the public API
That makes _caller_tag genuinely private and the interface consistent with tag_queries/reset_query_tags from query_tagging.py.
|
|
||
| token = _caller_tag_tokens.pop(task_id, None) | ||
| if token is not None: | ||
| _caller_tag.reset(token) |
There was a problem hiding this comment.
suggestion: This works today because production runs Celery's default prefork pool, so task_prerun and task_postrun for a given task fire in the same worker process and _caller_tag.reset(token) finds the token in the current Context.
Under gevent/eventlet pools, signals can fire in different greenlets with different copied contexts, and reset(token) then raises ValueError: <Token> was created in a different Context, leaking entries in _caller_tag_tokens.
Two options, either is fine:
A) Document the assumption with a comment near _caller_tag_tokens at celery.py:200.
B) Guard the reset so a pool switch is at worst a no-op:
token = _caller_tag_tokens.pop(task_id, None)
if token is not None:
try:
_caller_tag.reset(token)
except ValueError:
_caller_tag.set("unknown")(If you accept the reset_caller_tag helper above, the same guard goes inside it.)
| "personhog_django_grpc_request_duration_seconds", | ||
| "gRPC request duration from a Django personhog client to the personhog service", | ||
| labelnames=["method", "client_name"], | ||
| labelnames=["method", "client_name", "caller_tag"], |
There was a problem hiding this comment.
suggestion: This adds caller_tag to the label set of two already-shipped metrics (personhog_django_grpc_request_duration_seconds here and personhog_django_grpc_requests_total at line 59). I checked the personhog-django-client Grafana dashboard: all panels touching these metrics use top-level sum(rate(...)) or sum by (method) / sum by (le, method), so panels keep rendering through aggregation, and no Grafana alert rule references either metric. Two residual concerns worth flagging:
-
Histogram bucket continuity at rollout. Old bucket series go stale and new per-
caller_tagbuckets start at zero, sohistogram_quantile(0.95, sum(rate(..._bucket)) by (le))over a window straddling the deploy will dip until buckets warm up. Worth a heads-up to anyone watching p95 during the rollout window. -
Celery-derived tag cardinality isn't bounded by the diff.
_celery_caller_tag(task.name)produces onecaller_tagvalue per registered Celery task name. Today's mapped DRF prefixes give ~25 known values, but new task names produce new series automatically. If that's not desired, normalize at the boundary (cap to a known set, or hash long tails intocelery/other).
| # DRF view_name prefix → caller tag. | ||
| # view_name is "{basename}-{action}", e.g. "persons-list", "project_cohorts-detail". | ||
| # Checked with startswith so "persons-list" matches prefix "persons". | ||
| _VIEW_NAME_TO_CALLER_TAG: dict[str, str] = { |
There was a problem hiding this comment.
nit: _VIEW_NAME_TO_CALLER_TAG is hand-maintained and degrades silently — a new DRF viewset just falls into api/other with no test failure, and the gap only shows up later when someone is staring at a dashboard dominated by api/other. A one-line pointer comment is enough to keep it honest:
| _VIEW_NAME_TO_CALLER_TAG: dict[str, str] = { | |
| # DRF view_name prefix → caller tag. | |
| # view_name is "{basename}-{action}", e.g. "persons-list", "project_cohorts-detail". | |
| # Checked with startswith so "persons-list" matches prefix "persons". | |
| # Hand-maintained: when adding a new DRF viewset to posthog/api/__init__.py | |
| # (or related routers), add a prefix here so traffic doesn't fall into `api/other`. | |
| _VIEW_NAME_TO_CALLER_TAG: dict[str, str] = { |
|
|
||
| def _resolve_caller_tag(request: HttpRequest) -> str: | ||
| try: | ||
| route = resolve(request.path) |
There was a problem hiding this comment.
suggestion: _resolve_caller_tag calls resolve(request.path) inside PersonHogCallerTagMiddleware.__call__, but BaseHandler.get_response will turn around and call the resolver again to dispatch the view. Every request through the middleware chain pays for URL resolution twice. With posthog's URL config size, that's a non-trivial fixed cost on a hot path that runs for every request, not just the ones that end up making personhog gRPC calls.
The process_view hook runs after Django's own resolution and gives you request.resolver_match.view_name for free:
class PersonHogCallerTagMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request: HttpRequest) -> HttpResponse:
token = set_caller_tag("api/other" if "/api/" in request.path else "web/other")
try:
return self.get_response(request)
finally:
reset_caller_tag(token)
def process_view(self, request, view_func, view_args, view_kwargs) -> None:
view_name = (request.resolver_match.view_name or "") if request.resolver_match else ""
for prefix, tag in _VIEW_NAME_TO_CALLER_TAG.items():
if view_name.startswith(prefix):
set_caller_tag(tag)
return None
return None(Assumes the reset_caller_tag helper from the interceptor.py finding. The set_caller_tag inside process_view intentionally doesn't capture its token; the outer __call__ finally pops the original token, restoring whatever was set before this middleware ran.)
If keeping the current shape is preferred, an alternative is to cache resolution per-path on an lru_cache'd helper, but that adds memory pressure and doesn't address the architectural duplication.
| self.assertEqual(get_caller_tag(), "manual-tag") | ||
| from posthog.personhog_client.interceptor import _caller_tag | ||
|
|
||
| _caller_tag.reset(token) |
There was a problem hiding this comment.
suggestion: TestCallerTagInterceptor covers the new CallerTagInterceptor (default-unknown and explicit-context paths), but MetricsInterceptor also now reads _caller_tag.get() and stamps it on PERSONHOG_DJANGO_REQUEST_COUNT and PERSONHOG_DJANGO_REQUEST_DURATION, with no test asserting the label flows through. If someone later drops the caller_tag= kwarg, refactors the label set, or pins the value to "unknown", the rest of the suite keeps passing and dashboards built on the new label silently lose the attribution dimension.
def test_metrics_interceptor_records_caller_tag(self) -> None:
interceptor = MetricsInterceptor("posthog-django-web")
details = _make_call_details(method="/personhog.service.v1.PersonHogService/GetPerson")
class _Resp:
def code(self): return None
before = PERSONHOG_DJANGO_REQUEST_COUNT.labels(
method="GetPerson", status="OK", client_name="posthog-django-web", caller_tag="api/cohorts"
)._value.get()
with personhog_caller_tag("api/cohorts"):
interceptor.intercept_unary_unary(lambda d, r: _Resp(), details, request=b"")
after = PERSONHOG_DJANGO_REQUEST_COUNT.labels(
method="GetPerson", status="OK", client_name="posthog-django-web", caller_tag="api/cohorts"
)._value.get()
self.assertEqual(after - before, 1)| ("deeply_nested", "a.b.c.d.run", "celery/run"), | ||
| ] | ||
| ) | ||
| def test_derives_tag_from_task_name(self, _name: str, task_name: str, expected: str) -> None: |
There was a problem hiding this comment.
suggestion: TestCeleryCallerTag covers only the pure _celery_caller_tag helper. The signal-handler round-trip (the part that actually wires this into Celery and uses _caller_tag_tokens to round-trip the token between prerun and postrun) is unexercised. A regression where postrun_signal_handler is wired to the wrong key, the dict pop happens before the reset, or the reset is skipped on a failed task would leak the previous task's tag into the next one on the same worker, with no test catching it.
class TestCeleryCallerTagSignals(TestCase):
def test_prerun_sets_and_postrun_resets(self):
from types import SimpleNamespace
from posthog.celery import prerun_signal_handler, postrun_signal_handler, _caller_tag_tokens
from posthog.personhog_client.interceptor import get_caller_tag
task = SimpleNamespace(name="posthog.tasks.calculate_cohort.calculate_cohort_ch")
self.assertEqual(get_caller_tag(), "unknown")
prerun_signal_handler(task_id="t-1", task=task)
self.assertEqual(get_caller_tag(), "celery/calculate_cohort_ch")
self.assertIn("t-1", _caller_tag_tokens)
postrun_signal_handler(task_id="t-1", task=task)
self.assertEqual(get_caller_tag(), "unknown")
self.assertNotIn("t-1", _caller_tag_tokens)
def test_postrun_without_prerun_is_safe(self):
from types import SimpleNamespace
from posthog.celery import postrun_signal_handler
postrun_signal_handler(task_id="never-seen", task=SimpleNamespace(name="x.y.z"))
Problem
We need per-caller attribution on personhog gRPC requests to trace heavy queries back to specific Django views, Celery tasks, and code paths. The
x-caller-tagheader (consumed by the router in PR #60361) provides this on the server side — this PR sends it from the Django/Python side.Changes
Four commits, each building on the previous:
1. CallerTagInterceptor — Core machinery:
_caller_tagContextVar (default"unknown"),CallerTagInterceptorgRPC interceptor,personhog_caller_tag()context manager,set_caller_tag()/get_caller_tag()helpers. Addscaller_taglabel to client-sidePERSONHOG_DJANGO_REQUEST_DURATIONandPERSONHOG_DJANGO_REQUEST_COUNTmetrics. 6 new tests.2. Django middleware auto-tagging —
PersonHogCallerTagMiddlewarederives a caller tag from the resolved Django URL name (e.g.,persons-list→api/persons,project_cohorts-detail→api/cohorts). Maps ~20 known view prefixes; unmapped API routes →api/other, non-API →web/other. 15 new tests.3. Celery task auto-tagging —
task_prerunsignal handler derives tags from task names (e.g.,posthog.tasks.calculate_cohort.calculate_cohort_ch→celery/calculate_cohort_ch). Resets intask_postrun. 3 parameterized tests.4. Manual tags at known-heavy paths — Wraps specific call sites with
personhog_caller_tag()for finer attribution:insights/actor-query—get_people()andget_groups()in actor queriessession-recordings/persons— person lookup in recording list APIfeature-flags/local-evaluation— group type mapping bulk loadadmin/team-delete— bulk person/group/mapping deletionDeploy target: posthog-web, posthog-worker
How did you test this code?
Publish to changelog?
No
Docs update
N/A
🤖 Agent context
Cherry-picked from
nick/personhog-query-taggingdevelopment branch (4 commits). Part of a 4-PR series addingx-caller-tagattribution across the personhog stack: