diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index 8f0fb66b64fb..0d5765374dc4 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -400,12 +400,48 @@ where if map.len() == next_chain.len() { let start = offset.0; let end = (start + limit).min(hash_values.len()); - for (i, &hash) in hash_values[start..end].iter().enumerate() { + let hash_slice = &hash_values[start..end]; + + // Process in batches of 8 to exploit memory-level parallelism. + // Creating iter_hash() triggers Group::load (the cache-miss-inducing + // control byte read). By creating 8 iterators before consuming any, + // the CPU's out-of-order execution can overlap these memory accesses. + let chunks = hash_slice.chunks_exact(8); + let remainder = chunks.remainder(); + let mut chunk_offset = start; + for chunk in chunks { + // Phase 1: Create all iterators (front-loads 8 cache misses) + let mut iters = [ + map.iter_hash(chunk[0]), + map.iter_hash(chunk[1]), + map.iter_hash(chunk[2]), + map.iter_hash(chunk[3]), + map.iter_hash(chunk[4]), + map.iter_hash(chunk[5]), + map.iter_hash(chunk[6]), + map.iter_hash(chunk[7]), + ]; + // Phase 2: Consume iterators (control bytes now in cache) + for j in 0..8 { + let hash = chunk[j]; + for (h, idx) in &mut iters[j] { + if hash == *h { + input_indices.push((chunk_offset + j) as u32); + match_indices.push((*idx - one).into()); + break; + } + } + } + chunk_offset += 8; + } + // Handle remainder + for (i, &hash) in remainder.iter().enumerate() { if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) { - input_indices.push(start as u32 + i as u32); + input_indices.push((chunk_offset + i) as u32); match_indices.push((*idx - one).into()); } } + return if end == hash_values.len() { None } else {