feat: Add revised partition stats tables (#22163)
tkaemming committed May 23, 2024
1 parent ea83358 commit 6e13f17
Showing 4 changed files with 156 additions and 37 deletions.
posthog/clickhouse/migrations/0041_kafka_partitions_stats.py (8 changes: 6 additions & 2 deletions)
@@ -1,12 +1,16 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
 from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.models.kafka_partition_stats.sql import (
+    PartitionStatsKafkaTable,
     CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV,
-    CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
     EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS
 
 operations = [
-    run_sql_with_exceptions(CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION).get_create_table_sql()
+    ),
     run_sql_with_exceptions(EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS()),
     run_sql_with_exceptions(CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV),
 ]
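Note (not part of the commit): the lambda-built CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS constant is replaced here by the PartitionStatsKafkaTable dataclass defined in posthog/models/kafka_partition_stats/sql.py (diffed below). A minimal sketch of what the migration's call renders, assuming the posthog package and its settings are importable:

# Sketch only: renders the Kafka-engine DDL without executing it against ClickHouse.
from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
from posthog.models.kafka_partition_stats.sql import PartitionStatsKafkaTable
from posthog.settings.data_stores import KAFKA_HOSTS

kafka_table = PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION)
print(kafka_table.table_name)              # follows the kafka_<topic>_partition_statistics convention
print(kafka_table.get_create_table_sql())  # the statement handed to run_sql_with_exceptions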
posthog/clickhouse/migrations/0042_kafka_partitions_stats.py (11 changes: 8 additions & 3 deletions)
@@ -4,13 +4,18 @@
     KAFKA_SESSION_RECORDING_EVENTS,
 )
 from posthog.models.kafka_partition_stats.sql import (
-    CREATE_PARTITION_STATISTICS_KAFKA_TABLE,
+    PartitionStatsKafkaTable,
     CREATE_PARTITION_STATISTICS_MV,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS
 
 operations = [
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_SESSION_RECORDING_EVENTS)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_EVENTS).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_SESSION_RECORDING_EVENTS)),
 ]
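The practical difference from 0041 is the explicit broker list: the dataclass takes the brokers as its first field, so a topic consumed from a separate Kafka cluster (here, session recordings) no longer has to share the default hosts. A sketch under the same assumptions as above:

from posthog.kafka_client.topics import (
    KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
    KAFKA_SESSION_RECORDING_EVENTS,
)
from posthog.models.kafka_partition_stats.sql import PartitionStatsKafkaTable
from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS

# Same table shape and consumer group; only the source cluster differs.
overflow = PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)
recordings = PartitionStatsKafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_EVENTS)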
posthog/clickhouse/migrations/0062_partition_stats_v2.py (44 changes: 44 additions & 0 deletions)
@@ -0,0 +1,44 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.kafka_client.topics import (
+    KAFKA_EVENTS_PLUGIN_INGESTION,
+    KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
+    KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
+)
+from posthog.models.kafka_partition_stats.sql import (
+    PartitionStatsKafkaTable as KafkaTable,
+    PartitionStatsV2MaterializedView as MaterializedView,
+    PartitionStatsV2Table as Table,
+)
+from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS
+
+table = Table()
+
+existing_kafka_tables = [
+    # 0041 added KAFKA_EVENTS_PLUGIN_INGESTION
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION),
+    # 0042 added KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW (and KAFKA_SESSION_RECORDING_EVENTS, now unused)
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW),
+]
+
+new_kafka_tables = [
+    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL),
+    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS),
+    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW),
+]
+
+operations = [
+    run_sql_with_exceptions(table.get_create_table_sql()),
+]
+
+for kafka_table in existing_kafka_tables:
+    operations.append(run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()))
+
+for kafka_table in new_kafka_tables:
+    operations.extend(
+        [
+            run_sql_with_exceptions(kafka_table.get_create_table_sql()),
+            run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()),
+        ]
+    )
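Ordering matters in this migration: the shared v2 target table is created first because every materialized view writes TO it, and each new Kafka table is created before the view that selects from it; topics already wired up in 0041/0042 only get a view. A sketch of the per-topic naming this produces, using an illustrative broker and topic string in place of the real settings:

from posthog.models.kafka_partition_stats.sql import (
    PartitionStatsKafkaTable as KafkaTable,
    PartitionStatsV2MaterializedView as MaterializedView,
    PartitionStatsV2Table as Table,
)

table = Table()
# "localhost:9092" and the topic string are illustrative values only.
kafka_table = KafkaTable(["localhost:9092"], "events_plugin_ingestion_historical")
mv = MaterializedView(table, kafka_table)

print(kafka_table.table_name)  # kafka_events_plugin_ingestion_historical_partition_statistics
print(mv.table_name)           # events_plugin_ingestion_historical_partition_statistics_v2_mv
print(table.table_name)        # events_plugin_ingestion_partition_statistics_v2 (shared by all views)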
posthog/models/kafka_partition_stats/sql.py (130 changes: 98 additions & 32 deletions)
@@ -1,32 +1,43 @@
+from dataclasses import dataclass
 from posthog.clickhouse.kafka_engine import kafka_engine
-from posthog.clickhouse.table_engines import AggregatingMergeTree
+from posthog.clickhouse.table_engines import AggregatingMergeTree, ReplacingMergeTree
 from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE
 
-CREATE_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-(
-    `uuid` String,
-    `distinct_id` String,
-    `ip` String,
-    `site_url` String,
-    `data` String,
-    `team_id` Int64,
-    `now` String,
-    `sent_at` String,
-    `token` String
-)
-ENGINE={kafka_engine(topic=monitored_topic, group="partition_statistics")}
-SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100;
-"""
-)
 
-DROP_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}';
-"""
-)
+@dataclass
+class PartitionStatsKafkaTable:
+    brokers: list[str]
+    topic: str
+    consumer_group: str = "partition_statistics"
+
+    @property
+    def table_name(self) -> str:
+        return f"kafka_{self.topic}_partition_statistics"
+
+    def get_create_table_sql(self) -> str:
+        return f"""
+        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        (
+            `uuid` String,
+            `distinct_id` String,
+            `ip` String,
+            `site_url` String,
+            `data` String,
+            `team_id` Int64,
+            `now` String,
+            `sent_at` String,
+            `token` String
+        )
+        ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
+        SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100
+        """
+
+    def get_drop_table_sql(self) -> str:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        """
+
 
 EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_TABLE_ENGINE = lambda: AggregatingMergeTree(
     "events_plugin_ingestion_partition_statistics"
@@ -83,14 +94,69 @@
"""
)

CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = CREATE_PARTITION_STATISTICS_KAFKA_TABLE(
KAFKA_EVENTS_PLUGIN_INGESTION
)

CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)

DROP_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = DROP_PARTITION_STATISTICS_KAFKA_TABLE(
KAFKA_EVENTS_PLUGIN_INGESTION
)

DROP_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = DROP_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)

# V2


class PartitionStatsV2Table:
table_name: str = "events_plugin_ingestion_partition_statistics_v2"

def get_create_table_sql(self) -> str:
engine = ReplacingMergeTree(self.table_name, ver="timestamp")
return f"""
CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
topic LowCardinality(String),
partition UInt64,
offset UInt64,
token String,
distinct_id String,
ip Tuple(v4 IPv4, v6 IPv6),
event String,
data_length UInt64,
timestamp DateTime
)
ENGINE = {engine}
PARTITION BY (topic, toStartOfDay(timestamp))
ORDER BY (topic, partition, offset)
TTL timestamp + INTERVAL 30 DAY
"""

def get_drop_table_sql(self) -> str:
return f"""
DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
"""


@dataclass
class PartitionStatsV2MaterializedView:
to_table: PartitionStatsV2Table
from_table: PartitionStatsKafkaTable

@property
def table_name(self) -> str:
return f"{self.from_table.topic}_partition_statistics_v2_mv"

def get_create_table_sql(self) -> str:
return f"""
CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
TO `{CLICKHOUSE_DATABASE}`.{self.to_table.table_name}
AS SELECT
_topic AS topic,
_partition AS partition,
_offset AS offset,
token,
distinct_id,
(toIPv4OrDefault(kafka_table.ip), toIPv6OrDefault(kafka_table.ip)) AS ip,
JSONExtractString(data, 'event') AS event,
length(data) AS data_length,
_timestamp AS timestamp
FROM {CLICKHOUSE_DATABASE}.{self.from_table.table_name} AS kafka_table
"""

def get_drop_table_sql(self) -> str:
return f"""
DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
"""

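Taken together, the V2 classes compose into one fan-in pipeline: a PartitionStatsKafkaTable per topic, a PartitionStatsV2MaterializedView per Kafka table, all writing to the single PartitionStatsV2Table keyed by (topic, partition, offset). A sketch that prints the DDL in dependency order, assuming the posthog settings resolve; nothing is executed:

from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
from posthog.models.kafka_partition_stats.sql import (
    PartitionStatsKafkaTable,
    PartitionStatsV2MaterializedView,
    PartitionStatsV2Table,
)
from posthog.settings.data_stores import KAFKA_HOSTS

table = PartitionStatsV2Table()
kafka_table = PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION)
view = PartitionStatsV2MaterializedView(table, kafka_table)

# Target table first, then the source Kafka table, then the view that links them.
for ddl in (
    table.get_create_table_sql(),
    kafka_table.get_create_table_sql(),
    view.get_create_table_sql(),
):
    print(ddl)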