feat(llm-gateway): mirror captures to a configurable secondary instance#60416
Conversation
Add an optional second capture target so the EU gateway can fan out each $ai_generation event to the EU PostHog instance in addition to the existing US capture, mirroring the ee/hogai/core/runner.py:201-206 pattern. Every event is now stamped with an `instance` group whose value is the customer-origin region URL, so the regional usage-report runs in posthog/tasks/usage_report.py attribute each event to the right aggregation regardless of which instance received the copy. When the new secondary host/token settings are unset, behaviour is identical to today (single capture to the configured primary). The matching charts override that wires up the EU fanout lives in a separate PR.
…e_url The plan-resolver URL is already per-deployment and carries the same value we want to stamp on captured events as the customer-origin region. Multi-purposing it removes one config field and one env var per deployment without losing any flexibility — both uses derive from the same per-region source of truth.
The SDK's groups={'instance': ...} resolution depends on the
destination project's group_type_index for 'instance'. The billing
query in posthog/tasks/usage_report.py hardcodes $group_1, so the
gateway must guarantee that slot regardless of how each AI Obs team
happens to have its group types registered. Write the property
directly alongside the existing groups argument so both the
hardcoded-index path (billing) and the SDK group-resolution path
(any future queries via group breakdowns) stay consistent.
- config.py: the secondary-capture comment described the original 'fan out to US for visibility' shape; with the staged rollout it's now the EU billing path. Update to match. - _build_groups: docstring claimed billing region attribution flowed through the SDK group_type_index resolution, but $group_1 is now hardcoded directly — the groups dict is just for LLM Analytics breakdowns. Reword accordingly. - callbacks/__init__.py: trim the three-sentence justification for reusing posthog_api_base_url to one line — the parameter name carries most of the intent already.
| callbacks.append(PrometheusCallback()) | ||
|
|
||
| litellm.callbacks = callbacks | ||
| litellm.callbacks = callbacks # ty: ignore[invalid-assignment] |
There was a problem hiding this comment.
Type ignore comment has a typo: # ty: ignore should be # type: ignore. The current syntax will not suppress type checker errors as intended.
litellm.callbacks = callbacks # type: ignore[invalid-assignment]| litellm.callbacks = callbacks # ty: ignore[invalid-assignment] | |
| litellm.callbacks = callbacks # type: ignore[invalid-assignment] | |
Spotted by Graphite
Is this helpful? React 👍 or 👎 to let us know.
Prompt To Fix All With AIFix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
services/llm-gateway/tests/callbacks/test_posthog.py:434-526
**New tests cover only `_on_success`; `_on_failure` parity is missing**
`_on_failure` received the same changes: it now always stamps `$group_1` and passes `_build_groups()` as `groups`. The three new tests exercise only the success path. There is no test verifying that `_on_failure` propagates the EU region URL, stamps `$group_1` correctly, or mirrors to the secondary destination when configured — so a regression there would go undetected. Mirror tests 2 and 3 for `_on_failure`.
### Issue 2 of 3
services/llm-gateway/tests/callbacks/test_posthog.py:434-526
**Three new tests are not parameterised**
The project preference is to use parameterised tests. `test_on_success_uses_configured_region_url_in_groups`, `test_on_success_skips_secondary_destination_when_unconfigured`, and the existing `test_on_success_captures_event` all call `_on_success` and then assert on `groups` / `$group_1` under different `PostHogCallback` configurations. These are natural candidates for a single `@pytest.mark.parametrize` over `(region_url, secondary_api_key, secondary_host, expected_groups, expected_group_1)`.
### Issue 3 of 3
services/llm-gateway/src/llm_gateway/callbacks/posthog.py:318-325
**Secondary capture is silently skipped if the primary's exception path itself throws**
`_capture_sync` calls the two destinations back-to-back with no guarding `try/except` around the first call. Inside `_capture_to_destination`, if `client.capture_exception(e, ...)` raises (the exception handler itself fails), that exception escapes the `finally` block and propagates up to `_capture_sync`, leaving the `if self._secondary_api_key` branch unreachable. For a mirroring feature where the secondary is the EU billing path, silently losing that capture on an SDK-level error in the primary's error handler is an undetected failure. Wrapping each `_capture_to_destination` call in its own `try/except` in `_capture_sync` would isolate the two destinations.
Reviews (1): Last reviewed commit: "chore(llm-gateway): tighten stale commen..." | Re-trigger Greptile |
| @pytest.mark.asyncio | ||
| async def test_on_success_uses_configured_region_url_in_groups( | ||
| self, | ||
| auth_user: AuthenticatedUser, | ||
| standard_logging_object: dict, | ||
| mock_posthog_client: tuple, | ||
| ) -> None: | ||
| _, mock_client = mock_posthog_client | ||
| callback = PostHogCallback( | ||
| api_key="eu-key", | ||
| host="https://eu.i.posthog.com", | ||
| region_url="https://eu.posthog.com", | ||
| ) | ||
| kwargs = {"standard_logging_object": standard_logging_object, "litellm_params": {}} | ||
|
|
||
| with ( | ||
| patch("llm_gateway.callbacks.posthog.get_auth_user", return_value=auth_user), | ||
| patch("llm_gateway.callbacks.posthog.get_product", return_value="slack_app"), | ||
| ): | ||
| await callback._on_success(kwargs, None, 0.0, 1.0, end_user_id=None) | ||
|
|
||
| call_kwargs = mock_client.capture.call_args.kwargs | ||
| assert call_kwargs["groups"] == {"instance": "https://eu.posthog.com", "project": 456} | ||
| # $group_1 is stamped explicitly so the usage-report query's hardcoded | ||
| # filter matches regardless of how the destination team registers | ||
| # `instance` in GroupTypeMapping. | ||
| assert call_kwargs["properties"]["$group_1"] == "https://eu.posthog.com" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_on_success_mirrors_to_secondary_destination_when_configured( | ||
| self, | ||
| auth_user: AuthenticatedUser, | ||
| standard_logging_object: dict, | ||
| ) -> None: | ||
| mock_client = MagicMock() | ||
| with patch("llm_gateway.callbacks.posthog.Posthog", return_value=mock_client) as mock_cls: | ||
| callback = PostHogCallback( | ||
| api_key="eu-key", | ||
| host="https://eu.i.posthog.com", | ||
| region_url="https://eu.posthog.com", | ||
| secondary_api_key="us-key", | ||
| secondary_host="https://us.i.posthog.com", | ||
| ) | ||
| kwargs = {"standard_logging_object": standard_logging_object, "litellm_params": {}} | ||
|
|
||
| with ( | ||
| patch("llm_gateway.callbacks.posthog.get_auth_user", return_value=auth_user), | ||
| patch("llm_gateway.callbacks.posthog.get_product", return_value="slack_app"), | ||
| ): | ||
| await callback._on_success(kwargs, None, 0.0, 1.0, end_user_id=None) | ||
|
|
||
| # One Posthog client constructed per destination, each captured once. | ||
| primary_call, secondary_call = mock_cls.call_args_list | ||
| assert primary_call.args == ("eu-key",) | ||
| assert primary_call.kwargs["host"] == "https://eu.i.posthog.com" | ||
| assert secondary_call.args == ("us-key",) | ||
| assert secondary_call.kwargs["host"] == "https://us.i.posthog.com" | ||
| assert mock_client.capture.call_count == 2 | ||
|
|
||
| # Both copies carry the EU origin region — via the SDK `groups` arg | ||
| # and via an explicit $group_1 property — so the US-region usage | ||
| # report filter ($group_1 = 'https://us.posthog.com') excludes the | ||
| # mirrored copy and does not double-count. | ||
| for capture_call in mock_client.capture.call_args_list: | ||
| assert capture_call.kwargs["groups"] == { | ||
| "instance": "https://eu.posthog.com", | ||
| "project": 456, | ||
| } | ||
| assert capture_call.kwargs["properties"]["$group_1"] == "https://eu.posthog.com" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_on_success_skips_secondary_destination_when_unconfigured( | ||
| self, | ||
| callback: PostHogCallback, | ||
| auth_user: AuthenticatedUser, | ||
| standard_logging_object: dict, | ||
| mock_posthog_client: tuple, | ||
| ) -> None: | ||
| mock_cls, mock_client = mock_posthog_client | ||
| kwargs = {"standard_logging_object": standard_logging_object, "litellm_params": {}} | ||
|
|
||
| with ( | ||
| patch("llm_gateway.callbacks.posthog.get_auth_user", return_value=auth_user), | ||
| patch("llm_gateway.callbacks.posthog.get_product", return_value="slack_app"), | ||
| ): | ||
| await callback._on_success(kwargs, None, 0.0, 1.0, end_user_id=None) | ||
|
|
||
| assert mock_cls.call_count == 1 | ||
| assert mock_client.capture.call_count == 1 | ||
|
|
||
|
|
||
| class TestNormalizeTraceId: | ||
| def test_returns_fresh_uuid_when_value_is_falsy(self) -> None: |
There was a problem hiding this comment.
New tests cover only
_on_success; _on_failure parity is missing
_on_failure received the same changes: it now always stamps $group_1 and passes _build_groups() as groups. The three new tests exercise only the success path. There is no test verifying that _on_failure propagates the EU region URL, stamps $group_1 correctly, or mirrors to the secondary destination when configured — so a regression there would go undetected. Mirror tests 2 and 3 for _on_failure.
Prompt To Fix With AI
This is a comment left during a code review.
Path: services/llm-gateway/tests/callbacks/test_posthog.py
Line: 434-526
Comment:
**New tests cover only `_on_success`; `_on_failure` parity is missing**
`_on_failure` received the same changes: it now always stamps `$group_1` and passes `_build_groups()` as `groups`. The three new tests exercise only the success path. There is no test verifying that `_on_failure` propagates the EU region URL, stamps `$group_1` correctly, or mirrors to the secondary destination when configured — so a regression there would go undetected. Mirror tests 2 and 3 for `_on_failure`.
How can I resolve this? If you propose a fix, please make it concise.| @pytest.mark.asyncio | ||
| async def test_on_success_uses_configured_region_url_in_groups( | ||
| self, | ||
| auth_user: AuthenticatedUser, | ||
| standard_logging_object: dict, | ||
| mock_posthog_client: tuple, | ||
| ) -> None: | ||
| _, mock_client = mock_posthog_client | ||
| callback = PostHogCallback( | ||
| api_key="eu-key", | ||
| host="https://eu.i.posthog.com", | ||
| region_url="https://eu.posthog.com", | ||
| ) | ||
| kwargs = {"standard_logging_object": standard_logging_object, "litellm_params": {}} | ||
|
|
||
| with ( | ||
| patch("llm_gateway.callbacks.posthog.get_auth_user", return_value=auth_user), | ||
| patch("llm_gateway.callbacks.posthog.get_product", return_value="slack_app"), | ||
| ): | ||
| await callback._on_success(kwargs, None, 0.0, 1.0, end_user_id=None) | ||
|
|
||
| call_kwargs = mock_client.capture.call_args.kwargs | ||
| assert call_kwargs["groups"] == {"instance": "https://eu.posthog.com", "project": 456} | ||
| # $group_1 is stamped explicitly so the usage-report query's hardcoded | ||
| # filter matches regardless of how the destination team registers | ||
| # `instance` in GroupTypeMapping. | ||
| assert call_kwargs["properties"]["$group_1"] == "https://eu.posthog.com" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_on_success_mirrors_to_secondary_destination_when_configured( | ||
| self, | ||
| auth_user: AuthenticatedUser, | ||
| standard_logging_object: dict, | ||
| ) -> None: | ||
| mock_client = MagicMock() | ||
| with patch("llm_gateway.callbacks.posthog.Posthog", return_value=mock_client) as mock_cls: | ||
| callback = PostHogCallback( | ||
| api_key="eu-key", | ||
| host="https://eu.i.posthog.com", | ||
| region_url="https://eu.posthog.com", | ||
| secondary_api_key="us-key", | ||
| secondary_host="https://us.i.posthog.com", | ||
| ) | ||
| kwargs = {"standard_logging_object": standard_logging_object, "litellm_params": {}} | ||
|
|
||
| with ( | ||
| patch("llm_gateway.callbacks.posthog.get_auth_user", return_value=auth_user), | ||
| patch("llm_gateway.callbacks.posthog.get_product", return_value="slack_app"), | ||
| ): | ||
| await callback._on_success(kwargs, None, 0.0, 1.0, end_user_id=None) | ||
|
|
||
| # One Posthog client constructed per destination, each captured once. | ||
| primary_call, secondary_call = mock_cls.call_args_list | ||
| assert primary_call.args == ("eu-key",) | ||
| assert primary_call.kwargs["host"] == "https://eu.i.posthog.com" | ||
| assert secondary_call.args == ("us-key",) | ||
| assert secondary_call.kwargs["host"] == "https://us.i.posthog.com" | ||
| assert mock_client.capture.call_count == 2 | ||
|
|
||
| # Both copies carry the EU origin region — via the SDK `groups` arg | ||
| # and via an explicit $group_1 property — so the US-region usage | ||
| # report filter ($group_1 = 'https://us.posthog.com') excludes the | ||
| # mirrored copy and does not double-count. | ||
| for capture_call in mock_client.capture.call_args_list: | ||
| assert capture_call.kwargs["groups"] == { | ||
| "instance": "https://eu.posthog.com", | ||
| "project": 456, | ||
| } | ||
| assert capture_call.kwargs["properties"]["$group_1"] == "https://eu.posthog.com" | ||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_on_success_skips_secondary_destination_when_unconfigured( | ||
| self, | ||
| callback: PostHogCallback, | ||
| auth_user: AuthenticatedUser, | ||
| standard_logging_object: dict, | ||
| mock_posthog_client: tuple, | ||
| ) -> None: | ||
| mock_cls, mock_client = mock_posthog_client | ||
| kwargs = {"standard_logging_object": standard_logging_object, "litellm_params": {}} | ||
|
|
||
| with ( | ||
| patch("llm_gateway.callbacks.posthog.get_auth_user", return_value=auth_user), | ||
| patch("llm_gateway.callbacks.posthog.get_product", return_value="slack_app"), | ||
| ): | ||
| await callback._on_success(kwargs, None, 0.0, 1.0, end_user_id=None) | ||
|
|
||
| assert mock_cls.call_count == 1 | ||
| assert mock_client.capture.call_count == 1 | ||
|
|
||
|
|
||
| class TestNormalizeTraceId: | ||
| def test_returns_fresh_uuid_when_value_is_falsy(self) -> None: |
There was a problem hiding this comment.
Three new tests are not parameterised
The project preference is to use parameterised tests. test_on_success_uses_configured_region_url_in_groups, test_on_success_skips_secondary_destination_when_unconfigured, and the existing test_on_success_captures_event all call _on_success and then assert on groups / $group_1 under different PostHogCallback configurations. These are natural candidates for a single @pytest.mark.parametrize over (region_url, secondary_api_key, secondary_host, expected_groups, expected_group_1).
Prompt To Fix With AI
This is a comment left during a code review.
Path: services/llm-gateway/tests/callbacks/test_posthog.py
Line: 434-526
Comment:
**Three new tests are not parameterised**
The project preference is to use parameterised tests. `test_on_success_uses_configured_region_url_in_groups`, `test_on_success_skips_secondary_destination_when_unconfigured`, and the existing `test_on_success_captures_event` all call `_on_success` and then assert on `groups` / `$group_1` under different `PostHogCallback` configurations. These are natural candidates for a single `@pytest.mark.parametrize` over `(region_url, secondary_api_key, secondary_host, expected_groups, expected_group_1)`.
How can I resolve this? If you propose a fix, please make it concise.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| # The AI usage report hardcodes `$group_1` (posthog/tasks/usage_report.py) | ||
| # so the gateway must guarantee that slot regardless of how the | ||
| # destination team's group types are registered. | ||
| "$group_1": self._region_url, |
There was a problem hiding this comment.
Medium: Billing region override
get_posthog_properties() is populated from X-PostHog-Property-* request headers and is merged immediately after this, so an authenticated Anthropic caller can send X-PostHog-Property-$group_1: anything-else and replace the region value used by the usage-report query. Merge untrusted properties before setting billing-controlled fields, or reject reserved keys like $group_1, $ai_billable, and team_id when copying header-derived properties.
PR overviewThere is one remaining security issue affecting how header-derived PostHog properties are merged into billing-related metadata. An authenticated caller can override reserved fields such as the billing region, which could distort or misdirect usage reporting unless those keys are protected. No issues have been addressed yet, so the PR still needs a targeted fix before the security posture is acceptable. Open issues (1)
Fixed/addressed: 0 · PR risk: 6/10 |
|
|
||
| default_fallback_cost_usd: float = 0.01 | ||
|
|
||
| posthog_api_base_url: str = "https://us.posthog.com" |
There was a problem hiding this comment.
kinda thinking — now that posthog_api_base_url also feeds the $group_1 billing tag, the default makes a misconfigured EU deployment fail silently (events get tagged US) instead of just breaking the plan/quota API calls. probably fine if the charts side always sets it, but wouldnt hurt to either drop the default or validate against CLOUD_REGION_TO_URL at startup.
There was a problem hiding this comment.
i will leave it as it is and will update it as follow up once all the pieces are deployed (including charts) if that's fine
Address PR review feedback on the dual-region capture path: - Wrap each `_capture_to_destination` call in `_capture_sync` in its own try/except so a primary failure (Posthog() construction raising, or capture_exception itself raising inside the inner handler) does not skip the mirror capture that the regional usage-report depends on. - Shallow-copy the `properties` and `groups` dicts inside `_capture_to_destination` so an in-place SDK mutation on one capture (e.g. adding $lib / $lib_version) cannot bleed into the other. - Add an `_on_failure` mirror test to cover the dual-destination path for error events alongside the existing success-path tests.
Address Josh's review feedback: the secondary capture mirrors regional billing state, so we want both destinations to succeed or fail together rather than have one silently succeed while the other errors. Reverts the outer try/except around each `_capture_to_destination` call that the previous commit added. If the primary raises, the secondary intentionally does not run — keeping the two PostHog instances in sync is more valuable than guaranteeing the mirror always fires. Shallow-copy + the `_on_failure` mirror test from the previous commit stay; they're independent of the propagation policy.
|
The thread on my earlier If the decision was to accept this risk intentionally, that's fine — but the thread being resolved without a corresponding code change is easy to miss on a later pass. |
| # No outer try/except: the destinations feed regional billing | ||
| # aggregations, so we want them to succeed or fail together. If the | ||
| # primary raises, the secondary intentionally does not run so the two | ||
| # PostHog instances stay in sync rather than diverging on billing state. | ||
| self._capture_to_destination(self._api_key, self._host, **capture_kwargs) | ||
| if self._secondary_api_key and self._secondary_host: | ||
| self._capture_to_destination( | ||
| self._secondary_api_key, | ||
| self._secondary_host, | ||
| **capture_kwargs, | ||
| ) |
There was a problem hiding this comment.
Exception handling mismatch with stated intent. The comment claims "we want them to succeed or fail together" and "If the primary raises, the secondary intentionally does not run," but this is undermined by the fire-and-forget pattern in _capture_fire_and_forget.
Since _capture_sync runs in loop.run_in_executor() without awaiting the result, exceptions are not propagated back to the caller. If the primary succeeds but the secondary fails, the exception is silently swallowed by the executor, causing the two instances to diverge on billing state - exactly what the comment says should not happen.
Either:
- Await the executor future to propagate exceptions, or
- Update the comment to reflect that secondary failures are actually tolerated and won't prevent primary capture success
| # No outer try/except: the destinations feed regional billing | |
| # aggregations, so we want them to succeed or fail together. If the | |
| # primary raises, the secondary intentionally does not run so the two | |
| # PostHog instances stay in sync rather than diverging on billing state. | |
| self._capture_to_destination(self._api_key, self._host, **capture_kwargs) | |
| if self._secondary_api_key and self._secondary_host: | |
| self._capture_to_destination( | |
| self._secondary_api_key, | |
| self._secondary_host, | |
| **capture_kwargs, | |
| ) | |
| # Note: _capture_to_destination uses a fire-and-forget async pattern, | |
| # so exceptions from either destination are not propagated back to the | |
| # caller. If the secondary fails, the failure is silently swallowed and | |
| # the two PostHog instances may diverge on billing state. Both captures | |
| # are best-effort: primary failure does not prevent secondary capture, | |
| # and secondary failure does not surface to the caller. | |
| self._capture_to_destination(self._api_key, self._host, **capture_kwargs) | |
| if self._secondary_api_key and self._secondary_host: | |
| self._capture_to_destination( | |
| self._secondary_api_key, | |
| self._secondary_host, | |
| **capture_kwargs, | |
| ) | |
Spotted by Graphite
Is this helpful? React 👍 or 👎 to let us know.
Problem
The LLM gateway captures every
$ai_generationevent to a single PostHog instance (currentlyhttps://us.i.posthog.comin both regional deployments). The regional usage-report runs inposthog/tasks/usage_report.py:1082-1241aggregate per region — EU run from team_id=1 on EU, US run from team_id=2 on US — and filter by$group_<N>matching the customer region URL. Today EU gateway events never land in EU PostHog at all, so EU customer spend is invisible to billing.ee/hogai/core/runner.py:201-206already solves this for the PostHog AI web path by appending a US client when the deployment is EU; this PR ports the same idea to the gateway's flat-call model.Context from the internal Slack thread that prompted this work: https://posthog.slack.com/archives/C0ADR896HJ9/p1779955029038159?thread_ts=1779805800.149569&cid=C0ADR896HJ9
Changes
posthog_secondary_project_token,posthog_secondary_host. Region URL is reused fromposthog_api_base_url(same per-deployment value).PostHogCallbacknow stamps$group_1directly on every event (the value the billing query atposthog/tasks/usage_report.py:1200hardcodes) and also passesgroups={"instance": region_url, "project": team_id}so SDK group resolution stays consistent.Companion PR
Charts override that wires up the EU fanout: PostHog/charts#11401
Deployment order
Either order works. The gateway's pydantic settings use the default
extra="ignore"policy (env_prefix only, no override), so unknownLLM_GATEWAY_POSTHOG_SECONDARY_*env vars are silently dropped by a binary that doesn't know about them. The gateway code is also backward-compatible when those settings are unset.End state is the same regardless of order — EU billing starts working only once both PRs are deployed. Pick whichever fits your deploy windows; the only thing that goes live on this PR alone is the
$group_1stamp on every captured event (using the existingposthog_api_base_urlvalue per deployment), which makes this PR the cleaner rollback target if anything looks off.Testing
I'm an agent.
uv run pytest tests/callbacks/test_posthog.py— 56 passed, four new tests covering region URL stamping, secondary mirroring on_on_success, secondary mirroring on_on_failure, and the no-secondary fallback. Full gateway suite: 932 passed.bin/mprocs.yamlwas wired up with the new env vars so the local gateway picked them up on restart):LLM_GATEWAY_POSTHOG_SECONDARY_*→ exactly onePostHog capturing eventlog per request, properties carry$group_1: "https://us.posthog.com", groups carries{"instance": "https://us.posthog.com", "project": <team_id>}. Confirms backward compatibility.LLM_GATEWAY_POSTHOG_API_BASE_URL=https://eu.posthog.comand bothLLM_GATEWAY_POSTHOG_SECONDARY_*set → both destinations receive the event (verified via temporary debug logsposthog_capture_sync_entered+posthog_capture_firing_secondary, then removed), and both copies carry$group_1: "https://eu.posthog.com"so the US-region usage-report filter excludes the mirror.ruff checkandruff format --checkon touched files — clean.Publish to changelog?
no
🤖 Agent context
Co-authored with Claude Code at Vojta's direction. The staged rollout (charts keeps primary on US in Phase 1, flips later) was specifically chosen so today's billing/observability path is untouched while EU billing starts being attributed correctly via the secondary fanout.