Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions datafusion/physical-optimizer/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::PhysicalOptimizerRule;
use std::sync::Arc;

use datafusion_common::error::Result;
use datafusion_common::{config::ConfigOptions, internal_err};
use datafusion_common::{
assert_eq_or_internal_err, config::ConfigOptions, DataFusionError,
};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::{
async_func::AsyncFuncExec, coalesce_batches::CoalesceBatchesExec,
Expand Down Expand Up @@ -76,11 +78,11 @@ impl PhysicalOptimizerRule for CoalesceBatches {
} else if let Some(async_exec) = plan_any.downcast_ref::<AsyncFuncExec>() {
// Coalesce inputs to async functions to reduce number of async function invocations
let children = async_exec.children();
if children.len() != 1 {
return internal_err!(
"Expected AsyncFuncExec to have exactly one child"
);
}
assert_eq_or_internal_err!(
children.len(),
1,
"Expected AsyncFuncExec to have exactly one child"
);

let coalesce_exec = Arc::new(CoalesceBatchesExec::new(
Arc::clone(children[0]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::utils::{

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::Transformed;
use datafusion_common::{internal_err, Result};
use datafusion_common::{assert_or_internal_err, DataFusionError, Result};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::repartition::RepartitionExec;
Expand Down Expand Up @@ -139,16 +139,21 @@ pub fn plan_with_order_preserving_variants(
if let Some(ordering) = child.output_ordering() {
let mut fetch = fetch;
if let Some(coalesce_fetch) = sort_input.plan.fetch() {
if let Some(sort_fetch) = fetch {
if coalesce_fetch < sort_fetch {
return internal_err!(
"CoalescePartitionsExec fetch [{:?}] should be greater than or equal to SortExec fetch [{:?}]", coalesce_fetch, sort_fetch
fetch = match fetch {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a little unsure about this block, would like a feedback.

Some(sort_fetch) => {
assert_or_internal_err!(
coalesce_fetch >= sort_fetch,
"CoalescePartitionsExec fetch [{:?}] should be greater than or equal to SortExec fetch [{:?}]",
coalesce_fetch,
sort_fetch
);
Some(sort_fetch)
}
} else {
// If the sort node does not have a fetch, we need to keep the coalesce node's fetch.
fetch = Some(coalesce_fetch);
}
None => {
// If the sort node does not have a fetch, we need to keep the coalesce node's fetch.
Some(coalesce_fetch)
}
};
};
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
Expand Down
47 changes: 21 additions & 26 deletions datafusion/physical-optimizer/src/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use std::sync::Arc;
use crate::PhysicalOptimizerRule;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{config::ConfigOptions, internal_err, Result};
use datafusion_common::{
assert_eq_or_internal_err, config::ConfigOptions, DataFusionError, Result,
};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::is_volatile;
use datafusion_physical_plan::filter_pushdown::{
Expand Down Expand Up @@ -461,22 +463,18 @@ fn push_down_filters(

let filter_description_parent_filters = filter_description.parent_filters();
let filter_description_self_filters = filter_description.self_filters();
if filter_description_parent_filters.len() != children.len() {
return internal_err!(
"Filter pushdown expected FilterDescription to have parent filters for {}, but got {} for node {}",
children.len(),
filter_description_parent_filters.len(),
node.name()
);
}
if filter_description_self_filters.len() != children.len() {
return internal_err!(
"Filter pushdown expected FilterDescription to have self filters for {}, but got {} for node {}",
children.len(),
filter_description_self_filters.len(),
node.name()
);
}
assert_eq_or_internal_err!(
filter_description_parent_filters.len(),
children.len(),
"Filter pushdown expected parent filters count to match number of children for node {}",
node.name()
);
assert_eq_or_internal_err!(
filter_description_self_filters.len(),
children.len(),
"Filter pushdown expected self filters count to match number of children for node {}",
node.name()
);

for (child_idx, (child, parent_filters, self_filters)) in izip!(
children,
Expand Down Expand Up @@ -525,15 +523,12 @@ fn push_down_filters(
// from our parents and filters that the current node injected. We need to de-entangle
// this since we do need to distinguish between them.
let mut all_filters = result.filters.into_iter().collect_vec();
if all_filters.len() != num_self_filters + num_parent_filters {
return internal_err!(
"Filter pushdown did not return the expected number of filters: expected {} self filters and {} parent filters, but got {}. Likely culprit is {}",
num_self_filters,
num_parent_filters,
all_filters.len(),
child.name()
);
}
assert_eq_or_internal_err!(
all_filters.len(),
num_self_filters + num_parent_filters,
"Filter pushdown did not return the expected number of filters from {}",
child.name()
);
let parent_filters = all_filters
.split_off(num_self_filters)
.into_iter()
Expand Down