feat(data-warehouse): add tracked gRPC transport for warehouse sources#60439
Conversation
Prompt To Fix All With AIFix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
posthog/management/commands/warehouse_sources_capture_grpc_samples.py:149-158
**Implicit wildcard primary rule silently swallows `--rule` entries**
`_enable` always prepends a `primary` rule built from the flat flags (which default to `*/*/*/*/50`). Because `_first_match` stops at the first match, when an operator follows the documented multi-rule example — supplying only `--rule` flags and `--ttl` — every call matches the wildcard primary with `limit=50` before the explicit `--rule` entries are ever evaluated, so those rules are dead code.
For the exact command in the module docstring:
```
enable --rule source_type=google_ads,… --rule source_type=bigquery,… --ttl 1h
```
the effective rule list is `[CaptureRule(*, *, *, *, 50), google_ads rule, bigquery rule]`: the wildcard fires on every call, the two targeted rules never fire. The operator gets a noisy broad capture instead of the intended targeted one. The test `test_enable_supports_extra_rules` avoids the problem by explicitly passing `--source-type google_ads` on the primary, so it does not catch this case.
### Issue 2 of 2
posthog/temporal/data_imports/sources/common/grpc/proto_utils.py:34-45
**`code` in `_REDACT_PARAM_NAMES` may over-redact protobuf fields**
`_REDACT_PARAM_NAMES` is designed for OAuth URL parameters and includes generic names like `code`. When applied to proto-to-dict output via `_redact_keys`, any protobuf field named `code` — such as the `code` field on Google Ads `ErrorCode` or `Status` protos — will be replaced with `"REDACTED"`, making the captured sample less useful for debugging. The other names in the set (`client_secret`, `developer_token`, `refresh_token`, etc.) are specifically auth-bearing and are safe to share across HTTP and gRPC contexts, but `code` is broadly overloaded in protobuf schemas.
Reviews (1): Last reviewed commit: "chore(data-warehouse): document temporal..." | Re-trigger Greptile |
There was a problem hiding this comment.
Pull request overview
This PR adds a tracked gRPC transport for warehouse-source syncs so outbound gRPC calls (notably Google Ads and BigQuery Storage Read API) get consistent logs, OTel metrics, and opt-in sample capture—bringing gRPC sources to parity with the existing tracked HTTP transport.
Changes:
- Introduces tracked gRPC client interceptors + observer/metrics + sample-capture pipeline (Redis-gated, scrubbed payloads written to S3).
- Wires the tracked gRPC transport into
google_adsandbigquerysources, and updates shared context/scrubbing primitives. - Adds a Semgrep rule to prevent raw gRPC channel/client construction in
sources/**, plus docs updates.
Reviewed changes
Copilot reviewed 26 out of 26 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| posthog/temporal/data_imports/workflow_activities/import_data_sync.py | Switches job-context binding import to the new transport-neutral module. |
| posthog/temporal/data_imports/sources/SOURCES.md | Updates source inventory/docs to reflect gRPC tracking support. |
| posthog/temporal/data_imports/sources/google_ads/google_ads.py | Passes tracked gRPC interceptors into Google Ads get_service(...) calls. |
| posthog/temporal/data_imports/sources/bigquery/bigquery.py | Builds a credential-bearing BigQuery Storage channel and wraps it with tracked interceptors. |
| posthog/temporal/data_imports/sources/common/job_context.py | Adds transport-neutral JobContext + bind/scoped/current helpers. |
| posthog/temporal/data_imports/sources/common/http/context.py | Re-exports JobContext from the neutral module to preserve existing imports. |
| posthog/temporal/data_imports/sources/common/sample_scrub.py | Extracts shared capture config/rules + scrubadub-based scrubbing. |
| posthog/temporal/data_imports/sources/common/http/sampling.py | Switches HTTP sample capture to shared primitives from sample_scrub.py. |
| posthog/temporal/data_imports/sources/common/http/url_utils.py | Adds developer_token to redaction list. |
| posthog/temporal/data_imports/sources/common/grpc/init.py | Exposes tracked gRPC transport API surface. |
| posthog/temporal/data_imports/sources/common/grpc/transport.py | Implements unary + streaming client interceptors and channel wrapping. |
| posthog/temporal/data_imports/sources/common/grpc/observer.py | Adds per-call logging, metrics emission, and sample-capture delegation. |
| posthog/temporal/data_imports/sources/common/grpc/metrics.py | Adds OTel instruments + status-class mapping and instrument caching. |
| posthog/temporal/data_imports/sources/common/grpc/sampling.py | Implements gRPC sample capture (Redis config, S3 writes, truncation/size caps). |
| posthog/temporal/data_imports/sources/common/grpc/proto_utils.py | Adds proto sizing + proto→dict conversion + scrubbing/redaction helpers. |
| posthog/management/commands/warehouse_sources_capture_grpc_samples.py | Adds management command to enable/disable/list gRPC sample capture. |
| posthog/management/commands/test/test_warehouse_sources_capture_grpc_samples.py | Tests for the gRPC sample capture management command. |
| posthog/temporal/data_imports/sources/common/test/test_job_context.py | Tests for the job-context neutralization + HTTP shim behavior. |
| posthog/temporal/data_imports/sources/common/test/test_http_sampling.py | Updates HTTP sampling tests to patch scrubber from the new shared module. |
| posthog/temporal/data_imports/sources/common/test/test_grpc_transport.py | Unit tests for the gRPC interceptors + channel/interceptor factories. |
| posthog/temporal/data_imports/sources/common/test/test_grpc_observer.py | Unit tests for observer logging/metrics/capture behavior and failure isolation. |
| posthog/temporal/data_imports/sources/common/test/test_grpc_metrics.py | Unit tests for metric instruments, caching, and status bucketing. |
| posthog/temporal/data_imports/sources/common/test/test_grpc_sampling.py | Unit tests for proto scrubbing, rule matching, Redis gating, and S3 layout. |
| .semgrep/rules/data-imports-grpc-transport.yaml | Adds Semgrep rules enforcing usage of the tracked gRPC transport. |
| .semgrep/rules/data-imports-grpc-transport.py | Semgrep rule test fixtures. |
| .agents/skills/implementing-warehouse-sources/SKILL.md | Documents the gRPC transport requirement and how to use it in new sources. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
PR overviewAll previously flagged issues have been addressed. No open security concerns remain on this pull request. Security reviewNo open security issues remain on this pull request. Fixed/addressed: 1 · PR risk: 0/10 |
1887f0d to
490a9ad
Compare
Adds a tracked gRPC transport so outbound gRPC calls from warehouse-source syncs (Google Ads, BigQuery Storage Read API) get consistent logging, OTel metrics, and opt-in sample capture — bringing gRPC sources to parity with the tracked HTTP transport. - Tracked gRPC client interceptors + observer/metrics + Redis-gated sample-capture pipeline (scrubbed payloads written to S3). - Transport-neutral JobContext and shared scrubbing primitives (sample_scrub) reused by both HTTP and gRPC. - Wires the tracked gRPC transport into google_ads and bigquery sources. - Semgrep rule preventing raw gRPC channel/client construction under sources/**, plus SOURCES.md and skill docs updates. Rebased onto master to resolve conflicts in SOURCES.md and http/sampling.py. Generated-By: PostHog Code Task-Id: 1a08165a-e60b-4ca8-8dc7-fe6de8e876c8
Prompt To Fix All With AIFix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
posthog/temporal/data_imports/sources/common/grpc/transport.py:115-189
**Stream telemetry silently dropped on early consumer termination**
`_TrackedStreamWrapper` only calls `_record` from `__next__` (on `StopIteration`, `RpcError`, or `Exception`). When a consumer exits iteration early — via `break`, an early return, or an explicit `close()` call — Python's generator protocol calls `close()` on the wrapper. Because `close()` is not defined on `_TrackedStreamWrapper`, `__getattr__` delegates it to `self._iter.close()` but `_record` is never triggered.
In practice this means: if a BigQuery or Google Ads streaming read is aborted mid-way (pipeline cancellation, timeout, `break` in the consumer loop), the call produces no log line, no metric increment, and no sample. The wrapper becomes invisible to telemetry. Adding a `close()` method that calls `self._record(code=None, exception=None)` before delegating to `self._iter.close()` would close the gap, guarded by the existing `_recorded` flag so double-recording is impossible.
Reviews (2): Last reviewed commit: "feat(data-warehouse): add tracked gRPC t..." | Re-trigger Greptile |
Address review feedback: - bigquery: move BigQueryReadClient construction inside the try block so transport.close() still runs (and the gRPC channel is freed) when client construction raises. - grpc transport: add _TrackedStreamWrapper.close() so a stream aborted early (explicit close, cancellation, generator teardown) still records its telemetry once, guarded by the existing _recorded flag, before delegating to the inner iterator's close(). - tests for the new close() behavior (partial-stream record, idempotency after completion, inner-close delegation). Generated-By: PostHog Code Task-Id: 1a08165a-e60b-4ca8-8dc7-fe6de8e876c8
|
Addressed the latest bot review feedback in a930adc:
|
|
🎭 Playwright report · View test results →
These issues are not necessarily caused by your changes. |
|
Alright, so I could not get a BQ source to test this, so I went into a different direction: try to break it, make it not work, provide evidence that this doesn't actually work as intended. Happy to say me and PH Code failed, although we did find something. Sending the full report below, but this is basically what we need to fix: Telemetry gap on early-terminated streams _TrackedStreamWrapper in transport.py (class at line 115) only records via _record() when next hits terminal StopIteration/RpcError (lines 148–159) or when close() is called explicitly (lines 186–194). It's a plain class iterator with no del, so on early termination the call's metric + sample + log are silently lost. The real trigger: pipeline.py:117 ends a source by calling iterator.close() on the source generator in a finally (cancel/stop). That GeneratorExit unwinds the for … yield loop but does not cascade .close() down to _TrackedStreamWrapper, so _record() never fires. Proven: full-drain records once; break / GC-abandon / source_generator.close() after partial read all record 0 times. Impact: telemetry-only (no data loss / sync failure), but it blinds metrics+samples exactly on cancelled and failing streaming syncs — the highest-value observability moments. Affects bigquery read_rows (the SDK layers between the dlt generator and our wrapper make full cascade unlikely; not 100% confirmed E2E without creds). (Note: contextvar propagation is fine — pipeline.py:88 copy_context().run preserves the JobContext; verified.) Fix — add a del safety net to _TrackedStreamWrapper in transport.py (right after close(), ~line 195). Idempotent via the existing _recorded guard; _record is already try/except-wrapped: def del(self) -> None: |
|
Full report: Adversarial test report — tracked gRPC transport (PR #60439)Goal: try to falsify the PR — find a way it silently fails. Honest headlineI could not show that "the PR doesn't work." Its main paths are sound — the But I did find one real, narrow defect: on early stream termination It is a telemetry-only gap: no data loss, no corruption, no sync failure. Root cause (one bug, three demonstrations)
Python does not call Attack results
Why attack 4 is the one that matters
Why attack 3 is NOT a flawThe job-context docstring claims contextvars propagate via When does this actually bite?The success path (stream ends naturally →
These are precisely the failure / interruption moments where an operator most Recommended fix (small)Add a def __del__(self) -> None:
# Last-resort: a stream torn down without reaching StopIteration/close()
# (early break, generator.close() from the pipeline, GC) still records once.
try:
self._record(code=None, exception=None)
except Exception:
passThis closes attacks 1, 2, and 4 (after teardown the wrapper is unreferenced → GC → Two known Verdict
|
danielcarletti
left a comment
There was a problem hiding this comment.
Approved with a small fix, see my comment with the summary of my analysis
Problem
Warehouse-source syncs make a lot of outbound gRPC traffic from the two gRPC sources — google_ads (
GoogleAdsClient) and bigquery (Storage Read API) — and none of it is observable today. The tracked HTTP transport gave everyrequests-based source uniform logs, per-source latency/byte metrics, and an opt-in sample-capture pipeline, but it stops at HTTP. gRPC calls bypass all of it: noteam_id/source_typeon the logs, no metrics, no easy way to grab a real request/response pair as a test fixture, and nothing stopping a new gRPC source from dropping in a raw channel.This brings the two gRPC sources to parity.
Changes
posthog/temporal/data_imports/sources/common/grpc/. Client interceptors (TrackedUnaryUnaryClientInterceptor,TrackedUnaryStreamClientInterceptor) time, size, and log every RPC, emit three OTel instruments (data_import_grpc_requests_total,data_import_grpc_latency_ms,data_import_grpc_response_bytes) labelled byteam_id,source_type,method, and a low-cardinality gRPCstatus_class(ok/client_error/resource_exhausted/unavailable/server_error/error), and feed opt-in sample capture. Streaming responses are sized-and-released — never buffered — so BigQuery's large reads keep streaming; only a small head of messages is retained, and only when capture is armed. All telemetry is wrapped so it can never raise into the call path.google_ads/google_ads.pypassesinterceptors=tracked_interceptors(host)to bothget_service(...)calls. google-ads rebuilds the channel on everyget_service, so the interceptors are re-supplied each time rather than wrapping a single persistent channel.bigquery/bigquery.pybuilds a credential-bearing channel viaBigQueryReadGrpcTransport.create_channel(...), wraps it withmake_tracked_channel(...), and hands it to the transport.create_read_session(unary) andread_rows(server-streaming) both ride the interceptors. The REST/metadata path was already HTTP-tracked and is untouched.common/grpc/sampling.py) plus awarehouse_sources_capture_grpc_samplesmanagement command. Redis-gated config with TTL, protobuf → scrubbed JSON viaMessageToDict+ scrubadub, auth keys (incl.developer_token/refresh_token/client_secret) dropped by name, streaming responses truncated to a small head with atruncatedflag, and a 256 KiB payload ceiling. Samples land in S3 underwarehouse-sources-grpc-samples/{capture_id}/{source_type}/{seq}.json. Rules match on the gRPC status class or numeric code.JobContext(and the bind/scoped/current helpers) moved tocommon/job_context.py;common/http/context.pyis now a re-export shim so existing HTTP imports are unchanged.CaptureRule/CaptureConfigand the scrubadub scrubbing moved tocommon/sample_scrub.py, shared by both transports.data-imports-grpc-transportbans rawgrpc.*_channel(...)and directBigQueryReadClient(...)/GoogleAdsClient(...)construction insidesources/**(excluding thecommon/grpc/package and the two reference source files, whose enforced invariant is routing through the tracked transport). 3/3 fixtures pass; 0 findings on the sources tree.SOURCES.mdflips google_ads and bigquery's gRPC state to ✅ and updates the legend; theimplementing-warehouse-sourcesskill gains an "Outbound gRPC must go through the tracked gRPC transport" section.How did you test this code?
I'm an agent (Claude Opus 4.7). Only automated tests — I did not exercise either source against its real upstream API (no credentials locally).
common/test/test_grpc_{transport,metrics,observer,sampling}.py,common/test/test_job_context.py, and the management-command test. Coverage includes: unary success/error/observer-raises (future returned unchanged so the SDK still raises), streaming success/lazy-consumption/error-mid-iteration/head-retention, the fullstatus_classmapping over everygrpc.StatusCode, metric instrument caching + null fallback, protobuf scrubbing + key redaction, rule matching on gRPC status, Redis slot reservation, payload truncation + size cap, S3 key layout, and the context shim re-export identity.XMinioStorageFullMinIO infra errors unrelated to this change.semgrep --teston the rule fixtures (3/3) and a full scan ofsources/(0 findings).ruff format+ruff check --fixover all changed files.Publish to changelog?
no
Docs update
n/a
🤖 Agent context
Authored end-to-end by an agent (Claude Opus 4.7) at the request of the human author, who reviews and owns the PR. Scoped from the existing HTTP-transport PR with three decisions made up front: (1) full parity including the sample-capture pipeline rather than logs/metrics only; (2) extract the shared
JobContextinto a transport-neutral module with a re-export shim, rather than have gRPC import from the HTTP package; (3) a semgrep guard banning raw channel/client construction rather than relying on docs alone.Key non-obvious decisions worth a reviewer's eye:
get_servicecall, so there is no single channel to wrap once — the interceptors must be passed on every call. Both call sites (get_schemas,get_rows) do this.channel=is supplied, so the channel must be built withcreate_channel(credentials=...)before interception; passing both credentials and a channel would silently drop the credentials..code()/.result()off the already-resolved future for telemetry), so error propagation via the SDK's own.result()is preserved.