Skip to content

Commit

Permalink
fix(cubestore): choose inplace aggregate in more cases (#2402)
Browse files Browse the repository at this point in the history
Now with tests for the important query.
  • Loading branch information
ilya-biryukov committed Mar 24, 2021
1 parent 38999b0 commit 9ab6559
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 26 deletions.
8 changes: 4 additions & 4 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::queryplanner::planning::WorkerExec;
use crate::queryplanner::query_executor::ClusterSendExec;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::expressions::AliasedSchemaExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::{AggregateStrategy, HashAggregateExec};
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::merge::{MergeExec, UnionExec};
use datafusion::physical_plan::merge_sort::MergeSortExec;
use datafusion::physical_plan::planner::compute_aggregation_strategy;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::ExecutionPlan;
use std::sync::Arc;

Expand Down Expand Up @@ -46,24 +49,33 @@ fn try_regroup_columns(
if p.as_any().is::<HashAggregateExec>() {
return Ok(p);
}
if p.as_any().is::<AliasedSchemaExec>() || p.as_any().is::<FilterExec>() {
let children: datafusion::error::Result<Vec<_>> = p
.children()
.into_iter()
.map(|c| try_regroup_columns(c))
.collect();
return p.with_new_children(children?);
if p.as_any().is::<UnionExec>()
|| p.as_any().is::<ProjectionExec>()
|| p.as_any().is::<AliasedSchemaExec>()
|| p.as_any().is::<FilterExec>()
|| p.as_any().is::<WorkerExec>()
|| p.as_any().is::<ClusterSendExec>()
{
return p.with_new_children(
p.children()
.into_iter()
.map(|c| try_regroup_columns(c))
.collect::<Result<_, DataFusionError>>()?,
);
}

let merge;
if let Some(m) = p.as_any().downcast_ref::<MergeExec>() {
merge = m;
} else {
return Ok(p);
}

let input = try_regroup_columns(merge.input().clone())?;

// Try to replace `MergeExec` with `MergeSortExec`.
let sort_order;
if let Some(o) = merge.input().output_hints().sort_order {
if let Some(o) = input.output_hints().sort_order {
sort_order = o;
} else {
return Ok(p);
Expand All @@ -73,10 +85,7 @@ fn try_regroup_columns(
}
let sort_columns = sort_order
.into_iter()
.map(|i| merge.input().schema().field(i).qualified_name())
.map(|i| input.schema().field(i).qualified_name())
.collect();
Ok(Arc::new(MergeSortExec::try_new(
merge.input().clone(),
sort_columns,
)?))
Ok(Arc::new(MergeSortExec::try_new(input, sort_columns)?))
}
21 changes: 13 additions & 8 deletions rust/cubestore/src/queryplanner/pretty_printers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,11 @@ use datafusion::physical_plan::expressions::AliasedSchemaExec;
use datafusion::physical_plan::merge::{MergeExec, UnionExec};
use datafusion::physical_plan::projection::ProjectionExec;

#[derive(Default)]
pub struct PPOptions {
pub show_filters: bool,
}

impl Default for PPOptions {
fn default() -> Self {
PPOptions {
show_filters: false,
}
}
// Applies only to physical plan.
pub show_output_hints: bool,
}

pub fn pp_phys_plan(p: &dyn ExecutionPlan) -> String {
Expand Down Expand Up @@ -299,5 +294,15 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
} else {
panic!("unhandled ExecutionPlan: {:?}", p);
}

if o.show_output_hints {
let hints = p.output_hints();
if !hints.single_value_columns.is_empty() {
*out += &format!(", single_vals: {:?}", hints.single_value_columns);
}
if let Some(so) = hints.sort_order {
*out += &format!(", sort_order: {:?}", so);
}
}
}
}
181 changes: 181 additions & 0 deletions rust/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2801,6 +2801,187 @@ mod tests {
.await;
}

#[tokio::test]
async fn planning_hints() {
Config::run_test("planning_hints", async move |services| {
let service = services.sql_service;

service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE TABLE s.Data(id1 int, id2 int, id3 int)")
.await
.unwrap();

let mut show_hints = PPOptions::default();
show_hints.show_output_hints = true;

// Merge produces a sort order because there is only single partition.
let p = service
.plan_query("SELECT id1, id2 FROM s.Data")
.await
.unwrap();
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"Worker, sort_order: [0, 1]\
\n Projection, [id1, id2], sort_order: [0, 1]\
\n Merge, sort_order: [0, 1]\
\n Scan, index: default:1:[1], fields: [id1, id2], sort_order: [0, 1]\
\n Empty"
);

let p = service
.plan_query("SELECT id2, id1 FROM s.Data")
.await
.unwrap();
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"Worker, sort_order: [1, 0]\
\n Projection, [id2, id1], sort_order: [1, 0]\
\n Merge, sort_order: [0, 1]\
\n Scan, index: default:1:[1], fields: [id1, id2], sort_order: [0, 1]\
\n Empty"
);

// Unsorted when skips columns from sort prefix.
let p = service
.plan_query("SELECT id2, id3 FROM s.Data")
.await
.unwrap();
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"Worker\
\n Projection, [id2, id3]\
\n Merge\
\n Scan, index: default:1:[1], fields: [id2, id3]\
\n Empty"
);

// The prefix columns are still sorted.
let p = service
.plan_query("SELECT id1, id3 FROM s.Data")
.await
.unwrap();
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"Worker, sort_order: [0]\
\n Projection, [id1, id3], sort_order: [0]\
\n Merge, sort_order: [0]\
\n Scan, index: default:1:[1], fields: [id1, id3], sort_order: [0]\
\n Empty"
);

// Single value hints.
let p = service
.plan_query("SELECT id3, id2 FROM s.Data WHERE id2 = 234")
.await
.unwrap();
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"Worker, single_vals: [1]\
\n Projection, [id3, id2], single_vals: [1]\
\n Filter, single_vals: [0]\
\n Merge\
\n Scan, index: default:1:[1], fields: [id2, id3]\
\n Empty"
);

