Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cubesql): In subquery support #7851

Merged
merged 3 commits into from Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions packages/cubejs-backend-native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions rust/cubesql/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/cubesql/cubesql/Cargo.toml
Expand Up @@ -10,12 +10,12 @@ homepage = "https://cube.dev"

[dependencies]
arc-swap = "1"
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "28a07c390e7195dfd657c85118dee8cb73fc6bf7", default-features = false, features = ["regex_expressions", "unicode_expressions"] }
datafusion = { git = 'https://github.com/cube-js/arrow-datafusion.git', rev = "a93bb9641d201b2b42ee92321f07e87bbd357d0e", default-features = false, features = ["regex_expressions", "unicode_expressions"] }
anyhow = "1.0"
thiserror = "1.0.50"
cubeclient = { path = "../cubeclient" }
pg-srv = { path = "../pg-srv" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "347f769500e3305f1920d8b38832f483d8795bd3" }
sqlparser = { git = 'https://github.com/cube-js/sqlparser-rs.git', rev = "6a54d27d3b75a04b9f9cbe309a83078aa54b32fd" }
lazy_static = "1.4.0"
base64 = "0.13.0"
tokio = { version = "^1.35", features = ["full", "rt", "tracing"] }
Expand Down
Expand Up @@ -498,6 +498,7 @@ fn filter_push_down(
subqueries,
input,
schema,
types,
}) => {
// TODO: Push Filter down Subquery
issue_filter(
Expand All @@ -516,6 +517,7 @@ fn filter_push_down(
optimizer_config,
)?),
schema: schema.clone(),
types: types.clone(),
}),
)
}
Expand Down
Expand Up @@ -339,6 +339,7 @@ fn limit_push_down(
subqueries,
input,
schema,
types,
}) => {
// TODO: Pushing Limit down Subquery?
issue_limit(
Expand All @@ -359,6 +360,7 @@ fn limit_push_down(
optimizer_config,
)?),
schema: schema.clone(),
types: types.clone(),
}),
)
}
Expand Down
Expand Up @@ -295,6 +295,7 @@ fn sort_push_down(
subqueries,
input,
schema,
types,
}) => {
// TODO: Pushing Sort down Subquery?
issue_sort(
Expand All @@ -306,6 +307,7 @@ fn sort_push_down(
.collect::<Result<_>>()?,
input: Arc::new(sort_push_down(optimizer, input, None, optimizer_config)?),
schema: schema.clone(),
types: types.clone(),
}),
)
}
Expand Down
23 changes: 22 additions & 1 deletion rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs
Expand Up @@ -48,7 +48,12 @@
right: Box::new(right),
})
}
Expr::AnyExpr { left, op, right } => {
Expr::AnyExpr {
left,
op,
right,
all,

Check warning on line 55 in rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs#L52-L55

Added lines #L52 - L55 were not covered by tests
} => {
let rewrites = match (rewrite(left, map)?, rewrite(right, map)?) {
(Some(left), Some(right)) => Some((left, right)),
_ => None,
Expand All @@ -57,6 +62,7 @@
left: Box::new(left),
op: op.clone(),
right: Box::new(right),
all: all.clone(),

Check warning on line 65 in rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs#L65

Added line #L65 was not covered by tests
})
}
Expr::Like(Like {
Expand Down Expand Up @@ -310,6 +316,21 @@
// As rewrites are used to push things down or up the plan, wildcards
// might change the selection and should be marked as non-rewrittable
Expr::Wildcard | Expr::QualifiedWildcard { .. } => None,
Expr::InSubquery {
expr,
subquery,
negated,

Check warning on line 322 in rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs#L320-L322

Added lines #L320 - L322 were not covered by tests
} => {
let rewrites = match (rewrite(expr, map)?, rewrite(subquery, map)?) {
(Some(expr), Some(subquery)) => Some((expr, subquery)),
_ => None,

Check warning on line 326 in rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs#L324-L326

Added lines #L324 - L326 were not covered by tests
};
rewrites.map(|(expr, subquery)| Expr::InSubquery {
expr: Box::new(expr),
subquery: Box::new(subquery),
negated: negated.clone(),

Check warning on line 331 in rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/optimizers/utils.rs#L328-L331

Added lines #L328 - L331 were not covered by tests
})
}
})
}

