[MSE] Leverage pre-sorted pre-partitioned data layout: streaming merge join and sorted group-by aggregation #18667

@rohityadav1993

Proposal

Sorted merge join
When the left and right inputs are both sorted by the join columns, we can use a sorted merge join, which scales better than a hash-based join because it does not build a hash table (e.g. ClickHouse's full sort-merge join).

Sorted key group by
When data is sorted by the group-by keys in the aggregation query, we can avoid hash aggregations by computing the aggregation in a streaming manner (e.g. MySQL GROUP BY Optimization).

Why avoid hash join

Hash join materializes the entire right side into a hash table before probing begins, so memory grows linearly with the right-side row count. The join operator builds the right table on the first call to getNextBlock(), before it emits any output:

protected MseBlock getNextBlock() {
  if (!_isRightTableBuilt) {
    buildRightTable();
  }
  // ... (rest of the method elided)
}

For multi-column join keys, Pinot falls back from primitive-typed lookup tables (IntLookupTable, LongLookupTable) to a generic ObjectLookupTable that allocates a composite key object per row, increasing GC pressure at scale. When data is already partitioned and sorted on the join key, a streaming merge join avoids the hash table entirely.

Why avoid hash aggregation

Hash-based GROUP BY in Pinot's MSE (MultistageGroupByExecutor) assigns each distinct group key a slot in a hash map (Long2IntOpenHashMap for single LONG keys, Object2IntOpenHashMap for multi-column keys) and accumulates aggregation state for every group simultaneously:

private int[] generateGroupByKeys(MseBlock.Data block) {
  return block.isRowHeap()
      ? generateGroupByKeys(block.asRowHeap().getRows())
      : generateGroupByKeys(block.asSerialized().getDataBlock());
}

Memory grows with the number of distinct groups, which is why queries with high-cardinality keys require SET numGroupsLimit = 50000000 to avoid hitting the default cap.

When input rows are already sorted by the group-by key, none of that is necessary. MySQL calls this a loose or tight index scan (https://dev.mysql.com/doc/refman/8.4/en/group-by-optimization.html): once rows are ordered by key, you only need to hold state for the current key, flush the accumulator to the output when the key changes, reset it, and continue. Memory for aggregation state is O(1) regardless of group count, and there are no hash collisions, no map resizes, and no FixedIntArray allocations per group.
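The flush-on-key-change loop described above can be sketched as follows. This is a minimal illustration, assuming rows arrive already sorted by a single String group key and using SUM as the aggregate; SortedGroupBySketch and aggregateSorted are hypothetical names, not Pinot APIs. Only the current key and its running sum are held as aggregation state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch: SUM aggregation over rows already sorted by group key. */
final class SortedGroupBySketch {

  /** One input row: a group key and a value to aggregate. */
  record Row(String key, long value) {}

  /** One output group: the key and its aggregated SUM. */
  record Group(String key, long sum) {}

  /**
   * Streams over rows sorted by key. Only the current key and its running sum are kept;
   * a group is flushed as soon as the key changes, so no hash map is ever built.
   */
  static List<Group> aggregateSorted(Iterable<Row> sortedRows) {
    List<Group> out = new ArrayList<>(); // collected here only for inspection
    String currentKey = null;
    long currentSum = 0;
    boolean hasCurrent = false;
    for (Row row : sortedRows) {
      if (hasCurrent && !Objects.equals(row.key(), currentKey)) {
        out.add(new Group(currentKey, currentSum)); // key changed: flush the finished group
        currentSum = 0;                              // and reset the accumulator
      }
      currentKey = row.key();
      currentSum += row.value();
      hasCurrent = true;
    }
    if (hasCurrent) {
      out.add(new Group(currentKey, currentSum)); // flush the last group
    }
    return out;
  }
}
```

Completed groups are collected into a list here only so the result is easy to inspect; an operator would instead emit each group downstream as soon as it is flushed.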

Note: This could be split into two separate issues, but I am starting with one because both improvements share common changes at the leaf stage.


POC

We did a POC on a table (user_events) sorted on correlation_id within each segment and partitioned across servers using a Murmur hash on correlation_id (128 buckets). This ensures all events for a given user land on the same server.

Design choices in SortedMergeJoinOperator:

  • Lazy block reading: blocks are fetched from left and right inputs on demand — only one block per side is held in memory at a time.
  • LeafStageSortJoinRule.addSortToLeafStage() injects a PhysicalSort with no fetch limit and no offset, so the leaf ORDER BY is unbounded by design.
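The SortedMergeJoinOperator code is not shown in this issue; the following is a minimal, hypothetical sketch of the same two ideas (lazy block reading and a two-pointer merge) for a LEFT join on a long key, matching the LEFT JOIN in the funnel query below. MergeJoinSketch, Cursor, and the row types are illustrative names, not Pinot classes. Besides the current block of each side, the sketch buffers the run of right rows that share the current key, because duplicate keys can span block boundaries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical sketch of a streaming LEFT merge join over inputs sorted by a long key. */
final class MergeJoinSketch {

  record Row(long key, String payload) {}

  /** right is null when the left row has no match (LEFT JOIN semantics). */
  record Joined(long key, String left, String right) {}

  /** Pulls blocks lazily; only the current block of each side is referenced. */
  static final class Cursor {
    private final Iterator<List<Row>> blocks;
    private List<Row> block = List.of();
    private int pos = 0;

    Cursor(Iterator<List<Row>> blocks) {
      this.blocks = blocks;
    }

    /** Returns the current row without consuming it, or null at the end of input. */
    Row peek() {
      while (pos >= block.size()) {
        if (!blocks.hasNext()) return null;
        block = blocks.next(); // fetch the next block only when the current one is exhausted
        pos = 0;
      }
      return block.get(pos);
    }

    Row next() {
      Row r = peek();
      if (r == null) throw new NoSuchElementException();
      pos++;
      return r;
    }
  }

  static List<Joined> leftJoin(Iterator<List<Row>> leftBlocks, Iterator<List<Row>> rightBlocks) {
    Cursor left = new Cursor(leftBlocks);
    Cursor right = new Cursor(rightBlocks);
    List<Joined> out = new ArrayList<>();
    for (Row l = left.peek(); l != null; l = left.peek()) {
      long key = l.key();
      // Right rows with smaller keys can never match this or any later left row.
      while (right.peek() != null && right.peek().key() < key) right.next();
      // Buffer the run of right rows equal to the key (it may span blocks).
      List<Row> run = new ArrayList<>();
      while (right.peek() != null && right.peek().key() == key) run.add(right.next());
      // Join every left row with this key against the buffered run.
      while (left.peek() != null && left.peek().key() == key) {
        Row lr = left.next();
        if (run.isEmpty()) {
          out.add(new Joined(key, lr.payload(), null));
        } else {
          for (Row rr : run) out.add(new Joined(key, lr.payload(), rr.payload()));
        }
      }
    }
    return out;
  }
}
```

Left rows with no matching right row are emitted with a null right payload, which is the LEFT JOIN behavior; dropping those rows instead would give an inner join.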
Funnel count query using sorted join:
SET useMultistageEngine = true;
SET usePhysicalOptimizer = true;
SET maxRowsInJoin = 1234194304;
SET timeoutMs = 120000;

WITH joined AS (
  SELECT /*+ joinOptions(join_strategy='sorted') */
    CAST(
      DATETIMECONVERT(
        CAST(step1.occurred_at_min AS BIGINT) * 1000,
        '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '600:SECONDS'
      ) AS BIGINT
    ) AS ts,
    step1.correlation_id AS u1,
    step2.correlation_id AS u2
  FROM user_events
    /*+ tableOptions(partition_key='correlation_id', partition_function='Murmur', partition_size='128') */
    AS step1
  LEFT JOIN user_events
    /*+ tableOptions(partition_key='correlation_id', partition_function='Murmur', partition_size='128') */
    AS step2
    ON step1.correlation_id = step2.correlation_id
    AND step2.name = 'addToCart'
    AND step2.occurred_at_min BETWEEN {start_epoch} AND {end_epoch}
  WHERE step1.name = 'checkout'
    AND step1.occurred_at_min BETWEEN {start_epoch} AND {end_epoch}
)
SELECT ts, COUNT(DISTINCT u1) AS step1, COUNT(DISTINCT u2) AS step2
FROM joined
GROUP BY ts ORDER BY ts LIMIT 100000

Physical plan (EXPLAIN PLAN FOR with usePhysicalOptimizer=true):

PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])
  PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[100000])
    PhysicalAggregate(group=[{1}], agg#0=[DISTINCTCOUNT($0)], ...)
      PhysicalJoin(condition=[=($0, $2)], joinType=[left])
        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
          PhysicalSort(sort0=[$0], dir0=[ASC])          ← pushed into leaf by LeafStageSortJoinRule
            PhysicalProject(correlation_id, ts=[DATETIMECONVERT(...)])
              PhysicalFilter(name=checkout AND ...)
                PhysicalTableScan(user_events)
        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
          PhysicalSort(sort0=[$0], dir0=[ASC])          ← pushed into leaf by LeafStageSortJoinRule
            PhysicalProject(correlation_id)
              PhysicalFilter(name=addToCart AND ...)
                PhysicalTableScan(user_events)

IDENTITY_EXCHANGE on both sides confirms partition co-location: the join inputs are not shuffled across servers.

Alternate single-scan query:
SET useMultistageEngine = true;
SET numGroupsLimit = 50000000;
SET timeoutMs = 30000;

WITH per_id AS (
  SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */
    FLOOR(occurred_at_min / 600) * 600 AS ts,
    correlation_id,
    MAX(CASE WHEN name = 'addToCart' THEN 1 ELSE 0 END) AS has_step1,
    MAX(CASE WHEN name = 'checkout'  THEN 1 ELSE 0 END) AS has_step2
  FROM user_events
  WHERE occurred_at_min BETWEEN {start_epoch} AND {end_epoch}
    AND (name = 'addToCart' OR name = 'checkout')
  GROUP BY ts, correlation_id
)
SELECT
  ts,
  SUM(has_step1) AS step1_count,
  SUM(CASE WHEN has_step1 = 1 AND has_step2 = 1 THEN 1 ELSE 0 END) AS step2_count
FROM per_id
GROUP BY ts ORDER BY ts LIMIT 200

Explain plan

LogicalSort(sort0=[$0], dir0=[ASC], offset=[0], fetch=[200])
  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])
    LogicalSort(sort0=[$0], dir0=[ASC], fetch=[200])
      PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT($2)], aggType=[FINAL])
        PinotLogicalExchange(distribution=[hash[0]])
          PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], agg#1=[COUNT() FILTER $2], aggType=[LEAF])
            LogicalProject(ts=[$0], has_step1=[$2], $f3=[AND(=($2, 1), =($3, 1))])
              PinotLogicalAggregate(group=[{0, 1}], agg#0=[MAX($2)], agg#1=[MAX($3)], aggType=[DIRECT])
                LogicalProject(ts=[*(FLOOR(/($31, 600)), 600)], correlation_id=[$14], ...)
                  LogicalFilter(...)
                    PinotLogicalTableScan(table=[[default, user_events]])   ← single table scan

This uses a single TableScan (vs two in the join).


Challenges

1. The leaf-stage ORDER BY in the sorted join materializes all segments into a single block

LeafStageSortJoinRule injects a PhysicalSort with no fetch limit or offset into the leaf stage so that both join inputs arrive sorted. ServerPlanRequestVisitor.visitSort() only sets a LIMIT when fetch >= 0, so the injected sort, which has no fetch, generates an unbounded V1 ORDER BY with no LIMIT clause. For this ORDER BY, CombinePlanNode selects MinMaxValueBasedSelectionOrderByCombineOperator, which merges the results from all segments into a single block before returning anything. At large data volumes this exceeds the leaf stage's CPU budget and triggers an EarlyTerminationException from ThreadAccountant inside SelectionOperatorUtils.mergeWithOrdering(), which surfaces at the broker as a spurious "Cancelled by sender" error.

Suggestions:

  • A new streaming variant of MinMaxValueBasedSelectionOrderByCombineOperator that extends BaseStreamingCombineOperator, uses a bounded queue, and emits sorted blocks incrementally via a k-way heap merge across segments. The existing MinMax segment-pruning optimization can be preserved. Consuming or unsorted segments would be sorted in memory per segment before joining the heap.
  • Implement a k-way merge in SortedMailboxReceiveOperator when the sending stage is known to be sorted, rather than accumulating all rows and re-sorting them. This is already noted as a TODO in that class:
    // TODO: This might not be efficient because we are sorting all the received rows. We should use a k-way merge
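The k-way heap merge in both suggestions can be sketched as below. This is a hypothetical illustration over long values, assuming each input (a segment in the combine operator, or a mailbox in SortedMailboxReceiveOperator) is already sorted; it omits the MinMax pruning and the in-memory sort for consuming or unsorted segments. KWayMergeSketch and merge are illustrative names, not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: k-way merge of individually sorted inputs into sorted output blocks. */
final class KWayMergeSketch {

  /** Heap entry: the current head value of one input plus that input's iterator. */
  private record Head(long value, Iterator<Long> rest) {}

  /**
   * Merges k sorted inputs and emits blocks of at most blockSize values. The heap holds
   * one entry per input, so working memory is O(k + blockSize) rather than O(total rows).
   */
  static List<List<Long>> merge(List<Iterator<Long>> sortedInputs, int blockSize) {
    if (blockSize <= 0) throw new IllegalArgumentException("blockSize must be positive");
    PriorityQueue<Head> heap = new PriorityQueue<>(Comparator.comparingLong(Head::value));
    for (Iterator<Long> it : sortedInputs) {
      if (it.hasNext()) heap.add(new Head(it.next(), it));
    }
    List<List<Long>> blocks = new ArrayList<>(); // a real operator would send each block downstream
    List<Long> block = new ArrayList<>(blockSize);
    while (!heap.isEmpty()) {
      Head smallest = heap.poll();
      block.add(smallest.value());
      if (smallest.rest().hasNext()) {
        heap.add(new Head(smallest.rest().next(), smallest.rest())); // refill from the same input
      }
      if (block.size() == blockSize) {
        blocks.add(block); // complete and already globally sorted
        block = new ArrayList<>(blockSize);
      }
    }
    if (!block.isEmpty()) blocks.add(block);
    return blocks;
  }
}
```

Each output block is globally sorted and could be sent as soon as it fills, so the consumer would not have to wait for every segment to be fully merged.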
flowchart TD
    subgraph leaf["Leaf Stage — per server"]
        S1["Segment 1\n(sorted by join key)"]
        S2["Segment 2\n(sorted by join key)"]
        SN["Segment N\n(sorted or sort-in-memory)"]
        S1 & S2 & SN --> KH["K-way Heap Merge\nMinMaxValueBased→StreamingCombineOperator\npreserves collation, emits sorted blocks\nincrementally — MinMax pruning retained"]
    end

    KH --> MS["MailboxSendOperator\n(sorted blocks sent downstream)"]

    MS -->|"IDENTITY_EXCHANGE\n(co-located — no network shuffle)"| MR

    subgraph recv["Intermediate Stage"]
        MR["SortedMailboxReceiveOperator\nOption A: k-way merge across mailboxes\n\nOption B: receives already-merged stream\nfrom streaming combine"]
    end

    MR --> SMJ["SortedMergeJoinOperator\n(streaming two-pointer join\nlazy block loading — one block per side in memory)"]

2. Join queries that reduce per key can be rewritten as sorted GROUP BY, but MSE GROUP BY is not sort-aware

Some join queries, including the funnel query above, can be expressed as a single-scan GROUP BY (see the alternate query). Running that GROUP BY in MSE is not just a rewrite convenience; it is required for correctness. SSE FUNNELCOUNT(settings('partitioned','sorted')) is not equivalent here because SSE processes each segment independently: when a user's funnel events span two segments on the same server, SortedAggregationResult resets its per-key state at the segment boundary and PartitionedMergeStrategy merges the results by plain addition, producing incorrect counts.
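The segment-boundary problem can be reproduced with a deliberately simplified, hypothetical model; this is not Pinot's FUNNELCOUNT, SortedAggregationResult, or PartitionedMergeStrategy code. It follows the alternate query's step definitions: step 1 counts users with an addToCart event, and step 2 counts users with both addToCart and checkout.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model of why per-segment funnel state plus additive merging
 * undercounts when a user's events span segments.
 */
final class SegmentBoundarySketch {

  record Event(String user, String name) {}

  /** Returns {step1Count, step2Count}; step 2 requires both addToCart and checkout. */
  static long[] funnel(List<Event> events) {
    Map<String, Set<String>> stepsByUser = new HashMap<>();
    for (Event e : events) {
      stepsByUser.computeIfAbsent(e.user(), u -> new HashSet<>()).add(e.name());
    }
    long step1 = 0;
    long step2 = 0;
    for (Set<String> steps : stepsByUser.values()) {
      if (steps.contains("addToCart")) {
        step1++;
        if (steps.contains("checkout")) step2++;
      }
    }
    return new long[] {step1, step2};
  }

  /** Evaluates each segment on its own and adds the results, as described above. */
  static long[] perSegmentThenAdd(List<List<Event>> segments) {
    long[] total = new long[2];
    for (List<Event> segment : segments) {
      long[] r = funnel(segment); // per-user state starts fresh in every segment
      total[0] += r[0];
      total[1] += r[1];
    }
    return total;
  }
}
```

For a single user with addToCart in one segment and checkout in another, perSegmentThenAdd returns {1, 0}, while funnel over the combined events returns {1, 1}: the user's step 2 is lost because neither segment sees both events.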

With /*+ aggOptions(is_partitioned_by_group_by_keys='true') */, the alternate query achieves partition co-location by avoiding a PinotLogicalExchange before the first PinotLogicalAggregate, but the intermediate and final GROUP BY stages still use hash-based aggregation. When data is sorted on the group-by keys, a streaming sorted aggregation (accumulating state per key and flushing when the key changes) would eliminate hash table construction and the associated group-count limits, mirroring how ClickHouse's MergingSortedTransform handles aggregation on MergeTree data. In the past we have seen hash-based GROUP BY aggregation run into problems from misconfigured hash table sizes when cardinality is high.

Suggestions:

  • Implement a sorted-input-aware aggregation operator in MSE that streams over sorted blocks without a hash table, activated when the leaf stage advertises sorted collation on the group-by keys.
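The activation condition can be stated precisely: streaming aggregation is safe when rows with equal group-by keys are guaranteed to be contiguous, which holds when the leading collation fields are exactly the group-by columns (in any order and any sort direction). A small, hypothetical planner-side check follows; canStreamAggregate is not an existing Pinot or Calcite API, and collation is modeled as a plain list of field indices (in a Calcite-based planner this information would come from the input's RelCollation).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical planner check; not an existing Pinot or Calcite API. */
final class SortedAggEligibilitySketch {

  /**
   * Returns true if the first groupKeys.size() collation fields are exactly the group-by
   * columns, which guarantees rows with equal group-by keys arrive contiguously.
   * A global aggregation (no group keys) is left to the existing path in this sketch.
   */
  static boolean canStreamAggregate(List<Integer> collationFields, Set<Integer> groupKeys) {
    int n = groupKeys.size();
    if (n == 0 || collationFields.size() < n) return false;
    Set<Integer> prefix = new HashSet<>(collationFields.subList(0, n));
    return prefix.equals(groupKeys);
  }
}
```

For example, input sorted on (correlation_id, ts) qualifies for GROUP BY ts, correlation_id, but input sorted only on correlation_id does not, because rows for the same (ts, correlation_id) pair may be interleaved with other ts values.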
flowchart TB
    subgraph current["Current — two hash map layers"]
        direction TB

        subgraph leaf["V1 Leaf Stage (per server, across segments)"]
            direction TB
            S1["Seg 1\nGROUP BY scan\n→ GroupByResultsBlock\n(per-segment hash map)"]
            S2["Seg 2\nGROUP BY scan\n→ GroupByResultsBlock"]
            SN["Seg N\nGROUP BY scan\n→ GroupByResultsBlock"]
            S1 & S2 & SN --> GBC["GroupByCombineOperator\nextends BaseSingleBlockCombineOperator\n(NOT streaming)\nmerges all segments into\nshared IndexedTable hash map\n→ single GroupByResultsBlock"]
        end

        GBC -->|"mailbox send"| MSE

        subgraph mse["MSE Intermediate Stage"]
            direction TB
            MSE["MultistageGroupByExecutor\ngenerateGroupByKeys()\nObject2IntOpenHashMap or Long2IntOpenHashMap\nFixedIntArray per multi-column key\ngrows O(distinct groups)\n→ SET numGroupsLimit = 50000000 needed"]
        end
    end

    subgraph proposed["Proposed — sorted streaming (data sorted by GROUP BY key)"]
        direction TB

        subgraph pleaf["V1 Leaf Stage (streaming)"]
            direction TB
            T1["Seg 1\n(sorted by key)"] & T2["Seg 2\n(sorted by key)"] & TN["Seg N\n(sorted by key)"] --> KH["K-way Heap Merge\n(streaming combine)\nemits groups in key order\nno IndexedTable — O(1) per key"]
        end

        KH -->|"sorted blocks via mailbox"| SMSE

        subgraph smse["MSE Intermediate Stage (streaming)"]
            direction TB
            SMSE["Sorted Streaming Aggregator\nCompare key vs previous key"]
            SMSE -->|"same key"| ACC["Accumulate into\ncurrent state\nO(1) memory"]
            SMSE -->|"key changes"| EMIT["Emit completed group\nreset state"]
            ACC --> SMSE
            EMIT --> SMSE
        end
    end

Looking for community input on the preferred direction and would be happy to contribute the finalised approach.


Labels: feature (New functionality), multi-stage (Related to the multi-stage query engine)
