diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 9df91cbd00af..33a18de72890 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -5582,6 +5582,64 @@ const api = { .withAction('check_cdc_prerequisites') .create({ data: payload }) }, + async check_cdc_prerequisites_for_source( + sourceId: ExternalDataSource['id'], + payload: { + cdc_management_mode: 'posthog' | 'self_managed' + cdc_slot_name?: string | null + cdc_publication_name?: string | null + } + ): Promise<{ valid: boolean; errors: string[] }> { + return await new ApiRequest() + .externalDataSource(sourceId) + .withAction('check_cdc_prerequisites_for_source') + .create({ data: payload }) + }, + async enable_cdc( + sourceId: ExternalDataSource['id'], + payload: { + cdc_management_mode: 'posthog' | 'self_managed' + cdc_slot_name?: string | null + cdc_publication_name?: string | null + cdc_auto_drop_slot?: boolean + cdc_lag_warning_threshold_mb?: number + cdc_lag_critical_threshold_mb?: number + } + ): Promise<{ success: boolean }> { + return await new ApiRequest() + .externalDataSource(sourceId) + .withAction('enable_cdc') + .create({ data: payload }) + }, + async disable_cdc(sourceId: ExternalDataSource['id']): Promise<{ success: boolean }> { + return await new ApiRequest().externalDataSource(sourceId).withAction('disable_cdc').create() + }, + async cdc_status(sourceId: ExternalDataSource['id']): Promise<{ + enabled: boolean + management_mode?: 'posthog' | 'self_managed' + slot_name?: string + publication_name?: string + lag_warning_threshold_mb?: number + lag_critical_threshold_mb?: number + slot_exists?: boolean + publication_exists?: boolean + lag_bytes?: number | null + }> { + return await new ApiRequest().externalDataSource(sourceId).withAction('cdc_status').get() + }, + async update_cdc_settings( + sourceId: ExternalDataSource['id'], + payload: { + cdc_auto_drop_slot?: boolean + cdc_lag_warning_threshold_mb?: number + cdc_lag_critical_threshold_mb?: number + } + ): 
Promise<{ success: boolean }> { + return await new ApiRequest() + .externalDataSource(sourceId) + .withAction('update_cdc_settings') + .create({ data: payload }) + }, async jobs( sourceId: ExternalDataSource['id'], before: string | null, diff --git a/posthog/temporal/data_imports/cdc/adapters.py b/posthog/temporal/data_imports/cdc/adapters.py index 1e7daef33970..d83e8289e4e3 100644 --- a/posthog/temporal/data_imports/cdc/adapters.py +++ b/posthog/temporal/data_imports/cdc/adapters.py @@ -76,6 +76,50 @@ def get_lag_bytes(self, conn: Any, slot_name: str) -> int | None: ... def parse_cdc_config(self, source: ExternalDataSource) -> CDCConfigT_co: ... + def setup_resources( + self, + source: ExternalDataSource, + payload: dict[str, Any], + ) -> tuple[dict[str, Any], str | None]: + """Provision engine-side CDC resources for the source. + + Reads management mode, identifiers (slot/publication, binlog channel, …), + and any engine-specific knobs from ``payload``. Returns either + ``(resource_dict, None)`` where ``resource_dict`` contains the CDC + identifiers + metadata to merge into ``source.job_inputs`` (already keyed + with ``cdc_*`` prefixes), or ``({}, error_message)`` describing what + failed. On failure the adapter best-effort rolls back partial state. + """ + ... + + def cleanup_resources(self, source: ExternalDataSource) -> None: + """Drop engine-side CDC resources owned by PostHog for the source. + + Best-effort: logs and continues on errors. Must NOT touch resources owned + by the customer (e.g. self-managed publications). No-op when the source + has no CDC config or no PostHog-owned resources to drop. + """ + ... + + def get_status(self, source: ExternalDataSource) -> dict[str, Any]: + """Live engine-side CDC health for the source, read from the source DB. + + Opens a short management connection and returns at minimum + ``{"slot_exists": bool, "publication_exists": bool, "lag_bytes": int | None}``. + Engines may add extra fields. 
Raises on connection failure — the caller + surfaces that as a 400 / unreachable state. + """ + ... + + def add_table(self, source: ExternalDataSource, schema: str, table: str) -> None: + """Best-effort include a table in the change-capture set (PG: ALTER PUBLICATION ADD TABLE). + No-op when PostHog doesn't own the capture definition (e.g. self-managed).""" + ... + + def remove_table(self, source: ExternalDataSource, schema: str, table: str) -> None: + """Best-effort exclude a table from the change-capture set. Inverse of ``add_table``.""" + ... + def get_cdc_adapter(source: ExternalDataSource) -> CDCSourceAdapter[CDCConfig]: """Return the CDC adapter for the given source's type. diff --git a/posthog/temporal/data_imports/sources/postgres/cdc/adapter.py b/posthog/temporal/data_imports/sources/postgres/cdc/adapter.py index 923a223278ef..6e0ca7c38e7c 100644 --- a/posthog/temporal/data_imports/sources/postgres/cdc/adapter.py +++ b/posthog/temporal/data_imports/sources/postgres/cdc/adapter.py @@ -2,17 +2,32 @@ from __future__ import annotations +import logging from collections.abc import Iterator from contextlib import contextmanager from typing import TYPE_CHECKING, Any, Literal from posthog.temporal.data_imports.sources.postgres.cdc.config import PostgresCDCConfig +from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import ( + add_table_to_publication, + cdc_pg_connection, + create_slot, + create_slot_and_publication, + drop_slot, + drop_slot_and_publication, + get_slot_lag_bytes, + publication_exists, + remove_table_from_publication, + slot_exists, +) if TYPE_CHECKING: from posthog.temporal.data_imports.cdc.types import CDCStreamReader from products.warehouse_sources.backend.models.external_data_source import ExternalDataSource +logger = logging.getLogger(__name__) + class PostgresCDCAdapter: def parse_cdc_config(self, source: ExternalDataSource) -> PostgresCDCConfig: @@ -40,8 +55,6 @@ def create_reader(self, source: ExternalDataSource) -> 
CDCStreamReader:
 
     @contextmanager
     def management_connection(self, source: ExternalDataSource, connect_timeout: int = 15) -> Iterator[Any]:
-        from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import cdc_pg_connection
-
         with cdc_pg_connection(source, connect_timeout=connect_timeout) as conn:
             yield conn
 
@@ -67,11 +80,160 @@ def validate_prerequisites(
         )
 
     def drop_resources(self, conn: Any, slot_name: str, pub_name: str) -> None:
-        from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import drop_slot_and_publication
-
         drop_slot_and_publication(conn, slot_name, pub_name)
 
     def get_lag_bytes(self, conn: Any, slot_name: str) -> int | None:
-        from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import get_slot_lag_bytes
-
         return get_slot_lag_bytes(conn, slot_name)
+
+    def setup_resources(
+        self,
+        source: ExternalDataSource,
+        payload: dict[str, Any],
+    ) -> tuple[dict[str, Any], str | None]:
+        """Create the logical replication slot (and the publication, for PostHog-managed
+        mode) on the source database. Returns the cdc_* fields the caller should merge
+        into ``source.job_inputs``, or an error string. Cleans up partial state on failure,
+        but only once the pre-flight checks have passed — a failure while connecting or
+        checking must never drop a slot/publication that already existed.
+        """
+        management_mode: Literal["posthog", "self_managed"] = (
+            "self_managed" if payload.get("cdc_management_mode") == "self_managed" else "posthog"
+        )
+        slot_name = payload.get("cdc_slot_name") or f"posthog_{source.id.hex[:12]}"
+        default_pub_name = "posthog_pub" if management_mode == "self_managed" else f"posthog_pub_{source.id.hex[:12]}"
+        pub_name = payload.get("cdc_publication_name") or default_pub_name
+
+        schema = self._resolve_schema(source)
+
+        resource_fields: dict[str, Any] = {
+            "cdc_management_mode": management_mode,
+            "cdc_slot_name": slot_name,
+            "cdc_publication_name": pub_name,
+        }
+
+        # Flipped to True only after the pre-flight existence checks succeed. The except
+        # handlers below also catch connection/check failures, so every rollback is gated on it.
+        preflight_passed = False
+
+        if management_mode == "posthog":
+            try:
+                with cdc_pg_connection(source) as conn:
+                    # Bail on pre-existing names so the rollback below only ever drops what we created.
+                    if slot_exists(conn, slot_name):
+                        return {}, f"A replication slot named '{slot_name}' already exists on your database."
+                    if publication_exists(conn, pub_name):
+                        return {}, f"A publication named '{pub_name}' already exists on your database."
+                    preflight_passed = True
+                    resource_fields["cdc_consistent_point"] = create_slot_and_publication(
+                        conn, slot_name, pub_name, schema, tables=[]
+                    )
+            except Exception as e:
+                logger.exception("Failed to create CDC slot and publication: %s", e)
+                # Only roll back once both names were verified absent — otherwise a connection or
+                # pre-flight failure could drop a slot/publication that already existed.
+                if preflight_passed:
+                    try:
+                        with cdc_pg_connection(source, connect_timeout=10) as conn:
+                            drop_slot_and_publication(conn, slot_name, pub_name)
+                    except Exception as rollback_error:
+                        logger.exception("Failed to roll back partial CDC slot/publication: %s", rollback_error)
+                return {}, f"Failed to create replication slot: {e}"
+            return resource_fields, None
+
+        # self_managed: the publication is customer-owned; PostHog only creates the slot.
+        try:
+            with cdc_pg_connection(source) as conn:
+                if slot_exists(conn, slot_name):
+                    return {}, f"A replication slot named '{slot_name}' already exists on your database."
+                if not publication_exists(conn, pub_name):
+                    return (
+                        {},
+                        (
+                            f"Publication '{pub_name}' does not exist. Run the CREATE PUBLICATION "
+                            f"statement we showed you, then retry."
+                        ),
+                    )
+                preflight_passed = True
+                resource_fields["cdc_consistent_point"] = create_slot(conn, slot_name)
+        except Exception as e:
+            logger.exception("Failed to set up self-managed CDC slot: %s", e)
+            # Slot only — the publication is customer-owned. Skipped unless the pre-flight
+            # confirmed the slot was absent, so a pre-existing slot is never dropped.
+            if preflight_passed:
+                try:
+                    with cdc_pg_connection(source, connect_timeout=10) as conn:
+                        drop_slot(conn, slot_name)
+                except Exception as rollback_error:
+                    logger.exception("Failed to roll back partial self-managed CDC slot: %s", rollback_error)
+            return {}, f"Failed to create replication slot: {e}"
+        return resource_fields, None
+
+    def cleanup_resources(self, source: ExternalDataSource) -> None:
+        """Drop the PostHog-managed replication slot (and publication, for PostHog-managed
+        mode) on the source database. Self-managed mode drops only the slot — the
+        publication is customer-owned. Best-effort: logs and continues on errors.
+        """
+        cdc_config = self.parse_cdc_config(source)
+        if not cdc_config.enabled or not cdc_config.slot_name:
+            return
+        try:
+            with cdc_pg_connection(source, connect_timeout=10) as conn:
+                if cdc_config.management_mode == "self_managed":
+                    drop_slot(conn, cdc_config.slot_name)
+                elif cdc_config.publication_name:
+                    drop_slot_and_publication(conn, cdc_config.slot_name, cdc_config.publication_name)
+        except Exception:
+            logger.exception(
+                "Failed to drop CDC slot/publication on source DB (best-effort), source_id=%s slot=%s",
+                source.id,
+                cdc_config.slot_name,
+            )
+
+    def get_status(self, source: ExternalDataSource) -> dict[str, Any]:
+        """Read live slot/publication existence and WAL lag from the source DB."""
+        cdc_config = self.parse_cdc_config(source)
+        with cdc_pg_connection(source, connect_timeout=10) as conn:
+            slot_present = slot_exists(conn, cdc_config.slot_name) if cdc_config.slot_name else False
+            pub_present = (
+                publication_exists(conn, cdc_config.publication_name) if cdc_config.publication_name else False
+            )
+            lag_bytes = get_slot_lag_bytes(conn, cdc_config.slot_name) if cdc_config.slot_name else None
+        return {
+            "slot_exists": slot_present,
+            "publication_exists": pub_present,
+            "lag_bytes": lag_bytes,
+        }
+
+    def add_table(self, source: ExternalDataSource, schema: str, table: str) -> None:
+        """Best-effort ALTER PUBLICATION ADD TABLE. 
No-op for self-managed / no publication.""" + self._alter_publication_membership(source, schema, table, add=True) + + def remove_table(self, source: ExternalDataSource, schema: str, table: str) -> None: + """Best-effort ALTER PUBLICATION DROP TABLE. No-op for self-managed / no publication.""" + self._alter_publication_membership(source, schema, table, add=False) + + def _alter_publication_membership(self, source: ExternalDataSource, schema: str, table: str, add: bool) -> None: + cdc_config = self.parse_cdc_config(source) + # PostHog only manages the publication in posthog-managed mode. + if cdc_config.management_mode != "posthog" or not cdc_config.publication_name: + return + try: + with cdc_pg_connection(source) as conn: + if add: + add_table_to_publication(conn, cdc_config.publication_name, schema, table) + else: + remove_table_from_publication(conn, cdc_config.publication_name, schema, table) + except Exception: + logger.exception( + "Failed to %s table %s.%s %s CDC publication '%s' (best-effort), source_id=%s", + "add" if add else "remove", + schema, + table, + "to" if add else "from", + cdc_config.publication_name, + source.id, + ) + + def _resolve_schema(self, source: ExternalDataSource) -> str: + raw = (source.job_inputs or {}).get("schema") + return raw.strip() if isinstance(raw, str) and raw.strip() else "public" diff --git a/posthog/temporal/data_imports/sources/postgres/cdc/decoder.py b/posthog/temporal/data_imports/sources/postgres/cdc/decoder.py index 0bbaedc3caa4..9390de0a7e1f 100644 --- a/posthog/temporal/data_imports/sources/postgres/cdc/decoder.py +++ b/posthog/temporal/data_imports/sources/postgres/cdc/decoder.py @@ -74,6 +74,13 @@ class Relation: replica_identity: int # 0=default, 1=nothing, 2=full, 3=index columns: list[RelationColumn] = field(default_factory=list) + @property + def qualified_name(self) -> str: + """`schema.table` — matches the qualified `ExternalDataSchema.name` the CDC + extraction activity keys on. 
Emitting the bare table name silently dropped + every change for qualified (multi-schema-era) schemas.""" + return f"{self.schema_name}.{self.table_name}" + class PgOutputDecoder: """Stateful decoder for pgoutput binary messages. @@ -231,7 +238,7 @@ def _handle_insert(self, payload: bytes, lsn: str) -> None: self._tx_buffer.append( ChangeEvent( operation="I", - table_name=relation.table_name, + table_name=relation.qualified_name, position_serialized=lsn, timestamp=self._tx_timestamp or datetime.now(tz=UTC), columns=columns, @@ -268,7 +275,7 @@ def _handle_update(self, payload: bytes, lsn: str) -> None: self._tx_buffer.append( ChangeEvent( operation="U", - table_name=relation.table_name, + table_name=relation.qualified_name, position_serialized=lsn, timestamp=self._tx_timestamp or datetime.now(tz=UTC), columns=columns, @@ -298,7 +305,7 @@ def _handle_delete(self, payload: bytes, lsn: str) -> None: self._tx_buffer.append( ChangeEvent( operation="D", - table_name=relation.table_name, + table_name=relation.qualified_name, position_serialized=lsn, timestamp=self._tx_timestamp or datetime.now(tz=UTC), columns=columns, @@ -321,7 +328,7 @@ def _handle_truncate(self, payload: bytes) -> None: relation.schema_name, relation.table_name, ) - self._truncated_tables.append(relation.table_name) + self._truncated_tables.append(relation.qualified_name) # --- Helpers --- diff --git a/posthog/temporal/data_imports/sources/postgres/cdc/slot_manager.py b/posthog/temporal/data_imports/sources/postgres/cdc/slot_manager.py index 209817be0032..7d6b44c65ed3 100644 --- a/posthog/temporal/data_imports/sources/postgres/cdc/slot_manager.py +++ b/posthog/temporal/data_imports/sources/postgres/cdc/slot_manager.py @@ -108,12 +108,10 @@ def create_slot(conn: psycopg.Connection, slot_name: str) -> str: return consistent_point -def drop_slot_and_publication( - conn: psycopg.Connection, - slot_name: str, - pub_name: str, -) -> None: - """Drop a replication slot and publication. 
Best-effort — logs and continues on errors.""" +def drop_slot(conn: psycopg.Connection, slot_name: str) -> None: + """Drop just the replication slot. Best-effort — used by self-managed rollback, + where the publication is customer-owned and must not be touched. + """ with conn.cursor() as cur: try: cur.execute( @@ -128,6 +126,15 @@ def drop_slot_and_publication( conn.rollback() logger.exception("Failed to drop replication slot '%s'", slot_name) + +def drop_slot_and_publication( + conn: psycopg.Connection, + slot_name: str, + pub_name: str, +) -> None: + """Drop a replication slot and publication. Best-effort — logs and continues on errors.""" + drop_slot(conn, slot_name) + with conn.cursor() as cur: try: cur.execute( sql.SQL("DROP PUBLICATION IF EXISTS {}").format(sql.Identifier(pub_name)), diff --git a/posthog/temporal/data_imports/sources/postgres/cdc/tests/test_adapter.py b/posthog/temporal/data_imports/sources/postgres/cdc/tests/test_adapter.py new file mode 100644 index 000000000000..42479f7fabc2 --- /dev/null +++ b/posthog/temporal/data_imports/sources/postgres/cdc/tests/test_adapter.py @@ -0,0 +1,128 @@ +from unittest.mock import MagicMock, patch + +from posthog.temporal.data_imports.sources.postgres.cdc.adapter import PostgresCDCAdapter + +_ADAPTER = "posthog.temporal.data_imports.sources.postgres.cdc.adapter" + + +def _source(**cdc_overrides): + job_inputs = { + "host": "localhost", + "port": 5432, + "database": "app", + "user": "user", + "password": "pass", + "schema": "public", + **cdc_overrides, + } + source = MagicMock() + source.job_inputs = job_inputs + return source + + +def _fake_conn(): + cm = MagicMock() + cm.return_value.__enter__.return_value = object() + cm.return_value.__exit__.return_value = None + return cm + + +class TestSetupResourcesPreflight: + @patch(f"{_ADAPTER}.drop_slot_and_publication") + @patch(f"{_ADAPTER}.create_slot_and_publication") + @patch(f"{_ADAPTER}.publication_exists", return_value=False) + 
@patch(f"{_ADAPTER}.slot_exists", return_value=True) + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_posthog_refuses_when_slot_already_exists( + self, _conn, _slot_exists, _pub_exists, mock_create, mock_drop + ) -> None: + fields, error = PostgresCDCAdapter().setup_resources( + _source(), {"cdc_management_mode": "posthog", "cdc_slot_name": "existing_slot"} + ) + assert fields == {} + assert error is not None and "already exists" in error + # Must not create, and must not roll back (drop) a slot it didn't create. + mock_create.assert_not_called() + mock_drop.assert_not_called() + + @patch(f"{_ADAPTER}.drop_slot_and_publication") + @patch(f"{_ADAPTER}.create_slot_and_publication") + @patch(f"{_ADAPTER}.publication_exists", return_value=True) + @patch(f"{_ADAPTER}.slot_exists", return_value=False) + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_posthog_refuses_when_publication_already_exists( + self, _conn, _slot_exists, _pub_exists, mock_create, mock_drop + ) -> None: + fields, error = PostgresCDCAdapter().setup_resources( + _source(), {"cdc_management_mode": "posthog", "cdc_publication_name": "existing_pub"} + ) + assert fields == {} + assert error is not None and "already exists" in error + mock_create.assert_not_called() + mock_drop.assert_not_called() + + @patch(f"{_ADAPTER}.drop_slot") + @patch(f"{_ADAPTER}.create_slot") + @patch(f"{_ADAPTER}.publication_exists", return_value=True) + @patch(f"{_ADAPTER}.slot_exists", return_value=True) + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_self_managed_refuses_when_slot_already_exists( + self, _conn, _slot_exists, _pub_exists, mock_create, mock_drop + ) -> None: + fields, error = PostgresCDCAdapter().setup_resources( + _source(), {"cdc_management_mode": "self_managed", "cdc_slot_name": "existing_slot"} + ) + assert fields == {} + assert error is not None and "already exists" in error + mock_create.assert_not_called() + 
mock_drop.assert_not_called() + + @patch(f"{_ADAPTER}.drop_slot_and_publication") + @patch(f"{_ADAPTER}.create_slot_and_publication", side_effect=RuntimeError("boom")) + @patch(f"{_ADAPTER}.publication_exists", return_value=False) + @patch(f"{_ADAPTER}.slot_exists", return_value=False) + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_posthog_rolls_back_only_after_verifying_absence( + self, _conn, _slot_exists, _pub_exists, _create, mock_drop + ) -> None: + # Both verified absent, then create fails → safe to drop what we just made. + fields, error = PostgresCDCAdapter().setup_resources( + _source(), + {"cdc_management_mode": "posthog", "cdc_slot_name": "s", "cdc_publication_name": "p"}, + ) + assert fields == {} + assert error is not None and "boom" in error + mock_drop.assert_called_once() + assert mock_drop.call_args.args[1:] == ("s", "p") + + +class TestAlterPublicationMembership: + @patch(f"{_ADAPTER}.add_table_to_publication") + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_add_table_noop_for_self_managed(self, _conn, mock_add) -> None: + source = _source(cdc_enabled=True, cdc_management_mode="self_managed", cdc_publication_name="pub") + PostgresCDCAdapter().add_table(source, "public", "orders") + mock_add.assert_not_called() + + @patch(f"{_ADAPTER}.add_table_to_publication") + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_add_table_for_posthog_managed(self, _conn, mock_add) -> None: + source = _source(cdc_enabled=True, cdc_management_mode="posthog", cdc_publication_name="pub") + PostgresCDCAdapter().add_table(source, "public", "orders") + mock_add.assert_called_once() + assert mock_add.call_args.args[1:] == ("pub", "public", "orders") + + @patch(f"{_ADAPTER}.remove_table_from_publication") + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_remove_table_for_posthog_managed(self, _conn, mock_remove) -> None: + source = _source(cdc_enabled=True, 
cdc_management_mode="posthog", cdc_publication_name="pub") + PostgresCDCAdapter().remove_table(source, "analytics", "events") + mock_remove.assert_called_once() + assert mock_remove.call_args.args[1:] == ("pub", "analytics", "events") + + @patch(f"{_ADAPTER}.add_table_to_publication") + @patch(f"{_ADAPTER}.cdc_pg_connection", new_callable=_fake_conn) + def test_add_table_noop_without_publication(self, _conn, mock_add) -> None: + source = _source(cdc_enabled=True, cdc_management_mode="posthog") + PostgresCDCAdapter().add_table(source, "public", "orders") + mock_add.assert_not_called() diff --git a/posthog/temporal/data_imports/sources/postgres/cdc/tests/test_decoder.py b/posthog/temporal/data_imports/sources/postgres/cdc/tests/test_decoder.py index 2915338a5c31..3a7ba9c63d54 100644 --- a/posthog/temporal/data_imports/sources/postgres/cdc/tests/test_decoder.py +++ b/posthog/temporal/data_imports/sources/postgres/cdc/tests/test_decoder.py @@ -145,6 +145,20 @@ def test_begin_commit_empty_transaction(self): events = decoder.decode_message(commit, "0/200") assert events == [] + def test_event_table_name_is_schema_qualified(self): + # ChangeEvent.table_name must be `schema.table` to match the qualified + # ExternalDataSchema.name the CDC extraction activity filters on. A bare table + # name silently drops every change for multi-schema-era (qualified) schemas. 
+ decoder = self._setup_decoder_with_relation( + relation_id=7, table="cdc_test_orders", columns=[("id", _OID_INT4, -1)] + ) + decoder.decode_message(_make_begin(), "0/100") + decoder.decode_message(_make_insert(7, [("t", "1")]), "0/150") + events = decoder.decode_message(_make_commit(), "0/200") + + assert len(events) == 1 + assert events[0].table_name == "public.cdc_test_orders" + def test_insert_event(self): decoder = self._setup_decoder_with_relation(columns=[("id", _OID_INT4, -1), ("name", _OID_TEXT, -1)]) @@ -159,7 +173,7 @@ def test_insert_event(self): assert len(events) == 1 event = events[0] assert event.operation == "I" - assert event.table_name == "users" + assert event.table_name == "public.users" assert event.columns["id"] == 42 assert event.columns["name"] == "Alice" @@ -177,7 +191,7 @@ def test_update_event(self): assert len(events) == 1 event = events[0] assert event.operation == "U" - assert event.table_name == "users" + assert event.table_name == "public.users" assert event.columns["id"] == 42 assert event.columns["name"] == "Bob" @@ -214,7 +228,7 @@ def test_delete_event(self): assert len(events) == 1 event = events[0] assert event.operation == "D" - assert event.table_name == "users" + assert event.table_name == "public.users" assert event.columns["id"] == 42 assert event.columns.get("name") is None @@ -359,7 +373,7 @@ def test_truncate_marks_table(self): truncate = _make_truncate([1]) decoder.decode_message(truncate, "0/100") - assert decoder.truncated_tables == ["users"] + assert decoder.truncated_tables == ["public.users"] decoder.clear_truncated_tables() assert decoder.truncated_tables == [] @@ -378,9 +392,9 @@ def test_multiple_tables(self): events = decoder.decode_message(_make_commit(), "0/200") assert len(events) == 2 - assert events[0].table_name == "users" + assert events[0].table_name == "public.users" assert events[0].columns["id"] == 1 - assert events[1].table_name == "orders" + assert events[1].table_name == "public.orders" assert 
events[1].columns["id"] == 100 assert events[1].columns["user_id"] == 1 diff --git a/products/data_warehouse/backend/api/external_data_schema.py b/products/data_warehouse/backend/api/external_data_schema.py index 0aadab41a12f..9ed44b02dd0d 100644 --- a/products/data_warehouse/backend/api/external_data_schema.py +++ b/products/data_warehouse/backend/api/external_data_schema.py @@ -18,12 +18,12 @@ from posthog.exceptions_capture import capture_exception from posthog.models.activity_logging.activity_log import ActivityContextBase, Detail, changes_between, log_activity from posthog.models.signals import model_activity_signal, mutable_receiver +from posthog.temporal.data_imports.cdc.adapters import get_cdc_adapter from posthog.temporal.data_imports.sources import SourceRegistry from posthog.temporal.data_imports.sources.common.base import WebhookSource from posthog.temporal.data_imports.sources.common.sql import ( filter_dwh_columns_by_enabled_columns as _filter_dwh_columns_by_enabled_columns, ) -from posthog.temporal.data_imports.sources.postgres.cdc.config import PostgresCDCConfig from products.data_warehouse.backend.data_load.service import ( cancel_external_data_workflow, @@ -424,6 +424,14 @@ def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) - # discovery already stored; refuse the switch when neither is set. new_pk = data.get("primary_key_columns") if new_pk: + old_pk = payload.get("primary_key_columns") + # Same rule as incremental: the PK is the merge key, so it can't change once data + # has synced (`instance.table` exists). Delete the synced data to change it. + if new_pk != old_pk and instance.table is not None: + raise ValidationError( + "Primary key cannot be changed after data has been synced. " + "Delete the synced data first, then change the primary key." 
+ ) payload["primary_key_columns"] = new_pk elif not payload.get("primary_key_columns"): raise ValidationError( @@ -661,12 +669,12 @@ def _handle_cdc_publication_change( should_sync: bool | None, sync_type: str | None, ) -> None: - """Manage CDC publication tables when a schema is toggled or newly set to CDC.""" - cdc_config = PostgresCDCConfig.from_source(source) + """Add/remove the table from the CDC capture set when a schema is toggled or set to CDC.""" + adapter = get_cdc_adapter(source) + cdc_config = adapter.parse_cdc_config(source) if cdc_config.management_mode != "posthog" or not cdc_config.publication_name: return - pub_name = cdc_config.publication_name _, db_schema, source_table_name = get_postgres_source_location( schema_name=instance.name, schema_metadata=instance.schema_metadata, @@ -677,9 +685,9 @@ def _handle_cdc_publication_change( sync_type == ExternalDataSchema.SyncType.CDC and instance.sync_type != ExternalDataSchema.SyncType.CDC ) - # Add table to publication when enabling CDC or toggling sync on + # Add table to capture set when enabling CDC or toggling sync on if newly_set_to_cdc or (should_sync is True and not instance.should_sync): - self._alter_cdc_publication(source, pub_name, db_schema, source_table_name, action="add") + adapter.add_table(source, db_schema, source_table_name) # Always force a full re-snapshot on re-enable: while removed from the # publication the replication slot kept advancing, so any changes made @@ -689,39 +697,9 @@ def _handle_cdc_publication_change( instance.initial_sync_complete = False instance.save(update_fields=["sync_type_config", "initial_sync_complete", "updated_at"]) - # Remove table from publication when toggling sync off + # Remove table from capture set when toggling sync off elif should_sync is False and instance.should_sync: - self._alter_cdc_publication(source, pub_name, db_schema, source_table_name, action="remove") - - def _alter_cdc_publication( - self, - source: ExternalDataSource, - pub_name: str, 
- db_schema: str, - table_name: str, - action: str, - ) -> None: - """Best-effort add/remove a table from the CDC publication.""" - from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import ( - add_table_to_publication, - cdc_pg_connection, - remove_table_from_publication, - ) - - try: - with cdc_pg_connection(source) as conn: - if action == "add": - add_table_to_publication(conn, pub_name, db_schema, table_name) - else: - remove_table_from_publication(conn, pub_name, db_schema, table_name) - except Exception as e: - logger.exception( - "Failed to alter CDC publication", - action=action, - table=table_name, - pub_name=pub_name, - error=str(e), - ) + adapter.remove_table(source, db_schema, source_table_name) class SimpleExternalDataSchemaSerializer(serializers.ModelSerializer): diff --git a/products/data_warehouse/backend/api/external_data_source.py b/products/data_warehouse/backend/api/external_data_source.py index 3d1131a1d643..3afda70802b7 100644 --- a/products/data_warehouse/backend/api/external_data_source.py +++ b/products/data_warehouse/backend/api/external_data_source.py @@ -42,6 +42,7 @@ from posthog.models.user import User from posthog.rbac.access_control_api_mixin import AccessControlViewSetMixin from posthog.rbac.user_access_control import UserAccessControlSerializerMixin +from posthog.temporal.data_imports.cdc.adapters import CDCSourceAdapter, get_cdc_adapter from posthog.temporal.data_imports.sources import SourceRegistry from posthog.temporal.data_imports.sources.common.base import AnySource, ExternalWebhookInfo, FieldType, WebhookSource from posthog.temporal.data_imports.sources.common.config import Config @@ -52,7 +53,8 @@ is_custom_source_available_for_team, manifest_request_hosts, ) -from posthog.temporal.data_imports.sources.postgres.cdc.config import PostgresCDCConfig +from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import cdc_pg_connection +from posthog.temporal.data_imports.sources.postgres.postgres 
import get_primary_key_columns, source_requires_ssl from posthog.temporal.data_imports.sources.postgres.source import PostgresSource from products.cdp.backend.api.hog_function import HogFunctionSerializer @@ -66,10 +68,13 @@ bulk_create_external_data_job_schedules, bulk_delete_external_data_schedules, cancel_external_data_workflow, + delete_cdc_extraction_schedule, delete_discover_schemas_schedule, delete_external_data_schedule, + ensure_cdc_slot_cleanup_schedule, is_any_external_data_schema_paused, is_cdc_enabled_for_team, + sync_cdc_extraction_schedule, sync_discover_schemas_schedule, trigger_external_data_source_workflow, ) @@ -223,6 +228,20 @@ def get_nonsensitive_and_sensitive_field_names(fields: list[FieldType]) -> tuple # Config metadata keys that are always safe to include in nested dicts _CONFIG_META_KEYS = {"selection", "enabled"} +# CDC config lives in job_inputs but isn't part of any source's user-facing form field +# tree, so it would otherwise be stripped from API reads as "unknown". None of these are +# secrets — they're operational config the Configuration page needs to render CDC state. +_CDC_EXPOSED_JOB_INPUT_KEYS = { + "cdc_enabled", + "cdc_management_mode", + "cdc_slot_name", + "cdc_publication_name", + "cdc_auto_drop_slot", + "cdc_lag_warning_threshold_mb", + "cdc_lag_critical_threshold_mb", + "cdc_consistent_point", +} + def strip_sensitive_from_dict(data: dict, nonsensitive: set[str], sensitive: set[str]) -> dict: """Return a copy of data with sensitive and unknown keys removed. 
@@ -278,8 +297,6 @@ def get_direct_postgres_connection_metadata( if not callable(metadata_fetcher): return fallback or {} - from posthog.temporal.data_imports.sources.postgres.postgres import source_requires_ssl - require_ssl = source_model is not None and source_requires_ssl(source_model, source_config) try: @@ -567,6 +584,8 @@ def to_representation(self, instance): source_type_model = ExternalDataSourceType(instance.source_type) source = SourceRegistry.get_source(source_type_model) nonsensitive, sensitive = get_nonsensitive_and_sensitive_field_names(source.get_source_config.fields) + # CDC fields aren't form fields but are non-secret operational config the UI needs. + nonsensitive = nonsensitive | _CDC_EXPOSED_JOB_INPUT_KEYS except (ValueError, KeyError): representation["job_inputs"] = {} return representation @@ -956,8 +975,12 @@ class ExternalDataSourceViewSet(TeamAndOrgViewSetMixin, AccessControlViewSetMixi "update_webhook_inputs", "delete_webhook", "check_cdc_prerequisites", + "check_cdc_prerequisites_for_source", + "enable_cdc", + "disable_cdc", + "update_cdc_settings", ] - scope_object_read_actions = ["list", "retrieve", "jobs", "wizard", "webhook_info", "connections"] + scope_object_read_actions = ["list", "retrieve", "jobs", "wizard", "webhook_info", "connections", "cdc_status"] queryset = ExternalDataSource.objects.all() serializer_class = ExternalDataSourceSerializers filter_backends = [filters.SearchFilter] @@ -1108,12 +1131,14 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: access_method=access_method, ) - # CDC is Postgres-only today — fold the source-type check in here so every downstream - # `if cdc_enabled` block is safe without repeating it. + # CDC: gate per-source-type adapter availability up front so downstream blocks + # can `if cdc_enabled` without repeating the source-type check. 
+ try: + cdc_adapter: CDCSourceAdapter | None = get_cdc_adapter(new_source_model) + except ValueError: + cdc_adapter = None cdc_enabled = ( - payload.get("cdc_enabled", False) - and source_type_model == ExternalDataSourceType.POSTGRES - and is_cdc_enabled_for_team(self.team) + payload.get("cdc_enabled", False) and cdc_adapter is not None and is_cdc_enabled_for_team(self.team) ) source_schemas = source.get_schemas(source_config, self.team_id) @@ -1146,7 +1171,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: data={"message": "Schemas given do not exist in source"}, ) - # Refuse per-schema `sync_type=cdc` when source-level CDC is off — `_setup_cdc_slot` + # Refuse per-schema `sync_type=cdc` when source-level CDC is off — `_setup_cdc_resources` # would be skipped, leaving the source with no replication slot/publication. if not cdc_enabled: cdc_schemas_in_payload = sorted( @@ -1194,9 +1219,6 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: cdc_schema_name_by_location[(resolved_source_schema, resolved_source_table_name)] = schema_name if cdc_table_names_by_schema: - from posthog.temporal.data_imports.sources.postgres.cdc.slot_manager import cdc_pg_connection - from posthog.temporal.data_imports.sources.postgres.postgres import get_primary_key_columns - with cdc_pg_connection(new_source_model) as conn: for db_schema, cdc_table_names in cdc_table_names_by_schema.items(): queried_pks = get_primary_key_columns(conn, db_schema, list(cdc_table_names)) @@ -1205,7 +1227,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: if schema_name is not None: pk_columns_by_table[schema_name] = primary_key_columns - # CDC needs a PK for UPDATE/DELETE merges. Refuse here so `_setup_cdc_slot` doesn't + # CDC needs a PK for UPDATE/DELETE merges. Refuse here so `_setup_cdc_resources` doesn't # create replication state on the source for a config we're about to reject. 
tables_missing_pk = sorted( { @@ -1229,12 +1251,17 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: }, ) - # Slot + publication setup runs after PK validation so we don't leave replication state - # on the source for a config we're about to refuse. + # Engine-side CDC resource setup runs after PK validation so we don't leave + # replication state on the source for a config we're about to refuse. if cdc_enabled: - cdc_result = self._setup_cdc_slot(source, source_config, new_source_model, payload) - if cdc_result is not None: - return cdc_result + assert cdc_adapter is not None # narrowed by `cdc_enabled` + cdc_error = self._setup_cdc_resources(cdc_adapter, new_source_model, payload) + if cdc_error is not None: + new_source_model.delete() + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={"message": cdc_error}, + ) # Create all ExternalDataSchema objects and enable syncing for active schemas for schema in payload_schemas: @@ -1349,16 +1376,13 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response: # CDC + direct-postgres paths are Postgres-only — `get_postgres_source_table_location` # guarantees non-None schema/table in that branch above. `cast` narrows for mypy - # without a runtime check. - if is_cdc_schema and should_sync and cdc_enabled: - cdc_config = PostgresCDCConfig.from_source(new_source_model) - if cdc_config.management_mode == "posthog" and cdc_config.publication_name: - self._add_table_to_cdc_publication( - new_source_model, - cdc_config.publication_name, - cast(str, metadata_source_schema), - cast(str, metadata_source_table_name), - ) + # without a runtime check. The adapter no-ops for self-managed / no-publication. 
def _setup_cdc_resources(
    self, adapter: CDCSourceAdapter, source_model: ExternalDataSource, payload: dict
) -> str | None:
    """Provision CDC for a source by delegating to the engine adapter.

    Validates the lag thresholds in ``payload`` first, then asks ``adapter`` to
    provision engine-side resources, and finally merges the result into
    ``source_model.job_inputs`` and saves it.

    Written here: ``cdc_enabled``, ``cdc_auto_drop_slot`` and the two lag
    thresholds. Everything else (identifiers, consistent point, …) is whatever
    dict ``adapter.setup_resources`` returns, merged last so it wins on conflict.

    Returns an error string on failure, or None on success. On any failure
    ``source_model`` is left unmodified and unsaved; callers decide whether to
    delete the source (create flow does; enable_cdc does not).
    """
    # Validate before provisioning: a bad threshold must not leave engine-side
    # resources behind, and must never be persisted — later reads coerce these
    # values with int(). Rules and messages mirror `update_cdc_settings`.
    # A missing or null value falls back to the default.
    thresholds: dict[str, int] = {}
    for key, default in (
        ("cdc_lag_warning_threshold_mb", 1024),
        ("cdc_lag_critical_threshold_mb", 10240),
    ):
        raw = payload.get(key)
        if raw is None:
            thresholds[key] = default
            continue
        try:
            value = int(raw)
        except (TypeError, ValueError):
            return f"{key} must be an integer."
        if value < 1:
            return f"{key} must be >= 1."
        thresholds[key] = value

    if thresholds["cdc_lag_warning_threshold_mb"] >= thresholds["cdc_lag_critical_threshold_mb"]:
        return "Warning threshold must be less than critical threshold."

    resource_fields, error = adapter.setup_resources(source_model, payload)
    if error is not None:
        return error

    # Copy so the model's current dict is only replaced once everything is assembled.
    job_inputs = dict(source_model.job_inputs or {})
    job_inputs.update(
        {
            "cdc_enabled": True,
            "cdc_auto_drop_slot": payload.get("cdc_auto_drop_slot", True),
            **thresholds,
        }
    )
    job_inputs.update(resource_fields)
    source_model.job_inputs = job_inputs
    source_model.save(update_fields=["job_inputs", "updated_at"])
    return None
def _get_cdc_adapter_or_400(self, instance: ExternalDataSource) -> tuple[CDCSourceAdapter | None, Response | None]:
    """Resolve the CDC engine adapter for an existing source.

    Returns ``(adapter, None)`` when ``get_cdc_adapter`` succeeds. When it raises
    ``ValueError``, returns ``(None, response)`` where ``response`` is a 400
    naming the source type.
    """
    try:
        resolved = get_cdc_adapter(instance)
    except ValueError:
        unsupported = Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": f"CDC is not supported for source type: {instance.source_type}"},
        )
        return None, unsupported
    return resolved, None
@action(methods=["POST"], detail=True)
def check_cdc_prerequisites_for_source(self, request: Request, *arg: Any, **kwargs: Any):
    """Check CDC prerequisites for an already-created source.

    Counterpart of the detail=False ``check_cdc_prerequisites`` wizard action:
    the source object itself is handed to the adapter rather than a
    client-supplied connection config.

    Body params: ``cdc_management_mode`` (``"posthog"`` | ``"self_managed"``,
    defaults to ``"posthog"``), ``cdc_slot_name`` (optional),
    ``cdc_publication_name`` (optional).

    Responds 200 with ``{"valid": bool, "errors": [...]}``, or 400 when the mode
    is invalid, the source type has no adapter, or the adapter call raises.
    """
    source: ExternalDataSource = self.get_object()

    adapter, unsupported = self._get_cdc_adapter_or_400(source)
    if unsupported is not None:
        return unsupported
    assert adapter is not None  # narrowed by _get_cdc_adapter_or_400

    body = request.data
    mode = body.get("cdc_management_mode", "posthog")
    if mode not in ("posthog", "self_managed"):
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": "cdc_management_mode must be 'posthog' or 'self_managed'."},
        )

    # Fall back to "public" when the source stores no (or a blank) schema.
    stored_inputs = source.job_inputs or {}
    schema_hint = stored_inputs.get("schema") or "public"

    try:
        # Blank identifiers are normalised to None before reaching the adapter.
        problems = adapter.validate_prerequisites(
            source,
            management_mode=mode,
            tables=[],
            schema=schema_hint,
            slot_name=body.get("cdc_slot_name") or None,
            publication_name=body.get("cdc_publication_name") or None,
        )
    except Exception as exc:
        capture_exception(exc, {"source_id": str(source.id), "team_id": self.team_id})
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": f"Could not connect to source to check prerequisites: {exc}"},
        )

    is_valid = len(problems) == 0
    return Response(status=status.HTTP_200_OK, data={"valid": is_valid, "errors": problems})
@action(methods=["POST"], detail=True)
def enable_cdc(self, request: Request, *arg: Any, **kwargs: Any):
    """Turn CDC on for an existing source.

    Order of operations: resolve the adapter, check the team gate (403), refuse
    if CDC is already on (409), validate the mode, re-run prerequisite checks
    server-side, provision via ``_setup_cdc_resources``, then try to create the
    schedules.

    Body params: ``cdc_management_mode`` (``"posthog"`` | ``"self_managed"``),
    engine-specific identifier hints (``cdc_slot_name``, ``cdc_publication_name``),
    and the optional tuning fields ``cdc_auto_drop_slot``,
    ``cdc_lag_warning_threshold_mb``, ``cdc_lag_critical_threshold_mb``.

    Responds 200 with ``{"success": True, "schedules_ready": bool}``; a schedule
    failure is reported through ``schedules_ready`` rather than failing the call.
    """
    source: ExternalDataSource = self.get_object()

    adapter, unsupported = self._get_cdc_adapter_or_400(source)
    if unsupported is not None:
        return unsupported
    assert adapter is not None  # narrowed by _get_cdc_adapter_or_400

    if not is_cdc_enabled_for_team(self.team):
        return Response(
            status=status.HTTP_403_FORBIDDEN,
            data={"message": "CDC is not enabled for this team."},
        )

    current_config = adapter.parse_cdc_config(source)
    if current_config.enabled:
        return Response(
            status=status.HTTP_409_CONFLICT,
            data={"message": "CDC is already enabled on this source."},
        )

    body = request.data
    mode = body.get("cdc_management_mode", "posthog")
    if mode not in ("posthog", "self_managed"):
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": "cdc_management_mode must be 'posthog' or 'self_managed'."},
        )

    # Prerequisites are re-checked here; a client-side check may be stale.
    stored_inputs = source.job_inputs or {}
    schema_hint = stored_inputs.get("schema") or "public"
    try:
        problems = adapter.validate_prerequisites(
            source,
            management_mode=mode,
            tables=[],
            schema=schema_hint,
            slot_name=body.get("cdc_slot_name") or None,
            publication_name=body.get("cdc_publication_name") or None,
        )
    except Exception as exc:
        capture_exception(exc, {"source_id": str(source.id), "team_id": self.team_id})
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": f"Could not connect to source to check prerequisites: {exc}"},
        )

    if problems:
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": "CDC prerequisites not met.", "errors": problems},
        )

    setup_error = self._setup_cdc_resources(adapter, source, body)
    if setup_error is not None:
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": setup_error},
        )

    # Per the original author's note: no schema is on sync_type=cdc yet at this point,
    # so the extraction schedule is (re)created later when a schema is switched to CDC.
    # A failure here is therefore reported, not fatal.
    try:
        sync_cdc_extraction_schedule(source, create=True)
        ensure_cdc_slot_cleanup_schedule()
    except Exception as exc:
        schedules_ready = False
        logger.exception("Could not create CDC schedules after enable_cdc", exc_info=exc)
        capture_exception(exc, {"source_id": str(source.id), "team_id": self.team_id})
    else:
        schedules_ready = True

    return Response(status=status.HTTP_200_OK, data={"success": True, "schedules_ready": schedules_ready})
