Skip to content

Add non-allocating column hash/scatter primitives for batched sharding in query execution#106120

Open
harikrishnan94 wants to merge 4 commits into
ClickHouse:masterfrom
harikrishnan94:icolumn-shuffle-primitives
Open

Add non-allocating column hash/scatter primitives for batched sharding in query execution#106120
harikrishnan94 wants to merge 4 commits into
ClickHouse:masterfrom
harikrishnan94:icolumn-shuffle-primitives

Conversation

@harikrishnan94
Copy link
Copy Markdown

This PR implements the hash-production and scatter halves of the radix-shuffle API from #105936, and wires them into BufferedShardByHashTransform with a bytes-budget input batching mode.

Motivation

Sharded aggregation (enable_sharding_aggregator) routes rows to per-shard aggregators via hash partitioning. The legacy path allocates a WeakHash32 per chunk, chains IColumn::getWeakHash32 (CRC32C-based) across key columns, then scatters each chunk independently with virtual dispatch per column type. That per-chunk allocation and dispatch overhead dominates for narrow keys and high throughput.

Changes

IColumn::computeHashInto — non-allocating per-row hash kernel writing 32-bit fmix32-based hashes into a caller buffer, with SIMD overrides for the common column types. Multi-column keys compose via in-place hashCombine32 without intermediate arrays.

DB::ColumnsScatter::scatter — batched physical split with O(1) type-index dispatch (no virtual call on the hot path). The caller passes one column from each of B pending chunks plus shared partition ids and precomputed per-shard row counts.

BufferedShardByHashTransform input batching — new setting shard_by_hash_input_batch_bytes (default 0 = legacy path). When set, the transform accumulates input chunks until a byte budget is reached, then flushes via ColumnsScatter once per column position. Recommended starting value: 2 MiB.

CleanupIColumn::getWeakHash32, WeakHash32, and WeakHash.h/.cpp removed; remaining consumers migrated to computeHashInto.

Partition routing hash changes from CRC32C to fmix32. Both consumers route in-memory per query, so per-key stable shard identity was never guaranteed. Verified grace_hash joins match default hash joins across common key types.

Measured on AWS c7i.metal-24xl: 100M-row sharded GROUP BY over Memory tables shows roughly 7–25% wall-time improvement depending on key width and cardinality at the 2 MiB budget.

Changelog category (leave one):

  • Performance Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Speed up sharded aggregation by replacing per-chunk getWeakHash32 hashing with a non-allocating IColumn::computeHashInto kernel and batched column scattering. A new setting shard_by_hash_input_batch_bytes (default 0, recommended 2097152) enables bytes-budget input batching in BufferedShardByHashTransform; when set to 0 the legacy per-chunk path is used.

Documentation entry for user-facing changes

  • Documentation is added for new features
  • Documentation is changed for user-facing changes
  • Documentation is not needed

Closes #105936

Made with Cursor

@CLAassistant
Copy link
Copy Markdown

CLAassistant commented May 29, 2026

CLA assistant check
All committers have signed the CLA.

@clickhouse-gh
Copy link
Copy Markdown
Contributor

clickhouse-gh Bot commented May 29, 2026

Workflow [PR], commit [e817438]

Summary:

job_name test_name status info comment
Style check FAIL
cpp FAIL cidb
Build (amd_debug) DROPPED
Build (amd_asan_ubsan) DROPPED
Build (amd_tsan) DROPPED
Build (amd_msan) DROPPED
Build (amd_binary) DROPPED
Build (arm_debug) DROPPED
Build (arm_asan_ubsan) DROPPED
Build (arm_tsan) DROPPED
Build (arm_msan) DROPPED

AI Review

Summary

The PR replaces getWeakHash32 with IColumn::computeHashInto, adds ColumnsScatter::scatter, and wires optional batched input scattering into sharded aggregation. The direction is reasonable, but the current hash/scatter contracts are not representation-independent enough for query routing, and a few changed paths can route equal keys to different shards or materialize wrong values.

Findings

