feat(cubestore): Distributed query execution
paveltiunov committed Nov 15, 2020
1 parent cbd5399 commit 102c641
Showing 6 changed files with 367 additions and 49 deletions.
36 changes: 15 additions & 21 deletions rust/cubestore/src/cluster/mod.rs
@@ -16,7 +16,7 @@ use std::path::Path;
use tokio::io::AsyncWriteExt;
use std::time::{SystemTime};
use tokio::{fs, time};
-use crate::store::{ChunkDataStore, DataFrame};
+use crate::store::{ChunkDataStore};
use crate::metastore::{RowKey, TableId, MetaStore, IdRow};
use log::{info, error, debug};
use crate::store::compaction::CompactionService;
@@ -31,24 +31,23 @@ use serde::{Deserialize, Serialize};
use tokio::runtime::Handle;
use crate::queryplanner::serialized_plan::{SerializedPlan};
use crate::config::{Config, ConfigObj};
-use crate::queryplanner::query_executor::QueryExecutor;
+use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchStream};
use crate::cluster::worker_pool::{WorkerPool, MessageProcessor};
+use arrow::record_batch::RecordBatch;

#[automock]
#[async_trait]
pub trait Cluster: Send + Sync {
async fn notify_job_runner(&self, node_name: String) -> Result<(), CubeError>;

-async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result<DataFrame, CubeError>;
+async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result<Vec<RecordBatch>, CubeError>;

async fn available_nodes(&self) -> Result<Vec<String>, CubeError>;

fn server_name(&self) -> &str;

async fn download(&self, remote_path: &str) -> Result<String, CubeError>;

-fn get_chunk_store(&self) -> &Arc<dyn ChunkDataStore>;
-
async fn wait_for_job_result(&self, row_key: RowKey, job_type: JobType) -> Result<JobEvent, CubeError>;
}

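With this change, workers hand results back to the router as Arrow RecordBatches instead of Cube Store's row-oriented DataFrame. For orientation only, here is a minimal router-side sketch of the revised contract; fan_out_select and the pre-partitioned plan list are hypothetical and not part of this commit, and the snippet assumes the imports already present in this module.

// Illustrative only: fan a pre-partitioned plan out to workers and concatenate
// the Arrow batches they return.
async fn fan_out_select(
    cluster: &dyn Cluster,
    plans: Vec<(String, SerializedPlan)>,
) -> Result<Vec<RecordBatch>, CubeError> {
    let mut batches = Vec::new();
    for (node_name, plan) in plans {
        // Each worker now answers with RecordBatches, so partial results stay in
        // Arrow's columnar form all the way back to the router.
        batches.extend(cluster.run_select(node_name, plan).await?);
    }
    Ok(batches)
}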
@@ -68,13 +67,12 @@ pub struct ClusterImpl {
connect_timeout: Duration,
server_name: String,
server_addresses: Vec<String>,
-connected_nodes: Vec<String>,
job_notify: Arc<Notify>,
event_sender: Sender<JobEvent>,
jobs_enabled: Arc<RwLock<bool>>,
// used just to hold a reference so event_sender won't be collected
_receiver: Receiver<JobEvent>,
-select_process_pool: RwLock<Option<Arc<WorkerPool<WorkerMessage, DataFrame, WorkerProcessor>>>>,
+select_process_pool: RwLock<Option<Arc<WorkerPool<WorkerMessage, SerializedRecordBatchStream, WorkerProcessor>>>>,
config_obj: Arc<dyn ConfigObj>,
query_executor: Arc<dyn QueryExecutor>,
}
@@ -86,16 +84,16 @@ pub enum WorkerMessage {

pub struct WorkerProcessor;

-impl MessageProcessor<WorkerMessage, DataFrame> for WorkerProcessor {
-fn process(args: WorkerMessage) -> Result<DataFrame, CubeError> {
+impl MessageProcessor<WorkerMessage, SerializedRecordBatchStream> for WorkerProcessor {
+fn process(args: WorkerMessage) -> Result<SerializedRecordBatchStream, CubeError> {
match args {
WorkerMessage::Select(plan_node, remote_to_local_names) => {
debug!("Running select in worker started: {:?}", plan_node);
let handle = Handle::current();
let plan_node_to_send = plan_node.clone();
-let res = handle.block_on(async move { Config::current_worker_services().query_executor.execute_plan(plan_node_to_send, remote_to_local_names).await });
+let res = handle.block_on(async move { Config::current_worker_services().query_executor.execute_worker_plan(plan_node_to_send, remote_to_local_names).await });
debug!("Running select in worker completed: {:?}", plan_node);
-res
+SerializedRecordBatchStream::write(res?)
}
}
}
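Worker selects run in separate OS processes via the worker pool, so results must cross a process boundary in a serializable form; that is the job of SerializedRecordBatchStream, whose implementation lives in queryplanner/query_executor.rs and is not shown in this diff. As a rough mental model only, the pair below sketches how such a wrapper could be built on Arrow's IPC stream format; the function names, error handling, and exact arrow API calls (which differ between arrow releases) are assumptions, not Cube Store code.

use std::io::Cursor;
use arrow::error::ArrowError;
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

// Encode batches (assumed to share one schema) into an IPC stream buffer.
fn encode_batches(batches: &[RecordBatch]) -> Result<Vec<u8>, ArrowError> {
    let mut buf = Vec::new();
    if let Some(first) = batches.first() {
        let mut writer = StreamWriter::try_new(&mut buf, &first.schema())?;
        for batch in batches {
            writer.write(batch)?;
        }
        writer.finish()?;
    }
    Ok(buf)
}

// Decode the buffer back into batches on the other side of the process boundary.
// (An empty result set would need special-casing; glossed over here.)
fn decode_batches(data: &[u8]) -> Result<Vec<RecordBatch>, ArrowError> {
    let reader = StreamReader::try_new(Cursor::new(data))?;
    reader.collect()
}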
@@ -128,7 +126,7 @@ impl Cluster for ClusterImpl {
}
}

-async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result<DataFrame, CubeError> {
+async fn run_select(&self, node_name: String, plan_node: SerializedPlan) -> Result<Vec<RecordBatch>, CubeError> {
if self.server_name == node_name {
// TODO timeout config
timeout(Duration::from_secs(60), self.run_local_select(plan_node)).await?
@@ -138,7 +136,7 @@ impl Cluster for ClusterImpl {
}

async fn available_nodes(&self) -> Result<Vec<String>, CubeError> {
-Ok(self.connected_nodes.clone())
+Ok(vec![self.server_name.to_string()])
}

fn server_name(&self) -> &str {
@@ -149,10 +147,6 @@ impl Cluster for ClusterImpl {
self.remote_fs.download_file(remote_path).await
}

-fn get_chunk_store(&self) -> &Arc<dyn ChunkDataStore> {
-&self.chunk_store
-}
-
async fn wait_for_job_result(&self, row_key: RowKey, job_type: JobType) -> Result<JobEvent, CubeError> {
let mut receiver = self.event_sender.subscribe();
loop {
@@ -294,7 +288,6 @@ impl ClusterImpl {
compaction_service,
import_service,
meta_store,
-connected_nodes: Vec::new(),
job_notify: Arc::new(Notify::new()),
event_sender: sender,
jobs_enabled: Arc::new(RwLock::new(true)),
@@ -339,7 +332,7 @@ impl ClusterImpl {
Ok(())
}

-async fn run_local_select(&self, plan_node: SerializedPlan) -> Result<DataFrame, CubeError> {
+async fn run_local_select(&self, plan_node: SerializedPlan) -> Result<Vec<RecordBatch>, CubeError> {
let start = SystemTime::now();
debug!("Running select: {:?}", plan_node);
let to_download = plan_node.files_to_download();
@@ -354,10 +347,11 @@ impl ClusterImpl {
let serialized_plan_node = plan_node.clone();
pool.process(WorkerMessage::Select(serialized_plan_node, remote_to_local_names)).await
} else {
-self.query_executor.execute_plan(plan_node.clone(), remote_to_local_names).await
+// TODO optimize for no double conversion
+SerializedRecordBatchStream::write(self.query_executor.execute_worker_plan(plan_node.clone(), remote_to_local_names).await?)
};
info!("Running select completed ({:?})", start.elapsed()?);
-res
+res?.read()
}

pub async fn try_to_connect(&mut self) -> Result<(), CubeError> {
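Note that run_local_select above now ends in res?.read(): whether the batches came back from a worker process or were produced in-process (and, per the TODO, serialized redundantly for now), the caller always sees plain RecordBatches. The one-liner below restates the round-trip contract this relies on; roundtrip is purely illustrative and not a function in this commit.

// write(...) followed by read() must hand the worker's batches back unchanged.
fn roundtrip(batches: Vec<RecordBatch>) -> Result<Vec<RecordBatch>, CubeError> {
    SerializedRecordBatchStream::write(batches)?.read()
}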
4 changes: 2 additions & 2 deletions rust/cubestore/src/config/mod.rs
@@ -174,10 +174,10 @@ impl Config {
meta_store.clone(),
import_service.clone(),
self.config_obj.clone(),
-query_executor
+query_executor.clone()
);

-let sql_service = SqlServiceImpl::new(meta_store.clone(), wal_store.clone(), query_planner.clone(), cluster.clone());
+let sql_service = SqlServiceImpl::new(meta_store.clone(), wal_store.clone(), query_planner.clone(), query_executor.clone(), cluster.clone());
let scheduler = SchedulerImpl::new(meta_store.clone(), cluster.clone(), remote_fs.clone(), event_receiver);

CubeServices {
2 changes: 1 addition & 1 deletion rust/cubestore/src/lib.rs
@@ -142,7 +142,7 @@ impl From<tokio::time::Elapsed> for CubeError {

impl From<datafusion::error::DataFusionError> for CubeError {
fn from(v: datafusion::error::DataFusionError) -> Self {
-CubeError::internal(v.to_string())
+CubeError::from_error(v)
}
}

