Skip to content

Commit

Permalink
feat(cubesql): In subquery rewrite (#8162)
Browse files Browse the repository at this point in the history
  • Loading branch information
waralexrom committed Apr 24, 2024
1 parent 6ff2208 commit d17c2a7
Show file tree
Hide file tree
Showing 20 changed files with 1,164 additions and 109 deletions.
2 changes: 2 additions & 0 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Expand Up @@ -2893,6 +2893,8 @@ export class BaseQuery {
cast: 'CAST({{ expr }} AS {{ data_type }})',
window_function: '{{ fun_call }} OVER ({% if partition_by_concat %}PARTITION BY {{ partition_by_concat }}{% if order_by_concat %} {% endif %}{% endif %}{% if order_by_concat %}ORDER BY {{ order_by_concat }}{% endif %})',
in_list: '{{ expr }} {% if negated %}NOT {% endif %}IN ({{ in_exprs_concat }})',
subquery: '({{ expr }})',
in_subquery: '{{ expr }} {% if negated %}NOT {% endif %}IN {{ subquery_expr }}',
negative: '-({{ expr }})',
not: 'NOT ({{ expr }})',
true: 'TRUE',
Expand Down
4 changes: 4 additions & 0 deletions rust/cubesql/cubesql/src/compile/engine/df/scan.rs
Expand Up @@ -146,6 +146,7 @@ pub struct WrappedSelectNode {
pub schema: DFSchemaRef,
pub select_type: WrappedSelectType,
pub projection_expr: Vec<Expr>,
pub subqueries: Vec<Arc<LogicalPlan>>,
pub group_expr: Vec<Expr>,
pub aggr_expr: Vec<Expr>,
pub window_expr: Vec<Expr>,
Expand All @@ -166,6 +167,7 @@ impl WrappedSelectNode {
schema: DFSchemaRef,
select_type: WrappedSelectType,
projection_expr: Vec<Expr>,
subqueries: Vec<Arc<LogicalPlan>>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
window_expr: Vec<Expr>,
Expand All @@ -184,6 +186,7 @@ impl WrappedSelectNode {
schema,
select_type,
projection_expr,
subqueries,
group_expr,
aggr_expr,
window_expr,
Expand Down Expand Up @@ -320,6 +323,7 @@ impl UserDefinedLogicalNode for WrappedSelectNode {
self.schema.clone(),
self.select_type.clone(),
projection_expr,
self.subqueries.clone(),
group_expr,
aggregate_expr,
window_expr,
Expand Down
123 changes: 121 additions & 2 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

Large diffs are not rendered by default.

153 changes: 147 additions & 6 deletions rust/cubesql/cubesql/src/compile/mod.rs
Expand Up @@ -15633,6 +15633,7 @@ ORDER BY "source"."str0" ASC
)
}

/* TODO Uncomment after fixing the rewrite logic for filters with date_trunc
#[tokio::test]
async fn test_quicksight_date_trunc_column_less_or_eq() {
init_logger();
Expand Down Expand Up @@ -15679,7 +15680,7 @@ ORDER BY "source"."str0" ASC
ungrouped: None,
}
)
}
} */

#[tokio::test]
async fn test_quicksight_excluding_n_weeks() {
Expand Down Expand Up @@ -20179,11 +20180,151 @@ ORDER BY "source"."str0" ASC
.sql
.contains("COALESCE"));

let physical_plan = query_plan.as_physical_plan().await.unwrap();
println!(
"Physical plan: {}",
displayable(physical_plan.as_ref()).indent()
);
let _physical_plan = query_plan.as_physical_plan().await.unwrap();
}

#[tokio::test]
async fn test_simple_subquery_wrapper_projection() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_logger();

let query_plan = convert_select_to_query_plan(
"SELECT (SELECT customer_gender FROM KibanaSampleDataEcommerce WHERE customer_gender = 'male' LIMIT 1) as gender, avgPrice FROM KibanaSampleDataEcommerce a"
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();
assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("(SELECT"));
assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("\\\"limit\\\":1"));

let _physical_plan = query_plan.as_physical_plan().await.unwrap();
}

#[tokio::test]
async fn test_simple_subquery_wrapper_projection_aggregate() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_logger();

let query_plan = convert_select_to_query_plan(
"SELECT (SELECT customer_gender FROM KibanaSampleDataEcommerce WHERE customer_gender = 'male' LIMIT 1), avg(avgPrice) FROM KibanaSampleDataEcommerce a GROUP BY 1"
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();
assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("(SELECT"));
assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("LIMIT 1"));

