Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ mod dfschema;
mod error;
#[cfg(feature = "pyarrow")]
mod pyarrow;
mod scalar;
pub mod scalar;

pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ path = "src/lib.rs"
[features]

[dependencies]
arrow = { version = "14.0.0" }
datafusion = { path = "../core", version = "8.0.0" }
datafusion-common = { path = "../common", version = "8.0.0" }
datafusion-expr = { path = "../expr", version = "8.0.0" }
prost = "0.10"

[build-dependencies]
Expand Down
18 changes: 8 additions & 10 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

//! Serialization / Deserialization to Bytes
use crate::{from_proto::parse_expr, protobuf};
use datafusion::{
common::{DataFusionError, Result},
logical_plan::{Expr, FunctionRegistry},
};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::Expr;
use prost::{bytes::BytesMut, Message};

// Reexport Bytes which appears in the API
use datafusion::logical_plan::FunctionRegistry;
pub use prost::bytes::Bytes;

mod registry;
Expand All @@ -32,8 +31,7 @@ mod registry;
/// bytes.
///
/// ```
/// use datafusion::prelude::*;
/// use datafusion::logical_plan::Expr;
/// use datafusion_expr::{col, lit, Expr};
/// use datafusion_proto::bytes::Serializeable;
///
/// // Create a new `Expr` a < 32
Expand Down Expand Up @@ -98,13 +96,13 @@ impl Serializeable for Expr {
#[cfg(test)]
mod test {
use super::*;
use std::sync::Arc;

use arrow::{array::ArrayRef, datatypes::DataType};
use datafusion::prelude::SessionContext;
use datafusion::{
arrow::array::ArrayRef, arrow::datatypes::DataType, logical_expr::Volatility,
logical_plan::create_udf, physical_plan::functions::make_scalar_function,
prelude::*,
};
use datafusion_expr::{lit, Volatility};
use std::sync::Arc;

#[test]
#[should_panic(
Expand Down
8 changes: 3 additions & 5 deletions datafusion/proto/src/bytes/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

use std::{collections::HashSet, sync::Arc};

use datafusion::{
common::{DataFusionError, Result},
logical_expr::{AggregateUDF, ScalarUDF},
logical_plan::FunctionRegistry,
};
use datafusion::logical_plan::FunctionRegistry;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{AggregateUDF, ScalarUDF};

/// A default [`FunctionRegistry`] registry that does not resolve any
/// user defined functions
Expand Down
44 changes: 18 additions & 26 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,22 @@ use crate::protobuf::plan_type::PlanTypeEnum::{
OptimizedLogicalPlan, OptimizedPhysicalPlan,
};
use crate::protobuf::{OptimizedLogicalPlanType, OptimizedPhysicalPlanType};
use datafusion::logical_plan::plan::StringifiedPlan;
use datafusion::logical_plan::{FunctionRegistry, PlanType};
use datafusion::prelude::bit_length;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode},
error::DataFusionError,
logical_expr::{BuiltInWindowFunction, BuiltinScalarFunction},
logical_plan::{
abs, acos, ascii, asin, atan, ceil, character_length, chr, concat_expr,
concat_ws_expr, cos, digest, exp, floor, left, ln, log10, log2, now_expr, nullif,
power, random, regexp_replace, repeat, replace, reverse, right, round, signum,
sin, split_part, sqrt, starts_with, strpos, substr, tan, to_hex,
to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trunc,
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, DFField, DFSchema, DFSchemaRef, Expr, Operator,
},
physical_plan::aggregates::AggregateFunction,
prelude::{
array, btrim, coalesce, date_part, date_trunc, lower, lpad, ltrim, md5,
octet_length, regexp_match, rpad, rtrim, sha224, sha256, sha384, sha512, trim,
upper,
},
scalar::ScalarValue,
use arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit, UnionMode};
use datafusion::logical_plan::FunctionRegistry;
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, ScalarValue,
};
use datafusion_expr::{
abs, acos, array, ascii, asin, atan, bit_length, btrim, ceil, character_length, chr,
coalesce, concat_expr, concat_ws_expr, cos, date_part, date_trunc, digest, exp,
floor, left, ln, log10, log2,
logical_plan::{PlanType, StringifiedPlan},
lower, lpad, ltrim, md5, now_expr, nullif, octet_length, power, random, regexp_match,
regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim, sha224, sha256,
sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos, substr, tan,
to_hex, to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate,
trim, trunc, upper, AggregateFunction, BuiltInWindowFunction, BuiltinScalarFunction,
Expr, Operator, WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use std::sync::Arc;

Expand Down Expand Up @@ -915,7 +908,6 @@ pub fn parse_expr(
proto: &protobuf::LogicalExprNode,
registry: &dyn FunctionRegistry,
) -> Result<Expr, Error> {
use datafusion::logical_expr::window_function;
use protobuf::{logical_expr_node::ExprType, window_expr_node, ScalarFunction};

let expr_type = proto
Expand Down Expand Up @@ -971,7 +963,7 @@ pub fn parse_expr(
let aggr_function = protobuf::AggregateFunction::try_from(i)?.into();

Ok(Expr::WindowFunction {
fun: window_function::WindowFunction::AggregateFunction(
fun: datafusion_expr::window_function::WindowFunction::AggregateFunction(
aggr_function,
),
args: vec![parse_required_expr(&expr.expr, registry, "expr")?],
Expand All @@ -986,7 +978,7 @@ pub fn parse_expr(
.into();

Ok(Expr::WindowFunction {
fun: window_function::WindowFunction::BuiltInWindowFunction(
fun: datafusion_expr::window_function::WindowFunction::BuiltInWindowFunction(
built_in_function,
),
args: vec![parse_required_expr(&expr.expr, registry, "expr")?],
Expand Down
20 changes: 10 additions & 10 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ pub mod to_proto;
mod roundtrip_tests {
use super::from_proto::parse_expr;
use super::protobuf;
use datafusion::arrow::array::ArrayRef;
use arrow::{
array::ArrayRef,
datatypes::{DataType, Field, IntervalUnit, TimeUnit, UnionMode},
};
use datafusion::logical_plan::create_udaf;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::physical_plan::Accumulator;
use datafusion::{
arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit, UnionMode},
logical_expr::{BuiltinScalarFunction::Sqrt, Volatility},
logical_plan::{col, Expr},
physical_plan::aggregates,
prelude::*,
scalar::ScalarValue,
use datafusion::prelude::{create_udf, SessionContext};
use datafusion_common::ScalarValue;
use datafusion_expr::{
col, lit, Accumulator, AggregateFunction, BuiltinScalarFunction::Sqrt, Expr,
Volatility,
};
use std::sync::Arc;

Expand Down Expand Up @@ -704,7 +704,7 @@ mod roundtrip_tests {
#[test]
fn roundtrip_approx_percentile_cont() {
let test_expr = Expr::AggregateFunction {
fun: aggregates::AggregateFunction::ApproxPercentileCont,
fun: AggregateFunction::ApproxPercentileCont,
args: vec![col("bananas"), lit(0.42_f32)],
distinct: false,
};
Expand Down
34 changes: 14 additions & 20 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,22 @@
//! DataFusion logical plans to be serialized and transmitted between
//! processes.

use crate::protobuf;
use crate::protobuf::plan_type::PlanTypeEnum::{
FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
OptimizedLogicalPlan, OptimizedPhysicalPlan,
};
use crate::protobuf::{
self,
plan_type::PlanTypeEnum::{
FinalLogicalPlan, FinalPhysicalPlan, InitialLogicalPlan, InitialPhysicalPlan,
OptimizedLogicalPlan, OptimizedPhysicalPlan,
},
EmptyMessage, OptimizedLogicalPlanType, OptimizedPhysicalPlanType,
};

use datafusion::logical_plan::plan::StringifiedPlan;
use datafusion::logical_plan::PlanType;
use datafusion::{
arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
},
logical_expr::{BuiltInWindowFunction, BuiltinScalarFunction, WindowFunction},
logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
Column, DFField, DFSchemaRef, Expr,
},
physical_plan::aggregates::AggregateFunction,
scalar::ScalarValue,
use arrow::datatypes::{
DataType, Field, IntervalUnit, Schema, SchemaRef, TimeUnit, UnionMode,
};
use datafusion_common::{Column, DFField, DFSchemaRef, ScalarValue};
use datafusion_expr::{
logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction,
BuiltInWindowFunction, BuiltinScalarFunction, Expr, WindowFrame, WindowFrameBound,
WindowFrameUnits, WindowFunction,
};

#[derive(Debug)]
Expand Down Expand Up @@ -732,7 +726,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
type Error = Error;

fn try_from(val: &ScalarValue) -> Result<Self, Self::Error> {
use datafusion::scalar;
use datafusion_common::scalar;
use protobuf::{scalar_value::Value, PrimitiveScalarType};

let scalar_val = match val {
Expand Down