From 102c64120e2488a6ba2eff960d674cd5aedb9e8f Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Sun, 15 Nov 2020 12:12:35 -0800 Subject: [PATCH] feat(cubestore): Distributed query execution --- rust/cubestore/src/cluster/mod.rs | 36 +- rust/cubestore/src/config/mod.rs | 4 +- rust/cubestore/src/lib.rs | 2 +- .../src/queryplanner/query_executor.rs | 337 +++++++++++++++++- .../src/queryplanner/serialized_plan.rs | 24 +- rust/cubestore/src/sql/mod.rs | 13 +- 6 files changed, 367 insertions(+), 49 deletions(-) diff --git a/rust/cubestore/src/cluster/mod.rs b/rust/cubestore/src/cluster/mod.rs index 946a194ecf61..25e9a89a9c75 100644 --- a/rust/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/src/cluster/mod.rs @@ -16,7 +16,7 @@ use std::path::Path; use tokio::io::AsyncWriteExt; use std::time::{SystemTime}; use tokio::{fs, time}; -use crate::store::{ChunkDataStore, DataFrame}; +use crate::store::{ChunkDataStore}; use crate::metastore::{RowKey, TableId, MetaStore, IdRow}; use log::{info, error, debug}; use crate::store::compaction::CompactionService; @@ -31,15 +31,16 @@ use serde::{Deserialize, Serialize}; use tokio::runtime::Handle; use crate::queryplanner::serialized_plan::{SerializedPlan}; use crate::config::{Config, ConfigObj}; -use crate::queryplanner::query_executor::QueryExecutor; +use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchStream}; use crate::cluster::worker_pool::{WorkerPool, MessageProcessor}; +use arrow::record_batch::RecordBatch; #[automock] #[async_trait] pub trait Cluster: Send + Sync { async fn notify_job_runner(&self, node_name: String) -> Result<(), CubeError>; - async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result; + async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result, CubeError>; async fn available_nodes(&self) -> Result, CubeError>; @@ -47,8 +48,6 @@ pub trait Cluster: Send + Sync { async fn download(&self, remote_path: &str) -> Result; - fn get_chunk_store(&self) -> &Arc; - async fn wait_for_job_result(&self, row_key: RowKey, job_type: JobType) -> Result; } @@ -68,13 +67,12 @@ pub struct ClusterImpl { connect_timeout: Duration, server_name: String, server_addresses: Vec, - connected_nodes: Vec, job_notify: Arc, event_sender: Sender, jobs_enabled: Arc>, // used just to hold a reference so event_sender won't be collected _receiver: Receiver, - select_process_pool: RwLock>>>, + select_process_pool: RwLock>>>, config_obj: Arc, query_executor: Arc, } @@ -86,16 +84,16 @@ pub enum WorkerMessage { pub struct WorkerProcessor; -impl MessageProcessor for WorkerProcessor { - fn process(args: WorkerMessage) -> Result { +impl MessageProcessor for WorkerProcessor { + fn process(args: WorkerMessage) -> Result { match args { WorkerMessage::Select(plan_node, remote_to_local_names) => { debug!("Running select in worker started: {:?}", plan_node); let handle = Handle::current(); let plan_node_to_send = plan_node.clone(); - let res = handle.block_on(async move { Config::current_worker_services().query_executor.execute_plan(plan_node_to_send, remote_to_local_names).await }); + let res = handle.block_on(async move { Config::current_worker_services().query_executor.execute_worker_plan(plan_node_to_send, remote_to_local_names).await }); debug!("Running select in worker completed: {:?}", plan_node); - res + SerializedRecordBatchStream::write(res?) 
} } } @@ -128,7 +126,7 @@ impl Cluster for ClusterImpl { } } - async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result { + async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result, CubeError> { if self.server_name == node_name { // TODO timeout config timeout(Duration::from_secs(60), self.run_local_select(plan_node)).await? @@ -138,7 +136,7 @@ impl Cluster for ClusterImpl { } async fn available_nodes(&self) -> Result, CubeError> { - Ok(self.connected_nodes.clone()) + Ok(vec![self.server_name.to_string()]) } fn server_name(&self) -> &str { @@ -149,10 +147,6 @@ impl Cluster for ClusterImpl { self.remote_fs.download_file(remote_path).await } - fn get_chunk_store(&self) -> &Arc { - &self.chunk_store - } - async fn wait_for_job_result(&self, row_key: RowKey, job_type: JobType) -> Result { let mut receiver = self.event_sender.subscribe(); loop { @@ -294,7 +288,6 @@ impl ClusterImpl { compaction_service, import_service, meta_store, - connected_nodes: Vec::new(), job_notify: Arc::new(Notify::new()), event_sender: sender, jobs_enabled: Arc::new(RwLock::new(true)), @@ -339,7 +332,7 @@ impl ClusterImpl { Ok(()) } - async fn run_local_select(&self, plan_node: SerializedPlan) -> Result { + async fn run_local_select(&self, plan_node: SerializedPlan) -> Result, CubeError> { let start = SystemTime::now(); debug!("Running select: {:?}", plan_node); let to_download = plan_node.files_to_download(); @@ -354,10 +347,11 @@ impl ClusterImpl { let serialized_plan_node = plan_node.clone(); pool.process(WorkerMessage::Select(serialized_plan_node, remote_to_local_names)).await } else { - self.query_executor.execute_plan(plan_node.clone(), remote_to_local_names).await + // TODO optimize for no double conversion + SerializedRecordBatchStream::write(self.query_executor.execute_worker_plan(plan_node.clone(), remote_to_local_names).await?) 
}; info!("Running select completed ({:?})", start.elapsed()?); - res + res?.read() } pub async fn try_to_connect(&mut self) -> Result<(), CubeError> { diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index ab39c9c11780..e7328f294e91 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -174,10 +174,10 @@ impl Config { meta_store.clone(), import_service.clone(), self.config_obj.clone(), - query_executor + query_executor.clone() ); - let sql_service = SqlServiceImpl::new(meta_store.clone(), wal_store.clone(), query_planner.clone(), cluster.clone()); + let sql_service = SqlServiceImpl::new(meta_store.clone(), wal_store.clone(), query_planner.clone(), query_executor.clone(), cluster.clone()); let scheduler = SchedulerImpl::new(meta_store.clone(), cluster.clone(), remote_fs.clone(), event_receiver); CubeServices { diff --git a/rust/cubestore/src/lib.rs b/rust/cubestore/src/lib.rs index 7a19b2136209..f4fb6b1260ca 100644 --- a/rust/cubestore/src/lib.rs +++ b/rust/cubestore/src/lib.rs @@ -142,7 +142,7 @@ impl From for CubeError { impl From for CubeError { fn from(v: datafusion::error::DataFusionError) -> Self { - CubeError::internal(v.to_string()) + CubeError::from_error(v) } } diff --git a/rust/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/src/queryplanner/query_executor.rs index c161264abdc4..6e10d1b506ea 100644 --- a/rust/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/src/queryplanner/query_executor.rs @@ -11,7 +11,7 @@ use std::any::Any; use datafusion::error::DataFusionError; use std::pin::Pin; use datafusion::datasource::TableProvider; -use crate::metastore::{Column, Index, IdRow, ColumnType}; +use crate::metastore::{Column, Index, IdRow, ColumnType, Partition}; use itertools::Itertools; use crate::metastore::table::Table; use std::time::SystemTime; @@ -21,15 +21,31 @@ use arrow::array::{UInt64Array, Int64Array, Float64Array, TimestampMicrosecondAr use std::collections::HashMap; use async_trait::async_trait; use mockall::automock; -use log::{debug, warn}; +use log::{debug, warn, trace}; use datafusion::{error::{Result as DFResult}}; use bigdecimal::BigDecimal; use std::convert::TryFrom; +use datafusion::physical_plan::hash_aggregate::HashAggregateExec; +use crate::cluster::Cluster; +use datafusion::physical_plan::memory::{MemoryExec}; +use serde_derive::{Deserialize, Serialize}; +use arrow::ipc::writer::{MemStreamWriter}; +use std::io::Cursor; +use arrow::ipc::reader::{StreamReader}; +use core::{fmt}; +use std::fmt::Formatter; +use datafusion::physical_plan::empty::EmptyExec; +use datafusion::physical_plan::sort::SortExec; +use datafusion::physical_plan::limit::GlobalLimitExec; #[automock] #[async_trait] pub trait QueryExecutor: Send + Sync { async fn execute_plan(&self, plan: SerializedPlan, remote_to_local_names: HashMap) -> Result; + + async fn execute_router_plan(&self, plan: SerializedPlan, cluster: Arc) -> Result; + + async fn execute_worker_plan(&self, plan: SerializedPlan, remote_to_local_names: HashMap) -> Result, CubeError>; } pub struct QueryExecutorImpl; @@ -38,7 +54,7 @@ pub struct QueryExecutorImpl; impl QueryExecutor for QueryExecutorImpl { async fn execute_plan(&self, plan: SerializedPlan, remote_to_local_names: HashMap) -> Result { let plan_to_move = plan.logical_plan(); - let ctx = self.execution_context(plan.index_snapshots(), remote_to_local_names)?; + let ctx = self.execution_context(plan.index_snapshots(), remote_to_local_names, None)?; let plan_ctx = ctx.clone(); let 
physical_plan = tokio::task::spawn_blocking(move || { @@ -55,34 +71,215 @@ impl QueryExecutor for QueryExecutorImpl { let data_frame = batch_to_dataframe(&results)?; Ok(data_frame) } + + async fn execute_router_plan(&self, plan: SerializedPlan, cluster: Arc) -> Result { + let plan_to_move = plan.logical_plan(); + let ctx = self.execution_context(plan.index_snapshots(), HashMap::new(), None)?; + let plan_ctx = ctx.clone(); + + let serialized_plan = Arc::new(plan); + let physical_plan = plan_ctx.create_physical_plan(&plan_to_move)?; + let available_nodes = cluster.available_nodes().await?; + let split_plan = self.get_router_split_plan(physical_plan, serialized_plan.clone(), cluster, available_nodes)?; + + let execution_time = SystemTime::now(); + let results = ctx.collect(split_plan.clone()).await?; + debug!("Query data processing time: {:?}", execution_time.elapsed()?); + if execution_time.elapsed()?.as_millis() > 200 { + warn!("Slow Query ({:?}):\n{:#?}", execution_time.elapsed()?, serialized_plan.logical_plan()); + debug!("Slow Query Physical Plan ({:?}): {:#?}", execution_time.elapsed()?, &split_plan); + } else { + trace!("Router Query Physical Plan ({:?}): {:#?}", execution_time.elapsed()?, &split_plan); + } + let data_frame = batch_to_dataframe(&results)?; + Ok(data_frame) + } + + async fn execute_worker_plan(&self, plan: SerializedPlan, remote_to_local_names: HashMap) -> Result, CubeError> { + let plan_to_move = plan.logical_plan(); + let ctx = self.execution_context(plan.index_snapshots(), remote_to_local_names, plan.partition_id_to_execute())?; + let plan_ctx = ctx.clone(); + + let physical_plan = plan_ctx.create_physical_plan(&plan_to_move)?; + + let worker_plan = self.get_worker_split_plan(physical_plan); + + let execution_time = SystemTime::now(); + let results = ctx.collect(worker_plan.clone()).await; + debug!("Partition Query data processing time: {:?}", execution_time.elapsed()?); + if execution_time.elapsed()?.as_millis() > 200 || results.is_err() { + warn!("Slow Partition Query ({:?}):\n{:#?}", execution_time.elapsed()?, plan.logical_plan()); + debug!("Slow Partition Query Physical Plan ({:?}): {:#?}", execution_time.elapsed()?, &worker_plan); + } + Ok(results?) 
+ } } impl QueryExecutorImpl { - fn execution_context(&self, index_snapshots: &Vec, remote_to_local_names: HashMap) -> Result, CubeError> { + fn execution_context(&self, index_snapshots: &Vec, remote_to_local_names: HashMap, worker_partition_id: Option) -> Result, CubeError> { let mut ctx = ExecutionContext::new(); for row in index_snapshots.iter() { - let provider = CubeTable::try_new(row.clone(), remote_to_local_names.clone())?; // TODO Clone + let provider = CubeTable::try_new(row.clone(), remote_to_local_names.clone(), worker_partition_id)?; // TODO Clone ctx.register_table(&row.table_name(), Box::new(provider)); } Ok(Arc::new(ctx)) } + + fn get_router_split_plan( + &self, + execution_plan: Arc, + serialized_plan: Arc, + cluster: Arc, + available_nodes: Vec, + ) -> Result, CubeError> { + if self.has_node::(execution_plan.clone()) { + self.get_router_split_plan_at( + execution_plan, + serialized_plan, + cluster, + available_nodes, + |h| h.as_any().downcast_ref::().is_some() + ) + } else if self.has_node::(execution_plan.clone()) { + self.get_router_split_plan_at( + execution_plan, + serialized_plan, + cluster, + available_nodes, + |h| h.as_any().downcast_ref::().is_some() + ) + } else if self.has_node::(execution_plan.clone()) { + self.get_router_split_plan_at( + execution_plan, + serialized_plan, + cluster, + available_nodes, + |h| h.as_any().downcast_ref::().is_some() + ) + } else { + self.get_router_split_plan_at( + execution_plan, + serialized_plan, + cluster, + available_nodes, + |_| true + ) + } + } + + fn get_worker_split_plan( + &self, + execution_plan: Arc, + ) -> Arc { + if self.has_node::(execution_plan.clone()) { + self.get_worker_split_plan_at( + execution_plan, + |h| h.as_any().downcast_ref::().is_some() + ) + } else if self.has_node::(execution_plan.clone()) { + self.get_worker_split_plan_at( + execution_plan, + |h| h.as_any().downcast_ref::().is_some() + ) + } else if self.has_node::(execution_plan.clone()) { + self.get_worker_split_plan_at( + execution_plan, + |h| h.as_any().downcast_ref::().is_some() + ) + } else { + self.get_worker_split_plan_at( + execution_plan, + |_| true + ) + } + } + + fn get_worker_split_plan_at( + &self, + execution_plan: Arc, + split_at_fn: impl Fn(Arc) -> bool + ) -> Arc { + let children = execution_plan.children(); + assert!(children.len() == 1, "Only one child is expected for {:?}", &execution_plan); + if split_at_fn(execution_plan.clone()) { + children[0].clone() + } else { + self.get_worker_split_plan(children[0].clone()) + } + } + + fn get_router_split_plan_at( + &self, + execution_plan: Arc, + serialized_plan: Arc, + cluster: Arc, + available_nodes: Vec, + split_at_fn: impl Fn(Arc) -> bool + ) -> Result, CubeError> { + if split_at_fn(execution_plan.clone()) { + let children = execution_plan.children(); + self.wrap_with_cluster_send(execution_plan, serialized_plan, cluster, available_nodes, children) + } else { + let children = execution_plan.children().iter() + .map(move |c| self.get_router_split_plan(c.clone(), serialized_plan.clone(), cluster.clone(), available_nodes.clone())) + .collect::, _>>()?; + Ok(execution_plan.with_new_children(children)?) 
+ } + } + + fn wrap_with_cluster_send(&self, execution_plan: Arc, serialized_plan: Arc, cluster: Arc, available_nodes: Vec, children: Vec>) -> Result, CubeError> { + let index_snapshots = self.index_snapshots_from_cube_table(execution_plan.clone()); + if index_snapshots.len() > 0 { + let cluster_exec = Arc::new(ClusterSendExec::new( + children[0].schema(), + cluster, + serialized_plan, + available_nodes, + index_snapshots + )); + Ok(execution_plan.with_new_children(vec![Arc::new(MergeExec::new(cluster_exec))])?) + } else { + Ok(execution_plan.with_new_children(vec![Arc::new(EmptyExec::new(children[0].schema()))])?) + } + } + + fn has_node(&self, execution_plan: Arc) -> bool { + if execution_plan.as_any().downcast_ref::().is_some() { + true + } else { + execution_plan.children().into_iter().find(|c| self.has_node::(c.clone())).is_some() + } + } + + fn index_snapshots_from_cube_table( + &self, + execution_plan: Arc, + ) -> Vec { + if let Some(cube_table) = execution_plan.as_any().downcast_ref::() { + vec![cube_table.index_snapshot.clone()] + } else { + execution_plan.children().iter().flat_map(|e| self.index_snapshots_from_cube_table(e.clone())).collect::>() + } + } } pub struct CubeTable { index_snapshot: IndexSnapshot, remote_to_local_names: HashMap, - schema: SchemaRef + worker_partition_id: Option, + schema: SchemaRef, } impl CubeTable { pub fn try_new( index_snapshot: IndexSnapshot, remote_to_local_names: HashMap, + worker_partition_id: Option, ) -> Result { let schema = Arc::new(Schema::new(index_snapshot.table().get_row().get_columns().iter().map(|c| c.clone().into()).collect::>())); - Ok(Self { index_snapshot, schema, remote_to_local_names }) + Ok(Self { index_snapshot, schema, remote_to_local_names, worker_partition_id }) } fn async_scan(&self, projection: &Option>, batch_size: usize) -> Result, CubeError> { @@ -92,7 +289,6 @@ impl CubeTable { let mut partition_execs = Vec::>::new(); - let schema = Arc::new(Schema::new(index.get_row().get_columns().iter().map(|c| c.clone().into()).collect::>())); let mapped_projection = projection.as_ref() .map( |p| CubeTable::project_to_index_positions(&CubeTable::project_to_table(&table, p), &index) @@ -102,14 +298,20 @@ impl CubeTable { ); for partition_snapshot in partition_snapshots { - let mut execs = Vec::new(); + if let Some(partition_id) = self.worker_partition_id { + if partition_snapshot.partition().get_id() != partition_id { + continue; + } + } else { + continue; + } let partition = partition_snapshot.partition(); if let Some(remote_path) = partition.get_row().get_full_name(partition.get_id()) { let local_path = self.remote_to_local_names.get(remote_path.as_str()) .expect(format!("Missing remote path {}", remote_path).as_str()); let arc: Arc = Arc::new(ParquetExec::try_new(&local_path, mapped_projection.clone(), batch_size)?); - execs.push(arc); + partition_execs.push(arc); } // TODO look up in not repartitioned parent chunks @@ -119,17 +321,16 @@ impl CubeTable { let local_path = self.remote_to_local_names.get(&remote_path) .expect(format!("Missing remote path {}", remote_path).as_str()); let node = Arc::new(ParquetExec::try_new(local_path, mapped_projection.clone(), batch_size)?); - execs.push(node); + partition_execs.push(node); } + } - partition_execs.push(Arc::new(MergeExec::new(Arc::new(CubeTableExec { - schema: schema.clone(), - partition_execs: execs, - })))) + if partition_execs.len() == 0 { + partition_execs.push(Arc::new(EmptyExec::new(self.schema.clone()))); } let plan = Arc::new(MergeExec::new(Arc::new( - CubeTableExec { 
schema: self.schema.clone(), partition_execs } + CubeTableExec { schema: self.schema.clone(), partition_execs, index_snapshot: self.index_snapshot.clone() } ))); Ok(plan) @@ -150,6 +351,7 @@ impl CubeTable { #[derive(Debug)] pub struct CubeTableExec { schema: SchemaRef, + index_snapshot: IndexSnapshot, partition_execs: Vec>, } @@ -175,6 +377,7 @@ impl ExecutionPlan for CubeTableExec { Ok(Arc::new(CubeTableExec { schema: self.schema.clone(), partition_execs: children, + index_snapshot: self.index_snapshot.clone() })) } @@ -183,6 +386,85 @@ impl ExecutionPlan for CubeTableExec { } } +pub struct ClusterSendExec { + schema: SchemaRef, + partitions: Vec>, + cluster: Arc, + available_nodes: Vec, + serialized_plan: Arc, +} + +impl ClusterSendExec { + pub fn new( + schema: SchemaRef, + cluster: Arc, + serialized_plan: Arc, + available_nodes: Vec, + index_snapshots: Vec + ) -> Self { + Self { + schema, + partitions: index_snapshots.iter().flat_map( + |index_snapshot| index_snapshot.partitions().iter().map(|p| p.partition().clone()).collect::>() + ).collect::>(), + cluster, + available_nodes, + serialized_plan, + } + } +} + +#[async_trait] +impl ExecutionPlan for ClusterSendExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.partitions.len()) + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children(&self, children: Vec>) -> Result, DataFusionError> { + if children.len() != 0 { + panic!("Expected to be a leaf node"); + } + Ok(Arc::new(ClusterSendExec { + schema: self.schema.clone(), + partitions: self.partitions.clone(), + cluster: self.cluster.clone(), + available_nodes: self.available_nodes.clone(), + serialized_plan: self.serialized_plan.clone(), + })) + } + + async fn execute(&self, partition: usize) -> Result>, DataFusionError> { + let record_batches = self.cluster.run_select( + self.available_nodes[0].clone(), // TODO find node by partition + self.serialized_plan.with_partition_id_to_execute(self.partitions[partition].get_id()), + ).await?; + let memory_exec = MemoryExec::try_new(&vec![record_batches], self.schema.clone(), None)?; + memory_exec.execute(0).await + } +} + +impl fmt::Debug for ClusterSendExec { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), fmt::Error> { + f.write_fmt(format_args!( + "ClusterSendExec: {:?}: {:?}", + self.schema, + self.partitions + )) + } +} + impl TableProvider for CubeTable { fn schema(&self) -> SchemaRef { self.schema.clone() @@ -288,4 +570,27 @@ pub fn arrow_to_column_type(arrow_type: DataType) -> Result Ok(ColumnType::Int), x => Err(CubeError::internal(format!("unsupported type {:?}", x))), } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SerializedRecordBatchStream { + record_batch_file: Vec +} + +impl SerializedRecordBatchStream { + pub fn write(record_batches: Vec) -> Result { + let file = Vec::new(); + let mut writer = MemStreamWriter::try_new(Cursor::new(file), &record_batches[0].schema())?; + for batch in record_batches.iter() { + writer.write(batch)?; + } + let cursor = writer.finish()?; + Ok(Self { record_batch_file: cursor.into_inner() }) + } + + pub fn read(self) -> Result, CubeError> { + let cursor = Cursor::new(self.record_batch_file); + let reader = StreamReader::try_new(cursor)?; + Ok(reader.collect::, _>>()?) 
+ } } \ No newline at end of file diff --git a/rust/cubestore/src/queryplanner/serialized_plan.rs b/rust/cubestore/src/queryplanner/serialized_plan.rs index ec96d51499c6..e22bb5a1e444 100644 --- a/rust/cubestore/src/queryplanner/serialized_plan.rs +++ b/rust/cubestore/src/queryplanner/serialized_plan.rs @@ -14,8 +14,9 @@ use futures::FutureExt; #[derive(Clone, Serialize, Deserialize, Debug)] pub struct SerializedPlan { - logical_plan: SerializedLogicalPlan, - schema_snapshot: SchemaSnapshot, + logical_plan: Arc, + schema_snapshot: Arc, + partition_id_to_execute: Option } #[derive(Clone, Serialize, Deserialize, Debug)] @@ -224,13 +225,26 @@ impl SerializedPlan { let serialized_logical_plan = Self::serialized_logical_plan(&plan); let index_snapshots = Self::index_snapshots_from_plan(Arc::new(plan), meta_store, Vec::new()).await?; Ok(SerializedPlan { - logical_plan: serialized_logical_plan, - schema_snapshot: SchemaSnapshot { + logical_plan: Arc::new(serialized_logical_plan), + schema_snapshot: Arc::new(SchemaSnapshot { index_snapshots - }, + }), + partition_id_to_execute: None }) } + pub fn with_partition_id_to_execute(&self, partition_id_to_execute: u64) -> Self { + Self { + logical_plan: self.logical_plan.clone(), + schema_snapshot: self.schema_snapshot.clone(), + partition_id_to_execute: Some(partition_id_to_execute) + } + } + + pub fn partition_id_to_execute(&self) -> Option { + self.partition_id_to_execute.clone() + } + pub fn logical_plan(&self) -> LogicalPlan { self.logical_plan.logical_plan() } diff --git a/rust/cubestore/src/sql/mod.rs b/rust/cubestore/src/sql/mod.rs index 1704a29759a1..67945461e3d6 100644 --- a/rust/cubestore/src/sql/mod.rs +++ b/rust/cubestore/src/sql/mod.rs @@ -20,6 +20,7 @@ use datafusion::sql::parser::{Statement as DFStatement}; use futures::future::join_all; use crate::metastore::job::JobType; use datafusion::physical_plan::datetime_expressions::string_to_timestamp_nanos; +use crate::queryplanner::query_executor::QueryExecutor; #[async_trait] pub trait SqlService: Send + Sync { @@ -30,6 +31,7 @@ pub struct SqlServiceImpl { db: Arc, wal_store: Arc, query_planner: Arc, + query_executor: Arc, cluster: Arc, } @@ -38,9 +40,10 @@ impl SqlServiceImpl { db: Arc, wal_store: Arc, query_planner: Arc, + query_executor: Arc, cluster: Arc ) -> Arc { - Arc::new(SqlServiceImpl { db, wal_store, query_planner, cluster }) + Arc::new(SqlServiceImpl { db, wal_store, query_planner, query_executor, cluster }) } async fn create_schema(&self, name: String) -> Result, CubeError> { @@ -236,7 +239,7 @@ impl SqlService for SqlServiceImpl { // TODO distribute and combine let res = match logical_plan { QueryPlan::Meta(logical_plan) => self.query_planner.execute_meta_plan(logical_plan).await?, - QueryPlan::Select(serialized) => self.cluster.run_select(self.cluster.server_name().to_string(), serialized).await? + QueryPlan::Select(serialized) => self.query_executor.execute_router_plan(serialized, self.cluster.clone()).await? 
}; return Ok(res); } @@ -369,6 +372,7 @@ mod tests { use crate::remotefs::LocalDirRemoteFs; use std::path::PathBuf; use crate::queryplanner::MockQueryPlanner; + use crate::queryplanner::query_executor::MockQueryExecutor; use crate::config::Config; use crate::metastore::{MetaStoreEvent, RocksMetaStore}; use crate::metastore::job::JobType; @@ -397,6 +401,7 @@ mod tests { meta_store, store, Arc::new(MockQueryPlanner::new()), + Arc::new(MockQueryExecutor::new()), Arc::new(MockCluster::new()), ); let i = service.exec_query("CREATE SCHEMA foo").await.unwrap(); @@ -419,7 +424,7 @@ mod tests { let remote_fs = LocalDirRemoteFs::new(PathBuf::from(store_path.clone()), PathBuf::from(remote_store_path.clone())); let meta_store = RocksMetaStore::new(path, remote_fs.clone()); let store = WALStore::new(meta_store.clone(), remote_fs.clone(), 10); - let service = SqlServiceImpl::new(meta_store, store, Arc::new(MockQueryPlanner::new()), Arc::new(MockCluster::new())); + let service = SqlServiceImpl::new(meta_store, store, Arc::new(MockQueryPlanner::new()), Arc::new(MockQueryExecutor::new()), Arc::new(MockCluster::new())); let i = service.exec_query("CREATE SCHEMA Foo").await.unwrap(); assert_eq!(i.get_rows()[0], Row::new(vec![TableValue::Int(1), TableValue::String("Foo".to_string())])); let query = "CREATE TABLE Foo.Persons ( @@ -465,7 +470,7 @@ mod tests { move |k, t| Ok(JobEvent::Success(k, t)) ); - let service = SqlServiceImpl::new(meta_store.clone(), store, Arc::new(MockQueryPlanner::new()), Arc::new(mock_cluster)); + let service = SqlServiceImpl::new(meta_store.clone(), store, Arc::new(MockQueryPlanner::new()), Arc::new(MockQueryExecutor::new()), Arc::new(mock_cluster)); let _ = service.exec_query("CREATE SCHEMA Foo").await.unwrap(); let query = "CREATE TABLE Foo.Persons ( PersonID int,
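
A minimal sketch of how the router/worker hand-off introduced above is meant to fit together, assuming the types and signatures shown in this patch (SerializedPlan::with_partition_id_to_execute, QueryExecutor::execute_worker_plan, SerializedRecordBatchStream::write/read, Cluster::run_select); the two helper functions below are illustrative only and not part of the patch:

use std::collections::HashMap;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;

use crate::cluster::Cluster;
use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchStream};
use crate::queryplanner::serialized_plan::SerializedPlan;
use crate::CubeError;

// Worker side (hypothetical helper): execute only the partition the router
// pinned into the plan, then round-trip the batches through the Arrow IPC
// buffer the way WorkerProcessor::process and ClusterImpl::run_local_select do.
async fn worker_round_trip(
    executor: Arc<dyn QueryExecutor>,
    plan: SerializedPlan,
    remote_to_local_names: HashMap<String, String>,
) -> Result<Vec<RecordBatch>, CubeError> {
    let batches = executor
        .execute_worker_plan(plan, remote_to_local_names)
        .await?;
    // Serialize for the wire, then rehydrate on the receiving side.
    let serialized = SerializedRecordBatchStream::write(batches)?;
    serialized.read()
}

// Router side (hypothetical helper): pin a copy of the plan to a single
// partition and hand it to a worker node; ClusterSendExec::execute performs
// this step once per partition and feeds the result into a MemoryExec.
async fn send_partition_to_worker(
    cluster: Arc<dyn Cluster>,
    plan: &SerializedPlan,
    node: String,
    partition_id: u64,
) -> Result<Vec<RecordBatch>, CubeError> {
    cluster
        .run_select(node, plan.with_partition_id_to_execute(partition_id))
        .await
}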