From 490f51fdeeadfb0769bd81e491f8c2837565d0bd Mon Sep 17 00:00:00 2001 From: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> Date: Wed, 25 Jun 2025 20:00:18 +0400 Subject: [PATCH] fix(cubesql): Push down `__user` meta filter further Signed-off-by: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com> --- .../engine/df/optimizers/filter_split_meta.rs | 337 ++++++++++++------ .../src/compile/test/test_user_change.rs | 58 ++- 2 files changed, 293 insertions(+), 102 deletions(-) diff --git a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_split_meta.rs b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_split_meta.rs index 5307548f407f2..2360cf0da74c1 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_split_meta.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/optimizers/filter_split_meta.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{iter::FromIterator, mem::take, sync::Arc}; use datafusion::{ error::{DataFusionError, Result}, @@ -12,6 +12,7 @@ use datafusion::{ optimizer::optimizer::{OptimizerConfig, OptimizerRule}, physical_plan::functions::BuiltinScalarFunction, }; +use indexmap::IndexSet; /// Filter Split Meta optimizer rule splits a `WHERE` clause into two distinct filters, /// pushing meta filters (currently only `__user`) down the plan, separate from other filters. @@ -33,7 +34,15 @@ impl OptimizerRule for FilterSplitMeta { plan: &LogicalPlan, optimizer_config: &OptimizerConfig, ) -> Result { - filter_split_meta(self, plan, optimizer_config) + let mut meta_predicates = IndexSet::new(); + let result = filter_split_meta(self, plan, &mut meta_predicates, optimizer_config)?; + if !meta_predicates.is_empty() { + return Err(DataFusionError::Internal( + "Unexpected non-issued meta predicates while running FilterSplitMeta optimizer" + .to_string(), + )); + } + Ok(result) } fn name(&self) -> &str { @@ -46,6 +55,7 @@ impl OptimizerRule for FilterSplitMeta { fn filter_split_meta( optimizer: &FilterSplitMeta, plan: &LogicalPlan, + meta_predicates: &mut IndexSet, optimizer_config: &OptimizerConfig, ) -> Result { match plan { @@ -54,12 +64,17 @@ fn filter_split_meta( input, schema, alias, - }) => Ok(LogicalPlan::Projection(Projection { - expr: expr.clone(), - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), - schema: schema.clone(), - alias: alias.clone(), - })), + }) => { + // Push meta predicates down `Projection` if possible. + let plan = filter_split_meta(optimizer, input, meta_predicates, optimizer_config)?; + let plan = issue_meta_predicates(plan, meta_predicates)?; + Ok(LogicalPlan::Projection(Projection { + expr: expr.clone(), + input: Arc::new(plan), + schema: schema.clone(), + alias: alias.clone(), + })) + } LogicalPlan::Filter(Filter { predicate, input }) => { // Filter expressions can be moved around or split when they're chained with `AND` safely. // However, the input of `Filter` might be realiased, so we can't be sure if `__user` is really @@ -67,17 +82,13 @@ fn filter_split_meta( // However, we also have joins complicating things. // Additionally, there's no harm in splitting `__user` filter from other filters anyway; // hence we'll split all `Filter` nodes. - let (normal_predicates, meta_predicates) = split_predicates(predicate, vec![], vec![]); - let mut plan = filter_split_meta(optimizer, input, optimizer_config)?; - if !meta_predicates.is_empty() { + let mut normal_predicates = vec![]; + split_predicates(predicate, &mut normal_predicates, meta_predicates); + let plan = filter_split_meta(optimizer, input, meta_predicates, optimizer_config)?; + let mut plan = issue_meta_predicates(plan, meta_predicates)?; + if let Some(collected_predicates) = collect_predicates(normal_predicates, false) { plan = LogicalPlan::Filter(Filter { - predicate: collect_predicates(meta_predicates)?, - input: Arc::new(plan), - }); - } - if !normal_predicates.is_empty() { - plan = LogicalPlan::Filter(Filter { - predicate: collect_predicates(normal_predicates)?, + predicate: collected_predicates, input: Arc::new(plan), }); } @@ -88,7 +99,13 @@ fn filter_split_meta( window_expr, schema, }) => Ok(LogicalPlan::Window(Window { - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), + // Don't push meta predicates down `Window`. + input: Arc::new(filter_split_meta( + optimizer, + input, + &mut IndexSet::new(), + optimizer_config, + )?), window_expr: window_expr.clone(), schema: schema.clone(), })), @@ -98,15 +115,26 @@ fn filter_split_meta( aggr_expr, schema, }) => Ok(LogicalPlan::Aggregate(Aggregate { - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), + // Don't push meta predicates down `Aggregate`. + input: Arc::new(filter_split_meta( + optimizer, + input, + &mut IndexSet::new(), + optimizer_config, + )?), group_expr: group_expr.clone(), aggr_expr: aggr_expr.clone(), schema: schema.clone(), })), - LogicalPlan::Sort(Sort { expr, input }) => Ok(LogicalPlan::Sort(Sort { - expr: expr.clone(), - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), - })), + LogicalPlan::Sort(Sort { expr, input }) => { + // Push meta predicates down `Sort`. + let plan = filter_split_meta(optimizer, input, meta_predicates, optimizer_config)?; + let plan = issue_meta_predicates(plan, meta_predicates)?; + Ok(LogicalPlan::Sort(Sort { + expr: expr.clone(), + input: Arc::new(plan), + })) + } LogicalPlan::Join(Join { left, right, @@ -115,29 +143,75 @@ fn filter_split_meta( join_constraint, schema, null_equals_null, - }) => Ok(LogicalPlan::Join(Join { - left: Arc::new(filter_split_meta(optimizer, left, optimizer_config)?), - right: Arc::new(filter_split_meta(optimizer, right, optimizer_config)?), - on: on.clone(), - join_type: *join_type, - join_constraint: *join_constraint, - schema: schema.clone(), - null_equals_null: *null_equals_null, - })), + }) => { + // For `Join`, we can push down both sides and collect non-issued meta predicates. + let mut left_meta_predicates = take(meta_predicates); + let mut right_meta_predicates = left_meta_predicates.clone(); + let left_plan = + filter_split_meta(optimizer, left, &mut left_meta_predicates, optimizer_config)?; + let left_plan = issue_meta_predicates(left_plan, &mut left_meta_predicates)?; + let right_plan = filter_split_meta( + optimizer, + right, + &mut right_meta_predicates, + optimizer_config, + )?; + let right_plan = issue_meta_predicates(right_plan, &mut right_meta_predicates)?; + *meta_predicates = IndexSet::from_iter( + left_meta_predicates + .intersection(&right_meta_predicates) + .cloned(), + ); + Ok(LogicalPlan::Join(Join { + left: Arc::new(left_plan), + right: Arc::new(right_plan), + on: on.clone(), + join_type: *join_type, + join_constraint: *join_constraint, + schema: schema.clone(), + null_equals_null: *null_equals_null, + })) + } LogicalPlan::CrossJoin(CrossJoin { left, right, schema, - }) => Ok(LogicalPlan::CrossJoin(CrossJoin { - left: Arc::new(filter_split_meta(optimizer, left, optimizer_config)?), - right: Arc::new(filter_split_meta(optimizer, right, optimizer_config)?), - schema: schema.clone(), - })), + }) => { + // For `CrossJoin`, we can push down both sides and collect non-issued meta predicates. + let mut left_meta_predicates = take(meta_predicates); + let mut right_meta_predicates = left_meta_predicates.clone(); + let left_plan = + filter_split_meta(optimizer, left, &mut left_meta_predicates, optimizer_config)?; + let left_plan = issue_meta_predicates(left_plan, &mut left_meta_predicates)?; + let right_plan = filter_split_meta( + optimizer, + right, + &mut right_meta_predicates, + optimizer_config, + )?; + let right_plan = issue_meta_predicates(right_plan, &mut right_meta_predicates)?; + *meta_predicates = IndexSet::from_iter( + left_meta_predicates + .intersection(&right_meta_predicates) + .cloned(), + ); + Ok(LogicalPlan::CrossJoin(CrossJoin { + left: Arc::new(left_plan), + right: Arc::new(right_plan), + schema: schema.clone(), + })) + } LogicalPlan::Repartition(Repartition { input, partitioning_scheme, }) => Ok(LogicalPlan::Repartition(Repartition { - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), + // Don't push meta predicates down `Repartition`. + input: Arc::new(filter_split_meta( + optimizer, + input, + &mut IndexSet::new(), + optimizer_config, + )?), partitioning_scheme: partitioning_scheme.clone(), })), LogicalPlan::Union(Union { @@ -145,9 +219,12 @@ fn filter_split_meta( schema, alias, }) => Ok(LogicalPlan::Union(Union { + // Don't push meta predicates down `Union`. inputs: inputs .iter() - .map(|plan| filter_split_meta(optimizer, plan, optimizer_config)) + .map(|plan| { + filter_split_meta(optimizer, plan, &mut IndexSet::new(), optimizer_config) + }) .collect::>()?, schema: schema.clone(), alias: alias.clone(), @@ -159,25 +236,49 @@ fn filter_split_meta( LogicalPlan::Limit(Limit { skip, fetch, input }) => Ok(LogicalPlan::Limit(Limit { skip: *skip, fetch: *fetch, - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), + // Don't push meta predicates down `Limit`. + input: Arc::new(filter_split_meta( + optimizer, + input, + &mut IndexSet::new(), + optimizer_config, + )?), })), LogicalPlan::Subquery(Subquery { subqueries, input, schema, types, - }) => Ok(LogicalPlan::Subquery(Subquery { - subqueries: subqueries - .iter() - .map(|subquery| filter_split_meta(optimizer, subquery, optimizer_config)) - .collect::>()?, - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), - schema: schema.clone(), - types: types.clone(), - })), - LogicalPlan::Distinct(Distinct { input }) => Ok(LogicalPlan::Distinct(Distinct { - input: Arc::new(filter_split_meta(optimizer, input, optimizer_config)?), - })), + }) => { + // Push meta predicates down `Subquery` input. + let plan = filter_split_meta(optimizer, input, meta_predicates, optimizer_config)?; + let plan = issue_meta_predicates(plan, meta_predicates)?; + Ok(LogicalPlan::Subquery(Subquery { + // Don't push meta predicates down subqueries. + subqueries: subqueries + .iter() + .map(|subquery| { + filter_split_meta( + optimizer, + subquery, + &mut IndexSet::new(), + optimizer_config, + ) + }) + .collect::>()?, + input: Arc::new(plan), + schema: schema.clone(), + types: types.clone(), + })) + } + LogicalPlan::Distinct(Distinct { input }) => { + // Push meta predicates down `Distinct`. + let plan = filter_split_meta(optimizer, input, meta_predicates, optimizer_config)?; + let plan = issue_meta_predicates(plan, meta_predicates)?; + Ok(LogicalPlan::Distinct(Distinct { + input: Arc::new(plan), + })) + } other => { // The rest of the plans have no inputs to optimize, or it makes no sense // to optimize them. @@ -190,96 +291,130 @@ fn filter_split_meta( /// These will later be concatenated into a single `Filter` node each. fn split_predicates( predicate: &Expr, - mut normal_predicates: Vec, - mut meta_predicates: Vec, -) -> (Vec, Vec) { + normal_predicates: &mut Vec, + meta_predicates: &mut IndexSet, +) { if let Expr::BinaryExpr { left, op, right } = predicate { if *op == Operator::And { - let (normal_predicates, meta_predicates) = - split_predicates(left, normal_predicates, meta_predicates); - let (normal_predicates, meta_predicates) = - split_predicates(right, normal_predicates, meta_predicates); - return (normal_predicates, meta_predicates); + split_predicates(left, normal_predicates, meta_predicates); + split_predicates(right, normal_predicates, meta_predicates); + return; } } - if is_meta_predicate(predicate) { - meta_predicates.push(predicate.clone()); + if meta_column_from_predicate(predicate).is_some() { + meta_predicates.insert(predicate.clone()); } else { normal_predicates.push(predicate.clone()); } - (normal_predicates, meta_predicates) } -/// Determines if the provided expression is a meta predicate. +/// Gets a reference to the meta column in the provided expression, if any. /// Supported variants: /// - `BinaryExpr` with `Eq`, `Like`, or `ILike` operators and one of the sides being a meta column; /// - `Like` or `ILike` with expr or pattern being a meta column; /// - `IsNotNull` over a meta column (or `Not` over `IsNull` over a meta column); /// - `InList` with one value in list and expr or list value being a meta column. -fn is_meta_predicate(predicate: &Expr) -> bool { +fn meta_column_from_predicate(predicate: &Expr) -> Option<&Column> { match predicate { - Expr::BinaryExpr { left, op, right } => { - if matches!(op, Operator::Eq | Operator::Like | Operator::ILike) { - return is_meta_column(left) || is_meta_column(right); - } - false - } + Expr::BinaryExpr { + left, + op: Operator::Eq | Operator::Like | Operator::ILike, + right, + } => meta_column_from_column(left).or_else(|| meta_column_from_column(right)), Expr::Like(like) | Expr::ILike(like) => { - is_meta_column(&like.expr) || is_meta_column(&like.pattern) + meta_column_from_column(&like.expr).or_else(|| meta_column_from_column(&like.pattern)) } - Expr::IsNotNull(expr) => is_meta_column(expr), + Expr::IsNotNull(expr) => meta_column_from_column(expr), Expr::Not(expr) => match expr.as_ref() { - Expr::IsNull(expr) => is_meta_column(expr), - _ => false, + Expr::IsNull(expr) => meta_column_from_column(expr), + _ => None, }, Expr::InList { expr, list, negated: false, - } => { - if list.len() != 1 { - return false; - } - is_meta_column(expr) || is_meta_column(&list[0]) + } if list.len() == 1 => { + meta_column_from_column(expr).or_else(|| meta_column_from_column(&list[0])) } - _ => false, + _ => None, } } -/// Determines if the provided expression is meta column reference. +/// Gets reference to the meta column in the provided column expression, if any. /// Currently, only `__user` is considered a meta column. /// Additionally, `Lower` function over a meta column or casting meta column /// is also considered a meta column. -fn is_meta_column(expr: &Expr) -> bool { +fn meta_column_from_column(expr: &Expr) -> Option<&Column> { match expr { - Expr::Column(Column { name, .. }) => name.eq_ignore_ascii_case("__user"), - Expr::ScalarFunction { fun, args } => { - if matches!(fun, BuiltinScalarFunction::Lower) && args.len() == 1 { - return is_meta_column(&args[0]); - } - false + Expr::Column(column) if column.name.eq_ignore_ascii_case("__user") => Some(column), + Expr::ScalarFunction { fun, args } + if matches!(fun, BuiltinScalarFunction::Lower) && args.len() == 1 => + { + meta_column_from_column(&args[0]) } - Expr::Cast { expr, .. } => is_meta_column(expr), - _ => false, + Expr::Cast { expr, .. } => meta_column_from_column(expr), + _ => None, } } /// Concatenates the provided predicates into a single expression using `AND` operator. -fn collect_predicates(predicates: Vec) -> Result { - predicates - .into_iter() - .reduce(|last, next| Expr::BinaryExpr { +fn collect_predicates(predicates: Vec, reverse: bool) -> Option { + let predicates_iter = predicates.into_iter(); + if reverse { + predicates_iter.rev().reduce(|last, next| Expr::BinaryExpr { left: Box::new(last), op: Operator::And, right: Box::new(next), }) - .ok_or_else(|| { - DataFusionError::Internal( - "Unable to optimize plan: can't concatenate predicates, vec is unexpectedly empty" - .to_string(), - ) + } else { + predicates_iter.reduce(|last, next| Expr::BinaryExpr { + left: Box::new(last), + op: Operator::And, + right: Box::new(next), }) + } +} + +/// Issues meta predicates, if any and if applicable, returning either the original plan +/// or a filtered plan with meta predicates applied. +/// Predicates that have been issued are removed from the `meta_predicates` set. +fn issue_meta_predicates( + plan: LogicalPlan, + meta_predicates: &mut IndexSet, +) -> Result { + if meta_predicates.is_empty() { + return Ok(plan); + } + + // Collect meta predicates that can be applied to the plan. + let schema = plan.schema(); + let mut can_be_applied_indices = vec![]; + for (index, predicate) in meta_predicates.iter().enumerate() { + let Some(meta_column) = meta_column_from_predicate(predicate) else { + continue; + }; + if schema.field_from_column(meta_column).is_ok() { + can_be_applied_indices.push(index); + } + } + if can_be_applied_indices.is_empty() { + return Ok(plan); + } + + // Apply the predicates. + let can_be_applied = can_be_applied_indices + .iter() + .rev() + .filter_map(|index| meta_predicates.shift_remove_index(*index)) + .collect::>(); + let Some(issued_predicates) = collect_predicates(can_be_applied, true) else { + return Ok(plan); + }; + Ok(LogicalPlan::Filter(Filter { + predicate: issued_predicates, + input: Arc::new(plan), + })) } #[cfg(test)] diff --git a/rust/cubesql/cubesql/src/compile/test/test_user_change.rs b/rust/cubesql/cubesql/src/compile/test/test_user_change.rs index 2e7fc1c2c9da2..a34c73eb50685 100644 --- a/rust/cubesql/cubesql/src/compile/test/test_user_change.rs +++ b/rust/cubesql/cubesql/src/compile/test/test_user_change.rs @@ -280,7 +280,8 @@ GROUP BY 1 assert_eq!(load_calls[0].meta.change_user(), Some("gopher".to_string())); } -/// This should test that query with CubeScanWrapper uses proper change_user for both SQL generation and execution calls +/// This should test that query with CubeScanWrapper uses proper change_user +/// for both SQL generation and execution calls #[tokio::test] async fn test_user_change_sql_generation_cast() { if !Rewriter::sql_push_down_enabled() { @@ -319,6 +320,61 @@ GROUP BY 1 assert_eq!(load_calls[0].meta.change_user(), Some("gopher".to_string())); } +/// This should test that query with CubeScanWrapper and joins with multiple WHERE clauses +/// uses proper change_user for both SQL generation and execution calls +#[tokio::test] +async fn test_user_change_sql_push_down_with_joins() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let context = TestContext::new(DatabaseProtocol::PostgreSQL).await; + + context + .execute_query( + // language=PostgreSQL + r#" +SELECT + COALESCE(customer_gender, 'N/A') AS customer_gender, + AVG(avgPrice) +FROM + KibanaSampleDataEcommerce + INNER JOIN ( + SELECT + COALESCE(customer_gender, 'N/A') AS customer_gender + FROM + KibanaSampleDataEcommerce + WHERE + CAST(__user AS TEXT) = 'gopher' + AND LOWER(customer_gender) = 'test' + GROUP BY 1 + ) t0 ON ( + CAST(KibanaSampleDataEcommerce.customer_gender AS TEXT) + IS NOT DISTINCT FROM + CAST(t0.customer_gender AS TEXT) + ) +WHERE + CAST(KibanaSampleDataEcommerce.__user AS TEXT) = 'gopher' + AND LOWER(KibanaSampleDataEcommerce.customer_gender) = 'test' +GROUP BY 1 +; + "# + .to_string(), + ) + .await + .expect_err("Test transport does not support load with SQL"); + + let load_calls = context.load_calls().await; + assert_eq!(load_calls.len(), 1); + let sql_query = load_calls[0].sql_query.as_ref().unwrap(); + // This should be placed from load meta to query by TestConnectionTransport::sql + // It would mean that SQL generation used changed user + assert!(sql_query.sql.contains(r#""changeUser": "gopher""#)); + assert!(!sql_query.sql.contains("= 'gopher'")); + assert_eq!(load_calls[0].meta.change_user(), Some("gopher".to_string())); +} + /// Repeated aggregation should be flattened even in presence of __user filter #[tokio::test] async fn flatten_aggregation_into_user_change() {