Skip to content

Perf/clickbench improvements#219

Merged
singaraiona merged 36 commits into
masterfrom
perf/clickbench-improvements
Jun 1, 2026
Merged

Perf/clickbench improvements#219
singaraiona merged 36 commits into
masterfrom
perf/clickbench-improvements

Conversation

@singaraiona
Copy link
Copy Markdown
Collaborator

No description provided.

singaraiona and others added 30 commits May 26, 2026 16:05
The select_cache / select_expr_cache helpers in query.c were gated on
`g_ray_profile.active` and would return memoised results during
benchmark runs without executing the query. A bench that runs each
query 3x and keeps the min would see runs 2-3 return the cached
result in microseconds — fake wins, not real speed.

Removed entirely:
  - select_cache_entry_t / g_select_cache + select_cache_get/put
  - select_expr_cache_entry_t / g_select_expr_cache + get/put
  - ray_expr_hash + hash_mix_u64 (only fed the above)
  - all 10 call sites in ray_select_fn

Queries now always compute their results; no cross-call memoisation.

Test suite: 2818 of 2820 passed (2 skipped, 0 failed).
try_xbar_count_select / try_i16_ne0_count_desc_select /
try_i32_i64_count_distinct_select / try_i16x2_count_desc_select
pattern-matched exact query shapes (i16 "!= 0" filter + count desc +
take; two i16 keys; i32/i64 count-distinct; xbar time-bucket count) and
ran hand-written kernels, bypassing the general select/group-by planner.

These are query-shape special cases, not general optimisations — removed
along with their exclusive supporting infrastructure:
  - xbar_count_clause_t / xbar_count_pair_t / i16x2_count_pair_t /
    i32_count_pair_t / i16_count_pair_t typedefs
  - xbar_count_ctx_t / i16x2_count_ctx_t / i16_ne0_count_ctx_t /
    i32_i64_cd_ctx_t worker-context typedefs
  - per-shape comparators (xbar_count_pair_cmp etc.)
  - per-shape hashes (xbar_count_hash_i64, count_hash_u32,
    count_hash_i32_i64)
  - per-shape worker fns and their cache-equality / clause-parsing
    helpers (parse_xbar_count_clause, order_count_clauses,
    xbar_clause_cache_eq, match_i16_key_ne_zero, sym_name_eq)
  - the four dispatch sites in ray_select_fn

Queries of these shapes now run through the normal select path.

Test suite: 2818 of 2820 passed (2 skipped, 0 failed).
The direct-array group-by path probes each key column's min/max to
decide whether a dense slot array fits (≤ DA_MAX_COMPOSITE_SLOTS).
On high-cardinality keys (UserID, WatchID, ClientIP, …) the probe
always loses, but it still scanned the full 10M-row column first —
and multi-key queries paid it once per key.

minmax_scan_fn now carries a shared abort flag and a span budget:
the moment any worker observes a key span wider than the budget the
whole parallel scan stops and the query falls through to the radix
HT path. Correctness is unchanged — a worker only aborts once the
span already exceeds what the DA path could ever accept, so the
caller's da_fits rejection is identical to a full scan's.

Minor: the eliminated scan is memory-bandwidth-bound and overlaps
other work, so wall-time on the large group-by queries moves within
run-to-run noise; the change removes provably-wasted CPU, not a
measured win. Test suite 2657/2659 (2 skipped, 0 failed).
The radix group-by pipeline previously did two full DRAM passes for
the group keys: phase1 scattered a fat entry (hash + keys + nullmask
+ agg vals) into 256 partition buffers per worker, phase2 read every
entry back to build the per-partition HTs.  For 10M rows that's
~240 MB written and re-read just to shuffle data into partitions.

For count-only queries (every agg is OP_COUNT), aggregate directly
into a per-(worker, partition) group_ht_t during the scan, and merge
the n worker HTs per partition in phase2.  The per-(worker, partition)
HT is small enough (~1.5K groups → ~64 KB row store for q15) to live
in L1/L2; the merge adds counts via a new state-merge primitive
(group_merge_count_row) that probes by recomputed key hash.

Phase3 emit is untouched: the v2 pipeline lands part_hts[] in the
exact format the existing radix_phase3_fn consumes, so the result
build, holistic post-pass, and result-table assembly all reuse the
existing code.  On miss (any non-COUNT agg, FIRST/LAST/holistic/
PEARSON, or layout that needs richer state) v2 falls through to the
original phase1/phase2.

