feat(cubestore): Distribute unions across workers the same way as partitions
paveltiunov committed Jan 4, 2021
1 parent 67ffd4d commit 52f8a77
Showing 1 changed file with 30 additions and 8 deletions.
38 changes: 30 additions & 8 deletions rust/cubestore/src/queryplanner/query_executor.rs
@@ -28,7 +28,7 @@ use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use datafusion::physical_plan::limit::GlobalLimitExec;
 use datafusion::physical_plan::memory::MemoryExec;
-use datafusion::physical_plan::merge::MergeExec;
+use datafusion::physical_plan::merge::{MergeExec, UnionExec};
 use datafusion::physical_plan::merge_sort::MergeSortExec;
 use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::sort::SortExec;
@@ -305,14 +305,14 @@ impl QueryExecutorImpl {
         available_nodes: Vec<String>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
-        let index_snapshots = self.index_snapshots_from_cube_table(execution_plan.clone());
-        if index_snapshots.len() > 0 {
+        let union_snapshots = self.union_snapshots_from_cube_table(execution_plan.clone());
+        if !union_snapshots.is_empty() {
             let cluster_exec = Arc::new(ClusterSendExec::new(
                 children[0].schema(),
                 cluster,
                 serialized_plan,
                 available_nodes,
-                index_snapshots,
+                union_snapshots,
             ));
             Ok(execution_plan.with_new_children(vec![Arc::new(MergeExec::new(cluster_exec))])?)
         } else {
@@ -338,6 +338,27 @@ impl QueryExecutorImpl {
         }
     }
 
+    fn union_snapshots_from_cube_table(
+        &self,
+        execution_plan: Arc<dyn ExecutionPlan>,
+    ) -> Vec<Vec<IndexSnapshot>> {
+        if let Some(cube_table) = execution_plan.as_any().downcast_ref::<CubeTableExec>() {
+            vec![vec![cube_table.index_snapshot.clone()]]
+        } else if let Some(union_exec) = execution_plan.as_any().downcast_ref::<UnionExec>() {
+            vec![union_exec
+                .children()
+                .iter()
+                .flat_map(|e| self.index_snapshots_from_cube_table(e.clone()))
+                .collect::<Vec<_>>()]
+        } else {
+            execution_plan
+                .children()
+                .iter()
+                .flat_map(|e| self.union_snapshots_from_cube_table(e.clone()))
+                .collect::<Vec<_>>()
+        }
+    }
+
     fn index_snapshots_from_cube_table(
         &self,
         execution_plan: Arc<dyn ExecutionPlan>,
@@ -566,17 +587,18 @@ impl ClusterSendExec {
         cluster: Arc<dyn Cluster>,
         serialized_plan: Arc<SerializedPlan>,
         available_nodes: Vec<String>,
-        index_snapshots: Vec<IndexSnapshot>,
+        union_snapshots: Vec<Vec<IndexSnapshot>>,
     ) -> Self {
-        let partitions = index_snapshots
+        let to_multiply = union_snapshots
             .into_iter()
-            .map(|index| {
+            .map(|union| union.iter().flat_map(|index| {
                 index
                     .partitions()
                     .iter()
                     .map(|p| p.partition().clone())
                     .collect::<Vec<_>>()
-            })
+            }).collect::<Vec<_>>()).collect::<Vec<_>>();
+        let partitions = to_multiply.into_iter()
             .multi_cartesian_product()
             .collect::<Vec<Vec<_>>>();
         Self {
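For readers following the new union_snapshots_from_cube_table traversal above, here is a minimal standalone sketch of the grouping it produces. This is plain Rust, not CubeStore code: the Plan enum and the snapshot names are hypothetical stand-ins for DataFusion's ExecutionPlan nodes and CubeStore's IndexSnapshot values. The point it illustrates is that every scan under a UnionExec is flattened into a single group, while scans elsewhere in the plan remain singleton groups.

// Standalone sketch of the union_snapshots_from_cube_table traversal.
// `Plan` is a hypothetical stand-in for the ExecutionPlan tree.
#[derive(Clone, Debug, PartialEq)]
struct IndexSnapshot(&'static str);

enum Plan {
    CubeTable(IndexSnapshot),
    Union(Vec<Plan>),
    Other(Vec<Plan>),
}

// Mirrors index_snapshots_from_cube_table: collect every snapshot in the subtree.
fn index_snapshots(plan: &Plan) -> Vec<IndexSnapshot> {
    match plan {
        Plan::CubeTable(s) => vec![s.clone()],
        Plan::Union(children) | Plan::Other(children) => {
            children.iter().flat_map(index_snapshots).collect()
        }
    }
}

// Mirrors union_snapshots_from_cube_table: a table scan forms a singleton group,
// a union flattens all snapshots below it into one group, and any other node
// simply concatenates the groups of its children.
fn union_snapshots(plan: &Plan) -> Vec<Vec<IndexSnapshot>> {
    match plan {
        Plan::CubeTable(s) => vec![vec![s.clone()]],
        Plan::Union(children) => vec![children.iter().flat_map(index_snapshots).collect()],
        Plan::Other(children) => children.iter().flat_map(union_snapshots).collect(),
    }
}

fn main() {
    let plan = Plan::Other(vec![
        Plan::Union(vec![
            Plan::CubeTable(IndexSnapshot("orders_idx")),
            Plan::CubeTable(IndexSnapshot("events_idx")),
        ]),
        Plan::CubeTable(IndexSnapshot("users_idx")),
    ]);
    // The union contributes one two-snapshot group; the standalone scan a singleton.
    assert_eq!(
        union_snapshots(&plan),
        vec![
            vec![IndexSnapshot("orders_idx"), IndexSnapshot("events_idx")],
            vec![IndexSnapshot("users_idx")],
        ]
    );
}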

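A similarly hedged sketch of the fan-out that the reworked ClusterSendExec::new performs with itertools' multi_cartesian_product, the same call the diff uses. The u64 partition ids are hypothetical stand-ins for the real partition rows. Each inner vector of to_multiply holds all partitions of one union group, and every resulting combination takes one partition from each group, which is how a union's partitions end up spread across workers the same way as ordinary table partitions.

// Standalone sketch of the partition fan-out in ClusterSendExec::new.
// Requires the itertools crate, which the real code already depends on.
use itertools::Itertools;

fn main() {
    // Hypothetical partition ids: the first union group owns partitions 1 and 2,
    // the second group owns partitions 10 and 11.
    let to_multiply: Vec<Vec<u64>> = vec![vec![1, 2], vec![10, 11]];
    // Each combination picks exactly one partition from every group, so each
    // combination can be routed to a worker as one unit of work.
    let partitions: Vec<Vec<u64>> = to_multiply.into_iter().multi_cartesian_product().collect();
    assert_eq!(
        partitions,
        vec![vec![1, 10], vec![1, 11], vec![2, 10], vec![2, 11]]
    );
}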