Using Union's input schema when recompute schema #10494

Closed
wants to merge 4 commits into from

yyy1000
Contributor

@yyy1000 yyy1000 commented May 14, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@yyy1000 yyy1000 marked this pull request as draft May 14, 2024 02:41
@github-actions github-actions bot added the logical-expr Logical plan and expressions label May 14, 2024
@github-actions github-actions bot added the optimizer Optimizer rules label May 14, 2024
Contributor Author

yyy1000 commented May 14, 2024

Here is an example.
The SQL is:

SELECT A.col_int32 FROM test AS A \
        INNER JOIN ( \
          SELECT col_int32 FROM test WHERE 1 = 0 \
        ) AS B ON A.col_int32 = B.col_int32 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE 1 = 1 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE 0 = 0 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \
        UNION ALL \
        SELECT test.col_int32 FROM test WHERE 1 = 0
+----------------------------------------------------------------------------+----------------------------------------------------------------------------+
| plan_type                                                                  | plan                                                                       |
+----------------------------------------------------------------------------+----------------------------------------------------------------------------+
| initial_logical_plan                                                       | Union                                                                      |
|                                                                            |   Union                                                                    |
|                                                                            |     Union                                                                  |
|                                                                            |       Union                                                                |
|                                                                            |         Projection: a.col_int32                                            |
|                                                                            |           Inner Join:  Filter: a.col_int32 = b.col_int32                   |
|                                                                            |             SubqueryAlias: a                                               |
|                                                                            |               TableScan: test                                              |
|                                                                            |             SubqueryAlias: b                                               |
|                                                                            |               Projection: test.col_int32                                   |
|                                                                            |                 Filter: Int64(1) = Int64(0)                                |
|                                                                            |                   TableScan: test                                          |
|                                                                            |         Projection: test.col_int32                                         |
|                                                                            |           Filter: Int64(1) = Int64(1)                                      |
|                                                                            |             TableScan: test                                                |
|                                                                            |       Projection: test.col_int32                                           |
|                                                                            |         Filter: Int64(0) = Int64(0)                                        |
|                                                                            |           TableScan: test                                                  |
|                                                                            |     Projection: test.col_int32                                             |
|                                                                            |       Filter: test.col_int32 < Int64(0)                                    |
|                                                                            |         TableScan: test                                                    |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Int64(1) = Int64(0)                                            |
|                                                                            |       TableScan: test                                                      |
| logical_plan after apply_function_rewrites                                 | SAME TEXT AS ABOVE                                                         |
| logical_plan after inline_table_scan                                       | SAME TEXT AS ABOVE                                                         |
| logical_plan after type_coercion                                           | Union                                                                      |
|                                                                            |   Union                                                                    |
|                                                                            |     Union                                                                  |
|                                                                            |       Union                                                                |
|                                                                            |         Projection: a.col_int32                                            |
|                                                                            |           Inner Join:  Filter: a.col_int32 = b.col_int32                   |
|                                                                            |             SubqueryAlias: a                                               |
|                                                                            |               TableScan: test                                              |
|                                                                            |             SubqueryAlias: b                                               |
|                                                                            |               Projection: test.col_int32                                   |
|                                                                            |                 Filter: Int64(1) = Int64(0)                                |
|                                                                            |                   TableScan: test                                          |
|                                                                            |         Projection: test.col_int32                                         |
|                                                                            |           Filter: Int64(1) = Int64(1)                                      |
|                                                                            |             TableScan: test                                                |
|                                                                            |       Projection: test.col_int32                                           |
|                                                                            |         Filter: Int64(0) = Int64(0)                                        |
|                                                                            |           TableScan: test                                                  |
|                                                                            |     Projection: test.col_int32                                             |
|                                                                            |       Filter: CAST(test.col_int32 AS Int64) < Int64(0)                     |
|                                                                            |         TableScan: test                                                    |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Int64(1) = Int64(0)                                            |
|                                                                            |       TableScan: test                                                      |
| logical_plan after count_wildcard_rule                                     | SAME TEXT AS ABOVE                                                         |
| analyzed_logical_plan                                                      | SAME TEXT AS ABOVE                                                         |
| logical_plan after eliminate_nested_union                                  | Union                                                                      |
|                                                                            |   Projection: a.col_int32                                                  |
|                                                                            |     Inner Join:  Filter: a.col_int32 = b.col_int32                         |
|                                                                            |       SubqueryAlias: a                                                     |
|                                                                            |         TableScan: test                                                    |
|                                                                            |       SubqueryAlias: b                                                     |
|                                                                            |         Projection: test.col_int32                                         |
|                                                                            |           Filter: Int64(1) = Int64(0)                                      |
|                                                                            |             TableScan: test                                                |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Int64(1) = Int64(1)                                            |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Int64(0) = Int64(0)                                            |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: CAST(test.col_int32 AS Int64) < Int64(0)                       |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Int64(1) = Int64(0)                                            |
|                                                                            |       TableScan: test                                                      |
| logical_plan after simplify_expressions                                    | Union                                                                      |
|                                                                            |   Projection: a.col_int32                                                  |
|                                                                            |     Inner Join:  Filter: a.col_int32 = b.col_int32                         |
|                                                                            |       SubqueryAlias: a                                                     |
|                                                                            |         TableScan: test                                                    |
|                                                                            |       SubqueryAlias: b                                                     |
|                                                                            |         Projection: test.col_int32                                         |
|                                                                            |           Filter: Boolean(false)                                           |
|                                                                            |             TableScan: test                                                |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(true)                                                  |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(true)                                                  |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: CAST(test.col_int32 AS Int64) < Int64(0)                       |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(false)                                                 |
|                                                                            |       TableScan: test                                                      |
| logical_plan after unwrap_cast_in_comparison                               | Union                                                                      |
|                                                                            |   Projection: a.col_int32                                                  |
|                                                                            |     Inner Join:  Filter: a.col_int32 = b.col_int32                         |
|                                                                            |       SubqueryAlias: a                                                     |
|                                                                            |         TableScan: test                                                    |
|                                                                            |       SubqueryAlias: b                                                     |
|                                                                            |         Projection: test.col_int32                                         |
|                                                                            |           Filter: Boolean(false)                                           |
|                                                                            |             TableScan: test                                                |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(true)                                                  |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(true)                                                  |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: test.col_int32 < Int32(0)                                      |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(false)                                                 |
|                                                                            |       TableScan: test                                                      |
| logical_plan after replace_distinct_aggregate                              | SAME TEXT AS ABOVE                                                         |
| logical_plan after eliminate_join                                          | SAME TEXT AS ABOVE                                                         |
| logical_plan after decorrelate_predicate_subquery                          | SAME TEXT AS ABOVE                                                         |
| logical_plan after scalar_subquery_to_join                                 | SAME TEXT AS ABOVE                                                         |
| logical_plan after extract_equijoin_predicate                              | Union                                                                      |
|                                                                            |   Projection: a.col_int32                                                  |
|                                                                            |     Inner Join: a.col_int32 = b.col_int32                                  |
|                                                                            |       SubqueryAlias: a                                                     |
|                                                                            |         TableScan: test                                                    |
|                                                                            |       SubqueryAlias: b                                                     |
|                                                                            |         Projection: test.col_int32                                         |
|                                                                            |           Filter: Boolean(false)                                           |
|                                                                            |             TableScan: test                                                |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(true)                                                  |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(true)                                                  |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: test.col_int32 < Int32(0)                                      |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: Boolean(false)                                                 |
|                                                                            |       TableScan: test                                                      |
| logical_plan after simplify_expressions                                    | SAME TEXT AS ABOVE                                                         |
| logical_plan after rewrite_disjunctive_predicate                           | SAME TEXT AS ABOVE                                                         |
| logical_plan after eliminate_duplicated_expr                               | SAME TEXT AS ABOVE                                                         |
| logical_plan after eliminate_filter                                        | Union                                                                      |
|                                                                            |   Projection: a.col_int32                                                  |
|                                                                            |     Inner Join: a.col_int32 = b.col_int32                                  |
|                                                                            |       SubqueryAlias: a                                                     |
|                                                                            |         TableScan: test                                                    |
|                                                                            |       SubqueryAlias: b                                                     |
|                                                                            |         Projection: test.col_int32                                         |
|                                                                            |           EmptyRelation                                                    |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     TableScan: test                                                        |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     TableScan: test                                                        |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: test.col_int32 < Int32(0)                                      |
|                                                                            |       TableScan: test                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     EmptyRelation                                                          |
| logical_plan after eliminate_cross_join                                    | SAME TEXT AS ABOVE                                                         |
| logical_plan after common_sub_expression_eliminate                         | SAME TEXT AS ABOVE                                                         |
| logical_plan after eliminate_limit                                         | SAME TEXT AS ABOVE                                                         |
| logical_plan after propagate_empty_relation                                | Union                                                                      |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     TableScan: test                                                        |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     TableScan: test                                                        |
|                                                                            |   Projection: test.col_int32                                               |
|                                                                            |     Filter: test.col_int32 < Int32(0)                                      |
|                                                                            |       TableScan: test                                                      |
| logical_plan after eliminate_one_union                                     | SAME TEXT AS ABOVE                                                         |
| logical_plan after filter_null_join_keys                                   | SAME TEXT AS ABOVE                                                         |
| logical_plan after eliminate_outer_join                                    | SAME TEXT AS ABOVE                                                         |
| logical_plan after push_down_limit                                         | SAME TEXT AS ABOVE                                                         |
| logical_plan after push_down_filter                                        | SAME TEXT AS ABOVE                                                         |
| logical_plan after single_distinct_aggregation_to_group_by                 | SAME TEXT AS ABOVE                                                         |
| logical_plan after simplify_expressions                                    | SAME TEXT AS ABOVE                                                         |
| logical_plan after unwrap_cast_in_comparison                               | SAME TEXT AS ABOVE                                                         |
| logical_plan after Optimizer rule 'common_sub_expression_eliminate' failed | Schema error: No field named a.col_int32. Valid fields are test.col_int32. |
+----------------------------------------------------------------------------+----------------------------------------------------------------------------+

            ),
        )))
    }
} else {
    Ok(Transformed::yes(LogicalPlan::Union(Union {
        inputs: new_inputs,
-       schema: union.schema.clone(),
+       schema: input_schema.clone(),
Contributor Author

@yyy1000 yyy1000 May 14, 2024

In the example, after propagate_empty_relation, the new inputs of the Union no longer carry the alias 'a'; only test remains.
If we use the new inputs' schema, it will be
DFSchema { inner: Schema { fields: [Field { name: "col_int32", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: "test" })], functional_dependencies: FunctionalDependencies { deps: [] } }

However, the schema before is DFSchema { inner: Schema { fields: [Field { name: "col_int32", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [Some(Bare { table: "a" })], functional_dependencies: FunctionalDependencies { deps: [] } }

The only difference is field_qualifiers, but assert_schema_is_the_same returns an error if the schema changes during optimization. 🤔
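
To make the mismatch concrete, here is a minimal, self-contained sketch of the two schemas being compared. The QualifiedField type and both comparison helpers are hypothetical stand-ins written for illustration; they are not DataFusion's DFSchema or its actual schema check, and they model only a qualifier, a name, a type, and nullability.

```rust
/// Hypothetical stand-in for one qualified field of a schema.
/// This is NOT DataFusion's DFSchema; it only models what matters here.
#[derive(Debug, Clone, PartialEq)]
struct QualifiedField {
    qualifier: Option<String>,
    name: String,
    data_type: String,
    nullable: bool,
}

/// Builds a nullable Int32 field with the given table qualifier.
fn field(qualifier: &str, name: &str) -> QualifiedField {
    QualifiedField {
        qualifier: Some(qualifier.to_string()),
        name: name.to_string(),
        data_type: "Int32".to_string(),
        nullable: true,
    }
}

/// Strict comparison: qualifiers are part of the schema identity.
fn same_schema_strict(a: &[QualifiedField], b: &[QualifiedField]) -> bool {
    a == b
}

/// Relaxed comparison (an assumption, for contrast): qualifiers are ignored.
fn same_schema_ignoring_qualifiers(a: &[QualifiedField], b: &[QualifiedField]) -> bool {
    a.len() == b.len()
        && a.iter().zip(b.iter()).all(|(x, y)| {
            x.name == y.name && x.data_type == y.data_type && x.nullable == y.nullable
        })
}
```

Comparing `[field("a", "col_int32")]` with `[field("test", "col_int32")]` fails the strict check and passes the relaxed one, which is the situation described above.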

Contributor Author

I think one solution may be that, when creating the schema for a plan that contains Alias{expr, name}, we use the expr's name rather than the alias name. However, I think this would also be a big change. 🥲

Contributor

I would have thought that Union required all the inputs to have the exact same schema (and be properly coerced / aliased to make that the case)

Maybe there is a bug when constructing Union somewhere that permits the inputs to have different schemas 🤔

Contributor Author

Oh, I see. That seems right.
The current code creates the Union schema based on its first input:

// create union schema
let union_qualified_fields =
    zip(left_plan.schema().iter(), right_plan.schema().iter())
        .map(
            |((left_qualifier, left_field), (_right_qualifier, right_field))| {
                let nullable = left_field.is_nullable() || right_field.is_nullable();
                let data_type = comparison_coercion(
                    left_field.data_type(),
                    right_field.data_type(),
                )
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "UNION Column {} (type: {}) is not compatible with column {} (type: {})",
                        right_field.name(),
                        right_field.data_type(),
                        left_field.name(),
                        left_field.data_type()
                    )
                })?;
                Ok((
                    left_qualifier.cloned(),
                    Arc::new(Field::new(left_field.name(), data_type, nullable)),
                ))
            },
        )
        .collect::<Result<Vec<_>>>()?;
In the example, the qualifier is 'a', but 'a' was later removed, so the new input schema's field qualifier would be 'test'.
However, I think this example may be a valid use case: here the Union has two Projection plans as inputs, and it is unclear to me how to coerce them. 🥲 They already have different schemas, and I can't tell which one should be the merged schema.
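
A rough sketch of why the qualifier flips, using toy types rather than DataFusion's. `QualifiedField` and `union_fields` are hypothetical names, and type coercion is left out. The only behavior modeled is the one in the quoted snippet: qualifier and name come from the left input, and nullability is the OR of both sides.

```rust
// Toy model for illustration; not DataFusion's real schema types.
#[derive(Debug, Clone, PartialEq)]
struct QualifiedField {
    qualifier: Option<String>,
    name: String,
    nullable: bool,
}

// Derives union fields the way the quoted snippet does: the left side supplies
// the qualifier and name; nullability is the OR of both sides.
fn union_fields(
    left: &[QualifiedField],
    right: &[QualifiedField],
) -> Result<Vec<QualifiedField>, String> {
    // `zip` would silently truncate to the shorter side, so check lengths first.
    if left.len() != right.len() {
        return Err(format!(
            "column count mismatch: left is {}, right is {}",
            left.len(),
            right.len()
        ));
    }
    Ok(left
        .iter()
        .zip(right)
        .map(|(l, r)| QualifiedField {
            qualifier: l.qualifier.clone(),
            name: l.name.clone(),
            nullable: l.nullable || r.nullable,
        })
        .collect())
}

fn main() {
    let a = QualifiedField {
        qualifier: Some("a".to_string()),
        name: "col_int32".to_string(),
        nullable: false,
    };
    let test = QualifiedField {
        qualifier: Some("test".to_string()),
        name: "col_int32".to_string(),
        nullable: true,
    };

    // Original plan: the aliased branch comes first, so the union is qualified by 'a'.
    let original = union_fields(&[a.clone()], &[test.clone()]).unwrap();
    // After the aliased branch is dropped, the first remaining input is 'test'.
    let recomputed = union_fields(&[test.clone()], &[test.clone()]).unwrap();

    println!("original:   {:?}", original[0].qualifier);
    println!("recomputed: {:?}", recomputed[0].qualifier);
}
```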

Contributor Author

Or, as I suggested earlier: when we build the schema for a Projection whose input is an Alias, what if we use the Alias's expr name rather than the Alias's name? 🤔 I'm not sure whether that would break some existing assumption.

Contributor

🤔 Maybe we also need to add an alias for columns in the subsequent children so the column names match exactly 🤔

Contributor Author

😢 That's true. I'm kind of confused about how to deal with it.

Contributor

It looks like LogicalPlanBuilder::union has the code to do the schema coercion / matching

pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
    let left_col_num = left_plan.schema().fields().len();

    // check union plan length same.
    let right_col_num = right_plan.schema().fields().len();
    if right_col_num != left_col_num {
        return plan_err!(
            "Union queries must have the same number of columns, (left is {left_col_num}, right is {right_col_num})");
    }

    // create union schema
    let union_qualified_fields =
        zip(left_plan.schema().iter(), right_plan.schema().iter())
            .map(
                |((left_qualifier, left_field), (_right_qualifier, right_field))| {
                    let nullable = left_field.is_nullable() || right_field.is_nullable();
                    let data_type = comparison_coercion(
                        left_field.data_type(),
                        right_field.data_type(),
                    )
                    .ok_or_else(|| {
                        plan_datafusion_err!(
                            "UNION Column {} (type: {}) is not compatible with column {} (type: {})",
                            right_field.name(),
                            right_field.data_type(),
                            left_field.name(),
                            left_field.data_type()
                        )
                    })?;
                    Ok((
                        left_qualifier.cloned(),
                        Arc::new(Field::new(left_field.name(), data_type, nullable)),
                    ))
                },
            )
            .collect::<Result<Vec<_>>>()?;
    let union_schema =
        DFSchema::new_with_metadata(union_qualified_fields, HashMap::new())?;

    let inputs = vec![left_plan, right_plan]
        .into_iter()
        .map(|p| {
            let plan = coerce_plan_expr_for_schema(&p, &union_schema)?;
            match plan {
                LogicalPlan::Projection(Projection { expr, input, .. }) => {
                    Ok(Arc::new(project_with_column_index(
                        expr,
                        input,
                        Arc::new(union_schema.clone()),
                    )?))
                }
                other_plan => Ok(Arc::new(other_plan)),
            }
        })
        .collect::<Result<Vec<_>>>()?;

    if inputs.is_empty() {
        return plan_err!("Empty UNION");
    }

    Ok(LogicalPlan::Union(Union {
        inputs,
        schema: Arc::new(union_schema),
    }))
}

So perhaps we need to either:

  1. Fix the LogicalPlanBuilder code
  2. Update optimizer passes to not create LogicalPlan::Union directly but instead use the builder (and thus apply the schema rationalization)
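
As a rough illustration of option 2, the sketch below rebuilds a union from its surviving inputs by folding them through a builder-style function instead of constructing the node by hand with a stale schema. Everything here is a toy model: `Plan`, `scan`, `union`, and `rebuild_union` are hypothetical and only mimic the shape of the code quoted above (column-count check, schema taken from the left input).

```rust
// Toy plan model for illustration; not DataFusion's LogicalPlan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { qualifier: String, columns: Vec<String> },
    Union { inputs: Vec<Plan>, schema: Vec<(String, String)> },
}

impl Plan {
    // Returns (qualifier, column name) pairs.
    fn schema(&self) -> Vec<(String, String)> {
        match self {
            Plan::Scan { qualifier, columns } => columns
                .iter()
                .map(|c| (qualifier.clone(), c.clone()))
                .collect(),
            Plan::Union { schema, .. } => schema.clone(),
        }
    }
}

// Convenience constructor for a single-column scan.
fn scan(qualifier: &str) -> Plan {
    Plan::Scan {
        qualifier: qualifier.to_string(),
        columns: vec!["col_int32".to_string()],
    }
}

// Hypothetical builder: validates column counts and derives the schema
// from the left input.
fn union(left: Plan, right: Plan) -> Result<Plan, String> {
    let schema = left.schema();
    if schema.len() != right.schema().len() {
        return Err("Union queries must have the same number of columns".to_string());
    }
    Ok(Plan::Union { inputs: vec![left, right], schema })
}

// Rebuilds a (left-deep) union from the surviving inputs by going through
// `union`. A single surviving input is returned unchanged.
fn rebuild_union(mut inputs: Vec<Plan>) -> Result<Plan, String> {
    if inputs.is_empty() {
        return Err("Empty UNION".to_string());
    }
    let first = inputs.remove(0);
    inputs.into_iter().try_fold(first, union)
}

fn main() {
    // The aliased branch was pruned; only the two 'test' branches remain.
    let rebuilt = rebuild_union(vec![scan("test"), scan("test")]).unwrap();
    println!("{:?}", rebuilt.schema());
}
```

In this toy model the rebuilt schema is qualified by the first surviving input, so the approach by itself does not preserve the original 'a' qualifier.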

Contributor Author

Yes, I understand.
But for solution 1, as I said in #10494 (comment), the merged schema uses the left_qualifier (lines 1395-1396), i.e. the first LogicalPlan's qualifier:

left_qualifier.cloned(), 
Arc::new(Field::new(left_field.name(), data_type, nullable)),

However, I'm not quite clear on how to set the correct qualifier; the two differ because of a SubqueryAlias 🤔. Choosing either one would make some cases fail. 😞

@yyy1000 yyy1000 marked this pull request as ready for review May 14, 2024 19:14
@alamb alamb marked this pull request as draft May 15, 2024 19:07
@alamb
Contributor

alamb commented May 15, 2024

Marking as draft as I think this PR is no longer waiting on feedback.

Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.

@github-actions github-actions bot added the Stale PR has not had any activity for some time label Jul 15, 2024
@github-actions github-actions bot closed this Jul 23, 2024