Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions datafusion/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,4 @@ pub mod context;
pub use crate::datasource::file_format::options;

// backwards compatibility
pub use datafusion_execution::disk_manager;
pub use datafusion_execution::memory_pool;
pub use datafusion_execution::registry;
pub use datafusion_execution::runtime_env;

pub use disk_manager::DiskManager;
pub use registry::FunctionRegistry;
pub use datafusion_execution::*;
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ use futures::stream::{Stream, StreamExt};
use hashbrown::raw::RawTable;
use itertools::izip;

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::aggregates::{
evaluate_group_by, evaluate_many, evaluate_optional, group_schema, AggregateMode,
AggregationOrdering, GroupByOrderMode, PhysicalGroupBy, RowAccumulatorItem,
};
use crate::physical_plan::metrics::{BaselineMetrics, RecordOutput};
use crate::physical_plan::{aggregates, AggregateExpr, PhysicalExpr};
use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;

use crate::physical_plan::aggregates::utils::{
aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
Expand Down
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! Aggregates functionalities

use crate::execution::context::TaskContext;
use crate::physical_plan::aggregates::{
bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
row_hash::GroupedHashAggregateStream,
Expand All @@ -32,6 +31,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::utils::longest_consecutive_prefix;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
use datafusion_physical_expr::{
aggregate::row_accumulator::RowAccumulator,
Expand Down Expand Up @@ -1047,8 +1047,8 @@ fn evaluate_group_by(

#[cfg(test)]
mod tests {
use crate::execution::context::{SessionConfig, TaskContext};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use super::*;
use crate::execution::context::SessionConfig;
use crate::from_slice::FromSlice;
use crate::physical_plan::aggregates::{
get_finest_requirement, get_working_mode, AggregateExec, AggregateMode,
Expand All @@ -1063,6 +1063,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_physical_expr::expressions::{
lit, ApproxDistinct, Column, Count, Median,
};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/aggregates/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! Aggregate without grouping columns

use crate::execution::context::TaskContext;
use crate::physical_plan::aggregates::{
aggregate_expressions, create_accumulators, finalize_aggregation, AccumulatorItem,
AggregateMode,
Expand All @@ -27,14 +26,15 @@ use crate::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalExpr;
use futures::stream::BoxStream;
use std::borrow::Cow;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::filter::batch_filter;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use futures::stream::{Stream, StreamExt};

use super::AggregateExec;
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ use datafusion_physical_expr::hash_utils::create_hashes;
use futures::ready;
use futures::stream::{Stream, StreamExt};

use crate::execution::context::TaskContext;
use crate::execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::physical_plan::aggregates::utils::{
aggr_state_schema, col_to_scalar, get_at_indices, get_optional_filters,
read_as_batch, slice_and_maybe_filter, ExecutionState, GroupState,
Expand All @@ -49,6 +46,9 @@ use arrow::datatypes::DataType;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::cast::as_boolean_array;
use datafusion_common::{Result, ScalarValue};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::Accumulator;
use datafusion_row::accessor::RowAccessor;
use datafusion_row::layout::RowLayout;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tokio::task::JoinSet;
use super::expressions::PhysicalSortExpr;
use super::stream::RecordBatchStreamAdapter;
use super::{Distribution, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
use datafusion_execution::TaskContext;

/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ use crate::physical_plan::{
RecordBatchStream, SendableRecordBatchStream,
};

use crate::execution::context::TaskContext;
use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion_execution::TaskContext;
use futures::stream::{Stream, StreamExt};
use log::trace;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ use crate::physical_plan::{
};

use super::SendableRecordBatchStream;
use crate::execution::context::TaskContext;
use crate::physical_plan::common::spawn_execution;
use datafusion_execution::TaskContext;

/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

use super::SendableRecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryReservation;
use crate::physical_plan::stream::RecordBatchReceiverStream;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::expressions::{BinaryExpr, Column};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Future, StreamExt, TryStreamExt};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use log::trace;
use super::expressions::PhysicalSortExpr;
use super::{common, SendableRecordBatchStream, Statistics};

use crate::execution::context::TaskContext;
use datafusion_execution::TaskContext;

/// Execution plan for empty relation (produces no rows)
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatc
use log::trace;

use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_execution::TaskContext;

/// Explain execution plan operator. This operator contains the string
/// values of the various plans it has when it is created, and passes
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

//! Execution plan for reading line-delimited Avro files
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_execution::TaskContext;

use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
Expand All @@ -33,6 +32,7 @@ use crate::physical_plan::{
};
use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{LexOrdering, OrderingEquivalenceProperties};

use bytes::{Buf, Bytes};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Execution plan for reading line-delimited JSON files
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
Expand All @@ -30,6 +29,7 @@ use crate::physical_plan::{
ordering_equivalence_properties_helper, DisplayFormatType, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_execution::TaskContext;

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub struct FileScanConfig {
/// [`RuntimeEnv::object_store`]
///
/// [`ObjectStore`]: object_store::ObjectStore
/// [`RuntimeEnv::object_store`]: crate::execution::runtime_env::RuntimeEnv::object_store
/// [`RuntimeEnv::object_store`]: datafusion_execution::runtime_env::RuntimeEnv::object_store
pub object_store_url: ObjectStoreUrl,
/// Schema before `projection` is applied. It contains the all columns that may
/// appear in the files. It does not include table partition columns
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,12 @@ mod tests {
// See also `parquet_exec` integration test

use super::*;
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::context::SessionState;
use crate::execution::options::CsvReadOptions;
use crate::physical_plan::displayable;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use datafusion_physical_expr::{split_conjunction, AnalysisContext};

use log::trace;

use crate::execution::context::TaskContext;
use datafusion_execution::TaskContext;
use futures::stream::{Stream, StreamExt};

/// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::Arc;

use crate::execution::context::TaskContext;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::Distribution;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;

/// `DataSink` implements writing streams of [`RecordBatch`]es to
/// user defined destinations.
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::{any::Any, sync::Arc, task::Poll};
use arrow::datatypes::{Fields, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;

use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec,
Expand All @@ -37,6 +36,7 @@ use crate::{error::Result, scalar::ScalarValue};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;

use super::utils::{
adjust_right_output_partitioning, cross_join_equivalence_properties,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl ExecutionPlan for CrossJoinExec {

/// Specifies whether this plan generates an infinite stream of records.
/// If the plan does not support pipelining, but its input(s) are
/// infinite, returns an error to indicate this.
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] || children[1] {
Err(DataFusionError::Plan(
Expand Down Expand Up @@ -457,10 +457,10 @@ mod tests {
use super::*;
use crate::assert_batches_sorted_eq;
use crate::common::assert_contains;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::physical_plan::common;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_scan_i32, columns};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

async fn join_collect(
left: Arc<dyn ExecutionPlan>,
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ use std::sync::Arc;
use std::task::Poll;

use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryConsumer;
use crate::physical_plan::coalesce_batches::concat_batches;
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;

/// Data of the inner table side
type JoinLeftData = (RecordBatch, MemoryReservation);
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ use futures::{Stream, StreamExt};

use crate::error::DataFusionError;
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::expressions::PhysicalSortExpr;
Expand All @@ -54,6 +52,8 @@ use crate::physical_plan::{
metrics, DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan,
Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;

use datafusion_common::tree_node::{Transformed, TreeNode};

Expand Down Expand Up @@ -1396,7 +1396,6 @@ mod tests {

use crate::common::assert_contains;
use crate::error::Result;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::joins::utils::JoinOn;
Expand All @@ -1406,6 +1405,7 @@ mod tests {
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_i32, columns};
use crate::{assert_batches_eq, assert_batches_sorted_eq};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn build_table(
a: (&str, &Vec<i32>),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval, IntervalBound};

use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::logical_expr::JoinType;
use crate::physical_plan::common::SharedMemoryReservation;
use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema;
Expand All @@ -72,6 +71,7 @@ use crate::physical_plan::{
DisplayFormatType, Distribution, EquivalenceProperties, ExecutionPlan, Partitioning,
RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion_execution::TaskContext;

const HASHMAP_SHRINK_SCALE_FACTOR: usize = 4;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use super::{
RecordBatchStream, SendableRecordBatchStream, Statistics,
};

use crate::execution::context::TaskContext;
use datafusion_execution::TaskContext;

/// Limit execution plan
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::execution::context::TaskContext;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
use futures::Stream;

/// Execution plan for reading in-memory batches of data
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,10 +716,10 @@ pub mod unnest;
pub mod values;
pub mod windows;

use crate::execution::context::TaskContext;
use crate::physical_plan::common::AbortOnDropSingle;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_execution::TaskContext;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, type_coercion, udf,
};
Expand Down
Loading