@action(methods=["POST"], detail=True)
def disable_cdc(self, request: Request, *arg: Any, **kwargs: Any):
    """Turn CDC off for an existing source.

    Steps, in order: cancel running jobs that belong to this source's CDC
    schemas, delete the CDC extraction schedule, ask the adapter to clean up
    engine-side resources, then — in one transaction — reset CDC schemas to
    ``sync_type=None`` / ``should_sync=False``, soft-delete this source's
    tables whose name ends in ``_cdc``, and strip every ``cdc_*`` key from
    ``job_inputs``.

    Cancellation, schedule deletion and engine cleanup are each best-effort:
    failures are logged and/or captured and do not stop the disable.
    Responds 200 with ``{"success": True}`` (plus ``already_disabled`` when
    CDC was not on).
    """
    source: ExternalDataSource = self.get_object()

    adapter, unsupported = self._get_cdc_adapter_or_400(source)
    if unsupported is not None:
        return unsupported
    assert adapter is not None

    current_config = adapter.parse_cdc_config(source)
    if not current_config.enabled:
        return Response(status=status.HTTP_200_OK, data={"success": True, "already_disabled": True})

    # Collect the CDC schema ids first, while the schemas are still marked CDC (they
    # are reset further down). Cancellation is scoped to these so unrelated
    # incremental/full-refresh syncs on the same source keep running.
    cdc_schemas = ExternalDataSchema.objects.filter(
        source=source,
        sync_type=ExternalDataSchema.SyncType.CDC,
    ).exclude(deleted=True)
    cdc_schema_ids = list(cdc_schemas.values_list("id", flat=True))

    active_jobs = ExternalDataJob.objects.filter(
        pipeline_id=source.pk,
        team_id=source.team_id,
        status="Running",
        schema_id__in=cdc_schema_ids,
    ).exclude(workflow_id__isnull=True)
    for job in active_jobs:
        if job.workflow_id:
            try:
                cancel_external_data_workflow(job.workflow_id)
            except Exception as exc:
                capture_exception(exc, {"source_id": str(source.id), "workflow_id": job.workflow_id})

    # The schedule lives on our side, independent of the engine.
    try:
        delete_cdc_extraction_schedule(str(source.id))
    except Exception:
        logger.exception("Failed to delete CDC extraction schedule", extra={"source_id": str(source.id)})

    # Engine-side teardown never blocks the disable.
    try:
        adapter.cleanup_resources(source)
    except Exception as exc:
        logger.exception("Failed engine-side CDC cleanup during disable_cdc", exc_info=exc)
        capture_exception(exc, {"source_id": str(source.id)})

    with transaction.atomic():
        # Clearing sync_type and pausing forces the user to pick a new strategy.
        ExternalDataSchema.objects.filter(
            source=source,
            sync_type=ExternalDataSchema.SyncType.CDC,
        ).exclude(deleted=True).update(sync_type=None, should_sync=False)

        # NOTE(review): this matches on the name suffix alone, so a non-CDC table of
        # this source whose own name ends in "_cdc" would be soft-deleted too —
        # confirm that cannot happen with the companion-table naming scheme.
        DataWarehouseTable.objects.filter(
            external_data_source_id=source.id,
            team_id=self.team_id,
            deleted=False,
            name__endswith="_cdc",
        ).update(deleted=True)

        # Drop every cdc_* key so stale identifiers (e.g. `cdc_consistent_point`)
        # cannot leak into a later re-enable.
        remaining_inputs = {
            key: value for key, value in dict(source.job_inputs or {}).items() if not key.startswith("cdc_")
        }
        source.job_inputs = remaining_inputs
        source.save(update_fields=["job_inputs", "updated_at"])

    return Response(status=status.HTTP_200_OK, data={"success": True})
