Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@

//! DataFrame API for building and executing query plans.

use crate::arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::logical_plan::{
col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder,
Partitioning,
};
use parquet::file::properties::WriterProperties;
use std::sync::Arc;

use crate::physical_plan::SendableRecordBatchStream;
use async_trait::async_trait;

use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::record_batch::RecordBatch;
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::execution::context::{SessionState, TaskContext};
use crate::error::Result;
use crate::execution::{
context::{SessionState, TaskContext},
FunctionRegistry,
};
use crate::logical_expr::{utils::find_window_exprs, TableType};
use crate::logical_plan::{
col, DFSchema, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
use crate::scalar::ScalarValue;
use async_trait::async_trait;
use parking_lot::RwLock;
use parquet::file::properties::WriterProperties;
use std::any::Any;
use std::sync::Arc;

/// DataFrame represents a logical set of rows with the same named columns.
/// Similar to a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) or
Expand Down
87 changes: 87 additions & 0 deletions datafusion/core/src/datasource/default_table_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Default TableSource implementation used in DataFusion physical plans

use crate::datasource::TableProvider;
use arrow::datatypes::SchemaRef;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource};
use std::any::Any;
use std::sync::Arc;

/// DataFusion's default [`TableSource`], wrapping a [`TableProvider`].
///
/// This structure adapts a `TableProvider` (physical plan trait) to the
/// `TableSource` (logical plan trait). Its `TableSource` implementation
/// forwards schema and filter pushdown queries to the wrapped provider.
pub struct DefaultTableSource {
/// The wrapped table provider that this source delegates to.
pub table_provider: Arc<dyn TableProvider>,
}

impl DefaultTableSource {
/// Create a new DefaultTableSource to wrap a TableProvider
pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
Self { table_provider }
}
}

impl TableSource for DefaultTableSource {
    /// Exposes this source as [`Any`](std::any::Any), allowing callers to
    /// downcast it back to the concrete `DefaultTableSource`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the schema reported by the wrapped table provider.
    fn schema(&self) -> SchemaRef {
        let provider = &self.table_provider;
        provider.schema()
    }

    /// Asks the wrapped table provider whether it can make use of `filter`
    /// to optimise data retrieval, and returns its answer unchanged.
    fn supports_filter_pushdown(
        &self,
        filter: &Expr,
    ) -> datafusion_common::Result<TableProviderFilterPushDown> {
        let provider = &self.table_provider;
        provider.supports_filter_pushdown(filter)
    }
}

/// Wraps a `TableProvider` in a `DefaultTableSource` and returns it as a
/// shared `TableSource` trait object.
pub fn provider_as_source(
    table_provider: Arc<dyn TableProvider>,
) -> Arc<dyn TableSource> {
    let source = DefaultTableSource::new(table_provider);
    Arc::new(source)
}

/// Recovers the `TableProvider` behind a `TableSource` by downcasting the
/// source to `DefaultTableSource`. This will only work with a `TableSource`
/// created by DataFusion.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` when `source` is not a
/// `DefaultTableSource`.
pub fn source_as_provider(
    source: &Arc<dyn TableSource>,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
    source
        .as_ref()
        .as_any()
        .downcast_ref::<DefaultTableSource>()
        .map(|wrapper| Arc::clone(&wrapper.table_provider))
        .ok_or_else(|| {
            DataFusionError::Internal(
                "TableSource was not DefaultTableSource".to_string(),
            )
        })
}
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#![allow(clippy::module_inception)]
pub mod datasource;
pub mod default_table_source;
pub mod empty;
pub mod file_format;
pub mod listing;
Expand All @@ -29,6 +30,9 @@ pub mod view;
use futures::Stream;

pub use self::datasource::TableProvider;
pub use self::default_table_source::{
provider_as_source, source_as_provider, DefaultTableSource,
};
use self::listing::PartitionedFile;
pub use self::memory::MemTable;
pub use self::view::ViewTable;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ pub mod context;
pub mod disk_manager;
pub mod memory_manager;
pub mod options;
pub mod registry;
pub mod runtime_env;

pub use disk_manager::DiskManager;
pub use memory_manager::{
human_readable_size, MemoryConsumer, MemoryConsumerId, MemoryManager,
};
pub use registry::FunctionRegistry;
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

//! FunctionRegistry trait

use crate::error::Result;
use datafusion_expr::{AggregateUDF, ScalarUDF};
use std::{collections::HashSet, sync::Arc};
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! This module provides an `Expr` enum for representing expressions
//! such as `col = 5` or `SUM(col)`. See examples on the [`Expr`] struct.
//! This is a legacy module that only contains re-exports of other modules

pub use datafusion_common::{Column, ExprSchema};
pub use datafusion_expr::{expr_fn::*, lit, lit_timestamp_nano, Expr, Literal, Operator};
69 changes: 32 additions & 37 deletions datafusion/core/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,46 @@
// specific language governing permissions and limitations
// under the License.

//! This module provides a logical query plan enum that can describe queries. Logical query
//! plans can be created from a SQL statement or built programmatically via the Table API.
//!
//! Logical query plans can then be optimized and executed directly, or translated into
//! physical query plans and executed.
//! This is a legacy module that only contains re-exports of other modules