❌ Blockers

  • [src/Columns/ColumnNullable.cpp:85] ColumnNullable hashes the nested value before mixing the null map, so two SQL-equal NULL keys can hash differently when their hidden nested payloads differ. That breaks the sharded aggregation and grace_hash join invariant that equal keys must reach the same partition. Make null rows overwrite or combine a fixed null hash independent of the nested column, and add a regression with two null rows whose nested values differ.
  • [src/Processors/Transforms/BufferedShardByHashTransform.cpp:215] Batched scatter groups pending chunks only by column position, but ColumnsScatter::scatter dispatches from source_columns[0] and assumes every source has the same concrete representation. A position can mix ColumnVector and ColumnConst chunks for the same type; depending on order, the fast path can cast the later chunk to the wrong concrete class or the fallback can clone a ColumnConst destination and then insert materialized values incorrectly. Normalize or split pending sources by concrete representation, or make ColumnsScatter::scatter detect mixed representations and fall back to a destination that can hold all rows.
  • [src/Columns/IColumn.h:396] The computeHashInto non-initial contract is split across implementations. Primitive columns combine the raw value with the prior hash, while transparent wrappers such as ColumnConst, ColumnLowCardinality, ColumnSparse, and ColumnReplicated combine a finalized nested hash. Equal multi-column keys can therefore get different hashes when one chunk has a materialized second key and another has a wrapped second key. Make the non-initial path canonical across primitive and wrapper columns, or delegate wrappers to the nested column's non-initial path row by row.

⚠️ Majors

  • [src/Columns/ColumnArray.cpp:337] ColumnArray relies on element folding to imply length, but fmix32(0) and fmix32Combined(0, 0) are both zero. Arrays such as [], [0], and [0, 0] all hash to zero and route to shard 0, which creates severe skew for common default-value array keys. Mix the array length explicitly before finalizing or combining the row hash.
Tests
  • ⚠️ Add focused regressions for Nullable null rows with different hidden nested payloads, batched scatter with mixed ColumnConst and materialized chunks, multi-key representation equivalence for wrapped versus full columns, and all-zero array keys. The current tests compare some batched and unbatched outputs, but they do not exercise these invariants.
Final Verdict

Status: ❌ Block

Minimum required actions: fix the ColumnNullable null hashing, make batched scatter safe for mixed concrete representations, make computeHashInto composition representation-independent, and add the focused regressions above.