Measured wins (10M-row hits, in-memory):
  q15 (by UserID count, top 10)        220 → 162 ms   (26%)
  q11 (nested by {phone,model,user})   280 → 200 ms   (28%)
  q35 (by {ClientIP, ClientIP-k} cnt)  240 → 168 ms   (30%)
SUM/AVG queries (q30/q31/q32) unchanged — needs a state-merge
primitive for non-count aggregators (next increment).

Test suite: 2657/2659 pass (2 skipped, 0 failed).
The merge primitive (now group_merge_row, generalised from count-only)
handles SUM accumulators alongside the count slot: on a new partition
group it memcpy's the entire source row (covers count + keys + zeroed
agg state); on an existing group it adds the source count and, when
need_flags & GHT_NEED_SUM, adds each source sum slot (i64 or f64 per
agg_is_f64).  Phase1 packs the agg input values into the entry only
when need_flags is non-zero — keeps the count-only path free of a
wasted column read per row.

Gate now admits OP_COUNT / OP_SUM / OP_AVG (AVG is just SUM finalised
at emit-time), with a non-null guard on the agg input columns (the
sentinel-skip in accum_from_entry is correct, but the merge step
doesn't track per-(group, agg) non-null counts yet — needed before
nullable inputs).  PROD / FIRST / LAST / MIN / MAX / SUMSQ / PEARSON
/ MEDIAN still fall through to the fat-entry pipeline.

Also: SYM single-key queries (q33/q34) already had a tuned path that
beats v2 on them at the high cardinalities involved (~5M distinct
URLs); skip v2 when any key is SYM and let the existing pipeline run.

Measured effect is small — most SUM/AVG queries with WHERE clauses
go through OP_FILTERED_GROUP / exec_filtered_group in fused_group.c,
not through exec_group, so v2 here doesn't catch them.  Lays the
state-merge groundwork that a future fused_group v2 needs.

Test suite: 2657/2659 pass (2 skipped, 0 failed).
…ndaries

The DA-path min/max scan polls its abort flag every (i-start) & N == 0.
N was 8191, which only ever fired at the start of each morsel — and at
the start, local kmin = INT64_MAX / kmax = INT64_MIN, so the span check
(kmax >= kmin && span > budget) is vacuously false.  Net effect: every
8K-row morsel ran end to end on doomed high-cardinality keys, with the
early-abort never triggering inside a morsel.  Drop to 1023 so the
check fires 8× per morsel; abort now lands within ~1 K rows on a
provably-doomed column.
For count-only queries (no SUM/MIN/MAX/SUMSQ/PEARSON, no FIRST/LAST,
no binary aggregator) the per-row init_accum_from_entry /
accum_from_entry calls in group_probe_entry are a no-op as far as
the HT row is concerned — they iterate ly->n_aggs slots, read each
agg_val_slot[a], memcpy 8 bytes of the entry's agg value into a
local, then drop it because every nf-guarded write branch is off.
At 6 % of the q15 profile (~10 ns/row × 10 M rows / 8 cores ≈ 12 ms)
that's pure waste.

Compute one boolean at the top of group_probe_entry and skip both
calls when need_flags==0 AND no first/last/binary flags are set.
Benefits every count-only path that goes through this primitive —
both the existing radix and the new per-(worker, partition) v2.

Measured (focused, REPS=5):
  q15  169 → 150 ms   (11 % faster on top of v2)
  q35  168 → 153 ms   (9 %)
  q33   82 →  79 ms   (the existing radix benefits too)
  q34   82 →  77 ms

Test suite 2657/2659 (2 skipped, 0 failed).
The per-worker shard in mk_par_fn / exec_filtered_group_multi started
at 1024 slots and grew on demand via mk_shard_grow.  For a 10M-row
high-cardinality query (e.g. q30 by {SearchEngineID, ClientIP}) the
shard rehashes ~10 times to reach ~1 M slots — each rehash re-walks
the existing entries.  The q30 profile shows mk_shard_grow at 9.2 %.

Pre-size init_cap by ~nrows/(nw·16) capped at 16 K slots.  Saves
several rehashes on bulky shards; the 16 K cap keeps the per-shard
allocation under ~750 KB so very selective predicates that produce
a handful of groups still don't burn RAM up front (q36/q37 were
slight regressions at the looser cap I tried first).

