Description
Describe the bug
DataFusion fails with a schema mismatch error when planning a UNION ALL query over Parquet files whose fields carry metadata.
Error while planning query: Internal error: Physical input schema should be the same as the one converted from logical input schema. Differences: .
To Reproduce
Parquet: union_all_repo.zip
Query:
SET datafusion.execution.parquet.skip_metadata = false;
SELECT AVG(usage_idle), AVG(usage_system)
FROM (
SELECT time, usage_idle, NULL::DOUBLE as usage_system FROM 'union_all_repro.parquet'
UNION ALL
SELECT time, NULL::DOUBLE as usage_idle, usage_system FROM 'union_all_repro.parquet'
);
Expected behavior
The query should succeed, with the Union's logical and physical schemas matching.
Additional context
Sequence of Events
1. Projection Optimization
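optimize_projections sees that the time column is not used by the Aggregate and removes it from both inputs of the logical Union. It also drops the top-level Projection and pushes the remaining columns into each TableScan, as the EXPLAIN VERBOSE output shows: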
+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------+
| plan_type                                                  | plan                                                                                                                          |
+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------+
| initial_logical_plan                                       | Projection: avg(usage_idle), avg(usage_system)                                                                                |
|                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                        |
|                                                            |     Union                                                                                                                     |
|                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, CAST(NULL AS Float64) AS usage_system |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
|                                                            |       Projection: ./union_all_repro.parquet.time, CAST(NULL AS Float64) AS usage_idle, ./union_all_repro.parquet.usage_system |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
| logical_plan after resolve_grouping_function               | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after type_coercion                           | SAME TEXT AS ABOVE                                                                                                            |
| analyzed_logical_plan                                      | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_nested_union                  | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after simplify_expressions                    | Projection: avg(usage_idle), avg(usage_system)                                                                                |
|                                                            |   Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                        |
|                                                            |     Union                                                                                                                     |
|                                                            |       Projection: ./union_all_repro.parquet.time, ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system         |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
|                                                            |       Projection: ./union_all_repro.parquet.time, Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system         |
|                                                            |         TableScan: ./union_all_repro.parquet                                                                                  |
| logical_plan after replace_distinct_aggregate              | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_join                          | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after decorrelate_predicate_subquery          | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after scalar_subquery_to_join                 | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after decorrelate_lateral_join                | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after extract_equijoin_predicate              | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_duplicated_expr               | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_filter                        | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_cross_join                    | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_limit                         | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after propagate_empty_relation                | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_one_union                     | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after filter_null_join_keys                   | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_outer_join                    | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after push_down_limit                         | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after push_down_filter                        | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after single_distinct_aggregation_to_group_by | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after eliminate_group_by_constant             | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after common_sub_expression_eliminate         | SAME TEXT AS ABOVE                                                                                                            |
| logical_plan after optimize_projections                    | Aggregate: groupBy=[[]], aggr=[[avg(usage_idle), avg(usage_system)]]                                                          |
|                                                            |   Union                                                                                                                       |
|                                                            |     Projection: ./union_all_repro.parquet.usage_idle, Float64(NULL) AS usage_system                                           |
|                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_idle]                                                            |
|                                                            |     Projection: Float64(NULL) AS usage_idle, ./union_all_repro.parquet.usage_system                                           |
|                                                            |       TableScan: ./union_all_repro.parquet projection=[usage_system]                                                          |
+------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------+
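The pruning step can be pictured with a minimal Rust sketch of positional column pruning across Union inputs. It is a simplified model under stated assumptions, not DataFusion's code: `ProjectionInput` and `prune_union_inputs` are hypothetical names, and each input is reduced to the list of expressions it projects. Because a Union matches columns by position, the same positions have to be dropped from every input.

```rust
/// Hypothetical, simplified model of one Union input: the expressions it
/// projects, in the same positional order as the Union's output fields.
/// (Not a DataFusion type.)
#[derive(Debug, Clone, PartialEq)]
pub struct ProjectionInput {
    pub exprs: Vec<String>,
}

/// Keep only the Union output columns whose names are in `required`.
/// Union columns are matched by position, so the same positions are
/// removed from every input.
pub fn prune_union_inputs(
    union_fields: &[&str],
    inputs: &[ProjectionInput],
    required: &[&str],
) -> (Vec<String>, Vec<ProjectionInput>) {
    // Positions of the Union output columns that are still needed.
    let keep: Vec<usize> = (0..union_fields.len())
        .filter(|&i| required.contains(&union_fields[i]))
        .collect();

    let pruned_fields: Vec<String> = keep
        .iter()
        .map(|&i| union_fields[i].to_string())
        .collect();

    // Apply the same positional selection to every input.
    let pruned_inputs: Vec<ProjectionInput> = inputs
        .iter()
        .map(|input| ProjectionInput {
            exprs: keep.iter().map(|&i| input.exprs[i].clone()).collect(),
        })
        .collect();

    (pruned_fields, pruned_inputs)
}
```

Called with the Union fields `time`, `usage_idle`, `usage_system` and the required set `usage_idle`, `usage_system`, the sketch drops position 0 from both inputs, which mirrors the shape of the optimize_projections output in the table.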
2. Schema Recomputation
Because the plan has changed, optimize_projections calls recompute_schema, which sees that the number of fields has changed and builds a new Union node with Union::try_new.
Union::try_new calls Union::derive_schema_from_inputs to rebuild the Union schema from its child inputs. For each field, it calls intersect_metadata_for_union, which keeps only the metadata that the field has in common across all child inputs. Because the NULL literal does not carry the same metadata as the Parquet column, the intersection drops it, and the field metadata disappears from the Union's logical schema.
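The metadata intersection described above can be modeled in a few lines of Rust. This is a simplified sketch, not DataFusion's implementation: `Field`, `intersect_metadata` and `derive_union_fields` are hypothetical names, metadata is modeled as a `HashMap<String, String>`, and taking the field name from the first input is an assumption of the sketch.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified field: a name plus key/value metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub metadata: HashMap<String, String>,
}

/// Keep only the key/value pairs that appear, with the same value,
/// in every input's metadata map.
pub fn intersect_metadata(inputs: &[&HashMap<String, String>]) -> HashMap<String, String> {
    let mut rest = inputs.iter();
    let mut common = match rest.next() {
        Some(first) => (*first).clone(),
        None => return HashMap::new(),
    };
    for other in rest {
        // Drop any entry that is missing or has a different value in this input.
        common.retain(|key, value| other.get(key) == Some(&*value));
    }
    common
}

/// Derive the Union's output fields positionally from its inputs. The name
/// comes from the first input; metadata is intersected across all inputs.
/// Assumes every input has the same number of fields.
pub fn derive_union_fields(inputs: &[Vec<Field>]) -> Vec<Field> {
    let Some(first) = inputs.first() else {
        return Vec::new();
    };
    (0..first.len())
        .map(|i| {
            let metas: Vec<&HashMap<String, String>> =
                inputs.iter().map(|fields| &fields[i].metadata).collect();
            Field {
                name: first[i].name.clone(),
                metadata: intersect_metadata(&metas),
            }
        })
        .collect()
}
```

With one input carrying Parquet-style metadata and the other a metadata-free NULL literal, the derived field comes back with empty metadata; when every input carries the same map, it is preserved. Any metadata key used to try this out is illustrative, since the issue does not show the actual keys in the attached file.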
3. Physical/Logical Schema Mismatch
During physical planning, DefaultPhysicalPlanner::map_logical_node_to_physical checks whether the logical schema of the Aggregate node's input (in this case the Union) matches the schema of the physical plan built for it. Because the metadata was removed from the Union's logical schema while the physical side still carries it, the comparison fails and planning aborts with the internal error shown above.
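The failing check can be illustrated with a small Rust sketch that compares two schemas field by field. It is a hypothetical model, not DataFusion's code: `Field`, `schemas_match` and `schemas_match_ignoring_metadata` are illustrative names, and data types are plain strings. A comparison that includes metadata rejects a logical field whose metadata was dropped even though name and type agree; the metadata-insensitive variant is there only for contrast.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified field used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: String,
    pub metadata: HashMap<String, String>,
}

/// Strict comparison: names, types and metadata must all be equal.
pub fn schemas_match(logical: &[Field], physical: &[Field]) -> bool {
    logical == physical
}

/// Relaxed comparison for contrast: names and types only, metadata ignored.
pub fn schemas_match_ignoring_metadata(logical: &[Field], physical: &[Field]) -> bool {
    logical.len() == physical.len()
        && logical
            .iter()
            .zip(physical)
            .all(|(l, p)| l.name == p.name && l.data_type == p.data_type)
}
```

A logical `usage_idle: Float64` field with empty metadata fails `schemas_match` against a physical `usage_idle: Float64` field that still has metadata, while `schemas_match_ignoring_metadata` accepts the pair.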