mod expr;
pub mod plan;
mod registry;
pub mod window_frames;
pub use datafusion_common::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use datafusion_expr::{
expr_fn::binary_expr,
expr_rewriter,
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
logical_plan::builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
ExprSchemable, Operator,

pub use crate::datasource::{provider_as_source, source_as_provider};
pub use crate::execution::FunctionRegistry;
pub use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema,
};
pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo};
pub use expr::{
pub use datafusion_expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos, count,
count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exists, exp,
expr_rewriter,
expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
ExprRewriter, RewriteRecursion,
},
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
floor, in_list, in_subquery, initcap, left, length, lit, lit_timestamp_nano, ln,
log10, log2, lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now,
now_expr, nullif, octet_length, or, power, random, regexp_match, regexp_replace,
repeat, replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
};
pub use expr_rewriter::{
normalize_col, normalize_col_with_schemas, normalize_cols, replace_col,
rewrite_sort_cols_by_aggs, unnormalize_col, unnormalize_cols, ExprRewritable,
ExprRewriter, RewriteRecursion,
};
pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType,
Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor, Repartition,
StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
log10, log2,
logical_plan::{
builder::{
build_join_schema, union_with_alias, LogicalPlanBuilder, UNNAMED_TABLE,
},
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
JoinType, Limit, LogicalPlan, Offset, Partitioning, PlanType, PlanVisitor,
Repartition, StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values,
},
lower, lpad, ltrim, max, md5, min, not_exists, not_in_subquery, now, now_expr,
nullif, octet_length, or, power, random, regexp_match, regexp_replace, repeat,
replace, reverse, right, round, rpad, rtrim, scalar_subquery, sha224, sha256, sha384,
sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
trunc, unalias, upper, when, Expr, ExprSchemable, Literal, Operator,
};
pub use registry::FunctionRegistry;
pub use datafusion_optimizer::expr_simplifier::{ExprSimplifiable, SimplifyInfo};
75 changes: 4 additions & 71 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! This module contains the `LogicalPlan` enum that describes queries
//! via a logical query plan.

use super::expr::Expr;
use crate::arrow::datatypes::SchemaRef;
use crate::datasource::TableProvider;
use crate::error::DataFusionError;
pub use crate::logical_expr::{
//! This is a legacy module that only contains re-exports of other modules

pub use crate::datasource::{provider_as_source, source_as_provider, DefaultTableSource};
pub use datafusion_expr::{
logical_plan::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
Expand All @@ -33,67 +30,3 @@ pub use crate::logical_expr::{
},
TableProviderFilterPushDown, TableSource,
};
use std::any::Any;
use std::sync::Arc;

/// DataFusion's default [`TableSource`], wrapping a [`TableProvider`].
///
/// This structure adapts a `TableProvider` (physical plan trait) to the
/// `TableSource` (logical plan trait). Its `TableSource` implementation
/// forwards schema and filter pushdown queries to the wrapped provider.
pub struct DefaultTableSource {
/// The wrapped table provider that this source delegates to.
pub table_provider: Arc<dyn TableProvider>,
}

impl DefaultTableSource {
/// Create a new DefaultTableSource to wrap a TableProvider
pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
Self { table_provider }
}
}

impl TableSource for DefaultTableSource {
    /// Exposes this source as [`Any`](std::any::Any), allowing callers to
    /// downcast it back to the concrete `DefaultTableSource`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the schema reported by the wrapped table provider.
    fn schema(&self) -> SchemaRef {
        let provider = &self.table_provider;
        provider.schema()
    }

    /// Asks the wrapped table provider whether it can make use of `filter`
    /// to optimise data retrieval, and returns its answer unchanged.
    fn supports_filter_pushdown(
        &self,
        filter: &Expr,
    ) -> datafusion_common::Result<TableProviderFilterPushDown> {
        let provider = &self.table_provider;
        provider.supports_filter_pushdown(filter)
    }
}

/// Wraps a `TableProvider` in a `DefaultTableSource` and returns it as a
/// shared `TableSource` trait object.
pub fn provider_as_source(
    table_provider: Arc<dyn TableProvider>,
) -> Arc<dyn TableSource> {
    let source = DefaultTableSource::new(table_provider);
    Arc::new(source)
}

/// Recovers the `TableProvider` behind a `TableSource` by downcasting the
/// source to `DefaultTableSource`. This will only work with a `TableSource`
/// created by DataFusion.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` when `source` is not a
/// `DefaultTableSource`.
pub fn source_as_provider(
    source: &Arc<dyn TableSource>,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
    source
        .as_ref()
        .as_any()
        .downcast_ref::<DefaultTableSource>()
        .map(|wrapper| Arc::clone(&wrapper.table_provider))
        .ok_or_else(|| {
            DataFusionError::Internal(
                "TableSource was not DefaultTableSource".to_string(),
            )
        })
}
2 changes: 1 addition & 1 deletion datafusion/core/src/logical_plan/window_frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
// specific language governing permissions and limitations
// under the License.

//! Window frame types, re-exported from datafusion_expr
//! This is a legacy module that only contains re-exports of other modules

pub use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_expr::binary_expr;
use datafusion_expr::utils::expr_to_columns;
use datafusion_physical_expr::create_physical_expr;

Expand Down Expand Up @@ -710,7 +711,7 @@ fn build_predicate_expression(
if op == Operator::And || op == Operator::Or {
let left_expr = build_predicate_expression(left, schema, required_columns)?;
let right_expr = build_predicate_expression(right, schema, required_columns)?;
return Ok(logical_plan::binary_expr(left_expr, op, right_expr));
return Ok(binary_expr(left_expr, op, right_expr));
}

let expr_builder =
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use super::{
aggregates, empty::EmptyExec, hash_join::PartitionMode, udaf, union::UnionExec,
values::ValuesExec, windows,
};
use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_plan::plan::{
source_as_provider, Aggregate, EmptyRelation, Filter, Join, Projection, Sort,
SubqueryAlias, TableScan, Window,
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
Expand Down