feat: Add revised partition stats tables #22163

Merged: 9 commits, May 23, 2024

Changes from all commits

@@ -1,12 +1,16 @@
 from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.models.kafka_partition_stats.sql import (
+    PartitionStatsKafkaTable,
     CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV,
-    CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
     EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS

 operations = [
-    run_sql_with_exceptions(CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION).get_create_table_sql()
+    ),
     run_sql_with_exceptions(EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS()),
     run_sql_with_exceptions(CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV),
 ]
11 changes: 8 additions & 3 deletions posthog/clickhouse/migrations/0042_kafka_partitions_stats.py
@@ -4,13 +4,18 @@
     KAFKA_SESSION_RECORDING_EVENTS,
 )
 from posthog.models.kafka_partition_stats.sql import (
-    CREATE_PARTITION_STATISTICS_KAFKA_TABLE,
+    PartitionStatsKafkaTable,
     CREATE_PARTITION_STATISTICS_MV,
 )
+from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS

 operations = [
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW)),
-    run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_KAFKA_TABLE(KAFKA_SESSION_RECORDING_EVENTS)),
+    run_sql_with_exceptions(
+        PartitionStatsKafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_EVENTS).get_create_table_sql()
+    ),
     run_sql_with_exceptions(CREATE_PARTITION_STATISTICS_MV(KAFKA_SESSION_RECORDING_EVENTS)),
 ]
44 changes: 44 additions & 0 deletions posthog/clickhouse/migrations/0062_partition_stats_v2.py
@@ -0,0 +1,44 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.kafka_client.topics import (
    KAFKA_EVENTS_PLUGIN_INGESTION,
    KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
    KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
)
from posthog.models.kafka_partition_stats.sql import (
    PartitionStatsKafkaTable as KafkaTable,
    PartitionStatsV2MaterializedView as MaterializedView,
    PartitionStatsV2Table as Table,
)
from posthog.settings.data_stores import KAFKA_HOSTS, SESSION_RECORDING_KAFKA_HOSTS

table = Table()

existing_kafka_tables = [
    # 0041 added KAFKA_EVENTS_PLUGIN_INGESTION
    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION),
    # 0042 added KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW (and KAFKA_SESSION_RECORDING_EVENTS, now unused)
    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW),
]

new_kafka_tables = [
    KafkaTable(KAFKA_HOSTS, KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL),
    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS),
    KafkaTable(SESSION_RECORDING_KAFKA_HOSTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW),

[Comment on lines +26 to +28, tkaemming (Contributor Author)]
Including mutable configuration in what should be an immutable migration seems like a dangerous thing to me, but it's not something I want to tackle right now. This is at least clearer than it was before, where KAFKA_HOSTS was added to the table engine as a default value several layers down the stack.

]

operations = [
    run_sql_with_exceptions(table.get_create_table_sql()),
]

for kafka_table in existing_kafka_tables:
    operations.append(run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()))

for kafka_table in new_kafka_tables:
    operations.extend(
        [
            run_sql_with_exceptions(kafka_table.get_create_table_sql()),
            run_sql_with_exceptions(MaterializedView(table, kafka_table).get_create_table_sql()),
        ]
    )
130 changes: 98 additions & 32 deletions posthog/models/kafka_partition_stats/sql.py
@@ -1,32 +1,43 @@
+from dataclasses import dataclass
 from posthog.clickhouse.kafka_engine import kafka_engine
-from posthog.clickhouse.table_engines import AggregatingMergeTree
+from posthog.clickhouse.table_engines import AggregatingMergeTree, ReplacingMergeTree
 from posthog.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION
 from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE

-CREATE_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}'
-(
-    `uuid` String,
-    `distinct_id` String,
-    `ip` String,
-    `site_url` String,
-    `data` String,
-    `team_id` Int64,
-    `now` String,
-    `sent_at` String,
-    `token` String
-)
-ENGINE={kafka_engine(topic=monitored_topic, group="partition_statistics")}
-SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100;
-"""
-)
-
-DROP_PARTITION_STATISTICS_KAFKA_TABLE = (
-    lambda monitored_topic: f"""
-DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.kafka_{monitored_topic}_partition_statistics ON CLUSTER '{CLICKHOUSE_CLUSTER}';
-"""
-)
+
+@dataclass
+class PartitionStatsKafkaTable:
+    brokers: list[str]
+    topic: str
+    consumer_group: str = "partition_statistics"
+
+    @property
+    def table_name(self) -> str:
+        return f"kafka_{self.topic}_partition_statistics"
+
+    def get_create_table_sql(self) -> str:
+        return f"""
+        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        (
+            `uuid` String,
+            `distinct_id` String,
+            `ip` String,
+            `site_url` String,
+            `data` String,
+            `team_id` Int64,
+            `now` String,
+            `sent_at` String,
+            `token` String

[Comment on lines +22 to +30, tkaemming (Contributor Author)]
I'm not sure how accurate this is (some of these values don't seem to be sent any longer, or at least not sent consistently), but I'm not sure there's any canonical definition of what is valid beyond "whatever capture is sending". (This section is just a refactor of the existing consumer anyway. I tried to ensure that the fields used in the new destination table and materialized view were only those that seem to be consistently sent.)

[Reply, Member]
I wish we could bring back 👏 Proto 👏 Buf 👏
#10932

+        )
+        ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
+        SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100
+        """
+
+    def get_drop_table_sql(self) -> str:
+        return f"""
+        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
+        """


 EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_TABLE_ENGINE = lambda: AggregatingMergeTree(
     "events_plugin_ingestion_partition_statistics"
@@ -83,14 +94,69 @@
"""
)

CREATE_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = CREATE_PARTITION_STATISTICS_KAFKA_TABLE(
KAFKA_EVENTS_PLUGIN_INGESTION
)

CREATE_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = CREATE_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)

DROP_KAFKA_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS = DROP_PARTITION_STATISTICS_KAFKA_TABLE(
KAFKA_EVENTS_PLUGIN_INGESTION
)

DROP_EVENTS_PLUGIN_INGESTION_PARTITION_STATISTICS_MV = DROP_PARTITION_STATISTICS_MV(KAFKA_EVENTS_PLUGIN_INGESTION)

# V2


class PartitionStatsV2Table:
table_name: str = "events_plugin_ingestion_partition_statistics_v2"

[Comment, tkaemming (Contributor Author)]
Not attached to this name (or any of the others in here, for that matter), just following the existing pattern. I'm not sure that this table really represents "partition statistics" any longer, but I don't have any better ideas on what to call it either.

[Reply, Member]
We often want more information about the events in a Kafka topic for diagnostic purposes, particularly during incidents.
events_plugin_ingestion_diagnostics_v2?

[Reply, tkaemming (Contributor Author)]
Going to leave this as-is for now, just to avoid introducing additional names/concepts even if this one isn't quite right anymore.
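
As a concrete illustration of the diagnostic use described above (a sketch, not part of this PR; the topic literal is a placeholder and the table name is the one defined in this class):

-- Which tokens dominate a hot partition right now?
SELECT token, count() AS messages, sum(data_length) AS bytes
FROM events_plugin_ingestion_partition_statistics_v2
WHERE topic = 'events_plugin_ingestion'  -- placeholder topic name
  AND partition = 0
  AND timestamp >= now() - INTERVAL 1 HOUR
GROUP BY token
ORDER BY bytes DESC
LIMIT 10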


    def get_create_table_sql(self) -> str:
        engine = ReplacingMergeTree(self.table_name, ver="timestamp")
        return f"""
        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
            topic LowCardinality(String),
            partition UInt64,
            offset UInt64,

[Comment on lines +112 to +113, tkaemming (Contributor Author), May 7, 2024]
Delta encoding for partition, and delta or double delta for offset, would probably yield pretty significant compression here; we can either try those now or add them later to get a more interesting before/after comparison.

[Reply, Member]
I wouldn't worry too much about size on these tables, since we are TTLing after 30 days. That being said, I do think this kind of codec could work nicely for something like this, especially offset, where the deltas are generally going to be small and repeated compared to the original offset value.
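
For reference, applying the codecs discussed here might look roughly like the following sketch (not part of this PR; whether Delta or DoubleDelta wins for each column would need the before/after comparison mentioned above):

-- Hypothetical follow-up: specialized codecs for the monotonic-ish columns.
ALTER TABLE events_plugin_ingestion_partition_statistics_v2
    MODIFY COLUMN partition UInt64 CODEC(Delta, ZSTD),
    MODIFY COLUMN offset UInt64 CODEC(DoubleDelta, ZSTD)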

            token String,
            distinct_id String,
            ip Tuple(v4 IPv4, v6 IPv6),
            event String,
            data_length UInt64,
            timestamp DateTime

[Comment, tkaemming (Contributor Author), May 7, 2024]
The timestamp column would probably compress relatively well with either delta or Gorilla encoding.
Note that this is DateTime and not DateTime64. I don't think the extra precision would be particularly useful for anything we're realistically using this table for.

[Reply, Member]
This would actually work out really nicely. I'm interested to try this 👀
Gorilla just uses delta encoding for timestamps, right?

[Reply, tkaemming (Contributor Author)]
Gorilla is XOR-based, and the docs say "The smaller the difference between consecutive values is, i.e. the slower the values of the series changes, the better the compression rate", which makes me think it might make sense here (these values change slowly but don't have a constant stride).
T64 also looks interesting: https://clickhouse.com/docs/en/sql-reference/statements/create/table#t64
I'll leave this and the other columns from #22163 (comment) alone for now; we can play with them later to see their impact.
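
For completeness, the candidates being weighed for this column could be expressed as follows (a sketch only, not part of this PR):

-- Hypothetical codec options for a slowly-changing DateTime column.
ALTER TABLE events_plugin_ingestion_partition_statistics_v2
    MODIFY COLUMN timestamp DateTime CODEC(DoubleDelta, ZSTD)
-- or CODEC(Gorilla), or CODEC(T64, ZSTD); only benchmarking would settle it.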

        )
        ENGINE = {engine}
        PARTITION BY (topic, toStartOfDay(timestamp))

[Comment, tkaemming (Contributor Author)]
The date range here was also fairly arbitrarily selected; it could be week or month instead. For the query patterns I'm anticipating, I'm not sure it matters that much.

[Reply, Member]
I hope it's not by month if the TTL is timestamp + INTERVAL 30 DAY!
I think toStartOfDay is kind of perfect.
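
To make that concrete: under this partition key, a day-bounded query like the following sketch (placeholder topic name) should only have to read the matching (topic, day) partitions:

-- Per-partition offset ranges observed so far today.
SELECT partition, min(offset) AS first_offset, max(offset) AS last_offset
FROM events_plugin_ingestion_partition_statistics_v2
WHERE topic = 'events_plugin_ingestion'  -- placeholder topic name
  AND timestamp >= toStartOfDay(now())
GROUP BY partition
ORDER BY partition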

        ORDER BY (topic, partition, offset)

[Comment, tkaemming (Contributor Author)]
This doesn't account for multiple clusters, which means that splitting existing topics off to new clusters, or failing over to a new cluster, will make queries wonky unless a timestamp predicate based on when the offset reset occurred is added to the query.
I'd prefer to have some sort of cluster ID here, but I'm not sure this is the right place to start designing better multi-cluster support than just having KAFKA_HOSTS and SESSION_RECORDING_KAFKA_HOSTS.

[Reply, Member]
We could just have a static cluster string column for each Kafka table that we can use to trace the stats/data back to the Kafka cluster they came from.

[Reply, tkaemming (Contributor Author)]
Makes sense. I'd prefer to keep the naming consistent across different parts of the system if we can (rather than having the same cluster end up named different things in different contexts), so I'll punt on this for now to avoid introducing inconsistency.
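
One possible shape of the suggestion, shown only as a hedged sketch (the column name and default are hypothetical, and this leaves the sorting-key question untouched):

-- Hypothetical per-cluster label, set to a constant by each materialized view.
ALTER TABLE events_plugin_ingestion_partition_statistics_v2
    ADD COLUMN cluster LowCardinality(String) DEFAULT 'primary'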

        TTL timestamp + INTERVAL 30 DAY

[Comment, tkaemming (Contributor Author), May 7, 2024]
30 days was a pretty arbitrary choice; no issues with this being longer (though I'm not sure why we'd need more data) or shorter (we'd probably want to keep at least 7 days).
I didn't really do any estimation on this. My expectation is that most of the table will compress pretty well aside from token, distinct ID, and IP address. We can always drop partitions if it becomes untenably large.

[Reply, Member]
No, I think 30 days is just about perfect. We can always tune it down if we need to.
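
If the retention ever does need tuning, the TTL can be adjusted in place rather than rebuilding the table; a sketch (the ON CLUSTER clause is elided here, as it is templated in the DDL above):

-- Hypothetical follow-up: shorten retention to 7 days.
ALTER TABLE events_plugin_ingestion_partition_statistics_v2
    MODIFY TTL timestamp + INTERVAL 7 DAY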

"""

def get_drop_table_sql(self) -> str:
return f"""
DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
"""


@dataclass
class PartitionStatsV2MaterializedView:
    to_table: PartitionStatsV2Table
    from_table: PartitionStatsKafkaTable

    @property
    def table_name(self) -> str:
        return f"{self.from_table.topic}_partition_statistics_v2_mv"

    def get_create_table_sql(self) -> str:
        return f"""
        CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        TO `{CLICKHOUSE_DATABASE}`.{self.to_table.table_name}
        AS SELECT
            _topic AS topic,
            _partition AS partition,
            _offset AS offset,
            token,
            distinct_id,
            (toIPv4OrDefault(kafka_table.ip), toIPv6OrDefault(kafka_table.ip)) AS ip,
            JSONExtractString(data, 'event') AS event,
            length(data) AS data_length,

[Comment, Member]
nice!

[Reply, tkaemming (Contributor Author)]
To be fair, I just stole this from your original table DDL.

            _timestamp AS timestamp
        FROM {CLICKHOUSE_DATABASE}.{self.from_table.table_name} AS kafka_table
        """

    def get_drop_table_sql(self) -> str:
        return f"""
        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
        """