
[SPARK-41151][SQL] Keep built-in file _metadata column nullable value consistent #38683

Closed
wants to merge 9 commits

Conversation

Yaohua628
Contributor

@Yaohua628 Yaohua628 commented Nov 17, 2022

What changes were proposed in this pull request?

In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct (here). But CreateNamedStruct overrides its nullable value to false (here), which is inconsistent with the _metadata struct's nullable value of true (here).

This PR fixes the inconsistency by making the _metadata column always non-nullable. Rationale:

  1. By definition, _metadata for file-based sources is always not null;
  2. If users have already persisted this nullable _metadata somewhere, then it's totally fine to write non-nullable data to this nullable column.
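The mismatch described above can be sketched with a toy model (illustrative class names only, not Spark's real Catalyst classes): the struct-building expression reports nullable = false, while the declared _metadata attribute reports nullable = true.

```python
# Toy model of the mismatch (illustrative, not Spark's real classes):
# the struct-building expression is never null, but the declared
# _metadata attribute claims it can be.
from dataclasses import dataclass


@dataclass
class CreateNamedStructLike:
    children: list
    # CreateNamedStruct always produces a struct value, hence non-nullable
    nullable: bool = False


@dataclass
class AttrRef:
    name: str
    nullable: bool


metadata_attr = AttrRef("_metadata", nullable=True)
wrapped = CreateNamedStructLike(children=[AttrRef("file_name", nullable=False)])

# The two disagree -- this is the inconsistency the PR removes.
assert wrapped.nullable != metadata_attr.nullable
```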

Why are the changes needed?

For stateful streaming, we store the schema in the state store and check consistency across batches. To avoid state schema compatibility mismatches, we should keep the nullability of _metadata consistent.
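Why a nullability flip across batches is a problem can be modeled with a deliberately strict, hypothetical schema check (this is a sketch of the idea, not Spark's actual StateSchemaCompatibilityChecker):

```python
# Hypothetical, deliberately strict model of a state-store schema check
# (not Spark's actual StateSchemaCompatibilityChecker): any difference
# between the stored and current schemas, including nullability, fails.
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    name: str
    data_type: str
    nullable: bool


def is_compatible(stored, current):
    return stored == current


# Batch 1 persisted _metadata as nullable; batch 2 now reports non-nullable.
stored = [Field("_metadata", "struct", nullable=True)]
current = [Field("_metadata", "struct", nullable=False)]

assert not is_compatible(stored, current)  # nullability flip => schema mismatch
```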

Does this PR introduce any user-facing change?

No

How was this patch tested?

New UT

@github-actions github-actions bot added the SQL label Nov 17, 2022
@Yaohua628
Contributor Author

cc: @HeartSaVioR

@HeartSaVioR
Contributor

Maybe simpler to apply KnownNullable / KnownNotNull against CreateStruct to enforce the desired nullability? Please refer to the change in #35543.
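The KnownNotNull idea can be sketched as a pass-through wrapper that keeps its child's value but pins nullable to false (toy types below; this assumes nothing about Spark's real implementation):

```python
# Sketch of the KnownNotNull idea from #35543 (toy types, not Spark's
# implementation): a pass-through wrapper that pins nullable to False.
from dataclasses import dataclass


@dataclass
class StructExpr:
    fields: list
    nullable: bool


@dataclass
class KnownNotNull:
    child: object

    @property
    def nullable(self):
        return False  # the value is the child's; only nullability is pinned


metadata_struct = StructExpr(["file_path", "file_name"], nullable=True)
assert KnownNotNull(metadata_struct).nullable is False
```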

@HeartSaVioR
Contributor

cc. @cloud-fan

@Yaohua628
Contributor Author

Maybe simpler to apply KnownNullable / KnownNotNull against CreateStruct to enforce the desired nullability? Please refer to the change in #35543.

Wow, good point, thanks!

@cloud-fan
Contributor

shall we change FileSourceMetadataAttribute? I think the metadata column (at least for file sources) is always non-nullable.

@Yaohua628
Contributor Author

shall we change FileSourceMetadataAttribute?

I initially thought we could relax this field for some future cases. But yeah, you are right, it seems it is always non-null for file sources.

But do you think it could cause compatibility issues if this nullable schema has already been persisted somewhere?

@cloud-fan
Contributor

cloud-fan commented Nov 18, 2022

If it has been persisted before (like a table), then it's totally fine to write non-nullable data to a nullable column. The optimizer may also optimize a column from nullable to non-nullable, so this will happen from time to time.
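The compatibility rule described here can be sketched with an illustrative helper (not Spark's API): non-nullable data can always be written to a nullable column, but nullable data cannot be written to a non-nullable column.

```python
# Illustrative helper (not Spark's API) for the write-compatibility rule:
# non-nullable data can always be written to a nullable column, but
# nullable data cannot be written to a non-nullable column.
def nullability_compatible(from_nullable: bool, to_nullable: bool) -> bool:
    return (not from_nullable) or to_nullable


assert nullability_compatible(from_nullable=False, to_nullable=True)      # safe
assert nullability_compatible(from_nullable=False, to_nullable=False)     # safe
assert nullability_compatible(from_nullable=True, to_nullable=True)       # safe
assert not nullability_compatible(from_nullable=True, to_nullable=False)  # unsafe
```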

@Yaohua628
Contributor Author

If it has been persisted before (like a table), then it's totally fine to write non-nullable data to a nullable column. The optimizer may also optimize a column from nullable to non-nullable, so this will happen from time to time.

Got it, that makes sense! Updated

@AmplabJenkins

Can one of the admins verify this patch?

Member

@dongjoon-hyun dongjoon-hyun left a comment

Hi, @Yaohua628. When a PR is opened against the master branch, [3.3] should not be used in the title.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-41151][SQL][3.3] Keep built-in file _metadata column nullable value consistent [SPARK-41151][SQL] Keep built-in file _metadata column nullable value consistent Nov 21, 2022
Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM (only two minor comments).

@@ -600,7 +600,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
   val df2 = spark.read.format("json")
     .load(dir.getCanonicalPath + "/target/new-streaming-data-join")
   // Verify self-join results
-  assert(streamQuery2.lastProgress.numInputRows == 4L)
+  assert(streamQuery2.lastProgress.numInputRows == 2L)
Contributor

Off-topic: this is very interesting. Looks like fixing this "enables" ReusedExchange, which somehow makes ProgressReporter pick up the metric from the single leaf node instead of two.

Before the fix

== Parsed Logical Plan ==
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], 77baa2ac-cc0b-4e01-94ff-ec20c98eb29b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2517 AS name#2339, age#2518 AS age#2340, info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2523 AS name#2504, age#2524 AS age#2505, info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json

== Analyzed Logical Plan ==
name: string, age: int, info: struct<id:bigint,university:string>, _metadata: struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], 77baa2ac-cc0b-4e01-94ff-ec20c98eb29b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2517 AS name#2339, age#2518 AS age#2340, info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2523 AS name#2504, age#2524 AS age#2505, info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json

== Optimized Logical Plan ==
Project [name#2517, age#2518, info#2519, _metadata#2529]
+- Join Inner, ((((name#2517 = name#2523) AND (age#2518 = age#2524)) AND (info#2519 = info#2525)) AND (_metadata#2529 = _metadata#2530))
   :- Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND isnotnull(info#2519)) AND isnotnull(_metadata#2529))
   :  +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
   +- Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525)) AND isnotnull(_metadata#2530))
      +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json

== Physical Plan ==
*(3) Project [name#2517, age#2518, info#2519, _metadata#2529]
+- StreamingSymmetricHashJoin [name#2517, age#2518, info#2519, _metadata#2529], [name#2523, age#2524, info#2525, _metadata#2530], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join/state, runId = b3233731-bee2-478f-9774-3322b2f88110, opId = 0, ver = 0, numPartitions = 5], 0, 0, state cleanup [ left = null, right = null ], 2
   :- Exchange hashpartitioning(name#2517, age#2518, info#2519, _metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
   :  +- *(1) Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND isnotnull(info#2519)) AND isnotnull(_metadata#2529))
   :     +- *(1) Project [name#2517, age#2518, info#2519, named_struct(file_path, file_path#2533, file_name, file_name#2534, file_size, file_size#2535L, file_modification_time, file_modification_time#2536) AS _metadata#2529]
   :        +- FileScan json [name#2517,age#2518,info#2519,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536] Batched: false, DataFilters: [isnotnull(name#2517), isnotnull(age#2518), isnotnull(info#2519), isnotnull(_metadata#2529)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b..., PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), IsNotNull(info)], ReadSchema: struct<name:string,age:int,info:struct<id:bigint,university:string>>
   +- Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2530, 5), ENSURE_REQUIREMENTS, [plan_id=2642]
      +- *(2) Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525)) AND isnotnull(_metadata#2530))
         +- *(2) Project [name#2523, age#2524, info#2525, named_struct(file_path, file_path#2537, file_name, file_name#2538, file_size, file_size#2539L, file_modification_time, file_modification_time#2540) AS _metadata#2530]
            +- FileScan json [name#2523,age#2524,info#2525,file_path#2537,file_name#2538,file_size#2539L,file_modification_time#2540] Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524), isnotnull(info#2525), isnotnull(_metadata#2530)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b..., PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), IsNotNull(info)], ReadSchema: struct<name:string,age:int,info:struct<id:bigint,university:string>>

After the fix

== Parsed Logical Plan ==
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], d8c57232-267e-436b-ad82-4cf8b7f4849b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2523 AS name#2339, age#2524 AS age#2340, info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2517 AS name#2504, age#2518 AS age#2505, info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json

== Analyzed Logical Plan ==
name: string, age: int, info: struct<id:bigint,university:string>, _metadata: struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], d8c57232-267e-436b-ad82-4cf8b7f4849b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2523 AS name#2339, age#2524 AS age#2340, info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2517 AS name#2504, age#2518 AS age#2505, info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json

== Optimized Logical Plan ==
Project [name#2523, age#2524, info#2525, _metadata#2529]
+- Join Inner, ((((name#2523 = name#2517) AND (age#2524 = age#2518)) AND (info#2525 = info#2519)) AND (_metadata#2529 = _metadata#2530))
   :- Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525))
   :  +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
   +- Filter ((isnotnull(name#2517) AND isnotnull(age#2518)) AND isnotnull(info#2519))
      +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json

== Physical Plan ==
*(3) Project [name#2523, age#2524, info#2525, _metadata#2529]
+- StreamingSymmetricHashJoin [name#2523, age#2524, info#2525, _metadata#2529], [name#2517, age#2518, info#2519, _metadata#2530], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join/state, runId = 649e748e-fc6d-42c0-9acd-babc7809c621, opId = 0, ver = 0, numPartitions = 5], 0, 0, state cleanup [ left = null, right = null ], 2
   :- Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
   :  +- *(1) Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525))
   :     +- *(1) Project [name#2523, age#2524, info#2525, knownnotnull(named_struct(file_path, file_path#2533, file_name, file_name#2534, file_size, file_size#2535L, file_modification_time, file_modification_time#2536)) AS _metadata#2529]
   :        +- FileScan json [name#2523,age#2524,info#2525,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536] Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524), isnotnull(info#2525)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a..., PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), IsNotNull(info)], ReadSchema: struct<name:string,age:int,info:struct<id:bigint,university:string>>
   +- ReusedExchange [name#2517, age#2518, info#2519, _metadata#2530], Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]

This is definitely an "improvement", but it also shows that the way we collect metrics with DSv1 in microbatch mode can be affected by physical planning as well as by optimization. It has always been somewhat fragile.

Anyway, even if this happens with DSv2, the number of input rows would be counted once, so I'd consider this "correct".

Contributor

@HeartSaVioR HeartSaVioR left a comment

+1. I'm neutral on the remaining review comments but will wait a couple of days to see them resolved.

@HeartSaVioR
Contributor

I see comments are addressed. Nice!

@HeartSaVioR
Contributor

Thanks! Merging to master/3.3.

@HeartSaVioR
Contributor

@Yaohua628 Looks like there is a conflict on 3.3 branch. Could you please submit a new PR against 3.3? Thanks in advance!

Yaohua628 added a commit to Yaohua628/spark that referenced this pull request Nov 22, 2022
…ue consistent

In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct ([here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L279)). But `CreateNamedStruct` has an override `nullable` value `false` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L443)), which is different from the `_metadata` struct `nullable` value `true` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L467)).

This PR fixes this, by changing `_metadata` column to be always not nullable. Rationale:
1. By definition, `_metadata` for file-based sources is always not null;
2. If users have already persisted this nullable `_metadata` somewhere, then it's totally fine to write non-nullable data to this nullable column.

For stateful streaming, we store the schema in the state store and [check consistency across batches](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala#L47). To avoid state schema compatibility mismatches, we should keep nullable consistent in `_metadata`.

No

New UT

Closes apache#38683 from Yaohua628/spark-41151.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@Yaohua628
Contributor Author

@Yaohua628 Looks like there is a conflict on 3.3 branch. Could you please submit a new PR against 3.3? Thanks in advance!

Thanks! Please find here: #38748

HeartSaVioR pushed a commit that referenced this pull request Nov 22, 2022
…e value consistent

### What changes were proposed in this pull request?

NOTE: This PR cherry-picks #38683 to 3.3

In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct ([here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L279)). But `CreateNamedStruct` has an override `nullable` value `false` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L443)), which is different from the `_metadata` struct `nullable` value `true` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L467)).

This PR fixes this, by changing `_metadata` column to be always not nullable. Rationale:
1. By definition, `_metadata` for file-based sources is always not null;
2. If users have already persisted this nullable `_metadata` somewhere, then it's totally fine to write non-nullable data to this nullable column.

### Why are the changes needed?
For stateful streaming, we store the schema in the state store and [check consistency across batches](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala#L47). To avoid state schema compatibility mismatches, we should keep nullable consistent in `_metadata`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UT

Closes #38748 from Yaohua628/spark-41151-3-3.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
cloud-fan pushed a commit that referenced this pull request Dec 5, 2022
…lable value consistent

### What changes were proposed in this pull request?
A follow-up PR of #38683.

Apart from making `_metadata` struct not nullable, we should also make all fields inside of `_metadata` not nullable (`file_path`, `file_name`, `file_modification_time`, `file_size`, `row_index`).

### Why are the changes needed?
Consistent nullability behavior for everything

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New UTs

Closes #38777 from Yaohua628/spark-41151-follow-up.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
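The follow-up's end state can be sketched as declaring every field of _metadata non-nullable, in addition to the struct itself (Field below is an illustrative stand-in for Spark's StructField):

```python
# Sketch of the follow-up's end state: the _metadata struct and every
# field inside it declared non-nullable. Field is an illustrative
# stand-in for Spark's StructField.
from dataclasses import dataclass


@dataclass
class Field:
    name: str
    nullable: bool


metadata_fields = [
    Field(name, nullable=False)
    for name in ("file_path", "file_name", "file_size",
                 "file_modification_time", "row_index")
]
assert all(not f.nullable for f in metadata_fields)
```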
Yaohua628 added a commit to Yaohua628/spark that referenced this pull request Dec 5, 2022
SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022