Sorted Merge: Eliminate coordinator Sort node for multi-shard ORDER BY queries#8529
Conversation
Codecov Report: ❌ Patch coverage is

```
@@            Coverage Diff             @@
##              main    #8529      +/-  ##
==========================================
- Coverage    88.79%   88.76%   -0.03%
==========================================
  Files          287      288       +1
  Lines        64053    64275     +222
  Branches      8054     8089      +35
==========================================
+ Hits         56874    57056     +182
- Misses        4851     4880      +29
- Partials      2328     2339      +11
```
If we don't have any cases where the non-streaming version performs faster than the streaming one, then I think we should always do a streaming merge when citus.enable_sorted_merge is enabled, and should remove citus.enable_streaming_sorted_merge. Do you think this makes sense?
Once we conclude the matter above, I'll provide a deeper code review and would like to test the PR, so I'm waiting for now. Btw, I had a chance to take a closer look at the PR, and at a high level it's definitely going in the right direction, thanks a lot!
Also, just to make sure I'm making correct assumptions after yesterday's call: if we choose to always do a streaming merge, do we have a way to detect "early" that a backward scan is requested and we cannot fulfill it? "Early" ideally means in the planner, which I don't think is quite possible, or else in the executor. I'd even be okay if we fall back to the older behavior (where we do a final sort on the coordinating node) in the executor by emitting a notice, once / if we can detect that a backward scan is requested, ideally even before trying to fetch the first tuple, though I can't comment on whether that is feasible either.
If neither is possible, then yes, the current behavior with streaming merge is just fine, if we choose to drop the "non-streaming" mode.
Also, I'd like to mention a few things we can try before merging the PR, or occasionally during development:
- We should run the sorted merge tests with the GUC "disabled" in the test file. The expectation would be to get the same test output (except for the SET .. GUC change); this feels like a good way to validate the output.
- We should hardcode the GUC's default setting to "ON" and push a new branch to CI to see whether the rest of the test suite is still happy while the GUC is turned on, apart from any notice / debug messages added for the new feature.
Also, I didn't yet have a chance to check the regression tests added in this branch (although I'm seeing plenty of new ones, thanks!), but I'd still like to mention a few test cases that we might want to add:
- insert .. select .. order by .. limit
- repartition joins
- router (single shard) queries
- tables with dropped columns (super important)
- weird / interesting queries, such as:
- select from dist order by column_a; -- yes, Postgres allows selecting 0 columns, and so does Citus; make sure we don't crash
- select .. from .. order by .. limit 0; -- empty task tuple stores
- select .. from .. order by ..; -- on a distributed table that contains all its rows in a single shard
Phase 1 of the sorted-merge feature. This commit adds the data structures and GUC needed by later phases, with zero behavioral changes:

- `SortedMergeKey` typedef in `multi_physical_planner.h` describing one sort key for the coordinator k-way merge
- `useSortedMerge`, `sortedMergeKeys[]`, `sortedMergeKeyCount` fields on `DistributedPlan` (plan-time decision, never checked at runtime via GUC)
- `sortedMergeEligible` field on `MultiExtendedOp` (logical optimizer tag read by the physical planner)
- Hidden GUC `citus.enable_sorted_merge` (`PGC_SUSET`, default off, `GUC_NO_SHOW_ALL`) consulted only during planning
- Serialization in `citus_outfuncs.c` and deep-copy in `citus_copyfuncs.c` for all new fields

All new fields default to false/0/NULL. Existing regression tests are unaffected.

Co-authored-by: Copilot
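The plan-time fields above can be pictured with a rough Python analogue. This is only an illustrative sketch mirroring the names in the commit message (the real definitions are C structs in `multi_physical_planner.h`), showing the intended defaults of false/0/NULL:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SortedMergeKey:
    # One ORDER BY key for the coordinator k-way merge
    # (hypothetical Python mirror of the C typedef).
    column_index: int
    descending: bool = False
    nulls_first: bool = False


@dataclass
class DistributedPlanSortedMergeFields:
    # Plan-time-only fields; per the commit message, the GUC is
    # consulted during planning and never checked at runtime.
    use_sorted_merge: bool = False
    sorted_merge_keys: List[SortedMergeKey] = field(default_factory=list)

    @property
    def sorted_merge_key_count(self) -> int:
        return len(self.sorted_merge_keys)
```

With no keys set, the plan behaves exactly as before the feature existed, which is how "zero behavioral changes" holds for this phase.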
…ro task code path by returning an exhausted adapter.
colm-mchugh
left a comment
LGTM overall; a few remaining minor comments, but it looks good in general.
…statement plan execution after error
…Y queries (#8529)

DESCRIPTION: add ORDER BY pushdown and coordinator merge for multi-shard queries.

## Summary

This PR adds a streaming sorted-merge execution path for multi-shard `SELECT ... ORDER BY` queries. When `citus.enable_sorted_merge` is enabled at planning time, Citus pushes safe `ORDER BY` clauses to workers, advertises sorted output from the coordinator `CustomScan`, and streams globally sorted tuples through a binary-heap merge over per-task tuplestores.

The main effect is that PostgreSQL no longer needs to add a coordinator-side `Sort` node above the Citus scan. Worker shards do shard-local sorting in parallel, and the coordinator performs an `O(N log K)` k-way merge, where `K` is the task count, instead of an `O(N log N)` full-result sort.

## What changed

### Planner

- Adds hidden experimental GUC `citus.enable_sorted_merge` (currently default `on`, `PGC_SUSET`).
- Extends worker sort pushdown beyond the existing LIMIT/DISTINCT cases when the query is safe for sorted merge:
  - the query has an `ORDER BY`,
  - window functions, if any, are pushable,
  - `ORDER BY` does not contain aggregate expressions,
  - `GROUP BY` is empty or grouped by the partitioning side.
- Records `DistributedPlan.useSortedMerge` during physical planning.
- Sets Citus CustomScan `pathkeys` to the combine query's required sort order so PostgreSQL elides the parent Sort.
- Uses a dedicated `CustomScanMethods` entry, shown in EXPLAIN as `Custom Scan (Citus Sorted Merge Adaptive)`.

### Executor

- Adds `sorted_merge.c` / `sorted_merge.h` with a `SortedMergeAdapter` modeled after PostgreSQL `MergeAppend`.
- Creates one tuplestore per task and routes worker tuples directly into the matching task store through `task->tupleDest`.
- Shares one `TupleDestinationStats` object across all per-task destinations so `citus.max_intermediate_result_size` is enforced across the whole result, not per task.
- Builds `SortSupportData` from the worker query sort clauses and uses a binary heap to return the next globally sorted tuple.
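The binary-heap merge over per-task tuplestores is the classic k-way merge. A minimal Python sketch of the idea, with locally sorted lists standing in for per-task tuplestores (names are illustrative, not the C API):

```python
import heapq
from typing import Iterable, Iterator, List, Tuple


def sorted_merge(task_stores: List[Iterable[Tuple]]) -> Iterator[Tuple]:
    """Stream globally sorted rows from K locally sorted per-task stores.

    Each heap pop/push costs O(log K), so merging N total rows costs
    O(N log K) instead of the O(N log N) a full coordinator sort needs.
    """
    iters = [iter(store) for store in task_stores]
    heap = []
    # Prime the heap with the first row of each non-empty store;
    # task_id breaks ties between equal rows.
    for task_id, it in enumerate(iters):
        row = next(it, None)
        if row is not None:
            heapq.heappush(heap, (row, task_id))
    while heap:
        row, task_id = heapq.heappop(heap)
        yield row
        # Refill from the store whose row we just emitted.
        nxt = next(iters[task_id], None)
        if nxt is not None:
            heapq.heappush(heap, (nxt, task_id))
```

For example, merging the stores `[(1,), (4,)]` and `[(2,), (3,)]` streams `(1,), (2,), (3,), (4,)` without ever materializing the combined result.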
- Adds a sorted-merge-specific `ExecCustomScan` callback that reads directly from the adapter, avoiding per-row branching between adapter and normal tuplestore paths.
- Clears cached task `tupleDest` pointers after execution so prepared-plan reuse cannot see stale execution-local state.

### Cursor and backward-scan behavior

The streaming adapter is forward-only.

- Sorted-merge plans do not advertise `CUSTOMPATH_SUPPORT_BACKWARD_SCAN`.
- For `SCROLL` cursors, Citus reinserts the same Material wrapper PostgreSQL would normally add for non-backward-scannable plans. This is necessary because Citus replaces the plan tree after `standard_planner()`.
- Non-scroll backward fetches remain unsupported, matching cursor semantics.
- `SortedMergeAdapterRescan()` exists as a defensive rescan path and rebuilds the heap from the per-task stores.

### Observability and stats

- The EXPLAIN plan shape changes from `Sort -> Custom Scan (Citus Adaptive)` to `Custom Scan (Citus Sorted Merge Adaptive)` for eligible queries.
- The executor type is tracked as `MULTI_EXECUTOR_SORTED_MERGE` so query stats can distinguish sorted-merge executions from regular adaptive executions.
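Because the per-task stores are re-readable after workers have been drained, a rescan only needs to reset each store's read position and rebuild the heap. A hedged Python sketch of that shape (list-backed stores; the real code operates on PostgreSQL tuplestores, and these names are illustrative):

```python
import heapq
from typing import Sequence, Tuple


class SortedMergeAdapter:
    """Toy forward-only merge adapter with a rescan that rebuilds the heap."""

    def __init__(self, task_stores: Sequence[Sequence[Tuple]]):
        self.task_stores = task_stores
        self.rescan()

    def rescan(self) -> None:
        # Reset every per-task read position to the start and reload
        # the heap with the first row of each non-empty store.
        self.positions = [0] * len(self.task_stores)
        self.heap = []
        for task_id, store in enumerate(self.task_stores):
            if store:
                heapq.heappush(self.heap, (store[0], task_id))
                self.positions[task_id] = 1

    def next_row(self):
        # Forward-only: return the next globally sorted row, or None
        # when every store is exhausted.
        if not self.heap:
            return None
        row, task_id = heapq.heappop(self.heap)
        pos = self.positions[task_id]
        store = self.task_stores[task_id]
        if pos < len(store):
            heapq.heappush(self.heap, (store[pos], task_id))
            self.positions[task_id] = pos + 1
        return row
```

After `rescan()`, iteration restarts from the globally smallest row, which is the behavior the defensive rescan path needs without re-contacting workers.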
## Query behavior

Eligible examples:

```sql
SELECT id, val FROM distributed_table ORDER BY id;

SELECT id, val, created_at
FROM distributed_table
WHERE created_at >= now() - interval '1 day'
ORDER BY created_at, id;

SELECT tenant_id, count(*)
FROM distributed_table
GROUP BY tenant_id
ORDER BY tenant_id;
```

Intentionally not eligible:

```sql
-- ORDER BY aggregate expression
SELECT tenant_id, sum(amount) AS total
FROM distributed_table
GROUP BY tenant_id
ORDER BY total;

-- GROUP BY non-distribution column
SELECT user_id, count(*)
FROM distributed_table
GROUP BY user_id
ORDER BY user_id;
```

## Performance summary

Numbers below are from `ssm_presentation_0505.md`, using the latest documented run:

- PostgreSQL 18.3 release build under `$HOME/pg18-release`
- Citus built from this branch
- 1 coordinator on port 9700, 4 workers on ports 9701-9704
- 8 shards, `work_mem=64MB`, `shared_buffers=4GB`, `jit=off`
- `pgbench -n -T 60 -c $c -j $c -P 30 -p 9700 -d citus`
- 2 tables x 2 configs x 8 query shapes x 3 client counts, 3 runs per cell
- `BASE`: `citus.enable_sorted_merge=off`
- `STREAM`: `citus.enable_sorted_merge=on`

### Query-shape rollup

| Query | Shape | STREAM faster cells | Median speedup |
|---|---|---:|---:|
| q1 | small index-only scan | 1/6 | 0.99x |
| q2 | wider projection sorted by `created_at` | 6/6 | 1.03x |
| q3 | multi-key sort | 6/6 | 1.08x |
| q4 | unindexed sort key | 6/6 | 1.12x |
| q5 | full row, no LIMIT | 6/6 | 1.57x |
| q6 | full row with LIMIT | 6/6 | 1.65x |
| q7/q8 | safety counter-tests | flat | 1.00x |

### Most important performance takeaways

- The strongest wins are full-row sorted reads:
  - q5 (`SELECT * ... ORDER BY event_id`) improved about **1.50x-1.63x**.
  - q6 (`SELECT * ... ORDER BY event_id LIMIT 20000`) improved about **1.64x-1.94x**.
- q4, which orders by an unindexed key, improved in every cell by about **1.10x-1.14x**, showing the benefit of parallel worker sorts plus coordinator merge.
- q2/q3 show smaller but consistent wins when the coordinator Sort is not the dominant cost.
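To put the `O(N log K)` vs `O(N log N)` claim in concrete terms, here is a back-of-the-envelope comparison count (illustrative numbers chosen here, not taken from the benchmark above):

```python
import math


def coordinator_comparisons(total_rows: int, task_count: int):
    # Full coordinator sort: roughly N * log2(N) comparisons.
    full_sort = total_rows * math.log2(total_rows)
    # K-way heap merge: roughly N * log2(K), one heap op per row.
    kway_merge = total_rows * math.log2(task_count)
    return full_sort, kway_merge


full, merge = coordinator_comparisons(total_rows=1_000_000, task_count=8)
# With 1M rows over 8 tasks, the merge does log2(8) = 3 comparisons per
# row versus ~19.9 for a full sort: roughly 6.6x less coordinator work,
# before counting that the shard-local sorts also run in parallel.
```

This rough model is consistent with the pattern in the rollup table: the larger the result the coordinator would otherwise have to sort, the bigger the win.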
- q1 is neutral to slightly negative on the indexed table; the result is small enough that sorted-merge overhead can outweigh the coordinator Sort it removes.
- q7/q8 counter-tests stayed flat, validating that the planner exclusions for aggregate `ORDER BY` and non-partition-column grouping avoid changing those query shapes.

## Tests

- Adds `src/test/regress/sql/multi_orderby_pushdown.sql` and expected output.
- Adds coverage for:
  - ASC/DESC, NULLS FIRST/LAST, mixed direction, multi-column ORDER BY,
  - ORDER BY non-distribution columns,
  - GROUP BY distribution column,
  - ORDER BY expressions,
  - LIMIT/OFFSET, DISTINCT, DISTINCT ON, UNION ALL, CTEs, subqueries, joins,
  - prepared statements where the GUC is toggled after plan caching,
  - non-scroll cursor backward-fetch behavior,
  - SCROLL cursor behavior via Material insertion,
  - small `work_mem`,
  - `citus.max_intermediate_result_size`,
  - MX mode.
- Updates EXPLAIN expected output where eligible queries now show `Custom Scan (Citus Sorted Merge Adaptive)` instead of a coordinator Sort.

## Notes / limitations

- The adapter streams from per-task tuplestores after `RunDistributedExecution()` has drained worker output. It does not yet interleave network reads with merge output.
- The feature is intentionally limited to safe `ORDER BY` shapes. Aggregate `ORDER BY` and non-partition-column grouping remain on existing planning paths.
Co-authored-by: onurctirtir <16804727+onurctirtir@users.noreply.github.com>