Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 225 additions & 0 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,20 @@ pub use struct_builder::ScalarStructBuilder;
/// for the definitive reference.
///
/// [`NullArray`]: arrow::array::NullArray
/// Scalar value for RunEndEncoded type
///
/// RunEndEncoded is an Arrow data type for efficient storage of repeated values
/// through run-length encoding. It consists of two arrays:
/// - run_ends: defines the end positions of each run
/// - values: the actual values that are repeated
#[derive(Debug, Clone)]
pub struct RunEndEncodedScalar {
/// Array defining the end positions of each run
pub run_ends: ArrayRef,
/// Array containing the actual values
pub values: ArrayRef,
}

#[derive(Clone)]
pub enum ScalarValue {
/// represents `DataType::Null` (castable to/from any other type)
Expand Down Expand Up @@ -331,6 +345,71 @@ pub enum ScalarValue {
Union(Option<(i8, Box<ScalarValue>)>, UnionFields, UnionMode),
/// Dictionary type: index type and value
Dictionary(Box<DataType>, Box<ScalarValue>),
/// RunEndEncoded type for run-length encoding of repeated values
RunEndEncoded(Arc<RunEndEncodedScalar>),
}

impl PartialEq for RunEndEncodedScalar {
fn eq(&self, other: &Self) -> bool {
// For RunEndEncoded, we need to compare both run_ends and values
// Use as_ref() to compare the arrays without moving
self.run_ends.as_ref() == other.run_ends.as_ref()
&& self.values.as_ref() == other.values.as_ref()
}
}

impl Eq for RunEndEncodedScalar {}

impl PartialOrd for RunEndEncodedScalar {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// Compare run_ends first using Arrow compute kernels
let run_ends_cmp = partial_cmp_arrays(&self.run_ends, &other.run_ends)?;
if run_ends_cmp != Ordering::Equal {
return Some(run_ends_cmp);
}
// If run_ends are equal, compare values
partial_cmp_arrays(&self.values, &other.values)
}
}

/// Helper function to compare two arrays using Arrow compute kernels
fn partial_cmp_arrays(arr1: &ArrayRef, arr2: &ArrayRef) -> Option<Ordering> {
if arr1.data_type() != arr2.data_type() {
return None;
}

let min_length = arr1.len().min(arr2.len());
let arr1_trimmed = arr1.slice(0, min_length);
let arr2_trimmed = arr2.slice(0, min_length);

let lt_res = arrow::compute::kernels::cmp::lt(&arr1_trimmed, &arr2_trimmed).ok()?;
let eq_res = arrow::compute::kernels::cmp::eq(&arr1_trimmed, &arr2_trimmed).ok()?;

for j in 0..lt_res.len() {
if arr1_trimmed.is_null(j) && !arr2_trimmed.is_null(j) {
return Some(Ordering::Greater);
}
if !arr1_trimmed.is_null(j) && arr2_trimmed.is_null(j) {
return Some(Ordering::Less);
}

if lt_res.is_valid(j) && lt_res.value(j) {
return Some(Ordering::Less);
}
if eq_res.is_valid(j) && !eq_res.value(j) {
return Some(Ordering::Greater);
}
}

Some(arr1.len().cmp(&arr2.len()))
}

impl Hash for RunEndEncodedScalar {
fn hash<H: Hasher>(&self, state: &mut H) {
// Use hash_nested_array for both run_ends and values
hash_nested_array(Arc::clone(&self.run_ends), state);
hash_nested_array(Arc::clone(&self.values), state);
}
}

