fix(cubestore): fully execute a single-node query on a worker (#2288)
The router will only pipe the results back to the client.
ilya-biryukov committed Mar 4, 2021
1 parent b2d3777 commit 00156d0
Showing 1 changed file with 82 additions and 27 deletions.
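Below is a minimal, self-contained Rust sketch of the router-side decision this commit introduces, assuming simplified stand-in types (Partition, IndexSnapshot, SerializedPlan, and RouterPlan here are illustrative placeholders, not the actual CubeStore structs); only the shape of the single-partition check mirrors is_single_node_plan from the diff below.

// Simplified stand-ins for the real CubeStore types; this is a sketch of the
// routing decision, not the actual implementation.
struct Partition;

struct IndexSnapshot {
    partitions: Vec<Partition>,
}

struct SerializedPlan {
    index_snapshots: Vec<IndexSnapshot>,
}

impl SerializedPlan {
    // Mirrors the shape of is_single_node_plan: exactly one index touched,
    // and that index has exactly one partition.
    fn is_single_node(&self) -> bool {
        self.index_snapshots.len() == 1 && self.index_snapshots[0].partitions.len() == 1
    }
}

#[derive(Debug)]
enum RouterPlan {
    // Ship the entire plan to one worker; the router only forwards results.
    SendWholePlanToWorker,
    // Split at an aggregation/sort/limit node and merge partials on the router.
    SplitAcrossWorkers,
}

fn route(plan: &SerializedPlan) -> RouterPlan {
    if plan.is_single_node() {
        RouterPlan::SendWholePlanToWorker
    } else {
        RouterPlan::SplitAcrossWorkers
    }
}

fn main() {
    let single_partition_plan = SerializedPlan {
        index_snapshots: vec![IndexSnapshot { partitions: vec![Partition] }],
    };
    println!("{:?}", route(&single_partition_plan)); // prints SendWholePlanToWorker
}

In the real code path, the single-node case corresponds to wrapping the whole physical plan in a ClusterSend node so the worker executes it fully and the router only merges and pipes the result stream back to the client; the multi-partition case keeps the existing split at a HashAggregateExec, SortExec, or GlobalLimitExec node.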
109 changes: 82 additions & 27 deletions rust/cubestore/src/queryplanner/query_executor.rs
@@ -139,7 +139,7 @@ impl QueryExecutor for QueryExecutorImpl {

let physical_plan = plan_ctx.create_physical_plan(&plan_to_move.clone())?;

let worker_plan = self.get_worker_split_plan(physical_plan);
let worker_plan = self.get_worker_split_plan(physical_plan, &plan);

trace!("Partition Query Physical Plan: {:#?}", &worker_plan);

@@ -193,33 +193,63 @@ impl QueryExecutorImpl {
serialized_plan: Arc<SerializedPlan>,
cluster: Arc<dyn Cluster>,
available_nodes: Vec<String>,
) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
// With a single partition, run the whole computation on a single node.
if Self::is_single_node_plan(&serialized_plan) {
return Ok(self.build_cluster_send(
execution_plan.as_ref(),
serialized_plan,
cluster,
available_nodes,
execution_plan.schema(),
));
}
return self.build_router_split_plan(
execution_plan,
serialized_plan,
cluster,
available_nodes,
);
}

fn is_single_node_plan(p: &SerializedPlan) -> bool {
let indices = p.index_snapshots();
return indices.len() == 1 && indices[0].partitions().len() == 1;
}

fn build_router_split_plan(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
serialized_plan: Arc<SerializedPlan>,
cluster: Arc<dyn Cluster>,
available_nodes: Vec<String>,
) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
if self.has_node::<HashAggregateExec>(execution_plan.clone()) {
self.get_router_split_plan_at(
self.build_router_split_plan_at(
execution_plan,
serialized_plan,
cluster,
available_nodes,
|h| h.as_any().downcast_ref::<HashAggregateExec>().is_some(),
)
} else if self.has_node::<SortExec>(execution_plan.clone()) {
self.get_router_split_plan_at(
self.build_router_split_plan_at(
execution_plan,
serialized_plan,
cluster,
available_nodes,
|h| h.as_any().downcast_ref::<SortExec>().is_some(),
)
} else if self.has_node::<GlobalLimitExec>(execution_plan.clone()) {
self.get_router_split_plan_at(
self.build_router_split_plan_at(
execution_plan,
serialized_plan,
cluster,
available_nodes,
|h| h.as_any().downcast_ref::<GlobalLimitExec>().is_some(),
)
} else {
self.get_router_split_plan_at(
self.build_router_split_plan_at(
execution_plan,
serialized_plan,
cluster,
@@ -232,25 +262,36 @@ impl QueryExecutorImpl {
fn get_worker_split_plan(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
serialized_plan: &SerializedPlan,
) -> Arc<dyn ExecutionPlan> {
if Self::is_single_node_plan(serialized_plan) {
return execution_plan;
}
return self.build_worker_split_plan(execution_plan);
}

fn build_worker_split_plan(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
if self.has_node::<HashAggregateExec>(execution_plan.clone()) {
self.get_worker_split_plan_at(execution_plan, |h| {
self.build_worker_split_plan_at(execution_plan, |h| {
h.as_any().downcast_ref::<HashAggregateExec>().is_some()
})
} else if self.has_node::<SortExec>(execution_plan.clone()) {
self.get_worker_split_plan_at(execution_plan, |h| {
self.build_worker_split_plan_at(execution_plan, |h| {
h.as_any().downcast_ref::<SortExec>().is_some()
})
} else if self.has_node::<GlobalLimitExec>(execution_plan.clone()) {
self.get_worker_split_plan_at(execution_plan, |h| {
self.build_worker_split_plan_at(execution_plan, |h| {
h.as_any().downcast_ref::<GlobalLimitExec>().is_some()
})
} else {
self.get_worker_split_plan_at(execution_plan, |_| true)
self.build_worker_split_plan_at(execution_plan, |_| true)
}
}

fn get_worker_split_plan_at(
fn build_worker_split_plan_at(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
split_at_fn: impl Fn(Arc<dyn ExecutionPlan>) -> bool,
@@ -264,11 +305,11 @@ impl QueryExecutorImpl {
if split_at_fn(execution_plan.clone()) {
children[0].clone()
} else {
self.get_worker_split_plan(children[0].clone())
self.build_worker_split_plan(children[0].clone())
}
}

fn get_router_split_plan_at(
fn build_router_split_plan_at(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
serialized_plan: Arc<SerializedPlan>,
@@ -290,7 +331,7 @@ impl QueryExecutorImpl {
.children()
.iter()
.map(move |c| {
self.get_router_split_plan(
self.build_router_split_plan(
c.clone(),
serialized_plan.clone(),
cluster.clone(),
@@ -310,24 +351,38 @@ impl QueryExecutorImpl {
available_nodes: Vec<String>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>, CubeError> {
let union_snapshots = self.union_snapshots_from_cube_table(execution_plan.clone());
Ok(
execution_plan.with_new_children(vec![self.build_cluster_send(
execution_plan.as_ref(),
serialized_plan,
cluster,
available_nodes,
children[0].schema(),
)])?,
)
}

fn build_cluster_send(
&self,
source: &dyn ExecutionPlan,
serialized_plan: Arc<SerializedPlan>,
cluster: Arc<dyn Cluster>,
available_nodes: Vec<String>,
schema: DFSchemaRef,
) -> Arc<dyn ExecutionPlan> {
let union_snapshots = self.union_snapshots_from_cube_table(source);
if !union_snapshots.is_empty() {
let cluster_exec = Arc::new(ClusterSendExec::new(
children[0].schema(),
schema,
cluster,
serialized_plan,
available_nodes,
union_snapshots,
));
Ok(execution_plan.with_new_children(vec![Arc::new(MergeExec::new(cluster_exec))])?)
Arc::new(MergeExec::new(cluster_exec))
} else {
// TODO .to_schema_ref()
Ok(
execution_plan.with_new_children(vec![Arc::new(EmptyExec::new(
false,
children[0].schema().to_schema_ref(),
))])?,
)
Arc::new(EmptyExec::new(false, schema.to_schema_ref()))
}
}

@@ -345,36 +400,36 @@ impl QueryExecutorImpl {

fn union_snapshots_from_cube_table(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
execution_plan: &dyn ExecutionPlan,
) -> Vec<Vec<IndexSnapshot>> {
if let Some(cube_table) = execution_plan.as_any().downcast_ref::<CubeTableExec>() {
vec![vec![cube_table.index_snapshot.clone()]]
} else if let Some(union_exec) = execution_plan.as_any().downcast_ref::<UnionExec>() {
vec![union_exec
.children()
.iter()
.flat_map(|e| self.index_snapshots_from_cube_table(e.clone()))
.flat_map(|e| self.index_snapshots_from_cube_table(e.as_ref()))
.collect::<Vec<_>>()]
} else {
execution_plan
.children()
.iter()
.flat_map(|e| self.union_snapshots_from_cube_table(e.clone()))
.flat_map(|e| self.union_snapshots_from_cube_table(e.as_ref()))
.collect::<Vec<_>>()
}
}

fn index_snapshots_from_cube_table(
&self,
execution_plan: Arc<dyn ExecutionPlan>,
execution_plan: &dyn ExecutionPlan,
) -> Vec<IndexSnapshot> {
if let Some(cube_table) = execution_plan.as_any().downcast_ref::<CubeTableExec>() {
vec![cube_table.index_snapshot.clone()]
} else {
execution_plan
.children()
.iter()
.flat_map(|e| self.index_snapshots_from_cube_table(e.clone()))
.flat_map(|e| self.index_snapshots_from_cube_table(e.as_ref()))
.collect::<Vec<_>>()
}
}