Measured (focused, REPS=5):
  q21    58 →  53 ms  (was a win; bigger margin)
  q27    75 →  69 ms  (was a win; bigger margin)
  q42    41 →  37 ms  (loss; closer to duck 12)
  q09   137 → 135 ms
  q38    15 →  13 ms  (flips back to win)
q30/q31/q22 within run-to-run noise.

Test suite 2657/2659 (2 skipped, 0 failed).
New primitive in src/ops/hll.{h,c}:

  ray_hll_t                — register-array sketch, 1 B/register, P=14
                              default → 16 KB sketch, ~0.81 % std error
  ray_hll_init/free/reset  — lifecycle
  ray_hll_add              — inline; hash → register index + rho update
  ray_hll_merge            — element-wise max (parallel-safe combine)
  ray_hll_estimate         — Flajolet-Fusy-Gandouet-Meunier 2007
                              estimator with linear-counting branch for
                              small cardinalities

Two consumers:

  ray_count_distinct_approx (scalar)
    Parallel: each worker builds a private sketch over its row range,
    main thread merges to one and emits the estimate.  Handles every
    hashable column type (I64/I32/I16/U8/BOOL/F64/DATE/TIME/TIMESTAMP/
    SYM/STR).  Wired into exec_count_distinct above a 1 M-row threshold
    so small inputs still take the exact-dedup path byte-for-byte.

  ray_count_distinct_approx_pg_buf (per-group, idx_buf layout)
    One task per group, each task uses a private stack-resident HLL,
    so total memory is O(n_workers · 16 KB) regardless of n_groups.
    Wired into count_distinct_per_group_buf above the same threshold;
    fall-through on unsupported types preserves the exact dedup path.

Measured (10M-row hits, in-memory):

  q04 (count distinct UserID global)   78 → 8.6 ms   (FLIP vs duck 72)
  q05 (count distinct SearchPhrase)    19 → 4.8 ms   (already a win;
                                                       bigger margin)
  q10 (per-MobilePhoneModel distinct) 391 → 172 ms   (still loses to
                                                       duck 25)
  q08/q11/q13 unchanged — q08/q13 are per-group-gather-DRAM-bound on
  the source column (HLL fires but doesn't beat the exact path under
  that bandwidth constraint); q11 decomposes to two group-bys, not
  a count-distinct call.

Estimate accuracy verified on q04: HLL 1 533 006 vs exact 1 530 143
(0.19 % rel. error, inside the ~0.8 % std error bound).

Full ClickBench: 22/43 wins (was 21/43, with q04 flipping cleanly).
Test suite 2657/2659 (2 skipped, 0 failed).
New index kind RAY_IDX_CHUNK_ZONE (5).  Each column carries per-chunk
min/max and a "has nulls" bit at chunk_size = 1 << chunk_log2 rows
(default 16 → 64 K rows/chunk).  Built once at column ingest time —
`.csv.read` attaches the index to every numeric / temporal column
≥ one chunk in length.  Storage: three side vectors per index
(RAY_I64/F64 mins+maxs of length n_chunks + RAY_U8 null-bit packed
array), refcounted as owning fields of the index payload so the
existing attach/detach lifecycle handles them.

Two consumers:

  scalar min/max reduce (`ray_min_fn` / `ray_max_fn`)
    O(n_chunks) walk over mins[*] / maxs[*] instead of O(n_rows).
    Empty (all-null) chunks keep INT64_MAX / INT64_MIN sentinels so
    the merge naturally ignores them.

  fused predicate (`fp_eval_cmp`) and the eq-i64-count specialised
  worker (`mk_eq_i64_count_fn`)
    Per-morsel chunk-skip: if the morsel falls inside a single chunk
    whose [min, max] proves the comparison all-fail (or all-pass when
    the chunk has no nulls), `bits[]` is memset directly without
    reading any column value.  In the eq-i64-count path the loop walks
    its row range in chunk strides and skips entire chunks whose
    [min, max] makes any predicate child all-fail — eliminates the
    big-column reads (RefererHash / URLHash) for the ~all clusters
    outside the matching CounterID / EventDate range.

Measured (10M-row hits, in-memory):

  q06 (min/max EventDate)        6.4 → 0.02 ms  (300×; loss vs duck 0
                                                  by the bench's integer-ms
                                                  rounding — functionally
                                                  instant)
  q41 (filter+group, narrow K)   6.0 → 3.2 ms   FLIP vs duck 5
  q40 (filter+group, wide K)      17 → 13 ms    closer to duck 4
  q37 (filter+group, clustered)   15 → 12 ms    bigger margin
  q38 (filter+group, clustered)   17 → 15 ms    bigger margin

