[Bug] dense_rank() / rank() with partition top-n filter (WHERE rk = 1) silently drops rows, returning only batch_size rows per pipeline instance #64172

@lide-reed

Search before asking

  • I had searched in the issues and found no similar issues.

Version

Reproduced on the current master. The same buggy code path
(PartitionSorter::_read_row_rank) is present in 2.1 / 3.x / 4.x as well.
The reproduction below was verified on 4.1.1.

What's Wrong?

A window query that filters on dense_rank() (or rank()), such as

SELECT ... FROM (
    SELECT *, dense_rank() OVER (PARTITION BY p ORDER BY o DESC) AS rk
    FROM t
) WHERE rk = 1;

is rewritten by Nereids into a VPartitionTopN node with
partition limit = 1 and partition topn phase: TWO_PHASE_LOCAL_PTOPN.
When the dense_rank = 1 group of a partition contains more rows than
batch_size, the query silently returns only ~batch_size rows per
pipeline instance instead of the full group. No error is raised; the
result is simply wrong.

PartitionSorter::_read_row_rank() declares EOS via a Defer
(be/src/exec/sort/partition_sorter.cpp):

Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
    ...
    size_t merged_rows = 0;

    Defer defer {[&]() {
        if (merged_rows == 0 || _get_enough_data()) {   // <-- BUG
            *eos = true;
        }
    }};

    while (queue.is_valid() && merged_rows < batch_size) {
        ...
        for (...; merged_rows < batch_size; ...) {
            bool cmp_res = _previous_row->impl && _previous_row->compare_two_rows(current->impl);
            if (!cmp_res) {
                if (_get_enough_data()) {
                    return Status::OK();
                }
                *_previous_row = *current;
                _output_distinct_rows++;
            }
            // emit row
            ...
            merged_rows++;
            _output_total_rows++;
            ...
        }
    }
    return Status::OK();
}

For DENSE_RANK, _get_enough_data() is
(be/src/exec/sort/partition_sorter.h):

bool _get_enough_data() const {
    if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) {
        return _output_distinct_rows >= _partition_inner_limit;
    } else {
        return _output_total_rows >= _partition_inner_limit;
    }
}

With partition_inner_limit = 1 (i.e. WHERE rk = 1),
_output_distinct_rows becomes 1 right after the first row of the
dense_rank = 1 group is emitted, so _get_enough_data() returns true
immediately. The rest of that group has not been emitted yet — it is
supposed to be drained across multiple get_next() calls.

But when the inner loop exits because merged_rows reached batch_size,
the Defer still sees _get_enough_data() == true and sets *eos = true,
even though the group has only been partly emitted. The source operator
then treats the sorter as fully drained
(be/src/exec/operator/partition_sort_source_operator.cpp):

if (local_state._sort_idx < sorter_size) {
    RETURN_IF_ERROR(
            sorters[local_state._sort_idx]->get_next(state, output_block, &current_eos));
}
if (current_eos) {
    local_state._sort_idx++;   // advance to next sorter, dropping the rest
    ...
}

so it advances _sort_idx to the next sorter and permanently drops all
remaining rows of the current dense_rank = 1 group.

Net effect: each pipeline instance emits at most batch_size rows of the
dense_rank = 1 group, so the observed row count is roughly
batch_size * number_of_pipeline_instances, regardless of the group's
true size.
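
The interaction can be reproduced outside Doris with a small toy model. The following is a minimal sketch, not the actual BE code: ToySorter, read and rows_returned are hypothetical names, the sort keys are plain ints, and only the loop shape and the EOS condition follow the excerpts quoted in this issue.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Toy model of a DENSE_RANK partition sorter. Every name here is hypothetical;
// only the control flow mirrors the _read_row_rank() excerpt in this issue.
struct ToySorter {
    std::vector<int> keys;          // sort keys, already in output order
    std::size_t limit = 1;          // stands in for _partition_inner_limit
    std::size_t pos = 0;            // read cursor, persists across calls
    std::size_t distinct_rows = 0;  // stands in for _output_distinct_rows
    std::optional<int> previous;    // stands in for _previous_row

    bool enough() const { return distinct_rows >= limit; }

    // Emits up to batch_size rows and returns how many were emitted.
    std::size_t read(std::size_t batch_size, bool* eos) {
        std::size_t merged = 0;
        while (pos < keys.size() && merged < batch_size) {
            if (!previous || *previous != keys[pos]) {
                if (enough()) break;  // boundary of the next distinct group
                previous = keys[pos];
                ++distinct_rows;
            }
            ++pos;  // "emit" the row
            ++merged;
        }
        // Same condition as the buggy Defer: already true mid-group when limit == 1.
        if (merged == 0 || enough()) *eos = true;
        return merged;
    }
};

// Mimics the source operator: keep calling read() until the sorter reports EOS.
std::size_t rows_returned(std::size_t top_group_rows, std::size_t batch_size) {
    assert(batch_size > 0);
    ToySorter sorter;
    sorter.keys.assign(top_group_rows, 2);                     // dense_rank = 1 group
    sorter.keys.insert(sorter.keys.end(), std::size_t{3}, 1);  // dense_rank = 2 group
    std::size_t total = 0;
    bool eos = false;
    while (!eos) total += sorter.read(batch_size, &eos);
    return total;
}
```

Under these assumptions, rows_returned(10000, 256) yields 256 and rows_returned(10000, 16384) yields 10000, so a single reader emits min(batch_size, group_size) rows and then stops.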

row_number() is not affected: its _get_enough_data() uses
_output_total_rows >= _partition_inner_limit, which correctly becomes
true only after exactly partition_inner_limit rows are emitted.
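
The difference between the two counters can be shown in isolation. This sketch assumes sorted int keys and limit >= 1; Algo, rows_when_enough and rows_wanted are illustrative names that do not exist in Doris. It compares how many rows have been emitted when the "enough data" predicate first turns true with how many rows the filter should return.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

enum class Algo { ROW_NUMBER, DENSE_RANK };

// Rows emitted at the moment the "enough data" predicate first becomes true.
std::size_t rows_when_enough(const std::vector<int>& keys, std::size_t limit, Algo algo) {
    assert(limit >= 1);
    std::size_t distinct = 0;
    for (std::size_t i = 0; i < keys.size(); ++i) {
        if (i == 0 || keys[i] != keys[i - 1]) ++distinct;  // new sort-key value
        const std::size_t total = i + 1;
        const std::size_t counter = (algo == Algo::DENSE_RANK) ? distinct : total;
        if (counter >= limit) return total;
    }
    return keys.size();  // predicate never satisfied: everything was emitted
}

// Rows that a filter `fn() <= limit` should return for the given function.
std::size_t rows_wanted(const std::vector<int>& keys, std::size_t limit, Algo algo) {
    assert(limit >= 1);
    if (algo == Algo::ROW_NUMBER) return std::min(limit, keys.size());
    std::size_t distinct = 0;
    std::size_t rows = 0;
    for (std::size_t i = 0; i < keys.size(); ++i) {
        if (i == 0 || keys[i] != keys[i - 1]) {
            if (distinct == limit) break;  // next group would exceed the limit
            ++distinct;
        }
        ++rows;
    }
    return rows;
}
```

For keys {9, 9, 9, 9, 7, 7, 5} and limit 1, both algorithms satisfy the predicate after one row, but only ROW_NUMBER is actually done at that point; DENSE_RANK still owes three more rows of the tied group.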

What You Expected?

The query should return the entire dense_rank = 1 group (all rows
that share the top sort-key value within each partition), independent of
batch_size or the number of pipeline instances.

How to Reproduce?

Single BE, single tablet, single partition value — no special data layout
needed (the bug is deterministic):

DROP DATABASE IF EXISTS bug_repro;
CREATE DATABASE bug_repro;
USE bug_repro;

CREATE TABLE t_min (
    ts  DATE   NOT NULL,
    pk  BIGINT NOT NULL,
    val INT
)
DUPLICATE KEY(ts, pk)
DISTRIBUTED BY HASH(pk) BUCKETS 1
PROPERTIES ("replication_num" = "1");

-- dense_rank = 1 group (ts = 2024-01-02): 10000 rows, far larger than batch_size.
-- Total rows (10003) < PARTITION_SORT_ROWS_THRESHOLD (20000 in release build),
-- so partial sort is not triggered and the returned count is exactly min(batch_size, group size).
INSERT INTO t_min
SELECT DATE '2024-01-02', 1, number FROM numbers("number" = "10000");

-- dense_rank = 2 group (ts = 2024-01-01): 3 rows (just to create a 2nd distinct ts).
INSERT INTO t_min
SELECT DATE '2024-01-01', 1, number FROM numbers("number" = "3");

-- Single instance for a clean, deterministic count.
SET parallel_pipeline_task_num = 1;

SELECT /*+ SET_VAR(batch_size =   256) */ ts, COUNT(*) cnt FROM (
  SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
) x WHERE dr = 1 GROUP BY ts ORDER BY ts;

SELECT /*+ SET_VAR(batch_size =  1024) */ ts, COUNT(*) cnt FROM (
  SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
) x WHERE dr = 1 GROUP BY ts ORDER BY ts;

SELECT /*+ SET_VAR(batch_size =  4096) */ ts, COUNT(*) cnt FROM (
  SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
) x WHERE dr = 1 GROUP BY ts ORDER BY ts;

SELECT /*+ SET_VAR(batch_size = 16384) */ ts, COUNT(*) cnt FROM (
  SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
) x WHERE dr = 1 GROUP BY ts ORDER BY ts;

Actual output (buggy):

batch_size = 256    -> ts=2024-01-02, cnt=  256
batch_size = 1024   -> ts=2024-01-02, cnt= 1024
batch_size = 4096   -> ts=2024-01-02, cnt= 4096
batch_size = 16384  -> ts=2024-01-02, cnt=10000   (>= group size, so nothing dropped)

cnt is exactly min(batch_size, group_size) — the row count tracks
batch_size, which is clearly wrong.

Expected output (correct): always

ts=2024-01-02, cnt=10000

Confirm via EXPLAIN and the golden path

EXPLAIN
SELECT /*+ SET_VAR(batch_size = 1024) */ ts, COUNT(*) cnt FROM (
  SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
-- contains:
--   VPartitionTopN
--     functions: dense_rank
--     partition limit: 1
--     partition topn phase: TWO_PHASE_LOCAL_PTOPN

-- Disabling partition top-n routes through plain VAnalytic and is always correct:
SELECT /*+ SET_VAR(enable_partition_topn = false, batch_size = 256) */
       ts, COUNT(*) cnt FROM (
  SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
-- ts=2024-01-02, cnt=10000  (stable for any batch_size)

Anything Else?

Suggested fix: the Defer must not treat _get_enough_data() == true as
"input exhausted". Introduce a bool finished flag that is set to true
only when (a) the loop hits the boundary of the next distinct group with
the limit already satisfied (the if (_get_enough_data()) return; branch),
or (b) the input queue is fully drained. The Defer then signals EOS only
when finished is true (or when no rows were emitted at all):

Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
    ...
    size_t merged_rows = 0;
    bool finished = false;                          // NEW

    Defer defer {[&]() {
        if (merged_rows == 0 || finished) {         // CHANGED: was _get_enough_data()
            *eos = true;
        }
    }};

    while (queue.is_valid() && merged_rows < batch_size) {
        ...
        for (...; merged_rows < batch_size; ...) {
            ...
            if (!cmp_res) {
                if (_get_enough_data()) {
                    finished = true;                // NEW: real group boundary reached
                    return Status::OK();
                }
                *_previous_row = *current;
                _output_distinct_rows++;
            }
            ...
        }
    }

    if (!queue.is_valid()) {                         // NEW: input fully drained
        finished = true;
    }
    return Status::OK();
}

With this fix the same group is drained across multiple get_next() calls
and the full dense_rank = 1 group is returned for any batch_size.
Verified on a 3-BE cluster: all of batch_size = 256 / 1024 / 2048 / 10240 / 20480 return the correct, stable row count after the fix.
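
The proposed change can be checked with a small toy model outside Doris. This is a sketch under assumptions, not the real sorter: ToyFixedSorter, Algo and rows_returned_fixed are hypothetical names, keys are plain ints, and the RANK branch simply reuses the _output_total_rows comparison from _get_enough_data().

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

enum class Algo { RANK, DENSE_RANK };

// Toy model of the sorter with the suggested `finished` flag (names are hypothetical).
struct ToyFixedSorter {
    std::vector<int> keys;          // sort keys, already in output order
    Algo algo = Algo::DENSE_RANK;
    std::size_t limit = 1;          // stands in for _partition_inner_limit
    std::size_t pos = 0;            // read cursor, persists across calls
    std::size_t distinct_rows = 0;  // stands in for _output_distinct_rows
    std::size_t total_rows = 0;     // stands in for _output_total_rows
    std::optional<int> previous;    // stands in for _previous_row

    bool enough() const {
        return (algo == Algo::DENSE_RANK ? distinct_rows : total_rows) >= limit;
    }

    // Emits up to batch_size rows and returns how many were emitted.
    std::size_t read(std::size_t batch_size, bool* eos) {
        std::size_t merged = 0;
        bool finished = false;
        while (pos < keys.size() && merged < batch_size) {
            if (!previous || *previous != keys[pos]) {
                if (enough()) {
                    finished = true;  // real group boundary with the limit satisfied
                    break;
                }
                previous = keys[pos];
                ++distinct_rows;
            }
            ++pos;  // "emit" the row
            ++merged;
            ++total_rows;
        }
        if (pos >= keys.size()) finished = true;   // input fully drained
        if (merged == 0 || finished) *eos = true;  // no longer keyed on enough()
        return merged;
    }
};

// Mimics the source operator: keep calling read() until the sorter reports EOS.
std::size_t rows_returned_fixed(Algo algo, std::size_t top_group_rows,
                                std::size_t batch_size, std::size_t limit = 1) {
    assert(batch_size > 0 && limit >= 1);
    ToyFixedSorter sorter;
    sorter.algo = algo;
    sorter.limit = limit;
    sorter.keys.assign(top_group_rows, 2);                     // rank 1 group
    sorter.keys.insert(sorter.keys.end(), std::size_t{3}, 1);  // next group
    std::size_t total = 0;
    bool eos = false;
    while (!eos) total += sorter.read(batch_size, &eos);
    return total;
}
```

In this model, rows_returned_fixed(Algo::DENSE_RANK, 10000, 256) and rows_returned_fixed(Algo::RANK, 10000, 256) both return 10000, independent of batch_size. A BE unit test against the real PartitionSorter could follow the same shape.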

I'm happy to submit a PR with the fix and BE unit tests covering both
DENSE_RANK and RANK with a first group larger than batch_size.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