@action(methods=["POST"], detail=True)
def update_cdc_settings(self, request: Request, *arg: Any, **kwargs: Any):
    """Update CDC tuning fields without enabling/disabling.

    Editable fields: ``cdc_auto_drop_slot`` (boolean),
    ``cdc_lag_warning_threshold_mb`` and ``cdc_lag_critical_threshold_mb``
    (integers >= 1, warning < critical). Only fields present in the body are
    changed. Engine-specific identifiers (slot name, management mode, …) are
    immutable post-enable — switching them requires disable + enable.

    Responds 200 with ``{"success": True}`` (plus ``"unchanged": True`` when the
    body contained none of the editable fields), or 400 when CDC is off or a
    value is invalid.
    """
    instance: ExternalDataSource = self.get_object()

    adapter, err = self._get_cdc_adapter_or_400(instance)
    if err is not None:
        return err
    assert adapter is not None

    cdc_config = adapter.parse_cdc_config(instance)
    if not cdc_config.enabled:
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": "CDC is not enabled on this source."},
        )

    job_inputs = dict(instance.job_inputs or {})
    updates: dict[str, Any] = {}

    if "cdc_auto_drop_slot" in request.data:
        # Parse explicitly: bool("false") is True, so a plain bool() cast would
        # switch auto-drop ON for form-encoded clients that send the string "false".
        raw_flag = request.data["cdc_auto_drop_slot"]
        parsed_flag: bool | None = None
        if isinstance(raw_flag, bool):
            parsed_flag = raw_flag
        elif isinstance(raw_flag, int) and raw_flag in (0, 1):
            parsed_flag = bool(raw_flag)
        elif isinstance(raw_flag, str):
            parsed_flag = {"true": True, "1": True, "false": False, "0": False}.get(raw_flag.strip().lower())
        if parsed_flag is None:
            return Response(
                status=status.HTTP_400_BAD_REQUEST,
                data={"message": "cdc_auto_drop_slot must be a boolean."},
            )
        updates["cdc_auto_drop_slot"] = parsed_flag

    threshold_fields = ("cdc_lag_warning_threshold_mb", "cdc_lag_critical_threshold_mb")
    for field in threshold_fields:
        if field in request.data:
            try:
                value = int(request.data[field])
            except (TypeError, ValueError):
                return Response(
                    status=status.HTTP_400_BAD_REQUEST,
                    data={"message": f"{field} must be an integer."},
                )
            if value < 1:
                return Response(
                    status=status.HTTP_400_BAD_REQUEST,
                    data={"message": f"{field} must be >= 1."},
                )
            updates[field] = value

    # Cross-check only when a threshold is actually being changed. Otherwise a
    # pre-existing inconsistent (or non-numeric) stored pair would block unrelated
    # edits such as toggling auto-drop.
    if any(field in updates for field in threshold_fields):
        warn = updates.get("cdc_lag_warning_threshold_mb", job_inputs.get("cdc_lag_warning_threshold_mb"))
        crit = updates.get("cdc_lag_critical_threshold_mb", job_inputs.get("cdc_lag_critical_threshold_mb"))
        if warn is not None and crit is not None:
            try:
                inverted = int(warn) >= int(crit)
            except (TypeError, ValueError):
                # The stored counterpart is not numeric; the caller can repair it by
                # sending both values, which are then both validated above.
                return Response(
                    status=status.HTTP_400_BAD_REQUEST,
                    data={"message": "Stored CDC lag thresholds are invalid; submit both thresholds together."},
                )
            if inverted:
                return Response(
                    status=status.HTTP_400_BAD_REQUEST,
                    data={"message": "Warning threshold must be less than critical threshold."},
                )

    if not updates:
        return Response(status=status.HTTP_200_OK, data={"success": True, "unchanged": True})

    job_inputs.update(updates)
    instance.job_inputs = job_inputs
    instance.save(update_fields=["job_inputs", "updated_at"])

    return Response(status=status.HTTP_200_OK, data={"success": True})