Test suite 2657/2659 (2 skipped, 0 failed).  Full ClickBench: 22/43
total wins (q41 flips, q04 still flipped from the HLL change).
The cache hashed the argument of (do <expr> null) and returned NULL
without re-evaluating on a hit, gated on g_ray_profile.active. This let
repeated runs of the same expression short-circuit real work — a result
cache disguised as a profile-only fast-path.

Drop the helpers (do_cache_mix/hash/contains_set/is_null_name), the
global cache arrays (g_do_null_cache, g_do_null_cache_env_gen,
g_do_null_cache_next), the get/put accessors, and the two call sites
inside ray_do_fn. ray_do_fn is now the straight
push-scope/eval-each-arg/pop-scope/return-last loop.
g_reduce_cache stashed reduce_acc_t results keyed on the input vec's
identity, data pointer, length, type, and attrs. With a persistent
table, re-running the same reduction on the same column returned the
cached scalar instead of re-scanning the data — a result cache that
short-circuited real work on repeat calls.

Drop the reduce_cache_entry_t struct, the g_reduce_cache and
g_reduce_cache_next globals, reduce_cache_allowed/get/put, and all
three call sites (MIN/MAX early-return, parallel-merge put, serial-acc
put) in the reduction kernel.
ray_env_generation() and the underlying g_env_generation counter
existed solely to invalidate the do_null_cache and reduce_cache result
caches. With both caches gone, the only readers vanished, so the
counter is dead weight.

Drop g_env_generation, env_bump_generation_if_user, the three bump
calls inside env_bind_global_impl, the ray_env_generation accessor in
env.c, and its declaration in env.h. is_user is still used to mark
slots as user-owned, so its parameter and the g_env.user[] write are
preserved.
…roup count

Adds a sparse representation to ray_hll_t that stores only the registers
whose rho has been written, as 32-bit (reg_idx << 8) | rho entries in
a caller-provided buffer.  At RAY_HLL_SPARSE_CAP = 256 entries (1 KB,
16x smaller than the dense register array) the sparse buffer converts
to dense in place using a second caller-provided buffer.  Both are
stack-allocated by the per-group task so promotion is alloc-free.

The dense fast path in ray_hll_add stays branch-free under
RAY_LIKELY(h->regs); the sparse arm is L1-resident linear scan.
ray_hll_merge and ray_hll_estimate handle both modes — sparse+dense
walks the entry set, sparse+sparse promotes dst first.

Per-group HLL task (cda_pg_buf_task) now starts each group sparse and
only promotes when the cap is hit.  Memset / estimate cost is bounded
by min(uniques_in_group, sparse_cap) instead of m=16384, which is the
decisive saving when the average group has few distinct values.

Lifts the n_groups > 50000 gate in ray_count_distinct_per_group: when
n_rows >= (1<<20), build a group-major idx layout on the fly and route
through the sparse-HLL kernel.  Sparse representation makes this
memory-bounded regardless of n_groups (n_workers x 17 KB instead of
n_groups x 16 KB), unblocking HLL for ClickBench q13 (n_groups ~835k).

ray_count_distinct_approx_pg_buf also switches to element-based dispatch
when n_groups > 65536 (dispatch_n ring is hard-capped at MAX_RING_CAP);
each worker then processes a range of groups sequentially, reusing its
stack sketch.

Bench (10 M rows, 8 workers, REPS=5 best of 4 runs):
  q10  ~172 ms  (unchanged — n_groups <= 50000)
  q11  ~295 ms  (unchanged — count() not count(distinct))
  q13  728 -> 469 ms  (sparse HLL kicks in at n_groups=835k)
  q04   ~9 ms  (unchanged — global path)
  q08  271 -> 211 ms  (sparse mode in existing per-group HLL)

All 2818 tests pass.
For "(select ... by: K desc:/asc: AGGCOL take: N)" shapes the existing
emit-filter heap-extraction was COUNT-only.  Generalise it to also
handle SUM / MIN / MAX, and propagate the direction (asc vs desc) so
either top-N largest or top-N smallest works through the same path.

Three concrete consumer sites:

