diff --git a/datafusion/physical-optimizer/src/coalesce_batches.rs b/datafusion/physical-optimizer/src/coalesce_batches.rs
index bde49e71305b..61e4c0e7f180 100644
--- a/datafusion/physical-optimizer/src/coalesce_batches.rs
+++ b/datafusion/physical-optimizer/src/coalesce_batches.rs
@@ -23,7 +23,9 @@ use crate::PhysicalOptimizerRule;
 use std::sync::Arc;
 
 use datafusion_common::error::Result;
-use datafusion_common::{config::ConfigOptions, internal_err};
+use datafusion_common::{
+    assert_eq_or_internal_err, config::ConfigOptions, DataFusionError,
+};
 use datafusion_physical_expr::Partitioning;
 use datafusion_physical_plan::{
     async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
@@ -76,11 +78,11 @@ impl PhysicalOptimizerRule for CoalesceBatches {
             } else if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
                 // Coalesce inputs to async functions to reduce number of async function invocations
                 let children = async_exec.children();
-                if children.len() != 1 {
-                    return internal_err!(
-                        "Expected AsyncFuncExec to have exactly one child"
-                    );
-                }
+                assert_eq_or_internal_err!(
+                    children.len(),
+                    1,
+                    "Expected AsyncFuncExec to have exactly one child"
+                );
 
                 let coalesce_exec = Arc::new(CoalesceBatchesExec::new(
                     Arc::clone(children[0]),
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
index b536e7960208..49c66e0ab244 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
@@ -27,7 +27,7 @@ use crate::utils::{
 
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::Transformed;
-use datafusion_common::{internal_err, Result};
+use datafusion_common::{assert_or_internal_err, DataFusionError, Result};
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::repartition::RepartitionExec;
@@ -139,16 +139,21 @@ pub fn plan_with_order_preserving_variants(
         if let Some(ordering) = child.output_ordering() {
             let mut fetch = fetch;
             if let Some(coalesce_fetch) = sort_input.plan.fetch() {
-                if let Some(sort_fetch) = fetch {
-                    if coalesce_fetch < sort_fetch {
-                        return internal_err!(
-                            "CoalescePartitionsExec fetch [{:?}] should be greater than or equal to SortExec fetch [{:?}]", coalesce_fetch, sort_fetch
+                fetch = match fetch {
+                    Some(sort_fetch) => {
+                        assert_or_internal_err!(
+                            coalesce_fetch >= sort_fetch,
+                            "CoalescePartitionsExec fetch [{:?}] should be greater than or equal to SortExec fetch [{:?}]",
+                            coalesce_fetch,
+                            sort_fetch
                         );
+                        Some(sort_fetch)
                     }
-                } else {
-                    // If the sort node does not have a fetch, we need to keep the coalesce node's fetch.
-                    fetch = Some(coalesce_fetch);
-                }
+                    None => {
+                        // If the sort node does not have a fetch, we need to keep the coalesce node's fetch.
+                        Some(coalesce_fetch)
+                    }
+                };
             };
             // When the input of a `CoalescePartitionsExec` has an ordering,
             // replace it with a `SortPreservingMergeExec` if appropriate:
diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs
index df44225159e3..8bed6c3aeba0 100644
--- a/datafusion/physical-optimizer/src/filter_pushdown.rs
+++ b/datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -36,7 +36,9 @@ use std::sync::Arc;
 use crate::PhysicalOptimizerRule;
 
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
-use datafusion_common::{config::ConfigOptions, internal_err, Result};
+use datafusion_common::{
+    assert_eq_or_internal_err, config::ConfigOptions, DataFusionError, Result,
+};
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::physical_expr::is_volatile;
 use datafusion_physical_plan::filter_pushdown::{
@@ -461,22 +463,18 @@
 
     let filter_description_parent_filters = filter_description.parent_filters();
     let filter_description_self_filters = filter_description.self_filters();
-    if filter_description_parent_filters.len() != children.len() {
-        return internal_err!(
-            "Filter pushdown expected FilterDescription to have parent filters for {}, but got {} for node {}",
-            children.len(),
-            filter_description_parent_filters.len(),
-            node.name()
-        );
-    }
-    if filter_description_self_filters.len() != children.len() {
-        return internal_err!(
-            "Filter pushdown expected FilterDescription to have self filters for {}, but got {} for node {}",
-            children.len(),
-            filter_description_self_filters.len(),
-            node.name()
-        );
-    }
+    assert_eq_or_internal_err!(
+        filter_description_parent_filters.len(),
+        children.len(),
+        "Filter pushdown expected parent filters count to match number of children for node {}",
+        node.name()
+    );
+    assert_eq_or_internal_err!(
+        filter_description_self_filters.len(),
+        children.len(),
+        "Filter pushdown expected self filters count to match number of children for node {}",
+        node.name()
+    );
 
     for (child_idx, (child, parent_filters, self_filters)) in izip!(
         children,
@@ -525,15 +523,12 @@
         // from our parents and filters that the current node injected. We need to de-entangle
        // this since we do need to distinguish between them.
         let mut all_filters = result.filters.into_iter().collect_vec();
-        if all_filters.len() != num_self_filters + num_parent_filters {
-            return internal_err!(
-                "Filter pushdown did not return the expected number of filters: expected {} self filters and {} parent filters, but got {}. Likely culprit is {}",
-                num_self_filters,
-                num_parent_filters,
-                all_filters.len(),
-                child.name()
-            );
-        }
+        assert_eq_or_internal_err!(
+            all_filters.len(),
+            num_self_filters + num_parent_filters,
+            "Filter pushdown did not return the expected number of filters from {}",
+            child.name()
+        );
         let parent_filters = all_filters
             .split_off(num_self_filters)
             .into_iter()
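For context on the macros these hunks adopt, below is a minimal, self-contained sketch of the "assert or return an internal error" pattern: a guard that, instead of panicking, returns an `Err` when an invariant fails, replacing multi-line `if ... { return internal_err!(...) }` blocks with a single call. The macro bodies, the local `DataFusionError` enum, and the helper functions here are illustrative assumptions, not the actual `datafusion_common` implementations; only the call shapes (`assert_or_internal_err!(condition, msg, args...)` and `assert_eq_or_internal_err!(left, right, msg, args...)`) are taken from the diff above.

```rust
// Illustrative stand-in only; the real macros and error type live in `datafusion_common`.
#[derive(Debug)]
pub enum DataFusionError {
    Internal(String),
}

// Return an internal error (instead of panicking) when an invariant fails.
macro_rules! assert_or_internal_err {
    ($cond:expr, $($arg:tt)*) => {
        if !$cond {
            return Err(DataFusionError::Internal(format!($($arg)*)));
        }
    };
}

// Equality variant that also reports both operands in the error message.
macro_rules! assert_eq_or_internal_err {
    ($left:expr, $right:expr, $($arg:tt)*) => {{
        let (l, r) = (&$left, &$right);
        if l != r {
            return Err(DataFusionError::Internal(format!(
                "{} (left: {:?}, right: {:?})",
                format!($($arg)*),
                l,
                r
            )));
        }
    }};
}

// Hypothetical helper mirroring the CoalesceBatches child-count check above.
fn check_children(n: usize) -> Result<(), DataFusionError> {
    assert_eq_or_internal_err!(n, 1, "Expected AsyncFuncExec to have exactly one child");
    Ok(())
}

// Hypothetical helper mirroring the fetch comparison above.
fn check_fetch(coalesce_fetch: usize, sort_fetch: usize) -> Result<(), DataFusionError> {
    assert_or_internal_err!(
        coalesce_fetch >= sort_fetch,
        "CoalescePartitionsExec fetch [{:?}] should be greater than or equal to SortExec fetch [{:?}]",
        coalesce_fetch,
        sort_fetch
    );
    Ok(())
}

fn main() {
    assert!(check_children(1).is_ok());
    assert!(check_children(2).is_err());
    assert!(check_fetch(10, 5).is_ok());
    assert!(check_fetch(1, 5).is_err());
}
```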