diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index fadd2e41eaba..b90c2aee4a78 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -222,6 +222,20 @@ pub use struct_builder::ScalarStructBuilder; /// for the definitive reference. /// /// [`NullArray`]: arrow::array::NullArray +/// Scalar value for RunEndEncoded type +/// +/// RunEndEncoded is an Arrow data type for efficient storage of repeated values +/// through run-length encoding. It consists of two arrays: +/// - run_ends: defines the end positions of each run +/// - values: the actual values that are repeated +#[derive(Debug, Clone)] +pub struct RunEndEncodedScalar { + /// Array defining the end positions of each run + pub run_ends: ArrayRef, + /// Array containing the actual values + pub values: ArrayRef, +} + #[derive(Clone)] pub enum ScalarValue { /// represents `DataType::Null` (castable to/from any other type) @@ -331,6 +345,71 @@ pub enum ScalarValue { Union(Option<(i8, Box)>, UnionFields, UnionMode), /// Dictionary type: index type and value Dictionary(Box, Box), + /// RunEndEncoded type for run-length encoding of repeated values + RunEndEncoded(Arc), +} + +impl PartialEq for RunEndEncodedScalar { + fn eq(&self, other: &Self) -> bool { + // For RunEndEncoded, we need to compare both run_ends and values + // Use as_ref() to compare the arrays without moving + self.run_ends.as_ref() == other.run_ends.as_ref() + && self.values.as_ref() == other.values.as_ref() + } +} + +impl Eq for RunEndEncodedScalar {} + +impl PartialOrd for RunEndEncodedScalar { + fn partial_cmp(&self, other: &Self) -> Option { + // Compare run_ends first using Arrow compute kernels + let run_ends_cmp = partial_cmp_arrays(&self.run_ends, &other.run_ends)?; + if run_ends_cmp != Ordering::Equal { + return Some(run_ends_cmp); + } + // If run_ends are equal, compare values + partial_cmp_arrays(&self.values, &other.values) + } +} + +/// Helper function to compare two arrays using Arrow compute kernels +fn partial_cmp_arrays(arr1: &ArrayRef, arr2: &ArrayRef) -> Option { + if arr1.data_type() != arr2.data_type() { + return None; + } + + let min_length = arr1.len().min(arr2.len()); + let arr1_trimmed = arr1.slice(0, min_length); + let arr2_trimmed = arr2.slice(0, min_length); + + let lt_res = arrow::compute::kernels::cmp::lt(&arr1_trimmed, &arr2_trimmed).ok()?; + let eq_res = arrow::compute::kernels::cmp::eq(&arr1_trimmed, &arr2_trimmed).ok()?; + + for j in 0..lt_res.len() { + if arr1_trimmed.is_null(j) && !arr2_trimmed.is_null(j) { + return Some(Ordering::Greater); + } + if !arr1_trimmed.is_null(j) && arr2_trimmed.is_null(j) { + return Some(Ordering::Less); + } + + if lt_res.is_valid(j) && lt_res.value(j) { + return Some(Ordering::Less); + } + if eq_res.is_valid(j) && !eq_res.value(j) { + return Some(Ordering::Greater); + } + } + + Some(arr1.len().cmp(&arr2.len())) +} + +impl Hash for RunEndEncodedScalar { + fn hash(&self, state: &mut H) { + // Use hash_nested_array for both run_ends and values + hash_nested_array(Arc::clone(&self.run_ends), state); + hash_nested_array(Arc::clone(&self.values), state); + } } impl Hash for Fl { @@ -460,6 +539,8 @@ impl PartialEq for ScalarValue { (Union(_, _, _), _) => false, (Dictionary(k1, v1), Dictionary(k2, v2)) => k1.eq(k2) && v1.eq(v2), (Dictionary(_, _), _) => false, + (RunEndEncoded(s1), RunEndEncoded(s2)) => s1.eq(s2), + (RunEndEncoded(_), _) => false, (Null, Null) => true, (Null, _) => false, } @@ -474,6 +555,8 @@ impl PartialOrd for ScalarValue { // any newly added enum variant will require editing this list // or else face a compile error match (self, other) { + (RunEndEncoded(s1), RunEndEncoded(s2)) => s1.partial_cmp(s2), + (RunEndEncoded(_), _) => None, (Decimal32(v1, p1, s1), Decimal32(v2, p2, s2)) => { if p1.eq(p2) && s1.eq(s2) { v1.partial_cmp(v2) @@ -870,6 +953,7 @@ impl Hash for ScalarValue { k.hash(state); v.hash(state); } + RunEndEncoded(s) => s.hash(state), // stable hash for Null value Null => 1.hash(state), } @@ -1184,6 +1268,15 @@ impl ScalarValue { DataType::Union(fields, mode) => { ScalarValue::Union(None, fields.clone(), *mode) } + DataType::RunEndEncoded(run_ends_field, values_field) => { + // Create empty arrays for null RunEndEncoded scalar + let run_ends_array = new_null_array(run_ends_field.data_type(), 0); + let values_array = new_null_array(values_field.data_type(), 0); + ScalarValue::RunEndEncoded(Arc::new(RunEndEncodedScalar { + run_ends: run_ends_array, + values: values_array, + })) + } DataType::Null => ScalarValue::Null, _ => { return _not_impl_err!( @@ -1878,6 +1971,18 @@ impl ScalarValue { ScalarValue::Dictionary(k, v) => { DataType::Dictionary(k.clone(), Box::new(v.data_type())) } + ScalarValue::RunEndEncoded(ree) => { + // RunEndEncoded type is determined by the values array type + // We need to construct the RunEndEncoded type from the values array + DataType::RunEndEncoded( + Arc::new(Field::new( + "run_ends", + ree.run_ends.data_type().clone(), + false, + )), + Arc::new(Field::new("values", ree.values.data_type().clone(), true)), + ) + } ScalarValue::Null => DataType::Null, } } @@ -2008,6 +2113,11 @@ impl ScalarValue { tz.clone(), )) } + ScalarValue::RunEndEncoded(_) => { + _internal_err!( + "Can not run arithmetic negative on RunEndEncoded scalar value" + ) + } value => _internal_err!( "Can not run arithmetic negative on scalar value {value:?}" ), @@ -2157,6 +2267,10 @@ impl ScalarValue { None => true, }, ScalarValue::Dictionary(_, v) => v.is_null(), + ScalarValue::RunEndEncoded(ree) => { + // RunEndEncoded is null if both arrays are empty or all values are null + ree.values.is_empty() || ree.values.null_count() == ree.values.len() + } } } @@ -3172,6 +3286,9 @@ impl ScalarValue { _ => unreachable!("Invalid dictionary keys type: {}", key_type), } } + ScalarValue::RunEndEncoded(_) => { + return _internal_err!("RunEndEncoded scalar values cannot be converted to arrays of arbitrary size"); + } ScalarValue::Null => get_or_create_cached_null_array(size), }) } @@ -3947,6 +4064,9 @@ impl ScalarValue { None => v.is_null(), } } + ScalarValue::RunEndEncoded(_) => { + return _internal_err!("RunEndEncoded scalar values cannot be compared with arrays using eq_array"); + } ScalarValue::Null => array.is_null(index), }) } @@ -4036,6 +4156,11 @@ impl ScalarValue { // `dt` and `sv` are boxed, so they are NOT already included in `self` dt.size() + sv.size() } + ScalarValue::RunEndEncoded(ree) => { + // Size of the Arc wrapper + size of the arrays + ree.run_ends.get_array_memory_size() + + ree.values.get_array_memory_size() + } } } @@ -4151,6 +4276,10 @@ impl ScalarValue { ScalarValue::Dictionary(_, value) => { value.compact(); } + ScalarValue::RunEndEncoded(_ree) => { + // RunEndEncoded contains arrays, but they are already Arc'd and don't need compaction + // The arrays themselves are immutable, so no action needed + } } } @@ -4768,6 +4897,14 @@ impl fmt::Display for ScalarValue { None => write!(f, "NULL")?, }, ScalarValue::Dictionary(_k, v) => write!(f, "{v}")?, + ScalarValue::RunEndEncoded(ree) => { + write!( + f, + "RunEndEncoded(run_ends: {}, values: {})", + ree.run_ends.len(), + ree.values.len() + )?; + } ScalarValue::Null => write!(f, "NULL")?, }; Ok(()) @@ -4946,6 +5083,14 @@ impl fmt::Debug for ScalarValue { None => write!(f, "Union(NULL)"), }, ScalarValue::Dictionary(k, v) => write!(f, "Dictionary({k:?}, {v:?})"), + ScalarValue::RunEndEncoded(ree) => { + write!( + f, + "RunEndEncoded(run_ends: {}, values: {})", + ree.run_ends.len(), + ree.values.len() + ) + } ScalarValue::Null => write!(f, "NULL"), } } @@ -8713,6 +8858,11 @@ mod tests { UnionMode::Dense, )) .unwrap(), + ScalarValue::try_new_null(&DataType::RunEndEncoded( + Arc::new(Field::new("run_ends", DataType::Int32, false)), + Arc::new(Field::new("values", DataType::Utf8, true)), + )) + .unwrap(), ]; assert!(scalars.iter().all(|s| s.is_null())); } @@ -9168,4 +9318,79 @@ mod tests { ] ); } + + #[test] + fn test_run_end_encoded_equality() { + // Тест для проверки равенства двух RunEndEncoded значений + let run_ends1 = Arc::new(Int32Array::from(vec![2, 4, 6])); + let values1 = Arc::new(Int32Array::from(vec![1, 2, 3])); + let ree1 = RunEndEncodedScalar { + run_ends: run_ends1, + values: values1, + }; + + let run_ends2 = Arc::new(Int32Array::from(vec![2, 4, 6])); + let values2 = Arc::new(Int32Array::from(vec![1, 2, 3])); + let ree2 = RunEndEncodedScalar { + run_ends: run_ends2, + values: values2, + }; + + assert_eq!(ree1, ree2); + + let scalar1 = ScalarValue::RunEndEncoded(Arc::new(ree1)); + let scalar2 = ScalarValue::RunEndEncoded(Arc::new(ree2)); + assert_eq!(scalar1, scalar2); + } + + #[test] + fn test_run_end_encoded_hash() { + // Тест для проверки хэширования + let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6])); + let values = Arc::new(Int32Array::from(vec![1, 2, 3])); + let ree = RunEndEncodedScalar { + run_ends: run_ends.clone(), + values: values.clone(), + }; + + let scalar1 = ScalarValue::RunEndEncoded(Arc::new(ree)); + let scalar2 = ScalarValue::RunEndEncoded(Arc::new(RunEndEncodedScalar { + run_ends: run_ends, + values: values, + })); + + use std::collections::hash_map::DefaultHasher; + use std::hash::Hash; + + let mut hasher1 = DefaultHasher::new(); + let mut hasher2 = DefaultHasher::new(); + + scalar1.hash(&mut hasher1); + scalar2.hash(&mut hasher2); + + assert_eq!(hasher1.finish(), hasher2.finish()); + } + + #[test] + fn test_run_end_encoded_partial_ord() { + // Тест для частичного упорядочивания + let run_ends1 = Arc::new(Int32Array::from(vec![2, 4, 6])); + let values1 = Arc::new(Int32Array::from(vec![1, 2, 3])); + let ree1 = RunEndEncodedScalar { + run_ends: run_ends1, + values: values1, + }; + + let run_ends2 = Arc::new(Int32Array::from(vec![2, 4, 6])); + let values2 = Arc::new(Int32Array::from(vec![1, 2, 3])); + let ree2 = RunEndEncodedScalar { + run_ends: run_ends2, + values: values2, + }; + + let scalar1 = ScalarValue::RunEndEncoded(Arc::new(ree1)); + let scalar2 = ScalarValue::RunEndEncoded(Arc::new(ree2)); + + assert_eq!(scalar1.partial_cmp(&scalar2), Some(Ordering::Equal)); + } }