Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions datafusion/expr/src/expr_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,9 @@ mod tests {
use super::*;
use crate::{col, lit, out_ref_col_with_metadata};

use datafusion_common::{internal_err, DFSchema, ScalarValue};
use datafusion_common::{
assert_or_internal_err, DFSchema, DataFusionError, ScalarValue,
};

macro_rules! test_is_expr_nullable {
($EXPR_TYPE:ident) => {{
Expand Down Expand Up @@ -1000,11 +1002,8 @@ mod tests {

impl ExprSchema for MockExprSchema {
fn nullable(&self, _col: &Column) -> Result<bool> {
if self.error_on_nullable {
internal_err!("nullable error")
} else {
Ok(self.field.is_nullable())
}
assert_or_internal_err!(!self.error_on_nullable, "nullable error");
Ok(self.field.is_nullable())
}

fn field_from_column(&self, _col: &Column) -> Result<&Field> {
Expand Down
20 changes: 9 additions & 11 deletions datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
// under the License.

use datafusion_common::{
internal_err, plan_err,
assert_or_internal_err, plan_err,
tree_node::{TreeNode, TreeNodeRecursion},
DFSchemaRef, Result,
DFSchemaRef, DataFusionError, Result,
};

use crate::{
Expand Down Expand Up @@ -114,15 +114,13 @@ fn assert_valid_semantic_plan(plan: &LogicalPlan) -> Result<()> {
pub fn assert_expected_schema(schema: &DFSchemaRef, plan: &LogicalPlan) -> Result<()> {
let compatible = plan.schema().logically_equivalent_names_and_types(schema);

if !compatible {
internal_err!(
"Failed due to a difference in schemas: original schema: {:?}, new schema: {:?}",
schema,
plan.schema()
)
} else {
Ok(())
}
assert_or_internal_err!(
compatible,
"Failed due to a difference in schemas: original schema: {:?}, new schema: {:?}",
schema,
plan.schema()
);
Ok(())
}

/// Asserts that the subqueries are structured properly with valid node placement.
Expand Down
74 changes: 39 additions & 35 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeContainer, TreeNodeRecursion,
};
use datafusion_common::{
aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
FunctionalDependencies, NullEquality, ParamValues, Result, ScalarValue, Spans,
TableReference, UnnestOptions,
aggregate_functional_dependencies, assert_eq_or_internal_err, assert_or_internal_err,
internal_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError,
Dependency, FunctionalDependence, FunctionalDependencies, NullEquality, ParamValues,
Result, ScalarValue, Spans, TableReference, UnnestOptions,
};
use indexmap::IndexSet;

Expand Down Expand Up @@ -965,13 +965,13 @@ impl LogicalPlan {
}
LogicalPlan::Limit(Limit { skip, fetch, .. }) => {
let old_expr_len = skip.iter().chain(fetch.iter()).count();
if old_expr_len != expr.len() {
return internal_err!(
"Invalid number of new Limit expressions: expected {}, got {}",
old_expr_len,
expr.len()
);
}
assert_eq_or_internal_err!(
old_expr_len,
expr.len(),
"Invalid number of new Limit expressions: expected {}, got {}",
old_expr_len,
expr.len()
);
// `LogicalPlan::expressions()` returns in [skip, fetch] order, so we can pop from the end.
let new_fetch = fetch.as_ref().and_then(|_| expr.pop());
let new_skip = skip.as_ref().and_then(|_| expr.pop());
Expand Down Expand Up @@ -1158,43 +1158,47 @@ impl LogicalPlan {
#[inline]
#[expect(clippy::needless_pass_by_value)] // expr is moved intentionally to ensure it's not used again
fn assert_no_expressions(&self, expr: Vec<Expr>) -> Result<()> {
if !expr.is_empty() {
return internal_err!("{self:?} should have no exprs, got {:?}", expr);
}
assert_or_internal_err!(
expr.is_empty(),
"{self:?} should have no exprs, got {:?}",
expr
);
Ok(())
}

/// Helper for [Self::with_new_exprs] to use when no inputs are expected.
#[inline]
#[expect(clippy::needless_pass_by_value)] // inputs is moved intentionally to ensure it's not used again
fn assert_no_inputs(&self, inputs: Vec<LogicalPlan>) -> Result<()> {
if !inputs.is_empty() {
return internal_err!("{self:?} should have no inputs, got: {:?}", inputs);
}
assert_or_internal_err!(
inputs.is_empty(),
"{self:?} should have no inputs, got: {:?}",
inputs
);
Ok(())
}

/// Helper for [Self::with_new_exprs] to use when exactly one expression is expected.
#[inline]
fn only_expr(&self, mut expr: Vec<Expr>) -> Result<Expr> {
if expr.len() != 1 {
return internal_err!(
"{self:?} should have exactly one expr, got {:?}",
expr
);
}
assert_eq_or_internal_err!(
expr.len(),
1,
"{self:?} should have exactly one expr, got {:?}",
&expr
);
Ok(expr.remove(0))
}

/// Helper for [Self::with_new_exprs] to use when exactly one input is expected.
#[inline]
fn only_input(&self, mut inputs: Vec<LogicalPlan>) -> Result<LogicalPlan> {
if inputs.len() != 1 {
return internal_err!(
"{self:?} should have exactly one input, got {:?}",
inputs
);
}
assert_eq_or_internal_err!(
inputs.len(),
1,
"{self:?} should have exactly one input, got {:?}",
&inputs
);
Ok(inputs.remove(0))
}

Expand All @@ -1204,12 +1208,12 @@ impl LogicalPlan {
&self,
mut inputs: Vec<LogicalPlan>,
) -> Result<(LogicalPlan, LogicalPlan)> {
if inputs.len() != 2 {
return internal_err!(
"{self:?} should have exactly two inputs, got {:?}",
inputs
);
}
assert_eq_or_internal_err!(
inputs.len(),
2,
"{self:?} should have exactly two inputs, got {:?}",
&inputs
);
let right = inputs.remove(1);
let left = inputs.remove(0);
Ok((left, right))
Expand Down
21 changes: 13 additions & 8 deletions datafusion/expr/src/udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use crate::udf_eq::UdfEq;
use crate::{ColumnarValue, Documentation, Expr, Signature};
use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, ExprSchema, Result, ScalarValue};
use datafusion_common::{
assert_or_internal_err, not_impl_err, DataFusionError, ExprSchema, Result,
ScalarValue,
};
use datafusion_expr_common::dyn_eq::{DynEq, DynHash};
use datafusion_expr_common::interval_arithmetic::Interval;
use std::any::Any;
Expand Down Expand Up @@ -240,13 +243,15 @@ impl ScalarUDF {
// This doesn't use debug_assert!, but it's meant to run anywhere except on production. It's same in spirit, thus conditioning on debug_assertions.
#[cfg(debug_assertions)]
{
if &result.data_type() != return_field.data_type() {
return datafusion_common::internal_err!("Function '{}' returned value of type '{:?}' while the following type was promised at planning time and expected: '{:?}'",
self.name(),
result.data_type(),
return_field.data_type()
);
}
let result_data_type = result.data_type();
let expected_type = return_field.data_type();
assert_or_internal_err!(
result_data_type == *expected_type,
"Function '{}' returned value of type '{:?}' while the following type was promised at planning time and expected: '{:?}'",
self.name(),
result_data_type,
expected_type
);
// TODO verify return data is non-null when it was promised to be?
}
Ok(result)
Expand Down