@clickhouse-gh clickhouse-gh Bot added the pr-performance Pull request with some performance improvements label May 29, 2026
@harikrishnan94 harikrishnan94 changed the title Add non-allocating column hash/scatter primitives for batched sharded aggregation Add non-allocating column hash/scatter primitives for batched sharding in query execution May 29, 2026
@nickitat nickitat added the can be tested Allows running workflows for external contributors label May 30, 2026
@nickitat nickitat self-assigned this May 30, 2026
void ColumnNullable::computeHashInto(size_t row_begin, size_t row_end, uint32_t * hash_out, bool initial) const
{
// First, hash the nested column values into the buffer.
nested_column->computeHashInto(row_begin, row_end, hash_out, initial);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ColumnNullable hashes the nested value before mixing the null map, so two rows with null_map = 1 but different hidden nested payloads get different hash values. That violates the partitioning invariant for sharded aggregation and grace_hash joins: equal SQL keys (NULL) must always be routed to the same shard/partition. The previous getWeakHash32 path explicitly reset null rows to one fixed value, and row-wise updateHashWithValue also skips the nested value for nulls. Please make the null path overwrite/combine a fixed null hash independent of the nested column, and add a regression with two null rows whose nested values differ.

auto split = column->scatter(num_shards, selector);
for (size_t b = 0; b < num_batched; ++b)
col_ptrs_buf[b] = pending_input[b].chunk.getColumns()[c].get();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushBatch batches columns by position only, but ColumnsScatter::scatter dispatches from source_columns[0] and assumes every source has the same concrete column class. The header only guarantees the same type, not the same representation; a single stream can contain a materialized UInt64 chunk followed by a ColumnConst(UInt64) chunk, for example after UNION/constant folding. If the materialized chunk is first, the fast path statically casts the later ColumnConst to ColumnVector in release; if the const chunk is first, the fallback clones a ColumnConst destination and then insertRangeFrom silently turns later materialized values into the first constant. Please normalize/split pending sources by concrete representation before the batched scatter, or make ColumnsScatter::scatter detect mixed representations and fall back to per-source scattering with destinations that can hold all values. Add a batched test mixing ColumnConst and ColumnVector for the same column position.

harikrishnan94 and others added 4 commits May 30, 2026 22:47
Implements the hash-production half of the radix-shuffle API proposed in
issue ClickHouse#105936.

`IColumn::computeHashInto(row_begin, row_end, uint32_t* hash_out, bool initial)`

A non-virtual-dispatch-per-call, non-allocating per-row hash that writes
32-bit `fmix32`-based hashes into a caller-provided buffer.  When
`initial == false` the buffer is updated in-place via the 32-bit
`boost::hash_combine` mixer, composing hashes across K key columns
without any intermediate allocation.

The pipeline that replaces `getWeakHash32`:

    col[0]->computeHashInto(0, n, buf, /*initial=*/true);
    for k in 1..K-1:
        col[k]->computeHashInto(0, n, buf, /*initial=*/false);
    selector[i] = (uint64_t{buf[i]} * P) >> 32;   // Lemire fastrange

No `WeakHash32` allocation, no `intHashCRC32` finalizer, no intermediate
`PaddedPODArray` per column.

- `src/Common/HashCombine32.h` (new) — `fmix32` (MurmurHash3 32-bit
  finalizer) and `hashCombine32` (32-bit boost::hash_combine mixer).

- `src/Columns/IColumn.h/.cpp` — virtual declaration + default fallback
  that wraps `getWeakHash32` (correct but still allocates; non-overriding
  types are unaffected).

- Overrides with `MULTITARGET_FUNCTION_X86_V4_V3` (scalar baseline +
  AVX2 + AVX-512 with `no-prefer-256-bit`) in:
  - `ColumnVector<T>` (UInt8–UInt64, Int8–Int256, Float32/64, UUID, IPv4/6)
  - `ColumnDecimal<T>` (Decimal32/64/128/256)
  - `ColumnFixedString`
  - `ColumnString`
  - `ColumnNullable(X)` (null byte participates; delegates to nested)

- `BufferedShardByHashTransform::generateOutputChunks` — updated to use
  the new API.  Drops `WeakHash32 hash(num_rows)` allocation per chunk,
  drops `hash.update(col->getWeakHash32())` chain, drops `JoinCommon::
  hashToSelector` call.  `hash_buffer` (reused `PaddedPODArray<UInt32>`)
  replaces the per-chunk `WeakHash32`.

`src/Columns/tests/gtest_compute_hash_into.cpp`:
- Row-range independence (split-call == full-call)
- `initial` flag semantics
- Multi-column composition determinism
- Sequential inputs produce distinct hashes (fmix32 avalanche)
- `ColumnNullable`: null vs. non-null rows with identical nested bytes
  hash differently
- `ColumnString` tail-handling (lengths 0–20)
- `ColumnFixedString` length participates in the hash
- Chi-squared uniformity for K=1 and K=4 with P=64

`src/Columns/benchmarks/benchmark_compute_hash_into.cpp`:
Compares `computeHashInto` vs `getWeakHash32` chain across
UInt32/UInt64/String/Nullable column types, K ∈ {1,2,4,8}, batch ∈
{1024,4096,16384}.

End-to-end `BM_HashAndFastrange_Old` vs `BM_HashAndFastrange_New` covers
the full pipeline as executed by `generateOutputChunks` (hash + Lemire
fastrange selector fill).

Partition assignment in `BufferedShardByHashTransform` changes (different
hash function family: CRC32C → fmix32).  Distribution uniformity is
preserved; no per-key stable shard identity was ever guaranteed.

Related: ClickHouse#105936
Co-authored-by: Cursor <cursoragent@cursor.com>
Implements the scatter half of the radix-shuffle API proposed in issue ClickHouse#105936
together with its consumer in `BufferedShardByHashTransform`.

`DB::ColumnsScatter::scatter(source_columns, pids_per_source, per_shard_rows, num_shards)`
is a batched, zero-virtual-dispatch physical-split primitive. The caller supplies
one `IColumn` from each of B pending chunks (same column-position across a batch),
along with a per-source span of `UInt32` partition-ids and the precomputed
per-shard row counts. An `O(1)` dispatch table keyed on the column's type index
selects the per-type kernel; there is no virtual call and no linear type switch on
the hot path.

Each typed kernel (`ColumnVector`, `ColumnDecimal`, `ColumnFixedString`,
`ColumnString`, `ColumnNullable`, `ColumnTuple`) follows the same shape: a single
`O(P)` setup loop that allocates destinations via `reserve_exact`, initialises a
stack-resident write-pointer cache and commits the final size via
`resize_assume_reserved`, then a hot inner loop `*wp[pids[j]]++ = src[j]` across
all B sources. `ColumnString` keeps the chars, cursor and offsets scatter
single-pass and uses `memcpySmallAllowReadWriteOverflow15` (safe because both
source and destination `PaddedPODArray<UInt8>` carry right-padding).
`ColumnNullable` scatters the null map via the `UInt8` kernel then recurses into
the nested column; `ColumnTuple` recurses per element. Types without a fast path
(`ColumnArray`, Map, LowCardinality, Sparse, ...) and `ColumnConst` fall back to
the legacy `IColumn::scatter` virtual.

`DB::ColumnsScatter::computeHistogram` returns the per-shard row counts as
`std::span<const UInt32>`; `flushBatch` computes them once per flush and passes
them to every `scatter` call, letting each kernel skip its internal pids re-scan.
The scatter kernels and `per_shard_rows` consistently use `UInt32` for per-shard
counts, matching the `UInt32` partition-id width.

Consumer - `BufferedShardByHashTransform` input batching:

The new setting `shard_by_hash_input_batch_bytes` (default 0 = legacy per-chunk
path) enables input batching. When non-zero the transform stashes input chunks in
`pending_input`, accumulating each chunk's `UInt32` pids into a shared buffer.
Once the accumulated input bytes reach the budget, or on input exhaustion,
`flushBatch` calls `ColumnsScatter::scatter` once per column-position across all
pending chunks, producing exact-sized per-shard `MutableColumns` with no per-chunk
KxP `IColumn` allocation.

The flush is bounded by accumulated bytes rather than a row count. Counting bytes
keeps each flush short and its per-shard destination buffers small for wide rows
(a long single-threaded flush starves the parallel downstream
`AggregatingTransform` instances and faults large transient buffers), while narrow
rows still accumulate until the budget is reached. Chunks are never split, so a
batch may overshoot the budget by at most one input chunk's bytes.

The `selector` member is a `std::variant<IColumn::Selector, PaddedPODArray<UInt32>>`:
the `UInt64` `IColumn::Selector` alternative serves the legacy path (`mapToRange`
writes `UInt64` directly, no widening copy), the `PaddedPODArray<UInt32>`
alternative is the shared batched pids buffer. `mapToRange` gains dual `UInt32` /
`UInt64` write overloads sharing the `MULTITARGET_FUNCTION_X86_V4_V3` machinery.

Measured performance:

Sweep on an AWS `c7i.metal-24xl` (Intel Xeon Platinum 8488C, Sapphire Rapids,
1 socket, 48 cores / 96 threads, ~188 GiB, `ap-south-2`). Workload: 100M-row
`SELECT keys, count() ... GROUP BY keys` over `Memory` tables with
`enable_sharding_aggregator=1` and `max_threads=16`. Key/row width was swept
8-64 B (1-8 `UInt64` key columns, spanning the `key64`/`key128`/`key256`/
serialized aggregation methods) and cardinality over 100 / 1e5 / 1e7 distinct
keys. Wall time is the median of 9-15 interleaved iterations vs the legacy
per-chunk path (`shard_by_hash_input_batch_bytes = 0`). Ranges below span the
three cardinalities at the recommended 2 MiB budget:

  -  8 B keys: -9% to -25%   (largest gain: low cardinality, -25%)
  - 16 B keys: neutral to -10%  (high cardinality is ~flat at 2 MiB; smaller
               budgets reach -15% there)
  - 32 B keys: -8% to -12%
  - 64 B keys: -7% to -10% at medium/high cardinality, neutral at low
               cardinality (within run-to-run noise)

The byte budget must stay bounded: an oversized 8 MiB budget reintroduces the
wide-row regression it exists to prevent (+16% on low-cardinality 64 B keys, and
4 MiB already starts to erode the wide-row wins), because each flush again grows
long enough to starve the downstream aggregators. 2 MiB is the robust optimum
across the sweep (best worst-case and best narrow-row peak), hence the documented
recommended starting value.

Also:
- `BuildQueryPipelineSettings` exposes `shard_by_hash_input_batch_bytes`.
- `benchmark_columns_scatter.cpp`: RFC sweep over K and P.
- `gtest_columns_scatter.cpp`: equivalence vs legacy for all fast-path types
  (including `Time64`), `ColumnArray` / `ColumnConst` fallback coverage, and
  per-key count assertions that catch misrouted rows.
- `04278_04101_sharded_aggregation_batched_input.sql`: batched vs legacy
  per-key-identical `GROUP BY` results across key types.

Closes ClickHouse#105936

Co-authored-by: Cursor <cursoragent@cursor.com>
`computeHashInto` now has overrides for every column type, so the legacy
`IColumn::getWeakHash32` and the `WeakHash32` PODArray wrapper are no longer
needed. `IColumn::computeHashInto` is made pure virtual and the two remaining
consumers (`JoinUtils::scatterBlockByHash` and `ScatterByPartitionTransform`)
now fill a `PaddedPODArray<UInt32>` via chained `computeHashInto` instead of
`WeakHash32`. `ColumnIndex::getWeakHash` becomes `ColumnIndex::computeHashInto`
(gather + combine). The CRC32C `updateWeakHash32` primitive stays; its default
initial value is relocated to `WEAK_HASH32_INITIAL_VALUE` in `Hash.h`.

Routing hash values change from CRC32 to the `fmix32` finalizer, which is safe
because both consumers route in-memory per query. Verified that `grace_hash`
joins match the default `hash` join across UInt/String/LowCardinality/Nullable
keys, and the parallel-window scatter tests still pass.

Co-authored-by: Cursor <cursoragent@cursor.com>
…owsPerShard`

Route partition scatter through the fast `ColumnsScatter::scatter` path with
`mapToRange` and pre-computed `rows_per_shard`. Rename `computeHistogram` and
`per_shard_rows`/`row_histogram` to `countRowsPerShard`/`rows_per_shard`.

Co-authored-by: Cursor <cursoragent@cursor.com>
@harikrishnan94 harikrishnan94 force-pushed the icolumn-shuffle-primitives branch from 2d5170d to e817438 Compare May 30, 2026 17:17
/// Much better implementation - to add offsets as an optional argument to updateWeakHash32.
hash_data[i] = static_cast<UInt32>(intHashCRC32(internal_hash_data[row], hash_data[i]));
uint32_t & out = hash_out[i - row_begin];
out = initial ? fmix32(acc) : fmix32Combined(acc, out);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ColumnArray says the array length is implicitly mixed, but if every element hash is zero the accumulator never changes: fmix32(0) is 0 and fmix32Combined(0, 0) is also 0. That makes [], [0], [0, 0], and similar all-zero arrays hash identically and map to shard 0, which defeats the new sharding hash for a common default-value key pattern. Please mix the array length explicitly, for example by seeding or finalizing acc with offsets_data[i] - prev_offset, and add the regression with repeated 0 elements because the current [7] test does not cover this case.

Comment thread src/Columns/IColumn.h
///
/// When `initial == false` the buffer is combined with the per-row hash,
/// composing hashes across multiple key columns without intermediate allocations:
/// hash_out[i] = fmix32Combined(h(row_begin + i), hash_out[i])
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

computeHashInto needs representation-independent composition for equal SQL keys, but the current contract/implementations split into two different meanings. ColumnVector combines the raw value on initial = false (fmix32Combined(raw, prior)), while transparent wrappers such as ColumnConst, ColumnLowCardinality, ColumnSparse, and ColumnReplicated precompute the nested initial = true hash and then combine that finalized hash (fmix32Combined(fmix32(raw), prior)). For a composite key (k, c), a materialized UInt64 second column and a ColumnConst(UInt64) second column with the same value produce different hashes for the same k, so equal keys from different chunks can be routed to different aggregation shards or grace_hash partitions. Please make initial = false canonical across primitive and wrapper columns, or have transparent wrappers delegate to the nested column's non-initial path for each row instead of combining a finalized hash.

@nickitat
Copy link
Copy Markdown
Member

nickitat commented Jun 1, 2026

@harikrishnan94, pls fix the style check and merge conflicts to unblock the CI.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

can be tested Allows running workflows for external contributors pr-performance Pull request with some performance improvements

Projects

None yet

Development

Successfully merging this pull request may close these issues.

RFC: Reduce per-block IColumn allocation overhead during Shuffle with computeHashInto + scatter_into

3 participants