let _physical_plan = query_plan.as_physical_plan().await.unwrap();
}

#[tokio::test]
async fn test_simple_subquery_wrapper_filter_equal() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_logger();

let query_plan = convert_select_to_query_plan(
"SELECT customer_gender, avgPrice FROM KibanaSampleDataEcommerce a where customer_gender = (select customer_gender from KibanaSampleDataEcommerce limit 1)"
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();
assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("(SELECT"));
assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("\\\"limit\\\":1"));

let _physical_plan = query_plan.as_physical_plan().await.unwrap();
}

#[tokio::test]
async fn test_simple_subquery_wrapper_filter_in() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_logger();

let query_plan = convert_select_to_query_plan(
"SELECT customer_gender, avgPrice FROM KibanaSampleDataEcommerce a where customer_gender in (select customer_gender from KibanaSampleDataEcommerce)"
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();
assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("IN (SELECT"));

let _physical_plan = query_plan.as_physical_plan().await.unwrap();
}

#[tokio::test]
async fn test_simple_subquery_wrapper_filter_and_projection() {
if !Rewriter::sql_push_down_enabled() {
return;
}
init_logger();

let query_plan = convert_select_to_query_plan(
"SELECT (select customer_gender from KibanaSampleDataEcommerce limit 1), avgPrice FROM KibanaSampleDataEcommerce a where customer_gender in (select customer_gender from KibanaSampleDataEcommerce)"
.to_string(),
DatabaseProtocol::PostgreSQL,
)
.await;

let logical_plan = query_plan.as_logical_plan();

assert!(logical_plan
.find_cube_scan_wrapper()
.wrapped_sql
.unwrap()
.sql
.contains("IN (SELECT"));

let _physical_plan = query_plan.as_physical_plan().await.unwrap();
}

