From e4312fec9952199f6735da927c3a6aa122a91dbd Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Tue, 7 May 2024 13:42:08 -0700
Subject: [PATCH 1/8] refactor kafka table

---
 posthog/models/kafka_partition_stats/sql.py | 63 +++++++++++++--------
 1 file changed, 40 insertions(+), 23 deletions(-)

diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index 2a9fe901ffc0c..6a12daf95069a 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -1,32 +1,49 @@
+from dataclasses import dataclass
 from posthog.clickhouse.kafka_engine import kafka_engine
 from posthog.clickhouse.table_engines import AggregatingMergeTree
 from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE
+from posthog.settings.data_stores import KAFKA_HOSTS

-CREATE_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-(
-    `uuid` String,
-    `distinct_id` String,
-    `ip` String,
-    `site_url` String,
-    `data` String,
-    `team_id` Int64,
-    `now` String,
-    `sent_at` String,
-    `token` String
-)
-ENGINE={kafka_engine(topic=monitored_topic, group="partition_statistics")}
-SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100;
-"""
-)
-
-DROP_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}';
-"""
-)
+
+@dataclass
+class PartitionStatsKafkaTable:
+    brokers: list[str]
+    topic: str
+    consumer_group: str = "partition_statistics"
+
+    @property
+    def table_name(self) -> str:
+        return f"kafka_{self.topic}_partition_statistics"
+
+    def get_create_table_sql(self) -> str:
+        return f"""
+        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        (
+            `uuid` String,
+            `distinct_id` String,
+            `ip` String,
+            `site_url` String,
+            `data` String,
+            `team_id` Int64,
+            `now` String,
+            `sent_at` String,
+            `token` String
+        )
+        ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
+        SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100
+        """
+
+    def get_drop_table_sql(self) -> None:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        """
+
+
+CREATE_PARTITION_STATISTICS_KAFKA_TABLE = lambda topic: PartitionStatsKafkaTable(
+    KAFKA_HOSTS, topic
+).get_create_table_sql()
+DROP_PARTITION_STATISTICS_KAFKA_TABLE = lambda topic: PartitionStatsKafkaTable(KAFKA_HOSTS, topic).get_drop_table_sql()

 EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_TABLE_ENGINE = lambda: AggregatingMergeTree(
     "events_plugin_ingestion_partition_statistics"
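The dataclass above replaces the two module-level lambdas with one object that carries its broker list, topic, and consumer group, so the table name and both DDL statements derive from a single place. A minimal usage sketch (class and settings names come from this patch; the topic string is illustrative only):

    from posthog.models.kafka_partition_stats.sql import PartitionStatsKafkaTable
    from posthog.settings.data_stores import KAFKA_HOSTS

    # The table name is derived from the topic, as it was in the old lambda helpers.
    table = PartitionStatsKafkaTable(brokers=KAFKA_HOSTS, topic="events_plugin_ingestion")
    assert table.table_name == "kafka_events_plugin_ingestion_partition_statistics"

    create_sql = table.get_create_table_sql()  # CREATE TABLE IF NOT EXISTS ... ENGINE=Kafka(...)
    drop_sql = table.get_drop_table_sql()      # DROP TABLE IF EXISTS ...
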
From 46c83a412fb7ad993412b724e10b0fd244dfafce Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Tue, 7 May 2024 13:54:25 -0700
Subject: [PATCH 2/8] update migrations

---
 .../migrations/0041_kafka_partitions_stats.py |  8 ++++++--
 .../migrations/0042_kafka_partitions_stats.py | 11 ++++++++---
 posthog/models/kafka_partition_stats/sql.py   | 14 --------------
 3 files changed, 14 insertions(+), 19 deletions(-)

diff --git a/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py b/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py
index 1b83e8b491edc..1ed91bb14cd8e 100644
--- a/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py
+++ b/posthog/clickhouse/migrations/0041_kafka_partitions_stats.py
@@ -1,12 +1,16 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.models.kafka_partition_stats.sql import (
+    PartitionStatsKafkaTable,
     CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV,
-    CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
     EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS

 operations = [
-    run_sql_with_exceptions(CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION).get_create_table_sql()
+    ),
     run_sql_with_exceptions(EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS()),
     run_sql_with_exceptions(CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV),
 ]
diff --git a/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py b/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py
index 1a588a1092474..cf691e04cac50 100644
--- a/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py
+++ b/posthog/clickhouse/migrations/0042_kafka_partitions_stats.py
@@ -4,13 +4,18 @@
     KAFKA_SESSION_RECORDING_EVENTS,
 )
 from posthog.models.kafka_partition_stats.sql import (
-    CREATE_PARTITION_STATISTICS_KAFKA_TABLE,
+    PartitionStatsKafkaTable,
     CREATE_PARTITION_STATISTICS_MV,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS

 operations = [
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_SESSION_RECORDING_EVENTS)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_EVENTS).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_SESSION_RECORDING_EVENTS)),
 ]
diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index 6a12daf95069a..e9f1f341c73d2 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -3,7 +3,6 @@
 from posthog.clickhouse.table_engines import AggregatingMergeTree
 from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE
-from posthog.settings.data_stores import KAFKA_HOSTS


 @dataclass
@@ -40,11 +39,6 @@ def get_drop_table_sql(self) -> None:
         """


-CREATE_PARTITION_STATISTICS_KAFKA_TABLE = lambda topic: PartitionStatsKafkaTable(
-    KAFKA_HOSTS, topic
-).get_create_table_sql()
-DROP_PARTITION_STATISTICS_KAFKA_TABLE = lambda topic: PartitionStatsKafkaTable(KAFKA_HOSTS, topic).get_drop_table_sql()
-
 EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_TABLE_ENGINE = lambda: AggregatingMergeTree(
     "events_plugin_ingestion_partition_statistics"
 )
@@ -100,14 +94,6 @@ def get_drop_table_sql(self) -> None:
     """
 )

-CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = CREATE_PARTITION_STATISTICS_KAFKA_TABLE(
-    KAFKA_EVENTS_PLUGIN_INGESTION
-)
-
 CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)

-DROP_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = DROP_PARTITION_STATISTICS_KAFKA_TABLE(
-    KAFKA_EVENTS_PLUGIN_INGESTION
-)
-
 DROP_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = DROP_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)
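With the broker list promoted to a constructor argument, each migration now decides which Kafka cluster a monitored topic lives on, as 0042 does for session recordings above. The same pattern, sketched for a hypothetical future migration (topic names are placeholders):

    from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
    from posthog.models.kafka_partition_stats.sql import PartitionStatsKafkaTable
    from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS

    # The broker list, not a module-level constant, decides which cluster is polled.
    operations = [
        run_sql_with_exceptions(
            PartitionStatsKafkaTable(KAFKA_HOSTS, "some_ingestion_topic").get_create_table_sql()
        ),
        run_sql_with_exceptions(
            PartitionStatsKafkaTable(SESSION_RECORDING_KAFKA_HOSTS, "some_recording_topic").get_create_table_sql()
        ),
    ]
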
From f5f8eaa4ed9039e23b5a4c25c5689601aad59919 Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Tue, 7 May 2024 14:15:25 -0700
Subject: [PATCH 3/8] new table definitions

---
 posthog/models/kafka_partition_stats/sql.py | 64 ++++++++++++++++++++-
 1 file changed, 63 insertions(+), 1 deletion(-)

diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index e9f1f341c73d2..da14d9bb59831 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -1,6 +1,6 @@
 from dataclasses import dataclass
 from posthog.clickhouse.kafka_engine import kafka_engine
-from posthog.clickhouse.table_engines import AggregatingMergeTree
+from posthog.clickhouse.table_engines import AggregatingMergeTree, ReplacingMergeTree
 from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE

@@ -97,3 +97,65 @@ def get_drop_table_sql(self) -> None:
 CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)

 DROP_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = DROP_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)
+
+# V2
+
+
+class PartitionStatsV2Table:
+    table_name: str = "events_plugin_ingestion_partition_statistics_v2"
+
+    def get_create_table_sql(self) -> str:
+        engine = ReplacingMergeTree(self.table_name, ver="timestamp")
+        return f"""
+        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
+            topic LowCardinality(String),
+            partition UInt64,
+            offset UInt64,
+            token String,
+            distinct_id String,
+            ip String,
+            event String,
+            data_length UInt64,
+            timestamp DateTime64
+        )
+        ENGINE = {engine}
+        PARTITION BY (topic, toStartOfDay(timestamp))
+        ORDER BY (topic, partition, offset)
+        """
+
+    def get_drop_table_sql(self) -> str:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
+        """
+
+
+@dataclass
+class PartitionStatsV2MaterializedView:
+    to_table: PartitionStatsV2Table
+    from_table: PartitionStatsKafkaTable
+
+    @property
+    def table_name(self) -> str:
+        return f"{self.from_table.topic}_partition_statistics_v2_mv"
+
+    def get_create_table_sql(self) -> str:
+        return f"""
+        CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        TO `{CLICKHOUSE_DATABASE}`.{self.to_table.table_name}
+        AS SELECT
+            _topic AS topic,
+            _partition AS partition,
+            _offset AS offset,
+            token,
+            distinct_id,
+            ip,
+            if(startsWith(JSONExtractString(data, 'event') AS _event, '$'), _event, '') AS event,
+            length(data) AS data_length,
+            _timestamp AS timestamp
+        FROM {CLICKHOUSE_DATABASE}.{self.from_table.table_name}
+        """
+
+    def get_drop_table_sql(self) -> str:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
+        """
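The v2 design pairs a single ReplacingMergeTree target table with one materialized view per Kafka source table; each view reads the Kafka engine's virtual columns (_topic, _partition, _offset, _timestamp) alongside the message fields. A sketch of composing the DDL in dependency order, mirroring how the next patch's migration uses these classes (the topic string is illustrative):

    from posthog.models.kafka_partition_stats.sql import (
        PartitionStatsKafkaTable,
        PartitionStatsV2MaterializedView,
        PartitionStatsV2Table,
    )
    from posthog.settings.data_stores import KAFKA_HOSTS

    target = PartitionStatsV2Table()
    source = PartitionStatsKafkaTable(KAFKA_HOSTS, "events_plugin_ingestion")
    view = PartitionStatsV2MaterializedView(to_table=target, from_table=source)

    # The target and source tables must exist before the view that links them.
    for statement in (
        target.get_create_table_sql(),
        source.get_create_table_sql(),
        view.get_create_table_sql(),
    ):
        print(statement)
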
From 07ea390b2fdb7fe5a036d97b3b7c31a9c489e81c Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Tue, 7 May 2024 14:28:06 -0700
Subject: [PATCH 4/8] add migration

---
 .../migrations/0062_partition_stats_v2.py | 44 +++++++++++++++++++
 1 file changed, 44 insertions(+)
 create mode 100644 posthog/clickhouse/migrations/0062_partition_stats_v2.py

diff --git a/posthog/clickhouse/migrations/0062_partition_stats_v2.py b/posthog/clickhouse/migrations/0062_partition_stats_v2.py
new file mode 100644
index 0000000000000..16ca89bc1efd8
--- /dev/null
+++ b/posthog/clickhouse/migrations/0062_partition_stats_v2.py
@@ -0,0 +1,44 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.kafka_client.topics import (
+    KAFKA_EVENTS_PLUGIN_INGESTION,
+    KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
+    KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
+)
+from posthog.models.kafka_partition_stats.sql import (
+    PartitionStatsKafkaTable as KafkaTable,
+    PartitionStatsV2MaterializedView as MaterializedView,
+    PartitionStatsV2Table as Table,
+)
+from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS
+
+table = Table()
+
+existing_kafka_tables = [
+    # 0041 added KAFKA_EVENTS_PLUGIN_INGESTION
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION),
+    # 0042 added KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW (and KAFKA_SESSION_RECORDING_EVENTS, now unused)
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW),
+]
+
+new_kafka_tables = [
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL),
+    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS),
+    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW),
+]
+
+operations = [
+    run_sql_with_exceptions(table.get_create_table_sql()),
+]
+
+for kafka_table in existing_kafka_tables:
+    operations.append(run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()))
+
+for kafka_table in new_kafka_tables:
+    operations.extend(
+        [
+            run_sql_with_exceptions(kafka_table.get_create_table_sql()),
+            run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()),
+        ]
+    )

From dd57cd1c54b120adbf2397d989606e68b77c3bd9 Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Tue, 7 May 2024 14:30:47 -0700
Subject: [PATCH 5/8] add ttl

---
 posthog/models/kafka_partition_stats/sql.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index da14d9bb59831..14c8a698eeac2 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -116,11 +116,12 @@ def get_create_table_sql(self) -> str:
             ip String,
             event String,
             data_length UInt64,
-            timestamp DateTime64
+            timestamp DateTime
         )
         ENGINE = {engine}
         PARTITION BY (topic, toStartOfDay(timestamp))
         ORDER BY (topic, partition, offset)
+        TTL timestamp + INTERVAL 30 DAY
         """

     def get_drop_table_sql(self) -> str:
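With rows keyed by (topic, partition, offset), versioned by timestamp, and now expiring after 30 days, the v2 table supports cheap throughput and high-water-mark queries. An illustrative read, not part of the patch (FINAL forces ReplacingMergeTree deduplication at query time):

    # Hypothetical helper: per-partition message counts and high-water marks
    # over the last hour, with duplicate (topic, partition, offset) rows collapsed.
    THROUGHPUT_SQL = """
    SELECT topic, partition, count() AS messages, max(offset) AS high_watermark
    FROM events_plugin_ingestion_partition_statistics_v2 FINAL
    WHERE timestamp > now() - INTERVAL 1 HOUR
    GROUP BY topic, partition
    ORDER BY topic, partition
    """
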
From 49f695671d278c50ec7afd80ad62eccaa42d43b8 Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Tue, 7 May 2024 15:32:32 -0700
Subject: [PATCH 6/8] fix dumb typing mistake

---
 posthog/models/kafka_partition_stats/sql.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index 14c8a698eeac2..f0f6946f17f31 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -33,7 +33,7 @@ def get_create_table_sql(self) -> str:
         SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100
         """

-    def get_drop_table_sql(self) -> None:
+    def get_drop_table_sql(self) -> str:
         return f"""
         DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
         """

From 3bdfe633b07c013359e543e2fa184ca4866ed207 Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Wed, 22 May 2024 16:45:02 -0700
Subject: [PATCH 7/8] use native types for `ip` column (poor man's variant)

---
 posthog/models/kafka_partition_stats/sql.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index f0f6946f17f31..781a6541b962c 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -113,7 +113,7 @@ def get_create_table_sql(self) -> str:
             offset UInt64,
             token String,
             distinct_id String,
-            ip String,
+            ip Tuple(v4 IPv4, v6 IPv6),
             event String,
             data_length UInt64,
             timestamp DateTime
@@ -149,11 +149,11 @@ def get_create_table_sql(self) -> str:
             _offset AS offset,
             token,
             distinct_id,
-            ip,
+            (toIPv4OrDefault(kafka_table.ip), toIPv6OrDefault(kafka_table.ip)) AS ip,
             if(startsWith(JSONExtractString(data, 'event') AS _event, '$'), _event, '') AS event,
             length(data) AS data_length,
             _timestamp AS timestamp
-        FROM {CLICKHOUSE_DATABASE}.{self.from_table.table_name}
+        FROM {CLICKHOUSE_DATABASE}.{self.from_table.table_name} AS kafka_table
         """

     def get_drop_table_sql(self) -> str:

From f8d96718af41bd67a90b4a2b10e4090ae1878452 Mon Sep 17 00:00:00 2001
From: Ted Kaemming <65315+tkaemming@users.noreply.github.com>
Date: Wed, 22 May 2024 16:49:44 -0700
Subject: [PATCH 8/8] don't constrain event types

---
 posthog/models/kafka_partition_stats/sql.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/posthog/models/kafka_partition_stats/sql.py b/posthog/models/kafka_partition_stats/sql.py
index 781a6541b962c..c74a0407ff276 100644
--- a/posthog/models/kafka_partition_stats/sql.py
+++ b/posthog/models/kafka_partition_stats/sql.py
@@ -150,7 +150,7 @@ def get_create_table_sql(self) -> str:
             token,
             distinct_id,
             (toIPv4OrDefault(kafka_table.ip), toIPv6OrDefault(kafka_table.ip)) AS ip,
-            if(startsWith(JSONExtractString(data, 'event') AS _event, '$'), _event, '') AS event,
+            JSONExtractString(data, 'event') AS event,
             length(data) AS data_length,
             _timestamp AS timestamp
         FROM {CLICKHOUSE_DATABASE}.{self.from_table.table_name} AS kafka_table
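After the last two patches, event names flow through unfiltered and client IPs land in a named Tuple(v4 IPv4, v6 IPv6), where strings that fail to parse collapse to each type's zero value via toIPv4OrDefault/toIPv6OrDefault. Reading the column therefore means picking a tuple member explicitly; an illustrative query, not part of the series:

    # Distinct IPv4 senders per token; 0.0.0.0 marks rows whose source string
    # did not parse as IPv4 (the address may still be valid in ip.v6).
    DISTINCT_IPV4_SQL = """
    SELECT token, uniqExact(ip.v4) AS distinct_ipv4
    FROM events_plugin_ingestion_partition_statistics_v2
    WHERE ip.v4 != toIPv4('0.0.0.0')
    GROUP BY token
    ORDER BY distinct_ipv4 DESC
    """
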