diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index bcaed880a326..596267486ff1 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -20,7 +20,7 @@ mod dfschema; mod error; #[cfg(feature = "pyarrow")] mod pyarrow; -mod scalar; +pub mod scalar; pub use column::Column; pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema}; diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 57466e03be02..58a69124dbf4 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -35,7 +35,10 @@ path = "src/lib.rs" [features] [dependencies] +arrow = { version = "14.0.0" } datafusion = { path = "../core", version = "8.0.0" } +datafusion-common = { path = "../common", version = "8.0.0" } +datafusion-expr = { path = "../expr", version = "8.0.0" } prost = "0.10" [build-dependencies] diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 67bf65d6ab66..1781756df61d 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -17,13 +17,12 @@ //! Serialization / Deserialization to Bytes use crate::{from_proto::parse_expr, protobuf}; -use datafusion::{ - common::{DataFusionError, Result}, - logical_plan::{Expr, FunctionRegistry}, -}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::Expr; use prost::{bytes::BytesMut, Message}; // Reexport Bytes which appears in the API +use datafusion::logical_plan::FunctionRegistry; pub use prost::bytes::Bytes; mod registry; @@ -32,8 +31,7 @@ mod registry; /// bytes. /// /// ``` -/// use datafusion::prelude::*; -/// use datafusion::logical_plan::Expr; +/// use datafusion_expr::{col, lit, Expr}; /// use datafusion_proto::bytes::Serializeable; /// /// // Create a new `Expr` a < 32 @@ -98,13 +96,13 @@ impl Serializeable for Expr { #[cfg(test)] mod test { use super::*; - use std::sync::Arc; - + use arrow::{array::ArrayRef, datatypes::DataType}; + use datafusion::prelude::SessionContext; use datafusion::{ - arrow::array::ArrayRef, arrow::datatypes::DataType, logical_expr::Volatility, logical_plan::create_udf, physical_plan::functions::make_scalar_function, - prelude::*, }; + use datafusion_expr::{lit, Volatility}; + use std::sync::Arc; #[test] #[should_panic( diff --git a/datafusion/proto/src/bytes/registry.rs b/datafusion/proto/src/bytes/registry.rs index e49cbc6294f3..2f701a0c20c1 100644 --- a/datafusion/proto/src/bytes/registry.rs +++ b/datafusion/proto/src/bytes/registry.rs @@ -17,11 +17,9 @@ use std::{collections::HashSet, sync::Arc}; -use datafusion::{ - common::{DataFusionError, Result}, - logical_expr::{AggregateUDF, ScalarUDF}, - logical_plan::FunctionRegistry, -}; +use datafusion::logical_plan::FunctionRegistry; +use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::{AggregateUDF, ScalarUDF}; /// A default [`FunctionRegistry`] registry that does not resolve any /// user defined functions diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 0bb767a347cd..fd72db4edac4 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -21,29 +21,22 @@ use crate::protobuf::plan_type::PlanTypeEnum::{ OptimizedLogicalPlan, OptimizedPhysicalPlan, }; use crate::protobuf::{OptimizedLogicalPlanType, OptimizedPhysicalPlanType}; -use datafusion::logical_plan::plan::StringifiedPlan; -use datafusion::logical_plan::{FunctionRegistry, PlanType}; -use datafusion::prelude::bit_length; -use datafusion::{ - arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode}, - error::DataFusionError, - logical_expr::{BuiltInWindowFunction, BuiltinScalarFunction}, - logical_plan::{ - abs, acos, ascii, asin, atan, ceil, character_length, chr, concat_expr, - concat_ws_expr, cos, digest, exp, floor, left, ln, log10, log2, now_expr, nullif, - power, random, regexp_replace, repeat, replace, reverse, right, round, signum, - sin, split_part, sqrt, starts_with, strpos, substr, tan, to_hex, - to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trunc, - window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, - Column, DFField, DFSchema, DFSchemaRef, Expr, Operator, - }, - physical_plan::aggregates::AggregateFunction, - prelude::{ - array, btrim, coalesce, date_part, date_trunc, lower, lpad, ltrim, md5, - octet_length, regexp_match, rpad, rtrim, sha224, sha256, sha384, sha512, trim, - upper, - }, - scalar::ScalarValue, +use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode}; +use datafusion::logical_plan::FunctionRegistry; +use datafusion_common::{ + Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ScalarValue, +}; +use datafusion_expr::{ + abs, acos, array, ascii, asin, atan, bit_length, btrim, ceil, character_length, chr, + coalesce, concat_expr, concat_ws_expr, cos, date_part, date_trunc, digest, exp, + floor, left, ln, log10, log2, + logical_plan::{PlanType, StringifiedPlan}, + lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match, + regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256, + sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan, + to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, + trim, trunc, upper, AggregateFunction, BuiltInWindowFunction, BuiltinScalarFunction, + Expr, Operator, WindowFrame, WindowFrameBound, WindowFrameUnits, }; use std::sync::Arc; @@ -915,7 +908,6 @@ pub fn parse_expr( proto: &protobuf::LogicalExprNode, registry: &dyn FunctionRegistry, ) -> Result { - use datafusion::logical_expr::window_function; use protobuf::{logical_expr_node::ExprType, window_expr_node, ScalarFunction}; let expr_type = proto @@ -971,7 +963,7 @@ pub fn parse_expr( let aggr_function = protobuf::AggregateFunction::try_from(i)?.into(); Ok(Expr::WindowFunction { - fun: window_function::WindowFunction::AggregateFunction( + fun: datafusion_expr::window_function::WindowFunction::AggregateFunction( aggr_function, ), args: vec![parse_required_expr(&expr.expr, registry, "expr")?], @@ -986,7 +978,7 @@ pub fn parse_expr( .into(); Ok(Expr::WindowFunction { - fun: window_function::WindowFunction::BuiltInWindowFunction( + fun: datafusion_expr::window_function::WindowFunction::BuiltInWindowFunction( built_in_function, ), args: vec![parse_required_expr(&expr.expr, registry, "expr")?], diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 24809b9aa6f4..3b042da423b0 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -29,17 +29,17 @@ pub mod to_proto; mod roundtrip_tests { use super::from_proto::parse_expr; use super::protobuf; - use datafusion::arrow::array::ArrayRef; + use arrow::{ + array::ArrayRef, + datatypes::{DataType, Field, IntervalUnit, TimeUnit, UnionMode}, + }; use datafusion::logical_plan::create_udaf; use datafusion::physical_plan::functions::make_scalar_function; - use datafusion::physical_plan::Accumulator; - use datafusion::{ - arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit, UnionMode}, - logical_expr::{BuiltinScalarFunction::Sqrt, Volatility}, - logical_plan::{col, Expr}, - physical_plan::aggregates, - prelude::*, - scalar::ScalarValue, + use datafusion::prelude::{create_udf, SessionContext}; + use datafusion_common::ScalarValue; + use datafusion_expr::{ + col, lit, Accumulator, AggregateFunction, BuiltinScalarFunction::Sqrt, Expr, + Volatility, }; use std::sync::Arc; @@ -704,7 +704,7 @@ mod roundtrip_tests { #[test] fn roundtrip_approx_percentile_cont() { let test_expr = Expr::AggregateFunction { - fun: aggregates::AggregateFunction::ApproxPercentileCont, + fun: AggregateFunction::ApproxPercentileCont, args: vec![col("bananas"), lit(0.42_f32)], distinct: false, }; diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index a5f6ffbb0fe2..7aa4278b39a4 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -19,28 +19,22 @@ //! DataFusion logical plans to be serialized and transmitted between //! processes. -use crate::protobuf; -use crate::protobuf::plan_type::PlanTypeEnum::{ - FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan, - OptimizedLogicalPlan, OptimizedPhysicalPlan, -}; use crate::protobuf::{ + self, + plan_type::PlanTypeEnum::{ + FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan, + OptimizedLogicalPlan, OptimizedPhysicalPlan, + }, EmptyMessage, OptimizedLogicalPlanType, OptimizedPhysicalPlanType, }; - -use datafusion::logical_plan::plan::StringifiedPlan; -use datafusion::logical_plan::PlanType; -use datafusion::{ - arrow::datatypes::{ - DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode, - }, - logical_expr::{BuiltInWindowFunction, BuiltinScalarFunction, WindowFunction}, - logical_plan::{ - window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits}, - Column, DFField, DFSchemaRef, Expr, - }, - physical_plan::aggregates::AggregateFunction, - scalar::ScalarValue, +use arrow::datatypes::{ + DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode, +}; +use datafusion_common::{Column, DFField, DFSchemaRef, ScalarValue}; +use datafusion_expr::{ + logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction, + BuiltInWindowFunction, BuiltinScalarFunction, Expr, WindowFrame, WindowFrameBound, + WindowFrameUnits, WindowFunction, }; #[derive(Debug)] @@ -732,7 +726,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue { type Error = Error; fn try_from(val: &ScalarValue) -> Result { - use datafusion::scalar; + use datafusion_common::scalar; use protobuf::{scalar_value::Value, PrimitiveScalarType}; let scalar_val = match val {