Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions datafusion/physical-plan/src/joins/join_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,48 @@ where
if map.len() == next_chain.len() {
let start = offset.0;
let end = (start + limit).min(hash_values.len());
for (i, &hash) in hash_values[start..end].iter().enumerate() {
let hash_slice = &hash_values[start..end];

// Process in batches of 8 to exploit memory-level parallelism.
// Creating iter_hash() triggers Group::load (the cache-miss-inducing
// control byte read). By creating 8 iterators before consuming any,
// the CPU's out-of-order execution can overlap these memory accesses.
let chunks = hash_slice.chunks_exact(8);
let remainder = chunks.remainder();
let mut chunk_offset = start;
for chunk in chunks {
// Phase 1: Create all iterators (front-loads 8 cache misses)
let mut iters = [
map.iter_hash(chunk[0]),
map.iter_hash(chunk[1]),
map.iter_hash(chunk[2]),
map.iter_hash(chunk[3]),
map.iter_hash(chunk[4]),
map.iter_hash(chunk[5]),
map.iter_hash(chunk[6]),
map.iter_hash(chunk[7]),
];
// Phase 2: Consume iterators (control bytes now in cache)
for j in 0..8 {
let hash = chunk[j];
for (h, idx) in &mut iters[j] {
if hash == *h {
input_indices.push((chunk_offset + j) as u32);
match_indices.push((*idx - one).into());
break;
}
}
}
chunk_offset += 8;
}
// Handle remainder
for (i, &hash) in remainder.iter().enumerate() {
if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
input_indices.push(start as u32 + i as u32);
input_indices.push((chunk_offset + i) as u32);
match_indices.push((*idx - one).into());
}
}

return if end == hash_values.len() {
None
} else {
Expand Down
Loading