Skip to content

[fix](window) Fix dense_rank/rank partition top-n silently dropping r…#64177

Open
lide-reed wants to merge 1 commit into
apache:masterfrom
lide-reed:master
Open

[fix](window) Fix dense_rank/rank partition top-n silently dropping r…#64177
lide-reed wants to merge 1 commit into
apache:masterfrom
lide-reed:master

Conversation

@lide-reed
Copy link
Copy Markdown
Contributor

…ows when the rk=1 group exceeds batch_size

Issue Number: close #64172

Problem Summary:

A window query that filters on dense_rank() / rank(), e.g.

SELECT ... FROM (
    SELECT *, dense_rank() OVER (PARTITION BY p ORDER BY o DESC) AS rk FROM t
) WHERE rk = 1;

is rewritten into a VPartitionTopN node (partition limit = 1, TWO_PHASE_LOCAL_PTOPN). When a partition's dense_rank = 1 group has more rows than batch_size, the query silently returns only ~batch_size rows per pipeline instance instead of the whole group. No error is raised.

Root cause is in PartitionSorter::_read_row_rank() (be/src/exec/sort/partition_sorter.cpp). Its Defer signals EOS whenever _get_enough_data() is true:

Defer defer {[&]() {
    if (merged_rows == 0 || _get_enough_data()) {   // BUG
        *eos = true;
    }
}};

For DENSE_RANK, _get_enough_data() is
_output_distinct_rows >= _partition_inner_limit. With partition_inner_limit = 1, it becomes true right after the first row of the group is emitted. When the loop exits because merged_rows reached batch_size, the Defer sets *eos = true, so
PartitionSortSourceOperator advances _sort_idx to the next sorter and drops the rest of the current dense_rank = 1 group. Observed row count ends up being batch_size * num_pipeline_instances.

row_number() is not affected (its _get_enough_data() uses _output_total_rows, which only becomes true after exactly partition_inner_limit rows are emitted).

Introduce a bool finished flag that is set to true only when:

  • the loop reaches the boundary of the next distinct group with the limit already satisfied (if (_get_enough_data()) { finished = true; return; }), or
  • the input queue is fully drained.

The Defer then signals EOS only on merged_rows == 0 || finished, so the same dense_rank = 1 group is drained across multiple get_next() calls.

 Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
     ...
     size_t merged_rows = 0;
+    bool finished = false;

     Defer defer {[&]() {
-        if (merged_rows == 0 || _get_enough_data()) {
+        if (merged_rows == 0 || finished) {
             *eos = true;
         }
     }};

     while (queue.is_valid() && merged_rows < batch_size) {
         ...
         for (...; merged_rows < batch_size; ...) {
             ...
             if (!cmp_res) {
                 if (_get_enough_data()) {
+                    finished = true;
                     return Status::OK();
                 }
                 *_previous_row = *current;
                 _output_distinct_rows++;
             }
             ...
         }
     }
+
+    if (!queue.is_valid()) {
+        finished = true;
+    }
     return Status::OK();
 }

