diff --git a/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py b/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py
index 1b83e8b491edc..1ed91bb14cd8e 100644
--- a/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py
+++ b/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py
@@ -1,12 +1,16 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.models.kafka_partition_stats.sql import (
+    PartitionStatsKafkaTable,
     CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV,
-    CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
     EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS
 
 operations = [
-    run_sql_with_exceptions(CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION).get_create_table_sql()
+    ),
     run_sql_with_exceptions(EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS()),
     run_sql_with_exceptions(CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV),
 ]
diff --git a/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py b/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py
index 1a588a1092474..cf691e04cac50 100644
--- a/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py
+++ b/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py
@@ -4,13 +4,18 @@
     KAFKA_SESSION_RECORDING_EVENTS,
 )
 from posthog.models.kafka_partition_stats.sql import (
-    CREATE_PARTITION_STATISTICS_KAFKA_TABLE,
+    PartitionStatsKafkaTable,
     CREATE_PARTITION_STATISTICS_MV,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS
 
 operations = [
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_SESSION_RECORDING_EVENTS)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_EVENTS).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_SESSION_RECORDING_EVENTS)),
 ]
diff --git a/posthog/clickhouse/migrations/0062_partition_stats_v2.py b/posthog/clickhouse/migrations/0062_partition_stats_v2.py
new file mode 100644
index 0000000000000..16ca89bc1efd8
--- /dev/null
+++ b/posthog/clickhouse/migrations/0062_partition_stats_v2.py
@@ -0,0 +1,44 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.kafka_client.topics import (
+    KAFKA_EVENTS_PLUGIN_INGESTION,
+    KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
+    KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
+)
+from posthog.models.kafka_partition_stats.sql import (
+    PartitionStatsKafkaTable as KafkaTable,
+    PartitionStatsV2MaterializedView as MaterializedView,
+    PartitionStatsV2Table as Table,
+)
+from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS
+
+table = Table()
+
+existing_kafka_tables = [
+    # 0041 added KAFKA_EVENTS_PLUGIN_INGESTION
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION),
+    # 0042 added KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW (and KAFKA_SESSION_RECORDING_EVENTS, now unused)
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW),
+]
+
+new_kafka_tables = [
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL),
+    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS),
+    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW),
+]
+
+operations = [
+    run_sql_with_exceptions(table.get_create_table_sql()),
+]
+
+for kafka_table in existing_kafka_tables:
+    operations.append(run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()))
+
+for kafka_table in new_kafka_tables:
+    operations.extend(
+        [
+            run_sql_with_exceptions(kafka_table.get_create_table_sql()),
+            run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()),
+        ]
+    )
diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index 2a9fe901ffc0c..c74a0407ff276 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -1,32 +1,43 @@
+from dataclasses import dataclass
 from posthog.clickhouse.kafka_engine import kafka_engine
-from posthog.clickhouse.table_engines import AggregatingMergeTree
+from posthog.clickhouse.table_engines import AggregatingMergeTree, ReplacingMergeTree
 from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE
 
-CREATE_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-(
-    `uuid` String,
-    `distinct_id` String,
-    `ip` String,
-    `site_url` String,
-    `data` String,
-    `team_id` Int64,
-    `now` String,
-    `sent_at` String,
-    `token` String
-)
-ENGINE={kafka_engine(topic=monitored_topic, group="partition_statistics")}
-SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100;
-"""
-)
-
-DROP_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}';
-"""
-)
+
+@dataclass
+class PartitionStatsKafkaTable:
+    brokers: list[str]
+    topic: str
+    consumer_group: str = "partition_statistics"
+
+    @property
+    def table_name(self) -> str:
+        return f"kafka_{self.topic}_partition_statistics"
+
+    def get_create_table_sql(self) -> str:
+        return f"""
+        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        (
+            `uuid` String,
+            `distinct_id` String,
+            `ip` String,
+            `site_url` String,
+            `data` String,
+            `team_id` Int64,
+            `now` String,
+            `sent_at` String,
+            `token` String
+        )
+        ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
+        SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100
+        """
+
+    def get_drop_table_sql(self) -> str:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        """
+
 
 EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_TABLE_ENGINE = lambda: AggregatingMergeTree(
     "events_plugin_ingestion_partition_statistics"
@@ -83,14 +94,69 @@
 """
 )
 
-CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = CREATE_PARTITION_STATISTICS_KAFKA_TABLE(
-    KAFKA_EVENTS_PLUGIN_INGESTION
-)
-
 CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)
 
-DROP_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = DROP_PARTITION_STATISTICS_KAFKA_TABLE(
-    KAFKA_EVENTS_PLUGIN_INGESTION
-)
-
 DROP_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = DROP_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)
+
+# V2
+
+
+class PartitionStatsV2Table:
+    table_name: str = "events_plugin_ingestion_partition_statistics_v2"
+
+    def get_create_table_sql(self) -> str:
+        engine = ReplacingMergeTree(self.table_name, ver="timestamp")
+        return f"""
+        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
+            topic LowCardinality(String),
+            partition UInt64,
+            offset UInt64,
+            token String,
+            distinct_id String,
+            ip Tuple(v4 IPv4, v6 IPv6),
+            event String,
+            data_length UInt64,
+            timestamp DateTime
+        )
+        ENGINE = {engine}
+        PARTITION BY (topic, toStartOfDay(timestamp))
+        ORDER BY (topic, partition, offset)
+        TTL timestamp + INTERVAL 30 DAY
+        """
+
+    def get_drop_table_sql(self) -> str:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
+        """
+
+
+@dataclass
+class PartitionStatsV2MaterializedView:
+    to_table: PartitionStatsV2Table
+    from_table: PartitionStatsKafkaTable
+
+    @property
+    def table_name(self) -> str:
+        return f"{self.from_table.topic}_partition_statistics_v2_mv"
+
+    def get_create_table_sql(self) -> str:
+        return f"""
+        CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        TO `{CLICKHOUSE_DATABASE}`.{self.to_table.table_name}
+        AS SELECT
+            _topic AS topic,
+            _partition AS partition,
+            _offset AS offset,
+            token,
+            distinct_id,
+            (toIPv4OrDefault(kafka_table.ip), toIPv6OrDefault(kafka_table.ip)) AS ip,
+            JSONExtractString(data, 'event') AS event,
+            length(data) AS data_length,
+            _timestamp AS timestamp
+        FROM {CLICKHOUSE_DATABASE}.{self.from_table.table_name} AS kafka_table
+        """
+
+    def get_drop_table_sql(self) -> str:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
+        """
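
For reviewers, a minimal sketch (not part of the diff) of how the new pieces compose, mirroring the per-topic loop in `0062_partition_stats_v2.py`. The broker list and topic name below are made-up inputs, and running it assumes a PostHog checkout where these imports resolve:

```python
# Hypothetical usage: render the DDL the new classes generate for one topic.
# "localhost:9092" and "example-topic" are illustrative values only.
from posthog.models.kafka_partition_stats.sql import (
    PartitionStatsKafkaTable,
    PartitionStatsV2MaterializedView,
    PartitionStatsV2Table,
)

table = PartitionStatsV2Table()  # shared ReplacingMergeTree target
kafka_table = PartitionStatsKafkaTable(["localhost:9092"], "example-topic")
mv = PartitionStatsV2MaterializedView(table, kafka_table)

# One Kafka engine table plus one materialized view per monitored topic,
# all writing into the single v2 statistics table.
for ddl in (
    table.get_create_table_sql(),
    kafka_table.get_create_table_sql(),
    mv.get_create_table_sql(),
):
    print(ddl)
```

Each class also exposes `get_drop_table_sql()`, so a rollback migration could emit the corresponding `DROP TABLE` statements in reverse order.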