feat(property-vals): add two-stage aggregation to collapse pod amplification#60309
Merged
Conversation
…ication Stage 1 (events/groups workers) consumes raw events, fans out to per-pod aggregates, and produces to a new intermediate topic keyed by the full tuple (team_id:property_type:property_key:property_value). Stage 2 consumes the intermediate topic and re-aggregates so each unique tuple collapses to one record before reaching clickhouse_property_values. The shape of the cost we're cutting: with N pods consuming events spread across all partitions of team_event_partitioned_events_json (the random suffix in the input key spreads each team across most partitions), every pod's local HashMap independently holds an entry for a low-cardinality tuple like (team=2, \$browser, Chrome). At flush time all N pods produce a record for it, so clickhouse_property_values sees N records per tuple per window — measured at ~46% of total output in the 12-pod, 10% rollout state. Two-stage routes those N records to the single pod that owns hash(tuple) on the intermediate topic; that pod sums them into one record before forwarding to the final topic. Implementation details: - fan_out / fan_out_group now return Vec<(TupleKey, u64)>; stage 1 always emits count=1, but the worker_loop now adds the count instead of hardcoding 1, so the same loop handles stage 2. - PropertyValueMessage mirrors the producer's wire format. A serde round-trip test pins it to the producer struct so a future drift fails loudly instead of silently dropping records. - Producer partition key changes from "team:key" to "team:type:key:value" so the same tuple always lands on the same partition for stage 2 to find. - Stage 2's worker bypasses the team filter; every record on the intermediate already passed stage 1's filter, and re-applying after a rollout_percentage shrink would silently drop partial aggregates and break sum-conservation. - INTERMEDIATE_TOPIC and STAGE2_CONSUMER_GROUP default to property_vals_intermediate and clickhouse-property-vals-rs-stage2. Rollout requires creating the new Kafka topic ahead of the deploy; chart changes follow in a separate PR. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The function reshapes one PropertyValueMessage into the `(TupleKey, count)` shape the aggregator works with. The cross-pod merging happens inside the aggregator's HashMap, not in this function, so the fan_in name was misleading. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
emit_from_blob now returns plain TupleKeys. The count=1 tag is added at the fan_out / fan_out_group boundary where it belongs semantically (one observation per event per property). emit_from_blob stays focused on JSON-to-tuples extraction. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
User preference: keep the count=1 inline in emit_from_blob alongside the TupleKey push. Reverts the prior layered factoring. This reverts commit dedb3b476fc1f6c0c1aa6f1e90efd95d40f9d6d4.
stage1/stage2 are positional and don't describe what each worker does. The events/groups workers already have descriptive names — only the new one was awkward. Rename: - stage2_consumer_group -> collapser_consumer_group - STAGE2_CONSUMER_GROUP -> COLLAPSER_CONSUMER_GROUP - clickhouse-property-vals-rs-stage2 -> clickhouse-property-vals-rs-collapser - stage2-worker k8s component -> collapser-worker The role describes the action: the collapser merges the per-pod duplicates that the events/groups workers emit for the same tuple. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
merger reads more naturally than collapser when describing what the worker does. Same role, better word: it merges per-pod partial aggregates for the same tuple into one final record. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Field names already say it.
Architectural purity I overdid. The merger now applies the same team filter as the events/groups workers. The only practical difference is that during a rollout_percentage shrink, a few seconds of in-flight records for the now-excluded team get dropped on the merger side. That's fine: the data is property autocomplete ranking counts, an undercount of a few seconds on a tuple already seen millions of times is invisible. Simpler code, no boolean parameter, no comment to explain.
Contributor
Prompt To Fix All With AIFix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
rust/property-vals-rs/src/types.rs:63-71
The canonical string representations for `PropertyType` ("event", "person", "group_N") are expressed twice: once in `as_kafka_key_segment` and once in the `Serialize` impl. The two impls must stay in sync, but nothing enforces that. `Serialize` can delegate to `as_kafka_key_segment` to make it OnceAndOnlyOnce.
```suggestion
impl Serialize for PropertyType {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(&self.as_kafka_key_segment())
}
}
```
### Issue 2 of 2
rust/property-vals-rs/src/fan_out.rs:421-428
**Round-trip test uses a local copy of `Outgoing`, not the real one**
The test defines its own `Outgoing` struct identical to `producer.rs::Outgoing`. If a field is renamed in the actual producer struct (e.g., `property_count` → `count`), the real producer starts emitting different JSON while this test's local copy still serializes the old field name — the test stays green but the merger silently drops the count from every message.
Making `Outgoing` `pub(crate)` in `producer.rs` and importing it here would let the test reference the real struct, so any drift between the two sides fails the build rather than the test.
Reviews (1): Last reviewed commit: "chore(property-vals): update last stage-..." | Re-trigger Greptile |
- cargo fmt fix on main.rs that was missed in the bypass removal
- PropertyType::Serialize now delegates to as_kafka_key_segment so the
canonical strings ("event", "person", "group_N") are defined exactly
once
- Round-trip test now imports the real producer::Outgoing struct instead
of duplicating it. If a producer field is renamed, the test breaks at
the build, not silently.
robbie-c
approved these changes
May 28, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
property-vals-rs aggregates
(team, type, key, value)tuples in each pod's local HashMap and flushes toclickhouse_property_valuesevery 30s. The input topicteam_event_partitioned_events_jsonkeys byteam_id-event-random(0..6), which spreads a single team's events across most of the 128 partitions. With N pods, the same low-cardinality tuple like(team=2, $browser, Chrome)lives in N different pods' HashMaps, so each pod emits a record for it at flush. ClickHouse ends up consuming N records for what should be one logical update per window.When I collasped the number of replicas in prod from 12 to 1, the records-per-event ratio from 0.17 to 0.092, a ~46% reduction. That portion of
clickhouse_property_valuestraffic is entirely cross-pod duplication, not unique data.Changes
Add a second aggregation pass so duplicates collapse before they reach
clickhouse_property_values.flowchart TB E[team_event_partitioned_events_json<br/>128 partitions, keyed team-event-random] subgraph POD1["pod 1"] S1_1["events/groups worker<br/>fan-out + local HashMap"] S2_1["merger<br/>merge cross-pod copies"] end subgraph POD2["pod 2"] S1_2["events/groups worker"] S2_2["merger"] end subgraph PODN["pod ...N"] S1_N["events/groups worker"] S2_N["merger"] end INT["property_vals_intermediate<br/>NEW, 64 partitions<br/>keyed hash team:type:key:value"] OUT["clickhouse_property_values<br/>1 record per tuple per window"] CH[(ClickHouse<br/>kafka_property_values MV<br/>AggregatingMergeTree sum)] E --> S1_1 & S1_2 & S1_N S1_1 & S1_2 & S1_N -->|partial aggregates<br/>routed by tuple hash| INT INT -->|each partition owned<br/>by one merger pod| S2_1 & S2_2 & S2_N S2_1 & S2_2 & S2_N -->|collapsed| OUT OUT --> CH style INT fill:#90EE90,stroke:#006400,color:#000 style S1_1 fill:#FFE4B5,color:#000 style S1_2 fill:#FFE4B5,color:#000 style S1_N fill:#FFE4B5,color:#000 style S2_1 fill:#FFB6C1,color:#000 style S2_2 fill:#FFB6C1,color:#000 style S2_N fill:#FFB6C1,color:#000The events and groups workers are existing. Their only behavior change is the destination topic and the partition key. The merger is a new consumer group in the same binary; each pod owns a partition slice and re-aggregates the per-pod copies the events/groups workers produced for the same tuple before forwarding to the final output topic.
Implementation notes:
fan_out/fan_out_groupnow returnVec<(TupleKey, u64)>. The events/groups workers still emit count=1 per tuple; the wider signature lets the sameworker_loophandle the merger, where the count comes from the input message.PropertyValueMessagetype for the merger's input. A round-trip test pins the schema to the producer's wire format so future drift fails loudly instead of silently dropping records.team:keytoteam:type:key:valueso the same tuple always lands on the same intermediate partition.INTERMEDIATE_TOPIC=property_vals_intermediate,MERGER_CONSUMER_GROUP=clickhouse-property-vals-rs-merger.How did you test this code?
I'm an agent and have not run the binary or the prod stack manually.
property-vals-rspass locally:cargo test -p property-vals-rscargo clippy -p property-vals-rs --no-deps --all-targetsis cleancargo fmt -p property-vals-rsis a no-opPropertyValueMessagedeserialization against the producer'sOutgoingserialization to catch silent schema drift in either direction.Publish to changelog?
no
🤖 Agent context
Co-authored with Claude. Earlier in the same session we measured the 46%/54% split between pod-amplification cost and unique-singleton cost by scaling property-vals-rs from 12 pods to 1 and comparing the
flush_tuples_sum / events_receivedratio. That experiment is what motivated investing in the two-pass architecture as opposed to alternatives like an MV-sideGROUP BY(which only helps if the CH-side bottleneck is INSERT/merge, not consume — and the data showedDelayedInserts=0,RejectedInserts=0, idle merge pool, whileKafkaRowsReadwas plateaued at ~62k/replica during the lag spike).Considered and rejected:
kafka_property_values: only helps stages 4–5 of the CH pipeline; the bottleneck data showed stages 1–2 (consume + parse) were where the ceiling lived.A singleton emission gate (drop tuples with count=1 at flush time) is a complementary follow-up — it addresses the other ~54% of the cost, which is unique high-cardinality identifiers that don't aggregate.