feat(cubestore): Hash join support
paveltiunov authored and ovr committed Dec 1, 2020
1 parent b4685dd commit 8b1a5da
Showing 3 changed files with 61 additions and 28 deletions.
51 changes: 29 additions & 22 deletions rust/cubestore/src/queryplanner/query_executor.rs
@@ -18,7 +18,7 @@ use std::time::SystemTime;
use arrow::record_batch::RecordBatch;
use crate::table::{Row, TableValue, TimestampValue};
use arrow::array::{UInt64Array, Int64Array, Float64Array, TimestampMicrosecondArray, TimestampNanosecondArray, StringArray, Array, BooleanArray};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use async_trait::async_trait;
use mockall::automock;
use log::{debug, warn, trace};
@@ -37,6 +37,7 @@ use std::fmt::Formatter;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::limit::GlobalLimitExec;
use datafusion::physical_plan::hash_join::HashJoinExec;

#[automock]
#[async_trait]
@@ -54,7 +55,7 @@ pub struct QueryExecutorImpl;
impl QueryExecutor for QueryExecutorImpl {
async fn execute_plan(&self, plan: SerializedPlan, remote_to_local_names: HashMap<String, String>) -> Result<DataFrame, CubeError> {
let plan_to_move = plan.logical_plan();
let ctx = self.execution_context(plan.index_snapshots(), remote_to_local_names, None)?;
let ctx = self.execution_context(plan.index_snapshots(), remote_to_local_names, HashSet::new())?;
let plan_ctx = ctx.clone();

let physical_plan = tokio::task::spawn_blocking(move || {
@@ -74,7 +75,7 @@ impl QueryExecutor for QueryExecutorImpl {

async fn execute_router_plan(&self, plan: SerializedPlan, cluster: Arc<dyn Cluster>) -> Result<DataFrame, CubeError> {
let plan_to_move = plan.logical_plan();
let ctx = self.execution_context(plan.index_snapshots(), HashMap::new(), None)?;
let ctx = self.execution_context(plan.index_snapshots(), HashMap::new(), HashSet::new())?;
let plan_ctx = ctx.clone();

let serialized_plan = Arc::new(plan);
@@ -97,7 +98,7 @@

async fn execute_worker_plan(&self, plan: SerializedPlan, remote_to_local_names: HashMap<String, String>) -> Result<Vec<RecordBatch>, CubeError> {
let plan_to_move = plan.logical_plan();
let ctx = self.execution_context(plan.index_snapshots(), remote_to_local_names, plan.partition_id_to_execute())?;
let ctx = self.execution_context(plan.index_snapshots(), remote_to_local_names, plan.partition_ids_to_execute())?;
let plan_ctx = ctx.clone();

let physical_plan = plan_ctx.create_physical_plan(&plan_to_move)?;
@@ -116,11 +117,11 @@ impl QueryExecutor for QueryExecutorImpl {
}

impl QueryExecutorImpl {
fn execution_context(&self, index_snapshots: &Vec<IndexSnapshot>, remote_to_local_names: HashMap<String, String>, worker_partition_id: Option<u64>) -> Result<Arc<ExecutionContext>, CubeError> {
fn execution_context(&self, index_snapshots: &Vec<IndexSnapshot>, remote_to_local_names: HashMap<String, String>, worker_partition_id: HashSet<u64>) -> Result<Arc<ExecutionContext>, CubeError> {
let mut ctx = ExecutionContext::new();

for row in index_snapshots.iter() {
let provider = CubeTable::try_new(row.clone(), remote_to_local_names.clone(), worker_partition_id)?; // TODO Clone
let provider = CubeTable::try_new(row.clone(), remote_to_local_names.clone(), worker_partition_id.clone())?; // TODO Clone
ctx.register_table(&row.table_name(), Box::new(provider));
}

@@ -237,7 +238,7 @@ impl QueryExecutorImpl {
cluster,
serialized_plan,
available_nodes,
index_snapshots
index_snapshots,
));
Ok(execution_plan.with_new_children(vec![Arc::new(MergeExec::new(cluster_exec))])?)
} else {
@@ -268,18 +269,18 @@
pub struct CubeTable {
index_snapshot: IndexSnapshot,
remote_to_local_names: HashMap<String, String>,
worker_partition_id: Option<u64>,
worker_partition_ids: HashSet<u64>,
schema: SchemaRef,
}

impl CubeTable {
pub fn try_new(
index_snapshot: IndexSnapshot,
remote_to_local_names: HashMap<String, String>,
worker_partition_id: Option<u64>,
worker_partition_ids: HashSet<u64>,
) -> Result<Self, CubeError> {
let schema = Arc::new(Schema::new(index_snapshot.table().get_row().get_columns().iter().map(|c| c.clone().into()).collect::<Vec<_>>()));
Ok(Self { index_snapshot, schema, remote_to_local_names, worker_partition_id })
Ok(Self { index_snapshot, schema, remote_to_local_names, worker_partition_ids })
}

fn async_scan(&self, projection: &Option<Vec<usize>>, batch_size: usize) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
@@ -298,11 +299,7 @@ impl CubeTable {
);

for partition_snapshot in partition_snapshots {
if let Some(partition_id) = self.worker_partition_id {
if partition_snapshot.partition().get_id() != partition_id {
continue;
}
} else {
if !self.worker_partition_ids.contains(&partition_snapshot.partition().get_id()) {
continue;
}
let partition = partition_snapshot.partition();
@@ -329,8 +326,16 @@ impl CubeTable {
partition_execs.push(Arc::new(EmptyExec::new(false, self.schema.clone())));
}

let projected_schema = if let Some(p) = projection {
Arc::new(Schema::new(
p.iter().map(|i| self.schema.field(*i).clone()).collect()
))
} else {
self.schema.clone()
};

let plan = Arc::new(MergeExec::new(Arc::new(
CubeTableExec { schema: self.schema.clone(), partition_execs, index_snapshot: self.index_snapshot.clone() }
CubeTableExec { schema: projected_schema, partition_execs, index_snapshot: self.index_snapshot.clone() }
)));

Ok(plan)
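
A note on the projected_schema block above: CubeTableExec now reports the projected schema instead of the full table schema when a projection is pushed into the scan. A minimal standalone sketch of the same projection logic against an arrow Schema (column names are invented for illustration; arrow APIs may differ slightly between versions):

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

fn main() {
    // Hypothetical table schema with three columns.
    let schema = Arc::new(Schema::new(vec![
        Field::new("customer_id", DataType::Utf8, false),
        Field::new("amount", DataType::Int64, false),
        Field::new("created_at", DataType::Utf8, true),
    ]));

    // Keep only the requested column indices, mirroring the change above.
    let projection: Option<Vec<usize>> = Some(vec![0, 1]);
    let projected_schema = if let Some(p) = &projection {
        Arc::new(Schema::new(p.iter().map(|i| schema.field(*i).clone()).collect::<Vec<_>>()))
    } else {
        schema.clone()
    };

    assert_eq!(projected_schema.fields().len(), 2);
}
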
@@ -388,7 +393,7 @@ impl ExecutionPlan for CubeTableExec {

pub struct ClusterSendExec {
schema: SchemaRef,
partitions: Vec<IdRow<Partition>>,
partitions: Vec<Vec<IdRow<Partition>>>,
cluster: Arc<dyn Cluster>,
available_nodes: Vec<String>,
serialized_plan: Arc<SerializedPlan>,
@@ -400,13 +405,15 @@ impl ClusterSendExec {
cluster: Arc<dyn Cluster>,
serialized_plan: Arc<SerializedPlan>,
available_nodes: Vec<String>,
index_snapshots: Vec<IndexSnapshot>
index_snapshots: Vec<IndexSnapshot>,
) -> Self {
let partitions = index_snapshots.into_iter()
.map(|index| index.partitions().iter().map(|p| p.partition().clone()).collect::<Vec<_>>())
.multi_cartesian_product()
.collect::<Vec<Vec<_>>>();
Self {
schema,
partitions: index_snapshots.iter().flat_map(
|index_snapshot| index_snapshot.partitions().iter().map(|p| p.partition().clone()).collect::<Vec<_>>()
).collect::<Vec<_>>(),
partitions,
cluster,
available_nodes,
serialized_plan,
@@ -448,7 +455,7 @@ impl ExecutionPlan for ClusterSendExec {
async fn execute(&self, partition: usize) -> Result<Pin<Box<dyn RecordBatchStream + Send>>, DataFusionError> {
let record_batches = self.cluster.run_select(
self.available_nodes[0].clone(), // TODO find node by partition
self.serialized_plan.with_partition_id_to_execute(self.partitions[partition].get_id()),
self.serialized_plan.with_partition_id_to_execute(self.partitions[partition].iter().map(|p| p.get_id()).collect()),
).await?;
let memory_exec = MemoryExec::try_new(&vec![record_batches], self.schema.clone(), None)?;
memory_exec.execute(0).await
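A note on the ClusterSendExec change above: partitions is now a Vec<Vec<IdRow<Partition>>>, built as the cartesian product of the partitions of every joined index, so each worker invocation receives one combination of partitions (one per index) rather than a single partition. A minimal standalone sketch of what itertools' multi_cartesian_product yields, with made-up partition ids:

use itertools::Itertools;

fn main() {
    // Hypothetical partition ids: two partitions for the first index, one for the second.
    let first_index_partitions = vec![1u64, 2];
    let second_index_partitions = vec![10u64];

    // Every combination across the joined indexes, one partition per index.
    let combinations: Vec<Vec<u64>> = vec![first_index_partitions, second_index_partitions]
        .into_iter()
        .multi_cartesian_product()
        .collect();

    assert_eq!(combinations, vec![vec![1, 10], vec![2, 10]]);
}
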
13 changes: 7 additions & 6 deletions rust/cubestore/src/queryplanner/serialized_plan.rs
@@ -11,12 +11,13 @@ use itertools::Itertools;
use futures::future::BoxFuture;
use crate::CubeError;
use futures::FutureExt;
use std::collections::HashSet;

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct SerializedPlan {
logical_plan: Arc<SerializedLogicalPlan>,
schema_snapshot: Arc<SchemaSnapshot>,
partition_id_to_execute: Option<u64>
partition_ids_to_execute: HashSet<u64>
}

#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -255,20 +256,20 @@ impl SerializedPlan {
schema_snapshot: Arc::new(SchemaSnapshot {
index_snapshots
}),
partition_id_to_execute: None
partition_ids_to_execute: HashSet::new()
})
}

pub fn with_partition_id_to_execute(&self, partition_id_to_execute: u64) -> Self {
pub fn with_partition_id_to_execute(&self, partition_ids_to_execute: HashSet<u64>) -> Self {
Self {
logical_plan: self.logical_plan.clone(),
schema_snapshot: self.schema_snapshot.clone(),
partition_id_to_execute: Some(partition_id_to_execute)
partition_ids_to_execute
}
}

pub fn partition_id_to_execute(&self) -> Option<u64> {
self.partition_id_to_execute.clone()
pub fn partition_ids_to_execute(&self) -> HashSet<u64> {
self.partition_ids_to_execute.clone()
}

pub fn logical_plan(&self) -> LogicalPlan {
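The SerializedPlan change above swaps the single Option<u64> partition id for a HashSet<u64>, presumably so that a worker executing part of a hash join can scan one partition from each joined index within the same plan. A minimal sketch of the resulting membership check (the helper below is hypothetical, not part of the codebase):

use std::collections::HashSet;

// Hypothetical helper: keep only the partitions this worker was asked to execute.
fn partitions_for_worker(all_partition_ids: &[u64], to_execute: &HashSet<u64>) -> Vec<u64> {
    all_partition_ids
        .iter()
        .copied()
        .filter(|id| to_execute.contains(id))
        .collect()
}

fn main() {
    let to_execute: HashSet<u64> = vec![2u64, 10].into_iter().collect();
    assert_eq!(partitions_for_worker(&[1, 2, 3, 10], &to_execute), vec![2, 10]);
}
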
25 changes: 25 additions & 0 deletions rust/cubestore/src/sql/mod.rs
@@ -561,6 +561,31 @@ mod tests {
}).await;
}

#[tokio::test]
async fn join() {
Config::run_test("join", async move |services| {
let service = services.sql_service;

let _ = service.exec_query("CREATE SCHEMA foo").await.unwrap();

let _ = service.exec_query("CREATE TABLE foo.orders (customer_id text, amount int)").await.unwrap();
let _ = service.exec_query("CREATE TABLE foo.customers (id text, city text, state text)").await.unwrap();

service.exec_query(
"INSERT INTO foo.orders (customer_id, amount) VALUES ('a', 10), ('b', 2), ('b', 3)"
).await.unwrap();

service.exec_query(
"INSERT INTO foo.customers (id, city, state) VALUES ('a', 'San Francisco', 'CA'), ('b', 'New York', 'NY')"
).await.unwrap();

let result = service.exec_query("SELECT city, sum(amount) from foo.orders o JOIN foo.customers c ON o.customer_id = id GROUP BY 1 ORDER BY 2 DESC").await.unwrap();

assert_eq!(result.get_rows()[0], Row::new(vec![TableValue::String("San Francisco".to_string()), TableValue::Int(10)]));
assert_eq!(result.get_rows()[1], Row::new(vec![TableValue::String("New York".to_string()), TableValue::Int(5)]));
}).await;
}
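
For reference, the expected rows in the test above follow directly from the inserted data: customer 'a' has a single order of 10 and maps to San Francisco, customer 'b' has orders of 2 and 3 that sum to 5 and maps to New York, and ORDER BY 2 DESC puts San Francisco first.
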

#[tokio::test]
async fn create_schema_if_not_exists() {
Config::run_test("create_schema_if_not_exists", async move |services| {