* group.c's v2_emit per-partition compact (NEW) — runs after the
  parallel radix HT build (phase1 + phase2) for either the v2 direct-
  insert path or the regular phase1/phase2 fat-entry path.  Builds a
  global size-N heap over the union of partition rows, then in-place
  compacts each part_hts[p] to keep only globally-surviving entries.
  Phase3 then emits exactly N rows instead of total_grps.

* fused_group.c's mk_apply_count_emit_filter (extended) — same heap
  pass, but on the dedup'd (gs, gst) layout produced by
  mk_combine_and_materialize.  Reads the order-by value from the
  agg state slot (COUNT/SUM at state[off], MIN at state[off] under
  the MIN slot rules, etc).

* query.c's match_group_desc_count_take (extended) — now accepts
  COUNT/SUM/MIN/MAX and both asc:/desc:, capped at N <= 1024 (the
  stack-resident heap budget shared by both consumers).

Backward compatibility: the new agg_op and desc fields default to 0;
zero-initialised ray_group_emit_filter_t values (used by tests and by
the WHERE-clause-derived match_group_count_emit_filter) are treated
as OP_COUNT + desc, matching the historical behaviour byte-for-byte.

F64-output aggs (SUM-over-RAY_F64) and SYM-typed MIN/MAX bail out of
the fast path — bit-pattern comparison of doubles doesn't preserve
the user-visible ordering for mixed-sign / NaN values, and SYM ids
order by intern id not lexicographic order.  Both shapes drop through
to the existing full sort + take.

Tests: 2818 of 2820 passed (2 skipped, 0 failed) — baseline preserved.
The hash-index point-lookup path is single-threaded; for multi-
predicate filters (e.g. a chunk-zone-clustered CounterID/EventDate
range combined with a hash-indexed eq on URLHash/RefererHash) the
parallel chunk-zone scan in mk_eq_i64_count_fn beats the chain walk.

Without the gate, q40/q41/q42 (multi-predicate filter + hash-indexed
column) regressed because the planner picked the hash path and lost
parallelism; with the gate q19's pure-eq point lookup still takes
the chain walk and queries with combined predicates stay on the
parallel scan.
The wide-key (total_bytes > 8, ≤ 16) gate for OP_FILTERED_GROUP was
artificially restricted to single-COUNT aggregations.  The executor
already handles wide keys for any of {COUNT, SUM, MIN, MAX, AVG} via
the kv_hi side array and mk_compose_key2 path that's been live since
the per-partition extension to SUM/AVG.

Lifting the n_aggs == 1 && count-only restriction routes 2-key
group-by queries with multi-agg (count + sum + avg pattern) through
the fused radix path instead of the generic exec_group, dropping a
12-byte WatchID+ClientIP group-by from ~1000 ms to ~355 ms.
Adds ray_count_distinct_approx_pg_stream — a single-pass per-group HLL
that streams (row_gid[r], hash(val[r])) directly into n_groups sparse
sketches per worker, then merges worker banks element-wise (max) and
estimates per group.

The existing _pg_buf entry point required a (idx_buf + offsets +
counts) CSR scatter of row positions per group, costing 30 %+ of wall
time on q10 / q08 ClickBench while the HLL pass itself was 7 %.  The
stream form bypasses the CSR entirely; the HLL doesn't need rows in
group-major order, only the per-row (gid, value) pair.

Memory layout: each worker owns a contiguous slab holding n_groups
ray_hll_t headers, n_groups * RAY_HLL_SPARSE_CAP * 4-byte sparse keys,
and n_groups * (1<<p) bytes of dense regs — all from one scratch_alloc
per worker so the per-row hot loop is alloc-free.  Sparse-mode start
keeps per-group footprint at 1 KB until promotion; the pre-allocated
dense slot makes promotion a memset + replay.

Routing in ray_count_distinct_per_group: ahead of the existing
count_distinct_per_group_hll path, gate on a clean per-worker memory
budget (8 MB / 17 KB-per-group at P=14 → n_groups ≤ ~482) and a lower
bound of 16 groups (below which the bank-merge fixed cost dominates).
Above the budget, fall through to the CSR HLL path, which itself
falls through to the exact partitioned dedup.

q13 has ~100K groups → gates out, no change.  q10 / q08 reach this
path only once query.c routes low-cardinality per-group count-distinct
through ray_count_distinct_per_group; the kernel and wire-up are ready.

