[Feature] SSE Materialized View Creation and Ingestion #18528
Merged
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18528 +/- ##
============================================
+ Coverage 63.72% 64.23% +0.50%
+ Complexity 1932 1126 -806
============================================
Files 3292 3309 +17
Lines 201470 203555 +2085
Branches 31316 31684 +368
============================================
+ Hits 128396 130759 +2363
+ Misses 62789 62301 -488
- Partials 10285 10495 +210
PR 1 of a 2-PR split. This change introduces the view-creation and materialization pipeline; broker-side query rewrite ships in PR 2.

**Scope of PR 1:**
- MV table definition (analyzer, time-expression validator, ZK metadata)
- Per-partition runtime metadata (PartitionInfo, PartitionState, PartitionFingerprint) and storage utilities
- Controller-side consistency manager (subscribes to base-table segment events; debounces and CAS-marks affected MV partitions STALE)
- Controller integration (PinotHelixResourceManager / SegmentDeletionManager notify methods; BaseControllerStarter wires the manager before listeners fire)
- Minion task pipeline (generator + executor + segment lineage replace, saturation-LIMIT gate, partition-fingerprint CAS write to runtime znode)
- pinot-materialized-view module: analysis/, consistency/, metadata/, scheduler/, executor/, context/
- Constants moved from MinionConstants.MaterializedViewTask to CommonConstants.MaterializedViewTask (pinot-spi)
- Quickstart: airlineStatsMv example (TIMESTAMP MV column derived from base via DaysSinceEpoch * 86400000), with per-aggregation MV-vs-base result comparison
- DESIGN.md documenting the time-windowed model and the planned fixed-partition extension

**Out of scope (deferred to PR 2):**
- Broker query rewrite engine + subsumption strategies (AggregationEquivalenceRegistry and the equivalence rule classes land with the engine that actually applies them)
- Broker metadata cache + handler + split dispatcher
- BrokerResponse.materializedViewQueried response annotation
- BaseSingleStageBrokerRequestHandler MV integration

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
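To illustrate the "CAS-marks affected MV partitions STALE" step, here is a minimal in-memory sketch. It is not the PR's implementation: the real write targets a ZooKeeper znode, whereas this stand-in uses an `AtomicReference`. The `State` enum and the `markStale` helper are hypothetical names; only `STALE` is named in the description above.

```java
import java.util.concurrent.atomic.AtomicReference;

final class StaleMarkSketch {
  // Hypothetical states for illustration; only STALE appears in the PR description.
  enum State { VALID, STALE }

  // Flip VALID -> STALE only if the state is still VALID when the write lands.
  // Returns true when this caller performed the transition.
  static boolean markStale(AtomicReference<State> partition) {
    return partition.compareAndSet(State.VALID, State.STALE);
  }

  public static void main(String[] args) {
    AtomicReference<State> partition = new AtomicReference<>(State.VALID);
    System.out.println(markStale(partition)); // true: this caller made the transition
    System.out.println(markStale(partition)); // false: already STALE, nothing to do
  }
}
```

The compare-and-set shape means two concurrent segment events cannot both claim the transition, which is why repeated notifications for the same partition are harmless.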
xiangfu0 approved these changes on May 19, 2026
Docs PR opened: pinot-contrib/pinot-docs#811
xiangfu0 added a commit to pinot-contrib/pinot-docs that referenced this pull request on May 20, 2026

## What changed
- add a new Querying & SQL page for the current materialized-view feature surface
- wire the page into the Querying & SQL landing page and SUMMARY navigation
- document the Data Explorer and controller REST discovery surface that landed with the MV UI work

## Source cross-checks
- validated the docs against `pinot-materialized-view`, `MaterializedViewQuickStart`, `PinotMaterializedViewRestletResource`, and the bundled `airlineStatsMv` example config/schema in the local `apache/pinot` checkout
- kept the docs scoped to the shipped surface: offline MV tables, append-only source tables, explicit `MaterializedViewTask` config, supported aggregations, and direct MV-table querying

## Validation
- `git diff --check`
- verified the edited internal links and content refs point at existing docs files
xiangfu0 added a commit to hongkunxu/pinot that referenced this pull request on May 20, 2026

PR 2 of 2 in the SSE Materialized View series. PR 1 (apache#18528) landed the MV view-definition, controller routing, and minion-task plumbing; this change turns on the broker-side query rewrite that actually serves user queries from the materialized view.

Highlights
----------
* Broker query rewrite engine with three subsumption strategies: Exact, Aggregation (re-aggregation via the AggregationEquivalence registry), and Scan, chosen at compile time from the rewrite plan.
* Two execution modes: FULL_REWRITE swaps the server query, schema, and table name at compile time; SPLIT_REWRITE issues dual scatter-gather (base side `ts >= watermarkMs`, MV side `ts < watermarkMs`) and merges DataTables via IdentityHashMap so the same physical server can carry both sides.
* Defense-in-depth fallbacks at both compile and execute time bump a new `QUERY_REWRITE_EXCEPTIONS` meter and fall back to the base-table query whenever the MV path raises, so an MV failure never surfaces as a 500.
* MV cache lifecycle wired through the broker's resource state model: invalidate on OFFLINE/DROPPED, refresh on OFFLINE→ONLINE so a cycled broker resource cannot leave an MV silently un-queryable.
* SPLIT_REWRITE detects the "all MV servers failed" case and refuses to return partial results; it falls back to the base-table path instead.
* Result-column names are preserved through rewrite via implicit aliases on the rewritten select list, so clients reading the result by column name are not silently broken when the MV path kicks in.
* TIMESTAMP-only contract on the MV time column, validated at view creation, so the broker never has to guess time-format alignment between base and MV.
* Per-MV `rewriteEnabled` and `stalenessThresholdMs` knobs for SLO control.
* Quickstart example wires a `TIMESTAMP` time column on the airlineStatsView table via `definedSQL` (`daysSinceEpoch * 24 * 60 * 60 * 1000`).

Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
Co-Authored-By: Xiang Fu <xiangfu@startree.ai>
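The SPLIT_REWRITE time split can be pictured as two filters over half-open ranges that meet at the watermark, so every row lands on exactly one side. The sketch below only builds filter strings and is an illustration, not the broker's code: `withPredicate`, `baseSideFilter`, and `mvSideFilter` are hypothetical helpers, and the sample filter and column name are made up.

```java
final class SplitRewriteSketch {
  // Hypothetical helper: AND a time predicate onto an optional existing filter.
  static String withPredicate(String filter, String predicate) {
    if (filter == null || filter.isEmpty()) {
      return predicate;
    }
    return "(" + filter + ") AND " + predicate;
  }

  // Base table serves rows at or after the watermark (the not-yet-materialized tail).
  static String baseSideFilter(String filter, String timeColumn, long watermarkMs) {
    return withPredicate(filter, timeColumn + " >= " + watermarkMs);
  }

  // MV serves rows strictly before the watermark, so the two ranges never overlap.
  static String mvSideFilter(String filter, String timeColumn, long watermarkMs) {
    return withPredicate(filter, timeColumn + " < " + watermarkMs);
  }

  public static void main(String[] args) {
    System.out.println(baseSideFilter("Carrier = 'AA'", "ts", 1000L));
    System.out.println(mvSideFilter("Carrier = 'AA'", "ts", 1000L));
  }
}
```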
xiangfu0 added a commit to hongkunxu/pinot that referenced this pull request on May 20, 2026

PR 2 of 2 in the SSE Materialized View series. PR 1 (apache#18528) landed the MV view-definition, controller routing, and minion-task plumbing; this change turns on the broker-side query rewrite that actually serves user queries from the materialized view.

Highlights
----------
* Broker query rewrite engine with three subsumption strategies: Exact, Aggregation (re-aggregation via the AggregationEquivalence registry), and Scan, chosen at compile time from the rewrite plan.
* Two execution modes: FULL_REWRITE swaps the server query, schema, and table name at compile time; SPLIT_REWRITE issues dual scatter-gather (base side `ts >= watermarkMs`, MV side `ts < watermarkMs`) and merges DataTables via IdentityHashMap so the same physical server can carry both sides.
* Defense-in-depth fallbacks at both compile and execute time bump a new `QUERY_REWRITE_EXCEPTIONS` meter and fall back to the base-table query whenever the MV path raises, so an MV failure never surfaces as a 500.
* MV cache lifecycle wired through the broker's resource state model: invalidate on OFFLINE/DROPPED, refresh on OFFLINE→ONLINE so a cycled broker resource cannot leave an MV silently un-queryable.
* SPLIT_REWRITE detects "all MV servers failed" AND "all base servers failed" and refuses to return partial results; it falls back to the base-table path.
* FULL_REWRITE is rejected on hybrid base tables (where a batch MV would otherwise silently drop realtime data) and the broker-internal `materializedViewRewrite` query-option marker is stripped from user input so it can never bypass `BrokerReduceService`'s nested-query safety net.
* Result-column names are preserved through rewrite via implicit aliases on the rewritten select list, so clients reading the result by column name are not silently broken when the MV path kicks in.
* TIMESTAMP-only contract on the MV time column, validated at view creation, so the broker never has to guess time-format alignment between base and MV.
* Per-MV `rewriteEnabled` and `stalenessThresholdMs` knobs for SLO control.
* `BrokerGauge.MATERIALIZED_VIEW_CACHE_ENTRY_COUNT` exposes the MV metadata cache size so operators can detect unbounded growth.
* Quickstart example wires a `TIMESTAMP` time column on the airlineStatsView table via `definedSQL` (`daysSinceEpoch * 24 * 60 * 60 * 1000`).

Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
Co-Authored-By: Xiang Fu <xiangfu@startree.ai>
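Why an IdentityHashMap matters for the SPLIT_REWRITE merge can be shown with plain JDK collections. In this sketch, two distinct but equal `String` objects stand in for the routing keys of one physical server queried on both the base and MV sides; the key type and the `distinctEntries` helper are assumptions for illustration, not the broker's actual types.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

final class IdentityMergeSketch {
  // Stores one response per side and reports how many entries survive.
  static int distinctEntries(Map<Object, String> responses, Object baseKey, Object mvKey) {
    responses.put(baseKey, "base-side result");
    responses.put(mvKey, "mv-side result");
    return responses.size();
  }

  public static void main(String[] args) {
    // Two distinct objects that are equal(): the same server seen from both sides.
    String baseKey = new String("server-1");
    String mvKey = new String("server-1");

    // HashMap compares keys with equals(), so the second put overwrites the first.
    System.out.println(distinctEntries(new HashMap<>(), baseKey, mvKey)); // 1

    // IdentityHashMap compares keys by reference, so both responses are kept.
    System.out.println(distinctEntries(new IdentityHashMap<>(), baseKey, mvKey)); // 2
  }
}
```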
xiangfu0 added a commit to hongkunxu/pinot that referenced this pull request on May 20, 2026

PR 2 of 2 in the SSE Materialized View series. PR 1 (apache#18528) landed the MV view-definition, controller routing, and minion-task plumbing; this change turns on the broker-side query rewrite that actually serves user queries from the materialized view.

Highlights
----------
* Broker query rewrite engine with three subsumption strategies: Exact, Aggregation (re-aggregation via the AggregationEquivalence registry), and Scan, chosen at compile time from the rewrite plan.
* Two execution modes: FULL_REWRITE swaps the server query, schema, and table name at compile time; SPLIT_REWRITE issues dual scatter-gather (base side `ts >= watermarkMs`, MV side `ts < watermarkMs`) and merges DataTables via IdentityHashMap so the same physical server can carry both sides.
* Defense-in-depth fallbacks at both compile and execute time bump a new `QUERY_REWRITE_EXCEPTIONS` meter and fall back to the base-table query whenever the MV path raises, so an MV failure never surfaces as a 500. The fallback path refreshes routing locals from the recomputed route so cancel + metrics + error reporting reflect the live state.
* MV cache lifecycle wired through the broker's resource state model: invalidate on OFFLINE/DROPPED, refresh on OFFLINE→ONLINE so a cycled broker resource cannot leave an MV silently un-queryable.
* SPLIT_REWRITE refuses to return partial results when *all* MV-side or *all* base-side servers failed; it falls back to the base-table path.
* FULL_REWRITE is rejected on hybrid base tables (where a batch MV would otherwise silently drop realtime data) and the broker-internal `materializedViewRewrite` query-option marker is stripped from user input so it can never bypass `BrokerReduceService`'s nested-query safety net.
* Result-column names are preserved through rewrite via implicit aliases on the rewritten select list, so clients reading the result by column name are not silently broken when the MV path kicks in.
* TIMESTAMP-only contract on the MV time column, validated at view creation, so the broker never has to guess time-format alignment between base and MV.
* Per-MV `rewriteEnabled` and `stalenessThresholdMs` knobs for SLO control.
* `BrokerGauge.MATERIALIZED_VIEW_CACHE_ENTRY_COUNT` exposes the MV metadata cache size so operators can detect unbounded growth.
* Quickstart example wires a `TIMESTAMP` time column on the airlineStatsView table via `definedSQL` (`daysSinceEpoch * 24 * 60 * 60 * 1000`).

Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
Co-Authored-By: Xiang Fu <xiangfu@startree.ai>
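The "bump a meter and fall back to the base-table query" behavior follows a familiar try/fallback shape. A minimal sketch, assuming the MV path and base path can each be modeled as a `Supplier`: the `AtomicLong` is a stand-in for the `QUERY_REWRITE_EXCEPTIONS` meter, and `executeWithFallback` is a hypothetical name rather than a broker method.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

final class RewriteFallbackSketch {
  // Stand-in for the QUERY_REWRITE_EXCEPTIONS meter named in the commit message.
  static final AtomicLong REWRITE_EXCEPTIONS = new AtomicLong();

  // Try the MV path first; on any runtime failure, count it and run the base path.
  static <T> T executeWithFallback(Supplier<T> mvPath, Supplier<T> basePath) {
    try {
      return mvPath.get();
    } catch (RuntimeException e) {
      REWRITE_EXCEPTIONS.incrementAndGet();
      return basePath.get();
    }
  }

  public static void main(String[] args) {
    String result = RewriteFallbackSketch.<String>executeWithFallback(
        () -> { throw new IllegalStateException("MV path failed"); },
        () -> "base-table result");
    System.out.println(result);                   // base-table result
    System.out.println(REWRITE_EXCEPTIONS.get()); // 1
  }
}
```

The caller only ever sees a result from one of the two paths, which is the property the commit message describes as an MV failure never surfacing as a 500.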
xiangfu0 added a commit to hongkunxu/pinot that referenced this pull request on May 20, 2026

PR 2 of 2 in the SSE Materialized View series. PR 1 (apache#18528) landed the MV view-definition, controller routing, and minion-task plumbing; this change turns on the broker-side query rewrite that actually serves user queries from the materialized view.

Highlights
----------
* Broker query rewrite engine with three subsumption strategies: Exact, Aggregation (re-aggregation via the AggregationEquivalence registry), and Scan, chosen at compile time from the rewrite plan.
* Two execution modes: FULL_REWRITE swaps the server query, schema, and table name at compile time; SPLIT_REWRITE issues dual scatter-gather (base side `ts >= watermarkMs`, MV side `ts < watermarkMs`) and merges DataTables via IdentityHashMap so the same physical server can carry both sides.
* Defense-in-depth fallbacks at both compile and execute time bump a new `QUERY_REWRITE_EXCEPTIONS` meter and fall back to the base-table query whenever the MV path raises, so an MV failure never surfaces as a 500. The fallback path refreshes routing locals from the recomputed route so cancel + metrics + error reporting reflect the live state.
* MV cache lifecycle wired through the broker's resource state model: invalidate on OFFLINE/DROPPED, refresh on OFFLINE→ONLINE so a cycled broker resource cannot leave an MV silently un-queryable.
* SPLIT_REWRITE refuses to return partial results when *all* MV-side or *all* base-side servers failed; it falls back to the base-table path.
* FULL_REWRITE is rejected on hybrid base tables (where a batch MV would otherwise silently drop realtime data) and the broker-internal `materializedViewRewrite` query-option marker is stripped from user input so it can never bypass `BrokerReduceService`'s nested-query safety net. Both guards have regression tests pinning the contract.
* Result-column names are preserved through rewrite via implicit aliases on the rewritten select list, so clients reading the result by column name are not silently broken when the MV path kicks in.
* TIMESTAMP-only contract on the MV time column, validated at view creation, so the broker never has to guess time-format alignment between base and MV.
* Per-MV `rewriteEnabled` and `stalenessThresholdMs` knobs for SLO control.
* `BrokerGauge.MATERIALIZED_VIEW_CACHE_ENTRY_COUNT` exposes the MV metadata cache size so operators can detect unbounded growth.
* Integration test polls real cache observables (not fixed sleeps) so the test runs as fast as the cluster's actual ZK propagation allows.
* Quickstart example wires a `TIMESTAMP` time column on the airlineStatsView table via `definedSQL` (`daysSinceEpoch * 24 * 60 * 60 * 1000`).

Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
Co-Authored-By: Xiang Fu <xiangfu@startree.ai>
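Polling an observable instead of sleeping for a fixed interval can be sketched with a small deadline loop. This is a generic illustration and not the integration test from the commit: `waitFor` is a hypothetical helper, and the counter below stands in for whatever cache observable the real test reads.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

final class PollingSketch {
  // Re-check the condition every intervalMs until it holds or timeoutMs elapses.
  // Returns true as soon as the condition is observed, false on timeout or interrupt.
  static boolean waitFor(BooleanSupplier condition, long timeoutMs, long intervalMs) {
    long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }

  public static void main(String[] args) {
    AtomicInteger polls = new AtomicInteger();
    // The condition becomes true on the third check, so the wait ends early.
    System.out.println(waitFor(() -> polls.incrementAndGet() >= 3, 1000, 1)); // true
  }
}
```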
xiangfu0 added a commit to hongkunxu/pinot that referenced this pull request on May 20, 2026

PR 2 of 2 in the SSE Materialized View series. PR 1 (apache#18528) landed the MV view-definition, controller routing, and minion-task plumbing; this change turns on the broker-side query rewrite that actually serves user queries from the materialized view.

Highlights
----------
* Broker query rewrite engine with three subsumption strategies: Exact, Aggregation (re-aggregation via the AggregationEquivalence registry), and Scan, chosen at compile time from the rewrite plan.
* Two execution modes: FULL_REWRITE swaps the server query, schema, and table name at compile time; SPLIT_REWRITE issues dual scatter-gather (base side `ts >= watermarkMs`, MV side `ts < watermarkMs`) and merges DataTables via IdentityHashMap so the same physical server can carry both sides.
* Defense-in-depth fallbacks at both compile and execute time bump a new `QUERY_REWRITE_EXCEPTIONS` meter and fall back to the base-table query whenever the MV path raises, so an MV failure never surfaces as a 500. The fallback path refreshes routing locals from the recomputed route so cancel + metrics + error reporting reflect the live state.
* MV cache lifecycle wired through the broker's resource state model: invalidate on OFFLINE/DROPPED, refresh on OFFLINE→ONLINE so a cycled broker resource cannot leave an MV silently un-queryable.
* SPLIT_REWRITE refuses to return partial results when *all* MV-side or *all* base-side servers failed; it falls back to the base-table path.
* FULL_REWRITE is rejected on hybrid base tables (where a batch MV would otherwise silently drop realtime data) and the broker-internal `materializedViewRewrite` query-option marker is stripped from user input inside `doHandleRequest` so it can never bypass `BrokerReduceService`'s nested-query safety net, even on the IN_SUBQUERY recursion path.
* Result-column names are preserved through rewrite via implicit aliases on the rewritten select list, so clients reading the result by column name are not silently broken when the MV path kicks in.
* TIMESTAMP-only contract on the MV time column, validated at view creation, so the broker never has to guess time-format alignment between base and MV.
* Per-MV `rewriteEnabled` and `stalenessThresholdMs` knobs for SLO control.
* `BrokerGauge.MATERIALIZED_VIEW_CACHE_ENTRY_COUNT` exposes the MV metadata cache size so operators can detect unbounded growth.
* Integration test polls real cache observables (not fixed sleeps) so the test runs as fast as the cluster's actual ZK propagation allows.
* Quickstart example wires a `TIMESTAMP` time column on the airlineStatsView table via `definedSQL` (`daysSinceEpoch * 24 * 60 * 60 * 1000`).

Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
Co-Authored-By: Xiang Fu <xiangfu@startree.ai>
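Stripping a broker-internal marker from user-supplied query options amounts to sanitizing a map before any internal code reads it. The sketch below assumes options arrive as a `Map<String, String>` and matches the key exactly; `sanitize` is a hypothetical helper, and any key normalization the real handler performs is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;

final class StripInternalOptionSketch {
  // The marker name comes from the commit message; how the broker stores it is assumed.
  static final String INTERNAL_MARKER = "materializedViewRewrite";

  // Return a copy of the user's options with the internal marker removed,
  // so only broker code can ever set it.
  static Map<String, String> sanitize(Map<String, String> userOptions) {
    Map<String, String> copy = new HashMap<>(userOptions);
    copy.remove(INTERNAL_MARKER);
    return copy;
  }

  public static void main(String[] args) {
    Map<String, String> userOptions = new HashMap<>();
    userOptions.put("timeoutMs", "5000");
    userOptions.put(INTERNAL_MARKER, "true"); // a user trying to spoof the marker
    System.out.println(sanitize(userOptions)); // {timeoutMs=5000}
  }
}
```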
xiangfu0 added a commit to hongkunxu/pinot that referenced this pull request on May 21, 2026

PR 2 of 2 in the SSE Materialized View series. PR 1 (apache#18528) landed the MV view-definition, controller routing, and minion-task plumbing; this change turns on the broker-side query rewrite that actually serves user queries from the materialized view.

Highlights
----------
* Broker query rewrite engine with three subsumption strategies: Exact, Aggregation (re-aggregation via the AggregationEquivalence registry), and Scan, chosen at compile time from the rewrite plan.
* Two execution modes: FULL_REWRITE swaps the server query, schema, and table name at compile time; SPLIT_REWRITE issues dual scatter-gather (base side `ts >= watermarkMs`, MV side `ts < watermarkMs`) and merges DataTables via IdentityHashMap so the same physical server can carry both sides.
* Defense-in-depth fallbacks at both compile and execute time bump a new `QUERY_REWRITE_EXCEPTIONS` meter and fall back to the base-table query whenever the MV path raises, so an MV failure never surfaces as a 500. The fallback path refreshes routing locals from the recomputed route so cancel + metrics + error reporting reflect the live state.
* MV cache lifecycle wired through the broker's resource state model: invalidate on OFFLINE/DROPPED, refresh on OFFLINE→ONLINE for BOTH MV-table cycles (direct rehydrate) and base-table cycles (walk MV defs and reload any whose baseTables reference the transitioning table) so a transient broker-resource bounce never silently disables MV rewrite.
* `annotateResponse` stamps `materializedViewQueried` only when a swap was actually committed (`isFullRewrite() || isSplitRewrite()`), with no false-positives on `fromRewriteResult` (skip) paths.
* SPLIT_REWRITE refuses to return partial results when *all* MV-side or *all* base-side servers failed; it falls back to the base-table path.
* FULL_REWRITE is rejected on hybrid base tables (where a batch MV would otherwise silently drop realtime data) and the broker-internal `materializedViewRewrite` query-option marker is stripped from user input inside `doHandleRequest` so it can never bypass `BrokerReduceService`'s nested-query safety net, even on the IN_SUBQUERY recursion path.
* Result-column names are preserved through rewrite via implicit aliases on the rewritten select list, so clients reading the result by column name are not silently broken when the MV path kicks in.
* TIMESTAMP-only contract on the MV time column, validated at view creation, so the broker never has to guess time-format alignment between base and MV.
* Per-MV `rewriteEnabled` and `stalenessThresholdMs` knobs for SLO control.
* `BrokerGauge.MATERIALIZED_VIEW_CACHE_ENTRY_COUNT` exposes the MV metadata cache size so operators can detect unbounded growth.
* Integration test polls real cache observables (not fixed sleeps) so the test runs as fast as the cluster's actual ZK propagation allows.
* Quickstart example wires a `TIMESTAMP` time column on the airlineStatsView table via `definedSQL` (`daysSinceEpoch * 24 * 60 * 60 * 1000`).

Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
Co-Authored-By: Xiang Fu <xiangfu@startree.ai>
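The quickstart's `daysSinceEpoch * 24 * 60 * 60 * 1000` expression converts a day count into epoch milliseconds. The same arithmetic in Java is shown below as a worked check; it is not code from the PR, and `toEpochMillis` is a hypothetical helper. The multiplication is done in `long` because the product for present-day dates is far larger than `Integer.MAX_VALUE`.

```java
final class DaysToMillisSketch {
  // 24 h * 60 min * 60 s * 1000 ms = 86,400,000 ms per day, kept as a long.
  static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

  // int * long widens to long, so the product cannot overflow int range.
  static long toEpochMillis(int daysSinceEpoch) {
    return daysSinceEpoch * MILLIS_PER_DAY;
  }

  public static void main(String[] args) {
    // Day 19000 since 1970-01-01 expressed in epoch milliseconds.
    System.out.println(toEpochMillis(19000)); // 1641600000000
  }
}
```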
xiangfu0 added a commit to hongkunxu/pinot that referenced this pull request May 21, 2026

PR 2 of 2 in the SSE Materialized View series. PR 1 (apache#18528) landed the MV view-definition, controller routing, and minion-task plumbing; this change turns on the broker-side query rewrite that actually serves user queries from the materialized view.

Highlights
----------

* Broker query rewrite engine with three subsumption strategies: Exact, Aggregation (re-aggregation via the AggregationEquivalence registry), and Scan, chosen at compile time from the rewrite plan.
* Two execution modes: FULL_REWRITE swaps the server query, schema, and table name at compile time; SPLIT_REWRITE issues dual scatter-gather (base side `ts >= watermarkMs`, MV side `ts < watermarkMs`) and merges DataTables via IdentityHashMap so the same physical server can carry both sides.
* Defense-in-depth fallbacks at both compile and execute time bump a new `QUERY_REWRITE_EXCEPTIONS` meter and fall back to the base-table query whenever the MV path raises, so an MV failure never surfaces as a 500.
* MV cache lifecycle wired through the broker's resource state model: invalidate on OFFLINE/DROPPED, refresh on OFFLINE→ONLINE for both MV-table and base-table cycles so a transient broker-resource bounce never silently disables MV rewrite. `MaterializedViewHandler.close()` is invoked from broker shutdown so a hot-reload or test teardown does not leak Helix watcher slots.
* `annotateResponse` stamps `materializedViewQueried` only when a swap was actually committed, with no false-positives on `fromRewriteResult` paths.
* SPLIT_REWRITE refuses to return partial results when *all* MV-side or *all* base-side servers failed.
* FULL_REWRITE is rejected on hybrid base tables (where a batch MV would otherwise silently drop realtime data) and the broker-internal `materializedViewRewrite` query-option marker is stripped from user input inside `doHandleRequest` so it can never bypass `BrokerReduceService`'s nested-query safety net.
* `BaseSingleStageBrokerRequestHandler` keeps MV-specific logic in dedicated helpers (`applyMaterializedViewRewriteAtCompile`, `tryExecuteMaterializedViewSplit`) so the non-MV call sites stay short.
* Result-column names are preserved through rewrite via implicit aliases on the rewritten select list.
* TIMESTAMP-only contract on the MV time column, validated at view creation.
* Per-MV `rewriteEnabled` and `stalenessThresholdMs` knobs for SLO control.
* `BrokerGauge.MATERIALIZED_VIEW_CACHE_ENTRY_COUNT` exposes the MV metadata cache size so operators can detect unbounded growth.
* Integration test polls real cache observables (not fixed sleeps).
* Quickstart example wires a `TIMESTAMP` time column on the airlineStatsView table via `definedSQL` (`daysSinceEpoch * 24 * 60 * 60 * 1000`).

Co-Authored-By: Hongkun Xu <xuhongkun666@163.com>
Co-Authored-By: Xiang Fu <xiangfu@startree.ai>
Related Issue
This PR is part 1/2 of the solution to PEP-Request #17298 — Support for Materialised Query Rewrite in Apache Pinot using MV Tables and Calcite.
Part 1 — Materialized View: Create, Metadata & Ingestion
Summary
This PR introduces the storage / ingestion half of Apache Pinot's Materialized View (MV) feature: an MV is modeled as a derived Pinot table whose contents are built and incrementally refreshed by a Minion task framework that reacts to base-table segment changes. It lands the ZK-persisted MV definition + runtime model, the controller-side consistency manager that marks partitions STALE on base-table changes, partition-level fingerprinting (`farmHashFingerprint64`, a 64-bit non-cryptographic hash) for collision-resistant change detection, and the APPEND / OVERWRITE / DELETE task lifecycle.
This PR is consumer-agnostic: once built, the MV table is queryable by name like any other Pinot OFFLINE table. The Companion PR (linked at the end) ships the broker-side rewrite engine that automatically routes user queries against the base table onto the MV.
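To make the change-detection idea in the summary concrete, here is a minimal Java sketch of an order-independent partition fingerprint. It is an illustration under stated assumptions, not the PR's `PartitionFingerprint` class: the class name, method names, and segment names are hypothetical, and `java.util.zip.CRC32` stands in for Guava's `farmHashFingerprint64` so that the example needs only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical sketch; not the PR's PartitionFingerprint implementation.
final class PartitionFingerprintSketch {

  // Hashes (segmentName, crc) pairs in sorted-by-name order so the result
  // does not depend on the iteration order of the input map.
  static long fingerprint(Map<String, Long> segmentCrcs) {
    Map<String, Long> sorted = new TreeMap<>(segmentCrcs);
    CRC32 hash = new CRC32(); // assumption: stand-in for farmHashFingerprint64
    for (Map.Entry<String, Long> entry : sorted.entrySet()) {
      String line = entry.getKey() + ":" + entry.getValue() + "\n";
      hash.update(line.getBytes(StandardCharsets.UTF_8));
    }
    return hash.getValue();
  }

  // A partition is a refresh candidate only when the freshly computed
  // fingerprint differs from the one stored at the last refresh.
  static boolean hasChanged(long storedFingerprint, Map<String, Long> currentSegmentCrcs) {
    return storedFingerprint != fingerprint(currentSegmentCrcs);
  }

  public static void main(String[] args) {
    Map<String, Long> segments = new LinkedHashMap<>();
    segments.put("airlineStats_1", 222L); // hypothetical segment names and CRCs
    segments.put("airlineStats_0", 111L);
    long stored = fingerprint(segments);

    segments.put("airlineStats_1", 333L); // simulate a replaced segment
    System.out.println("changed = " + hasChanged(stored, segments));
  }
}
```

Sorting by segment name before hashing is what makes the value deterministic: the same set of `(segmentName, crc)` pairs gives the same fingerprint regardless of listing order, so a mismatch points at a real change in the underlying segments.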
Key capabilities:
* … `TaskConfigUtils.validateTaskConfigs`
* Partition-level fingerprinting (`farmHashFingerprint64` over sorted `(segmentName, crc)`) for collision-resistant change detection
* Batched APPEND (`maxTasksPerBatch`)
* `CasConflictException` separates retry-worthy conflicts from real validation errors
* … `maxNumRecordsPerSegment`, not by total window row count
* … `pinot.minion.materializedview.broker.grpc.*`

Scope
| In this PR | Deferred to the Companion PR |
| --- | --- |
| `pinot-materialized-view` module: `analysis/`, `consistency/`, `metadata/`, `scheduler/`, `executor/`, `context/` | `rewrite/` package (AggregationEquivalenceRegistry + Passthrough/SketchMerge rules); lands with the engine that applies them |
| `notifyMaterializedView*` hooks on `PinotHelixResourceManager` (every segment-metadata write) and `SegmentDeletionManager` | `MaterializedViewQueryRewriteEngine` + subsumption strategies |
| `BaseControllerStarter`: `MaterializedViewConsistencyManager` registered before segment lifecycle listeners | `MaterializedViewMetadataCache` on the broker |
| `MaterializedViewTask` minion plugin (generator + executor + factory + observer) | `BrokerResponse.materializedViewQueried` annotation |
| `MaterializedViewTask` constants moved from `MinionConstants` to `CommonConstants` (pinot-spi) | `BaseSingleStageBrokerRequestHandler` MV integration |
| `airlineStatsMv` quickstart with MV-vs-base result comparison for every demonstrated aggregation | `rewriteEnabled` enforcement + SLO-based eligibility gate |
| `pinot-materialized-view/DESIGN.md` covering today's time-windowed model and the planned fixed-partition extension | |

Design Doc
Long-form design doc: https://docs.google.com/document/d/1ToJfN42IMNySEY8YODb99Beis9YpLa8A8OWLcPcvG0M/edit?usp=sharing
In-repo design notes for the next contributor:
* `pinot-materialized-view/DESIGN.md`: covers today's time-windowed partition model, what's already partition-shape-neutral, and the migration plan for fixed-partition (CATEGORICAL / SINGLETON) MVs.

Architecture (this PR)
graph TB
  subgraph User
    Q["User Query<br/>(queries airlineStatsMv directly by name<br/>until Companion PR lands)"]
  end
  subgraph Controller
    PHRM["PinotHelixResourceManager<br/>createSegmentZkMetadata / updateZkMetadata<br/>notifyMaterializedView*"]
    SDM["SegmentDeletionManager<br/>notifyMaterializedView*"]
    MVCM["MaterializedViewConsistencyManager<br/>debounce + CAS-mark STALE<br/>onBaseTableDataChange(range)<br/>onBaseTableFullInvalidation()"]
    TASKVAL["TaskConfigUtils.validateTaskConfigs<br/>→ MaterializedViewAnalyzer.analyze<br/>(REST table-create / update)"]
  end
  subgraph ZK["ZooKeeper"]
    DEF["/CONFIGS/MATERIALIZED_VIEW/DEFINITION<br/>definedSql, baseTables, splitSpec,<br/>partitionExprMaps,<br/>rewriteEnabled, stalenessThresholdMs"]
    RUN["/CONFIGS/MATERIALIZED_VIEW/RUNTIME<br/>watermarkMs +<br/>partitions { bucketStartMs →<br/>(state, fingerprint, lastRefreshTime) }<br/>typed ZNRecord MapField per partition"]
  end
  subgraph Minion
    TG["MaterializedViewTaskGenerator<br/>APPEND / OVERWRITE / DELETE<br/>maxTasksPerBatch"]
    TE["MaterializedViewTaskExecutor<br/>1. stream gRPC rows from broker<br/>2. chunk into segments (bounded heap)<br/>3. upload + segment-replace<br/>4. CAS-update runtime metadata"]
    GRPC_FACTORY["GrpcMaterializedViewQueryExecutor<br/>broker discovery + round-robin<br/>BrokerGrpcQueryClient cache<br/>QueryHandle exposes Iterator<Object[]>"]
  end
  subgraph Broker
    BG["gRPC broker query endpoint<br/>existing, unchanged"]
  end
  BT["Base Table"] -->|segment add/replace/delete| PHRM
  PHRM --> MVCM
  SDM --> MVCM
  MVCM -->|CAS write STALE via persist| RUN
  TASKVAL -.->|reads| DEF
  TG -->|read| DEF
  TG -->|read| RUN
  TG -->|emit task| TE
  TE --> GRPC_FACTORY
  GRPC_FACTORY -->|SQL| BG
  BG -->|streamed frames| TE
  TE -->|segment upload| Server[Server segments]
  TE -->|CAS update runtime via persist| RUN

Coverage Model
The MV runtime ZNode tracks coverage as a per-partition map keyed by `bucketStartMs`:

* `partitions[bucketStart] = { state: VALID | STALE, fingerprint, lastRefreshTime }`: partition present
* `watermarkMs`: highest contiguous VALID prefix from epoch. Advances monotonically as APPEND tasks succeed; resets on OVERWRITE that punches a hole below the current watermark.
* … `EXPIRED` state.

`PartitionInfo` is serialized as a typed `Map<String,String>` ZNRecord MapField per bucket (keys: `state`, `segmentCount`, `crc`, `lastRefreshTime`), forward-compatible to new keys via "unknown keys ignored on read".

Key Components
| Component | Module | Notes |
| --- | --- | --- |
| `MaterializedViewConsistencyManager` | `pinot-materialized-view` | |
| `MaterializedViewDefinitionMetadata` | `pinot-materialized-view` | … `rewriteEnabled`, `stalenessThresholdMs`) |
| `MaterializedViewRuntimeMetadata` | `pinot-materialized-view` | `watermarkMs` + per-bucket `partitions` map (typed ZNRecord `MapField` per partition); defensive-copy on construction |
| `MaterializedViewRuntimeMetadataUtils` | `pinot-materialized-view` | … `persist(...)` helper. Throws typed `CasConflictException` on CAS failure so callers distinguish "retry me" from validation / transport errors. |
| `PartitionInfo` / `PartitionFingerprint` / `PartitionState` | `pinot-materialized-view` | … `farmHashFingerprint64` over sorted `(segmentName, crc)`. `PartitionFingerprint.encodeMap` sorts by key for deterministic output. |
| `MaterializedViewAnalyzer` | `pinot-materialized-view` | … `bucketTimePeriod`, LIMIT/OFFSET/nested-SELECT rules, time-column format inference, source-table eligibility (rejects UPSERT/DEDUP/DIMENSION/REFRESH/REALTIME sources). |
| `MaterializedViewTaskScheduler` | `pinot-materialized-view` | … `MaterializedViewAnalyzer.analyze` from `validateTaskConfigs` (wired through `TaskConfigUtils` at REST table-create time). |
| `MaterializedViewTaskGenerator` | `pinot-plugins/pinot-minion-tasks` | … `MaterializedViewTask` and delegates to the scheduler. |
| `MaterializedViewTaskExecutor` | `pinot-plugins/pinot-minion-tasks` | … (`totalRows >= effectiveLimit`) fails BEFORE segment commit; narrow `ZkException`-only retry boundary; lineage rollback on failure. |
| `GrpcMaterializedViewQueryExecutor` | `pinot-materialized-view` | … `QueryHandle` exposing schema + `Iterator<Object[]>` so the executor pulls rows frame-by-frame. Discovers brokers via Helix, round-robins, caches one `BrokerGrpcQueryClient` per endpoint, evicts stale clients. Configured via `pinot.minion.materializedview.broker.grpc.*` (TLS, max inbound size, keepalive). |
| `MaterializedViewQuickStart` | `pinot-tools` | … `airlineStats` fixture, materializes `airlineStatsMv`, then runs each aggregation against both tables and asserts results match (numeric-tolerant comparator). |

Quick Start
Launches a local cluster, loads `airlineStats` (base) and `airlineStatsMv` (the MV), back-fills all 31 days of MV segments via batched APPEND tasks (`maxTasksPerBatch=31`), then runs each demo aggregation twice: once against the base table and once against the MV in its re-aggregated form (`SUM` over `sum_ArrDelay`, `SUM` over `flight_count`, `DISTINCTCOUNTHLL` over the raw-sketch column, etc.).

The quickstart prints both result sets side by side and reports `MATCH` / `DIFFER` so you can see the correctness contract that the Companion PR's rewriter will rely on. End-to-end smoke test results: MV table created, 432 pre-aggregated rows materialized in ~40s, all 5 base-vs-MV comparisons return MATCH.

Configuration Reference
MV task config: `task.taskTypeConfigsMap.MaterializedViewTask`

| Key | Default | Notes |
| --- | --- | --- |
| `definedSQL` | | |
| `bucketTimePeriod` | | … (`1d`, `1h`, `30m`). Required: no implicit default to avoid silent drift. |
| `bufferTimePeriod` | `0d` | … `>= 0`. |
| `maxTasksPerBatch` | `4` | … `[1, 1000]`. |
| `maxNumRecordsPerSegment` | `5_000_000` | |
| `stalenessThresholdMs` | `0` (no SLO) | … `isEligible` check. Persisted today on the definition znode. |

Minion gRPC client config: `pinot.minion.materializedview.broker.grpc.*`

The minion ships with plaintext defaults. These are fine for local quickstarts, but the client settings must be configured explicitly on TLS-enabled clusters. This is the subset prefix used by the gRPC client factory in `MaterializedViewTaskExecutorFactory`.

Per-task auth credentials (Bearer tokens, etc.) are unaffected by this prefix; they flow through the task's `AuthProvider` as gRPC metadata.

Caps and cluster-config overrides
Each cap has a compile-time default and a Helix CLUSTER-scope cluster-config key. The consumer site reads the override live on every call (no controller / minion restart needed); an unset / malformed / non-positive value falls back to the compile-time default.
| Constant | Default | Cluster-config key | Notes |
| --- | --- | --- | --- |
| `DEFAULT_MATERIALIZED_VIEW_QUERY_LIMIT` | `1_000_000` | `pinot.materialized.view.query.default.limit` | … `definedSQL` omits one. Per-MV alternative: declare an explicit `LIMIT N` in `definedSQL`. |
| `MAX_MATERIALIZED_VIEW_QUERY_LIMIT` | `100_000_000` | `pinot.materialized.view.query.max.limit` | |
| `MAX_TASKS_PER_BATCH_USER_CAP` | `1_000` | `pinot.materialized.view.scheduler.max.tasks.per.batch.cap` | … `maxTasksPerBatch`. |
| `DEFAULT_MAX_BATCH_LOOP_ITERATIONS` | `100_000` | `pinot.materialized.view.scheduler.max.batch.loop.iterations` | |
| `DEFAULT_MAX_RUNTIME_UPDATE_ATTEMPTS` | `128` | `pinot.materialized.view.executor.runtime.update.max.attempts` | |
| `DEFAULT_DEBOUNCE_DELAY_MS` | `5_000` ms | `pinot.materialized.view.consistency.debounce.ms` | |

Setting an override (via the controller REST `/cluster/configs`):

Reads are live: operators do not need to restart any role for the new value to take effect. Existing in-flight tasks complete with whatever value they captured at the start of their cycle.
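The fallback rule described in this section (an unset, malformed, or non-positive override yields the compile-time default) might look roughly like the following in Java. This is a sketch only: `ClusterConfigOverrideSketch`, `resolve`, and the plain `Map` standing in for the Helix cluster config are hypothetical, while the key and the `5_000` default are the debounce entry listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "live read with safe fallback" rule.
final class ClusterConfigOverrideSketch {

  static final String DEBOUNCE_KEY = "pinot.materialized.view.consistency.debounce.ms";
  static final long DEFAULT_DEBOUNCE_DELAY_MS = 5_000L;

  // Returns the override when it parses to a positive number,
  // otherwise the compile-time default.
  static long resolve(Map<String, String> clusterConfigs, String key, long compileTimeDefault) {
    String raw = clusterConfigs.get(key);
    if (raw == null) {
      return compileTimeDefault; // unset
    }
    try {
      long value = Long.parseLong(raw.trim());
      return value > 0 ? value : compileTimeDefault; // non-positive
    } catch (NumberFormatException e) {
      return compileTimeDefault; // malformed
    }
  }

  public static void main(String[] args) {
    // Assumption: a Map snapshot stands in for the cluster-scope config store.
    Map<String, String> clusterConfigs = new HashMap<>();
    System.out.println(resolve(clusterConfigs, DEBOUNCE_KEY, DEFAULT_DEBOUNCE_DELAY_MS));

    clusterConfigs.put(DEBOUNCE_KEY, "15000");
    System.out.println(resolve(clusterConfigs, DEBOUNCE_KEY, DEFAULT_DEBOUNCE_DELAY_MS));
  }
}
```

Because the value is resolved on every call rather than cached at startup, a new override takes effect on the next read, which matches the "no restart needed" behavior described above.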
Defense-in-depth against silent truncation
`definedSQL` may omit LIMIT. The system protects against silent result truncation in the materialized segments at three layers:

1. Analyzer (table-create time, REST API):
   * … `bucketTimePeriod`: no implicit default.
   * … `MAX_MATERIALIZED_VIEW_QUERY_LIMIT`.
   * … `DateTimeFieldSpec`.

2. Generator (per scheduling cycle):
   * … `effectiveLimit` via AST.
   * … `LIMIT effectiveLimit` to the broker SQL when the user did not declare one.
   * … `EFFECTIVE_LIMIT_KEY` for the executor.

3. Executor (per task):
   * `parseEffectiveLimit` fails loud if `EFFECTIVE_LIMIT_KEY` is missing or non-positive.
   * … `totalRows >= effectiveLimit` the task fails BEFORE any segment is committed via lineage. The broker enforces LIMIT by truncating at exactly N rows, so receiving exactly N is treated as "possibly truncated", which is conservative by design.

Task Lifecycle
* `APPEND`: … `bufferTimePeriod` … `[watermarkMs, watermarkMs + bucketMs)`, advances watermark on success. With `maxTasksPerBatch > 1`, schedules up to N consecutive windows per cycle.
* `OVERWRITE`: … `STALE` by the consistency manager and a fresh fingerprint computation confirms a real change
* `DELETE`: …

DELETE / OVERWRITE are mutually exclusive (both touch existing MV segments via segment-replace) and gated at one in-flight per MV table. APPEND is gated by `maxTasksPerBatch`.

End-to-end Example
1. Base table (`airlineStats`): already part of the standard quickstart

2. MV table config (`airlineStatsMv_offline_table_config.json`)

{
  "tableName": "airlineStatsMv",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "tsMs",
    "timeType": "MILLISECONDS",
    "segmentPushType": "APPEND",
    "replication": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "task": {
    "taskTypeConfigsMap": {
      "MaterializedViewTask": {
        "definedSQL": "SELECT DaysSinceEpoch * 86400000 AS tsMs, Carrier, SUM(ArrDelay) AS sum_ArrDelay, COUNT(*) AS flight_count, MIN(ArrDelay) AS min_ArrDelay, MAX(ArrDelay) AS max_ArrDelay, DISTINCTCOUNTRAWHLL(FlightNum) AS raw_hll_FlightNum, DISTINCTCOUNTRAWHLLPLUS(FlightNum) AS raw_hllplus_FlightNum FROM airlineStats GROUP BY DaysSinceEpoch * 86400000, Carrier",
        "bucketTimePeriod": "1d",
        "maxTasksPerBatch": "31"
      }
    }
  }
}

3. MV schema (`airlineStatsMv_schema.json`)

{
  "schemaName": "airlineStatsMv",
  "dimensionFieldSpecs": [
    { "name": "Carrier", "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "sum_ArrDelay", "dataType": "LONG" },
    { "name": "flight_count", "dataType": "LONG" },
    { "name": "min_ArrDelay", "dataType": "INT" },
    { "name": "max_ArrDelay", "dataType": "INT" },
    { "name": "raw_hll_FlightNum", "dataType": "BYTES" },
    { "name": "raw_hllplus_FlightNum", "dataType": "BYTES" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "tsMs",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:TIMESTAMP",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

4. Querying the MV directly (until Companion PR lands)
Re-aggregation rules used by both the quickstart's verification step and the Companion PR's broker rewriter:
| Base-table aggregation | MV-side re-aggregation |
| --- | --- |
| `SUM(ArrDelay)` | `SUM(sum_ArrDelay)` |
| `COUNT(*)` | `SUM(flight_count)`: COUNT is a `SUM`-with-trivial-input under re-aggregation |
| `MIN(ArrDelay)` | `MIN(min_ArrDelay)` |
| `MAX(ArrDelay)` | `MAX(max_ArrDelay)` |
| `DISTINCTCOUNTHLL(FlightNum)` | `DISTINCTCOUNTHLL(raw_hll_FlightNum)`: same function applied to the raw-sketch column merges sketches |
| `DISTINCTCOUNTHLLPLUS(FlightNum)` | `DISTINCTCOUNTHLLPLUS(raw_hllplus_FlightNum)` |

Concrete example:
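Read as code, the re-aggregation rules listed here amount to a lookup from the base-table aggregation function to the function applied to the corresponding MV column. The Java sketch below is hypothetical: the class, method, and map are invented for illustration and are not the Companion PR's `AggregationEquivalenceRegistry`; it only encodes the six rules given above.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch; not the broker rewriter's actual rule classes.
final class ReAggregationSketch {

  // Base-table aggregation function -> function applied to the MV column.
  private static final Map<String, String> MV_SIDE_FUNCTION = Map.of(
      "SUM", "SUM",
      "COUNT", "SUM", // pre-computed counts are summed, not counted again
      "MIN", "MIN",
      "MAX", "MAX",
      "DISTINCTCOUNTHLL", "DISTINCTCOUNTHLL", // applied to the raw-sketch column
      "DISTINCTCOUNTHLLPLUS", "DISTINCTCOUNTHLLPLUS");

  // Builds the MV-side expression for a base aggregation and the MV column
  // that stores its pre-aggregated value.
  static String rewrite(String baseFunction, String mvColumn) {
    String mvFunction = MV_SIDE_FUNCTION.get(baseFunction.toUpperCase(Locale.ROOT));
    if (mvFunction == null) {
      throw new IllegalArgumentException("No re-aggregation rule for " + baseFunction);
    }
    return mvFunction + "(" + mvColumn + ")";
  }

  public static void main(String[] args) {
    System.out.println(rewrite("COUNT", "flight_count"));
    System.out.println(rewrite("DISTINCTCOUNTHLL", "raw_hll_FlightNum"));
  }
}
```

Rejecting unknown functions instead of passing them through mirrors the conservative stance of the rest of the pipeline: an aggregation with no known re-aggregation rule should not be answered from the MV.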
MV Definition Examples
Scan MV (no aggregation):
Aggregation MV:
HLL sketch MV (raw sketch storage, mergeable at query time):
`DATETRUNC` bucket MV (coarser MV time column than the base):

The MV's designated `segmentsConfig.timeColumnName` must be `dayBucket`, its unit must match `bucketTimePeriod=1d`, and the analyzer enforces both at create time.

Common Errors
| Error | Cause | Fix |
| --- | --- | --- |
| `MaterializedViewTask requires 'bucketTimePeriod'` | `bucketTimePeriod` missing on task config | … `"bucketTimePeriod": "1d"` (or other Joda period). |
| `LIMIT must be strictly positive` | | |
| `LIMIT … exceeds maximum allowed …` | | |
| `definedSQL must not declare OFFSET` | | |
| `nested SELECT / subquery` | … `definedSQL` | |
| `Source table … is mutable (UPSERT/DEDUP/DIMENSION/REFRESH)` | | |
| `Source table … is a REALTIME table … not yet supported` | … `_REALTIME` only | |
| `MV definedSQL uses aggregation 'X' for which no MV-side re-aggregation is supported` | `definedSQL` uses an aggregation the rewriter (in the Companion PR) cannot re-aggregate | … `SUM` / `MIN` / `MAX` / `COUNT` / `DISTINCTCOUNTRAWHLL` / `DISTINCTCOUNTRAWHLLPLUS` / `DISTINCTCOUNTRAWTHETASKETCH`. |
| `MV result saturated LIMIT` | … `>= effectiveLimit` | … `bucketTimePeriod` or add WHERE filters in `definedSQL`. |
| `Missing effectiveLimit in task config` | | |
| `Cannot delete table 'X': N materialized view(s) depend on it: […]. Drop the dependent materialized views first.` | | |

Operational Notes
* `MaterializedViewConsistencyManager` is registered before segment lifecycle listeners (`BaseControllerStarter`), so segment events arriving as Helix comes online don't slip past the notify path.
* … `/CONFIGS/MATERIALIZED_VIEW/{DEFINITION,RUNTIME}`. `PartitionInfo` is a typed `ZNRecord.MapField` per partition; new fields can be added without breaking older readers.
* `PinotHelixResourceManager.notifyMaterializedViewConsistencyManager` exits early via `getDependentMaterializedViews(rawTableName).isEmpty()` before any log emission or downstream dispatch, so for a table with no dependent MVs every controller-side ZK segment-metadata write (including the high-volume realtime LLC-commit path) does a single ConcurrentHashMap lookup and returns.
* … (`maxTasksPerBatch=1000`), one task can spend up to ~25 seconds in jittered backoff before exhausting `MAX_RUNTIME_UPDATE_ATTEMPTS=128`. Size the minion thread pool to absorb this. Non-CAS exceptions (e.g. `IllegalStateException` from invariant checks, validation errors) propagate immediately without retry.

Testing
… `pinot-materialized-view`, 25 in `pinot-minion-builtin-tasks`):

* … (`PartitionFingerprintTest`, `PartitionInfoTest`, `PartitionStateTest`, `MaterializedViewMetadataTest`)
* … (`MaterializedViewAnalyzerTest`): LIMIT / OFFSET / nested-SELECT, trailing-comment probe, time-column format inference, mutable-source rejection, realtime-source rejection
* … (`GrpcMaterializedViewQueryExecutorTest`): client cache, broker discovery, stale-client eviction
* … (`MaterializedViewTaskExecutorTest`): saturation gate, lineage rollback, narrow-retry boundary
* … (`MaterializedViewTaskSchedulerTest`): end-to-end LIMIT injection, cold-start metadata, batched APPEND
* … (`MaterializedViewConsistencyManagerTest`): debounce + CAS retry, no synthetic-STALE on full invalidation, no-op when no dependent MV
* … (`TimeExprValidatorTest`): identity passthrough vs `DATETRUNC` rules

`MaterializedViewQuickStart` exercises the full ingestion pipeline end-to-end against the airlineStats fixture and asserts per-aggregation equality between base and MV queries using a numeric-tolerant comparator. Verified locally end-to-end: cluster boot 60s, 432-row materialization in ~40s, all 5 comparisons MATCH.

Modules Affected
| Module | Changes |
| --- | --- |
| `pinot-spi` | `CommonConstants.MaterializedViewTask` (new), `MINION_BROKER_GRPC_CONFIG_PREFIX`, 6 cluster-config-override keys |
| `pinot-common` | `ZKMetadataProvider` helpers for MV definition / runtime znode paths |
| `pinot-controller` | `MaterializedViewConsistencyManager` wiring in `BaseControllerStarter`; `notifyMaterializedView*` hooks in `createSegmentZkMetadata` / `updateZkMetadata` / `addNewSegment` / `endReplaceSegments` (PHRM) and `SegmentDeletionManager`; base-table delete blocked when MVs depend |
| `pinot-materialized-view` (NEW MODULE) | … `DESIGN.md` |
| `pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks` | `MaterializedViewTaskGenerator`, `MaterializedViewTaskExecutor`, factory, observer factory |
| `pinot-tools` | `MaterializedViewQuickStart` + `airlineStatsMv` example resources |

Limitations / Deferred Work
* … `pinot-materialized-view/DESIGN.md`.
* … `CREATE MATERIALIZED VIEW` syntax: table-config-driven only. DDL parser hooks land alongside the broker rewrite.
* … (`airlineStatsMv_OFFLINE`): same as any other Pinot OFFLINE table.

Companion PR
Part 2 of this work — the broker-side query rewrite engine that transparently routes user queries against the base table onto the MV table — ships as a separate PR:
That PR adds the subsumption strategies (Exact / Scan / Aggregation), the `MaterializedViewQueryRewriteEngine`, the broker-side metadata cache, hybrid (MV + base) execution, the per-MV `rewriteEnabled` / `stalenessThresholdMs` gates on the MV definition, and the `materializedViewQueried` field on the broker response. It depends on this PR for the metadata model and the populated runtime watermarks.

Without the Companion PR, the artifacts produced by this PR are still fully functional: MVs are built and incrementally refreshed, and operators can query the MV table directly by name like any other Pinot OFFLINE table, which is exactly the round-trip the quickstart's comparison step verifies.