feat(data-warehouse): allow enabling and disabling CDC after source creation #60708
…reation

Add three new viewset actions (enable_cdc, disable_cdc, update_cdc_settings) on ExternalDataSource and a Configuration tab CDC section that wires them up with confirmation modals. Disable forces CDC schemas to pick a new sync strategy by clearing sync_type and pausing.

Generalize the CDC viewset code so it dispatches via CDCSourceAdapter (setup_resources + cleanup_resources) instead of instantiating PostgresSource or PostgresCDCConfig directly; future CDC engines (MySQL, etc.) plug in by implementing the adapter Protocol. Postgres setup/cleanup logic now lives on the adapter alongside slot/publication primitives. Setup also rolls back partial slot/publication state on failure so a half-provisioned source DB doesn't leak replication infra.

Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
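The adapter dispatch described in this commit can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Source`, `InMemoryAdapter`, the `_ADAPTERS` registry, the `cdc_management_mode` key and its values are all hypothetical stand-ins; only the method names, the `setup_resources` return shape, and the "not supported" message come from the PR text.

```python
from dataclasses import dataclass, field
from typing import Any, Optional, Protocol


@dataclass
class Source:
    """Hypothetical stand-in for ExternalDataSource."""
    source_type: str
    job_inputs: dict[str, Any] = field(default_factory=dict)


class CDCSourceAdapter(Protocol):
    def setup_resources(
        self, source: Source, payload: dict[str, Any]
    ) -> tuple[dict[str, Any], Optional[str]]: ...

    def cleanup_resources(self, source: Source) -> None: ...


class InMemoryAdapter:
    """Toy adapter; a real one would provision slots/publications."""

    def setup_resources(self, source, payload):
        mode = payload.get("cdc_management_mode")  # assumed key name
        if mode not in ("posthog", "self_managed"):  # assumed values
            return {}, f"Invalid management mode: {mode}"
        return {"cdc_enabled": True, "cdc_management_mode": mode}, None

    def cleanup_resources(self, source):
        return None


# Hypothetical registry: a new engine is supported by adding an entry here.
_ADAPTERS: dict[str, CDCSourceAdapter] = {"Postgres": InMemoryAdapter()}


def get_cdc_adapter(source: Source) -> Optional[CDCSourceAdapter]:
    return _ADAPTERS.get(source.source_type)


def enable_cdc(source: Source, payload: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Return (status_code, body) without naming any concrete engine."""
    adapter = get_cdc_adapter(source)
    if adapter is None:
        return 400, {"message": f"CDC is not supported for source type: {source.source_type}"}
    config, error = adapter.setup_resources(source, payload)
    if error is not None:
        return 400, {"message": error}
    source.job_inputs.update(config)
    return 200, {"success": True}
```

The caller only ever sees the Protocol, so the viewset-side function needs no change when another engine is registered.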
### Issue 1 of 2
products/data_warehouse/frontend/scenes/SourceScene/tabs/CDCSection.tsx:372-374
Stale prerequisite result after publication name change — when a user in self-managed mode runs the prereq check against "pub_a", then edits the input to "pub_b", the success banner for "pub_a" remains visible. The Enable button is not gated on a stale result (the server re-validates), but the UI can mislead the user into thinking "pub_b" was already validated.
```suggestion
<LemonField.Pure label="Publication name">
    <LemonInput
        value={publicationName}
        onChange={(v) => {
            setPublicationName(v)
            setPrereqResult(null)
        }}
        placeholder="posthog_pub"
    />
</LemonField.Pure>
```
### Issue 2 of 2
products/data_warehouse/backend/api/external_data_source.py:1955-1967
Only the latest running job is cancelled before teardown — if another CDC extraction job for the same source is concurrently `Running` (e.g., a late-starting retry that hasn't yet registered a workflow ID, or two overlapping trigger calls), its active use of the replication slot will cause `pg_drop_replication_slot` to fail inside `cleanup_resources`. The best-effort wrapper suppresses the error, so the slot remains on the customer database consuming WAL. The `cdc_*` keys are still cleared locally, so PostHog won't restart CDC — but the orphaned slot persists until the concurrent job finishes or is cancelled manually.
```python
cdc_error = self._setup_cdc_resources(adapter, instance, request.data)
if cdc_error is not None:
    return Response(
        status=status.HTTP_400_BAD_REQUEST,
        data={"message": cdc_error},
    )

# Ensure the CDC extraction schedule + global cleanup schedule exist. No CDC
# schemas yet (user must switch sync_type=cdc on the Sync tab afterward), so
# `sync_cdc_extraction_schedule` no-ops or creates a paused schedule; that's
# fine, it'll start running on the next schema update.
try:
```
…entials

The Configuration-page "Check database prerequisites" button reused the creation-wizard endpoint, which expects the raw connection config (incl. password) in the request body. For an existing source those secret fields are stripped from API responses, so the client sent a config with no password and the server rejected it with "Required field 'password' is missing". Add a detail=True check_cdc_prerequisites_for_source action that validates against the source's stored (encrypted) credentials via the CDC adapter, and point the Configuration page at it.

Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
When enabling self-managed CDC from the Configuration tab, walk the user through the CREATE PUBLICATION / GRANT SQL in a modal before enabling, mirroring the creation wizard. The publication must exist before PostHog can create the replication slot, so "Set up & enable CDC" opens the SQL modal; the user runs it, confirms, and "Verify & enable" runs enable_cdc (which re-validates prerequisites server-side and surfaces any failures inline). SQL is built client-side from the source's non-secret job_inputs (schema, user) plus the publication name, defaulting the table list to currently-synced tables. PostHog-managed enable is unchanged — PostHog creates the publication itself. Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
PR overviewAll previously flagged issues have been addressed. No open security concerns remain on this pull request. Security reviewNo open security issues remain on this pull request. Fixed/addressed: 2 · PR risk: 0/10 |
The Configuration page never reflected CDC as enabled because the cdc_* keys live in job_inputs but aren't source-config form fields, so the read serializer stripped them as "unknown". Expose them via an explicit non-secret allowlist so the UI can render the enabled state, the "Disable CDC" / "Update CDC configs" controls, and details.

Also:
- Coerce string-bool job_inputs values on the client (EncryptedJSONField stores scalars as strings, so cdc_enabled/cdc_auto_drop_slot arrive as "True"/"False").
- Add a cdc_status endpoint (and adapter get_status) returning live replication slot / publication existence and WAL lag, surfaced in a "Replication status" panel with a refresh button and missing-slot/publication warnings.
- Rename the enabled-state primary button to "Update CDC configs".
- Widen test_list_external_data_source's FuzzyInt lower bound to match its own documented cached-lookup variance (now flakes less under a larger test set).

Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
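The string-bool coercion mentioned above happens on the client in the PR; the same logic, sketched in Python for illustration, might look like the following. The function name `coerce_bool` and the choice to fall back to a default for unrecognised values are assumptions.

```python
def coerce_bool(value, default=False):
    """Turn values such as "True"/"False" (as round-tripped through an
    encrypted JSON field) back into real booleans."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
    # Anything else (None, numbers, unexpected strings) uses the default.
    return default


job_inputs = {"cdc_enabled": "True", "cdc_auto_drop_slot": "False"}
cdc_enabled = coerce_bool(job_inputs.get("cdc_enabled"))
auto_drop_slot = coerce_bool(job_inputs.get("cdc_auto_drop_slot"))
```

A plain truthiness check would treat the string `"False"` as true, which is the failure mode this coercion avoids.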
…text error The replication-status panel fetched cdc_status from a React useEffect on mount, which could run before the global team id was set, throwing "Team ID is not known." Move the fetch into sourceSettingsLogic as a loader triggered from loadSourceSuccess (which only fires after a successful source load, so the team context is guaranteed). This also follows the kea convention of keeping data fetching out of React effects. The status call opens a connection to the customer DB, so it runs once per source plus on manual refresh — never on the 5s poll. Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
Resolves the import conflict in external_data_source.py (drops the now-unused PostgresCDCConfig import; keeps SQLSource from master).

Review fixes bundled in:
- setup_resources now pre-flight checks slot/publication existence and refuses to proceed if either already exists, so a failed create never rolls back (drops) a slot/publication PostHog didn't create (Veria security finding).
- disable_cdc cancels ALL running extraction jobs, not just the latest, so a concurrent job can't hold the replication slot and fail the teardown (Greptile).
- Editing the self-managed publication name clears the stale prereq banner (Greptile).
- Moved add/remove-table-to-publication into the CDC adapter (add_table/remove_table); external_data_source and external_data_schema now go through the adapter instead of calling slot_manager directly.

Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
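The pre-flight check plus "only roll back what we created" behaviour can be sketched against an in-memory stand-in. Everything here is hypothetical (`FakeDatabase`, the error strings, and the publication-then-slot ordering); the real adapter talks to the customer's Postgres.

```python
class FakeDatabase:
    """In-memory stand-in for the customer database (hypothetical)."""

    def __init__(self, slots=(), publications=(), fail_slot_creation=False):
        self.slots = set(slots)
        self.publications = set(publications)
        self.fail_slot_creation = fail_slot_creation

    def create_publication(self, name):
        self.publications.add(name)

    def create_slot(self, name):
        if self.fail_slot_creation:
            raise RuntimeError("could not create replication slot")
        self.slots.add(name)


def setup_resources(db, slot_name, publication_name):
    """Return None on success, or an error message on failure."""
    # Pre-flight: refuse to proceed if either object already exists, so the
    # rollback below can never drop something this call did not create.
    if slot_name in db.slots:
        return f"Replication slot '{slot_name}' already exists"
    if publication_name in db.publications:
        return f"Publication '{publication_name}' already exists"

    created_publication = False
    try:
        db.create_publication(publication_name)
        created_publication = True
        db.create_slot(slot_name)
    except RuntimeError as exc:
        # Roll back only the objects created in this call.
        if created_publication:
            db.publications.discard(publication_name)
        return f"CDC setup failed: {exc}"
    return None
```

With the pre-flight check in place, a name collision returns early and the pre-existing slot or publication is left untouched.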
Addressed the AI-review findings (commit f709c8d):
Also moved the add/remove-table-to-publication resource ops onto the CDC adapter ( |
Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
```python
try:
    sync_cdc_extraction_schedule(instance, create=True)
    ensure_cdc_slot_cleanup_schedule()
except Exception as e:
    logger.exception("Could not create CDC schedules after enable_cdc", exc_info=e)
```
If CDC schedule creation fails after successfully writing cdc_enabled=True to job_inputs, the source will be marked as CDC-enabled but have no active schedule. The operation returns success (line 2060) despite the failure, leaving the system in an inconsistent state where CDC appears enabled but won't actually run.
```python
# Current code silently logs and continues:
try:
    sync_cdc_extraction_schedule(instance, create=True)
    ensure_cdc_slot_cleanup_schedule()
except Exception as e:
    logger.exception("Could not create CDC schedules after enable_cdc", exc_info=e)

# Should fail the operation if schedules can't be created:
try:
    sync_cdc_extraction_schedule(instance, create=True)
    ensure_cdc_slot_cleanup_schedule()
except Exception as e:
    logger.exception("Could not create CDC schedules after enable_cdc", exc_info=e)
    # Roll back the CDC config
    job_inputs = dict(instance.job_inputs or {})
    for key in list(job_inputs.keys()):
        if key.startswith("cdc_"):
            job_inputs.pop(key, None)
    instance.job_inputs = job_inputs
    instance.save(update_fields=["job_inputs", "updated_at"])
    return Response(
        status=status.HTTP_500_INTERNAL_SERVER_ERROR,
        data={"message": f"CDC resources created but schedule setup failed: {e}"},
    )
```
Spotted by Graphite
Tables discovered after source creation (via "refresh schemas") had their detected primary key dropped — reconcile_postgres_schemas wrote schema_metadata but never stored primary_key_columns. Switching such a table to CDC then failed with "refresh schema discovery to pick one up", which never actually worked because refresh didn't persist the PK. Persist source_schema.detected_primary_keys into sync_type_config.primary_key_columns during reconcile (without clobbering an existing/overridden value), so the error message's guidance holds and post-creation tables can be switched to CDC. Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
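The "persist without clobbering" rule for detected primary keys might be expressed like this. It is a sketch under assumptions: `persist_detected_primary_keys` is a hypothetical helper operating on a plain dict, whereas the PR does this inside `reconcile_postgres_schemas`.

```python
def persist_detected_primary_keys(sync_type_config, detected_primary_keys):
    """Return a copy of sync_type_config with primary_key_columns filled in
    from discovery, unless a value is already stored (e.g. a user override)."""
    updated = dict(sync_type_config or {})
    if detected_primary_keys and not updated.get("primary_key_columns"):
        updated["primary_key_columns"] = list(detected_primary_keys)
    return updated


fresh = persist_detected_primary_keys({}, ["id"])
overridden = persist_detected_primary_keys({"primary_key_columns": ["uuid"]}, ["id"])
```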
The CDC sync_type branch overwrote primary_key_columns unconditionally, unlike the incremental branch which refuses a PK change once a table has materialized. Since CDC uses the PK as the UPDATE/DELETE merge key, changing it mid-sync would corrupt dedup. Apply the same guard: reject when the PK differs and a synced table exists. Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
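A guard of the kind described here could look like the sketch below. The function name, parameters, and error text are hypothetical; the rule itself (reject a differing primary key once a synced table exists) is the one stated in the commit message.

```python
from typing import Optional, Sequence


def check_primary_key_change(
    current: Optional[Sequence[str]],
    requested: Sequence[str],
    table_exists: bool,
) -> Optional[str]:
    """Return an error message if the change must be rejected, else None."""
    if table_exists and current and list(current) != list(requested):
        # The PK is the UPDATE/DELETE merge key, so changing it after rows
        # have been materialized would break deduplication.
        return "Primary key cannot be changed once the table has synced"
    return None
```

Before the first sync there is nothing to corrupt, so the same change passes through when `table_exists` is false.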
The pgoutput decoder emitted bare table names (cdc_test_orders) on every ChangeEvent, but the CDC extraction activity filters and keys against the qualified ExternalDataSchema.name (public.cdc_test_orders). Result: every change was dropped, event_count stayed 0, and the slot was never advanced — CDC streaming silently no-op'd for any schema with a qualified name (i.e. every source created after Postgres schema names were qualified). The replication slot just accumulated WAL forever. Emit schema-qualified `schema.table` from the decoder so events match the schema names the activity keys on, end to end (filter, schema_by_name, batcher, truncate handling). Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
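The mismatch is easy to reproduce in miniature. In this sketch, `ChangeEvent`, `qualify`, and `filter_events` are simplified, hypothetical versions of the decoder output and the activity's filter; only the table names come from the commit message.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChangeEvent:
    table_name: str
    operation: str


def qualify(schema: str, table: str) -> str:
    """Build the schema-qualified name the extraction activity keys on."""
    return f"{schema}.{table}"


def filter_events(events, schema_names):
    """Keep only events whose table name matches a configured schema name."""
    wanted = set(schema_names)
    return [event for event in events if event.table_name in wanted]


schema_names = ["public.cdc_test_orders"]

# Before the fix: bare names never match, so every change is dropped.
bare_events = [ChangeEvent("cdc_test_orders", "INSERT")]

# After the fix: the decoder emits schema.table, which matches.
qualified_events = [ChangeEvent(qualify("public", "cdc_test_orders"), "INSERT")]
```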
```python
return Response(status=status.HTTP_200_OK, data={"success": True, "already_disabled": True})

# Cancel ALL running workflows first; any one holding the slot fails pg_drop_replication_slot.
running_jobs = ExternalDataJob.objects.filter(
```
we should probably filter by cdc here
- disable_cdc: only cancel running jobs belonging to CDC schemas, not unrelated incremental/full-refresh syncs on the same source (reviewer: filter by cdc).
- enable_cdc: capture (not just log) CDC schedule-creation failures and return schedules_ready so a failure isn't reported as clean success. The extraction schedule is authoritatively (re)created on the first CDC schema toggle, so this can't strand a "CDC on, never runs" source.
- CDCSection self-managed SQL: escape embedded double-quotes in all identifiers (schema/table/user/publication) so a name containing `"` can't inject extra SQL into the generated copy-paste script.

Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
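The identifier escaping in the last bullet follows the standard SQL rule that a double quote inside a quoted identifier is written as two double quotes. The PR does this client-side; the Python below is only an illustrative sketch, and `quote_ident` and `build_create_publication` are hypothetical helper names.

```python
def quote_ident(name: str) -> str:
    """Quote an SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_create_publication(publication: str, schema: str, tables: list[str]) -> str:
    """Build a CREATE PUBLICATION statement for copy-paste by the user."""
    table_list = ", ".join(f"{quote_ident(schema)}.{quote_ident(table)}" for table in tables)
    return f"CREATE PUBLICATION {quote_ident(publication)} FOR TABLE {table_list};"


sql = build_create_publication("posthog_pub", "public", ["orders", 'odd"name'])
```

Because the embedded quote is doubled, the odd table name stays inside its identifier instead of terminating it early.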
Addressed the latest review (commit bd0296d):
Earlier findings already shipped: stale prereq banner cleared on publication-name change, rollback no longer drops pre-existing slots/publications (pre-flight existence check), and the CDC streaming no-op (decoder emitted bare table names vs qualified schema names), fixed and verified live (slot drains,
Local postgres is down on my box, so the DB-backed enable/disable tests run in CI, not locally; decoder + frontend typecheck/lint pass locally.
…it-config # Conflicts: # products/data_warehouse/backend/api/external_data_source.py
…sponse Generated-By: PostHog Code Task-Id: fcaae539-62ef-4f3c-ae16-27bc684d8446
Fixed (commit d9b6edd): the
…reation (#60708) Co-authored-by: tests-posthog[bot] <250237707+tests-posthog[bot]@users.noreply.github.com>
Problem
CDC was a one-shot decision at source creation. Users who wanted to flip it on later, turn it off, or tune lag thresholds had to delete and recreate the source, losing schedule + schema history. The Configuration tab on `data-management/sources/managed-<id>/configuration` had no CDC controls.

Changes
Backend: three new viewset actions on `ExternalDataSource`:
- `POST /enable_cdc/`: re-runs prereq checks server-side, provisions engine-side CDC resources, writes config to `job_inputs`, ensures the extraction schedule exists. 409 on already-enabled, 403 when team flag off.
- `POST /disable_cdc/`: cancels any running CDC workflow, deletes the extraction schedule, delegates engine-side teardown to the adapter (drops slot + publication for Postgres, drops slot only for self-managed), soft-deletes `_cdc` companion tables, and forces CDC schemas to pick a new strategy by clearing `sync_type` and setting `should_sync=False`. Clears every `cdc_*` key from `job_inputs` so re-enable starts clean.
- `POST /update_cdc_settings/`: edits `cdc_auto_drop_slot` and lag thresholds with cross-field validation (warn < crit). Universal across engines; the other CDC fields stay immutable post-enable.

Postgres-agnostic abstraction: `CDCSourceAdapter` Protocol (`posthog/temporal/data_imports/cdc/adapters.py`) gains `setup_resources(source, payload) -> tuple[dict, str | None]` and `cleanup_resources(source) -> None`. The viewset gates new actions via `get_cdc_adapter(source)`, which returns 400 with a generic "CDC is not supported for source type: X" message instead of hard-coding `if source.source_type != POSTGRES`. Adding a future engine (MySQL binlog, etc.) means adding an adapter, no viewset changes.

Partial-state rollback: `PostgresCDCAdapter.setup_resources` now drops any partially-created slot/publication on failure. PostHog-managed mode uses `drop_slot_and_publication`; self-managed mode uses a new `drop_slot` helper so we never touch the customer-owned publication.

Frontend: new `CDCSection` component on the Configuration tab (gated on Postgres + warehouse + `DWH_POSTGRES_CDC` flag). Shows enabled-state (mode/slot/pub readouts + editable lag fields) or disabled-state (mode picker + prereq check + enable). Every save/enable/disable goes through `LemonDialog.open` confirmation. Buttons wrapped in `AccessControlAction` (Editor level). API client gets `enable_cdc`, `disable_cdc`, `update_cdc_settings`.

How did you test this code?
I'm an agent. Automated coverage:
- `test_external_data_source.py` covering: non-CDC-source rejection, team flag gating, already-enabled (409), invalid management mode (parameterized), prereq failures, prereq connection exceptions, posthog-managed success, self-managed publication forwarding, slot-setup failure (no source delete), posthog rollback on partial slot failure, self-managed rollback (slot-only, never the customer-owned publication), running-workflow cancellation, schedule + slot cleanup ordering, atomic schema pause + `cdc_*` key clearing, companion `_cdc` table soft-deletion, partial-update preservation, threshold validation (warn vs persisted crit), no-op empty payload, bool coercion.

Manual tests:
Did the same for a source with CDC enabled during creation. Everything worked.
Before enabling:

After enabling:

Automatic notifications
Docs update
🤖 Agent context
PostHog Code agent (Claude Opus 4.7). Conversation built up in stages:
- … `pg_drop_replication_slot` while active), stale `cdc_consistent_point` corrupting resume tracking if re-enabled, schedule + companion table cleanup. User picked "Option A" disable semantics (force schema re-pick) over silent fallback to incremental.
- … `EncryptedJSONField` round-trips scalar values as strings, which is now baked into the test assertions.
- … `CDCSourceAdapter` Protocol and extended it with `setup_resources` + `cleanup_resources` rather than inventing a new abstraction. Viewset now adapter-only.
- … `@patch("source_module.X")` no longer works once `from source_module import X` is at the top of the consumer). Patches updated to target the viewset module.
- … `master`, one file conflicted on imports + the `create()` CDC flow. Upstream had restructured `create()` to move CDC setup after PK validation and folded the source-type check into `cdc_enabled` itself; adopted that ordering, kept the adapter-based call.

User confirmed each major design decision before implementation (option A semantics, nice-to-haves bundle, modal-on-every-save UX).
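The patch-target note above reflects how `unittest.mock.patch` resolves names: it replaces the attribute on the module you name, so a consumer that did `from source_module import X` keeps its own reference. A small self-contained demonstration, using throwaway in-memory modules (`source_module`, `consumer`, and `make_adapter` are made-up names):

```python
import sys
import types
from unittest.mock import patch

# A throwaway "source" module exposing one function.
source_module = types.ModuleType("source_module")
source_module.make_adapter = lambda: "real"
sys.modules["source_module"] = source_module

# A throwaway "consumer" module that imports the name at the top.
consumer = types.ModuleType("consumer")
exec(
    "from source_module import make_adapter\n"
    "def run():\n"
    "    return make_adapter()\n",
    consumer.__dict__,
)
sys.modules["consumer"] = consumer

# Patching where the function is defined does not affect the consumer,
# which already holds its own reference to the original.
with patch("source_module.make_adapter", return_value="fake"):
    patched_at_source = consumer.run()

# Patching the name where it is looked up (the consumer) does.
with patch("consumer.make_adapter", return_value="fake"):
    patched_at_consumer = consumer.run()
```

Here `patched_at_source` is still the real value, while `patched_at_consumer` is the mocked one, which is why the tests had to target the viewset module.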
Created with PostHog Code