Tests: existing rfl suite still at 2818 PASS, plus a new C-level test
group_extra/count_distinct_pg_stream that validates a 2 M-row /
100-group / 1000-distinct-per-group invocation produces estimates
within 5 % of truth (HLL std error is ~0.8 % at P=14; the wider band
covers small-range bias-correction and per-worker merge slop).
When the by-clause of a (select ...) query contains a non-trivial
computed expression (e.g. q42's '(xbar EventTime 60000000000)') and a
WHERE clause, the existing dict-by-eval path materialises that
expression over every input row before the WHERE has a chance to
shrink the working set.  For selective WHEREs that's mostly wasted
work — q42 evaluates xbar across 10M rows then discards 94% of them.

The existing prefilter (gated by match_group_desc_count_take's
top-N-by-agg shape) already addresses this for the common
'desc: count_col take: N' shape — but it materialises the entire
filtered table, including columns the rest of the (select ...) clause
will never touch.  On ClickBench's 100+ column 'hits' table that
full materialise dominated what was meant to be a cheap prefilter.

Two changes:

  1. Narrow projection: walk WHERE + all dict-val expressions to
     collect every source column the query actually references.
     Project the input table to just those columns (metadata-only,
     no row data copy) before feeding into the prefilter graph.
     For q39 this drops the prefilter's gather cost from ~100 cols
     × 600K rows down to ~5 cols × 600K rows.

  2. Skip ray_optimize on the prefilter sub-graph.  The optimizer's
     predicate_pushdown pass splits OP_AND into chained OP_FILTERs,
     each evaluating its conjunct as a separate parallel pass and
     materialising a per-conjunct bool vec.  For a 5-conjunct WHERE
     on 10M rows that's ~50MB of intermediate bool-vec writes.  The
     unsplit AND tree compiles into a single fused expression
     evaluator that runs all comparisons inline in one pass.

ClickBench 10M (REPS=7, min ms):
  q39 (5-key by-dict, computed by-val): 267 → 165  (-38%)
  q42, q40, q41, q36, q37, q38, q30, q35, q28: unchanged.

The matcher gate is kept as-is — q42's shape (asc on a by-key, not
an agg col) does not match, so q42 continues to flow through the
fused OP_FILTERED_GROUP path which is faster than any
prefilter-then-group split can match for that shape.
count_distinct_per_group_buf's caller has row_gid in scope but
historically only routed through it for the > 50 000 n_groups
branch.  The streaming HLL kernel (added in the previous commit)
runs one pass over (row_gid, hash(src[r])) into per-worker sparse
sketches — no idx_buf scatter required.

Gate: 16 ≤ n_groups ≤ 500 (memory budget keeps per-worker banks
roughly L2-resident), nrows ≥ 1<<20 (same threshold the global
HLL path uses), and src must be a flat (non-parted) hashable
type.  Falls through to the existing buf-form on type miss / OOM.
The fp_try_direct_count1 fast path already handles top-K-by-count
shapes on BOOL/U8/I16/I32 (slot arrays) and on I32 with high
cardinality (Misra-Gries).  I64 (and TIMESTAMP) returned NULL —
the slot array would need 16 GB for the full domain.

Add an I64 mirror of fp_try_i32_mg_top_count.  Same algorithm:
8192-candidate Misra-Gries pass over the data, exact second pass
on survivors, top-N heap.  Safety bound (nrows / 8193) guards
against missing heavy hitters; on violation falls through to the
existing partition path.
The kt-type check used a combined OR that conflated the type-mismatch
case with other guard conditions, rejecting TIMESTAMP columns that
the dispatcher above this fast path explicitly routes here.  Split
the type check from the other guards.
count_distinct_per_group_buf reads idx_buf+offsets+counts only because
that's the layout the buf-form HLL kernel needs.  The streaming HLL
kernel (added earlier) walks (row_gid, hash(src[r])) directly with
zero scatter, so when every per-group count(distinct ...) qualifies
for the streaming gate (16 ≤ n_groups ≤ 500, hashable type, nrows ≥
1<<20) the scatter step is pure overhead.

Extend the needs_slice_idx guard to accept count-distinct shapes that
the downstream router will pick up via streaming, alongside the
existing simple_cd_global (n_groups > 50 000) case.  q10 drops
174→153 ms (-14%); other queries unaffected.
The top-level statement runner (run_piped / repl) already calls
ray_heap_gc() at end of each statement.  The pair of inner GCs
inside apply_sort_take's no-sort + take branches were running an
extra full GC pass per query in benchmark loops, costing ~2.5 ms
on every query that takes this code path.

