Skip to content

Commit

Permalink
Merge pull request #112 from BohuTANG/dev-flight
Browse files Browse the repository at this point in the history
[rpcs] add arrow flight service
  • Loading branch information
BohuTANG committed Mar 6, 2021
2 parents db0813e + 596582b commit f790885
Show file tree
Hide file tree
Showing 51 changed files with 427 additions and 155 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ name = "fuse-query"
path = "src/bin/fuse-query.rs"

[dependencies]
arrow = "3.0"
arrow = { git = "https://github.com/apache/arrow", rev="5647e90" }
arrow-flight = { git = "https://github.com/apache/arrow", rev="5647e90" }
async-trait = "0.1"
dyn-clone = "1.0.4"
futures = "0.3"
Expand All @@ -38,7 +39,7 @@ structopt = "0.3"
thiserror = "1.0"
threadpool = "1.8.1"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "sync"] }
tonic = "0.4.0"
tonic = "0.4"
warp = "0.3.0"
uuid = { version = "0.8", features = ["serde", "v4"] }

Expand Down
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ fn build_proto() {
println!("cargo:rerun-if-env-changed=FORCE_REBUILD");

println!("cargo:rerun-if-changed=proto/executor.proto");

tonic_build::configure()
.compile(&["proto/executor.proto"], &["proto"])
.map_err(|e| format!("tonic_build proto compile failed: {}", e))
Expand Down
6 changes: 5 additions & 1 deletion proto/executor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

syntax = "proto3";

package executor;
package fusequery.executor;

service Executor {
rpc Ping(PingRequest) returns (PingResponse) {}
Expand All @@ -17,3 +17,7 @@ message PingRequest {
message PingResponse {
string message = 1;
}

message ExecuteRequest {
string action = 1;
}
4 changes: 2 additions & 2 deletions src/clusters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod cluster_test;
mod cluster;
mod node;

pub use self::cluster::{Cluster, ClusterRef};
pub use self::node::Node;
pub use cluster::{Cluster, ClusterRef};
pub use node::Node;
2 changes: 1 addition & 1 deletion src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

mod config;

pub use self::config::Config;
pub use config::Config;
2 changes: 1 addition & 1 deletion src/datablocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ mod tests;

pub mod data_block;

pub use self::data_block::DataBlock;
pub use data_block::DataBlock;
8 changes: 4 additions & 4 deletions src/datasources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod statistics;
mod system;
mod table;

pub use self::datasource::{DataSource, IDataSource};
pub use self::partition::{Partition, Partitions};
pub use self::statistics::Statistics;
pub use self::table::ITable;
pub use datasource::{DataSource, IDataSource};
pub use partition::{Partition, Partitions};
pub use statistics::Statistics;
pub use table::ITable;
2 changes: 1 addition & 1 deletion src/datasources/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

pub type Partitions = Vec<Partition>;

#[derive(Clone, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct Partition {
pub name: String,
pub version: u64,
Expand Down
2 changes: 1 addition & 1 deletion src/datasources/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0.

#[derive(Clone, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct Statistics {
/// Total rows of the query read.
pub read_rows: usize,
Expand Down
12 changes: 6 additions & 6 deletions src/datasources/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ mod one_table;
mod settings_table;
mod system_factory;

pub use self::functions_table::FunctionsTable;
pub use self::numbers_stream::NumbersStream;
pub use self::numbers_table::NumbersTable;
pub use self::one_table::OneTable;
pub use self::settings_table::SettingsTable;
pub use self::system_factory::SystemFactory;
pub use functions_table::FunctionsTable;
pub use numbers_stream::NumbersStream;
pub use numbers_table::NumbersTable;
pub use one_table::OneTable;
pub use settings_table::SettingsTable;
pub use system_factory::SystemFactory;
10 changes: 5 additions & 5 deletions src/datastreams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ mod stream_datablock;
mod stream_expression;
mod stream_limit;

pub use self::stream::SendableDataBlockStream;
pub use self::stream_channel::ChannelStream;
pub use self::stream_datablock::DataBlockStream;
pub use self::stream_expression::ExpressionStream;
pub use self::stream_limit::LimitStream;
pub use stream::SendableDataBlockStream;
pub use stream_channel::ChannelStream;
pub use stream_datablock::DataBlockStream;
pub use stream_expression::ExpressionStream;
pub use stream_limit::LimitStream;
30 changes: 15 additions & 15 deletions src/datavalues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,25 @@ mod data_value_aggregate;
mod data_value_arithmetic;
mod data_value_operator;

pub use self::data_array_aggregate::data_array_aggregate_op;
pub use self::data_array_arithmetic::data_array_arithmetic_op;
pub use self::data_array_comparison::data_array_comparison_op;
pub use self::data_array_logic::data_array_logic_op;
pub use self::data_type::numerical_arithmetic_coercion;
pub use self::data_type::numerical_coercion;
pub use self::data_value_aggregate::data_value_aggregate_op;
pub use self::data_value_arithmetic::data_value_arithmetic_op;
pub use data_array_aggregate::data_array_aggregate_op;
pub use data_array_arithmetic::data_array_arithmetic_op;
pub use data_array_comparison::data_array_comparison_op;
pub use data_array_logic::data_array_logic_op;
pub use data_type::numerical_arithmetic_coercion;
pub use data_type::numerical_coercion;
pub use data_value_aggregate::data_value_aggregate_op;
pub use data_value_arithmetic::data_value_arithmetic_op;

pub use self::data_array::{
pub use data_array::{
BooleanArray, DataArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
Int8Array, NullArray, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
};
pub use self::data_columnar_value::DataColumnarValue;
pub use self::data_field::DataField;
pub use self::data_schema::{DataSchema, DataSchemaRef};
pub use self::data_type::DataType;
pub use self::data_value::{DataValue, DataValueRef};
pub use self::data_value_operator::{
pub use data_columnar_value::DataColumnarValue;
pub use data_field::DataField;
pub use data_schema::{DataSchema, DataSchemaRef};
pub use data_type::DataType;
pub use data_value::{DataValue, DataValueRef};
pub use data_value_operator::{
DataValueAggregateOperator, DataValueArithmeticOperator, DataValueComparisonOperator,
DataValueLogicOperator,
};
12 changes: 12 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,15 @@ impl From<std::net::AddrParseError> for FuseQueryError {
FuseQueryError::Internal(err.to_string())
}
}

impl From<prost::EncodeError> for FuseQueryError {
fn from(err: prost::EncodeError) -> Self {
FuseQueryError::Internal(err.to_string())
}
}

impl From<tonic::transport::Error> for FuseQueryError {
fn from(err: tonic::transport::Error) -> Self {
FuseQueryError::Internal(err.to_string())
}
}
10 changes: 5 additions & 5 deletions src/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ mod function_field;
mod logics;
mod udfs;

pub use self::function::IFunction;
pub use self::function_alias::AliasFunction;
pub use self::function_constant::ConstantFunction;
pub use self::function_factory::{FactoryFuncRef, FunctionFactory};
pub use self::function_field::FieldFunction;
pub use function::IFunction;
pub use function_alias::AliasFunction;
pub use function_constant::ConstantFunction;
pub use function_factory::{FactoryFuncRef, FunctionFactory};
pub use function_field::FieldFunction;
10 changes: 5 additions & 5 deletions src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ mod interpreter_factory;
mod interpreter_select;
mod interpreter_setting;

pub use self::interpreter::IInterpreter;
pub use self::interpreter_explain::ExplainInterpreter;
pub use self::interpreter_factory::InterpreterFactory;
pub use self::interpreter_select::SelectInterpreter;
pub use self::interpreter_setting::SettingInterpreter;
pub use interpreter::IInterpreter;
pub use interpreter_explain::ExplainInterpreter;
pub use interpreter_factory::InterpreterFactory;
pub use interpreter_select::SelectInterpreter;
pub use interpreter_setting::SettingInterpreter;
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ pub mod metrics;
pub mod optimizers;
pub mod planners;
pub mod processors;
pub mod proto;
pub mod rpcs;
pub mod servers;
pub mod sessions;
pub mod sql;
pub mod transforms;

// ProtoBuf generated files.
#[allow(clippy::all)]
pub mod protobuf {
tonic::include_proto!("fusequery.executor");
}
2 changes: 1 addition & 1 deletion src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

mod metric_service;

pub use self::metric_service::MetricService;
pub use metric_service::MetricService;
4 changes: 2 additions & 2 deletions src/optimizers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod optimizer_filter_push_down_test;
mod optimizer;
mod optimizer_filter_push_down;

pub use self::optimizer::{IOptimizer, Optimizer};
pub use self::optimizer_filter_push_down::FilterPushDownOptimizer;
pub use optimizer::{IOptimizer, Optimizer};
pub use optimizer_filter_push_down::FilterPushDownOptimizer;
38 changes: 19 additions & 19 deletions src/planners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,22 @@ mod plan_setting;
mod plan_stage;
mod plan_walker;

pub use self::plan_aggregator_final::AggregatorFinalPlan;
pub use self::plan_aggregator_partial::AggregatorPartialPlan;
pub use self::plan_builder::PlanBuilder;
pub use self::plan_empty::EmptyPlan;
pub use self::plan_explain::{DFExplainType, ExplainPlan};
pub use self::plan_expression::ExpressionPlan;
pub use self::plan_expression_constant::constant;
pub use self::plan_expression_field::field;
pub use self::plan_expression_function::{add, sum};
pub use self::plan_filter::FilterPlan;
pub use self::plan_limit::LimitPlan;
pub use self::plan_node::PlanNode;
pub use self::plan_projection::ProjectionPlan;
pub use self::plan_read_datasource::ReadDataSourcePlan;
pub use self::plan_rewriter::PlanRewriter;
pub use self::plan_scan::ScanPlan;
pub use self::plan_select::SelectPlan;
pub use self::plan_setting::{SettingPlan, VarValue};
pub use self::plan_stage::{StagePlan, StageState};
pub use plan_aggregator_final::AggregatorFinalPlan;
pub use plan_aggregator_partial::AggregatorPartialPlan;
pub use plan_builder::PlanBuilder;
pub use plan_empty::EmptyPlan;
pub use plan_explain::{DFExplainType, ExplainPlan};
pub use plan_expression::ExpressionPlan;
pub use plan_expression_constant::constant;
pub use plan_expression_field::field;
pub use plan_expression_function::{add, sum};
pub use plan_filter::FilterPlan;
pub use plan_limit::LimitPlan;
pub use plan_node::PlanNode;
pub use plan_projection::ProjectionPlan;
pub use plan_read_datasource::ReadDataSourcePlan;
pub use plan_rewriter::PlanRewriter;
pub use plan_scan::ScanPlan;
pub use plan_select::SelectPlan;
pub use plan_setting::{SettingPlan, VarValue};
pub use plan_stage::{StagePlan, StageState};
2 changes: 1 addition & 1 deletion src/planners/plan_aggregator_final.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::{ExpressionPlan, PlanNode};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct AggregatorFinalPlan {
pub aggr_expr: Vec<ExpressionPlan>,
pub group_expr: Vec<ExpressionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_aggregator_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::{ExpressionPlan, PlanNode};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct AggregatorPartialPlan {
pub group_expr: Vec<ExpressionPlan>,
pub aggr_expr: Vec<ExpressionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::datavalues::DataSchemaRef;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct EmptyPlan {
pub(crate) schema: DataSchemaRef,
}
Expand Down
4 changes: 2 additions & 2 deletions src/planners/plan_explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::PlanNode;

#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq)]
pub enum DFExplainType {
Syntax,
Graph,
Pipeline,
}

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ExplainPlan {
pub typ: DFExplainType,
pub input: Arc<PlanNode>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::functions::{
};
use crate::sessions::FuseQueryContextRef;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum ExpressionPlan {
Alias(String, Box<ExpressionPlan>),
Field(String),
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::{ExpressionPlan, PlanNode};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct FilterPlan {
/// The predicate expression, which must have Boolean type.
pub predicate: ExpressionPlan,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::PlanNode;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct LimitPlan {
/// The limit
pub n: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::planners::{
ProjectionPlan, ReadDataSourcePlan, ScanPlan, SelectPlan, SettingPlan, StagePlan,
};

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub enum PlanNode {
Empty(EmptyPlan),
Stage(StagePlan),
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::planners::{ExpressionPlan, PlanNode};

/// Evaluates an arbitrary list of expressions (essentially a
/// SELECT with an expression list) on its input.
#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ProjectionPlan {
/// The list of expressions
pub expr: Vec<ExpressionPlan>,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_read_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::datasources::{Partitions, Statistics};
use crate::datavalues::DataSchemaRef;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ReadDataSourcePlan {
pub db: String,
pub table: String,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::datavalues::DataSchemaRef;
use crate::planners::ExpressionPlan;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct ScanPlan {
/// The name of the schema
pub schema_name: String,
Expand Down
2 changes: 1 addition & 1 deletion src/planners/plan_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datavalues::DataSchemaRef;
use crate::error::FuseQueryResult;
use crate::planners::PlanNode;

#[derive(Clone)]
#[derive(serde::Serialize, serde::Deserialize, Clone)]
pub struct SelectPlan {
pub input: Arc<PlanNode>,
}
Expand Down

0 comments on commit f790885

Please sign in to comment.