impl Hash for Fl<f16> {
Expand Down Expand Up @@ -460,6 +539,8 @@ impl PartialEq for ScalarValue {
(Union(_, _, _), _) => false,
(Dictionary(k1, v1), Dictionary(k2, v2)) => k1.eq(k2) && v1.eq(v2),
(Dictionary(_, _), _) => false,
(RunEndEncoded(s1), RunEndEncoded(s2)) => s1.eq(s2),
(RunEndEncoded(_), _) => false,
(Null, Null) => true,
(Null, _) => false,
}
Expand All @@ -474,6 +555,8 @@ impl PartialOrd for ScalarValue {
// any newly added enum variant will require editing this list
// or else face a compile error
match (self, other) {
(RunEndEncoded(s1), RunEndEncoded(s2)) => s1.partial_cmp(s2),
(RunEndEncoded(_), _) => None,
(Decimal32(v1, p1, s1), Decimal32(v2, p2, s2)) => {
if p1.eq(p2) && s1.eq(s2) {
v1.partial_cmp(v2)
Expand Down Expand Up @@ -870,6 +953,7 @@ impl Hash for ScalarValue {
k.hash(state);
v.hash(state);
}
RunEndEncoded(s) => s.hash(state),
// stable hash for Null value
Null => 1.hash(state),
}
Expand Down Expand Up @@ -1184,6 +1268,15 @@ impl ScalarValue {
DataType::Union(fields, mode) => {
ScalarValue::Union(None, fields.clone(), *mode)
}
DataType::RunEndEncoded(run_ends_field, values_field) => {
// Create empty arrays for null RunEndEncoded scalar
let run_ends_array = new_null_array(run_ends_field.data_type(), 0);
let values_array = new_null_array(values_field.data_type(), 0);
ScalarValue::RunEndEncoded(Arc::new(RunEndEncodedScalar {
run_ends: run_ends_array,
values: values_array,
}))
}
DataType::Null => ScalarValue::Null,
_ => {
return _not_impl_err!(
Expand Down Expand Up @@ -1878,6 +1971,18 @@ impl ScalarValue {
ScalarValue::Dictionary(k, v) => {
DataType::Dictionary(k.clone(), Box::new(v.data_type()))
}
ScalarValue::RunEndEncoded(ree) => {
// RunEndEncoded type is determined by the values array type
// We need to construct the RunEndEncoded type from the values array
DataType::RunEndEncoded(
Arc::new(Field::new(
"run_ends",
ree.run_ends.data_type().clone(),
false,
)),
Arc::new(Field::new("values", ree.values.data_type().clone(), true)),
)
}
ScalarValue::Null => DataType::Null,
}
}
Expand Down Expand Up @@ -2008,6 +2113,11 @@ impl ScalarValue {
tz.clone(),
))
}
ScalarValue::RunEndEncoded(_) => {
_internal_err!(
"Can not run arithmetic negative on RunEndEncoded scalar value"
)
}
value => _internal_err!(
"Can not run arithmetic negative on scalar value {value:?}"
),
Expand Down Expand Up @@ -2157,6 +2267,10 @@ impl ScalarValue {
None => true,
},
ScalarValue::Dictionary(_, v) => v.is_null(),
ScalarValue::RunEndEncoded(ree) => {
// RunEndEncoded is null if both arrays are empty or all values are null
ree.values.is_empty() || ree.values.null_count() == ree.values.len()
}
}
}

Expand Down Expand Up @@ -3172,6 +3286,9 @@ impl ScalarValue {
_ => unreachable!("Invalid dictionary keys type: {}", key_type),
}
}
ScalarValue::RunEndEncoded(_) => {
return _internal_err!("RunEndEncoded scalar values cannot be converted to arrays of arbitrary size");
}
ScalarValue::Null => get_or_create_cached_null_array(size),
})
}
Expand Down Expand Up @@ -3947,6 +4064,9 @@ impl ScalarValue {
None => v.is_null(),
}
}
ScalarValue::RunEndEncoded(_) => {
return _internal_err!("RunEndEncoded scalar values cannot be compared with arrays using eq_array");
}
ScalarValue::Null => array.is_null(index),
})
}
Expand Down Expand Up @@ -4036,6 +4156,11 @@ impl ScalarValue {
// `dt` and `sv` are boxed, so they are NOT already included in `self`
dt.size() + sv.size()
}
ScalarValue::RunEndEncoded(ree) => {
// Size of the Arc wrapper + size of the arrays
ree.run_ends.get_array_memory_size()
+ ree.values.get_array_memory_size()
}
}
}

Expand Down Expand Up @@ -4151,6 +4276,10 @@ impl ScalarValue {
ScalarValue::Dictionary(_, value) => {
value.compact();
}
ScalarValue::RunEndEncoded(_ree) => {
// RunEndEncoded contains arrays, but they are already Arc'd and don't need compaction
// The arrays themselves are immutable, so no action needed
}
}
}

Expand Down Expand Up @@ -4768,6 +4897,14 @@ impl fmt::Display for ScalarValue {
None => write!(f, "NULL")?,
},
ScalarValue::Dictionary(_k, v) => write!(f, "{v}")?,
ScalarValue::RunEndEncoded(ree) => {
write!(
f,
"RunEndEncoded(run_ends: {}, values: {})",
ree.run_ends.len(),
ree.values.len()
)?;
}
ScalarValue::Null => write!(f, "NULL")?,
};
Ok(())
Expand Down Expand Up @@ -4946,6 +5083,14 @@ impl fmt::Debug for ScalarValue {
None => write!(f, "Union(NULL)"),
},
ScalarValue::Dictionary(k, v) => write!(f, "Dictionary({k:?}, {v:?})"),
ScalarValue::RunEndEncoded(ree) => {
write!(
f,
"RunEndEncoded(run_ends: {}, values: {})",
ree.run_ends.len(),
ree.values.len()
)
}
ScalarValue::Null => write!(f, "NULL"),
}
}
Expand Down Expand Up @@ -8713,6 +8858,11 @@ mod tests {
UnionMode::Dense,
))
.unwrap(),
ScalarValue::try_new_null(&DataType::RunEndEncoded(
Arc::new(Field::new("run_ends", DataType::Int32, false)),
Arc::new(Field::new("values", DataType::Utf8, true)),
))
.unwrap(),
];
assert!(scalars.iter().all(|s| s.is_null()));
}
Expand Down Expand Up @@ -9168,4 +9318,79 @@ mod tests {
]
);
}

#[test]
fn test_run_end_encoded_equality() {
// Тест для проверки равенства двух RunEndEncoded значений
let run_ends1 = Arc::new(Int32Array::from(vec![2, 4, 6]));
let values1 = Arc::new(Int32Array::from(vec![1, 2, 3]));
let ree1 = RunEndEncodedScalar {
run_ends: run_ends1,
values: values1,
};

let run_ends2 = Arc::new(Int32Array::from(vec![2, 4, 6]));
let values2 = Arc::new(Int32Array::from(vec![1, 2, 3]));
let ree2 = RunEndEncodedScalar {
run_ends: run_ends2,
values: values2,
};

assert_eq!(ree1, ree2);

let scalar1 = ScalarValue::RunEndEncoded(Arc::new(ree1));
let scalar2 = ScalarValue::RunEndEncoded(Arc::new(ree2));
assert_eq!(scalar1, scalar2);
}

#[test]
fn test_run_end_encoded_hash() {
// Тест для проверки хэширования
let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6]));
let values = Arc::new(Int32Array::from(vec![1, 2, 3]));
let ree = RunEndEncodedScalar {
run_ends: run_ends.clone(),
values: values.clone(),
};

let scalar1 = ScalarValue::RunEndEncoded(Arc::new(ree));
let scalar2 = ScalarValue::RunEndEncoded(Arc::new(RunEndEncodedScalar {
run_ends: run_ends,
values: values,
}));

use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;

let mut hasher1 = DefaultHasher::new();
let mut hasher2 = DefaultHasher::new();

scalar1.hash(&mut hasher1);
scalar2.hash(&mut hasher2);

assert_eq!(hasher1.finish(), hasher2.finish());
}

#[test]
fn test_run_end_encoded_partial_ord() {
// Тест для частичного упорядочивания
let run_ends1 = Arc::new(Int32Array::from(vec![2, 4, 6]));
let values1 = Arc::new(Int32Array::from(vec![1, 2, 3]));
let ree1 = RunEndEncodedScalar {
run_ends: run_ends1,
values: values1,
};

let run_ends2 = Arc::new(Int32Array::from(vec![2, 4, 6]));
let values2 = Arc::new(Int32Array::from(vec![1, 2, 3]));
let ree2 = RunEndEncodedScalar {
run_ends: run_ends2,
values: values2,
};

let scalar1 = ScalarValue::RunEndEncoded(Arc::new(ree1));
let scalar2 = ScalarValue::RunEndEncoded(Arc::new(ree2));

assert_eq!(scalar1.partial_cmp(&scalar2), Some(Ordering::Equal));
}
}