@action(methods=["GET"], detail=True)
def cdc_status(self, request: Request, *arg: Any, **kwargs: Any):
    """Report CDC health for an existing source.

    Responds ``{"enabled": false}`` when the parsed CDC config is disabled.
    Otherwise returns the stored mode, slot/publication names and lag
    thresholds, merged with whatever ``adapter.get_status`` returns (live
    fields are spread last, so they win on a key clash). Responds 400 when
    ``get_status`` raises, so the UI can show a degraded/unreachable state.
    """
    source: ExternalDataSource = self.get_object()

    adapter, unsupported = self._get_cdc_adapter_or_400(source)
    if unsupported is not None:
        return unsupported
    assert adapter is not None

    stored = adapter.parse_cdc_config(source)
    if not stored.enabled:
        return Response(status=status.HTTP_200_OK, data={"enabled": False})

    try:
        live = adapter.get_status(source)
    except Exception as exc:
        capture_exception(exc, {"source_id": str(source.id), "team_id": self.team_id})
        return Response(
            status=status.HTTP_400_BAD_REQUEST,
            data={"message": f"Could not connect to source to read CDC status: {exc}"},
        )

    payload = {
        "enabled": True,
        "management_mode": stored.management_mode,
        "slot_name": stored.slot_name,
        "publication_name": stored.publication_name,
        "lag_warning_threshold_mb": stored.lag_warning_threshold_mb,
        "lag_critical_threshold_mb": stored.lag_critical_threshold_mb,
        **live,
    }
    return Response(status=status.HTTP_200_OK, data=payload)
def test_update_cdc_schema_rejects_primary_key_change_with_existing_data(self):
    # CDC uses the PK as the UPDATE/DELETE merge key, so — same as incremental — it can't be
    # changed once data has synced (the schema has a materialized table).
    source = ExternalDataSource.objects.create(
        team=self.team,
        source_type=ExternalDataSourceType.POSTGRES,
        job_inputs={"host": "h", "port": 5432, "database": "d", "user": "u", "password": "p", "schema": "public"},
    )
    # Attaching a table is what marks the schema as "already has synced data".
    table = DataWarehouseTable.objects.create(team=self.team)
    schema = ExternalDataSchema.objects.create(
        name="public.orders",
        team=self.team,
        source=source,
        should_sync=True,
        status=ExternalDataSchema.Status.COMPLETED,
        sync_type=ExternalDataSchema.SyncType.CDC,
        sync_type_config={"cdc_mode": "streaming", "primary_key_columns": ["id"]},
        table=table,
    )

    # Force the team-level CDC gate on so the request reaches the PK check.
    with mock.patch(
        "products.data_warehouse.backend.api.external_data_schema.is_cdc_enabled_for_team",
        return_value=True,
    ):
        # Same sync_type, different PK: only the key change should be rejected.
        response = self.client.patch(
            f"/api/environments/{self.team.pk}/external_data_schemas/{schema.id}",
            data={"sync_type": "cdc", "primary_key_columns": ["order_key"]},
        )

    assert response.status_code == 400
    assert "primary key cannot be changed" in str(response.json()).lower()

    # The rejected request must not have altered the stored PK.
    schema.refresh_from_db()
    assert schema.sync_type_config["primary_key_columns"] == ["id"]
