Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,76 @@ fn hash_fixed_list_array(
Ok(())
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_run_array<R: RunEndIndexType>(
array: &RunArray<R>,
random_state: &RandomState,
hashes_buffer: &mut [u64],
rehash: bool,
) -> Result<()> {
// We find the relevant runs that cover potentially sliced arrays, so we can only hash those
// values. Then we find the runs that refer to the original runs and ensure that we apply
// hashes correctly to the sliced, whether sliced at the start, end, or both.
let array_offset = array.offset();
let array_len = array.len();

if array_len == 0 {
return Ok(());
}

let run_ends = array.run_ends();
let run_ends_values = run_ends.values();
let values = array.values();

let start_physical_index = array.get_start_physical_index();
// get_end_physical_index returns the inclusive last index, but we need the exclusive range end
// for the operations we use below.
let end_physical_index = array.get_end_physical_index() + 1;

let sliced_values = values.slice(
start_physical_index,
end_physical_index - start_physical_index,
);
let mut values_hashes = vec![0u64; sliced_values.len()];
create_hashes(
std::slice::from_ref(&sliced_values),
random_state,
&mut values_hashes,
)?;

let mut start_in_slice = 0;
for (adjusted_physical_index, &absolute_run_end) in run_ends_values
[start_physical_index..end_physical_index]
.iter()
.enumerate()
{
let is_null_value = sliced_values.is_null(adjusted_physical_index);
let absolute_run_end = absolute_run_end.as_usize();

let end_in_slice = (absolute_run_end - array_offset).min(array_len);

if rehash {
if !is_null_value {
let value_hash = values_hashes[adjusted_physical_index];
for hash in hashes_buffer
.iter_mut()
.take(end_in_slice)
.skip(start_in_slice)
{
*hash = combine_hashes(value_hash, *hash);
}
}
} else {
let value_hash = values_hashes[adjusted_physical_index];
hashes_buffer[start_in_slice..end_in_slice].fill(value_hash);
}

start_in_slice = end_in_slice;
}

Ok(())
}

/// Internal helper function that hashes a single array and either initializes or combines
/// the hash values in the buffer.
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down Expand Up @@ -535,6 +605,10 @@ fn hash_single_array(
let array = as_union_array(array)?;
hash_union_array(array, random_state, hashes_buffer)?;
}
DataType::RunEndEncoded(_, _) => downcast_run_array! {
array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe return _internal_err like below (line 646) here instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I felt like the unreachable was more appropriate, as it's what the dictionary case does as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, makes sense! Andrew has nudged me to return an error instead of unreachable a couple of times, so I figured maybe it's a stance he takes generally, that's why I remembered to comment on it. But if the dict case does it, it makes sense to keep it!

(e.g. here, but I think that was slightly different apache/arrow-rs#8589 (comment))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no strong opinion, whatever we do we should be consistent

}
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
Expand Down Expand Up @@ -803,6 +877,74 @@ mod tests {
create_hash_string!(string_view_array, StringArray);
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);

#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_run_array() -> Result<()> {
let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; array.len()];
let hashes = create_hashes(
&[Arc::clone(&array) as ArrayRef],
&random_state,
hashes_buff,
)?;

assert_eq!(hashes.len(), 7);
assert_eq!(hashes[0], hashes[1]);
assert_eq!(hashes[2], hashes[3]);
assert_eq!(hashes[3], hashes[4]);
assert_eq!(hashes[5], hashes[6]);
assert_ne!(hashes[0], hashes[2]);
assert_ne!(hashes[2], hashes[5]);
assert_ne!(hashes[0], hashes[5]);

Ok(())
}

#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_run_array() -> Result<()> {
let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let mut one_col_hashes = vec![0; int_array.len()];
create_hashes(
&[Arc::clone(&int_array) as ArrayRef],
&random_state,
&mut one_col_hashes,
)?;

let mut two_col_hashes = vec![0; int_array.len()];
create_hashes(
&[
Arc::clone(&int_array) as ArrayRef,
Arc::clone(&run_array) as ArrayRef,
],
&random_state,
&mut two_col_hashes,
)?;

assert_eq!(one_col_hashes.len(), 7);
assert_eq!(two_col_hashes.len(), 7);
assert_ne!(one_col_hashes, two_col_hashes);

let diff_0_vs_1_one_col = one_col_hashes[0] != one_col_hashes[1];
let diff_0_vs_1_two_col = two_col_hashes[0] != two_col_hashes[1];
assert_eq!(diff_0_vs_1_one_col, diff_0_vs_1_two_col);

let diff_2_vs_3_one_col = one_col_hashes[2] != one_col_hashes[3];
let diff_2_vs_3_two_col = two_col_hashes[2] != two_col_hashes[3];
assert_eq!(diff_2_vs_3_one_col, diff_2_vs_3_two_col);

Ok(())
}

#[test]
// Tests actual values of hashes, which are different if forcing collisions
#[cfg(not(feature = "force_hash_collisions"))]
Expand Down Expand Up @@ -1321,4 +1463,100 @@ mod tests {
// 67 vs 67
assert_eq!(hashes[0], hashes[4]);
}

#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sliced_run_array() -> Result<()> {
let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let mut full_hashes = vec![0; array.len()];
create_hashes(
&[Arc::clone(&array) as ArrayRef],
&random_state,
&mut full_hashes,
)?;

let array_ref: ArrayRef = Arc::clone(&array) as ArrayRef;
let sliced_array = array_ref.slice(2, 3);

let mut sliced_hashes = vec![0; sliced_array.len()];
create_hashes(
std::slice::from_ref(&sliced_array),
&random_state,
&mut sliced_hashes,
)?;

assert_eq!(sliced_hashes.len(), 3);
assert_eq!(sliced_hashes[0], sliced_hashes[1]);
assert_eq!(sliced_hashes[1], sliced_hashes[2]);
assert_eq!(&sliced_hashes, &full_hashes[2..5]);

Ok(())
}

#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn test_run_array_with_nulls() -> Result<()> {
let values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6]));
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let mut hashes = vec![0; array.len()];
create_hashes(
&[Arc::clone(&array) as ArrayRef],
&random_state,
&mut hashes,
)?;

assert_eq!(hashes[0], hashes[1]);
assert_ne!(hashes[0], 0);
assert_eq!(hashes[2], hashes[3]);
assert_eq!(hashes[2], 0);
assert_eq!(hashes[4], hashes[5]);
assert_ne!(hashes[4], 0);
assert_ne!(hashes[0], hashes[4]);

Ok(())
}

#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn test_run_array_with_nulls_multicolumn() -> Result<()> {
let primitive_array = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
let run_values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
let run_ends = Arc::new(Int32Array::from(vec![1, 2, 3]));
let run_array =
Arc::new(RunArray::try_new(&run_ends, run_values.as_ref()).unwrap());
let second_col = Arc::new(Int32Array::from(vec![100, 200, 300]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);

let mut primitive_hashes = vec![0; 3];
create_hashes(
&[
Arc::clone(&primitive_array) as ArrayRef,
Arc::clone(&second_col) as ArrayRef,
],
&random_state,
&mut primitive_hashes,
)?;

let mut run_hashes = vec![0; 3];
create_hashes(
&[
Arc::clone(&run_array) as ArrayRef,
Arc::clone(&second_col) as ArrayRef,
],
&random_state,
&mut run_hashes,
)?;

assert_eq!(primitive_hashes, run_hashes);

Ok(())
}
}