feat(data-warehouse): add ClickHouse source #53601
Conversation
Adds a data warehouse source for ClickHouse, built to scale to very large databases via clickhouse-connect's streaming Arrow reader, free row/byte counts from system.tables, and sorting-key-based primary key discovery. Supports HTTPS, an SSL verification toggle, and SSH tunneling.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Size Change: +91 B (0%) · Total Size: 129 MB
Migration SQL Changes: Hey 👋, we've detected some migrations on this PR. Here's the SQL output for each migration; make sure they make sense:
🔍 Migration Risk Analysis: We've analyzed your migrations for potential risks. Summary: 0 Safe | 1 Needs Review | 0 Blocked
⏭️ Skipped snapshot commit because the branch advanced; the new commit will trigger its own snapshot update workflow. If you expected this workflow to succeed: this can happen due to concurrent commits. To get a fresh workflow run, either:
🎭 Playwright report · View test results →
These issues are not necessarily caused by your changes.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Query snapshots: Backend query snapshots updated. Changes: 2 snapshots (2 modified, 0 added, 0 deleted).
…eature-clickhouse-source
- Bound the duplicate-primary-key probe to a 10M-row prefix with read_overflow_mode='break' so misconfiguration detection is O(budget) instead of a full-table GROUP BY on every incremental sync (a sketch follows below). The fail-safe flips to True on unexpected errors to block merges against unverifiable keys.
- Add optimize_read_in_order and max_bytes_before_external_sort to the data query. When the cursor leads the sorting key, the top-level sort is skipped; otherwise we spill to disk instead of OOMing. Warn when the cursor isn't a sort-key prefix.
- Accumulate streamed Arrow blocks into ~200 MiB / ~100k-row pa.Tables before yielding, collapsing the Delta commit count by ~5x on large tables without meaningfully raising peak memory.
- Replace the full-table row count on incremental resumes with a bounded WHERE cursor > last_value count so progress reporting tracks actual work. Default rows_to_sync to None instead of 0 when unknown.
- Widen _get_client exception wrapping to cover OSError and ssl.SSLError alongside ClickHouseError.

Made-with: Cursor
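For context, a minimal sketch of how such a bounded probe can look with clickhouse-connect. The helper and constant names mirror the PR, but the body here is an assumption, not the PR's code:

```python
DUPLICATE_PROBE_ROW_BUDGET = 10_000_000  # the 10M-row prefix mentioned above


def _quote_identifier(identifier: str) -> str:
    # Stand-in for the PR's quoting helper (which also rejects null bytes).
    return "`" + identifier.replace("`", "``") + "`"


def _has_duplicate_primary_keys(client, database: str, table_name: str, keys: list[str]) -> bool:
    # client: a clickhouse-connect Client
    key_list = ", ".join(_quote_identifier(k) for k in keys)
    qualified = f"{_quote_identifier(database)}.{_quote_identifier(table_name)}"
    sql = f"SELECT {key_list} FROM {qualified} GROUP BY {key_list} HAVING count() > 1 LIMIT 1"
    try:
        result = client.query(
            sql,
            settings={
                # Scan at most the budgeted prefix; 'break' stops reading instead of
                # erroring, so the probe costs O(budget) rather than a full-table scan.
                "max_rows_to_read": DUPLICATE_PROBE_ROW_BUDGET,
                "read_overflow_mode": "break",
            },
        )
        return len(result.result_rows) > 0
    except Exception:
        # Fail safe: if the probe cannot be evaluated, report duplicates so the
        # sync refuses to merge on a key it could not verify.
        return True
```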
…to dc-feature-clickhouse-source

Made-with: Cursor

# Conflicts:
#   posthog/api/test/__snapshots__/test_api_docs.ambr
…eature-clickhouse-source
- password: str -> str | None across clickhouse.py signatures (matches ClickHouseSourceConfig); coerce to "" at the clickhouse-connect boundary.
- pa.timestamp: branch on the optional tz and tighten _datetime_unit_for_precision's return type to Literal so the overload resolves (illustrated below).
- test: narrow response.items() away from AsyncIterable before list().

Made-with: Cursor
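A hedged illustration of the second bullet; the function names follow the commit, but the precision-to-unit mapping shown here is an assumption rather than the PR's exact code:

```python
from typing import Literal, Optional

import pyarrow as pa


def _datetime_unit_for_precision(precision: int) -> Literal["s", "ms", "us", "ns"]:
    # DateTime64(P): P is the number of sub-second digits; pick the smallest
    # Arrow unit that can represent it without loss.
    if precision == 0:
        return "s"
    if precision <= 3:
        return "ms"
    if precision <= 6:
        return "us"
    return "ns"


def _timestamp_type(precision: int, tz: Optional[str]) -> pa.DataType:
    unit = _datetime_unit_for_precision(precision)
    # Branch on the optional timezone instead of always passing tz=...,
    # which is what lets the pa.timestamp overload resolve under mypy.
    if tz is not None:
        return pa.timestamp(unit, tz=tz)
    return pa.timestamp(unit)
```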
Prompt To Fix All With AI: This is a comment left during a code review.
Path: posthog/temporal/data_imports/sources/clickhouse/clickhouse.py
Line: 423-428
Comment:
**Wrong precision/scale for Decimal shorthand types**
For `Decimal32(S)`, `Decimal64(S)`, `Decimal128(S)`, and `Decimal256(S)` the single argument is the **scale**, not the precision — the precision is fixed by the variant (9 / 18 / 38 / 76). The current regex puts `S` into group 1 and interprets it as precision with an implied scale of 0, so `Decimal32(4)` produces `pa.decimal128(4, 0)` instead of the correct `pa.decimal128(9, 4)`. ClickHouse sends Arrow data with the real precision/scale, so the registered Delta schema and the actual wire schema disagree — downstream writes can fail or silently corrupt values.
The test `test_decimal_types` only asserts `isinstance(…, Decimal128Type)` so it doesn't catch the wrong precision/scale values.
Suggested fix — split the two forms:
```python
_DECIMAL_FIXED_WIDTHS: dict[str, int] = {"32": 9, "64": 18, "128": 38, "256": 76}
_DECIMAL_FIXED_RE = re.compile(r"^Decimal(32|64|128|256)\(\s*(\d+)\s*\)$")
_DECIMAL_VAR_RE = re.compile(r"^Decimal\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\)$")
```
Then in `_inner_to_arrow_type`:
```python
match_fixed = _DECIMAL_FIXED_RE.match(inner)
if match_fixed is not None:
    precision = _DECIMAL_FIXED_WIDTHS[match_fixed.group(1)]
    scale = int(match_fixed.group(2))
    return build_pyarrow_decimal_type(precision, scale)

match_dec = _DECIMAL_VAR_RE.match(inner)
if match_dec is not None:
    precision = int(match_dec.group(1))
    scale = int(match_dec.group(2)) if match_dec.group(2) is not None else 0
    return build_pyarrow_decimal_type(precision, scale)
```
And the test should be extended to assert both `field.type.precision` and `field.type.scale`.
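For that last point, a possible extension of the test along those lines (a sketch; it assumes _inner_to_arrow_type is importable in the test module):

```python
import pyarrow as pa


def test_decimal_fixed_width_precision_and_scale():
    # Decimal32(4): precision is fixed at 9 by the variant, the lone argument is the scale.
    arrow_type = _inner_to_arrow_type("Decimal32(4)")
    assert isinstance(arrow_type, pa.Decimal128Type)
    assert arrow_type.precision == 9
    assert arrow_type.scale == 4
```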
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: posthog/temporal/data_imports/sources/clickhouse/clickhouse.py
Line: 672-699
Comment:
**`incremental_field_type` parameter accepted but never used**
`_build_query` accepts `incremental_field_type` solely to validate it is not `None`, but the value is never used in the query string or the returned parameter dict (the returned `{}` is always discarded with `_` at the call site in `get_rows`). The parameter is superfluous — the guard on `incremental_field` alone is sufficient, and the type-specific logic (`incremental_type_to_initial_value`) already lives in `get_rows`.
```suggestion
def _build_query(
    *,
    database: str,
    table_name: str,
    should_use_incremental_field: bool,
    incremental_field: Optional[str],
) -> str:
    """Build the data extraction query.

    Returns the SQL string. We never interpolate the incremental cursor
    value directly — only identifiers (which are validated) end up in the
    SQL string.
    """
    qualified = _qualified_table(database, table_name)
    if not should_use_incremental_field:
        return f"SELECT * FROM {qualified}"
    if incremental_field is None:
        raise ValueError("incremental_field can't be None when should_use_incremental_field is True")
    quoted_field = _quote_identifier(incremental_field)
    return f"SELECT * FROM {qualified} WHERE {quoted_field} > %(last_value)s ORDER BY {quoted_field} ASC"
```
The call site in `get_rows` would change to `query = _build_query(...)` and you can drop `incremental_field_type=incremental_field_type` from that call.
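Concretely, the adjusted call could look like this (variable names are assumed from the surrounding get_rows code, which isn't shown here):

```python
query = _build_query(
    database=database,
    table_name=table_name,
    should_use_incremental_field=should_use_incremental_field,
    incremental_field=incremental_field,
)
```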
How can I resolve this? If you propose a fix, please make it concise.

Reviews (1): Last reviewed commit: "fix(data-warehouse): satisfy mypy for Cl..."
Query snapshots: Backend query snapshots updated. Changes: 1 snapshot (1 modified, 0 added, 0 deleted).
- query_arrow_stream yields RecordBatches; switch the accumulator to pa.Table.from_batches to avoid the pa.concat_tables type mismatch.
- Build an explicit SELECT list and wrap Arrow-incompatible column types (UUID, IPv4/6, wide ints, Enum*, FixedString, Array, Map, Tuple, Nested, Variant, Dynamic, JSON, Object) in toString() to avoid ClickHouse error 50 on SELECT *.
- Extend row-count discovery to Distributed tables (SELECT count() fallback) and MaterializedViews (resolve the TO target, else the .inner_id inner table). Plain views and no-counter engines stay skipped. (Sketched below.)
- Upgrade discovery/query log lines to info so users see them on the syncs tab; add an entry log for get_rows().
- Frontend: show "Skipped" with an explanatory tooltip instead of "Unknown" when the row count is unavailable.
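As an illustration of the row-count branches above, a hedged sketch; the engine matching and the MaterializedView handling are simplified assumptions, and the PR's real get_clickhouse_row_count differs in detail:

```python
def get_row_count(client, database: str, table: str) -> int | None:
    # client: a clickhouse-connect Client
    rows = client.query(
        "SELECT engine, total_rows FROM system.tables WHERE database = %(db)s AND name = %(name)s",
        parameters={"db": database, "name": table},
    ).result_rows
    if not rows:
        return None
    engine, total_rows = rows[0]
    if engine.endswith("MergeTree"):
        # Maintained by the engine, so no table scan is needed.
        return total_rows
    if engine == "Distributed":
        # count() is pushed down to the shards, so this stays cheap.
        return client.query(f"SELECT count() FROM `{database}`.`{table}`").result_rows[0][0]
    if engine == "MaterializedView":
        # Resolve the TO target's total_rows, else the `.inner_id.<uuid>` table
        # (resolution omitted in this sketch).
        return None
    # Plain views and Memory/Buffer/Log/Kafka/URL engines: skipped.
    return None
```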
- Add get_primary_keys_for_schemas that reuses _get_primary_keys per table (a thin wrapper, sketched below) and wire detected_primary_keys into SourceSchema so the frontend can suggest sorting-key columns during setup.
- Split DecimalN(S) from Decimal(P[, S]) — the former has fixed precision (9/18/38/76) and the lone argument is the scale. The previous regex mis-mapped Decimal32(4) to Decimal(4, 0). Tests now assert exact precision and scale.
- Drop the unused incremental_field_type parameter from _build_query and return a plain SQL string instead of (str, dict). The type-aware cursor seeding already lives in get_rows.
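The wrapper from the first bullet amounts to roughly the following (a sketch; the real signature and error handling in the PR may differ):

```python
def get_primary_keys_for_schemas(client, database: str, table_names: list[str]) -> dict[str, list[str] | None]:
    # Reuse the per-table sorting-key lookup so setup can suggest columns per schema.
    return {name: _get_primary_keys(client, database, name) for name in table_names}
```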
Prompt To Fix All With AI: This is a comment left during a code review.
Path: products/data_warehouse/frontend/shared/components/forms/SchemaForm.tsx
Line: 109-113
Comment:
**ClickHouse-specific tooltip in a shared component**
The tooltip text references "Memory/Buffer/Log-engine tables, or Kafka/URL table functions" — these are ClickHouse engine names that make no sense to a Postgres, MySQL, or Snowflake user seeing a null row count. The `SchemaForm` component is shared across every source; any source that fails to return a row count (e.g. due to a permissions error, or simply because a given source never populates that field) will now surface ClickHouse-specific jargon to unrelated users.
```suggestion
return (
    <Tooltip title="Row count is unavailable for this table. The table can still be synced — we just don't know its size up front.">
        <span className="text-muted-alt cursor-help">Skipped</span>
    </Tooltip>
)
```
How can I resolve this? If you propose a fix, please make it concise.

Reviews (2): Last reviewed commit: "fix(data-warehouse): detect primary keys..."
Cast the table name parameter to str before dict lookup to match the typed dict's key.
Drop tests that only assert trivial string formatting or exact dict values — keep the ones that exercise real logic (regex parsing, error translation substring match, engine-specific row-count branches, etc.).
Query snapshots: Backend query snapshots updated. Changes: 1 snapshot (1 modified, 0 added, 0 deleted).
Check for an empty or missing table name before indexing so callers get the "Table name is missing" ValueError instead of an IndexError.
Resolve SchemaForm.tsx conflict — keep master's new primary-key column and nested LemonCollapse structure while preserving the "Skipped" tooltip that replaces the "Unknown" row-count label.
Problem
PostHog's data warehouse supports a long list of sources but not ClickHouse itself. Users running their own ClickHouse deployments want to pull that data into PostHog without building a custom pipeline, and the target databases are often multi-terabyte and multi-billion row — the import path has to stream, not buffer.
Changes
Adds a new warehouse source for ClickHouse under
posthog/temporal/data_imports/sources/clickhouse/, following the same split as the Postgres source (source.py for registration/form fields/validation, clickhouse.py for transport). Designed up front to scale to very large databases.

Scalability:
- Reads stream through clickhouse-connect with query_arrow_stream, so data flows as a sequence of pa.RecordBatch chunks sized by ClickHouse's max_block_size. Batches are accumulated into ~100k-row / 200 MiB pa.Tables via pa.Table.from_batches before yielding, so Delta sees fewer, larger commits (a sketch of this loop follows the list). Memory per worker is bounded regardless of table size.
- Row counts come from system.tables.total_rows for MergeTree tables — no SELECT COUNT(*) on multi-billion row tables. Distributed tables fall back to SELECT count() (cheap, distributed). MaterializedViews resolve to their TO target's total_rows or their .inner_id.<uuid> inner table. Plain views and no-counter engines (Memory/Buffer/Log/Kafka/URL) are reported as "Skipped" with an explanatory tooltip in the UI.
- Partition sizing uses system.tables.total_bytes/total_rows rather than sampling the table, targeting DEFAULT_PARTITION_TARGET_SIZE_IN_BYTES (200 MiB) per partition.
- Query-level settings: output_format_arrow_string_as_string=1, output_format_arrow_low_cardinality_as_dictionary=0, optimize_read_in_order=1, max_bytes_before_external_sort=500 MiB, max_execution_time, tunable max_block_size.
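A minimal sketch of the accumulation loop from the first bullet, assuming a clickhouse-connect client and following the PR's statement that the stream yields RecordBatches; thresholds and names are illustrative, not the PR's code:

```python
from collections.abc import Iterator

import pyarrow as pa

TARGET_BYTES = 200 * 1024 * 1024  # ~200 MiB per yielded table
TARGET_ROWS = 100_000


def stream_tables(client, query: str) -> Iterator[pa.Table]:
    # Collect RecordBatches from the Arrow stream into larger pa.Tables before
    # yielding, so the Delta writer sees fewer, bigger commits.
    batches: list[pa.RecordBatch] = []
    rows = 0
    nbytes = 0
    with client.query_arrow_stream(query) as stream:
        for batch in stream:
            batches.append(batch)
            rows += batch.num_rows
            nbytes += batch.nbytes
            if rows >= TARGET_ROWS or nbytes >= TARGET_BYTES:
                yield pa.Table.from_batches(batches)
                batches, rows, nbytes = [], 0, 0
    if batches:
        yield pa.Table.from_batches(batches)
```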
Arrow compatibility:
- ClickHouse's Arrow output refuses several common column types (error 50), so we build an explicit SELECT list and wrap those columns in toString(col) AS col so the stream never crashes (a sketch follows this list).
- Type mapping handles Nullable/LowCardinality wrappers, DateTime/DateTime64 with precision + timezone, Decimal[32-256], Date/Date32, signed/unsigned ints up to 64-bit (wide Int128/256 fall back to string), Enum8/16, and composites serialized to string.
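A hedged sketch of that explicit SELECT list; the prefix set is paraphrased from this PR's commits and the quoting is simplified, so treat it as an illustration rather than the shipped code:

```python
ARROW_INCOMPATIBLE_PREFIXES = (
    "UUID", "IPv4", "IPv6", "Int128", "Int256", "UInt128", "UInt256",
    "Enum8", "Enum16", "FixedString", "Array", "Map", "Tuple", "Nested",
    "Variant", "Dynamic", "JSON", "Object",
)


def _select_list(columns: list[tuple[str, str]]) -> str:
    # columns: (name, ClickHouse type with Nullable/LowCardinality already stripped)
    parts = []
    for name, ch_type in columns:
        quoted = f"`{name}`"  # the real code uses the PR's _quote_identifier helper
        if ch_type.startswith(ARROW_INCOMPATIBLE_PREFIXES):
            # ClickHouse's Arrow output rejects these types (error 50), so
            # serialize them to String on the server side.
            parts.append(f"toString({quoted}) AS {quoted}")
        else:
            parts.append(quoted)
    return ", ".join(parts)
```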
Schema discovery:
- Column metadata is read from system.columns for the whole database.
- Primary keys are detected via is_in_sorting_key on system.columns (a lookup sketched after this list). Because ClickHouse's sorting key is not necessarily unique, every incremental sync runs _has_duplicate_primary_keys first (bounded-prefix probe with max_rows_to_read + read_overflow_mode='break').
- Table engines come from system.tables.engine.
- Discovery/query log lines are emitted at info level so they surface on the syncs tab.
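A minimal sketch of the sorting-key lookup, assuming a clickhouse-connect client; ordering by column position is a simplification here, not necessarily what the PR does:

```python
def _get_primary_keys(client, database: str, table_name: str) -> list[str] | None:
    # Sorting-key columns double as suggested primary keys for incremental syncs.
    rows = client.query(
        """
        SELECT name
        FROM system.columns
        WHERE database = %(db)s AND table = %(table)s AND is_in_sorting_key = 1
        ORDER BY position
        """,
        parameters={"db": database, "table": table_name},
    ).result_rows
    return [r[0] for r in rows] or None
```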
Incremental sync:
- Supports integer (Int8-Int256, UInt8-UInt256) and temporal (Date, Date32, DateTime, DateTime64) cursor fields.
- The cursor value is bound as a query parameter (%(last_value)s) — only validated, backtick-quoted identifiers land in the SQL string. Identifier quoting escapes embedded backticks and rejects null bytes (see the sketch below).

Connection options: host, port, database, user, password (optional), HTTPS toggle, SSL-verify toggle, optional SSH tunnel. SSH tunnel works transparently because we use HTTP(S).
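A minimal sketch of the quoting rules and parameter binding described in the incremental-sync bullets; the helper body and the example column are assumptions, not the PR's code:

```python
def _quote_identifier(identifier: str) -> str:
    if "\x00" in identifier:
        # Null bytes are rejected outright rather than escaped.
        raise ValueError("Invalid identifier")
    return "`" + identifier.replace("`", "``") + "`"


# The cursor value itself is never interpolated; it travels as a bound parameter.
field = _quote_identifier("updated_at")  # hypothetical cursor column
sql = f"SELECT * FROM `analytics`.`events` WHERE {field} > %(last_value)s ORDER BY {field} ASC"
# client.query_arrow_stream(sql, parameters={"last_value": last_value})
```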
Registration / plumbing:
- Adds CLICKHOUSE to ExternalDataSourceType in products/data_warehouse/backend/types.py, posthog/schema.py (via schema:build), and frontend/src/queries/schema/schema-general.ts.
- Migration 0042_alter_externaldatasource_source_type adds the choice to the model.
- Registers ClickHouseSource in posthog/temporal/data_imports/sources/__init__.py.
- ClickHouseSourceConfig is regenerated via generate:source-configs.
- SchemaForm renders "Skipped" with a tooltip instead of "Unknown" when the row count is unavailable, explaining that counting would require a full scan.

How did you test this code?
This PR was authored by an agent (Claude Code). Verification so far is code-level plus one round of manual smoke-testing against a local ClickHouse:
- Unit tests in test_clickhouse.py covering identifier quoting, type-modifier stripping (Nullable/LowCardinality), incremental-field filtering across every supported CH type, query-builder output (including toString wrapping of Arrow-incompatible types), ClickHouseColumn → pa.Field mapping for all supported types (including DateTime64 precision/timezone, Decimal[32-256], wide ints, enums, composites), non-retryable error pattern matching, error translation, schema grouping (mocked client), validate_credentials error paths, batch-accumulation boundaries in get_rows, MV target parsing (qualified/backticked/unqualified/none), and get_clickhouse_row_count across MergeTree/Distributed/MV-with-TO/MV-inner/View paths. All 141 passing.
- Manual smoke test against a local ClickHouse: Distributed tables take the count() fallback, views render as "Skipped".

Follow-ups for merging:
- Add a clickhouse.png icon to frontend/public/services/ (the source currently references a placeholder path).
- Add docs at posthog.com/docs/cdp/sources/clickhouse (the URL referenced in docsUrl).

Publish to changelog?
Yes — new warehouse source.
🤖 LLM context
Authored by Claude Code (Opus 4.6, 1M context) across multiple sessions. The agent read the existing Postgres, Snowflake, and MySQL sources as reference, followed the
implementing-warehouse-sources skill, and chose clickhouse-connect's query_arrow_stream over clickhouse-driver specifically for the Arrow streaming path (which bounds memory on huge tables). Later sessions hardened the source after discovering real-world failures: ClickHouse's Arrow output refuses several common types (error 50), query_arrow_stream yields RecordBatch not Table, and system.tables.total_rows is NULL for Distributed tables and MaterializedViews — all now handled.