Fix dense_rank() / rank() window functions with a top-n filter (e.g. WHERE rk = 1) silently dropping rows when the matching group is larger than batch_size.

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:

    Manual test (single BE, single tablet, single partition value; the bug is
    deterministic):

    CREATE TABLE t_min (ts DATE NOT NULL, pk BIGINT NOT NULL, val INT)
    DUPLICATE KEY(ts, pk) DISTRIBUTED BY HASH(pk) BUCKETS 1
    PROPERTIES ("replication_num" = "1");
    
    INSERT INTO t_min SELECT DATE '2024-01-02', 1, number FROM numbers("number" = "10000");
    INSERT INTO t_min SELECT DATE '2024-01-01', 1, number FROM numbers("number" = "3");
    
    SET parallel_pipeline_task_num = 1;
    SELECT /*+ SET_VAR(batch_size = 1024) */ ts, COUNT(*) cnt FROM (
      SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
    ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
    batch_size before (buggy) after (fixed)
    256 256 10000
    1024 1024 10000
    4096 4096 10000
    16384 10000 10000

    Also verified on a 3-BE cluster against a 15M-row production table: before
    the fix the same query returned varying wrong counts
    (e.g. 149990 / 109350 across runs, and 8 * batch_size exactly under
    parallel_pipeline_task_num = 1 per instance); after the fix it is stable
    and matches the golden value from SET enable_partition_topn = false.

    New unit tests:
    test_dense_rank_first_group_exceeds_batch_size_regression and
    test_rank_first_group_exceeds_batch_size_regression build a first group
    of 5000 rows with batch_size = 4096 and assert that all 5000 rows are
    emitted across multiple get_next() calls (they fail before the fix,
    pass after).

  • Behavior changed:

    • No.
  • Does this need documentation?

    • No.
  • Confirm the release note

  • Confirm test cases

  • Confirm document

  • Add branch pick label

What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

Release note

None

Check List (For Author)

  • Test

    • Regression test
    • Unit Test
    • Manual test (add detailed scripts or steps below)
    • No need to test or manual test. Explain why:
      • This is a refactor/code format and no logic has been changed.
      • Previous test can cover this change.
      • No code files have been changed.
      • Other reason
  • Behavior changed:

    • No.
    • Yes.
  • Does this need documentation?

    • No.
    • Yes.

Check List (For Reviewer who merge this PR)

  • Confirm the release note
  • Confirm test cases
  • Confirm document
  • Add branch pick label

…ows when the rk=1 group exceeds batch_size

Issue Number: close apache#64172

Problem Summary:

A window query that filters on `dense_rank()` / `rank()`, e.g.

```sql
SELECT ... FROM (
    SELECT *, dense_rank() OVER (PARTITION BY p ORDER BY o DESC) AS rk FROM t
) WHERE rk = 1;
```

is rewritten into a `VPartitionTopN` node (`partition limit = 1`,
`TWO_PHASE_LOCAL_PTOPN`). When a partition's `dense_rank = 1` group has more
rows than `batch_size`, the query silently returns only ~`batch_size` rows
per pipeline instance instead of the whole group. No error is raised.

Root cause is in `PartitionSorter::_read_row_rank()`
(`be/src/exec/sort/partition_sorter.cpp`). Its `Defer` signals EOS whenever
`_get_enough_data()` is true:

```cpp
Defer defer {[&]() {
    if (merged_rows == 0 || _get_enough_data()) {   // BUG
        *eos = true;
    }
}};
```

For `DENSE_RANK`, `_get_enough_data()` is
`_output_distinct_rows >= _partition_inner_limit`. With
`partition_inner_limit = 1`, it becomes true right after the *first* row of
the group is emitted. When the loop exits because `merged_rows` reached
`batch_size`, the `Defer` sets `*eos = true`, so
`PartitionSortSourceOperator` advances `_sort_idx` to the next sorter and
drops the rest of the current `dense_rank = 1` group. Observed row count
ends up being `batch_size * num_pipeline_instances`.

`row_number()` is not affected (its `_get_enough_data()` uses
`_output_total_rows`, which only becomes true after exactly
`partition_inner_limit` rows are emitted).

Introduce a `bool finished` flag that is set to true only when:
- the loop reaches the boundary of the next distinct group with the limit
  already satisfied (`if (_get_enough_data()) { finished = true; return; }`),
  or
- the input queue is fully drained.

The `Defer` then signals EOS only on `merged_rows == 0 || finished`, so the
same `dense_rank = 1` group is drained across multiple `get_next()` calls.

```diff
 Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
     ...
     size_t merged_rows = 0;
+    bool finished = false;

     Defer defer {[&]() {
-        if (merged_rows == 0 || _get_enough_data()) {
+        if (merged_rows == 0 || finished) {
             *eos = true;
         }
     }};

     while (queue.is_valid() && merged_rows < batch_size) {
         ...
         for (...; merged_rows < batch_size; ...) {
             ...
             if (!cmp_res) {
                 if (_get_enough_data()) {
+                    finished = true;
                     return Status::OK();
                 }
                 *_previous_row = *current;
                 _output_distinct_rows++;
             }
             ...
         }
     }
+
+    if (!queue.is_valid()) {
+        finished = true;
+    }
     return Status::OK();
 }
```

Fix `dense_rank()` / `rank()` window functions with a top-n filter (e.g.
`WHERE rk = 1`) silently dropping rows when the matching group is larger
than `batch_size`.

- Test
    - [ ] Regression test
    - [x] Unit Test <!-- new BE unit tests in be/test/exec/sort/partition_sorter_test.cpp -->
    - [x] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:

  Manual test (single BE, single tablet, single partition value; the bug is
  deterministic):

  ```sql
  CREATE TABLE t_min (ts DATE NOT NULL, pk BIGINT NOT NULL, val INT)
  DUPLICATE KEY(ts, pk) DISTRIBUTED BY HASH(pk) BUCKETS 1
  PROPERTIES ("replication_num" = "1");

  INSERT INTO t_min SELECT DATE '2024-01-02', 1, number FROM numbers("number" = "10000");
  INSERT INTO t_min SELECT DATE '2024-01-01', 1, number FROM numbers("number" = "3");

  SET parallel_pipeline_task_num = 1;
  SELECT /*+ SET_VAR(batch_size = 1024) */ ts, COUNT(*) cnt FROM (
    SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
  ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
  ```

  | batch_size | before (buggy) | after (fixed) |
  |---|---|---|
  | 256   | 256   | 10000 |
  | 1024  | 1024  | 10000 |
  | 4096  | 4096  | 10000 |
  | 16384 | 10000 | 10000 |

  Also verified on a 3-BE cluster against a 15M-row production table: before
  the fix the same query returned varying wrong counts
  (e.g. 149990 / 109350 across runs, and `8 * batch_size` exactly under
  `parallel_pipeline_task_num = 1` per instance); after the fix it is stable
  and matches the golden value from `SET enable_partition_topn = false`.

  New unit tests:
  `test_dense_rank_first_group_exceeds_batch_size_regression` and
  `test_rank_first_group_exceeds_batch_size_regression` build a first group
  of 5000 rows with `batch_size = 4096` and assert that all 5000 rows are
  emitted across multiple `get_next()` calls (they fail before the fix,
  pass after).

- Behavior changed:
    - [x] No. <!-- It fixes wrong results; correct queries are unaffected, and row_number()/other window functions are unchanged. -->

- Does this need documentation?
    - [x] No.

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label
@hello-stephen
Copy link
Copy Markdown
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@lide-reed lide-reed requested review from BiteTheDDDDt and Mryange June 7, 2026 08:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] dense_rank() / rank() with partition top-n filter (WHERE rk = 1) silently drops rows, returning only batch_size rows per pipeline instance

2 participants