@@ -1221,8 +1256,8 @@ def test_update_schema_cdc_with_blank_source_schema_uses_physical_schema_metadat return_value=True, ), mock.patch( - "products.data_warehouse.backend.api.external_data_schema.ExternalDataSchemaSerializer._alter_cdc_publication" - ) as mock_alter_cdc_publication, + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.add_table" + ) as mock_add_table, mock.patch( "products.data_warehouse.backend.api.external_data_schema.external_data_workflow_exists", return_value=False, @@ -1248,13 +1283,10 @@ def test_update_schema_cdc_with_blank_source_schema_uses_physical_schema_metadat ) assert response.status_code == 200 - mock_alter_cdc_publication.assert_called_once() - assert mock_alter_cdc_publication.call_args.args == ( - source, - "test_pub", - "analytics", - "events", - ) + # The adapter reads the publication name from config itself, so the call is + # (source, schema, table). + mock_add_table.assert_called_once() + assert mock_add_table.call_args.args == (source, "analytics", "events") def test_delete_data_hides_direct_postgres_table(self, team, user, client: HttpClient, temporal): client.force_login(user) diff --git a/products/data_warehouse/backend/api/test/test_external_data_source.py b/products/data_warehouse/backend/api/test/test_external_data_source.py index e848ea0e5583..f1e76a23436f 100644 --- a/products/data_warehouse/backend/api/test/test_external_data_source.py +++ b/products/data_warehouse/backend/api/test/test_external_data_source.py @@ -1000,7 +1000,7 @@ def test_list_external_data_source(self): self._create_external_data_source() # A cached instance setting lookup can shave off one query depending on test order. 
- with self.assertNumQueries(FuzzyInt(24, 25)): + with self.assertNumQueries(FuzzyInt(23, 25)): response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/") payload = response.json() @@ -2362,20 +2362,18 @@ def test_create_direct_postgres_blank_schema_prefixes_table_names_and_preserves_ self.assertEqual(analytics_schema.sync_type_config["schema_metadata"]["source_schema"], "analytics") @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) - @patch( - "products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._add_table_to_cdc_publication" - ) - @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_slot") - @patch("posthog.temporal.data_imports.sources.postgres.postgres.get_primary_key_columns") - @patch("posthog.temporal.data_imports.sources.postgres.cdc.slot_manager.cdc_pg_connection") + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.add_table") + @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_resources") + @patch("products.data_warehouse.backend.api.external_data_source.get_primary_key_columns") + @patch("products.data_warehouse.backend.api.external_data_source.cdc_pg_connection") @patch("products.data_warehouse.backend.api.external_data_source.SourceRegistry.get_source") def test_create_postgres_cdc_with_blank_schema_uses_physical_schema_metadata( self, mock_get_source, mock_cdc_pg_connection, mock_get_primary_key_columns, - mock_setup_cdc_slot, - mock_add_table_to_cdc_publication, + mock_setup_cdc_resources, + mock_add_table, _mock_is_cdc_enabled_for_team, ): source_mock = mock_get_source.return_value @@ -2409,7 +2407,7 @@ def test_create_postgres_cdc_with_blank_schema_uses_physical_schema_metadata( mock_cdc_pg_connection.return_value.__exit__.return_value = None mock_get_primary_key_columns.return_value = {"events": 
["id"]} - def setup_cdc_slot(_source_impl, _source_config, source_model, _payload): + def setup_cdc_slot(_adapter, source_model, _payload): source_model.job_inputs = { **(source_model.job_inputs or {}), "cdc_enabled": True, @@ -2420,7 +2418,7 @@ def setup_cdc_slot(_source_impl, _source_config, source_model, _payload): source_model.save(update_fields=["job_inputs", "updated_at"]) return None - mock_setup_cdc_slot.side_effect = setup_cdc_slot + mock_setup_cdc_resources.side_effect = setup_cdc_slot response = self.client.post( f"/api/environments/{self.team.pk}/external_data_sources/", @@ -2450,24 +2448,24 @@ def setup_cdc_slot(_source_impl, _source_config, source_model, _payload): assert mock_get_primary_key_columns.call_args.args[1] == "analytics" assert mock_get_primary_key_columns.call_args.args[2] == ["events"] - mock_add_table_to_cdc_publication.assert_called_once() - assert mock_add_table_to_cdc_publication.call_args.args[1:] == ("test_pub", "analytics", "events") + # The adapter reads the publication name from config itself, so the call is just + # (source, schema, table). The first arg is the source model. 
+ mock_add_table.assert_called_once() + assert mock_add_table.call_args.args[1:] == ("analytics", "events") @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) - @patch( - "products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._add_table_to_cdc_publication" - ) - @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_slot") - @patch("posthog.temporal.data_imports.sources.postgres.postgres.get_primary_key_columns") - @patch("posthog.temporal.data_imports.sources.postgres.cdc.slot_manager.cdc_pg_connection") + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.add_table") + @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_resources") + @patch("products.data_warehouse.backend.api.external_data_source.get_primary_key_columns") + @patch("products.data_warehouse.backend.api.external_data_source.cdc_pg_connection") @patch("products.data_warehouse.backend.api.external_data_source.SourceRegistry.get_source") def test_create_postgres_cdc_rejects_table_without_primary_key( self, mock_get_source, mock_cdc_pg_connection, mock_get_primary_key_columns, - mock_setup_cdc_slot, - mock_add_table_to_cdc_publication, + mock_setup_cdc_resources, + mock_add_table, _mock_is_cdc_enabled_for_team, ): # CDC (logical replication) cannot identify rows on UPDATE/DELETE without a primary key. @@ -2533,20 +2531,20 @@ def test_create_postgres_cdc_rejects_table_without_primary_key( assert ExternalDataSource.objects.filter(team_id=self.team.pk).count() == 0 # CDC slot setup must not run when validation rejects — otherwise we'd leave a # replication slot + publication on the source for a config we're about to refuse. 
- mock_setup_cdc_slot.assert_not_called() - mock_add_table_to_cdc_publication.assert_not_called() + mock_setup_cdc_resources.assert_not_called() + mock_add_table.assert_not_called() @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) - @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_slot") + @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_resources") @patch("products.data_warehouse.backend.api.external_data_source.SourceRegistry.get_source") def test_create_rejects_cdc_schemas_when_source_cdc_disabled( self, mock_get_source, - mock_setup_cdc_slot, + mock_setup_cdc_resources, _mock_is_cdc_enabled_for_team, ): # If the user never toggled CDC on at the source-setup step, `payload.cdc_enabled` is - # False and `_setup_cdc_slot` never runs — so no replication slot/publication exists + # False and `_setup_cdc_resources` never runs — so no replication slot/publication exists # on the source. Accepting per-schema `sync_type=cdc` in that state would persist # broken configs (no slot, empty PKs, snapshot→streaming flip leaving everything # Failed). Backend must reject up front. 
@@ -2601,7 +2599,7 @@ def test_create_rejects_cdc_schemas_when_source_cdc_disabled( assert "cdc must be enabled" in response.json()["message"].lower() assert "borrower" in response.json()["message"] assert ExternalDataSource.objects.filter(team_id=self.team.pk).count() == 0 - mock_setup_cdc_slot.assert_not_called() + mock_setup_cdc_resources.assert_not_called() @parameterized.expand( [ @@ -7025,3 +7023,947 @@ def test_create_rejects_invalid_input(self, _name: str, data: dict) -> None: format="json", ) assert response.status_code == 400 + + +def _make_postgres_source( + team_id: int, + user, + *, + cdc_enabled: bool = False, + management_mode: str = "posthog", + slot_name: str = "posthog_slot", + pub_name: str = "posthog_pub", + extra_job_inputs: dict | None = None, +) -> ExternalDataSource: + job_inputs: dict[str, t.Any] = { + "host": "localhost", + "port": 5432, + "database": "app", + "user": "user", + "password": "pass", + "schema": "public", + } + if cdc_enabled: + job_inputs.update( + { + "cdc_enabled": True, + "cdc_management_mode": management_mode, + "cdc_slot_name": slot_name, + "cdc_publication_name": pub_name, + "cdc_auto_drop_slot": True, + "cdc_lag_warning_threshold_mb": 1024, + "cdc_lag_critical_threshold_mb": 10240, + "cdc_consistent_point": "0/12345", + } + ) + if extra_job_inputs: + job_inputs.update(extra_job_inputs) + return ExternalDataSource.objects.create( + team_id=team_id, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Postgres", + created_by=user, + prefix="pg_", + job_inputs=job_inputs, + ) + + +class TestCheckCDCPrerequisitesForSource(APIBaseTest): + def test_rejects_source_type_without_cdc_support(self) -> None: + source = ExternalDataSource.objects.create( + team_id=self.team.pk, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Stripe", + created_by=self.user, + job_inputs={"stripe_secret_key": 
"sk_test_123"}, + ) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/check_cdc_prerequisites_for_source/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 400 + assert "CDC is not supported" in response.json()["message"] + + def test_rejects_invalid_management_mode(self) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/check_cdc_prerequisites_for_source/", + data={"cdc_management_mode": "nonsense"}, + format="json", + ) + assert response.status_code == 400 + assert "cdc_management_mode" in response.json()["message"] + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=[], + ) + def test_uses_stored_credentials_not_client_payload(self, mock_validate) -> None: + # The whole point of this endpoint: the client never sends the password (it's + # stripped from API responses), so prereqs must validate against the stored source. + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/check_cdc_prerequisites_for_source/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 200, response.content + assert response.json() == {"valid": True, "errors": []} + # The adapter was handed the stored source model — not a client-supplied config dict. 
+ called_source = mock_validate.call_args.args[0] + assert called_source.pk == source.pk + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=["wal_level must be 'logical'"], + ) + def test_returns_errors_when_prereqs_fail(self, _mock_validate) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/check_cdc_prerequisites_for_source/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 200, response.content + body = response.json() + assert body["valid"] is False + assert body["errors"] == ["wal_level must be 'logical'"] + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=[], + ) + def test_forwards_self_managed_publication_name(self, mock_validate) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/check_cdc_prerequisites_for_source/", + data={"cdc_management_mode": "self_managed", "cdc_publication_name": "customer_pub"}, + format="json", + ) + assert response.status_code == 200, response.content + assert mock_validate.call_args.kwargs["publication_name"] == "customer_pub" + assert mock_validate.call_args.kwargs["management_mode"] == "self_managed" + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + side_effect=psycopg.OperationalError("connection refused"), + ) + def test_returns_400_on_connection_exception(self, _mock_validate) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/check_cdc_prerequisites_for_source/", + data={"cdc_management_mode": 
"posthog"}, + format="json", + ) + assert response.status_code == 400 + assert "Could not connect to source" in response.json()["message"] + + +class TestEnableCDC(APIBaseTest): + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + def test_enable_cdc_rejects_source_type_without_cdc_support(self, _flag) -> None: + # Stripe has no CDC adapter — the viewset must surface that as a 400, not crash. + source = ExternalDataSource.objects.create( + team_id=self.team.pk, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Stripe", + created_by=self.user, + job_inputs={"stripe_secret_key": "sk_test_123"}, + ) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 400 + assert "CDC is not supported" in response.json()["message"] + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=False) + def test_enable_cdc_rejects_when_team_flag_off(self, _flag) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 403 + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + def test_enable_cdc_rejects_when_already_enabled(self, _flag) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 409 + + @parameterized.expand( + [ + ("blank", ""), 
+ ("invalid", "garbage_mode"), + ("none", None), + ] + ) + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + def test_enable_cdc_rejects_invalid_management_mode(self, _name: str, mode_value, _flag) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={"cdc_management_mode": mode_value}, + format="json", + ) + assert response.status_code == 400 + assert "cdc_management_mode" in response.json()["message"] + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=["wal_level must be 'logical'", "Missing REPLICATION privilege"], + ) + def test_enable_cdc_returns_400_when_prereqs_fail(self, _check, _flag) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 400 + body = response.json() + assert body["message"] == "CDC prerequisites not met." 
+ assert body["errors"] == ["wal_level must be 'logical'", "Missing REPLICATION privilege"] + source.refresh_from_db() + assert (source.job_inputs or {}).get("cdc_enabled") is not True + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + side_effect=psycopg.OperationalError("connection refused"), + ) + def test_enable_cdc_returns_400_on_prereq_check_exception(self, _check, _flag) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 400 + assert "Could not connect to source" in response.json()["message"] + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=[], + ) + @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_resources") + @patch("products.data_warehouse.backend.api.external_data_source.sync_cdc_extraction_schedule") + @patch("products.data_warehouse.backend.api.external_data_source.ensure_cdc_slot_cleanup_schedule") + def test_enable_cdc_posthog_managed_success( + self, + mock_ensure_cleanup, + mock_sync_extraction, + mock_setup_cdc_resources, + _check, + _flag, + ) -> None: + source = _make_postgres_source(self.team.pk, self.user) + + def setup_cdc_slot(_adapter, source_model, payload): + job_inputs = dict(source_model.job_inputs or {}) + job_inputs.update( + { + "cdc_enabled": True, + "cdc_management_mode": payload.get("cdc_management_mode", "posthog"), + "cdc_slot_name": payload.get("cdc_slot_name") or 
f"posthog_{source_model.id.hex[:12]}", + "cdc_publication_name": ( + payload.get("cdc_publication_name") or f"posthog_pub_{source_model.id.hex[:12]}" + ), + "cdc_auto_drop_slot": payload.get("cdc_auto_drop_slot", True), + "cdc_lag_warning_threshold_mb": payload.get("cdc_lag_warning_threshold_mb", 1024), + "cdc_lag_critical_threshold_mb": payload.get("cdc_lag_critical_threshold_mb", 10240), + "cdc_consistent_point": "0/ABCDEF", + } + ) + source_model.job_inputs = job_inputs + source_model.save(update_fields=["job_inputs", "updated_at"]) + return None + + mock_setup_cdc_resources.side_effect = setup_cdc_slot + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={ + "cdc_management_mode": "posthog", + "cdc_auto_drop_slot": False, + "cdc_lag_warning_threshold_mb": 512, + "cdc_lag_critical_threshold_mb": 4096, + }, + format="json", + ) + + assert response.status_code == 200, response.content + assert response.json() == {"success": True, "schedules_ready": True} + + source.refresh_from_db() + ji = source.job_inputs or {} + # `EncryptedJSONField` round-trips scalar values as strings. 
+ assert ji["cdc_enabled"] == "True" + assert ji["cdc_management_mode"] == "posthog" + assert ji["cdc_auto_drop_slot"] == "False" + assert int(ji["cdc_lag_warning_threshold_mb"]) == 512 + assert int(ji["cdc_lag_critical_threshold_mb"]) == 4096 + assert ji["cdc_consistent_point"] == "0/ABCDEF" + + mock_sync_extraction.assert_called_once() + mock_ensure_cleanup.assert_called_once() + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=[], + ) + @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_resources") + @patch("products.data_warehouse.backend.api.external_data_source.sync_cdc_extraction_schedule") + @patch("products.data_warehouse.backend.api.external_data_source.ensure_cdc_slot_cleanup_schedule") + def test_enable_cdc_self_managed_passes_publication_name( + self, + _mock_ensure_cleanup, + _mock_sync_extraction, + mock_setup_cdc_resources, + mock_check, + _flag, + ) -> None: + source = _make_postgres_source(self.team.pk, self.user) + + def setup_cdc_slot(_adapter, source_model, payload): + job_inputs = dict(source_model.job_inputs or {}) + job_inputs.update( + { + "cdc_enabled": True, + "cdc_management_mode": "self_managed", + "cdc_slot_name": f"posthog_{source_model.id.hex[:12]}", + "cdc_publication_name": payload.get("cdc_publication_name") or "posthog_pub", + "cdc_consistent_point": "0/DEADBEEF", + } + ) + source_model.job_inputs = job_inputs + source_model.save(update_fields=["job_inputs", "updated_at"]) + return None + + mock_setup_cdc_resources.side_effect = setup_cdc_slot + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={ + "cdc_management_mode": "self_managed", + "cdc_publication_name": "customer_pub", + }, + format="json", + ) + + assert 
response.status_code == 200, response.content + # Prereq check forwarded the publication name we received. + assert mock_check.call_args.kwargs["publication_name"] == "customer_pub" + + source.refresh_from_db() + ji = source.job_inputs or {} + assert ji["cdc_management_mode"] == "self_managed" + assert ji["cdc_publication_name"] == "customer_pub" + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=[], + ) + @patch("products.data_warehouse.backend.api.external_data_source.ExternalDataSourceViewSet._setup_cdc_resources") + def test_enable_cdc_returns_400_when_slot_setup_fails(self, mock_setup_cdc_resources, _check, _flag) -> None: + source = _make_postgres_source(self.team.pk, self.user) + mock_setup_cdc_resources.return_value = "Failed to create replication slot: connection lost" + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={"cdc_management_mode": "posthog"}, + format="json", + ) + assert response.status_code == 400 + assert "connection lost" in response.json()["message"] + + # Source must NOT be deleted (this is not the create path). 
+ source.refresh_from_db() + assert source.deleted is False + assert (source.job_inputs or {}).get("cdc_enabled") is not True + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=[], + ) + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.publication_exists", return_value=False) + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.slot_exists", return_value=False) + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.drop_slot_and_publication") + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.create_slot_and_publication") + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.cdc_pg_connection") + def test_enable_cdc_posthog_rolls_back_partial_slot_on_failure( + self, + mock_cdc_pg_connection, + mock_create_slot_and_publication, + mock_drop_slot_and_publication, + _mock_slot_exists, + _mock_publication_exists, + _check, + _flag, + ) -> None: + source = _make_postgres_source(self.team.pk, self.user) + + # Fake a no-op connection context — actual slot calls are themselves mocked. + mock_cdc_pg_connection.return_value.__enter__.return_value = object() + mock_cdc_pg_connection.return_value.__exit__.return_value = None + + mock_create_slot_and_publication.side_effect = RuntimeError("max_replication_slots reached") + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={ + "cdc_management_mode": "posthog", + "cdc_slot_name": "leaky_slot", + "cdc_publication_name": "leaky_pub", + }, + format="json", + ) + + assert response.status_code == 400 + assert "max_replication_slots reached" in response.json()["message"] + + # Best-effort cleanup must run with the exact slot + publication we tried to create. 
+ mock_drop_slot_and_publication.assert_called_once() + call = mock_drop_slot_and_publication.call_args + assert call.args[1] == "leaky_slot" + assert call.args[2] == "leaky_pub" + + # Source's job_inputs must NOT have been persisted with cdc_enabled — we never reached save(). + source.refresh_from_db() + assert (source.job_inputs or {}).get("cdc_enabled") is not True + assert source.deleted is False + + @patch("products.data_warehouse.backend.api.external_data_source.is_cdc_enabled_for_team", return_value=True) + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.validate_prerequisites", + return_value=[], + ) + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.drop_slot_and_publication") + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.drop_slot") + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.create_slot") + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.slot_exists", return_value=False) + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.publication_exists", return_value=True) + @patch("posthog.temporal.data_imports.sources.postgres.cdc.adapter.cdc_pg_connection") + def test_enable_cdc_self_managed_rolls_back_slot_only_on_failure( + self, + mock_cdc_pg_connection, + _mock_publication_exists, + _mock_slot_exists, + mock_create_slot, + mock_drop_slot, + mock_drop_slot_and_publication, + _check, + _flag, + ) -> None: + source = _make_postgres_source(self.team.pk, self.user) + + mock_cdc_pg_connection.return_value.__enter__.return_value = object() + mock_cdc_pg_connection.return_value.__exit__.return_value = None + + mock_create_slot.side_effect = RuntimeError("replication permission denied") + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/enable_cdc/", + data={ + "cdc_management_mode": "self_managed", + "cdc_slot_name": "self_slot", + "cdc_publication_name": 
"customer_owned_pub", + }, + format="json", + ) + + assert response.status_code == 400 + assert "replication permission denied" in response.json()["message"] + + # In self-managed mode the publication is customer-owned — only drop the slot. + mock_drop_slot.assert_called_once() + assert mock_drop_slot.call_args.args[1] == "self_slot" + mock_drop_slot_and_publication.assert_not_called() + + source.refresh_from_db() + assert (source.job_inputs or {}).get("cdc_enabled") is not True + assert source.deleted is False + + +class TestDisableCDC(APIBaseTest): + def test_disable_cdc_rejects_source_type_without_cdc_support(self) -> None: + # Stripe has no CDC adapter — the viewset must surface that as a 400, not crash. + source = ExternalDataSource.objects.create( + team_id=self.team.pk, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Stripe", + created_by=self.user, + job_inputs={"stripe_secret_key": "sk_test_123"}, + ) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 400 + assert "CDC is not supported" in response.json()["message"] + + def test_disable_cdc_noops_when_cdc_not_enabled(self) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 200 + body = response.json() + assert body["success"] is True + assert body.get("already_disabled") is True + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.cleanup_resources", + return_value=None, + ) + def test_disable_cdc_clears_cdc_keys_and_pauses_schemas(self, _cleanup) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + + cdc_schema = ExternalDataSchema.objects.create( + name="cdc_table", + team_id=self.team.pk, + 
source_id=source.pk, + sync_type=ExternalDataSchema.SyncType.CDC, + should_sync=True, + ) + non_cdc_schema = ExternalDataSchema.objects.create( + name="incremental_table", + team_id=self.team.pk, + source_id=source.pk, + sync_type=ExternalDataSchema.SyncType.INCREMENTAL, + should_sync=True, + ) + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 200, response.content + + source.refresh_from_db() + ji = source.job_inputs or {} + # Every cdc_* key must be gone — leaving a stale consistent_point behind + # would corrupt LSN tracking on re-enable. + assert not any(k.startswith("cdc_") for k in ji.keys()) + # Non-CDC connection fields are preserved. + assert ji["host"] == "localhost" + assert ji["user"] == "user" + + cdc_schema.refresh_from_db() + assert cdc_schema.sync_type is None + assert cdc_schema.should_sync is False + + # Non-CDC schema must NOT be touched. + non_cdc_schema.refresh_from_db() + assert non_cdc_schema.sync_type == ExternalDataSchema.SyncType.INCREMENTAL + assert non_cdc_schema.should_sync is True + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.cleanup_resources", + return_value=None, + ) + def test_disable_cdc_soft_deletes_companion_cdc_tables(self, _cleanup) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + + cdc_companion = DataWarehouseTable.objects.create( + team_id=self.team.pk, + name="pg_orders_cdc", + external_data_source_id=source.pk, + format=DataWarehouseTable.TableFormat.DeltaS3Wrapper, + url_pattern="s3://bucket/orders_cdc", + ) + main_table = DataWarehouseTable.objects.create( + team_id=self.team.pk, + name="pg_orders", + external_data_source_id=source.pk, + format=DataWarehouseTable.TableFormat.DeltaS3Wrapper, + url_pattern="s3://bucket/orders", + ) + + response = self.client.post( + 
f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 200, response.content + + cdc_companion.refresh_from_db() + assert cdc_companion.deleted is True + + # Non-_cdc-suffixed table must NOT be soft-deleted by disable_cdc — + # the user might still pick a new sync strategy and reuse it. + main_table.refresh_from_db() + assert main_table.deleted is False + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.cleanup_resources", + return_value=None, + ) + @patch("products.data_warehouse.backend.api.external_data_source.cancel_external_data_workflow") + def test_disable_cdc_cancels_running_workflow(self, mock_cancel, _cleanup) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + cdc_schema = ExternalDataSchema.objects.create( + name="cdc_table", + team_id=self.team.pk, + source_id=source.pk, + sync_type=ExternalDataSchema.SyncType.CDC, + should_sync=True, + ) + running_job = ExternalDataJob.objects.create( + pipeline_id=source.pk, + team_id=self.team.pk, + schema=cdc_schema, + workflow_id="cdc-extraction-running-workflow", + status="Running", + rows_synced=0, + ) + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 200, response.content + mock_cancel.assert_called_once_with(running_job.workflow_id) + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.cleanup_resources", + return_value=None, + ) + @patch("products.data_warehouse.backend.api.external_data_source.cancel_external_data_workflow") + def test_disable_cdc_does_not_cancel_non_cdc_running_jobs(self, mock_cancel, _cleanup) -> None: + # A running incremental sync on the same source must NOT be cancelled by disable_cdc. 
+ source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + incremental_schema = ExternalDataSchema.objects.create( + name="incremental_table", + team_id=self.team.pk, + source_id=source.pk, + sync_type=ExternalDataSchema.SyncType.INCREMENTAL, + should_sync=True, + ) + ExternalDataJob.objects.create( + pipeline_id=source.pk, + team_id=self.team.pk, + schema=incremental_schema, + workflow_id="incremental-running-workflow", + status="Running", + rows_synced=0, + ) + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 200, response.content + mock_cancel.assert_not_called() + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.cleanup_resources", + return_value=None, + ) + @patch("products.data_warehouse.backend.api.external_data_source.cancel_external_data_workflow") + def test_disable_cdc_does_not_cancel_non_running_workflow(self, mock_cancel, _cleanup) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + ExternalDataJob.objects.create( + pipeline_id=source.pk, + team_id=self.team.pk, + workflow_id="cdc-extraction-completed-workflow", + status="Completed", + rows_synced=10, + ) + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 200, response.content + mock_cancel.assert_not_called() + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.cleanup_resources", + side_effect=RuntimeError("slot drop network blip"), + ) + def test_disable_cdc_succeeds_even_if_external_cleanup_fails(self, _cleanup) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + cdc_schema = ExternalDataSchema.objects.create( + name="cdc_table", + team_id=self.team.pk, + source_id=source.pk, + sync_type=ExternalDataSchema.SyncType.CDC, + 
should_sync=True, + ) + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + # The user's intent to disable CDC must be honored even if the customer's DB + # is briefly unreachable for the slot drop — we still clear local state. + assert response.status_code == 200, response.content + + source.refresh_from_db() + ji = source.job_inputs or {} + assert "cdc_enabled" not in ji + + cdc_schema.refresh_from_db() + assert cdc_schema.sync_type is None + assert cdc_schema.should_sync is False + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.cleanup_resources", + return_value=None, + ) + def test_disable_cdc_calls_source_cleanup_helper(self, mock_cleanup) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/disable_cdc/", + ) + assert response.status_code == 200, response.content + + # Helper called with the source model (drops schedule + slot + publication). + mock_cleanup.assert_called_once() + called_with = mock_cleanup.call_args.args[0] + assert called_with.pk == source.pk + + +class TestUpdateCDCSettings(APIBaseTest): + def test_update_cdc_settings_rejects_source_type_without_cdc_support(self) -> None: + # Stripe has no CDC adapter — the viewset must surface that as a 400, not crash. 
+ source = ExternalDataSource.objects.create( + team_id=self.team.pk, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Stripe", + created_by=self.user, + job_inputs={"stripe_secret_key": "sk_test_123"}, + ) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={"cdc_lag_warning_threshold_mb": 100}, + format="json", + ) + assert response.status_code == 400 + assert "CDC is not supported" in response.json()["message"] + + def test_update_cdc_settings_rejects_when_cdc_not_enabled(self) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={"cdc_lag_warning_threshold_mb": 100}, + format="json", + ) + assert response.status_code == 400 + assert "CDC is not enabled" in response.json()["message"] + + @parameterized.expand( + [ + ("non_numeric", "fast"), + ("negative", -10), + ("zero", 0), + ] + ) + def test_update_cdc_settings_rejects_invalid_thresholds(self, _name: str, value) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={"cdc_lag_warning_threshold_mb": value}, + format="json", + ) + assert response.status_code == 400 + + def test_update_cdc_settings_rejects_warn_not_less_than_crit(self) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={ + "cdc_lag_warning_threshold_mb": 5000, + "cdc_lag_critical_threshold_mb": 5000, + }, + format="json", + ) + assert response.status_code == 400 + assert "less than" in response.json()["message"] + + def 
test_update_cdc_settings_validates_warn_vs_existing_crit(self) -> None: + # If only `warning` is sent, we must still compare against the persisted critical value. + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={"cdc_lag_warning_threshold_mb": 99999}, + format="json", + ) + assert response.status_code == 400 + assert "less than" in response.json()["message"] + + def test_update_cdc_settings_partial_update_preserves_other_fields(self) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + original_slot = source.job_inputs["cdc_slot_name"] + original_pub = source.job_inputs["cdc_publication_name"] + original_mode = source.job_inputs["cdc_management_mode"] + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={"cdc_auto_drop_slot": False}, + format="json", + ) + assert response.status_code == 200, response.content + + source.refresh_from_db() + ji = source.job_inputs + # `EncryptedJSONField` round-trips scalar values as strings. + assert ji["cdc_auto_drop_slot"] == "False" + # Untouched fields preserved. + assert ji["cdc_slot_name"] == original_slot + assert ji["cdc_publication_name"] == original_pub + assert ji["cdc_management_mode"] == original_mode + # Thresholds preserved at defaults. 
+ assert int(ji["cdc_lag_warning_threshold_mb"]) == 1024 + assert int(ji["cdc_lag_critical_threshold_mb"]) == 10240 + + def test_update_cdc_settings_empty_payload_is_unchanged(self) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={}, + format="json", + ) + assert response.status_code == 200, response.content + body = response.json() + assert body["success"] is True + assert body.get("unchanged") is True + + def test_update_cdc_settings_updates_all_tunable_fields(self) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={ + "cdc_auto_drop_slot": False, + "cdc_lag_warning_threshold_mb": 256, + "cdc_lag_critical_threshold_mb": 2048, + }, + format="json", + ) + assert response.status_code == 200, response.content + + source.refresh_from_db() + ji = source.job_inputs + # `EncryptedJSONField` round-trips scalar values as strings. + assert ji["cdc_auto_drop_slot"] == "False" + assert int(ji["cdc_lag_warning_threshold_mb"]) == 256 + assert int(ji["cdc_lag_critical_threshold_mb"]) == 2048 + + def test_update_cdc_settings_coerces_bool_truthiness(self) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/update_cdc_settings/", + data={"cdc_auto_drop_slot": True}, + format="json", + ) + assert response.status_code == 200, response.content + + source.refresh_from_db() + # `EncryptedJSONField` round-trips scalar values as strings. 
+ assert source.job_inputs["cdc_auto_drop_slot"] == "True" + + +class TestCDCJobInputsExposure(APIBaseTest): + def test_retrieve_exposes_cdc_fields_but_not_password(self) -> None: + # cdc_* keys aren't source-config form fields; without the explicit allowlist they'd be + # stripped from reads as "unknown" and the Configuration page would never see CDC as on. + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + + response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/") + assert response.status_code == 200, response.content + + job_inputs = response.json()["job_inputs"] + assert job_inputs["cdc_enabled"] == "True" + assert job_inputs["cdc_management_mode"] == "posthog" + assert job_inputs["cdc_slot_name"] == "posthog_slot" + assert job_inputs["cdc_publication_name"] == "posthog_pub" + assert "cdc_lag_warning_threshold_mb" in job_inputs + assert "cdc_lag_critical_threshold_mb" in job_inputs + # Secret connection field must still be stripped. 
+ assert "password" not in job_inputs + + def test_retrieve_omits_cdc_fields_when_not_enabled(self) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/") + assert response.status_code == 200, response.content + job_inputs = response.json()["job_inputs"] + assert not any(k.startswith("cdc_") for k in job_inputs) + + +class TestCDCStatus(APIBaseTest): + def test_rejects_source_type_without_cdc_support(self) -> None: + source = ExternalDataSource.objects.create( + team_id=self.team.pk, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Stripe", + created_by=self.user, + job_inputs={"stripe_secret_key": "sk_test_123"}, + ) + response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/cdc_status/") + assert response.status_code == 400 + assert "CDC is not supported" in response.json()["message"] + + def test_returns_disabled_when_cdc_off(self) -> None: + source = _make_postgres_source(self.team.pk, self.user) + response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/cdc_status/") + assert response.status_code == 200, response.content + assert response.json() == {"enabled": False} + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.get_status", + return_value={"slot_exists": True, "publication_exists": True, "lag_bytes": 2048}, + ) + def test_returns_live_status_when_enabled(self, mock_get_status) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/cdc_status/") + assert response.status_code == 200, response.content + body = response.json() + assert body["enabled"] is True + assert body["management_mode"] == "posthog" + assert body["slot_name"] == 
"posthog_slot" + assert body["publication_name"] == "posthog_pub" + assert body["slot_exists"] is True + assert body["publication_exists"] is True + assert body["lag_bytes"] == 2048 + # Read against the stored source model, not a client payload. + assert mock_get_status.call_args.args[0].pk == source.pk + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.get_status", + return_value={"slot_exists": False, "publication_exists": True, "lag_bytes": None}, + ) + def test_surfaces_missing_slot(self, _mock_get_status) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/cdc_status/") + assert response.status_code == 200, response.content + body = response.json() + assert body["slot_exists"] is False + assert body["lag_bytes"] is None + + @patch( + "posthog.temporal.data_imports.sources.postgres.cdc.adapter.PostgresCDCAdapter.get_status", + side_effect=psycopg.OperationalError("connection refused"), + ) + def test_returns_400_when_source_unreachable(self, _mock_get_status) -> None: + source = _make_postgres_source(self.team.pk, self.user, cdc_enabled=True) + response = self.client.get(f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/cdc_status/") + assert response.status_code == 400 + assert "Could not connect to source" in response.json()["message"] diff --git a/products/data_warehouse/backend/api/test/test_postgres_warehouse_migration.py b/products/data_warehouse/backend/api/test/test_postgres_warehouse_migration.py index 714c1bc1cec4..291eace25eed 100644 --- a/products/data_warehouse/backend/api/test/test_postgres_warehouse_migration.py +++ b/products/data_warehouse/backend/api/test/test_postgres_warehouse_migration.py @@ -461,3 +461,90 @@ def test_clearing_postgres_schema_drops_duplicate_qualified_row(self, mock_get_s # The orphan duplicate is soft-deleted. 
orphan.refresh_from_db() assert orphan.deleted is True + + @patch("products.data_warehouse.backend.api.external_data_source.SourceRegistry.get_source") + def test_refresh_schemas_persists_detected_primary_key_for_cdc(self, mock_get_source): + # A table added after source creation is discovered via refresh. Its detected primary key + # must be persisted to sync_type_config.primary_key_columns so it can later be switched to + # CDC (which requires a PK) — otherwise the toggle fails with "refresh to pick one up". + mock_get_source.return_value.parse_config.return_value = None + mock_get_source.return_value.get_schemas.return_value = [ + SourceSchema( + name="public.cdc_test_orders", + supports_incremental=False, + supports_append=False, + columns=[("id", "integer", False), ("customer", "text", False)], + foreign_keys=[], + source_schema="public", + source_table_name="cdc_test_orders", + detected_primary_keys=["id"], + ), + ] + source = ExternalDataSource.objects.create( + team_id=self.team.pk, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Postgres", + created_by=self.user, + access_method=ExternalDataSource.AccessMethod.WAREHOUSE, + job_inputs={"host": "localhost", "port": 5432, "schema": "public"}, + ) + schema = ExternalDataSchema.objects.create( + team_id=self.team.pk, + source_id=source.pk, + name="public.cdc_test_orders", + should_sync=False, + ) + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/refresh_schemas/" + ) + assert response.status_code == status.HTTP_200_OK, response.content + + schema.refresh_from_db() + assert schema.sync_type_config.get("primary_key_columns") == ["id"] + assert schema.primary_key_columns == ["id"] + + @patch("products.data_warehouse.backend.api.external_data_source.SourceRegistry.get_source") + def test_refresh_schemas_does_not_clobber_existing_primary_key(self, mock_get_source): + # A user-set / 
previously-stored PK must survive refresh even if discovery detects a + # different one — the explicit choice wins. + mock_get_source.return_value.parse_config.return_value = None + mock_get_source.return_value.get_schemas.return_value = [ + SourceSchema( + name="public.orders", + supports_incremental=False, + supports_append=False, + columns=[("id", "integer", False), ("order_key", "text", False)], + foreign_keys=[], + source_schema="public", + source_table_name="orders", + detected_primary_keys=["id"], + ), + ] + source = ExternalDataSource.objects.create( + team_id=self.team.pk, + source_id=str(uuid.uuid4()), + connection_id=str(uuid.uuid4()), + destination_id=str(uuid.uuid4()), + source_type="Postgres", + created_by=self.user, + access_method=ExternalDataSource.AccessMethod.WAREHOUSE, + job_inputs={"host": "localhost", "port": 5432, "schema": "public"}, + ) + schema = ExternalDataSchema.objects.create( + team_id=self.team.pk, + source_id=source.pk, + name="public.orders", + should_sync=False, + sync_type_config={"primary_key_columns": ["order_key"]}, + ) + + response = self.client.post( + f"/api/environments/{self.team.pk}/external_data_sources/{source.pk}/refresh_schemas/" + ) + assert response.status_code == status.HTTP_200_OK, response.content + + schema.refresh_from_db() + assert schema.sync_type_config.get("primary_key_columns") == ["order_key"] diff --git a/products/data_warehouse/backend/postgres_helpers.py b/products/data_warehouse/backend/postgres_helpers.py index c5b7a7602bf6..f9d6cd3b6792 100644 --- a/products/data_warehouse/backend/postgres_helpers.py +++ b/products/data_warehouse/backend/postgres_helpers.py @@ -227,7 +227,13 @@ def reconcile_postgres_schemas( source_schema=resolved_schema, source_table_name=resolved_table, ) - matched.sync_type_config = {**(matched.sync_type_config or {}), "schema_metadata": schema_metadata} + new_sync_type_config = {**(matched.sync_type_config or {}), "schema_metadata": schema_metadata} + # Persist the detected 
primary key so tables discovered after source creation can be + # switched to CDC (which requires a PK). Don't clobber a value already stored — e.g. an + # explicit override set during creation or a prior refresh. + if source_schema.detected_primary_keys and not new_sync_type_config.get("primary_key_columns"): + new_sync_type_config["primary_key_columns"] = source_schema.detected_primary_keys + matched.sync_type_config = new_sync_type_config update_fields = ["sync_type_config", "updated_at"] # Drop dead columns so next sync doesn't emit `SELECT … missing_col`. diff --git a/products/data_warehouse/frontend/generated/api.ts b/products/data_warehouse/frontend/generated/api.ts index 831aacbddeab..5df5f2935cdd 100644 --- a/products/data_warehouse/frontend/generated/api.ts +++ b/products/data_warehouse/frontend/generated/api.ts @@ -749,6 +749,59 @@ export const externalDataSourcesBulkUpdateSchemasPartialUpdate = async ( ) } +export const getExternalDataSourcesCdcStatusRetrieveUrl = (projectId: string, id: string) => { + return `/api/projects/${projectId}/external_data_sources/${id}/cdc_status/` +} + +/** + * Live CDC health for an existing source: slot/publication existence and WAL lag. + +Reads from the source DB via the engine adapter. Returns ``{"enabled": false}`` +when CDC is off, or the stored config plus live ``slot_exists`` / +``publication_exists`` / ``lag_bytes`` when on. 400s if the source DB is +unreachable so the UI can show a degraded/unreachable state. 
+ */ +export const externalDataSourcesCdcStatusRetrieve = async ( + projectId: string, + id: string, + options?: RequestInit +): Promise => { + return apiMutator(getExternalDataSourcesCdcStatusRetrieveUrl(projectId, id), { + ...options, + method: 'GET', + }) +} + +export const getExternalDataSourcesCheckCdcPrerequisitesForSourceCreateUrl = (projectId: string, id: string) => { + return `/api/projects/${projectId}/external_data_sources/${id}/check_cdc_prerequisites_for_source/` +} + +/** + * Validate CDC prerequisites for an existing source using its stored credentials. + +The detail=False ``check_cdc_prerequisites`` action is for the creation wizard, +where the client still holds the raw connection config (incl. password) in the +form. On the Configuration page the source already exists and secret fields are +stripped from API responses — so the client can't supply them. This reads the +stored (encrypted) credentials from the DB via the adapter instead. + +Body params: ``cdc_management_mode`` (``"posthog"`` | ``"self_managed"``), +``cdc_slot_name`` (optional), ``cdc_publication_name`` (optional). 
+ */ +export const externalDataSourcesCheckCdcPrerequisitesForSourceCreate = async ( + projectId: string, + id: string, + externalDataSourceSerializersApi: NonReadonly, + options?: RequestInit +): Promise => { + return apiMutator(getExternalDataSourcesCheckCdcPrerequisitesForSourceCreateUrl(projectId, id), { + ...options, + method: 'POST', + headers: { 'Content-Type': 'application/json', ...options?.headers }, + body: JSON.stringify(externalDataSourceSerializersApi), + }) +} + export const getExternalDataSourcesCreateWebhookCreateUrl = (projectId: string, id: string) => { return `/api/projects/${projectId}/external_data_sources/${id}/create_webhook/` } @@ -791,6 +844,66 @@ export const externalDataSourcesDeleteWebhookCreate = async ( }) } +export const getExternalDataSourcesDisableCdcCreateUrl = (projectId: string, id: string) => { + return `/api/projects/${projectId}/external_data_sources/${id}/disable_cdc/` +} + +/** + * Disable CDC on an existing source. + +Cancels any running CDC extraction workflow, deletes the extraction schedule, +delegates engine-side teardown to the source's adapter (drops slot/publication +for Postgres; equivalent for other engines), clears ``cdc_*`` keys from +``job_inputs``, soft-deletes companion CDC tables, and sets all CDC schemas to +``sync_type=None``, ``should_sync=False`` so the user must pick a new sync +strategy before they resume. 
+ */ +export const externalDataSourcesDisableCdcCreate = async ( + projectId: string, + id: string, + externalDataSourceSerializersApi: NonReadonly, + options?: RequestInit +): Promise => { + return apiMutator(getExternalDataSourcesDisableCdcCreateUrl(projectId, id), { + ...options, + method: 'POST', + headers: { 'Content-Type': 'application/json', ...options?.headers }, + body: JSON.stringify(externalDataSourceSerializersApi), + }) +} + +export const getExternalDataSourcesEnableCdcCreateUrl = (projectId: string, id: string) => { + return `/api/projects/${projectId}/external_data_sources/${id}/enable_cdc/` +} + +/** + * Enable CDC on an existing source. + +Provisions engine-side CDC resources via the source's adapter, writes the CDC +config into ``source.job_inputs``, and ensures the CDC extraction schedule +exists. Re-runs prereq checks server-side so we never trust a stale +client-side check. + +Body params: ``cdc_management_mode`` (``"posthog"`` | ``"self_managed"``), +plus engine-specific identifier hints (e.g. ``cdc_slot_name``, +``cdc_publication_name`` for Postgres). Universal tuning fields: +``cdc_auto_drop_slot`` (optional bool), ``cdc_lag_warning_threshold_mb`` +(optional int), ``cdc_lag_critical_threshold_mb`` (optional int). 
+ */ +export const externalDataSourcesEnableCdcCreate = async ( + projectId: string, + id: string, + externalDataSourceSerializersApi: NonReadonly, + options?: RequestInit +): Promise => { + return apiMutator(getExternalDataSourcesEnableCdcCreateUrl(projectId, id), { + ...options, + method: 'POST', + headers: { 'Content-Type': 'application/json', ...options?.headers }, + body: JSON.stringify(externalDataSourceSerializersApi), + }) +} + export const getExternalDataSourcesJobsRetrieveUrl = (projectId: string, id: string) => { return `/api/projects/${projectId}/external_data_sources/${id}/jobs/` } @@ -872,6 +985,32 @@ export const externalDataSourcesRevenueAnalyticsConfigPartialUpdate = async ( }) } +export const getExternalDataSourcesUpdateCdcSettingsCreateUrl = (projectId: string, id: string) => { + return `/api/projects/${projectId}/external_data_sources/${id}/update_cdc_settings/` +} + +/** + * Update CDC tuning fields without enabling/disabling. + +Lets users edit ``cdc_auto_drop_slot``, ``cdc_lag_warning_threshold_mb``, and +``cdc_lag_critical_threshold_mb`` independently. These fields are universal +across engines. Engine-specific identifiers (slot name, management mode, …) +are immutable post-enable — switching them requires disable + enable. 
+ */ +export const externalDataSourcesUpdateCdcSettingsCreate = async ( + projectId: string, + id: string, + externalDataSourceSerializersApi: NonReadonly, + options?: RequestInit +): Promise => { + return apiMutator(getExternalDataSourcesUpdateCdcSettingsCreateUrl(projectId, id), { + ...options, + method: 'POST', + headers: { 'Content-Type': 'application/json', ...options?.headers }, + body: JSON.stringify(externalDataSourceSerializersApi), + }) +} + export const getExternalDataSourcesUpdateWebhookInputsCreateUrl = (projectId: string, id: string) => { return `/api/projects/${projectId}/external_data_sources/${id}/update_webhook_inputs/` } diff --git a/products/data_warehouse/frontend/generated/api.zod.ts b/products/data_warehouse/frontend/generated/api.zod.ts index 6350fdf89450..a00690ea958b 100644 --- a/products/data_warehouse/frontend/generated/api.zod.ts +++ b/products/data_warehouse/frontend/generated/api.zod.ts @@ -832,6 +832,44 @@ export const ExternalDataSourcesBulkUpdateSchemasPartialUpdateBody = /* @__PURE_ .describe('Schema updates to apply in a single batch.'), }) +/** + * Validate CDC prerequisites for an existing source using its stored credentials. + +The detail=False ``check_cdc_prerequisites`` action is for the creation wizard, +where the client still holds the raw connection config (incl. password) in the +form. On the Configuration page the source already exists and secret fields are +stripped from API responses — so the client can't supply them. This reads the +stored (encrypted) credentials from the DB via the adapter instead. + +Body params: ``cdc_management_mode`` (``"posthog"`` | ``"self_managed"``), +``cdc_slot_name`` (optional), ``cdc_publication_name`` (optional). 
+ */ +export const externalDataSourcesCheckCdcPrerequisitesForSourceCreateBodyPrefixMax = 100 + +export const externalDataSourcesCheckCdcPrerequisitesForSourceCreateBodyDescriptionMax = 400 + +export const ExternalDataSourcesCheckCdcPrerequisitesForSourceCreateBody = /* @__PURE__ */ zod + .object({ + created_via: zod + .union([ + zod.enum(['web', 'api', 'mcp']).describe('\* `web` - web\n\* `api` - api\n\* `mcp` - mcp'), + zod.null(), + ]) + .optional() + .describe( + 'How this source was created. Defaults to `api` on create when omitted. `web` for the in-app UI, `api` for direct API callers, `mcp` for agent\/MCP tool calls. Ignored on update.\n\n\* `web` - web\n\* `api` - api\n\* `mcp` - mcp' + ), + client_secret: zod.string(), + account_id: zod.string(), + prefix: zod.string().max(externalDataSourcesCheckCdcPrerequisitesForSourceCreateBodyPrefixMax).nullish(), + description: zod + .string() + .max(externalDataSourcesCheckCdcPrerequisitesForSourceCreateBodyDescriptionMax) + .nullish(), + job_inputs: zod.unknown().optional(), + }) + .describe('Mixin for serializers to add user access control fields') + /** * Create, Read, Update and Delete External data Sources. */ @@ -884,6 +922,76 @@ export const ExternalDataSourcesDeleteWebhookCreateBody = /* @__PURE__ */ zod }) .describe('Mixin for serializers to add user access control fields') +/** + * Disable CDC on an existing source. + +Cancels any running CDC extraction workflow, deletes the extraction schedule, +delegates engine-side teardown to the source's adapter (drops slot/publication +for Postgres; equivalent for other engines), clears ``cdc_*`` keys from +``job_inputs``, soft-deletes companion CDC tables, and sets all CDC schemas to +``sync_type=None``, ``should_sync=False`` so the user must pick a new sync +strategy before they resume. 
+ */ +export const externalDataSourcesDisableCdcCreateBodyPrefixMax = 100 + +export const externalDataSourcesDisableCdcCreateBodyDescriptionMax = 400 + +export const ExternalDataSourcesDisableCdcCreateBody = /* @__PURE__ */ zod + .object({ + created_via: zod + .union([ + zod.enum(['web', 'api', 'mcp']).describe('\* `web` - web\n\* `api` - api\n\* `mcp` - mcp'), + zod.null(), + ]) + .optional() + .describe( + 'How this source was created. Defaults to `api` on create when omitted. `web` for the in-app UI, `api` for direct API callers, `mcp` for agent\/MCP tool calls. Ignored on update.\n\n\* `web` - web\n\* `api` - api\n\* `mcp` - mcp' + ), + client_secret: zod.string(), + account_id: zod.string(), + prefix: zod.string().max(externalDataSourcesDisableCdcCreateBodyPrefixMax).nullish(), + description: zod.string().max(externalDataSourcesDisableCdcCreateBodyDescriptionMax).nullish(), + job_inputs: zod.unknown().optional(), + }) + .describe('Mixin for serializers to add user access control fields') + +/** + * Enable CDC on an existing source. + +Provisions engine-side CDC resources via the source's adapter, writes the CDC +config into ``source.job_inputs``, and ensures the CDC extraction schedule +exists. Re-runs prereq checks server-side so we never trust a stale +client-side check. + +Body params: ``cdc_management_mode`` (``"posthog"`` | ``"self_managed"``), +plus engine-specific identifier hints (e.g. ``cdc_slot_name``, +``cdc_publication_name`` for Postgres). Universal tuning fields: +``cdc_auto_drop_slot`` (optional bool), ``cdc_lag_warning_threshold_mb`` +(optional int), ``cdc_lag_critical_threshold_mb`` (optional int). 
+ */ +export const externalDataSourcesEnableCdcCreateBodyPrefixMax = 100 + +export const externalDataSourcesEnableCdcCreateBodyDescriptionMax = 400 + +export const ExternalDataSourcesEnableCdcCreateBody = /* @__PURE__ */ zod + .object({ + created_via: zod + .union([ + zod.enum(['web', 'api', 'mcp']).describe('\* `web` - web\n\* `api` - api\n\* `mcp` - mcp'), + zod.null(), + ]) + .optional() + .describe( + 'How this source was created. Defaults to `api` on create when omitted. `web` for the in-app UI, `api` for direct API callers, `mcp` for agent\/MCP tool calls. Ignored on update.\n\n\* `web` - web\n\* `api` - api\n\* `mcp` - mcp' + ), + client_secret: zod.string(), + account_id: zod.string(), + prefix: zod.string().max(externalDataSourcesEnableCdcCreateBodyPrefixMax).nullish(), + description: zod.string().max(externalDataSourcesEnableCdcCreateBodyDescriptionMax).nullish(), + job_inputs: zod.unknown().optional(), + }) + .describe('Mixin for serializers to add user access control fields') + /** * Fetch current schema/table list from the source and create any new ExternalDataSchema rows (no data sync). */ @@ -965,6 +1073,37 @@ export const ExternalDataSourcesRevenueAnalyticsConfigPartialUpdateBody = /* @__ }) .describe('Mixin for serializers to add user access control fields') +/** + * Update CDC tuning fields without enabling/disabling. + +Lets users edit ``cdc_auto_drop_slot``, ``cdc_lag_warning_threshold_mb``, and +``cdc_lag_critical_threshold_mb`` independently. These fields are universal +across engines. Engine-specific identifiers (slot name, management mode, …) +are immutable post-enable — switching them requires disable + enable. 
+ */ +export const externalDataSourcesUpdateCdcSettingsCreateBodyPrefixMax = 100 + +export const externalDataSourcesUpdateCdcSettingsCreateBodyDescriptionMax = 400 + +export const ExternalDataSourcesUpdateCdcSettingsCreateBody = /* @__PURE__ */ zod + .object({ + created_via: zod + .union([ + zod.enum(['web', 'api', 'mcp']).describe('\* `web` - web\n\* `api` - api\n\* `mcp` - mcp'), + zod.null(), + ]) + .optional() + .describe( + 'How this source was created. Defaults to `api` on create when omitted. `web` for the in-app UI, `api` for direct API callers, `mcp` for agent\/MCP tool calls. Ignored on update.\n\n\* `web` - web\n\* `api` - api\n\* `mcp` - mcp' + ), + client_secret: zod.string(), + account_id: zod.string(), + prefix: zod.string().max(externalDataSourcesUpdateCdcSettingsCreateBodyPrefixMax).nullish(), + description: zod.string().max(externalDataSourcesUpdateCdcSettingsCreateBodyDescriptionMax).nullish(), + job_inputs: zod.unknown().optional(), + }) + .describe('Mixin for serializers to add user access control fields') + /** * Create, Read, Update and Delete External data Sources. 
*/ diff --git a/products/data_warehouse/frontend/scenes/SourceScene/tabs/CDCSection.tsx b/products/data_warehouse/frontend/scenes/SourceScene/tabs/CDCSection.tsx new file mode 100644 index 000000000000..a94958669547 --- /dev/null +++ b/products/data_warehouse/frontend/scenes/SourceScene/tabs/CDCSection.tsx @@ -0,0 +1,692 @@ +import { useActions, useValues } from 'kea' +import { useMemo, useState } from 'react' + +import { IconCopy } from '@posthog/icons' +import { + LemonButton, + LemonCheckbox, + LemonDivider, + LemonInput, + LemonModal, + LemonSkeleton, + LemonSwitch, + LemonTag, +} from '@posthog/lemon-ui' + +import api from 'lib/api' +import { AccessControlAction } from 'lib/components/AccessControlAction' +import { FEATURE_FLAGS } from 'lib/constants' +import { LemonBanner } from 'lib/lemon-ui/LemonBanner' +import { LemonDialog } from 'lib/lemon-ui/LemonDialog' +import { LemonField } from 'lib/lemon-ui/LemonField' +import { LemonRadio } from 'lib/lemon-ui/LemonRadio' +import { lemonToast } from 'lib/lemon-ui/LemonToast' +import { featureFlagLogic } from 'lib/logic/featureFlagLogic' +import { humanizeBytes } from 'lib/utils' +import { copyToClipboard } from 'lib/utils/copyToClipboard' + +import { AccessControlLevel, AccessControlResourceType, ExternalDataSource } from '~/types' + +import { sourceSettingsLogic } from './sourceSettingsLogic' + +type ManagementMode = 'posthog' | 'self_managed' + +const DEFAULT_WARN_THRESHOLD_MB = 1024 +const DEFAULT_CRIT_THRESHOLD_MB = 10240 + +// job_inputs is an EncryptedJSONField — scalar values round-trip as strings, so a stored +// Python `True`/`False` arrives as "True"/"False". Coerce defensively. 
+function coerceBool(value: unknown, fallback: boolean): boolean {
+    // Real booleans pass straight through.
+    if (typeof value === 'boolean') {
+        return value
+    }
+    // Stringified booleans: only a case-insensitive "true" is truthy; any other string is false.
+    if (typeof value === 'string') {
+        return value.toLowerCase() === 'true'
+    }
+    // Missing or unexpected type (undefined, null, number, object): use the caller's default.
+    return fallback
+}
+
+// Coerce a stored numeric setting to a finite number. Missing values (null/undefined) and
+// anything `Number()` can't parse fall back to `fallback`. Without this, a garbled stored value
+// becomes NaN, and NaN compares false against everything — so a "warning must be less than
+// critical" check would silently pass and NaN would be submitted to the API.
+function coerceNumber(value: unknown, fallback: number): number {
+    const parsed = Number(value ?? fallback)
+    return Number.isFinite(parsed) ? parsed : fallback
+}
+
+// Read the CDC settings out of `source.job_inputs`, normalising types and applying defaults:
+// CDC disabled, PostHog-managed mode, empty slot/publication names, auto-drop on, and the
+// default warning/critical lag thresholds.
+function getCdcConfig(source: ExternalDataSource): {
+    enabled: boolean
+    management_mode: ManagementMode
+    slot_name: string
+    publication_name: string
+    auto_drop_slot: boolean
+    lag_warning_threshold_mb: number
+    lag_critical_threshold_mb: number
+} {
+    // `Record` needs explicit type arguments to compile; values are loosely typed job inputs.
+    const ji = (source.job_inputs ?? {}) as Record<string, any>
+    return {
+        enabled: coerceBool(ji.cdc_enabled, false),
+        // Anything other than an explicit 'self_managed' is treated as PostHog-managed.
+        management_mode: (ji.cdc_management_mode === 'self_managed' ? 'self_managed' : 'posthog') as ManagementMode,
+        slot_name: ji.cdc_slot_name ?? '',
+        publication_name: ji.cdc_publication_name ?? '',
+        auto_drop_slot: coerceBool(ji.cdc_auto_drop_slot, true),
+        lag_warning_threshold_mb: coerceNumber(ji.cdc_lag_warning_threshold_mb, DEFAULT_WARN_THRESHOLD_MB),
+        lag_critical_threshold_mb: coerceNumber(ji.cdc_lag_critical_threshold_mb, DEFAULT_CRIT_THRESHOLD_MB),
+    }
+}
+
+// Quote a single SQL identifier, escaping embedded double-quotes ("" per the SQL spec) so a
+// source/table/user/publication name containing `"` can't break out of the quotes and inject
+// extra statements into the generated copy-paste setup script.
+function quoteIdent(ident: string): string {
+    return `"${ident.replace(/"/g, '""')}"`
+}
+
+// Quote a (possibly schema-qualified) table identifier. `orders` -> "public"."orders";
+// `analytics.events` -> "analytics"."events".
+function quoteTable(name: string, defaultSchema: string): string {
+    // A dot means the name is already qualified: quote each dot-separated part on its own.
+    if (name.includes('.')) {
+        return name.split('.').map(quoteIdent).join('.')
+    }
+    // Unqualified name: prefix it with the default schema.
+    return `${quoteIdent(defaultSchema)}.${quoteIdent(name)}`
+}
+
+// Self-managed CDC setup SQL the customer's DBA runs before PostHog creates the slot.
+// schema/user come from `job_inputs` (non-secret, unlike the password).
+function buildSelfManagedCdcSql(source: ExternalDataSource, publicationName: string): string { + const ji = (source.job_inputs ?? {}) as Record + const schema = (ji.schema as string) || 'public' + const dbUser = (ji.user as string) || '' + const pubName = publicationName.trim() || 'posthog_pub' + + // No CDC tables picked yet at enable time, so default to the source's synced tables. + const syncedTables = (source.schemas ?? []).filter((s) => s.should_sync).map((s) => s.name) + const tableList = + syncedTables.length > 0 + ? syncedTables.map((t) => quoteTable(t, schema)).join(', ') + : `${quoteIdent(schema)}.${quoteIdent('your_table')}` + + // All identifiers go through quoteIdent so a name with an embedded `"` can't inject SQL. + const user = quoteIdent(dbUser) + const sch = quoteIdent(schema) + const pub = quoteIdent(pubName) + + return `-- 1. Grants for the PostHog user +-- Reading a replication slot requires REPLICATION (or rds_replication on RDS). +-- Run ONE of the lines below, depending on your environment: +ALTER USER ${user} WITH REPLICATION; -- self-hosted / most clouds +-- GRANT rds_replication TO ${user}; -- AWS RDS +GRANT USAGE ON SCHEMA ${sch} TO ${user}; +GRANT SELECT ON ${tableList} TO ${user}; + +-- 2. Publication covering the tables you'll sync via CDC. +-- Run this as the table owner (or a superuser). Adjust the table list to match the +-- tables you intend to switch to CDC on the Schemas tab. PostHog creates and manages +-- the replication slot itself once you enable CDC. 
+CREATE PUBLICATION ${pub} FOR TABLE ${tableList} + WITH (publish_via_partition_root = true); + +-- Later, to add a new table to the publication: +-- ALTER PUBLICATION ${pub} ADD TABLE ${sch}.${quoteIdent('new_table')};` +} + +function confirmThen(opts: { + title: string + description: React.ReactNode + primaryText: string + primaryStatus?: 'danger' | 'alt' + onConfirm: () => void | Promise +}): void { + LemonDialog.open({ + title: opts.title, + description: opts.description, + primaryButton: { + children: opts.primaryText, + status: opts.primaryStatus, + onClick: () => { + void opts.onConfirm() + }, + }, + secondaryButton: { children: 'Cancel' }, + }) +} + +export function CDCSection({ source }: { source: ExternalDataSource }): JSX.Element | null { + const { featureFlags } = useValues(featureFlagLogic) + + if (source.source_type !== 'Postgres') { + return null + } + if (source.access_method !== 'warehouse') { + return null + } + if (!featureFlags[FEATURE_FLAGS.DWH_POSTGRES_CDC]) { + return null + } + + const cdc = getCdcConfig(source) + + return ( +
+
+

Change data capture (CDC)

+ Alpha + {cdc.enabled && Enabled} +
+

+ Real-time sync via PostgreSQL logical replication. Captures inserts, updates, and{' '} + deletes with no full table scans. +

+ + {cdc.enabled ? : } +
+ ) +} + +function EnabledControls({ source }: { source: ExternalDataSource }): JSX.Element { + const { loadSource, loadCdcStatus } = useActions(sourceSettingsLogic) + const { + cdcStatus: status, + cdcStatusLoading: statusLoading, + cdcStatusError: statusError, + } = useValues(sourceSettingsLogic) + const cdc = getCdcConfig(source) + + const [autoDrop, setAutoDrop] = useState(cdc.auto_drop_slot) + const [warnMb, setWarnMb] = useState(cdc.lag_warning_threshold_mb) + const [critMb, setCritMb] = useState(cdc.lag_critical_threshold_mb) + const [busy, setBusy] = useState(false) + + const dirty = + autoDrop !== cdc.auto_drop_slot || + warnMb !== cdc.lag_warning_threshold_mb || + critMb !== cdc.lag_critical_threshold_mb + const thresholdsInvalid = warnMb >= critMb + const validationError = thresholdsInvalid ? 'Warning threshold must be less than critical threshold.' : null + + const onSave = (): void => { + if (validationError) { + lemonToast.error(validationError) + return + } + confirmThen({ + title: 'Update CDC settings', + description: + 'Slot protection and lag thresholds will be updated for this source. New values take effect on the next CDC tick.', + primaryText: 'Update settings', + onConfirm: async () => { + setBusy(true) + try { + await api.externalDataSources.update_cdc_settings(source.id, { + cdc_auto_drop_slot: autoDrop, + cdc_lag_warning_threshold_mb: warnMb, + cdc_lag_critical_threshold_mb: critMb, + }) + lemonToast.success('CDC settings updated') + loadSource() + } catch (e: any) { + lemonToast.error(e?.message ?? "Couldn't update CDC settings") + } finally { + setBusy(false) + } + }, + }) + } + + const onDisable = (): void => { + confirmThen({ + title: 'Disable CDC', + description: ( +
+

Disabling CDC will:

+
    +
  • + Drop the replication slot{cdc.management_mode === 'posthog' && ' and publication'} on your + source database. +
  • +
  • Pause every schema currently syncing via CDC and clear its sync type.
  • +
  • + Require you to pick a new sync strategy (incremental, append, or full refresh) per schema + before they resume. +
  • +
+

+ You can re-enable CDC later — it will start from the current LSN, not from history. +

+
+ ), + primaryText: 'Disable CDC', + primaryStatus: 'danger', + onConfirm: async () => { + setBusy(true) + try { + await api.externalDataSources.disable_cdc(source.id) + lemonToast.success('CDC disabled') + loadSource() + } catch (e: any) { + lemonToast.error(e?.message ?? "Couldn't disable CDC") + } finally { + setBusy(false) + } + }, + }) + } + + // Lag relative to the critical threshold (bytes), for coloring the WAL-lag readout. + const lagBytes = status?.lag_bytes ?? null + const critBytes = critMb * 1024 * 1024 + const warnBytes = warnMb * 1024 * 1024 + const lagTagType = + lagBytes == null ? 'muted' : lagBytes >= critBytes ? 'danger' : lagBytes >= warnBytes ? 'warning' : 'success' + + return ( +
+
+
+
Management mode
+
{cdc.management_mode === 'posthog' ? 'PostHog-managed' : 'Self-managed'}
+
+
+
Replication slot
+ {cdc.slot_name} +
+
+
Publication
+ {cdc.publication_name} +
+
+ + + +
+
+

Replication status

+ + Refresh + +
+ {statusLoading && !status ? ( + + ) : statusError ? ( + + Couldn't read live status from your database: {statusError} + + ) : status ? ( +
+
+
Replication slot
+ + {status.slot_exists ? 'Active' : 'Missing'} + +
+
+
Publication
+ + {status.publication_exists ? 'Active' : 'Missing'} + +
+
+
WAL lag
+ + {lagBytes == null ? 'Unknown' : humanizeBytes(lagBytes)} + +
+
+ ) : null} + {status && (status.slot_exists === false || status.publication_exists === false) && ( + + {status.slot_exists === false + ? 'The replication slot is missing on your database — CDC syncs will fail until it is recreated. Disable and re-enable CDC to recreate it.' + : 'The publication is missing on your database — recreate it (self-managed) or disable and re-enable CDC (PostHog-managed).'} + + )} +
+ + + +
+ + + +

+ When enabled, PostHog drops the replication slot if WAL lag exceeds the critical threshold — + preventing disk exhaustion on your database. +

+
+ + {autoDrop && cdc.management_mode === 'posthog' && ( +
+ + setWarnMb(Number(v) || 0)} min={1} /> + + + setCritMb(Number(v) || 0)} min={1} /> + +
+ )} + + {validationError && {validationError}} + +
+ + + Disable CDC + + + + + Update CDC configs + + +
+
+ ) +} + +function DisabledControls({ source }: { source: ExternalDataSource }): JSX.Element { + const { loadSource } = useActions(sourceSettingsLogic) + + const [mode, setMode] = useState('posthog') + const [publicationName, setPublicationName] = useState('') + const [autoDrop, setAutoDrop] = useState(true) + const [warnMb, setWarnMb] = useState(DEFAULT_WARN_THRESHOLD_MB) + const [critMb, setCritMb] = useState(DEFAULT_CRIT_THRESHOLD_MB) + const [prereqResult, setPrereqResult] = useState<{ valid: boolean; errors: string[] } | null>(null) + const [checking, setChecking] = useState(false) + const [enabling, setEnabling] = useState(false) + const [setupModalOpen, setSetupModalOpen] = useState(false) + const [sqlConfirmed, setSqlConfirmed] = useState(false) + const [modalErrors, setModalErrors] = useState(null) + + const thresholdsInvalid = warnMb >= critMb + + const selfManagedSql = useMemo(() => buildSelfManagedCdcSql(source, publicationName), [source, publicationName]) + + const doEnable = async (): Promise => { + setEnabling(true) + setModalErrors(null) + try { + await api.externalDataSources.enable_cdc(source.id, { + cdc_management_mode: mode, + cdc_publication_name: mode === 'self_managed' && publicationName ? publicationName : null, + cdc_auto_drop_slot: autoDrop, + cdc_lag_warning_threshold_mb: warnMb, + cdc_lag_critical_threshold_mb: critMb, + }) + lemonToast.success('CDC enabled') + setSetupModalOpen(false) + loadSource() + } catch (e: any) { + // enable_cdc re-validates server-side; surface its `errors` list inline for retry. + const errs = e?.data?.errors + if (Array.isArray(errs) && errs.length > 0) { + setModalErrors(errs) + } else { + lemonToast.error(e?.message ?? 
"Couldn't enable CDC") + } + } finally { + setEnabling(false) + } + } + + const onCheckPrereqs = async (): Promise => { + setChecking(true) + setPrereqResult(null) + try { + // Use the stored-credentials endpoint: this source already exists and its secret + // fields (password) are stripped from API responses, so we can't resend them. + const result = await api.externalDataSources.check_cdc_prerequisites_for_source(source.id, { + cdc_management_mode: mode, + cdc_publication_name: mode === 'self_managed' && publicationName ? publicationName : null, + }) + setPrereqResult(result) + } catch (e: any) { + lemonToast.error(e?.message ?? "Couldn't check prerequisites") + } finally { + setChecking(false) + } + } + + const onEnable = (): void => { + if (thresholdsInvalid) { + lemonToast.error('Warning threshold must be less than critical threshold.') + return + } + + // Self-managed: the publication must exist before PostHog can create the slot. Walk the + // user through the setup SQL first, then enable (which verifies it server-side). + if (mode === 'self_managed') { + if (!publicationName.trim()) { + lemonToast.error('Enter a publication name first.') + return + } + setModalErrors(null) + setSqlConfirmed(false) + setSetupModalOpen(true) + return + } + + // PostHog-managed: PostHog creates the slot + publication itself, no SQL to run. + confirmThen({ + title: 'Enable CDC', + description: ( +
+

Enabling CDC will:

+
    +
  • Create a replication slot and publication on your source database.
  • +
  • Start the CDC extraction schedule.
  • +
+

+ Schemas won't switch to CDC automatically — pick CDC as the sync type on the Schemas tab once + this finishes. +

+
+ ), + primaryText: 'Enable CDC', + onConfirm: doEnable, + }) + } + + return ( + <> +
+ + { + setMode(v) + setPrereqResult(null) + }} + options={[ + { + value: 'posthog', + label: ( +
+
PostHog-managed
+
+ PostHog creates and manages the replication slot and publication. Requires a + DB user with REPLICATION and table ownership. +
+
+ ), + }, + { + value: 'self_managed', + label: ( +
+
Self-managed
+
+ You (or your DBA) create the publication once as the table owner. PostHog + creates the slot and needs REPLICATION + SELECT on synced tables. +
+
+ ), + }, + ]} + /> +
+ + {mode === 'self_managed' && ( + + { + setPublicationName(v) + // Drop any prior check result — it was for the old publication name. + setPrereqResult(null) + }} + placeholder="posthog_pub" + /> + + )} + +
+ + + +

+ PostHog will drop the slot if WAL lag exceeds the critical threshold. +

+
+ + {autoDrop && mode === 'posthog' && ( +
+ + setWarnMb(Number(v) || 0)} + min={1} + /> + + + setCritMb(Number(v) || 0)} + min={1} + /> + +
+ )} + + {thresholdsInvalid && ( + Warning threshold must be less than critical threshold. + )} + +
+ + Check database prerequisites + + {prereqResult && ( + + {prereqResult.valid ? ( +

Your database is ready for CDC.

+ ) : ( + <> +

Some prerequisites are not met:

+
    + {prereqResult.errors.map((err, i) => ( +
  • {err}
  • + ))} +
+ + )} +
+ )} +
+ +
+ + + {mode === 'self_managed' ? 'Set up & enable CDC' : 'Enable CDC'} + + +
+
+ + setSetupModalOpen(false)} + title="Create your publication" + description="Self-managed CDC needs the publication to exist before PostHog connects — PostHog creates and manages the replication slot itself. Run the SQL below as the table owner, then enable CDC." + width={720} + footer={ + <> + setSetupModalOpen(false)} + disabledReason={enabling ? 'Enabling...' : undefined} + > + Back + + void doEnable()} + > + Verify & enable CDC + + + } + > +
+
+ } + onClick={() => void copyToClipboard(selfManagedSql, 'Setup SQL')} + > + Copy SQL + +
+
+                        {selfManagedSql}
+                    
+ + + + {modalErrors && modalErrors.length > 0 && ( + +

+ Verification failed — please fix the following and retry: +

+
    + {modalErrors.map((err, i) => ( +
  • {err}
  • + ))} +
+
+ )} +
+
+ + ) +} diff --git a/products/data_warehouse/frontend/scenes/SourceScene/tabs/ConfigurationTab.tsx b/products/data_warehouse/frontend/scenes/SourceScene/tabs/ConfigurationTab.tsx index 224eb5a873fa..8ab49ad146c5 100644 --- a/products/data_warehouse/frontend/scenes/SourceScene/tabs/ConfigurationTab.tsx +++ b/products/data_warehouse/frontend/scenes/SourceScene/tabs/ConfigurationTab.tsx @@ -12,6 +12,7 @@ import { SourceFormComponent } from 'products/data_warehouse/frontend/shared/com import { availableSourcesLogic } from '../../NewSourceScene/availableSourcesLogic' import { buildKeaFormDefaultFromSourceDetails } from '../../NewSourceScene/sourceWizardLogic' +import { CDCSection } from './CDCSection' import { sourceSettingsLogic } from './sourceSettingsLogic' interface ConfigurationTabProps { @@ -97,6 +98,7 @@ function UpdateSourceConnectionFormContainer(): JSX.Element { + ) } diff --git a/products/data_warehouse/frontend/scenes/SourceScene/tabs/sourceSettingsLogic.ts b/products/data_warehouse/frontend/scenes/SourceScene/tabs/sourceSettingsLogic.ts index 4504e1bc4db1..0e29ef8c6a33 100644 --- a/products/data_warehouse/frontend/scenes/SourceScene/tabs/sourceSettingsLogic.ts +++ b/products/data_warehouse/frontend/scenes/SourceScene/tabs/sourceSettingsLogic.ts @@ -32,6 +32,18 @@ export interface SourceSettingsLogicProps { availableSources?: Record } +export interface CdcStatus { + enabled: boolean + management_mode?: 'posthog' | 'self_managed' + slot_name?: string + publication_name?: string + lag_warning_threshold_mb?: number + lag_critical_threshold_mb?: number + slot_exists?: boolean + publication_exists?: boolean + lag_bytes?: number | null +} + const REFRESH_INTERVAL = 5000 const SCHEMA_UPDATE_DEBOUNCE_MS = 500 const JOBS_POLL_MAX_BACKOFF_MS = 60000 @@ -322,6 +334,15 @@ export const sourceSettingsLogic = kea([ }, }, ], + cdcStatus: [ + null as CdcStatus | null, + { + // Opens a connection to the customer DB, so it's on-demand (never polled). 
+ loadCdcStatus: async () => { + return await api.externalDataSources.cdc_status(values.sourceId) + }, + }, + ], })), reducers(({ props }) => ({ sourceId: [ @@ -391,6 +412,14 @@ export const sourceSettingsLogic = kea([ submitSourceConfigFailure: () => false, }, ], + cdcStatusError: [ + null as string | null, + { + loadCdcStatus: () => null, + loadCdcStatusSuccess: () => null, + loadCdcStatusFailure: (_, { error }) => error || 'Could not read CDC status from your database.', + }, + ], })), selectors({ sourceFieldConfig: [ @@ -595,6 +624,14 @@ export const sourceSettingsLogic = kea([ } } + // Fetch CDC status once per source — here (not a React effect) so team context is set. + const ji = (values.source?.job_inputs ?? {}) as Record + const cdcEnabled = ji.cdc_enabled === true || ji.cdc_enabled === 'True' || ji.cdc_enabled === 'true' + if (cdcEnabled && cache.cdcStatusFetchedForSourceId !== values.source?.id) { + cache.cdcStatusFetchedForSourceId = values.source?.id + actions.loadCdcStatus() + } + const breadcrumbName = values.source?.access_method === 'direct' ? 
values.source?.prefix || values.source?.source_type || 'Source' diff --git a/products/data_warehouse/mcp/tools.yaml b/products/data_warehouse/mcp/tools.yaml index 3d493c3354a5..dbdb9484dad6 100644 --- a/products/data_warehouse/mcp/tools.yaml +++ b/products/data_warehouse/mcp/tools.yaml @@ -193,6 +193,9 @@ tools: external-data-sources-bulk-update-schemas-partial-update: operation: external_data_sources_bulk_update_schemas_partial_update enabled: false + external-data-sources-cdc-status-retrieve: + operation: external_data_sources_cdc_status_retrieve + enabled: false external-data-sources-check-cdc-prerequisites-create: operation: external_data_sources_check_cdc_prerequisites_create enabled: true @@ -211,6 +214,9 @@ tools: param_overrides: source_type: input_schema: ExternalDataSourceTypeSchema + external-data-sources-check-cdc-prerequisites-for-source-create: + operation: external_data_sources_check_cdc_prerequisites_for_source_create + enabled: false external-data-sources-connections-list: operation: external_data_sources_connections_list enabled: true @@ -310,6 +316,12 @@ tools: Delete a data warehouse source and all its table schemas, synced tables, and sync schedules. This is a soft delete. Any running syncs are cancelled first. This cannot be undone — the source must be recreated to resume syncing. 
+ external-data-sources-disable-cdc-create: + operation: external_data_sources_disable_cdc_create + enabled: false + external-data-sources-enable-cdc-create: + operation: external_data_sources_enable_cdc_create + enabled: false external-data-sources-jobs: operation: external_data_sources_jobs_retrieve enabled: false @@ -402,6 +414,9 @@ tools: external-data-sources-update: operation: external_data_sources_update enabled: false + external-data-sources-update-cdc-settings-create: + operation: external_data_sources_update_cdc_settings_create + enabled: false external-data-sources-update-webhook-inputs-create: operation: external_data_sources_update_webhook_inputs_create enabled: true diff --git a/services/mcp/definitions/data_warehouse.yaml b/services/mcp/definitions/data_warehouse.yaml index 8273c466373e..2e42fc921497 100644 --- a/services/mcp/definitions/data_warehouse.yaml +++ b/services/mcp/definitions/data_warehouse.yaml @@ -93,9 +93,15 @@ tools: external-data-sources-bulk-update-schemas-partial-update: operation: external_data_sources_bulk_update_schemas_partial_update enabled: false + external-data-sources-cdc-status-retrieve: + operation: external_data_sources_cdc_status_retrieve + enabled: false external-data-sources-check-cdc-prerequisites-create: operation: external_data_sources_check_cdc_prerequisites_create enabled: false + external-data-sources-check-cdc-prerequisites-for-source-create: + operation: external_data_sources_check_cdc_prerequisites_for_source_create + enabled: false external-data-sources-connections-list: operation: external_data_sources_connections_list enabled: false @@ -114,6 +120,12 @@ tools: external-data-sources-destroy: operation: external_data_sources_destroy enabled: false + external-data-sources-disable-cdc-create: + operation: external_data_sources_disable_cdc_create + enabled: false + external-data-sources-enable-cdc-create: + operation: external_data_sources_enable_cdc_create + enabled: false external-data-sources-jobs-retrieve: 
operation: external_data_sources_jobs_retrieve enabled: false @@ -141,6 +153,9 @@ tools: external-data-sources-update: operation: external_data_sources_update enabled: false + external-data-sources-update-cdc-settings-create: + operation: external_data_sources_update_cdc_settings_create + enabled: false external-data-sources-update-webhook-inputs-create: operation: external_data_sources_update_webhook_inputs_create enabled: false diff --git a/services/mcp/schema/generated-tool-definitions.json b/services/mcp/schema/generated-tool-definitions.json index 8a2613cc3ad6..56736dc7d711 100644 --- a/services/mcp/schema/generated-tool-definitions.json +++ b/services/mcp/schema/generated-tool-definitions.json @@ -1015,7 +1015,6 @@ "summary": "Delete dashboard tile", "title": "Delete dashboard tile", "required_scopes": ["dashboard:write"], - "new_mcp": true, "annotations": { "destructiveHint": true, "idempotentHint": false, diff --git a/services/mcp/schema/tool-definitions-all.json b/services/mcp/schema/tool-definitions-all.json index daa5a0702e21..5651734a412f 100644 --- a/services/mcp/schema/tool-definitions-all.json +++ b/services/mcp/schema/tool-definitions-all.json @@ -1031,7 +1031,6 @@ "summary": "Delete dashboard tile", "title": "Delete dashboard tile", "required_scopes": ["dashboard:write"], - "new_mcp": true, "annotations": { "destructiveHint": true, "idempotentHint": false, diff --git a/services/mcp/tests/unit/__snapshots__/tool-schemas/dashboard-delete-tile.json b/services/mcp/tests/unit/__snapshots__/tool-schemas/dashboard-delete-tile.json new file mode 100644 index 000000000000..5214f1b5e03b --- /dev/null +++ b/services/mcp/tests/unit/__snapshots__/tool-schemas/dashboard-delete-tile.json @@ -0,0 +1,15 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "properties": { + "id": { + "description": "A unique integer value identifying this dashboard.", + "type": "number" + }, + "tile_id": { + "description": "ID of the dashboard tile to delete. 
Use dashboard-get to look up tile IDs.", + "type": "number" + } + }, + "required": ["id", "tile_id"], + "type": "object" +}