Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rpcs] add arrow flight server #112

Merged
merged 5 commits into from
Mar 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ name = "fuse-query"
path = "src/bin/fuse-query.rs"

[dependencies]
arrow = "3.0"
arrow = { git = "https://github.com/apache/arrow", rev="5647e90" }
arrow-flight = { git = "https://github.com/apache/arrow", rev="5647e90" }
async-trait = "0.1"
dyn-clone = "1.0.4"
futures = "0.3"
Expand All @@ -38,7 +39,7 @@ structopt = "0.3"
thiserror = "1.0"
threadpool = "1.8.1"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "sync"] }
tonic = "0.4.0"
tonic = "0.4"
warp = "0.3.0"
uuid = { version = "0.8", features = ["serde", "v4"] }

Expand Down
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fn build_proto() {
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");

println!("cargo:rerun-if-changed=proto/executor.proto");

tonic_build::configure()
.compile(&["proto/executor.proto"], &["proto"])
.map_err(|e| format!("tonic_build proto compile failed: {}", e))
Expand Down
6 changes: 5 additions & 1 deletion proto/executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

syntax = "proto3";

package executor;
package fusequery.executor;

service Executor {
rpc Ping(PingRequest) returns (PingResponse) {}
Expand All @@ -17,3 +17,7 @@ message PingRequest {
message PingResponse {
string message = 1;
}

message ExecuteRequest {
string action = 1;
}
4 changes: 2 additions & 2 deletions src/clusters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod cluster_test;
mod cluster;
mod node;

pub use self::cluster::{Cluster, ClusterRef};
pub use self::node::Node;
pub use cluster::{Cluster, ClusterRef};
pub use node::Node;
2 changes: 1 addition & 1 deletion src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

mod config;

pub use self::config::Config;
pub use config::Config;
2 changes: 1 addition & 1 deletion src/datablocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ mod tests;

pub mod data_block;

pub use self::data_block::DataBlock;
pub use data_block::DataBlock;
8 changes: 4 additions & 4 deletions src/datasources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod statistics;
mod system;
mod table;

pub use self::datasource::{DataSource, IDataSource};
pub use self::partition::{Partition, Partitions};
pub use self::statistics::Statistics;
pub use self::table::ITable;
pub use datasource::{DataSource, IDataSource};
pub use partition::{Partition, Partitions};
pub use statistics::Statistics;
pub use table::ITable;
2 changes: 1 addition & 1 deletion src/datasources/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

pub type Partitions = Vec<Partition>;

#[derive(Clone, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct Partition {
pub name: String,
pub version: u64,
Expand Down
2 changes: 1 addition & 1 deletion src/datasources/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0.

#[derive(Clone, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct Statistics {
/// Total rows of the query read.
pub read_rows: usize,
Expand Down
12 changes: 6 additions & 6 deletions src/datasources/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ mod one_table;
mod settings_table;
mod system_factory;

pub use self::functions_table::FunctionsTable;
pub use self::numbers_stream::NumbersStream;
pub use self::numbers_table::NumbersTable;
pub use self::one_table::OneTable;
pub use self::settings_table::SettingsTable;
pub use self::system_factory::SystemFactory;
pub use functions_table::FunctionsTable;
pub use numbers_stream::NumbersStream;
pub use numbers_table::NumbersTable;
pub use one_table::OneTable;
pub use settings_table::SettingsTable;
pub use system_factory::SystemFactory;
10 changes: 5 additions & 5 deletions src/datastreams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ mod stream_datablock;
mod stream_expression;
mod stream_limit;

pub use self::stream::SendableDataBlockStream;
pub use self::stream_channel::ChannelStream;
pub use self::stream_datablock::DataBlockStream;
pub use self::stream_expression::ExpressionStream;
pub use self::stream_limit::LimitStream;
pub use stream::SendableDataBlockStream;
pub use stream_channel::ChannelStream;
pub use stream_datablock::DataBlockStream;
pub use stream_expression::ExpressionStream;
pub use stream_limit::LimitStream;
30 changes: 15 additions & 15 deletions src/datavalues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ mod data_value_aggregate;
mod data_value_arithmetic;
mod data_value_operator;

pub use self::data_array_aggregate::data_array_aggregate_op;
pub use self::data_array_arithmetic::data_array_arithmetic_op;
pub use self::data_array_comparison::data_array_comparison_op;
pub use self::data_array_logic::data_array_logic_op;
pub use self::data_type::numerical_arithmetic_coercion;
pub use self::data_type::numerical_coercion;
pub use self::data_value_aggregate::data_value_aggregate_op;
pub use self::data_value_arithmetic::data_value_arithmetic_op;
pub use data_array_aggregate::data_array_aggregate_op;
pub use data_array_arithmetic::data_array_arithmetic_op;
pub use data_array_comparison::data_array_comparison_op;
pub use data_array_logic::data_array_logic_op;
pub use data_type::numerical_arithmetic_coercion;
pub use data_type::numerical_coercion;
pub use data_value_aggregate::data_value_aggregate_op;
pub use data_value_arithmetic::data_value_arithmetic_op;

pub use self::data_array::{
pub use data_array::{
BooleanArray, DataArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, NullArray, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
pub use self::data_columnar_value::DataColumnarValue;
pub use self::data_field::DataField;
pub use self::data_schema::{DataSchema, DataSchemaRef};
pub use self::data_type::DataType;
pub use self::data_value::{DataValue, DataValueRef};
pub use self::data_value_operator::{
pub use data_columnar_value::DataColumnarValue;
pub use data_field::DataField;
pub use data_schema::{DataSchema, DataSchemaRef};
pub use data_type::DataType;
pub use data_value::{DataValue, DataValueRef};
pub use data_value_operator::{
DataValueAggregateOperator, DataValueArithmeticOperator, DataValueComparisonOperator,
DataValueLogicOperator,
};
12 changes: 12 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,15 @@ impl From<std::net::AddrParseError> for FuseQueryError {
FuseQueryError::Internal(err.to_string())
}
}

impl From<prost::EncodeError> for FuseQueryError {
fn from(err: prost::EncodeError) -> Self {
FuseQueryError::Internal(err.to_string())
}
}

impl From<tonic::transport::Error> for FuseQueryError {
fn from(err: tonic::transport::Error) -> Self {
FuseQueryError::Internal(err.to_string())
}
}
10 changes: 5 additions & 5 deletions src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ mod function_field;
mod logics;
mod udfs;

pub use self::function::IFunction;
pub use self::function_alias::AliasFunction;
pub use self::function_constant::ConstantFunction;
pub use self::function_factory::{FactoryFuncRef, FunctionFactory};
pub use self::function_field::FieldFunction;
pub use function::IFunction;
pub use function_alias::AliasFunction;
pub use function_constant::ConstantFunction;
pub use function_factory::{FactoryFuncRef, FunctionFactory};
pub use function_field::FieldFunction;
10 changes: 5 additions & 5 deletions src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ mod interpreter_factory;
mod interpreter_select;
mod interpreter_setting;

pub use self::interpreter::IInterpreter;
pub use self::interpreter_explain::ExplainInterpreter;
pub use self::interpreter_factory::InterpreterFactory;
pub use self::interpreter_select::SelectInterpreter;
pub use self::interpreter_setting::SettingInterpreter;
pub use interpreter::IInterpreter;
pub use interpreter_explain::ExplainInterpreter;
pub use interpreter_factory::InterpreterFactory;
pub use interpreter_select::SelectInterpreter;
pub use interpreter_setting::SettingInterpreter;
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ pub mod metrics;
pub mod optimizers;
pub mod planners;
pub mod processors;
pub mod proto;
pub mod rpcs;
pub mod servers;
pub mod sessions;
pub mod sql;
pub mod transforms;

// ProtoBuf generated files.
#[allow(clippy::all)]
pub mod protobuf {
tonic::include_proto!("fusequery.executor");
}
2 changes: 1 addition & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

mod metric_service;

pub use self::metric_service::MetricService;
pub use metric_service::MetricService;
4 changes: 2 additions & 2 deletions src/optimizers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod optimizer_filter_push_down_test;
mod optimizer;
mod optimizer_filter_push_down;

pub use self::optimizer::{IOptimizer, Optimizer};
pub use self::optimizer_filter_push_down::FilterPushDownOptimizer;
pub use optimizer::{IOptimizer, Optimizer};
pub use optimizer_filter_push_down::FilterPushDownOptimizer;
38 changes: 19 additions & 19 deletions src/planners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ mod plan_setting;
mod plan_stage;
mod plan_walker;

pub use self::plan_aggregator_final::AggregatorFinalPlan;
pub use self::plan_aggregator_partial::AggregatorPartialPlan;
pub use self::plan_builder::PlanBuilder;
pub use self::plan_empty::EmptyPlan;
pub use self::plan_explain::{DFExplainType, ExplainPlan};
pub use self::plan_expression::ExpressionPlan;
pub use self::plan_expression_constant::constant;
pub use self::plan_expression_field::field;
pub use self::plan_expression_function::{add, sum};
pub use self::plan_filter::FilterPlan;
pub use self::plan_limit::LimitPlan;
pub use self::plan_node::PlanNode;
pub use self::plan_projection::ProjectionPlan;
pub use self::plan_read_datasource::ReadDataSourcePlan;
pub use self::plan_rewriter::PlanRewriter;
pub use self::plan_scan::ScanPlan;
pub use self::plan_select::SelectPlan;
pub use self::plan_setting::{SettingPlan, VarValue};
pub use self::plan_stage::{StagePlan, StageState};
pub use plan_aggregator_final::AggregatorFinalPlan;
pub use plan_aggregator_partial::AggregatorPartialPlan;
pub use plan_builder::PlanBuilder;
pub use plan_empty::EmptyPlan;
pub use plan_explain::{DFExplainType, ExplainPlan};
pub use plan_expression::ExpressionPlan;
pub use plan_expression_constant::constant;
pub use plan_expression_field::field;
pub use plan_expression_function::{add, sum};
pub use plan_filter::FilterPlan;
pub use plan_limit::LimitPlan;
pub use plan_node::PlanNode;
pub use plan_projection::ProjectionPlan;
pub use plan_read_datasource::ReadDataSourcePlan;
pub use plan_rewriter::PlanRewriter;
pub use plan_scan::ScanPlan;
pub use plan_select::SelectPlan;
pub use plan_setting::{SettingPlan, VarValue};
pub use plan_stage::{StagePlan, StageState};
2 changes: 1 addition & 1 deletion src/planners/plan_aggregator_final.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::{ExpressionPlan, PlanNode};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct AggregatorFinalPlan {
pub aggr_expr: Vec<ExpressionPlan>,
pub group_expr: Vec<ExpressionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_aggregator_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::{ExpressionPlan, PlanNode};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct AggregatorPartialPlan {
pub group_expr: Vec<ExpressionPlan>,
pub aggr_expr: Vec<ExpressionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::datavalues::DataSchemaRef;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct EmptyPlan {
pub(crate) schema: DataSchemaRef,
}
Expand Down
4 changes: 2 additions & 2 deletions src/planners/plan_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::PlanNode;

#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq)]
pub enum DFExplainType {
Syntax,
Graph,
Pipeline,
}

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ExplainPlan {
pub typ: DFExplainType,
pub input: Arc<PlanNode>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::functions::{
};
use crate::sessions::FuseQueryContextRef;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum ExpressionPlan {
Alias(String, Box<ExpressionPlan>),
Field(String),
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::{ExpressionPlan, PlanNode};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct FilterPlan {
/// The predicate expression, which must have Boolean type.
pub predicate: ExpressionPlan,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::PlanNode;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct LimitPlan {
/// The limit
pub n: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::planners::{
ProjectionPlan, ReadDataSourcePlan, ScanPlan, SelectPlan, SettingPlan, StagePlan,
};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum PlanNode {
Empty(EmptyPlan),
Stage(StagePlan),
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::planners::{ExpressionPlan, PlanNode};

/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ProjectionPlan {
/// The list of expressions
pub expr: Vec<ExpressionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_read_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::datasources::{Partitions, Statistics};
use crate::datavalues::DataSchemaRef;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ReadDataSourcePlan {
pub db: String,
pub table: String,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::datavalues::DataSchemaRef;
use crate::planners::ExpressionPlan;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ScanPlan {
/// The name of the schema
pub schema_name: String,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::PlanNode;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct SelectPlan {
pub input: Arc<PlanNode>,
}
Expand Down