// Removing single value columns should keep the sort order of the rest.
let p = service
.plan_query("SELECT id3 FROM s.Data WHERE id1 = 123 AND id2 = 234")
.await
.unwrap();
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"Worker, sort_order: [0]\
\n Projection, [id3], sort_order: [0]\
\n Filter, single_vals: [0, 1], sort_order: [0, 1, 2]\
\n Merge, sort_order: [0, 1, 2]\
\n Scan, index: default:1:[1], fields: *, sort_order: [0, 1, 2]\
\n Empty"
);
let p = service
.plan_query("SELECT id1, id3 FROM s.Data WHERE id2 = 234")
.await
.unwrap();
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"Worker, sort_order: [0, 1]\
\n Projection, [id1, id3], sort_order: [0, 1]\
\n Filter, single_vals: [1], sort_order: [0, 1, 2]\
\n Merge, sort_order: [0, 1, 2]\
\n Scan, index: default:1:[1], fields: *, sort_order: [0, 1, 2]\
\n Empty"
);
})
.await
}

#[tokio::test]
async fn planning_inplace_aggregate2() {
Config::run_test("planning_inplace_aggregate2", async move |services| {
let service = services.sql_service;

service.exec_query("CREATE SCHEMA s").await.unwrap();
service
.exec_query("CREATE TABLE s.Data1(allowed boolean, site_id int, url text, day timestamp, hits int)")
.await
.unwrap();
service
.exec_query("CREATE TABLE s.Data2(allowed boolean, site_id int, url text, day timestamp, hits int)")
.await
.unwrap();

let p = service
.plan_query("SELECT `url` `url`, SUM(`hits`) `hits` \
FROM (SELECT * FROM s.Data1 \
UNION ALL \
SELECT * FROM s.Data2) AS `Data` \
WHERE (`allowed` = 'true') AND (`site_id` = '1') \
AND (`day` >= to_timestamp('2021-01-01T00:00:00.000') \
AND `day` <= to_timestamp('2021-01-02T23:59:59.999')) \
GROUP BY 1 \
ORDER BY 2 DESC \
LIMIT 10")
.await
.unwrap();

let mut show_hints = PPOptions::default();
show_hints.show_output_hints = true;
assert_eq!(
pp_phys_plan_ext(p.router.as_ref(), &show_hints),
"GlobalLimit, n: 10\
\n Sort\
\n Projection, [url, SUM(hits):hits], sort_order: [0]\
\n FinalInplaceAggregate, sort_order: [0]\
\n MergeSort, sort_order: [0]\
\n ClusterSend, partitions: [[1], [2]], sort_order: [0]"
);
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &show_hints),
"GlobalLimit, n: 10\
\n Sort\
\n Projection, [url, SUM(hits):hits], sort_order: [0]\
\n FinalInplaceAggregate, sort_order: [0]\
\n Worker, sort_order: [0]\
\n PartialInplaceAggregate, sort_order: [0]\
\n Alias, single_vals: [0, 1], sort_order: [0, 1, 2, 3, 4]\
\n MergeSort, single_vals: [0, 1], sort_order: [0, 1, 2, 3, 4]\
\n Union, single_vals: [0, 1], sort_order: [0, 1, 2, 3, 4]\
\n Projection, [allowed, site_id, url, day, hits], single_vals: [0, 1], sort_order: [0, 1, 2, 3, 4]\
\n Filter, single_vals: [0, 1], sort_order: [0, 1, 2, 3, 4]\
\n MergeSort, sort_order: [0, 1, 2, 3, 4]\
\n Scan, index: default:1:[1], fields: *, sort_order: [0, 1, 2, 3, 4]\
\n Empty\
\n Projection, [allowed, site_id, url, day, hits], single_vals: [0, 1], sort_order: [0, 1, 2, 3, 4]\
\n Filter, single_vals: [0, 1], sort_order: [0, 1, 2, 3, 4]\
\n MergeSort, sort_order: [0, 1, 2, 3, 4]\
\n Scan, index: default:2:[2], fields: *, sort_order: [0, 1, 2, 3, 4]\
\n Empty"
);
})
.await;
}

#[tokio::test]
async fn planning_simple() {
Config::run_test("planning_simple", async move |services| {
Expand Down

0 comments on commit 9ab6559

Please sign in to comment.