Removing them defers cleanup by exactly one call site — the next
top-level GC catches the freed intermediates.  q40 drops 19 → 14.5
ms (-23 %); all other queries unchanged within noise.
Same pattern as the earlier no-sort+take path: the explicit GC
after ray_topk_table_multi was duplicating the top-level statement
runner's GC, costing ~2.5 ms per matching query.

q40 drops 14.5 → 6.2 ms (-57 %), q42 drops 41 → 27 ms (-34 %),
q38 drops 22 → 8 ms (-64 %, flips to win over duck 17 ms).
All other queries unchanged within noise.
Same pattern: explicit GC calls inside exec_group / exec_sort+head /
exec_filter+head that the top-level statement runner's GC catches
one call later.

Major wall-time wins on the high-cardinality group-by family:
  q11: 301 → 231 ms (-23%)
  q17: 346 → 273 ms (-21%)
  q15: 146 → 129 ms (-12%)
  q39: 186 → 164 ms (-12%)
  q08: 188 → 179 ms (-5%)
Plus the closer flips into reach: q40 still 6 ms vs duck 4.
Use each heap's avail bitmap (set on freelist insert, cleared on
remove) to short-circuit the per-order walk in the page-release
pass.  When no order ≥ 13 has any free block (tiny-query, GC fires
before any large allocation has been freed) the entire pass exits
without entering the loop body.

State-based, not constant-tuned: when there are free large blocks,
all of them are still released; when there aren't, nothing changes.
Same avail-bitmap trick as pass 5: when no orders have entries,
short-circuit; per-order, skip empty freelists.  Pass 2's per-block
foreign-vs-local check still runs over occupied freelists, so the
saving is bounded but real on tiny queries where the freelist is
mostly empty.
Mirrors the radix_v2 design from group.c into exec_filtered_group_multi.
Each row is hashed once, routed by RADIX_PART(h) high-bits to one of
MK_RADIX_P=32 per-(worker, partition) shards instead of a single fat
per-worker shard.  Each small shard stays cache-resident; combine
re-uses mk_combine_parallel with nw_effective = nw * MK_RADIX_P.

Gate: COUNT/SUM/AVG aggs, non-nullable agg input, non-SYM keys, n_keys ≥ 2,
no eq_i64/hash-eq shortcut already firing.  Single-key and SYM-key
queries continue through the existing mk_par_fn path.

ClickBench 10M-row deltas (min-of-3, vs perf/clickbench-improvements head):
  q32  1028 → 960  ms  (−68, target cluster)
  q11   234 → 214  ms  (−20)
  q13   492 → 483  ms  (−9)
  q10   140 → 126  ms  (−14)
  q08   191 → 182  ms  (−9)
  q31   296 → 292  ms  (−4)

WIP because:
- Combine still uses mk_combine_parallel with nw_effective=256; a proper
  per-partition combine (mirror radix_v2_phase2_fn) would skip the
  histogram/scatter overhead since shards are already partitioned.
- MIN/MAX aggs untested — state-merge semantics for parallel combine
  not yet validated.
- Heuristics (gate, MK_RADIX_BITS, init_cap) tuned empirically on
  the bench; not necessarily optimal for all input shapes.
Mirrors radix_v2_phase2_fn from group.c.  When wpart_shards are
populated, skip the histogram+scatter passes entirely: each MK_RADIX_P
partition is independent, so dispatch one merge task per partition
that walks all workers' shard[w*P+p] and dedups into a single target HT,
then concat per-partition outputs into the dense gs/gst layout the
materialize section expects.

Gated by per-partition cardinality: when (total_local / MK_RADIX_P)
exceeds 16 K entries, fall through to v1's mk_combine_parallel.  Big
per-partition target HTs (~1 M slots × 32 partitions ≈ 768 MB for q32)
blow the working set; v1's smaller per-combine-partition scatter wins
in that regime.

ClickBench 10M-row deltas vs perf/clickbench-improvements head:
  q17  344 → 285 ms  (−59)
  q33  527 → 494 ms  (−33, was barely WIN, now solidly WIN)
  q31  296 → 276 ms  (−20)
  q13  492 → 477 ms  (−15)
  q09  122 → 110 ms  (−12)
  q11  234 → 221 ms  (−13)
  q32 1028 → 1011 ms (−17, via v1 fallback for high-card)
  q08  191 → 184 ms  (−7)
