diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 351f95cd2ccc..f2982522a7cd 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -20,14 +20,13 @@ use std::{sync::Arc, vec}; use datafusion::{ assert_batches_eq, datasource::{ - file_format::file_type::FileCompressionType, listing::PartitionedFile, + file_format::file_type::FileCompressionType, + listing::PartitionedFile, object_store::ObjectStoreUrl, + physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream}, }, error::Result, - physical_plan::{ - file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream}, - metrics::ExecutionPlanMetricsSet, - }, + physical_plan::metrics::ExecutionPlanMetricsSet, test_util::aggr_test_schema, }; use futures::StreamExt; diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs index 6c7d5ae3d72f..39013455da35 100644 --- a/datafusion-examples/examples/json_opener.rs +++ b/datafusion-examples/examples/json_opener.rs @@ -21,14 +21,13 @@ use arrow_schema::{DataType, Field, Schema}; use datafusion::{ assert_batches_eq, datasource::{ - file_format::file_type::FileCompressionType, listing::PartitionedFile, + file_format::file_type::FileCompressionType, + listing::PartitionedFile, object_store::ObjectStoreUrl, + physical_plan::{FileScanConfig, FileStream, JsonOpener}, }, error::Result, - physical_plan::{ - file_format::{FileScanConfig, FileStream, JsonOpener}, - metrics::ExecutionPlanMetricsSet, - }, + physical_plan::metrics::ExecutionPlanMetricsSet, }; use futures::StreamExt; use object_store::ObjectStore; diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 760ce7c0f59b..1b285107b38a 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -38,6 +38,7 @@ use crate::arrow::datatypes::Schema; use crate::arrow::datatypes::SchemaRef; use crate::arrow::record_batch::RecordBatch; use crate::arrow::util::pretty; +use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::datasource::{provider_as_source, MemTable, TableProvider}; use crate::error::Result; use crate::execution::{ @@ -48,7 +49,6 @@ use crate::logical_expr::{ col, utils::find_window_exprs, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, TableType, }; -use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{collect, collect_partitioned}; use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 2a27468f4591..2b3ef7ee4eab 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -20,9 +20,9 @@ //! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format) use crate::datasource::file_format::FileFormat; +use crate::datasource::physical_plan::{ArrowExec, FileScanConfig}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::file_format::{ArrowExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; use arrow::ipc::reader::FileReader; use arrow_schema::{Schema, SchemaRef}; diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 374ef18970f0..ab9f1f5dd000 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -28,9 +28,9 @@ use object_store::{GetResult, ObjectMeta, ObjectStore}; use super::FileFormat; use crate::avro_to_arrow::read_avro_schema_from_reader; +use crate::datasource::physical_plan::{AvroExec, FileScanConfig}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::file_format::{AvroExec, FileScanConfig}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 69d1c0089f1e..01bf76ccf48d 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -45,11 +45,11 @@ use crate::datasource::file_format::{ AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart, DEFAULT_SCHEMA_INFER_MAX_RECORD, }; -use crate::error::Result; -use crate::execution::context::SessionState; -use crate::physical_plan::file_format::{ +use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig, }; +use crate::error::Result; +use crate::execution::context::SessionState; use crate::physical_plan::insert::{DataSink, InsertExec}; use crate::physical_plan::Statistics; use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 21cd22f0701a..6247e85ba879 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -36,9 +36,9 @@ use super::FileFormat; use super::FileScanConfig; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; +use crate::datasource::physical_plan::NdJsonExec; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::file_format::NdJsonExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 71bd8f1c07b8..a6848b0d122d 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -36,9 +36,9 @@ use std::task::{Context, Poll}; use std::{fmt, mem}; use crate::arrow::datatypes::SchemaRef; +use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig}; use crate::physical_plan::{ExecutionPlan, Statistics}; use arrow_array::RecordBatch; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6957f367c9d2..875c58ae447e 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -42,11 +42,11 @@ use crate::arrow::array::{ use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; +use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter}; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter}; use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; /// The default file extension of parquet files @@ -379,7 +379,7 @@ fn summarize_min_max( /// This component is a subject to **change** in near future and is exposed for low level integrations /// through [`ParquetFileReaderFactory`]. /// -/// [`ParquetFileReaderFactory`]: crate::physical_plan::file_format::ParquetFileReaderFactory +/// [`ParquetFileReaderFactory`]: crate::datasource::physical_plan::ParquetFileReaderFactory pub async fn fetch_parquet_metadata( store: &dyn ObjectStore, meta: &ObjectMeta, @@ -623,7 +623,7 @@ mod tests { use super::*; use crate::datasource::file_format::parquet::test_util::store_parquet; - use crate::physical_plan::file_format::get_scan_files; + use crate::datasource::physical_plan::get_scan_files; use crate::physical_plan::metrics::MetricValue; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{Array, ArrayRef, StringArray}; diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index a434a081e8b6..427cfc8501b3 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -62,8 +62,8 @@ pub struct PartitionedFile { /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type. /// /// - /// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict - /// [`wrap_partition_value_in_dict`]: crate::physical_plan::file_format::wrap_partition_value_in_dict + /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict + /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols pub partition_values: Vec, /// An optional file range for a more fine-grained parallel execution diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index fd316a74b282..b11fa8b06377 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -33,6 +33,7 @@ use object_store::path::Path; use object_store::ObjectMeta; use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; +use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use crate::datasource::{ file_format::{ arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat, @@ -44,7 +45,6 @@ use crate::datasource::{ }; use crate::logical_expr::TableProviderFilterPushDown; use crate::physical_plan; -use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig}; use crate::{ error::{DataFusionError, Result}, execution::context::SessionState, @@ -357,7 +357,7 @@ impl ListingOptions { /// ``` /// /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html - /// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict + /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict pub fn with_table_partition_cols( mut self, table_partition_cols: Vec<(String, DataType)>, diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index dba4112b1e56..683afb7902e5 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -28,6 +28,7 @@ pub mod file_format; pub mod listing; pub mod listing_table_factory; pub mod memory; +pub mod physical_plan; pub mod streaming; pub mod view; diff --git a/datafusion/core/src/physical_plan/file_format/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/arrow_file.rs rename to datafusion/core/src/datasource/physical_plan/arrow_file.rs index 72a6d0a0b489..43074ccb77c1 100644 --- a/datafusion/core/src/physical_plan/file_format/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -16,10 +16,10 @@ // under the License. //! Execution plan for reading Arrow files -use crate::error::Result; -use crate::physical_plan::file_format::{ +use crate::datasource::physical_plan::{ FileMeta, FileOpenFuture, FileOpener, FileScanConfig, }; +use crate::error::Result; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs similarity index 98% rename from datafusion/core/src/physical_plan/file_format/avro.rs rename to datafusion/core/src/datasource/physical_plan/avro.rs index 9adf492d7da3..704a97ba7e88 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/datasource/physical_plan/avro.rs @@ -165,8 +165,8 @@ impl ExecutionPlan for AvroExec { #[cfg(feature = "avro")] mod private { use super::*; - use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener}; - use crate::physical_plan::file_format::FileMeta; + use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener}; + use crate::datasource::physical_plan::FileMeta; use bytes::Buf; use futures::StreamExt; use object_store::{GetResult, ObjectStore}; @@ -222,7 +222,7 @@ mod tests { use crate::datasource::file_format::{avro::AvroFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; - use crate::physical_plan::file_format::chunked_store::ChunkedStore; + use crate::datasource::physical_plan::chunked_store::ChunkedStore; use crate::prelude::SessionContext; use crate::scalar::ScalarValue; use crate::test::object_store::local_unpartitioned_file; diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/datasource/physical_plan/chunked_store.rs similarity index 100% rename from datafusion/core/src/physical_plan/file_format/chunked_store.rs rename to datafusion/core/src/datasource/physical_plan/chunked_store.rs diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/csv.rs rename to datafusion/core/src/datasource/physical_plan/csv.rs index cbdd626f0e55..d2c76ecaf5ea 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -18,13 +18,13 @@ //! Execution plan for reading CSV files use crate::datasource::file_format::file_type::FileCompressionType; +use crate::datasource::physical_plan::file_stream::{ + FileOpenFuture, FileOpener, FileStream, +}; +use crate::datasource::physical_plan::FileMeta; use crate::error::{DataFusionError, Result}; use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::file_format::file_stream::{ - FileOpenFuture, FileOpener, FileStream, -}; -use crate::physical_plan::file_format::FileMeta; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, @@ -369,7 +369,7 @@ pub async fn plan_to_csv( mod tests { use super::*; use crate::datasource::file_format::file_type::FileType; - use crate::physical_plan::file_format::chunked_store::ChunkedStore; + use crate::datasource::physical_plan::chunked_store::ChunkedStore; use crate::prelude::*; use crate::test::{partitioned_csv_config, partitioned_file_groups}; use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data}; diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/file_stream.rs rename to datafusion/core/src/datasource/physical_plan/file_stream.rs index 0d1dc1bee225..2c4437de0a92 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -28,10 +28,10 @@ use std::task::{Context, Poll}; use std::time::Instant; use crate::datasource::listing::PartitionedFile; -use crate::error::Result; -use crate::physical_plan::file_format::{ +use crate::datasource::physical_plan::{ FileMeta, FileScanConfig, PartitionColumnProjector, }; +use crate::error::Result; use crate::physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, }; @@ -524,7 +524,7 @@ mod tests { use super::*; use crate::datasource::file_format::BatchSerializer; use crate::datasource::object_store::ObjectStoreUrl; - use crate::physical_plan::file_format::FileMeta; + use crate::datasource::physical_plan::FileMeta; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; use crate::prelude::SessionContext; use crate::{ diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/json.rs rename to datafusion/core/src/datasource/physical_plan/json.rs index 10f249d4e735..8340c282a01e 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/datasource/physical_plan/json.rs @@ -17,13 +17,13 @@ //! Execution plan for reading line-delimited JSON files use crate::datasource::file_format::file_type::FileCompressionType; +use crate::datasource::physical_plan::file_stream::{ + FileOpenFuture, FileOpener, FileStream, +}; +use crate::datasource::physical_plan::FileMeta; use crate::error::{DataFusionError, Result}; use crate::physical_plan::common::AbortOnDropSingle; use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::file_format::file_stream::{ - FileOpenFuture, FileOpener, FileStream, -}; -use crate::physical_plan::file_format::FileMeta; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, @@ -308,8 +308,8 @@ mod tests { use crate::datasource::file_format::{json::JsonFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::datasource::physical_plan::chunked_store::ChunkedStore; use crate::execution::context::SessionState; - use crate::physical_plan::file_format::chunked_store::ChunkedStore; use crate::prelude::NdJsonReadOptions; use crate::prelude::*; use crate::test::partitioned_file_groups; diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs similarity index 100% rename from datafusion/core/src/physical_plan/file_format/mod.rs rename to datafusion/core/src/datasource/physical_plan/mod.rs diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/parquet.rs rename to datafusion/core/src/datasource/physical_plan/parquet.rs index a0c7402cc462..383a2066fc41 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -17,18 +17,12 @@ //! Execution plan for reading Parquet files -use fmt::Debug; -use std::any::Any; -use std::cmp::min; -use std::fmt; -use std::fs; -use std::ops::Range; -use std::sync::Arc; - -use crate::physical_plan::file_format::file_stream::{ +use crate::datasource::physical_plan::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; -use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate; +use crate::datasource::physical_plan::{ + parquet::page_filter::PagePruningPredicate, FileMeta, FileScanConfig, SchemaAdapter, +}; use crate::{ config::ConfigOptions, datasource::listing::FileRange, @@ -37,13 +31,19 @@ use crate::{ physical_optimizer::pruning::PruningPredicate, physical_plan::{ common::AbortOnDropSingle, - expressions::PhysicalSortExpr, - file_format::{FileMeta, FileScanConfig, SchemaAdapter}, metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, }; +use datafusion_physical_expr::PhysicalSortExpr; +use fmt::Debug; +use std::any::Any; +use std::cmp::min; +use std::fmt; +use std::fs; +use std::ops::Range; +use std::sync::Arc; use arrow::datatypes::{DataType, SchemaRef}; use arrow::error::ArrowError; diff --git a/datafusion/core/src/physical_plan/file_format/parquet/metrics.rs b/datafusion/core/src/datasource/physical_plan/parquet/metrics.rs similarity index 100% rename from datafusion/core/src/physical_plan/file_format/parquet/metrics.rs rename to datafusion/core/src/datasource/physical_plan/parquet/metrics.rs diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs rename to datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs index 00e55c41ad09..c046de73d79c 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs @@ -39,10 +39,10 @@ use parquet::{ }; use std::sync::Arc; -use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; -use crate::physical_plan::file_format::parquet::{ +use crate::datasource::physical_plan::parquet::{ from_bytes_to_i128, parquet_to_arrow_decimal_type, }; +use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use super::metrics::ParquetFileMetrics; diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs similarity index 100% rename from datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs rename to datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs similarity index 99% rename from datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs rename to datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index 0cbb5d9465a9..07ef28304cce 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -27,7 +27,7 @@ use parquet::file::{ metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, }; -use crate::physical_plan::file_format::parquet::{ +use crate::datasource::physical_plan::parquet::{ from_bytes_to_i128, parquet_to_arrow_decimal_type, }; use crate::{ diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 078093189c4c..7f74e285117e 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -80,9 +80,9 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::repartition::Repartition; use crate::config::ConfigOptions; +use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry}; use crate::physical_optimizer::dist_enforcement::EnforceDistribution; -use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udaf::AggregateUDF; use crate::physical_plan::udf::ScalarUDF; diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index fdef7d54b85a..3ec9e9bbd048 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -207,11 +207,11 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; use crate::physical_plan::expressions::lit; - use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::{displayable, Partitioning, Statistics}; diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index 6c425b174090..4e456450bcb2 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -977,12 +977,12 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_optimizer::sort_enforcement::EnforceSorting; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; use crate::physical_plan::expressions::col; - use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::joins::{ utils::JoinOn, HashJoinExec, PartitionMode, SortMergeJoinExec, }; diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 7db54eee51c8..fb867ff36c62 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -21,11 +21,11 @@ use std::sync::Arc; use super::optimizer::PhysicalOptimizerRule; use crate::config::ConfigOptions; +use crate::datasource::physical_plan::ParquetExec; use crate::error::Result; use crate::physical_plan::Partitioning::*; use crate::physical_plan::{ - file_format::ParquetExec, repartition::RepartitionExec, - with_new_children_if_necessary, ExecutionPlan, + repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan, }; /// Optimizer that introduces repartition to introduce more @@ -325,13 +325,13 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_optimizer::dist_enforcement::EnforceDistribution; use crate::physical_optimizer::sort_enforcement::EnforceSorting; use crate::physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, }; use crate::physical_plan::expressions::{col, PhysicalSortExpr}; - use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 22f7d509c864..8398b16c4ddf 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -967,10 +967,10 @@ mod tests { use super::*; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::physical_optimizer::dist_enforcement::EnforceDistribution; use crate::physical_plan::aggregates::PhysicalGroupBy; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode}; - use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::joins::utils::JoinOn; use crate::physical_plan::joins::SortMergeJoinExec; diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index deff619b4ffe..ce5dc041b99c 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -680,7 +680,6 @@ pub mod common; pub mod display; pub mod empty; pub mod explain; -pub mod file_format; pub mod filter; pub mod insert; pub mod joins; diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index f773f3b54953..a17b8ba87e6a 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -289,9 +289,9 @@ pub(crate) fn window_ordering_equivalence( #[cfg(test)] mod tests { use super::*; + use crate::datasource::physical_plan::CsvExec; use crate::physical_plan::aggregates::AggregateFunction; use crate::physical_plan::expressions::col; - use crate::physical_plan::file_format::CsvExec; use crate::physical_plan::{collect, ExecutionPlan}; use crate::prelude::SessionContext; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 6e4d038b4a6b..82b55063dcff 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -21,11 +21,11 @@ use crate::arrow::array::UInt32Array; use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; +use crate::datasource::physical_plan::{CsvExec, FileScanConfig}; use crate::datasource::{MemTable, TableProvider}; use crate::error::Result; use crate::from_slice::FromSlice; use crate::logical_expr::LogicalPlan; -use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::ExecutionPlan; use crate::test::object_store::local_unpartitioned_file; diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index ed65c4122cd8..d3a1f9c1ef7c 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -26,11 +26,11 @@ use crate::common::ToDFSchema; use crate::config::ConfigOptions; use crate::datasource::listing::{ListingTableUrl, PartitionedFile}; use crate::datasource::object_store::ObjectStoreUrl; +use crate::datasource::physical_plan::{FileScanConfig, ParquetExec}; use crate::error::Result; use crate::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; use crate::physical_expr::create_physical_expr; use crate::physical_expr::execution_props::ExecutionProps; -use crate::physical_plan::file_format::{FileScanConfig, ParquetExec}; use crate::physical_plan::filter::FilterExec; use crate::physical_plan::metrics::MetricsSet; use crate::physical_plan::ExecutionPlan; diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs index 302baca51678..7d73b4a61881 100644 --- a/datafusion/core/tests/parquet/custom_reader.rs +++ b/datafusion/core/tests/parquet/custom_reader.rs @@ -23,7 +23,7 @@ use datafusion::assert_batches_sorted_eq; use datafusion::datasource::file_format::parquet::fetch_parquet_metadata; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::physical_plan::file_format::{ +use datafusion::datasource::physical_plan::{ FileMeta, FileScanConfig, ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory, }; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 3fbf41303263..031aab9f4555 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -29,11 +29,8 @@ use arrow::{ }; use chrono::{Datelike, Duration}; use datafusion::{ - datasource::{provider_as_source, TableProvider}, - physical_plan::{ - accept, file_format::ParquetExec, metrics::MetricsSet, ExecutionPlan, - ExecutionPlanVisitor, - }, + datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider}, + physical_plan::{accept, metrics::MetricsSet, ExecutionPlan, ExecutionPlanVisitor}, prelude::{ParquetReadOptions, SessionConfig, SessionContext}, }; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 1c912883c784..3711184ccc7c 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -21,8 +21,8 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::execution::context::SessionState; -use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::metrics::MetricValue; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs index e7b1584e2155..4d028c6f1b31 100644 --- a/datafusion/core/tests/parquet/schema_coercion.rs +++ b/datafusion/core/tests/parquet/schema_coercion.rs @@ -21,8 +21,8 @@ use arrow_array::types::Int32Type; use arrow_array::{ArrayRef, DictionaryArray, Float32Array, Int64Array, StringArray}; use arrow_schema::DataType; use datafusion::assert_batches_sorted_eq; +use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::collect; -use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; use datafusion::prelude::SessionContext; use datafusion_common::Result; use datafusion_common::Statistics; diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs index f5cb0e480462..c68b422a4f06 100644 --- a/datafusion/core/tests/row.rs +++ b/datafusion/core/tests/row.rs @@ -18,9 +18,9 @@ use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::error::Result; use datafusion::execution::context::SessionState; -use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_row::reader::read_as_batch; diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 42672e504901..7a52e5f0d09f 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -24,6 +24,7 @@ use chrono::Utc; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::listing::{FileRange, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::execution::context::ExecutionProps; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::window_function::WindowFunction; @@ -32,7 +33,6 @@ use datafusion::physical_plan::expressions::{ date_time_interval_expr, GetIndexedFieldExpr, }; use datafusion::physical_plan::expressions::{in_list, LikeExpr}; -use datafusion::physical_plan::file_format::FileScanConfig; use datafusion::physical_plan::{ expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr, IsNullExpr, Literal, diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 7385a4ac2146..3c14981355ec 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use datafusion::arrow::compute::SortOptions; use datafusion::arrow::datatypes::SchemaRef; use datafusion::datasource::file_format::file_type::FileCompressionType; +use datafusion::datasource::physical_plan::{AvroExec, CsvExec, ParquetExec}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::WindowFrame; @@ -32,7 +33,6 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::explain::ExplainExec; use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr}; -use datafusion::physical_plan::file_format::{AvroExec, CsvExec, ParquetExec}; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter}; use datafusion::physical_plan::joins::CrossJoinExec; @@ -1276,14 +1276,16 @@ mod roundtrip_tests { compute::kernels::sort::SortOptions, datatypes::{DataType, Field, Schema}, }, - datasource::listing::PartitionedFile, + datasource::{ + listing::PartitionedFile, + physical_plan::{FileScanConfig, ParquetExec}, + }, logical_expr::{JoinType, Operator}, physical_plan::{ aggregates::{AggregateExec, AggregateMode}, empty::EmptyExec, expressions::{binary, col, lit, NotExpr}, expressions::{Avg, Column, DistinctCount, PhysicalSortExpr}, - file_format::{FileScanConfig, ParquetExec}, filter::FilterExec, joins::{HashJoinExec, PartitionMode}, limit::{GlobalLimitExec, LocalLimitExec}, diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 766906d04a37..0910ddaad0c7 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -32,7 +32,7 @@ use datafusion::physical_plan::{ }; use datafusion::datasource::listing::{FileRange, PartitionedFile}; -use datafusion::physical_plan::file_format::FileScanConfig; +use datafusion::datasource::physical_plan::FileScanConfig; use datafusion::physical_plan::expressions::{Count, DistinctCount, Literal}; diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs index 1de3937c5926..5d2f22b857e9 100644 --- a/datafusion/substrait/src/physical_plan/consumer.rs +++ b/datafusion/substrait/src/physical_plan/consumer.rs @@ -20,8 +20,8 @@ use chrono::DateTime; use datafusion::arrow::datatypes::Schema; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::error::{DataFusionError, Result}; -use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use object_store::ObjectMeta; diff --git a/datafusion/substrait/src/physical_plan/producer.rs b/datafusion/substrait/src/physical_plan/producer.rs index c8d739ecdad5..ad87d7afb058 100644 --- a/datafusion/substrait/src/physical_plan/producer.rs +++ b/datafusion/substrait/src/physical_plan/producer.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +use datafusion::datasource::physical_plan::ParquetExec; use datafusion::error::{DataFusionError, Result}; -use datafusion::physical_plan::file_format::ParquetExec; use datafusion::physical_plan::{displayable, ExecutionPlan}; use std::collections::HashMap; use substrait::proto::expression::MaskExpression; diff --git a/datafusion/substrait/tests/roundtrip_physical_plan.rs b/datafusion/substrait/tests/roundtrip_physical_plan.rs index ab77f19ea0cd..de549412b61f 100644 --- a/datafusion/substrait/tests/roundtrip_physical_plan.rs +++ b/datafusion/substrait/tests/roundtrip_physical_plan.rs @@ -20,8 +20,8 @@ mod tests { use datafusion::arrow::datatypes::Schema; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; + use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec}; use datafusion::error::Result; - use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec}; use datafusion::physical_plan::{displayable, ExecutionPlan}; use datafusion::prelude::SessionContext; use datafusion_substrait::physical_plan::{consumer, producer};