Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
513b476
feat(data-warehouse): allow enabling and disabling CDC after source c…
danielcarletti May 29, 2026
8b95a3e
fix(data-warehouse): check CDC prerequisites using stored source cred…
danielcarletti May 29, 2026
38ad970
feat(data-warehouse): show self-managed CDC setup SQL on config page
danielcarletti May 29, 2026
f4376ca
fix(data-warehouse): expose CDC state on config page + show live status
danielcarletti May 29, 2026
f814647
fix(data-warehouse): fetch CDC status via kea logic to avoid team-con…
danielcarletti May 29, 2026
f709c8d
Merge origin/master + address PR review
danielcarletti May 29, 2026
fdd009b
chore: update OpenAPI generated types
tests-posthog[bot] May 29, 2026
e89ba99
chore(data-warehouse): condense verbose CDC comments
danielcarletti May 29, 2026
e66263a
fix(data-warehouse): persist detected primary key on schema refresh
danielcarletti May 29, 2026
ca9af33
fix(data-warehouse): block CDC primary key change after data has synced
danielcarletti May 29, 2026
f10dbb9
fix(data-warehouse): qualify CDC change-event table names
danielcarletti May 29, 2026
d9f858f
Merge branch 'master' into posthog-code/cdc-edit-config
danielcarletti May 29, 2026
bd0296d
fix(data-warehouse): address CDC PR review
danielcarletti May 30, 2026
b2fc86d
Merge remote-tracking branch 'origin/master' into posthog-code/cdc-ed…
danielcarletti Jun 1, 2026
76c9019
test(mcp): update unit test snapshots
tests-posthog[bot] Jun 1, 2026
119957c
chore: update OpenAPI generated types
tests-posthog[bot] Jun 1, 2026
d9b6edd
test(data-warehouse): assert schedules_ready in enable_cdc success re…
danielcarletti Jun 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5582,6 +5582,64 @@ const api = {
.withAction('check_cdc_prerequisites')
.create({ data: payload })
},
/**
 * Check CDC prerequisites for an existing source.
 *
 * Submits `payload` to the source's `check_cdc_prerequisites_for_source` action
 * and resolves with the `{ valid, errors }` result.
 */
async check_cdc_prerequisites_for_source(
    sourceId: ExternalDataSource['id'],
    payload: {
        cdc_management_mode: 'posthog' | 'self_managed'
        cdc_slot_name?: string | null
        cdc_publication_name?: string | null
    }
): Promise<{ valid: boolean; errors: string[] }> {
    const request = new ApiRequest()
        .externalDataSource(sourceId)
        .withAction('check_cdc_prerequisites_for_source')
    const result = await request.create({ data: payload })
    return result
},
/**
 * Enable CDC on an existing source.
 *
 * Submits `payload` (management mode, optional slot/publication names and
 * lag/auto-drop settings) to the source's `enable_cdc` action.
 */
async enable_cdc(
    sourceId: ExternalDataSource['id'],
    payload: {
        cdc_management_mode: 'posthog' | 'self_managed'
        cdc_slot_name?: string | null
        cdc_publication_name?: string | null
        cdc_auto_drop_slot?: boolean
        cdc_lag_warning_threshold_mb?: number
        cdc_lag_critical_threshold_mb?: number
    }
): Promise<{ success: boolean }> {
    const request = new ApiRequest().externalDataSource(sourceId).withAction('enable_cdc')
    const result = await request.create({ data: payload })
    return result
},
/** Disable CDC on a source by calling its `disable_cdc` action with no payload. */
async disable_cdc(sourceId: ExternalDataSource['id']): Promise<{ success: boolean }> {
    const request = new ApiRequest().externalDataSource(sourceId).withAction('disable_cdc')
    const result = await request.create()
    return result
},
/**
 * Fetch the CDC status of a source from its `cdc_status` action.
 *
 * Only `enabled` is always present in the declared result; every other field
 * is optional.
 */
