diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 7692a187ec6c..5a1330bf9fb6 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -17,30 +17,30 @@ //! DataFrame API for building and executing query plans. -use crate::arrow::record_batch::RecordBatch; -use crate::error::Result; -use crate::logical_plan::{ - col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder, - Partitioning, -}; -use parquet::file::properties::WriterProperties; -use std::sync::Arc; - -use crate::physical_plan::SendableRecordBatchStream; -use async_trait::async_trait; - use crate::arrow::datatypes::Schema; use crate::arrow::datatypes::SchemaRef; +use crate::arrow::record_batch::RecordBatch; use crate::arrow::util::pretty; use crate::datasource::TableProvider; -use crate::execution::context::{SessionState, TaskContext}; +use crate::error::Result; +use crate::execution::{ + context::{SessionState, TaskContext}, + FunctionRegistry, +}; use crate::logical_expr::{utils::find_window_exprs, TableType}; +use crate::logical_plan::{ + col, DFSchema, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, +}; use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet}; +use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{collect, collect_partitioned}; use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; use crate::scalar::ScalarValue; +use async_trait::async_trait; use parking_lot::RwLock; +use parquet::file::properties::WriterProperties; use std::any::Any; +use std::sync::Arc; /// DataFrame represents a logical set of rows with the same named columns. /// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs new file mode 100644 index 000000000000..2e65be0bcc3a --- /dev/null +++ b/datafusion/core/src/datasource/default_table_source.rs @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Default TableSource implementation used in DataFusion physical plans + +use crate::datasource::TableProvider; +use arrow::datatypes::SchemaRef; +use datafusion_common::DataFusionError; +use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource}; +use std::any::Any; +use std::sync::Arc; + +/// DataFusion default table source, wrapping TableProvider +/// +/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource` +/// (logical plan trait) +pub struct DefaultTableSource { + /// table provider + pub table_provider: Arc, +} + +impl DefaultTableSource { + /// Create a new DefaultTableSource to wrap a TableProvider + pub fn new(table_provider: Arc) -> Self { + Self { table_provider } + } +} + +impl TableSource for DefaultTableSource { + /// Returns the table source as [`Any`](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any { + self + } + + /// Get a reference to the schema for this table + fn schema(&self) -> SchemaRef { + self.table_provider.schema() + } + + /// Tests whether the table provider can make use of a filter expression + /// to optimise data retrieval. + fn supports_filter_pushdown( + &self, + filter: &Expr, + ) -> datafusion_common::Result { + self.table_provider.supports_filter_pushdown(filter) + } +} + +/// Wrap TableProvider in TableSource +pub fn provider_as_source( + table_provider: Arc, +) -> Arc { + Arc::new(DefaultTableSource::new(table_provider)) +} + +/// Attempt to downcast a TableSource to DefaultTableSource and access the +/// TableProvider. This will only work with a TableSource created by DataFusion. +pub fn source_as_provider( + source: &Arc, +) -> datafusion_common::Result> { + match source + .as_ref() + .as_any() + .downcast_ref::() + { + Some(source) => Ok(source.table_provider.clone()), + _ => Err(DataFusionError::Internal( + "TableSource was not DefaultTableSource".to_string(), + )), + } +} diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 8cb40cd271a2..13e175797a14 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -19,6 +19,7 @@ #![allow(clippy::module_inception)] pub mod datasource; +pub mod default_table_source; pub mod empty; pub mod file_format; pub mod listing; @@ -29,6 +30,9 @@ pub mod view; use futures::Stream; pub use self::datasource::TableProvider; +pub use self::default_table_source::{ + provider_as_source, source_as_provider, DefaultTableSource, +}; use self::listing::PartitionedFile; pub use self::memory::MemTable; pub use self::view::ViewTable; diff --git a/datafusion/core/src/execution/mod.rs b/datafusion/core/src/execution/mod.rs index 85a9f8b4e648..024980dee059 100644 --- a/datafusion/core/src/execution/mod.rs +++ b/datafusion/core/src/execution/mod.rs @@ -44,9 +44,11 @@ pub mod context; pub mod disk_manager; pub mod memory_manager; pub mod options; +pub mod registry; pub mod runtime_env; pub use disk_manager::DiskManager; pub use memory_manager::{ human_readable_size, MemoryConsumer, MemoryConsumerId, MemoryManager, }; +pub use registry::FunctionRegistry; diff --git a/datafusion/core/src/logical_plan/registry.rs b/datafusion/core/src/execution/registry.rs similarity index 97% rename from datafusion/core/src/logical_plan/registry.rs rename to datafusion/core/src/execution/registry.rs index f439a12f31a9..5bfa306e35ec 100644 --- a/datafusion/core/src/logical_plan/registry.rs +++ b/datafusion/core/src/execution/registry.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! FunctionRegistry trait + use crate::error::Result; use datafusion_expr::{AggregateUDF, ScalarUDF}; use std::{collections::HashSet, sync::Arc}; diff --git a/datafusion/core/src/logical_plan/expr.rs b/datafusion/core/src/logical_plan/expr.rs index fecb7ce46f7f..771888cbc523 100644 --- a/datafusion/core/src/logical_plan/expr.rs +++ b/datafusion/core/src/logical_plan/expr.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! This module provides an `Expr` enum for representing expressions -//! such as `col = 5` or `SUM(col)`. See examples on the [`Expr`] struct. +//! This is a legacy module that only contains re-exports of other modules pub use datafusion_common::{Column, ExprSchema}; pub use datafusion_expr::{expr_fn::*, lit, lit_timestamp_nano, Expr, Literal, Operator}; diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs index eb6e788d500d..ea6238f408a5 100644 --- a/datafusion/core/src/logical_plan/mod.rs +++ b/datafusion/core/src/logical_plan/mod.rs @@ -15,51 +15,46 @@ // specific language governing permissions and limitations // under the License. -//! This module provides a logical query plan enum that can describe queries. Logical query -//! plans can be created from a SQL statement or built programmatically via the Table API. -//! -//! Logical query plans can then be optimized and executed directly, or translated into -//! physical query plans and executed. +//! This is a legacy module that only contains re-exports of other modules mod expr; pub mod plan; -mod registry; pub mod window_frames; -pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema}; -pub use datafusion_expr::{ - expr_fn::binary_expr, - expr_rewriter, - expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, - logical_plan::builder::{ - build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE, - }, - ExprSchemable, Operator, + +pub use crate::datasource::{provider_as_source, source_as_provider}; +pub use crate::execution::FunctionRegistry; +pub use datafusion_common::{ + Column, DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema, }; -pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo}; -pub use expr::{ +pub use datafusion_expr::{ abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan, avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp, + expr_rewriter, + expr_rewriter::{ + normalize_col, normalize_col_with_schemas, normalize_cols, replace_col, + rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable, + ExprRewriter, RewriteRecursion, + }, + expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln, - log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, - now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace, - repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, - sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, - to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, - trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal, -}; -pub use expr_rewriter::{ - normalize_col, normalize_col_with_schemas, normalize_cols, replace_col, - rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable, - ExprRewriter, RewriteRecursion, -}; -pub use plan::{provider_as_source, source_as_provider}; -pub use plan::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, - CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, - Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor, Repartition, - StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union, - UserDefinedLogicalNode, Values, + log10, log2, + logical_plan::{ + builder::{ + build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE, + }, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, + CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, + JoinType, Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor, + Repartition, StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union, + UserDefinedLogicalNode, Values, + }, + lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr, + nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat, + replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384, + sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex, + to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim, + trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator, }; -pub use registry::FunctionRegistry; +pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo}; diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 6529695ad33f..e841b22ee085 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -14,14 +14,11 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -//! This module contains the `LogicalPlan` enum that describes queries -//! via a logical query plan. -use super::expr::Expr; -use crate::arrow::datatypes::SchemaRef; -use crate::datasource::TableProvider; -use crate::error::DataFusionError; -pub use crate::logical_expr::{ +//! This is a legacy module that only contains re-exports of other modules + +pub use crate::datasource::{provider_as_source, source_as_provider, DefaultTableSource}; +pub use datafusion_expr::{ logical_plan::{ display::{GraphvizVisitor, IndentVisitor}, Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, @@ -33,67 +30,3 @@ pub use crate::logical_expr::{ }, TableProviderFilterPushDown, TableSource, }; -use std::any::Any; -use std::sync::Arc; - -/// DataFusion default table source, wrapping TableProvider -/// -/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource` -/// (logical plan trait) -pub struct DefaultTableSource { - /// table provider - pub table_provider: Arc, -} - -impl DefaultTableSource { - /// Create a new DefaultTableSource to wrap a TableProvider - pub fn new(table_provider: Arc) -> Self { - Self { table_provider } - } -} - -impl TableSource for DefaultTableSource { - /// Returns the table source as [`Any`](std::any::Any) so that it can be - /// downcast to a specific implementation. - fn as_any(&self) -> &dyn Any { - self - } - - /// Get a reference to the schema for this table - fn schema(&self) -> SchemaRef { - self.table_provider.schema() - } - - /// Tests whether the table provider can make use of a filter expression - /// to optimise data retrieval. - fn supports_filter_pushdown( - &self, - filter: &Expr, - ) -> datafusion_common::Result { - self.table_provider.supports_filter_pushdown(filter) - } -} - -/// Wrap TableProvider in TableSource -pub fn provider_as_source( - table_provider: Arc, -) -> Arc { - Arc::new(DefaultTableSource::new(table_provider)) -} - -/// Attempt to downcast a TableSource to DefaultTableSource and access the -/// TableProvider. This will only work with a TableSource created by DataFusion. -pub fn source_as_provider( - source: &Arc, -) -> datafusion_common::Result> { - match source - .as_ref() - .as_any() - .downcast_ref::() - { - Some(source) => Ok(source.table_provider.clone()), - _ => Err(DataFusionError::Internal( - "TableSource was not DefaultTableSource".to_string(), - )), - } -} diff --git a/datafusion/core/src/logical_plan/window_frames.rs b/datafusion/core/src/logical_plan/window_frames.rs index 519582089db4..1ff7331b9ff4 100644 --- a/datafusion/core/src/logical_plan/window_frames.rs +++ b/datafusion/core/src/logical_plan/window_frames.rs @@ -15,6 +15,6 @@ // specific language governing permissions and limitations // under the License. -//! Window frame types, reimported from datafusion_expr +//! This is a legacy module that only contains re-exports of other modules pub use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits}; diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index b79e16090186..073aa2457146 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -44,6 +44,7 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; +use datafusion_expr::binary_expr; use datafusion_expr::utils::expr_to_columns; use datafusion_physical_expr::create_physical_expr; @@ -710,7 +711,7 @@ fn build_predicate_expression( if op == Operator::And || op == Operator::Or { let left_expr = build_predicate_expression(left, schema, required_columns)?; let right_expr = build_predicate_expression(right, schema, required_columns)?; - return Ok(logical_plan::binary_expr(left_expr, op, right_expr)); + return Ok(binary_expr(left_expr, op, right_expr)); } let expr_builder = diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 39603d8f1f8e..743ed7e5dd5e 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -22,11 +22,12 @@ use super::{ aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec, values::ValuesExec, windows, }; +use crate::datasource::source_as_provider; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_expr::utils::generate_sort_key; use crate::logical_plan::plan::{ - source_as_provider, Aggregate, EmptyRelation, Filter, Join, Projection, Sort, - SubqueryAlias, TableScan, Window, + Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan, + Window, }; use crate::logical_plan::{ unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,