Expand Down
77 changes: 57 additions & 20 deletions rust/cubesql/cubesql/src/compile/rewrite/converter.rs
Expand Up @@ -10,21 +10,22 @@
rewrite::{
analysis::LogicalPlanAnalysis, rewriter::Rewriter, AggregateFunctionExprDistinct,
AggregateFunctionExprFun, AggregateSplit, AggregateUDFExprFun, AliasExprAlias,
AnyExprOp, BetweenExprNegated, BinaryExprOp, CastExprDataType, ChangeUserMemberValue,
ColumnExprColumn, CubeScanAliasToCube, CubeScanLimit, CubeScanOffset,
CubeScanUngrouped, CubeScanWrapped, DimensionName, EmptyRelationProduceOneRow,
FilterMemberMember, FilterMemberOp, FilterMemberValues, FilterOpOp, InListExprNegated,
JoinJoinConstraint, JoinJoinType, JoinLeftOn, JoinRightOn, LikeExprEscapeChar,
LikeExprLikeType, LikeExprNegated, LikeType, LimitFetch, LimitSkip, LiteralExprValue,
LiteralMemberRelation, LiteralMemberValue, LogicalPlanLanguage, MeasureName,
MemberErrorError, OrderAsc, OrderMember, OuterColumnExprColumn,
OuterColumnExprDataType, ProjectionAlias, ProjectionSplit, QueryParamIndex,
ScalarFunctionExprFun, ScalarUDFExprFun, ScalarVariableExprDataType,
ScalarVariableExprVariable, SegmentMemberMember, SortExprAsc, SortExprNullsFirst,
TableScanFetch, TableScanProjection, TableScanSourceTableName, TableScanTableName,
TableUDFExprFun, TimeDimensionDateRange, TimeDimensionGranularity, TimeDimensionName,
TryCastExprDataType, UnionAlias, WindowFunctionExprFun, WindowFunctionExprWindowFrame,
WrappedSelectAlias, WrappedSelectJoinJoinType, WrappedSelectLimit, WrappedSelectOffset,
AnyExprAll, AnyExprOp, BetweenExprNegated, BinaryExprOp, CastExprDataType,
ChangeUserMemberValue, ColumnExprColumn, CubeScanAliasToCube, CubeScanLimit,
CubeScanOffset, CubeScanUngrouped, CubeScanWrapped, DimensionName,
EmptyRelationProduceOneRow, FilterMemberMember, FilterMemberOp, FilterMemberValues,
FilterOpOp, InListExprNegated, InSubqueryNegated, JoinJoinConstraint, JoinJoinType,
JoinLeftOn, JoinRightOn, LikeExprEscapeChar, LikeExprLikeType, LikeExprNegated,
LikeType, LimitFetch, LimitSkip, LiteralExprValue, LiteralMemberRelation,
LiteralMemberValue, LogicalPlanLanguage, MeasureName, MemberErrorError, OrderAsc,
OrderMember, OuterColumnExprColumn, OuterColumnExprDataType, ProjectionAlias,
ProjectionSplit, QueryParamIndex, ScalarFunctionExprFun, ScalarUDFExprFun,
ScalarVariableExprDataType, ScalarVariableExprVariable, SegmentMemberMember,
SortExprAsc, SortExprNullsFirst, SubqueryTypes, TableScanFetch, TableScanProjection,
TableScanSourceTableName, TableScanTableName, TableUDFExprFun, TimeDimensionDateRange,
TimeDimensionGranularity, TimeDimensionName, TryCastExprDataType, UnionAlias,
WindowFunctionExprFun, WindowFunctionExprWindowFrame, WrappedSelectAlias,
WrappedSelectJoinJoinType, WrappedSelectLimit, WrappedSelectOffset,
WrappedSelectSelectType, WrappedSelectType, WrappedSelectUngrouped,
},
},
Expand Down Expand Up @@ -221,11 +222,18 @@
graph.add(LogicalPlanLanguage::LiteralExpr([value]))
}
}
Expr::AnyExpr { left, op, right } => {
Expr::AnyExpr {
left,
op,
right,
all,
} => {
let left = Self::add_expr_replace_params(graph, left, query_params)?;
let op = add_expr_data_node!(graph, op, AnyExprOp);
let right = Self::add_expr_replace_params(graph, right, query_params)?;
graph.add(LogicalPlanLanguage::AnyExpr([left, op, right]))
let all = add_expr_data_node!(graph, all, AnyExprAll);

graph.add(LogicalPlanLanguage::AnyExpr([left, op, right, all]))
}
Expr::BinaryExpr { left, op, right } => {
let left = Self::add_expr_replace_params(graph, left, query_params)?;
Expand Down Expand Up @@ -414,6 +422,17 @@
let negated = add_expr_data_node!(graph, negated, InListExprNegated);
graph.add(LogicalPlanLanguage::InListExpr([expr, list, negated]))
}
Expr::InSubquery {
expr,
subquery,
negated,

Check warning on line 428 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L426-L428

Added lines #L426 - L428 were not covered by tests
} => {
let expr = Self::add_expr_replace_params(graph, expr, query_params)?;
let subquery = Self::add_expr_replace_params(graph, subquery, query_params)?;
let negated = add_expr_data_node!(graph, negated, InSubqueryNegated);

Check warning on line 432 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L430-L432

Added lines #L430 - L432 were not covered by tests

graph.add(LogicalPlanLanguage::InSubquery([expr, subquery, negated]))

Check warning on line 434 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L434

Added line #L434 was not covered by tests
}
Expr::Wildcard => graph.add(LogicalPlanLanguage::WildcardExpr([])),
Expr::GetIndexedField { expr, key } => {
let expr = Self::add_expr_replace_params(graph, expr, query_params)?;
Expand Down Expand Up @@ -548,8 +567,9 @@
self.add_logical_plan_replace_params(node.input.as_ref(), query_params)?;
let subqueries =
add_plan_list_node!(self, node.subqueries, query_params, SubquerySubqueries);
let types = add_data_node!(self, node.types, SubqueryTypes);
self.graph
.add(LogicalPlanLanguage::Subquery([input, subqueries]))
.add(LogicalPlanLanguage::Subquery([input, subqueries, types]))
}
LogicalPlan::TableUDFs(node) => {
let expr =
Expand Down Expand Up @@ -818,7 +838,13 @@
let left = Box::new(to_expr(params[0].clone())?);
let op = match_data_node!(node_by_id, params[1], AnyExprOp);
let right = Box::new(to_expr(params[2].clone())?);
Expr::AnyExpr { left, op, right }
let all = match_data_node!(node_by_id, params[3], AnyExprAll);
Expr::AnyExpr {
left,
op,
right,
all,
}
}
LogicalPlanLanguage::BinaryExpr(params) => {
let left = Box::new(to_expr(params[0].clone())?);
Expand Down Expand Up @@ -1006,6 +1032,16 @@
"QueryParam can't be evaluated as an Expr node".to_string(),
));
}
LogicalPlanLanguage::InSubquery(params) => {
let expr = Box::new(to_expr(params[0].clone())?);
let subquery = Box::new(to_expr(params[1].clone())?);
let negated = match_data_node!(node_by_id, params[2], InSubqueryNegated);
Expr::InSubquery {
expr,
subquery,

Check warning on line 1041 in rust/cubesql/cubesql/src/compile/rewrite/converter.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/converter.rs#L1035-L1041

Added lines #L1035 - L1041 were not covered by tests
negated,
}
}
x => panic!("Unexpected expression node: {:?}", x),
})
}
Expand Down Expand Up @@ -1179,8 +1215,9 @@
.into_iter()
.map(|n| self.to_logical_plan(n))
.collect::<Result<Vec<_>, _>>()?;
let types = match_data_node!(node_by_id, params[2], SubqueryTypes);
LogicalPlanBuilder::from(input)
.subquery(subqueries)?
.subquery(subqueries, types)?
.build()?
}
LogicalPlanLanguage::TableUDFs(params) => {
Expand Down