Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ use std::{sync::Arc, vec};
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_type::FileCompressionType, listing::PartitionedFile,
file_format::file_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
},
error::Result,
physical_plan::{
file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
metrics::ExecutionPlanMetricsSet,
},
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};
use futures::StreamExt;
Expand Down
9 changes: 4 additions & 5 deletions datafusion-examples/examples/json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ use arrow_schema::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_type::FileCompressionType, listing::PartitionedFile,
file_format::file_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
physical_plan::{FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::{
file_format::{FileScanConfig, FileStream, JsonOpener},
metrics::ExecutionPlanMetricsSet,
},
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use futures::StreamExt;
use object_store::ObjectStore;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::error::Result;
use crate::execution::{
Expand All @@ -48,7 +49,6 @@ use crate::logical_expr::{
col, utils::find_window_exprs, Expr, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning, TableType,
};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
//! Works with files following the [Arrow IPC format](https://arrow.apache.org/docs/format/Columnar.html#ipc-file-format)

use crate::datasource::file_format::FileFormat;
use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{ArrowExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use arrow::ipc::reader::FileReader;
use arrow_schema::{Schema, SchemaRef};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};

use super::FileFormat;
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::physical_plan::{AvroExec, FileScanConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ use crate::datasource::file_format::{
AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
DEFAULT_SCHEMA_INFER_MAX_RECORD,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::Statistics;
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use super::FileFormat;
use super::FileScanConfig;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::NdJsonExec;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ use std::task::{Context, Poll};
use std::{fmt, mem};

use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
use crate::physical_plan::{ExecutionPlan, Statistics};

use arrow_array::RecordBatch;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ use crate::arrow::array::{
use crate::arrow::datatypes::DataType;
use crate::config::ConfigOptions;

use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter};
use crate::datasource::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};

/// The default file extension of parquet files
Expand Down Expand Up @@ -379,7 +379,7 @@ fn summarize_min_max(
/// This component is a subject to **change** in near future and is exposed for low level integrations
/// through [`ParquetFileReaderFactory`].
///
/// [`ParquetFileReaderFactory`]: crate::physical_plan::file_format::ParquetFileReaderFactory
/// [`ParquetFileReaderFactory`]: crate::datasource::physical_plan::ParquetFileReaderFactory
pub async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
Expand Down Expand Up @@ -623,7 +623,7 @@ mod tests {
use super::*;

use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::physical_plan::file_format::get_scan_files;
use crate::datasource::physical_plan::get_scan_files;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ pub struct PartitionedFile {
/// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
///
///
/// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
/// [`wrap_partition_value_in_dict`]: crate::physical_plan::file_format::wrap_partition_value_in_dict
/// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
/// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict
/// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use object_store::path::Path;
use object_store::ObjectMeta;

use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::datasource::{
file_format::{
arrow::ArrowFormat, avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
Expand All @@ -44,7 +45,6 @@ use crate::datasource::{
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
use crate::physical_plan::file_format::{FileScanConfig, FileSinkConfig};
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
Expand Down Expand Up @@ -357,7 +357,7 @@ impl ListingOptions {
/// ```
///
/// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
/// [`wrap_partition_type_in_dict`]: crate::physical_plan::file_format::wrap_partition_type_in_dict
/// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
pub fn with_table_partition_cols(
mut self,
table_partition_cols: Vec<(String, DataType)>,
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub mod file_format;
pub mod listing;
pub mod listing_table_factory;
pub mod memory;
pub mod physical_plan;
pub mod streaming;
pub mod view;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
// under the License.

//! Execution plan for reading Arrow files
use crate::error::Result;
use crate::physical_plan::file_format::{
use crate::datasource::physical_plan::{
FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
};
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl ExecutionPlan for AvroExec {
#[cfg(feature = "avro")]
mod private {
use super::*;
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use crate::physical_plan::file_format::FileMeta;
use crate::datasource::physical_plan::file_stream::{FileOpenFuture, FileOpener};
use crate::datasource::physical_plan::FileMeta;
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectStore};
Expand Down Expand Up @@ -222,7 +222,7 @@ mod tests {
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use crate::test::object_store::local_unpartitioned_file;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
//! Execution plan for reading CSV files

use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Expand Down Expand Up @@ -369,7 +369,7 @@ pub async fn plan_to_csv(
mod tests {
use super::*;
use crate::datasource::file_format::file_type::FileType;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ use std::task::{Context, Poll};
use std::time::Instant;

use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::physical_plan::file_format::{
use crate::datasource::physical_plan::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
use crate::error::Result;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
Expand Down Expand Up @@ -524,7 +524,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::BatchSerializer;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::file_format::FileMeta;
use crate::datasource::physical_plan::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::prelude::SessionContext;
use crate::{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

//! Execution plan for reading line-delimited JSON files
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Expand Down Expand Up @@ -308,8 +308,8 @@ mod tests {
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::chunked_store::ChunkedStore;
use crate::execution::context::SessionState;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
use crate::test::partitioned_file_groups;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,12 @@

//! Execution plan for reading Parquet files

use fmt::Debug;
use std::any::Any;
use std::cmp::min;
use std::fmt;
use std::fs;
use std::ops::Range;
use std::sync::Arc;

use crate::physical_plan::file_format::file_stream::{
use crate::datasource::physical_plan::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::parquet::page_filter::PagePruningPredicate;
use crate::datasource::physical_plan::{
parquet::page_filter::PagePruningPredicate, FileMeta, FileScanConfig, SchemaAdapter,
};
use crate::{
config::ConfigOptions,
datasource::listing::FileRange,
Expand All @@ -37,13 +31,19 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
common::AbortOnDropSingle,
expressions::PhysicalSortExpr,
file_format::{FileMeta, FileScanConfig, SchemaAdapter},
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
},
};
use datafusion_physical_expr::PhysicalSortExpr;
use fmt::Debug;
use std::any::Any;
use std::cmp::min;
use std::fmt;
use std::fs;
use std::ops::Range;
use std::sync::Arc;

use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::ArrowError;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ use parquet::{
};
use std::sync::Arc;

use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::file_format::parquet::{
use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};

use super::metrics::ParquetFileMetrics;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use parquet::file::{
metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics,
};

use crate::physical_plan::file_format::parquet::{
use crate::datasource::physical_plan::parquet::{
from_bytes_to_i128, parquet_to_arrow_decimal_type,
};
use crate::{
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::dist_enforcement::EnforceDistribution;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udaf::AggregateUDF;
use crate::physical_plan::udf::ScalarUDF;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ mod tests {
use super::*;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::datasource::physical_plan::{FileScanConfig, ParquetExec};
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
};
use crate::physical_plan::expressions::lit;
use crate::physical_plan::file_format::{FileScanConfig, ParquetExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{displayable, Partitioning, Statistics};

Expand Down
Loading