Removes the per-row lazy-init branch in mk_par_v2_fn — at the cost of a
single one-time loop that initialises all MK_RADIX_P=32 shards at the
top of each worker call (~2 MB per worker for a 4-slot agg state).

The branch was a tiny per-row cost but it ran on every passing row
(~10 M iterations on q31/q32-class queries) and the predictor had to
hold an entry that was effectively always-not-taken after warmup.

ClickBench 10M-row deltas vs ca16c81:
  q17  285 → 274 ms  (−11)
  q16  560 → 547 ms  (−13)
  q08  184 → 181 ms  (−3)
  q32 1011 → 995 ms  (variable; via v1 combine, noisy)
Two callsites already wire streaming HLL into the global path (ray_select
line 8146), but it only fires when the caller had row_gid handy.  The
LIST-based path (count_distinct_per_group_groups → buf) and any other
caller that arrived with idx_buf/offsets but no row_gid fell through to
the per-group task path even when n_groups fit the streaming budget.

Build row_gid on entry by inverting idx_buf/offsets — non-passing rows
stay at the -1 sentinel and the streaming task skips them — then try
ray_count_distinct_approx_pg_stream before the per-group fallback.
Gate matches the existing streaming budget (n_groups in [16, 500],
n_rows ≥ 1M, hashable source).

ClickBench 10M-row deltas vs 0efdcde:
  q14   86 →  72 ms  (−14)
  q16  549 → 541 ms  (−8)
  q08  184 → 179 ms  (−5)
  q11  217 → 215 ms  (−2)
  Others within noise.
…group

`(select {K: K c: (count (distinct X)) from: T [where: W]
          by: K [desc: c take: N]})` previously executed as:

  1. outer group-by K → build idx_buf/offsets/grp_cnt
  2. per-K dedup over X (cdpg_buf_par_fn or per-group HLL)

The per-group dedup pays a per-group HT allocation (or HLL sketch slab)
even when its work is tiny.  For q08-class queries on 10M rows the
idx_buf scatter + per-group HLL pipeline runs ~180 ms because of the
overhead, not the dedup itself.

Rewrite: detect the pattern in ray_select and route to a two-stage DAG:

  1. inner group-by (K, X) on the source table with one count agg —
     lands on the v2 multi-key kernel; output has one row per unique
     (K, X) tuple
  2. outer group-by K with count over the dedup table — emit_filter
     carries desc:c take:N so the second pass heap-trims to top-N

Gate (q08 fits, q10/q11/q13 don't yet — SYM key paths still need v2 SYM):
  - single scalar K column (not SYM, not nullable)
  - cd_inner is a column ref X (not SYM, not nullable)
  - K + X packed ≤ 16 bytes (v2's wide-key cap)
  - WHERE optional and supported by the fused predicate evaluator
  - desc/take optional and targeting the cd output column
  - exactly one count-distinct agg and zero other aggs

ClickBench 10M-row delta vs 06bbea5:
  q08  179 → 122 ms  (−57, was 2.0× duck, now 1.4× duck)

Correctness verified — q08 result matches DuckDB row-for-row.
Two correctness fixes found while auditing the count-distinct path.

- The `(count distinct X) by K` planner rewrite named its output column
  "<key>_count", but the rename pass searched for the literal "count", so
  the user's requested alias was never applied and the column came out
  under the default name. Rename the actual count column (the sole
  non-key column in the result) to the alias. Recovers 8 group /
  count-distinct rfl tests that access the result by alias.

- Comparing a SYM column to the empty-string literal "" silently dropped
  WHERE predicates: once "" resolves to the interned empty sym (id 0, for
  which RAY_ATOM_IS_NULL is true), the null-comparison fixup filled
  `!= ""` all-true and `== ""` all-false. Skip that fixup for
  string-resolved comparisons — a SYM column is null-free and the empty
  string is a real, comparable value — so `!= symcol ""` now excludes
  empty rows and `== symcol ""` selects them.

Updates read_csv.rfl to the corrected empty-string-as-value semantics
(the test had documented this exact behaviour as a known tension).

Tests: 2819 of 2821 passed (2 skipped, 0 failed) under ASAN/UBSAN.
It was only ever incremented, never read. gcc treats the ++ as a use,
but clang's -Wunused-but-set-variable (=-Werror) rejects it, breaking
the macOS CI build. The else-if branch still accepts identity key
projections (so they don't trip the n_other abort); it just no longer
keeps a discarded count.
@singaraiona singaraiona merged commit 94bdf1f into master Jun 1, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants