From c46e9a270e8def76c91a1add78bd76da066633fd Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Mon, 26 Aug 2024 09:27:48 +0800 Subject: [PATCH 1/2] use non-nested kernel for non-nested Signed-off-by: jayzhan211 --- datafusion/functions-nested/src/array_has.rs | 9 ++++----- datafusion/physical-expr-common/src/datum.rs | 13 ++++++++++++- .../physical-expr/src/expressions/in_list.rs | 14 ++++---------- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index 9b4357d0d14f..0362207b508e 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -24,9 +24,8 @@ use arrow_array::{Datum, GenericListArray, Scalar}; use datafusion_common::cast::as_generic_list_array; use datafusion_common::utils::string_utils::string_array_to_vec; use datafusion_common::{exec_err, Result, ScalarValue}; -use datafusion_expr::{ColumnarValue, Operator, ScalarUDFImpl, Signature, Volatility}; - -use datafusion_physical_expr_common::datum::compare_op_for_nested; +use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use datafusion_physical_expr_common::datum::compare_with_eq; use itertools::Itertools; use crate::utils::make_scalar_function; @@ -181,7 +180,7 @@ fn array_has_dispatch_for_array( } let arr = arr.unwrap(); let needle_row = Scalar::new(needle.slice(i, 1)); - let eq_array = compare_op_for_nested(Operator::Eq, &arr, &needle_row)?; + let eq_array = compare_with_eq(&arr, &needle_row)?; let is_contained = eq_array.true_count() > 0; boolean_builder.append_value(is_contained) } @@ -201,7 +200,7 @@ fn array_has_dispatch_for_scalar( if values.len() == 0 { return Ok(Arc::new(BooleanArray::from(vec![Some(false)]))); } - let eq_array = compare_op_for_nested(Operator::Eq, values, needle)?; + let eq_array = compare_with_eq(values, needle)?; let mut final_contained = vec![None; haystack.len()]; for (i, offset) in offsets.windows(2).enumerate() { let start = offset[0].to_usize().unwrap(); diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs index 96c08d0d3a5b..87e621b8f38c 100644 --- a/datafusion/physical-expr-common/src/datum.rs +++ b/datafusion/physical-expr-common/src/datum.rs @@ -20,7 +20,8 @@ use arrow::array::{make_comparator, ArrayRef, Datum}; use arrow::buffer::NullBuffer; use arrow::compute::SortOptions; use arrow::error::ArrowError; -use datafusion_common::internal_err; +use datafusion_common::DataFusionError; +use datafusion_common::{arrow_datafusion_err, internal_err}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::operator::Operator; @@ -87,6 +88,16 @@ pub fn apply_cmp_for_nested( } } +/// Compare with eq with either nested or non-nested +pub fn compare_with_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result { + let (array, _is_scalar) = lhs.get(); + if array.data_type().is_nested() { + compare_op_for_nested(Operator::Eq, lhs, rhs) + } else { + arrow::compute::kernels::cmp::eq(lhs, rhs).map_err(|e| arrow_datafusion_err!(e)) + } +} + /// Compare on nested type List, Struct, and so on pub fn compare_op_for_nested( op: Operator, diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index dfc70551ccf6..97ee84dadb71 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -40,8 +40,8 @@ use datafusion_common::hash_utils::HashValue; use datafusion_common::{ exec_err, internal_err, not_impl_err, DFSchema, Result, ScalarValue, }; -use datafusion_expr::{ColumnarValue, Operator}; -use datafusion_physical_expr_common::datum::compare_op_for_nested; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::datum::compare_with_eq; use ahash::RandomState; use hashbrown::hash_map::RawEntryMut; @@ -359,14 +359,8 @@ impl PhysicalExpr for InListExpr { let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold( BooleanArray::new(BooleanBuffer::new_unset(num_rows), None), |result, expr| -> Result { - Ok(or_kleene( - &result, - &compare_op_for_nested( - Operator::Eq, - &value, - &expr?.into_array(num_rows)?, - )?, - )?) + let rhs = compare_with_eq(&value, &expr?.into_array(num_rows)?)?; + Ok(or_kleene(&result, &rhs)?) }, )?; From 40b34452808c210b79e705fcbb5b702fb33bbeda Mon Sep 17 00:00:00 2001 From: jayzhan211 Date: Mon, 26 Aug 2024 11:48:25 +0800 Subject: [PATCH 2/2] check is_nested outside of func Signed-off-by: jayzhan211 --- datafusion/functions-nested/src/array_has.rs | 6 ++++-- datafusion/physical-expr-common/src/datum.rs | 9 ++++++--- datafusion/physical-expr/src/expressions/in_list.rs | 7 ++++++- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index 0362207b508e..7f66eba5df08 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -179,8 +179,9 @@ fn array_has_dispatch_for_array( continue; } let arr = arr.unwrap(); + let is_nested = arr.data_type().is_nested(); let needle_row = Scalar::new(needle.slice(i, 1)); - let eq_array = compare_with_eq(&arr, &needle_row)?; + let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?; let is_contained = eq_array.true_count() > 0; boolean_builder.append_value(is_contained) } @@ -194,13 +195,14 @@ fn array_has_dispatch_for_scalar( ) -> Result { let haystack = as_generic_list_array::(haystack)?; let values = haystack.values(); + let is_nested = values.data_type().is_nested(); let offsets = haystack.value_offsets(); // If first argument is empty list (second argument is non-null), return false // i.e. array_has([], non-null element) -> false if values.len() == 0 { return Ok(Arc::new(BooleanArray::from(vec![Some(false)]))); } - let eq_array = compare_with_eq(values, needle)?; + let eq_array = compare_with_eq(values, needle, is_nested)?; let mut final_contained = vec![None; haystack.len()]; for (i, offset) in offsets.windows(2).enumerate() { let start = offset[0].to_usize().unwrap(); diff --git a/datafusion/physical-expr-common/src/datum.rs b/datafusion/physical-expr-common/src/datum.rs index 87e621b8f38c..c47ec9d75d50 100644 --- a/datafusion/physical-expr-common/src/datum.rs +++ b/datafusion/physical-expr-common/src/datum.rs @@ -89,9 +89,12 @@ pub fn apply_cmp_for_nested( } /// Compare with eq with either nested or non-nested -pub fn compare_with_eq(lhs: &dyn Datum, rhs: &dyn Datum) -> Result { - let (array, _is_scalar) = lhs.get(); - if array.data_type().is_nested() { +pub fn compare_with_eq( + lhs: &dyn Datum, + rhs: &dyn Datum, + is_nested: bool, +) -> Result { + if is_nested { compare_op_for_nested(Operator::Eq, lhs, rhs) } else { arrow::compute::kernels::cmp::eq(lhs, rhs).map_err(|e| arrow_datafusion_err!(e)) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 97ee84dadb71..0a3e5fcefcf6 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -356,10 +356,15 @@ impl PhysicalExpr for InListExpr { Some(f) => f.contains(value.into_array(num_rows)?.as_ref(), self.negated)?, None => { let value = value.into_array(num_rows)?; + let is_nested = value.data_type().is_nested(); let found = self.list.iter().map(|expr| expr.evaluate(batch)).try_fold( BooleanArray::new(BooleanBuffer::new_unset(num_rows), None), |result, expr| -> Result { - let rhs = compare_with_eq(&value, &expr?.into_array(num_rows)?)?; + let rhs = compare_with_eq( + &value, + &expr?.into_array(num_rows)?, + is_nested, + )?; Ok(or_kleene(&result, &rhs)?) }, )?;