#[tokio::test]
Expand Down
63 changes: 42 additions & 21 deletions rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Expand Up @@ -14,7 +14,7 @@ use crate::{
ChangeUserMemberValue, ColumnExprColumn, CubeScanAliasToCube, CubeScanLimit,
CubeScanOffset, CubeScanUngrouped, CubeScanWrapped, DimensionName,
EmptyRelationProduceOneRow, FilterMemberMember, FilterMemberOp, FilterMemberValues,
FilterOpOp, InListExprNegated, InSubqueryNegated, JoinJoinConstraint, JoinJoinType,
FilterOpOp, InListExprNegated, InSubqueryExprNegated, JoinJoinConstraint, JoinJoinType,
JoinLeftOn, JoinRightOn, LikeExprEscapeChar, LikeExprLikeType, LikeExprNegated,
LikeType, LimitFetch, LimitSkip, LiteralExprValue, LiteralMemberRelation,
LiteralMemberValue, LogicalPlanLanguage, MeasureName, MemberErrorError, OrderAsc,
Expand Down Expand Up @@ -432,9 +432,11 @@ impl LogicalPlanToLanguageConverter {
} => {
let expr = Self::add_expr_replace_params(graph, expr, query_params)?;
let subquery = Self::add_expr_replace_params(graph, subquery, query_params)?;
let negated = add_expr_data_node!(graph, negated, InSubqueryNegated);
let negated = add_expr_data_node!(graph, negated, InSubqueryExprNegated);

graph.add(LogicalPlanLanguage::InSubquery([expr, subquery, negated]))
graph.add(LogicalPlanLanguage::InSubqueryExpr([
expr, subquery, negated,
]))
}
Expr::Wildcard => graph.add(LogicalPlanLanguage::WildcardExpr([])),
Expr::GetIndexedField { expr, key } => {
Expand Down Expand Up @@ -1035,10 +1037,10 @@ pub fn node_to_expr(
"QueryParam can't be evaluated as an Expr node".to_string(),
));
}
LogicalPlanLanguage::InSubquery(params) => {
LogicalPlanLanguage::InSubqueryExpr(params) => {
let expr = Box::new(to_expr(params[0].clone())?);
let subquery = Box::new(to_expr(params[1].clone())?);
let negated = match_data_node!(node_by_id, params[2], InSubqueryNegated);
let negated = match_data_node!(node_by_id, params[2], InSubqueryExprNegated);
Expr::InSubquery {
expr,
subquery,
Expand Down Expand Up @@ -1889,14 +1891,22 @@ impl LanguageToLogicalPlanConverter {
params[1],
WrappedSelectProjectionExpr
);
let subqueries =
match_list_node_ids!(node_by_id, params[2], WrappedSelectSubqueries)
.into_iter()
.map(|j| {
let input = Arc::new(self.to_logical_plan(j)?);
Ok(input)
})
.collect::<Result<Vec<_>, CubeError>>()?;
let group_expr =
match_expr_list_node!(node_by_id, to_expr, params[2], WrappedSelectGroupExpr);
match_expr_list_node!(node_by_id, to_expr, params[3], WrappedSelectGroupExpr);
let aggr_expr =
match_expr_list_node!(node_by_id, to_expr, params[3], WrappedSelectAggrExpr);
match_expr_list_node!(node_by_id, to_expr, params[4], WrappedSelectAggrExpr);
let window_expr =
match_expr_list_node!(node_by_id, to_expr, params[4], WrappedSelectWindowExpr);
let from = Arc::new(self.to_logical_plan(params[5])?);
let joins = match_list_node!(node_by_id, params[6], WrappedSelectJoins)
match_expr_list_node!(node_by_id, to_expr, params[5], WrappedSelectWindowExpr);
let from = Arc::new(self.to_logical_plan(params[6])?);
let joins = match_list_node!(node_by_id, params[7], WrappedSelectJoins)
.into_iter()
.map(|j| {
if let LogicalPlanLanguage::WrappedSelectJoin(params) = j {
Expand All @@ -1912,16 +1922,16 @@ impl LanguageToLogicalPlanConverter {
.collect::<Result<Vec<_>, _>>()?;

let filter_expr =
match_expr_list_node!(node_by_id, to_expr, params[7], WrappedSelectFilterExpr);
match_expr_list_node!(node_by_id, to_expr, params[8], WrappedSelectFilterExpr);
let having_expr =
match_expr_list_node!(node_by_id, to_expr, params[8], WrappedSelectHavingExpr);
let limit = match_data_node!(node_by_id, params[9], WrappedSelectLimit);
let offset = match_data_node!(node_by_id, params[10], WrappedSelectOffset);
match_expr_list_node!(node_by_id, to_expr, params[9], WrappedSelectHavingExpr);
let limit = match_data_node!(node_by_id, params[10], WrappedSelectLimit);
let offset = match_data_node!(node_by_id, params[11], WrappedSelectOffset);
let order_expr =
match_expr_list_node!(node_by_id, to_expr, params[11], WrappedSelectOrderExpr);
let alias = match_data_node!(node_by_id, params[12], WrappedSelectAlias);
let distinct = match_data_node!(node_by_id, params[13], WrappedSelectDistinct);
let ungrouped = match_data_node!(node_by_id, params[14], WrappedSelectUngrouped);
match_expr_list_node!(node_by_id, to_expr, params[12], WrappedSelectOrderExpr);
let alias = match_data_node!(node_by_id, params[13], WrappedSelectAlias);
let distinct = match_data_node!(node_by_id, params[14], WrappedSelectDistinct);
let ungrouped = match_data_node!(node_by_id, params[15], WrappedSelectUngrouped);

let filter_expr = normalize_cols(
replace_qualified_col_with_flat_name_if_missing(
Expand Down Expand Up @@ -1981,8 +1991,15 @@ impl LanguageToLogicalPlanConverter {
} else {
all_expr_without_window
};

let mut subqueries_schema = DFSchema::empty();
for subquery in subqueries.iter() {
subqueries_schema.merge(subquery.schema());
}
let schema_with_subqueries = from.schema().join(&subqueries_schema)?;

let without_window_fields =
exprlist_to_fields(all_expr_without_window.iter(), from.schema())?;
exprlist_to_fields(all_expr_without_window.iter(), &schema_with_subqueries)?;
let replace_map = all_expr_without_window
.iter()
.zip(without_window_fields.iter())
Expand Down Expand Up @@ -2034,8 +2051,11 @@ impl LanguageToLogicalPlanConverter {
without_window_fields
.into_iter()
.chain(
exprlist_to_fields(window_expr_rebased.iter(), from.schema())?
.into_iter(),
exprlist_to_fields(
window_expr_rebased.iter(),
&schema_with_subqueries,
)?
.into_iter(),
)
.collect(),
HashMap::new(),
Expand All @@ -2051,6 +2071,7 @@ impl LanguageToLogicalPlanConverter {
Arc::new(schema),
select_type,
projection_expr,
subqueries,
group_expr,
aggr_expr,
window_expr_rebased,
Expand Down
9 changes: 9 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/cost.rs
Expand Up @@ -39,6 +39,7 @@ pub struct CubePlanCost {
table_scans: i64,
empty_wrappers: i64,
non_detected_cube_scans: i64,
unwrapped_subqueries: usize,
member_errors: i64,
// TODO if pre-aggregation can be used for window functions, then it'd be suboptimal
non_pushed_down_window: i64,
Expand Down Expand Up @@ -147,6 +148,7 @@ impl CubePlanCost {
ast_size: self.ast_size + other.ast_size,
ast_size_inside_wrapper: self.ast_size_inside_wrapper + other.ast_size_inside_wrapper,
ungrouped_nodes: self.ungrouped_nodes + other.ungrouped_nodes,
unwrapped_subqueries: self.unwrapped_subqueries + other.unwrapped_subqueries,
}
}

Expand Down Expand Up @@ -199,6 +201,7 @@ impl CubePlanCost {
}
CubePlanState::Wrapper => 0,
} + self.ungrouped_aggregates,
unwrapped_subqueries: self.unwrapped_subqueries,
wrapper_nodes: self.wrapper_nodes,
wrapped_select_ungrouped_scan: self.wrapped_select_ungrouped_scan,
cube_scan_nodes: self.cube_scan_nodes,
Expand Down Expand Up @@ -378,6 +381,11 @@ impl CostFunction<LogicalPlanLanguage> for BestCubePlan {
_ => 0,
};

let unwrapped_subqueries = match enode {
LogicalPlanLanguage::Subquery(_) => 1,
_ => 0,
};

let initial_cost = CubePlanCostAndState {
cost: CubePlanCost {
replacers: this_replacers,
Expand All @@ -402,6 +410,7 @@ impl CostFunction<LogicalPlanLanguage> for BestCubePlan {
ast_size_without_alias,
ast_size: 1,
ungrouped_nodes,
unwrapped_subqueries,
},
state: match enode {
LogicalPlanLanguage::CubeScanWrapped(CubeScanWrapped(true)) => {
Expand Down

0 comments on commit d17c2a7

Please sign in to comment.