From eef498c298821ef9ea8e88562774f75d7f8afc15 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 15 Oct 2024 14:11:45 -0700 Subject: [PATCH 1/6] test(12733): reproducer of when metadata from the left side is not transferred to the right side --- datafusion/sqllogictest/src/test_context.rs | 24 +++++++++++++++---- .../sqllogictest/test_files/metadata.slt | 9 +++++++ 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 2143b3089ee5..21e69a927f3a 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -319,17 +319,28 @@ pub async fn register_metadata_tables(ctx: &SessionContext) { String::from("metadata_key"), String::from("the l_name field"), )])); + let ts = Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), false) .with_metadata(HashMap::from([( String::from("metadata_key"), String::from("ts non-nullable field"), )])); + let nonnull_name = + Field::new("nonnull_name", DataType::Utf8, false).with_metadata(HashMap::from([ + ( + String::from("metadata_key"), + String::from("the nonnull_name field"), + ), + ])); + let schema = - Schema::new(vec![id, name, l_name, ts]).with_metadata(HashMap::from([( - String::from("metadata_key"), - String::from("the entire schema"), - )])); + Schema::new(vec![id, name, l_name, ts, nonnull_name]).with_metadata(HashMap::from([ + ( + String::from("metadata_key"), + String::from("the entire schema"), + ), + ])); let batch = RecordBatch::try_new( Arc::new(schema), @@ -342,6 +353,11 @@ pub async fn register_metadata_tables(ctx: &SessionContext) { 1599572549190855123, 1599572549190855123, ])) as _, + Arc::new(StringArray::from(vec![ + Some("no_foo"), + Some("no_bar"), + Some("no_baz"), + ])) as _, ], ) .unwrap(); diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 588a36e3d515..f11d9f4b6cbe 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -123,6 +123,15 @@ ORDER BY id, name, l_name; NULL bar NULL NULL NULL l_bar +# Regression test: missing field metadata from left side of the union when right side is chosen +statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +select name from ( + SELECT nonnull_name as name FROM "table_with_metadata" + UNION ALL + SELECT NULL::string as name +) group by name order by name; + + query P rowsort From 0d25c56c5a853f9f7948406f3b329f878844c787 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 11 Oct 2024 12:33:06 -0700 Subject: [PATCH 2/6] fix(12733): because either the left or right fields may be chosen, add metadata from both to each other --- datafusion/physical-plan/src/union.rs | 29 +++++++++++++-------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 108e42e7be42..9ff7710f968b 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -472,21 +472,20 @@ fn union_schema(inputs: &[Arc]) -> SchemaRef { .map(|i| { inputs .iter() - .filter_map(|input| { - if input.schema().fields().len() > i { - let field = input.schema().field(i).clone(); - let right_hand_metdata = inputs - .get(1) - .map(|right_input| { - right_input.schema().field(i).metadata().clone() - }) - .unwrap_or_default(); - let mut metadata = field.metadata().clone(); - metadata.extend(right_hand_metdata); - Some(field.with_metadata(metadata)) - } else { - None - } + .enumerate() + .map(|(input_idx, input)| { + let field = input.schema().field(i).clone(); + let mut metadata = field.metadata().clone(); + + let other_side_metdata = inputs + .get(input_idx ^ (1 << 0)) + .map(|other_input| { + other_input.schema().field(i).metadata().clone() + }) + .unwrap_or_default(); + + metadata.extend(other_side_metdata); + field.with_metadata(metadata) }) .find_or_first(|f| f.is_nullable()) .unwrap() From c92b3bd9fda47fc2f9d52a553b4f3ea3b1a3c9f7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 15 Oct 2024 14:52:08 -0700 Subject: [PATCH 3/6] test(12733): update regression test to show that fix works --- datafusion/sqllogictest/test_files/metadata.slt | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index f11d9f4b6cbe..99d3855c02c9 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -124,13 +124,17 @@ NULL bar NULL NULL NULL l_bar # Regression test: missing field metadata from left side of the union when right side is chosen -statement error DataFusion error: Internal error: Physical input schema should be the same as the one converted from logical input schema.. +query T select name from ( SELECT nonnull_name as name FROM "table_with_metadata" UNION ALL SELECT NULL::string as name ) group by name order by name; - +---- +no_bar +no_baz +no_foo +NULL From ec9cf2d3a6759cccb5a433a64d7cf7fe4db111b4 Mon Sep 17 00:00:00 2001 From: itsjunetime Date: Tue, 22 Oct 2024 15:38:23 -0600 Subject: [PATCH 4/6] Add extra test to fix other issue with schema metadata --- datafusion/sqllogictest/test_files/metadata.slt | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/metadata.slt b/datafusion/sqllogictest/test_files/metadata.slt index 99d3855c02c9..8f787254c096 100644 --- a/datafusion/sqllogictest/test_files/metadata.slt +++ b/datafusion/sqllogictest/test_files/metadata.slt @@ -136,7 +136,21 @@ no_baz no_foo NULL - +# Regression test: missing schema metadata from union when schema with metadata isn't the first one +# and also ensure it works fine with multiple unions +query T +select name from ( + SELECT NULL::string as name + UNION ALL + SELECT nonnull_name as name FROM "table_with_metadata" + UNION ALL + SELECT NULL::string as name +) group by name order by name; +---- +no_bar +no_baz +no_foo +NULL query P rowsort SELECT ts From b229807b9e449ed6ad6db18353a2798442ad86bc Mon Sep 17 00:00:00 2001 From: itsjunetime Date: Tue, 22 Oct 2024 15:40:12 -0600 Subject: [PATCH 5/6] Fix union_schema to merge metadatas for both fields and schema --- datafusion/physical-plan/src/union.rs | 33 ++++++++++++++++----------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 9ff7710f968b..433dda870def 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -468,7 +468,9 @@ pub fn can_interleave>>( } fn union_schema(inputs: &[Arc]) -> SchemaRef { - let fields: Vec = (0..inputs[0].schema().fields().len()) + let first_schema = inputs[0].schema(); + + let fields = (0..first_schema.fields().len()) .map(|i| { inputs .iter() @@ -477,25 +479,30 @@ fn union_schema(inputs: &[Arc]) -> SchemaRef { let field = input.schema().field(i).clone(); let mut metadata = field.metadata().clone(); - let other_side_metdata = inputs - .get(input_idx ^ (1 << 0)) - .map(|other_input| { - other_input.schema().field(i).metadata().clone() - }) - .unwrap_or_default(); + let other_metadatas = inputs + .iter() + .enumerate() + .filter(|(other_idx, _)| *other_idx != input_idx) + .flat_map(|(_, other_input)| { + other_input.schema().field(i).metadata().clone().into_iter() + }); - metadata.extend(other_side_metdata); + metadata.extend(other_metadatas); field.with_metadata(metadata) }) - .find_or_first(|f| f.is_nullable()) + .find_or_first(Field::is_nullable) + // We can unwrap this because if inputs was empty, this would've already panic'ed when we + // indexed into inputs[0]. .unwrap() }) + .collect::>(); + + let all_metadata_merged = inputs + .iter() + .flat_map(|i| i.schema().metadata().clone().into_iter()) .collect(); - Arc::new(Schema::new_with_metadata( - fields, - inputs[0].schema().metadata().clone(), - )) + Arc::new(Schema::new_with_metadata(fields, all_metadata_merged)) } /// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one From a8e7c682531fb57d7ccca7d7b50f1941a3d4653d Mon Sep 17 00:00:00 2001 From: itsjunetime Date: Tue, 22 Oct 2024 16:04:25 -0600 Subject: [PATCH 6/6] fmt --- datafusion/sqllogictest/src/test_context.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 21e69a927f3a..deeacb1b8819 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -334,13 +334,12 @@ pub async fn register_metadata_tables(ctx: &SessionContext) { ), ])); - let schema = - Schema::new(vec![id, name, l_name, ts, nonnull_name]).with_metadata(HashMap::from([ - ( - String::from("metadata_key"), - String::from("the entire schema"), - ), - ])); + let schema = Schema::new(vec![id, name, l_name, ts, nonnull_name]).with_metadata( + HashMap::from([( + String::from("metadata_key"), + String::from("the entire schema"), + )]), + ); let batch = RecordBatch::try_new( Arc::new(schema),