Sorted merge: Fix latent bugs in SortedMergeAdapterRescan #8583

Closed

neildsh wants to merge 18 commits into
Conversation
DESCRIPTION: set search paths for UDFs
Use relaxed version check in MajorVersionsCompatible
…data#8547) DESCRIPTION: PR description that will go into the change log, up to 78 characters
DESCRIPTION: fix for the devcontainer post dep updates

Co-authored-by: Copilot <copilot@github.com>
…ata#7799) (citusdata#8540)

DESCRIPTION: Fix deadlock when adding named constraint on partitioned table with long partition name.

Previously, SwitchToSequentialAndLocalExecutionIfConstraintNameTooLong was only called when the user did NOT provide an explicit constraint name. When a name was provided (e.g., ALTER TABLE ... ADD CONSTRAINT myname UNIQUE(...)), the code fell through to the standard parallel DDL path without checking whether the auto-generated constraint names on partition shards would exceed NAMEDATALEN, causing potential self-deadlocks.

This commit adds calls to SwitchToSequentialAndLocalExecutionIfConstraintNameTooLong for both named foreign key constraints and named non-FK constraints (PRIMARY KEY, UNIQUE, EXCLUDE, CHECK) in PreprocessAlterTableStmt, so that the long-partition-shard-name check is performed regardless of whether the user specified a constraint name.

Also adds regression tests covering named PRIMARY KEY, UNIQUE, and CHECK constraints on partitioned tables with long partition names, including the error case when a parallel query has already been executed in the same transaction.
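A hedged illustration of the failing shape (table and constraint names here are hypothetical; the point is an explicitly named constraint plus a partition name long enough that the shard-level constraint name would exceed NAMEDATALEN):

```sql
CREATE TABLE dist_parent (tenant_id int, created_at date)
    PARTITION BY RANGE (created_at);

-- 62-character partition name: valid on its own, but the auto-generated
-- constraint names on its shards would exceed NAMEDATALEN (63 bytes)
CREATE TABLE dist_parent_partition_with_a_very_long_name_padding_padding_p1
    PARTITION OF dist_parent FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

SELECT create_distributed_table('dist_parent', 'tenant_id');

-- With an explicit constraint name, the sequential-execution switch was
-- previously skipped, risking a distributed self-deadlock:
ALTER TABLE dist_parent ADD CONSTRAINT myname UNIQUE (tenant_id, created_at);
```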
…er-wide backup coordination (citusdata#8543)

### Problem

Citus uses Two-Phase Commit (2PC) for distributed write transactions. When taking cluster-wide backups, independent per-node snapshots can capture mid-flight 2PC transactions in inconsistent states: for example, a coordinator with the commit record but a worker missing the prepared transaction, leading to irrecoverable data inconsistency on restore.

### Solution

Temporarily block distributed 2PC commit decisions and schema/topology changes across the entire cluster while disk snapshots are taken. Acquiring ExclusiveLock on pg_dist_transaction guarantees that all in-flight 2PC transactions have fully completed (including COMMIT PREPARED on every worker) before the lock is granted, and no new commit decisions can begin while it is held, because the 2PC commit path takes RowExclusiveLock on pg_dist_transaction to record the commit, which conflicts with our ExclusiveLock. This creates a clean partition: everything before the lock is fully committed on all nodes; everything after hasn't started.

**Read queries and single-shard writes are never blocked. Multi-shard / cross-node writes that go through 2PC are queued (not aborted) and proceed after unblock.**

### New UDFs

All require superuser and must be called on the coordinator. block/unblock are revoked from PUBLIC; block_status is readable by any role.

**`citus_cluster_changes_block(timeout_ms int DEFAULT 300000)`**

Spawns a dedicated background worker that acquires **ExclusiveLock on pg_dist_node, pg_dist_partition, and pg_dist_transaction on the coordinator,** then sends LOCK TABLE … IN EXCLUSIVE MODE for pg_dist_partition and pg_dist_transaction to every metadata worker. (pg_dist_node is intentionally coordinator-only; node management is not delegated.) Returns true when all locks are held cluster-wide. timeout_ms is bounded 1..1,800,000.

**`citus_cluster_changes_unblock()`**

Signals the background worker to release all locks and exit. Can be called from any session. Returns true on success, false if no block was active (idempotent, never errors).

**`citus_cluster_changes_block_status()`**

Returns a single row: (state, worker_pid, requestor_pid, block_start_time, timeout_ms, node_count). Possible states:

- inactive
- starting
- active
- releasing
- error

### Architecture

A dedicated background worker holds a single transaction whose lifetime equals the lock-holding period. This ensures locks survive the caller's session disconnect. All termination paths (explicit release, timeout expiry, SIGTERM, error, postmaster death) end the transaction, guaranteeing locks cannot leak.

The worker uses PostgreSQL's WaitEventSet API to simultaneously monitor its latch (for release / SIGTERM), the postmaster, and every remote connection socket. If any worker-node connection drops, the failure is detected within milliseconds via PQconsumeInput / PQstatus and the block is released with an error.

Shared memory (ClusterChangesBlockControlData, protected by a dedicated LWLock) coordinates state between the UDFs and the worker. The state machine is INACTIVE → STARTING → ACTIVE → RELEASING → INACTIVE (or → ERROR on failure). Concurrent block() calls are serialised under a single LW_EXCLUSIVE acquisition (no TOCTOU). Stale state from a crashed worker is auto-cleaned by block_status() via a kill(pid, 0) liveness check.

Remote lock acquisition is bounded by SET LOCAL statement_timeout on each worker connection, preventing indefinite hangs if a remote node is unresponsive during the LOCK TABLE phase.

The BLOCK_DISTRIBUTED_WRITES_COMMAND macro (shared with citus_create_restore_point) is defined in cluster_changes_block.h to avoid duplication.

### Out of scope

This UDF only guarantees that no 2PC commit decisions cross the snapshot boundary. WAL flushing, snapshot orchestration, and per-node backup timing remain the operator's responsibility.

### Tests

New regression test citus_cluster_changes_block covering happy path, argument validation, permission enforcement, double-block rejection, idempotent unblock, repeated cycles, SIGTERM cleanup, timeout-driven auto-release, and structural + behavioural lock verification on the coordinator and every metadata worker.
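Taken together, a sketch of the intended operator workflow, assembled from the UDF signatures above (the snapshot step itself is operator-specific and shown only as a placeholder):

```sql
-- On the coordinator, as superuser:
SELECT citus_cluster_changes_block(60000);   -- hold the block for up to 60 s

-- Confirm the block is active on every node before snapshotting:
SELECT state, node_count FROM citus_cluster_changes_block_status();

-- ... take the per-node disk snapshots here (operator-specific) ...

SELECT citus_cluster_changes_unblock();      -- idempotent; false if no block was active
```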
…or-local shard (citusdata#8426) (citusdata#8561)

## Summary

Fixes citusdata#8426.

When `citus.writable_standby_coordinator` is enabled on a hot standby, modifying queries are forwarded to primary nodes via libpq. However, if the targeted shard has a placement on the local (coordinator) group, the local executor was invoked anyway. The local plan was then finalized against a read-only storage layer, leading to a NULL `custom_private` dereference in `IsCitusCustomScan` and a backend crash.

This patch detects the situation in `TaskAccessesLocalNode` -- the single chokepoint that all local-vs-remote routing decisions pass through -- and raises a clear `ERROR` (`errcode read_only_sql_transaction`) before entering the broken local-execution path. Read tasks remain unaffected and continue to execute locally, which is the intended use of a hot standby.

## Behavior change

The only user-visible change is the error message text. Previously, INSERT/UPDATE/DELETE against a coordinator-local shard on a writable standby would either crash (the original report) or surface a misleading PostgreSQL-level error like `cannot execute INSERT in a read-only transaction`. After this patch:

```
ERROR: cannot execute modification on a hot standby for a shard placement that resides on this node
DETAIL: citus.writable_standby_coordinator forwards writes to primary nodes, but the targeted shard has a placement on the local node, whose storage is read-only during recovery.
HINT: Place the affected shards on primary worker nodes only, or fail over this node to perform writes.
```

The session remains usable after the error.

## Tests

- Updated [`multi_follower_dml`](src/test/regress/expected/multi_follower_dml.out) expected output: every pre-existing `citus_local_table` modification line that previously bottomed out at PG's read-only check now emits the new error (8 sites).
- Added an explicit regression block tagged with the issue number, exercising INSERT, UPDATE, DELETE, a modifying CTE, and a follow-up SELECT to assert the session is still usable.

`make -C src/test/regress check-follower-cluster` passes all 8 tests locally.

## Files

- [`src/backend/distributed/executor/local_executor.c`](src/backend/distributed/executor/local_executor.c) — added `#include "access/xlog.h"` and the recovery-aware error in `TaskAccessesLocalNode`.
- [`src/test/regress/sql/multi_follower_dml.sql`](src/test/regress/sql/multi_follower_dml.sql) — explicit citusdata#8426 regression block.
- [`src/test/regress/expected/multi_follower_dml.out`](src/test/regress/expected/multi_follower_dml.out) — refreshed expected output.
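A hedged illustration of the shape this patch intercepts (run on the standby coordinator; `citus_local_table` stands in for any relation with a shard placement on the local group):

```sql
SET citus.writable_standby_coordinator TO on;

-- Previously crashed the backend or raised PG's generic read-only error;
-- now fails cleanly with the read_only_sql_transaction error shown above.
INSERT INTO citus_local_table VALUES (1);

-- The session stays usable:
SELECT count(*) FROM citus_local_table;
```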
…usdata#8548) (citusdata#8556)

DESCRIPTION: Fix segfault in EXPLAIN with LEFT JOIN and correlated subqueries (citusdata#8548)

Fix by adding a NULL check that skips discarded subplan entries. This is needed because PostgreSQL's setrefs.c (set_plan_references) resolves AlternativeSubPlan nodes by picking one alternative and setting the discarded subplan entries to NULL in PlannedStmt->subplans. PlanContainsDistributedSubPlanRTE() iterated this list without a NULL check, hitting the segfault reported in citusdata#8548.

Also refactor the function to accept DistributedPlanningContext* instead of a raw List*, making it evident that we're iterating PostgreSQL's PlannedStmt->subplans (where NULL entries are expected).

Co-authored-by: ibrahim halatci <ihalatci@gmail.com>
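A minimal C sketch of the described pattern (the helper name is hypothetical; only PlannedStmt->subplans and the NULL entries left by set_plan_references come from the description above):

```c
#include "postgres.h"

#include "nodes/pg_list.h"
#include "nodes/plannodes.h"

/* Hypothetical helper illustrating the NULL-tolerant iteration. */
static bool
SubplanListHasDistributedSubPlanRTE(PlannedStmt *plannedStmt)
{
	ListCell *lc = NULL;

	foreach(lc, plannedStmt->subplans)
	{
		Plan *subplan = (Plan *) lfirst(lc);

		/*
		 * set_plan_references() resolves AlternativeSubPlan nodes by picking
		 * one alternative and NULLing out the discarded entries, so NULL
		 * list members are expected here.
		 */
		if (subplan == NULL)
		{
			continue;
		}

		/* ... walk subplan looking for distributed subplan RTEs ... */
	}

	return false;
}
```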
…Y queries (citusdata#8529)

DESCRIPTION: add ORDER BY pushdown and coordinator merge for multi-shard queries.

This PR adds a streaming sorted-merge execution path for multi-shard `SELECT ... ORDER BY` queries. When `citus.enable_sorted_merge` is enabled at planning time, Citus pushes safe `ORDER BY` clauses to workers, advertises sorted output from the coordinator `CustomScan`, and streams globally sorted tuples through a binary-heap merge over per-task tuplestores.

The main effect is that PostgreSQL no longer needs to add a coordinator-side `Sort` node above the Citus scan. Worker shards do shard-local sorting in parallel, and the coordinator performs an `O(N log K)` k-way merge, where `K` is the task count, instead of an `O(N log N)` full-result sort.

- Adds hidden experimental GUC `citus.enable_sorted_merge` (currently default `on`, `PGC_SUSET`).
- Extends worker sort pushdown beyond the existing LIMIT/DISTINCT cases when the query is safe for sorted merge:
  - the query has an `ORDER BY`,
  - window functions, if any, are pushable,
  - `ORDER BY` does not contain aggregate expressions,
  - `GROUP BY` is empty or grouped by the partitioning side.
- Records `DistributedPlan.useSortedMerge` during physical planning.
- Sets Citus CustomScan `pathkeys` to the combine query's required sort order so PostgreSQL elides the parent Sort.
- Uses a dedicated `CustomScanMethods` entry, shown in EXPLAIN as `Custom Scan (Citus Sorted Merge Adaptive)`.
- Adds `sorted_merge.c` / `sorted_merge.h` with a `SortedMergeAdapter` modeled after PostgreSQL `MergeAppend`.
- Creates one tuplestore per task and routes worker tuples directly into the matching task store through `task->tupleDest`.
- Shares one `TupleDestinationStats` object across all per-task destinations so `citus.max_intermediate_result_size` is enforced across the whole result, not per task.
- Builds `SortSupportData` from the worker query sort clauses and uses a binary heap to return the next globally sorted tuple.
- Adds a sorted-merge-specific `ExecCustomScan` callback that reads directly from the adapter, avoiding per-row branching between adapter and normal tuplestore paths.
- Clears cached task `tupleDest` pointers after execution so prepared-plan reuse cannot see stale execution-local state.

The streaming adapter is forward-only.

- Sorted-merge plans do not advertise `CUSTOMPATH_SUPPORT_BACKWARD_SCAN`.
- For `SCROLL` cursors, Citus reinserts the same Material wrapper PostgreSQL would normally add for non-backward-scannable plans. This is necessary because Citus replaces the plan tree after `standard_planner()`.
- Non-scroll backward fetches remain unsupported, matching cursor semantics.
- `SortedMergeAdapterRescan()` exists as a defensive/rescan path and rebuilds the heap from the per-task stores.
- EXPLAIN plan shape changes from `Sort -> Custom Scan (Citus Adaptive)` to `Custom Scan (Citus Sorted Merge Adaptive)` for eligible queries.
- The executor type is tracked as `MULTI_EXECUTOR_SORTED_MERGE` so query stats can distinguish sorted-merge executions from regular adaptive executions.

Eligible examples:

```sql
SELECT id, val FROM distributed_table ORDER BY id;

SELECT id, val, created_at FROM distributed_table
WHERE created_at >= now() - interval '1 day'
ORDER BY created_at, id;

SELECT tenant_id, count(*) FROM distributed_table
GROUP BY tenant_id ORDER BY tenant_id;
```

Intentionally not eligible:

```sql
-- ORDER BY aggregate expression
SELECT tenant_id, sum(amount) AS total FROM distributed_table
GROUP BY tenant_id ORDER BY total;

-- GROUP BY non-distribution column
SELECT user_id, count(*) FROM distributed_table
GROUP BY user_id ORDER BY user_id;
```

Numbers below are from `ssm_presentation_0505.md`, using the latest documented run:

- PostgreSQL 18.3 release build under `$HOME/pg18-release`
- Citus built from this branch
- 1 coordinator on port 9700, 4 workers on ports 9701-9704
- 8 shards, `work_mem=64MB`, `shared_buffers=4GB`, `jit=off`
- `pgbench -n -T 60 -c $c -j $c -P 30 -p 9700 -d citus`
- 2 tables x 2 configs x 8 query shapes x 3 client counts, 3 runs per cell
- `BASE`: `citus.enable_sorted_merge=off`
- `STREAM`: `citus.enable_sorted_merge=on`

| Query | Shape | STREAM faster cells | Median speedup |
|---|---|---:|---:|
| q1 | small index only scan | 1/6 | 0.99x |
| q2 | wider projection sorted by `created_at` | 6/6 | 1.03x |
| q3 | multi-key sort | 6/6 | 1.08x |
| q4 | unindexed sort key | 6/6 | 1.12x |
| q5 | full row, no LIMIT | 6/6 | 1.57x |
| q6 | full row with LIMIT | 6/6 | 1.65x |
| q7/q8 | safety counter-tests | flat | 1.00x |

- The strongest wins are full-row sorted reads:
  - q5 (`SELECT * ... ORDER BY event_id`) improved about **1.50x-1.63x**.
  - q6 (`SELECT * ... ORDER BY event_id LIMIT 20000`) improved about **1.64x-1.94x**.
- q4, which orders by an unindexed key, improved in every cell by about **1.10x-1.14x**, showing the benefit of parallel worker sorts plus coordinator merge.
- q2/q3 show smaller but consistent wins when the coordinator Sort is not the dominant cost.
- q1 is neutral to slightly negative on the indexed table; the result is small enough that sorted-merge overhead can outweigh the coordinator Sort it removes.
- q7/q8 counter-tests stayed flat, validating that the planner exclusions for aggregate `ORDER BY` and non-partition-column grouping avoid changing those query shapes.

- Adds `src/test/regress/sql/multi_orderby_pushdown.sql` and expected output.
- Adds coverage for:
  - ASC/DESC, NULLS FIRST/LAST, mixed direction, multi-column ORDER BY,
  - ORDER BY non-distribution columns,
  - GROUP BY distribution column,
  - ORDER BY expressions,
  - LIMIT/OFFSET, DISTINCT, DISTINCT ON, UNION ALL, CTEs, subqueries, joins,
  - prepared statements where the GUC is toggled after plan caching,
  - non-scroll cursor backward-fetch behavior,
  - SCROLL cursor behavior via Material insertion,
  - small `work_mem`,
  - `citus.max_intermediate_result_size`,
  - MX mode.
- Updates EXPLAIN expected output where eligible queries now show `Custom Scan (Citus Sorted Merge Adaptive)` instead of a coordinator Sort.

- The adapter streams from per-task tuplestores after `RunDistributedExecution()` has drained worker output. It does not yet interleave network reads with merge output.
- The feature is intentionally limited to safe `ORDER BY` shapes. Aggregate `ORDER BY` and non-partition-column grouping remain on existing planning paths.
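A quick way to observe the plan-shape change described above (a sketch; plans abbreviated, and exact output varies by table and version):

```sql
SET citus.enable_sorted_merge TO on;
EXPLAIN (COSTS OFF) SELECT id, val FROM distributed_table ORDER BY id;
--  Custom Scan (Citus Sorted Merge Adaptive)
--    ...

SET citus.enable_sorted_merge TO off;
EXPLAIN (COSTS OFF) SELECT id, val FROM distributed_table ORDER BY id;
--  Sort
--    Sort Key: remote_scan.id
--    ->  Custom Scan (Citus Adaptive)
--          ...
```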
This needs to be taken to the main branch too.

1. Move more tests that create the Citus extension into multi_1_create_citus_schedule.
2. Remove unnecessary worker tests from schema_based_sharding.sql, as we already test them heavily in other test files.
3. Add a placeholder section at the end of multi_1_create_citus_schedule for the tests that need to be moved back to their original schedules with the next major version of Citus.
…_catalog.worker_adjust_identity_column_seq_ranges

In Citus 13 and 14, they were kept to maintain backward compatibility during minor version upgrades.
Previously, the script required that whenever an upgrade migration script was "changed", the matching downgrade script also "showed up" in the diff. That was not quite right: if an upgrade script is removed, the downgrade should be removed too; and if an upgrade script is added or modified, the downgrade should be added or modified too (and vice versa).

When the script compared release-13.2-cp-mx-0506 to origin/main, the downgrades/citus--15.0-1--14.0-1.sql from origin/main was effectively renamed to downgrades/citus--13.3-1--13.2-1.sql (the contents are similar enough that git detects it as a rename). As a result, `git diff` reports only the new path, hiding the deletion side of the rename from the old check. The new downgrade file then looks like a change without a matching upgrade change, producing a false positive.

Pass --no-renames so a rename shows up as a delete + add pair, and require that each upgrade/downgrade pair share the same status (both deleted, both changed, or both absent). Also walk the changed downgrade files to catch downgrade-only changes with no matching upgrade change.

(cherry picked from commit d4f3749)
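A hedged sketch of the rename-proof pairing check described above (the paths, variable names, and pairing logic are illustrative, not copied from check_migration_files.sh):

```bash
#!/usr/bin/env bash
set -euo pipefail

base_ref="origin/main"

# --no-renames makes git report a renamed migration as a D + A pair instead
# of a single R line, so the deletion side of a rename can no longer hide.
git diff --no-renames --name-status "$base_ref" -- src/backend/distributed/sql \
| while read -r status path; do
    case "$path" in
        *"/downgrades/"*)
            # Downgrade files are walked in a separate pass so a changed
            # downgrade with no matching upgrade change is also flagged.
            continue
            ;;
    esac

    # Map citus--A--B.sql to its downgrade citus--B--A.sql and require the
    # pair to share the same status (both added/modified, or both deleted).
    echo "upgrade $path has status $status; verify its downgrade matches"
done
```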
Could be done in a separate PR, but without this, even the checks in this PR are failing.
…e follow-up work (citusdata#8567)

In the context of citusdata#8541:

- Drop legacy impls for pg_catalog.worker_apply_sequence_command and pg_catalog.worker_adjust_identity_column_seq_ranges.
- Update technical readme.

Also bring the fixes applied to release 13 / 14 to main:

- Permanently resolve some incompatibilities for N-1 tests.
- Fix check_migration_files.sh.

Also, let's use GHA runners for test-arbitrary-configs too. This could be done in a separate PR, but without it, even the checks in this PR are failing.
Fix two latent bugs in SortedMergeAdapterRescan

Summary

This PR fixes two correctness/robustness bugs in the sorted-merge rescan callback that surfaced during a pen test of #8529. Both bugs live in code that is unreachable in production because the materialize_finished_plan wrapper added in #8529 absorbs every rescan that PostgreSQL would otherwise forward to the sorted-merge CustomScan. The fixes harden the defensive rescan path so it produces correct results — and does not crash — if the absorber is ever weakened, removed, or bypassed by a future PG release.

Background
SortedMergeAdapter streams globally-sorted tuples by running a binary min-heap over per-task tuplestores (sorted_merge.c). SortedMergeAdapterNext() has two branches:

- First call (!initialized): rewind every per-task store, seed the heap with the first tuple from each, then return the heap's current winner.
- Subsequent calls (initialized): advance the previous winner's store, update the heap, then return the new winner.

SortedMergeAdapterRescan() is the rescan callback installed on the sorted-merge CustomScan. PostgreSQL invokes it from ExecutorRewind() on the only SQL shape that reaches it: a SCROLL CURSOR WITH HOLD whose holdable portal is being persisted by PersistHoldablePortal() at COMMIT time. In production, the Material node inserted by materialize_finished_plan() in distributed_planner.c absorbs that rescan, so the sorted-merge callback never runs.
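For orientation, a schematic of SortedMergeAdapterNext()'s two branches as described above (a sketch; every helper name besides the initialized flag is hypothetical):

```c
/* Schematic of SortedMergeAdapterNext() per the description above. */
static MinimalTuple
SortedMergeAdapterNext(SortedMergeAdapter *adapter)
{
	if (!adapter->initialized)
	{
		/* first call: rewind every per-task store and seed the heap */
		for (int taskIndex = 0; taskIndex < adapter->taskCount; taskIndex++)
		{
			tuplestore_rescan(adapter->taskTupleStores[taskIndex]);
			SeedHeapFromTaskStore(adapter, taskIndex);  /* hypothetical */
		}

		adapter->initialized = true;

		/* return the heap's current winner without advancing anything */
		return CurrentHeapWinner(adapter);              /* hypothetical */
	}

	/* subsequent calls: advance the previous winner's store, update the
	 * heap, then return the new winner */
	AdvancePreviousWinnerAndRebalance(adapter);         /* hypothetical */
	return CurrentHeapWinner(adapter);
}
```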
Bug 1 — off-by-one tuple drop after rescan (correctness)

Where: src/backend/distributed/executor/sorted_merge.c::SortedMergeAdapterRescan

The original implementation pre-seeded the heap with the first tuple from each store and then set initialized = true:
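The code block that followed was lost in extraction; a minimal sketch of the buggy shape it describes (helper and field names are inferred from the prose, not copied from sorted_merge.c):

```c
/* Buggy sketch: rescan rewinds and re-seeds, then flags the adapter as
 * initialized, so the next SortedMergeAdapterNext() call takes the
 * "advance the previous winner" branch and skips one tuple. */
static void
SortedMergeAdapterRescan(SortedMergeAdapter *adapter)
{
	for (int taskIndex = 0; taskIndex < adapter->taskCount; taskIndex++)
	{
		tuplestore_rescan(adapter->taskTupleStores[taskIndex]);
		SeedHeapFromTaskStore(adapter, taskIndex);  /* hypothetical helper */
	}

	adapter->initialized = true;   /* <-- the off-by-one lives here */
}
```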
Because the adapter is now marked as initialized, the next SortedMergeAdapterNext() call enters the else branch, treats the freshly-seeded heap winner as a previous winner, advances that store, and only then returns the new winner. The first globally-minimal tuple after every rescan is silently dropped.

Reproduction (with the Material absorber temporarily removed)
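The reproduction snippet did not survive extraction; a plausible shape, mirroring the N2 test described under Test coverage below (table name hypothetical):

```sql
BEGIN;
DECLARE c SCROLL CURSOR WITH HOLD FOR
    SELECT id FROM distributed_table ORDER BY id;
FETCH 3 FROM c;     -- consume a few rows inside the transaction
COMMIT;             -- PersistHoldablePortal() rewinds the portal, firing the rescan
FETCH ALL FROM c;   -- with the bug: the first remaining row is silently missing
CLOSE c;
```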
Fix
Don't seed the heap in Rescan. Mark the adapter as uninitialized and let the next Next() call do its normal first-call seed via the existing if (!adapter->initialized) branch, which already rewinds each store, seeds the heap, and returns the winner instead of advancing past it:
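The fix snippet was likewise lost; a minimal sketch of the shape described (the committed code may carry extra bookkeeping):

```c
/* Fixed sketch: don't touch the heap here. Flag the adapter as
 * uninitialized so the next SortedMergeAdapterNext() call runs its normal
 * first-call path, which rewinds each store, seeds the heap, and returns
 * the winner instead of advancing past it. */
static void
SortedMergeAdapterRescan(SortedMergeAdapter *adapter)
{
	adapter->initialized = false;
}
```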
Bug 2 — NULL-deref crash when rescan fires before any FETCH

Where: src/backend/distributed/executor/citus_custom_scan.c::SortedMergeReScan

The sorted-merge CustomScan runs AdaptiveExecutor() (and thus CreatePerTaskDispatchDests(), which installs scanState->mergeAdapter) lazily — only on the first tuple fetched from the scan. So a SCROLL CURSOR WITH HOLD that is declared, never fetched from, and then committed will reach SortedMergeReScan() with scanState->mergeAdapter == NULL.

The previous code dereferenced it unconditionally:
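The snippet showing the unconditional dereference was stripped; a sketch of the shape (the CustomScanState-to-CitusScanState cast follows the pattern the prose describes, not the file verbatim):

```c
/* Buggy sketch: the adapter is created lazily on the first fetch, so it is
 * still NULL when PersistHoldablePortal() rescans a never-fetched portal. */
static void
SortedMergeReScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;

	SortedMergeAdapterRescan(scanState->mergeAdapter);  /* NULL deref here */
}
```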
Reproduction (with the Material absorber temporarily removed)
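Again the original snippet is missing; a plausible shape, mirroring the N3 test (table name hypothetical):

```sql
BEGIN;
DECLARE c SCROLL CURSOR WITH HOLD FOR
    SELECT id FROM distributed_table ORDER BY id;
-- no FETCH: the scan never runs, so scanState->mergeAdapter stays NULL
COMMIT;   -- PersistHoldablePortal() rescans the never-executed plan: SIGSEGV before the fix
```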
Fix
Guard the adapter pointer before calling rescan, mirroring the existing CitusReScan() pattern that already guards scanState->tuplestorestate:
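A sketch of the guarded version described above:

```c
/* Fixed sketch: guard the lazily-created adapter, mirroring how
 * CitusReScan() already guards scanState->tuplestorestate. */
static void
SortedMergeReScan(CustomScanState *node)
{
	CitusScanState *scanState = (CitusScanState *) node;

	if (scanState->mergeAdapter != NULL)
	{
		SortedMergeAdapterRescan(scanState->mergeAdapter);
	}
}
```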
Test coverage

Added six new sub-tests (N2..N7) to the existing Category N (SCROLL CURSOR WITH HOLD rescan) section of src/test/regress/sql/multi_orderby_pushdown.sql, alongside the pre-existing N1 test. These document the exact SQL shapes that reproduced the bugs and act as regression guards:

- N2: WITH HOLD + FETCH 3 + COMMIT + FETCH ALL, no LIMIT
- N3: WITH HOLD + FETCH 0 + COMMIT + FETCH ALL
- N4: WITH HOLD + multi-key ORDER BY id%3, id + rescan
- N5: WITH HOLD + ORDER BY id DESC + rescan
- N6: WITH HOLD + repeated single-row fetches after rescan
- N7: WITH HOLD over empty result + rescan

In production (with materialize_finished_plan in place) all of these queries already returned correct results because the Material node absorbed the rescan. They are still kept in the suite as documentation and to catch any future regression where the absorber is changed, weakened, or bypassed by a new PG release. With the absorber temporarily removed during pen-testing, N2/N4/N5/N6 reproduce the row-drop bug and N3 reproduces the crash; with the fixes applied (and the absorber still removed) all seven produce correct results.

Impact

- Both bugs are unreachable in production: the materialize_finished_plan absorber means no released sorted-merge plan can hit them today.
- The Bug 1 fix simply moves heap seeding from Rescan() into the next Next() call (where the existing uninitialized-path code already lives). Zero extra work either way.

Files changed

- src/backend/distributed/executor/sorted_merge.c — Fix Bug 1 in SortedMergeAdapterRescan.
- src/backend/distributed/executor/citus_custom_scan.c — Fix Bug 2 in SortedMergeReScan.
- src/test/regress/sql/multi_orderby_pushdown.sql — Add N2..N7 tests under the existing Category N.
- src/test/regress/expected/multi_orderby_pushdown.out — Expected output for the new tests.

Verification

- make -C src/test/regress check-base EXTRA_TESTS='multi_orderby_pushdown' passes (15/15 tests, including the new N2..N7).
- With a local patch that removes the materialize_finished_plan call (used only locally for verification — not part of this PR): before the fixes, N2 returns 16 rows instead of 17 (row 4 missing) and N3 SIGSEGVs in SortedMergeAdapterRescan on a NULL adapter.
- With the fixes applied, N2..N7 all return correct results.