Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ mod tests {
use super::*;

use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::physical_plan::file_format::get_scan_files;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
Expand Down Expand Up @@ -1215,6 +1216,25 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_get_scan_files() -> Result<()> {
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let projection = Some(vec![9]);
let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
let scan_files = get_scan_files(exec)?;
assert_eq!(scan_files.len(), 1);
assert_eq!(scan_files[0].len(), 1);
assert_eq!(scan_files[0][0].len(), 1);
assert!(scan_files[0][0][0]
.object_meta
.location
.to_string()
.contains("alltypes_plain.parquet"));

Ok(())
}

fn check_page_index_validation(
page_index: Option<&ParquetColumnIndex>,
offset_index: Option<&ParquetOffsetIndex>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning,
repartition::RepartitionExec, tree_node::TreeNodeRewritable, Partitioning,
},
};
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/dist_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::physical_plan::joins::{
};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::ExecutionPlan;

/// Currently for a sort operator, if
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::physical_plan::{ExecutionPlan, PhysicalExpr};

use super::optimizer::PhysicalOptimizerRule;
use crate::error::Result;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::tree_node::TreeNodeRewritable;

/// For hash join with the partition mode [PartitionMode::Auto], JoinSelection rule will make
/// a cost based decision to select which PartitionMode mode(Partitioned/CollectLeft) is optimal
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use std::sync::Arc;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/pipeline_fixer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::physical_plan::joins::{
convert_sort_expr_with_filter_schema, HashJoinExec, PartitionMode,
SymmetricHashJoinExec,
};
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::logical_plan::JoinType;
Expand Down
25 changes: 6 additions & 19 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_physical_expr::rewrite::{TreeNodeRewritable, TreeNodeRewriter};
use datafusion_physical_expr::rewrite::TreeNodeRewritable;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
use log::trace;
Expand Down Expand Up @@ -643,28 +643,15 @@ fn rewrite_column_expr(
column_old: &phys_expr::Column,
column_new: &phys_expr::Column,
) -> Result<Arc<dyn PhysicalExpr>> {
let mut rewriter = RewriteColumnExpr {
column_old,
column_new,
};
e.transform_using(&mut rewriter)
}

struct RewriteColumnExpr<'a> {
column_old: &'a phys_expr::Column,
column_new: &'a phys_expr::Column,
}

impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for RewriteColumnExpr<'a> {
fn mutate(&mut self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
e.transform(&|expr| {
if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
if column == self.column_old {
return Ok(Arc::new(self.column_new.clone()));
if column == column_old {
return Ok(Some(Arc::new(column_new.clone())));
}
}

Ok(expr)
}
Ok(None)
})
}

fn reverse_operator(op: Operator) -> Result<Operator> {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ use crate::physical_optimizer::utils::add_sort_above;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::tree_node::TreeNodeRewritable;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ impl NdJsonExec {
file_compression_type,
}
}

/// Ref to the base configs
pub fn base_config(&self) -> &FileScanConfig {
&self.base_config
}
}

impl ExecutionPlan for NdJsonExec {
Expand Down
48 changes: 48 additions & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ use crate::datasource::{
listing::{FileRange, PartitionedFile},
object_store::ObjectStoreUrl,
};
use crate::physical_plan::tree_node::{
TreeNodeVisitable, TreeNodeVisitor, VisitRecursion,
};
use crate::physical_plan::ExecutionPlan;
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
Expand All @@ -68,6 +72,50 @@ pub fn partition_type_wrap(val_type: DataType) -> DataType {
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type))
}

/// Get all of the [`PartitionedFile`] to be scanned for an [`ExecutionPlan`]
pub fn get_scan_files(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Vec<Vec<PartitionedFile>>>> {
let mut collector = FileScanCollector::new();
plan.accept(&mut collector)?;
Ok(collector.file_groups)
}

struct FileScanCollector {
file_groups: Vec<Vec<Vec<PartitionedFile>>>,
}

impl FileScanCollector {
fn new() -> Self {
Self {
file_groups: vec![],
}
}
}

impl TreeNodeVisitor for FileScanCollector {
type N = Arc<dyn ExecutionPlan>;

fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
let plan_any = node.as_any();
let file_groups =
if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() {
parquet_exec.base_config().file_groups.clone()
} else if let Some(avro_exec) = plan_any.downcast_ref::<AvroExec>() {
avro_exec.base_config().file_groups.clone()
} else if let Some(json_exec) = plan_any.downcast_ref::<NdJsonExec>() {
json_exec.base_config().file_groups.clone()
} else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
csv_exec.base_config().file_groups.clone()
} else {
return Ok(VisitRecursion::Continue);
};

self.file_groups.push(file_groups);
Ok(VisitRecursion::Stop)
}
}

/// The base configurations to provide when creating a physical plan for
/// any given file format.
#[derive(Debug, Clone)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,16 @@ impl<'a> TreeNodeRewriter<Arc<dyn PhysicalExpr>> for FilterCandidateBuilder<'a>

if DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
return Ok(RewriteRecursion::Stop);
}
} else if self.table_schema.index_of(column.name()).is_err() {
// If the column does not exist in the (un-projected) table schema then
// it must be a projected column.
self.projected_columns = true;
return Ok(RewriteRecursion::Stop);
}
}

Ok(RewriteRecursion::Continue)
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,10 @@ pub mod metrics;
pub mod planner;
pub mod projection;
pub mod repartition;
pub mod rewrite;
pub mod sorts;
pub mod stream;
pub mod streaming;
pub mod tree_node;
pub mod udaf;
pub mod union;
pub mod unnest;
Expand Down
Loading