From bce7b20a9a46035852265bcf0940794a38a7c4cd Mon Sep 17 00:00:00 2001 From: Kumar Ujjawal Date: Sat, 15 Nov 2025 22:02:34 +0530 Subject: [PATCH 1/2] refactor with assert_or_internal_err!() in datafusion/physical-expr --- datafusion/physical-expr/src/aggregate.rs | 9 ++--- datafusion/physical-expr/src/analysis.rs | 34 +++++++++---------- .../physical-expr/src/expressions/case.rs | 26 ++++++++------ .../physical-expr/src/expressions/in_list.rs | 12 +++---- .../physical-expr/src/expressions/like.rs | 11 +++--- datafusion/physical-expr/src/projection.rs | 17 +++++----- 6 files changed, 57 insertions(+), 52 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate.rs b/datafusion/physical-expr/src/aggregate.rs index 2a8467eb8832..ae5a4a855947 100644 --- a/datafusion/physical-expr/src/aggregate.rs +++ b/datafusion/physical-expr/src/aggregate.rs @@ -41,7 +41,10 @@ use crate::expressions::Column; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, FieldRef, Schema, SchemaRef}; -use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; +use datafusion_common::{ + assert_or_internal_err, internal_err, not_impl_err, DataFusionError, Result, + ScalarValue, +}; use datafusion_expr::{AggregateUDF, ReversedUDAF, SetMonotonicity}; use datafusion_expr_common::accumulator::Accumulator; use datafusion_expr_common::groups_accumulator::GroupsAccumulator; @@ -198,9 +201,7 @@ impl AggregateExprBuilder { is_distinct, is_reversed, } = self; - if args.is_empty() { - return internal_err!("args should not be empty"); - } + assert_or_internal_err!(!args.is_empty(), "args should not be empty"); let ordering_types = order_bys .iter() diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index f34dfb4ae1b4..dae439d5db36 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -28,7 +28,8 @@ use crate::PhysicalExpr; use arrow::datatypes::Schema; use datafusion_common::stats::Precision; use datafusion_common::{ - internal_datafusion_err, internal_err, ColumnStatistics, Result, ScalarValue, + assert_or_internal_err, internal_datafusion_err, internal_err, ColumnStatistics, + DataFusionError, Result, ScalarValue, }; use datafusion_expr::interval_arithmetic::{cardinality_ratio, Interval}; @@ -170,19 +171,16 @@ pub fn analyze( .iter() .all(|bound| bound.interval.is_none()) { - if initial_boundaries - .iter() - .any(|bound| bound.distinct_count != Precision::Exact(0)) - { - return internal_err!( - "ExprBoundaries has a non-zero distinct count although it represents an empty table" - ); - } - if context.selectivity != Some(0.0) { - return internal_err!( - "AnalysisContext has a non-zero selectivity although it represents an empty table" - ); - } + assert_or_internal_err!( + !initial_boundaries + .iter() + .any(|bound| bound.distinct_count != Precision::Exact(0)), + "ExprBoundaries has a non-zero distinct count although it represents an empty table" + ); + assert_or_internal_err!( + context.selectivity == Some(0.0), + "AnalysisContext has a non-zero selectivity although it represents an empty table" + ); Ok(context) } else if initial_boundaries .iter() @@ -257,9 +255,11 @@ fn shrink_boundaries( let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?; - if !(0.0..=1.0).contains(&selectivity) { - return internal_err!("Selectivity is out of limit: {}", selectivity); - } + assert_or_internal_err!( + (0.0..=1.0).contains(&selectivity), + "Selectivity is out of limit: {}", + selectivity + ); Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity)) } diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index 7a33aa95c56b..9db4a51c8404 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -30,8 +30,8 @@ use arrow::error::ArrowError; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{ - exec_err, internal_datafusion_err, internal_err, DataFusionError, HashMap, HashSet, - Result, ScalarValue, + assert_or_internal_err, exec_err, internal_datafusion_err, internal_err, + DataFusionError, HashMap, HashSet, Result, ScalarValue, }; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::datum::compare_with_eq; @@ -517,9 +517,10 @@ impl PartialResultIndex { return internal_err!("Partial result index exceeds limit"); }; - if index == NONE_VALUE { - return internal_err!("Partial result index exceeds limit"); - } + assert_or_internal_err!( + index != NONE_VALUE, + "Partial result index exceeds limit" + ); Ok(Self { index }) } @@ -663,9 +664,10 @@ impl ResultBuilder { row_indices: &ArrayRef, row_values: ArrayData, ) -> Result<()> { - if row_indices.null_count() != 0 { - return internal_err!("Row indices must not contain nulls"); - } + assert_or_internal_err!( + row_indices.null_count() == 0, + "Row indices must not contain nulls" + ); match &mut self.state { Empty => { @@ -692,9 +694,11 @@ impl ResultBuilder { // `case_when_with_expr` and `case_when_no_expr`, already ensure that // they only calculate a value for each row at most once. #[cfg(debug_assertions)] - if !indices[*row_ix as usize].is_none() { - return internal_err!("Duplicate value for row {}", *row_ix); - } + assert_or_internal_err!( + indices[*row_ix as usize].is_none(), + "Duplicate value for row {}", + *row_ix + ); indices[*row_ix as usize] = array_index; } diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index eeac986beec0..4bcfbe35d018 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -38,7 +38,8 @@ use datafusion_common::cast::{ }; use datafusion_common::hash_utils::HashValue; use datafusion_common::{ - exec_err, internal_err, not_impl_err, DFSchema, Result, ScalarValue, + assert_or_internal_err, exec_err, not_impl_err, DFSchema, DataFusionError, Result, + ScalarValue, }; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::datum::compare_with_eq; @@ -459,11 +460,10 @@ pub fn in_list( let expr_data_type = expr.data_type(schema)?; for list_expr in list.iter() { let list_expr_data_type = list_expr.data_type(schema)?; - if !DFSchema::datatype_is_logically_equal(&expr_data_type, &list_expr_data_type) { - return internal_err!( - "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_expr_data_type}" - ); - } + assert_or_internal_err!( + DFSchema::datatype_is_logically_equal(&expr_data_type, &list_expr_data_type), + "The data type inlist should be same, the value type is {expr_data_type}, one of list expr type is {list_expr_data_type}" + ); } let static_filter = try_cast_static_filter_to_set(&list, schema).ok(); Ok(Arc::new(InListExpr::new( diff --git a/datafusion/physical-expr/src/expressions/like.rs b/datafusion/physical-expr/src/expressions/like.rs index 1c9ae530f500..3046e8a028a8 100644 --- a/datafusion/physical-expr/src/expressions/like.rs +++ b/datafusion/physical-expr/src/expressions/like.rs @@ -18,7 +18,7 @@ use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{internal_err, Result}; +use datafusion_common::{assert_or_internal_err, DataFusionError, Result}; use datafusion_expr::{ColumnarValue, Operator}; use datafusion_physical_expr_common::datum::apply_cmp; use std::hash::Hash; @@ -169,11 +169,10 @@ pub fn like( ) -> Result> { let expr_type = &expr.data_type(input_schema)?; let pattern_type = &pattern.data_type(input_schema)?; - if !expr_type.eq(pattern_type) && !can_like_type(expr_type) { - return internal_err!( - "The type of {expr_type} AND {pattern_type} of like physical should be same" - ); - } + assert_or_internal_err!( + expr_type.eq(pattern_type) || can_like_type(expr_type), + "The type of {expr_type} AND {pattern_type} of like physical should be same" + ); Ok(Arc::new(LikeExpr::new( negated, case_insensitive, diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index a120ab427e1d..cfb69e0e7d40 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -25,7 +25,9 @@ use crate::PhysicalExpr; use arrow::datatypes::{Field, Schema, SchemaRef}; use datafusion_common::stats::{ColumnStatistics, Precision}; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; +use datafusion_common::{ + assert_or_internal_err, internal_datafusion_err, plan_err, DataFusionError, Result, +}; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; use indexmap::IndexMap; @@ -625,13 +627,12 @@ impl ProjectionMapping { let idx = col.index(); let matching_field = input_schema.field(idx); let matching_name = matching_field.name(); - if col.name() != matching_name { - return internal_err!( - "Input field name {} does not match with the projection expression {}", - matching_name, - col.name() - ); - } + assert_or_internal_err!( + col.name() == matching_name, + "Input field name {} does not match with the projection expression {}", + matching_name, + col.name() + ); let matching_column = Column::new(matching_name, idx); Ok(Transformed::yes(Arc::new(matching_column))) } From 3058fa86584001dcab5386dbfca03d37e6751da0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 16 Nov 2025 05:29:37 +0800 Subject: [PATCH 2/2] Apply suggestions from code review --- datafusion/physical-expr/src/analysis.rs | 3 +-- datafusion/physical-expr/src/projection.rs | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/analysis.rs b/datafusion/physical-expr/src/analysis.rs index dae439d5db36..d85fab9db001 100644 --- a/datafusion/physical-expr/src/analysis.rs +++ b/datafusion/physical-expr/src/analysis.rs @@ -257,8 +257,7 @@ fn shrink_boundaries( assert_or_internal_err!( (0.0..=1.0).contains(&selectivity), - "Selectivity is out of limit: {}", - selectivity + "Selectivity is out of limit: {selectivity}", ); Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity)) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index cfb69e0e7d40..4e60b1dcc0a0 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -629,8 +629,7 @@ impl ProjectionMapping { let matching_name = matching_field.name(); assert_or_internal_err!( col.name() == matching_name, - "Input field name {} does not match with the projection expression {}", - matching_name, + "Input field name {matching_name} does not match with the projection expression {}", col.name() ); let matching_column = Column::new(matching_name, idx);