
[SPARK-37829][SQL] Dataframe.joinWith outer-join should return a null value for unmatched row #40755

Closed
kings129 wants to merge 8 commits into apache:master from kings129:encoder_bug_fix

Conversation

kings129 (Contributor) commented Apr 12, 2023

What changes were proposed in this pull request?

When doing an outer join with joinWith on DataFrames, unmatched rows return Row objects with null fields instead of a single null value. This is not the expected behavior; it is a regression introduced in this commit.
This pull request fixes the regression. Note that it is not a full rollback of that commit: it does not add back the "schema" variable.

```
case class ClassData(a: String, b: Int)
val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDF
val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDF

left.joinWith(right, left("b") === right("b"), "left_outer").collect
```

```
Wrong results (current behavior):    Array(([a,1],[null,null]), ([b,2],[x,2]))
Correct results:                     Array(([a,1],null), ([b,2],[x,2]))
```
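In practical terms, the fix means callers can test the unmatched side against null directly. A small usage sketch (the pattern match is illustrative, not code from this PR):

```
// With the fix, the unmatched right side is a single null, so callers can
// match on it directly instead of probing a Row whose fields are all null.
val pairs = left.joinWith(right, left("b") === right("b"), "left_outer")
  .collect()
  .map {
    case (l, null) => (l.getString(0), None)               // unmatched row
    case (l, r)    => (l.getString(0), Some(r.getInt(1)))  // matched row
  }
// pairs: Array((a,None), (b,Some(2)))
```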

Why are the changes needed?

We need to address the regression mentioned above. It results in an unexpected behavior change in the DataFrame joinWith API between versions 2.4.8 and 3.0.0+, which could cause data correctness issues for users who expect the old behavior when using Spark 3.0.0+.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a unit test (reusing the test from a previously closed pull request; credit to Clément de Groc).
Ran the sql-core and sql-catalyst modules locally with `./build/mvn clean package -pl sql/core,sql/catalyst`.
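For readers unfamiliar with Spark's QueryTest-style suites, a rough sketch of the shape such a test takes (the test name and assertions are illustrative, not copied from the PR):

```
test("SPARK-37829: joinWith outer join should return null for unmatched rows") {
  // Reuses ClassData from the example above.
  val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDF()
  val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDF()
  val joined = left.joinWith(right, left("b") === right("b"), "left_outer")
  // The unmatched left row must pair with a single null, not Row(null, null).
  checkAnswer(joined.toDF(), Row(Row("a", 1), null) :: Row(Row("b", 2), Row("x", 2)) :: Nil)
}
```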

github-actions bot added the SQL label Apr 12, 2023
hvanhovell (Contributor) commented:

cc @zhenlineo since you are working on this on the connect side.

kings129 (Contributor, Author) commented:

cc @cloud-fan @viirya for review, thanks!


Review thread on the changed encoder construction:

```
new ExpressionEncoder[Any](
  nullSafe(newSerializerInput, newSerializer),
  nullSafe(newDeserializerInput, newDeserializer),
```
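Here nullSafe wraps the serializer and deserializer so that a null input short-circuits to a null output. Conceptually it behaves like the following sketch (an approximation of the helper, not the exact Spark source):

```
import org.apache.spark.sql.catalyst.expressions.{Expression, If, IsNull, Literal}

// Sketch: if the input expression evaluates to null, the whole
// (de)serializer yields null instead of running `result`.
def nullSafe(input: Expression, result: Expression): Expression =
  If(IsNull(input), Literal.create(null, result.dataType), result)
```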
cloud-fan (Contributor) commented:

It seems we push the null check down to the children deserializers. Why is the serializer fine?

kings129 (Contributor, Author) commented:

This change is intended to create a deserializer of the form newinstance(class scala.Tuple*) that can convert to a single null value, matching the behavior before the commit that introduced the regression.
Regarding the serializer: in the new unit test added in this pull request, when the tuple is not null a named_struct is created for each element, and null is handled there.

```
if (isnull(input[0, scala.Tuple2, true])) null
else named_struct(
  _1, if (isnull(input[0, scala.Tuple2, true]._1)) null
      else named_struct(
        a, if (input[0, scala.Tuple2, true]._1.isNullAt) null
           else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
                validateexternaltype(getexternalrowfield(input[0, scala.Tuple2, true]._1, 0, a), StringType, ObjectType(class java.lang.String)),
                true, false, true),
        b, assertnotnull(validateexternaltype(getexternalrowfield(input[0, scala.Tuple2, true]._1, 1, b), IntegerType, ObjectType(class java.lang.Integer)).intValue)
      ) AS _1#18,
  _2, if (isnull(input[0, scala.Tuple2, true]._2)) null
      else named_struct(
        a, if (input[0, scala.Tuple2, true]._2.isNullAt) null
           else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
                validateexternaltype(getexternalrowfield(input[0, scala.Tuple2, true]._2, 0, a), StringType, ObjectType(class java.lang.String)),
                true, false, true),
        b, assertnotnull(validateexternaltype(getexternalrowfield(input[0, scala.Tuple2, true]._2, 1, b), IntegerType, ObjectType(class java.lang.Integer)).intValue)
      ) AS _2#19
)
```

kings129 (Contributor, Author) commented:

@cloud-fan does my comment answer your question? PTAL, thanks!

cloud-fan (Contributor) commented:

It looks correct to me to add the null check for the children deserializers, but I don't quite understand why this PR removes the outermost null check. After looking at the code, I think it doesn't matter, as the outermost null check will be removed anyway: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L274

Since this is unrelated to this PR, let's not touch it. If you do want to fix it (adding a null check and removing it later is useless), let's fix the serializer as well.

kings129 (Contributor, Author) commented:

@cloud-fan, thanks for the explanation! You're right; it doesn't matter whether we keep the outermost null check. (The null check for the deserializer was also added in the refactor commit.)

I also prefer making minimal changes to fix the target issue, so I added back the outermost null check for the deserializer.

cloud-fan closed this in 74ce620 on Apr 19, 2023
cloud-fan pushed a commit that referenced this pull request on Apr 19, 2023

… value for unmatched row

Closes #40755 from kings129/encoder_bug_fix.

Authored-by: --global <xuqiang129@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 74ce620)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan (Contributor) commented:

thanks, merging to master/3.4!

cloud-fan (Contributor) commented:

@kings129 can you open a new PR for branch 3.3? Thanks!

kings129 added a commit to kings129/spark that referenced this pull request on Apr 19, 2023

… value for unmatched row

Closes apache#40755 from kings129/encoder_bug_fix.

Authored-by: --global <xuqiang129@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
kings129 (Contributor, Author) commented:

> @kings129 can you open a new PR for branch 3.3? Thanks!

Thanks for the quick review, @cloud-fan!
Yes, here is the pull request for branch 3.3: #40858

dongjoon-hyun pushed a commit that referenced this pull request on May 7, 2023

… null value for unmatched row

This is a pull request to port the fix from the master branch to version 3.3. [PR](#40755)

Closes #40858 from kings129/fix_encoder_branch_33.

Authored-by: --global <xuqiang129@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
snmvaughan pushed a commit to snmvaughan/spark that referenced this pull request on Jun 20, 2023

… value for unmatched row (cherry picked from commit 74ce620)
GladwinLee pushed a commit to lyft/spark that referenced this pull request on Oct 10, 2023

… value for unmatched row (cherry picked from commit 74ce620)
catalinii pushed a commit to lyft/spark that referenced this pull request on Oct 10, 2023

… value for unmatched row (cherry picked from commit 74ce620)
cloud-fan pushed a commit that referenced this pull request on Mar 14, 2024

## What changes were proposed in this pull request?

#40755 adds a null check on the input of the child deserializer in the tuple encoder. This breaks the deserializer for the `Option` type, because null should be deserialized into `None` rather than null. This PR adds a boolean parameter to `ExpressionEncoder.tuple` so that only the use case that #40755 intended to fix gets this null check.

## How was this patch tested?

Unit test.

Closes #45508 from chenhao-db/SPARK-47385.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
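To see why the unconditional null check is a problem for `Option`, consider a simple round-trip. A minimal sketch (illustrative only, not the test from #45508):

```
// Assumes a SparkSession `spark` and `import spark.implicits._`.
// A missing value in an Option[_] column must come back as None, never null.
// An unconditional null check in the tuple encoder short-circuits to null
// before the Option deserializer gets a chance to produce None.
val ds = Seq((1, Some("a")), (2, None: Option[String])).toDS()
// Expected after the SPARK-47385 fix: (2, None), not (2, null).
assert(ds.collect().toSet == Set((1, Some("a")), (2, None)))
```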
cloud-fan pushed a commit that referenced this pull request on Mar 14, 2024 (cherry picked from commit 9986462)

cloud-fan pushed a commit that referenced this pull request on Mar 14, 2024 (cherry picked from commit 9986462)

cloud-fan pushed a commit that referenced this pull request on Mar 14, 2024 (cherry picked from commit 9986462)
sweisdb pushed a commit to sweisdb/spark that referenced this pull request on Apr 1, 2024