async cdc_status(sourceId: ExternalDataSource['id']): Promise<{
    enabled: boolean
    management_mode?: 'posthog' | 'self_managed'
    slot_name?: string
    publication_name?: string
    lag_warning_threshold_mb?: number
    lag_critical_threshold_mb?: number
    slot_exists?: boolean
    publication_exists?: boolean
    lag_bytes?: number | null
}> {
    const request = new ApiRequest().externalDataSource(sourceId).withAction('cdc_status')
    const status = await request.get()
    return status
},
/**
 * Update CDC settings on a source.
 *
 * Submits the optional auto-drop flag and lag thresholds to the source's
 * `update_cdc_settings` action.
 */
async update_cdc_settings(
    sourceId: ExternalDataSource['id'],
    payload: {
        cdc_auto_drop_slot?: boolean
        cdc_lag_warning_threshold_mb?: number
        cdc_lag_critical_threshold_mb?: number
    }
): Promise<{ success: boolean }> {
    const request = new ApiRequest().externalDataSource(sourceId).withAction('update_cdc_settings')
    const result = await request.create({ data: payload })
    return result
},
async jobs(
sourceId: ExternalDataSource['id'],
before: string | null,
Expand Down
44 changes: 44 additions & 0 deletions posthog/temporal/data_imports/cdc/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,50 @@ def get_lag_bytes(self, conn: Any, slot_name: str) -> int | None: ...

def parse_cdc_config(self, source: ExternalDataSource) -> CDCConfigT_co:
    """Return the adapter's CDC config object (``CDCConfigT_co``) for ``source``."""
    ...

def setup_resources(
    self,
    source: ExternalDataSource,
    payload: dict[str, Any],
) -> tuple[dict[str, Any], str | None]:
    """Provision engine-side CDC resources for the source.

    Args:
        source: The source to provision CDC resources for.
        payload: Carries the management mode, identifiers (slot/publication,
            binlog channel, …), and any engine-specific knobs.

    Returns:
        ``(resource_dict, None)`` on success, where ``resource_dict`` contains
        the CDC identifiers + metadata to merge into ``source.job_inputs``
        (already keyed with ``cdc_*`` prefixes); or ``({}, error_message)``
        describing what failed. On failure the adapter best-effort rolls back
        partial state.
    """
    ...

def cleanup_resources(self, source: ExternalDataSource) -> None:
    """Drop engine-side CDC resources owned by PostHog for the source.

    Best-effort: implementations log and continue on errors rather than raise.
    They must NOT touch resources owned by the customer (e.g. self-managed
    publications), and are a no-op when the source has no CDC config or no
    PostHog-owned resources to drop.
    """
    ...

def get_status(self, source: ExternalDataSource) -> dict[str, Any]:
    """Return live engine-side CDC health for the source, read from the source DB.

    Opens a short management connection to gather the data.

    Returns:
        At minimum ``{"slot_exists": bool, "publication_exists": bool,
        "lag_bytes": int | None}``. Engines may add extra fields.

    Raises:
        Whatever the connection attempt raises on failure — the caller surfaces
        that as a 400 / unreachable state.
    """
    ...

def add_table(self, source: ExternalDataSource, schema: str, table: str) -> None:
    """Best-effort include a table in the change-capture set.

    For Postgres this corresponds to ``ALTER PUBLICATION ADD TABLE``. No-op when
    PostHog doesn't own the capture definition (e.g. self-managed).
    """
    ...

def remove_table(self, source: ExternalDataSource, schema: str, table: str) -> None:
    """Best-effort exclude a table from the change-capture set.

    Inverse of ``add_table``.
    """
    ...


def get_cdc_adapter(source: ExternalDataSource) -> CDCSourceAdapter[CDCConfig]:
"""Return the CDC adapter for the given source's type.
Expand Down
162 changes: 156 additions & 6 deletions posthog/temporal/data_imports/sources/postgres/cdc/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,32 @@

from __future__ import annotations

import logging
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Literal

from posthog.temporal.data_imports.sources.postgres.cdc.config import PostgresCDCConfig
from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import (
add_table_to_publication,
cdc_pg_connection,
create_slot,
create_slot_and_publication,
drop_slot,
drop_slot_and_publication,
get_slot_lag_bytes,
publication_exists,
remove_table_from_publication,
slot_exists,
)

if TYPE_CHECKING:
from posthog.temporal.data_imports.cdc.types import CDCStreamReader

from products.warehouse_sources.backend.models.external_data_source import ExternalDataSource

logger = logging.getLogger(__name__)


class PostgresCDCAdapter:
def parse_cdc_config(self, source: ExternalDataSource) -> PostgresCDCConfig:
Expand Down Expand Up @@ -40,8 +55,6 @@ def create_reader(self, source: ExternalDataSource) -> CDCStreamReader:

@contextmanager
def management_connection(self, source: ExternalDataSource, connect_timeout: int = 15) -> Iterator[Any]:
    """Yield a management connection to the source database.

    Thin wrapper over ``cdc_pg_connection``; the connection is handed to the
    caller for the duration of the ``with`` block. Uses the module-level import
    of ``cdc_pg_connection`` rather than re-importing it on every call.

    Args:
        source: The external data source to connect to.
        connect_timeout: Forwarded to ``cdc_pg_connection`` as ``connect_timeout``.
    """
    with cdc_pg_connection(source, connect_timeout=connect_timeout) as conn:
        yield conn

Expand All @@ -67,11 +80,148 @@ def validate_prerequisites(
)

def drop_resources(self, conn: Any, slot_name: str, pub_name: str) -> None:
    """Drop the replication slot and publication on an open connection.

    Delegates to ``drop_slot_and_publication``, using the module-level import
    instead of re-importing it on every call.
    """
    drop_slot_and_publication(conn, slot_name, pub_name)

def get_lag_bytes(self, conn: Any, slot_name: str) -> int | None:
    """Return the result of ``get_slot_lag_bytes`` for ``slot_name`` on ``conn``.

    Uses the module-level import of ``get_slot_lag_bytes`` instead of
    re-importing it on every call.
    """
    return get_slot_lag_bytes(conn, slot_name)

def setup_resources(
    self,
    source: ExternalDataSource,
    payload: dict[str, Any],
) -> tuple[dict[str, Any], str | None]:
    """Create the logical replication slot (and the publication, for PostHog-managed
    mode) on the source database.

    Args:
        source: The source whose database receives the slot/publication.
        payload: Keys read are ``cdc_management_mode`` (anything other than
            ``"self_managed"`` is treated as ``"posthog"``), ``cdc_slot_name`` and
            ``cdc_publication_name``. Missing or empty names fall back to defaults
            derived from ``source.id``.

    Returns:
        ``(fields, None)`` on success, where ``fields`` holds ``cdc_management_mode``,
        ``cdc_slot_name``, ``cdc_publication_name`` and ``cdc_consistent_point`` for
        the caller to merge into ``source.job_inputs``; or ``({}, error_message)``
        on failure.

    Rollback only runs once creation has actually been attempted. A failure that
    happens earlier (connection error, a failing existence check) has created
    nothing, and the names have not been confirmed free, so dropping them could
    destroy a slot or publication that existed before this call.
    """
    management_mode: Literal["posthog", "self_managed"] = (
        "self_managed" if payload.get("cdc_management_mode") == "self_managed" else "posthog"
    )
    slot_name = payload.get("cdc_slot_name") or f"posthog_{source.id.hex[:12]}"
    default_pub_name = "posthog_pub" if management_mode == "self_managed" else f"posthog_pub_{source.id.hex[:12]}"
    pub_name = payload.get("cdc_publication_name") or default_pub_name

    schema = self._resolve_schema(source)

    resource_fields: dict[str, Any] = {
        "cdc_management_mode": management_mode,
        "cdc_slot_name": slot_name,
        "cdc_publication_name": pub_name,
    }

    if management_mode == "posthog":
        # Set only after both names were confirmed absent and right before we create
        # them, so the rollback below only ever drops what this call created.
        creation_attempted = False
        try:
            with cdc_pg_connection(source) as conn:
                if slot_exists(conn, slot_name):
                    return {}, f"A replication slot named '{slot_name}' already exists on your database."
                if publication_exists(conn, pub_name):
                    return {}, f"A publication named '{pub_name}' already exists on your database."
                creation_attempted = True
                resource_fields["cdc_consistent_point"] = create_slot_and_publication(
                    conn, slot_name, pub_name, schema, tables=[]
                )
        except Exception as e:
            logger.exception("Failed to create CDC slot and publication: %s", e)
            if creation_attempted:
                try:
                    with cdc_pg_connection(source, connect_timeout=10) as conn:
                        drop_slot_and_publication(conn, slot_name, pub_name)
                except Exception as rollback_error:
                    logger.exception("Failed to roll back partial CDC slot/publication: %s", rollback_error)
            return {}, f"Failed to create replication slot: {e}"
        return resource_fields, None

    # self_managed: the publication is customer-owned; PostHog only creates the slot.
    creation_attempted = False
    try:
        with cdc_pg_connection(source) as conn:
            if slot_exists(conn, slot_name):
                return {}, f"A replication slot named '{slot_name}' already exists on your database."
            if not publication_exists(conn, pub_name):
                return (
                    {},
                    (
                        f"Publication '{pub_name}' does not exist. Run the CREATE PUBLICATION "
                        f"statement we showed you, then retry."
                    ),
                )
            creation_attempted = True
            resource_fields["cdc_consistent_point"] = create_slot(conn, slot_name)
    except Exception as e:
        logger.exception("Failed to set up self-managed CDC slot: %s", e)
        if creation_attempted:
            # Slot only — the publication is customer-owned.
            try:
                with cdc_pg_connection(source, connect_timeout=10) as conn:
                    drop_slot(conn, slot_name)
            except Exception as rollback_error:
                logger.exception("Failed to roll back partial self-managed CDC slot: %s", rollback_error)
        return {}, f"Failed to create replication slot: {e}"
    return resource_fields, None

def cleanup_resources(self, source: ExternalDataSource) -> None:
    """Remove the CDC resources this source holds on the source database.

    Self-managed mode drops only the replication slot, because the publication
    is customer-owned. Otherwise the slot and publication are dropped together
    when a publication name is configured. Does nothing when CDC is disabled
    or no slot name is stored. Best-effort: failures are logged, never raised.
    """
    config = self.parse_cdc_config(source)
    if not (config.enabled and config.slot_name):
        return
    try:
        with cdc_pg_connection(source, connect_timeout=10) as conn:
            if config.management_mode != "self_managed":
                if config.publication_name:
                    drop_slot_and_publication(conn, config.slot_name, config.publication_name)
            else:
                drop_slot(conn, config.slot_name)
    except Exception:
        logger.exception(
            "Failed to drop CDC slot/publication on source DB (best-effort), source_id=%s slot=%s",
            source.id,
            config.slot_name,
        )

def get_status(self, source: ExternalDataSource) -> dict[str, Any]:
    """Report slot/publication existence and slot lag, read live from the source DB.

    A missing slot name yields ``slot_exists=False`` and ``lag_bytes=None``; a
    missing publication name yields ``publication_exists=False``. Connection
    errors are not caught here.
    """
    config = self.parse_cdc_config(source)
    slot = config.slot_name
    publication = config.publication_name
    with cdc_pg_connection(source, connect_timeout=10) as conn:
        status: dict[str, Any] = {
            "slot_exists": slot_exists(conn, slot) if slot else False,
            "publication_exists": publication_exists(conn, publication) if publication else False,
            "lag_bytes": get_slot_lag_bytes(conn, slot) if slot else None,
        }
    return status

def add_table(self, source: ExternalDataSource, schema: str, table: str) -> None:
    """Add ``schema.table`` to the publication (best-effort).

    Delegates to ``_alter_publication_membership`` with ``add=True``, which is a
    no-op for self-managed sources or when no publication is configured.
    """
    self._alter_publication_membership(source=source, schema=schema, table=table, add=True)

def remove_table(self, source: ExternalDataSource, schema: str, table: str) -> None:
    """Remove ``schema.table`` from the publication (best-effort).

    Delegates to ``_alter_publication_membership`` with ``add=False``, which is a
    no-op for self-managed sources or when no publication is configured.
    """
    self._alter_publication_membership(source=source, schema=schema, table=table, add=False)

def _alter_publication_membership(self, source: ExternalDataSource, schema: str, table: str, add: bool) -> None:
cdc_config = self.parse_cdc_config(source)
# PostHog only manages the publication in posthog-managed mode.
if cdc_config.management_mode != "posthog" or not cdc_config.publication_name:
return
try:
with cdc_pg_connection(source) as conn:
if add:
add_table_to_publication(conn, cdc_config.publication_name, schema, table)
else:
remove_table_from_publication(conn, cdc_config.publication_name, schema, table)
except Exception:
logger.exception(
"Failed to %s table %s.%s %s CDC publication '%s' (best-effort), source_id=%s",
"add" if add else "remove",
schema,
table,
"to" if add else "from",
cdc_config.publication_name,
source.id,
)

def _resolve_schema(self, source: ExternalDataSource) -> str:
raw = (source.job_inputs or {}).get("schema")
return raw.strip() if isinstance(raw, str) and raw.strip() else "public"
15 changes: 11 additions & 4 deletions posthog/temporal/data_imports/sources/postgres/cdc/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class Relation:
replica_identity: int # 0=default, 1=nothing, 2=full, 3=index
columns: list[RelationColumn] = field(default_factory=list)

@property
def qualified_name(self) -> str:
    """Return the relation as ``schema.table``.

    This is the form of the qualified ``ExternalDataSchema.name`` that the CDC
    extraction activity keys on; emitting only the bare table name silently
    dropped every change for qualified (multi-schema-era) schemas.
    """
    return "{}.{}".format(self.schema_name, self.table_name)


class PgOutputDecoder:
"""Stateful decoder for pgoutput binary messages.
Expand Down Expand Up @@ -231,7 +238,7 @@ def _handle_insert(self, payload: bytes, lsn: str) -> None:
self._tx_buffer.append(
ChangeEvent(
operation="I",
table_name=relation.table_name,
table_name=relation.qualified_name,
position_serialized=lsn,
timestamp=self._tx_timestamp or datetime.now(tz=UTC),
columns=columns,
Expand Down Expand Up @@ -268,7 +275,7 @@ def _handle_update(self, payload: bytes, lsn: str) -> None:
self._tx_buffer.append(
ChangeEvent(
operation="U",
table_name=relation.table_name,
table_name=relation.qualified_name,
position_serialized=lsn,
timestamp=self._tx_timestamp or datetime.now(tz=UTC),
columns=columns,
Expand Down Expand Up @@ -298,7 +305,7 @@ def _handle_delete(self, payload: bytes, lsn: str) -> None:
self._tx_buffer.append(
ChangeEvent(
operation="D",
table_name=relation.table_name,
table_name=relation.qualified_name,
position_serialized=lsn,
timestamp=self._tx_timestamp or datetime.now(tz=UTC),
columns=columns,
Expand All @@ -321,7 +328,7 @@ def _handle_truncate(self, payload: bytes) -> None:
relation.schema_name,
relation.table_name,
)
self._truncated_tables.append(relation.table_name)
self._truncated_tables.append(relation.qualified_name)

# --- Helpers ---

Expand Down
19 changes: 13 additions & 6 deletions posthog/temporal/data_imports/sources/postgres/cdc/slot_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,10 @@ def create_slot(conn: psycopg.Connection, slot_name: str) -> str:
return consistent_point


def drop_slot_and_publication(
conn: psycopg.Connection,
slot_name: str,
pub_name: str,
) -> None:
"""Drop a replication slot and publication. Best-effort — logs and continues on errors."""
def drop_slot(conn: psycopg.Connection, slot_name: str) -> None:
"""Drop just the replication slot. Best-effort — used by self-managed rollback,
where the publication is customer-owned and must not be touched.
"""
with conn.cursor() as cur:
try:
cur.execute(
Expand All @@ -128,6 +126,15 @@ def drop_slot_and_publication(
conn.rollback()
logger.exception("Failed to drop replication slot '%s'", slot_name)


def drop_slot_and_publication(
conn: psycopg.Connection,
slot_name: str,
pub_name: str,
) -> None:
"""Drop a replication slot and publication. Best-effort — logs and continues on errors."""
drop_slot(conn, slot_name)
with conn.cursor() as cur:
try:
cur.execute(
sql.SQL("DROP PUBLICATION IF EXISTS {}").format(sql.Identifier(pub_name)),
Expand Down
Loading
Loading