feat: global file reorder in shared work queue for TopK optimization #21733

@zhuqi-lucas

Background

We now have several complementary optimizations for TopK queries on parquet:

  1. Dynamic work scheduling (#21351 "Dynamic work scheduling in FileStream", merged): sibling FileStream partitions share a work queue and steal files from each other, so no CPU sits idle.
  2. RG reorder by statistics (#21580 "feat: statistics-driven TopK optimization for parquet (file reorder + RG reorder + threshold init)", in progress): reorders row groups within a file by min/max statistics so TopK reads the most promising RGs first.
  3. TopK threshold initialization from statistics (#21712 "feat: initialize TopK dynamic filter threshold from parquet statistics", draft): initializes the dynamic filter threshold from RG statistics before reading any data.

Combining #21351 and #21580 (tested in #21731) shows that RG reorder adds little on top of dynamic scheduling alone. With multiple partitions reading files in parallel, one partition quickly finds good values "by luck" and updates the shared dynamic filter, which makes precise intra-file RG ordering less impactful.
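
A minimal Rust sketch of why the "by luck" effect happens, assuming an integer sort column and `ORDER BY col DESC LIMIT K`. The names `SharedThreshold`, `kth_largest` and `run_partitions` are hypothetical, and a plain `AtomicI64` stands in for the shared dynamic filter; this is not DataFusion's `DynamicFilterPhysicalExpr`. Each partition publishes the K-th largest value it has seen, and the shared value only ever moves up, so whichever partition finds good values first tightens the threshold for everyone.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the shared TopK threshold (ORDER BY col DESC LIMIT K).
struct SharedThreshold(AtomicI64);

impl SharedThreshold {
    fn new() -> Self {
        // i64::MIN means "no threshold yet": nothing can be pruned.
        SharedThreshold(AtomicI64::new(i64::MIN))
    }

    /// Raise the threshold if `candidate` is tighter; it is never lowered.
    fn publish(&self, candidate: i64) {
        self.0.fetch_max(candidate, Ordering::Relaxed);
    }

    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// K-th largest value in `values`, or None if there are fewer than K values.
fn kth_largest(values: &[i64], k: usize) -> Option<i64> {
    if k == 0 || values.len() < k {
        return None;
    }
    let mut sorted = values.to_vec();
    sorted.sort_unstable_by(|a, b| b.cmp(a)); // descending
    Some(sorted[k - 1])
}

/// One thread per partition; each publishes its local K-th largest value.
fn run_partitions(partitions: Vec<Vec<i64>>, k: usize) -> i64 {
    let shared = Arc::new(SharedThreshold::new());
    let handles: Vec<_> = partitions
        .into_iter()
        .map(|values| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                if let Some(local) = kth_largest(&values, k) {
                    shared.publish(local);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("partition thread panicked");
    }
    shared.get()
}
```

Taking the maximum of the partition-local thresholds is safe because the global K-th largest value is never smaller than any single partition's local K-th largest.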

Problem

The shared work queue in #21351 (SharedWorkSource) uses the original file order: files are enqueued in whatever order they appear in file_groups. This means:

  • The first file picked by the first partition may have a poor value range
  • Threshold convergence depends on which partition happens to read a good file first
  • With many files of varying ranges, the "unlucky first pick" wastes parallel capacity

Proposed optimization

Sort files in the shared work queue by column statistics before any reading begins. For ORDER BY col DESC LIMIT K, put the file with the highest min value first; for ASC, put the file with the lowest max value first.
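
The ordering rule can be sketched as a comparator. This assumes integer min/max statistics for the sort column; `FileRange` and `order_for_topk` are hypothetical names. The rationale: for DESC, every row in a file is at least its `min`, so reading K rows from the highest-min file yields a threshold of at least that `min` (symmetrically, at most `max` for ASC).

```rust
/// Hypothetical min/max statistics of the sort column for one file.
#[derive(Debug, Clone, PartialEq)]
struct FileRange {
    name: &'static str,
    min: i64,
    max: i64,
}

/// Put the most promising file for TopK first.
/// DESC: highest `min` first (every row in the file is >= min).
/// ASC: lowest `max` first (every row in the file is <= max).
fn order_for_topk(files: &mut [FileRange], descending: bool) {
    if descending {
        files.sort_by(|a, b| b.min.cmp(&a.min));
    } else {
        files.sort_by(|a, b| a.max.cmp(&b.max));
    }
}
```

For example, with files a = [0, 100], b = [500, 600] and c = [200, 900], DESC yields b, c, a and ASC yields a, b, c. File c has the largest max but comes second under DESC, because the rule ranks by the guaranteed bound (min), not the best case (max).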

This ensures:

  1. The very first file read is the globally best one according to statistics, so the threshold is tight from the first RG
  2. Combined with TopK stats init (#21712 "feat: initialize TopK dynamic filter threshold from parquet statistics"), the threshold is set before any row data is read, and it comes from the globally best file
  3. All other partitions immediately benefit from the shared dynamic filter
  4. Subsequent files are already ordered by quality: if the first file's threshold prunes most RGs, the second-best file is next in line

Full optimization chain

Global file reorder (best file first in shared queue)
  → TopK stats init (threshold from RG stats before I/O)
    → RG reorder within file (best RG first)
      → Dynamic scheduling (idle partitions steal work)
        → Dynamic filter pruning (skip RGs/files below threshold)

Each layer builds on the previous: global file ordering ensures the optimal starting point, stats init avoids wasting I/O on the first file, RG reorder optimizes within-file order, dynamic scheduling keeps all CPUs busy, and the dynamic filter propagates the threshold globally.
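
The last layer, dynamic filter pruning, boils down to a range check of a file's or row group's statistics against the current threshold. A minimal sketch, assuming integer statistics and a threshold equal to the current K-th best value; `Range` and `can_skip` are hypothetical names, not DataFusion APIs. The comparison is deliberately conservative: a range whose bound equals the threshold is still read.

```rust
/// Hypothetical min/max statistics for a file or row group.
#[derive(Debug, Clone, Copy)]
struct Range {
    min: i64,
    max: i64,
}

/// Can this file/RG be skipped given the current TopK threshold?
/// DESC: skip when even the largest value is below the threshold.
/// ASC: skip when even the smallest value is above the threshold.
/// Values equal to the threshold are never pruned (conservative).
fn can_skip(range: Range, threshold: Option<i64>, descending: bool) -> bool {
    match threshold {
        None => false, // no threshold yet, so nothing can be pruned
        Some(t) if descending => range.max < t,
        Some(t) => range.min > t,
    }
}
```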

Implementation sketch

In SharedWorkSource::from_config() (or a new constructor):

  1. Get the sort column and direction from the FileScanConfig's output ordering or from the DynamicFilterPhysicalExpr's sort_options
  2. For files that have PartitionedFile.statistics with min/max for the sort column:
    • DESC: sort files by column_statistics[sort_col].min_value descending (highest min first)
    • ASC: sort files by column_statistics[sort_col].max_value ascending (lowest max first)
  3. Files without statistics go to the end of the queue
  4. When preserve_order is true, skip reordering (correctness requirement)
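
A minimal sketch of steps 2 to 4, assuming the direction has already been resolved (step 1) and that the sort column's statistics are integers. `QueuedFile`, `Direction` and `reorder_queue` are hypothetical; the issue's `PartitionedFile.statistics` / `column_statistics[sort_col]` lookup is modelled as two `Option<i64>` fields, where `None` means the statistic is missing.

```rust
use std::cmp::Ordering;

/// Sort direction resolved in step 1 (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Direction {
    Asc,
    Desc,
}

/// Hypothetical queued file: only the sort column's min/max are modelled,
/// and either may be missing.
#[derive(Debug, Clone)]
struct QueuedFile {
    path: String,
    min: Option<i64>,
    max: Option<i64>,
}

/// Reorder the shared work queue for TopK (steps 2-4 of the sketch).
fn reorder_queue(files: &mut [QueuedFile], direction: Direction, preserve_order: bool) {
    // Step 4: reordering would break a required output order.
    if preserve_order {
        return;
    }
    // Step 2: DESC ranks by min, ASC ranks by max.
    let key = |f: &QueuedFile| match direction {
        Direction::Desc => f.min,
        Direction::Asc => f.max,
    };
    // `sort_by` is stable, so ties keep their original relative order.
    files.sort_by(|a, b| match (key(a), key(b)) {
        (Some(x), Some(y)) => match direction {
            Direction::Desc => y.cmp(&x), // highest min first
            Direction::Asc => x.cmp(&y),  // lowest max first
        },
        // Step 3: files without the statistic go to the end.
        (Some(_), None) => Ordering::Less,
        (None, Some(_)) => Ordering::Greater,
        (None, None) => Ordering::Equal,
    });
}
```

Because slice `sort_by` is stable, files with equal keys, and all files without statistics, keep their original `file_groups` order, which keeps the queue deterministic.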

Expected impact

  • Small data (SF=1): moderate improvement, since fewer files need to be read before the optimal threshold is found
  • Large data (SF=10/100): significant improvement. With hundreds of files, reading the best file first rather than an arbitrary one determines whether 90% of subsequent files are pruned immediately or only after several files have been read
  • Combined with #21712 (stats init): multiplicative, since stats init on the globally best file gives the tightest possible threshold without reading any data
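
The expected-impact argument can be checked on a toy model. This sketch is a single-partition simulation of `ORDER BY col DESC LIMIT k` with hypothetical names (`ToyFile`, `scan_desc`); it skips a file when its max is below the current threshold and does not model stats init, row groups or multiple partitions. It is an illustration, not a benchmark.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy file: the sort-column values it contains; min/max are derived from them.
struct ToyFile {
    values: Vec<i64>,
}

impl ToyFile {
    fn min(&self) -> i64 {
        *self.values.iter().min().expect("non-empty file")
    }
    fn max(&self) -> i64 {
        *self.values.iter().max().expect("non-empty file")
    }
}

/// Scan files in the given order for ORDER BY col DESC LIMIT k on one partition,
/// skipping a file when its max is below the current threshold.
/// Returns (number of files opened, top-k values in descending order).
fn scan_desc(files: &[&ToyFile], k: usize) -> (usize, Vec<i64>) {
    // Min-heap holding the current top-k values; its smallest is the threshold.
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::new();
    let mut opened = 0;
    for f in files {
        let threshold = if heap.len() == k { heap.peek().map(|r| r.0) } else { None };
        if matches!(threshold, Some(t) if f.max() < t) {
            continue; // pruned by statistics, no data read
        }
        opened += 1;
        for &v in &f.values {
            heap.push(Reverse(v));
            if heap.len() > k {
                heap.pop(); // drop the smallest value
            }
        }
    }
    let mut top: Vec<i64> = heap.into_iter().map(|r| r.0).collect();
    top.sort_unstable_by(|a, b| b.cmp(a));
    (opened, top)
}
```

With files holding [1, 2, 3], [10, 11, 12], [100, 101, 102] and [4, 5, 6] and k = 3, the original order opens three files, while the highest-min-first order opens only one; both return [102, 101, 100]. How much this carries over to real data depends on how much the file ranges overlap.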

Related
