[SPARK-21441][SQL]Incorrect Codegen in SortMergeJoinExec results failures in some cases #18656


DonnyZone
Contributor

@DonnyZone DonnyZone commented Jul 17, 2017

What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441

This issue can be reproduced by the following example:

import org.apache.spark.sql.SparkSession

// A very low broadcast threshold keeps the planner from choosing a broadcast join.
val spark = SparkSession
   .builder()
   .appName("smj-codegen")
   .master("local")
   .config("spark.sql.autoBroadcastJoinThreshold", "1")
   .getOrCreate()
val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str")
// The filter references columns from both sides of the join and calls reflect(...).
val df = df1.join(df2, df1("key") === df2("key"))
   .filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1")
   .select("int")
df.show()

In summary, the issue occurs when:
(1) The SortMergeJoin condition contains CodegenFallback expressions.
(2) In the physical plan tree, the SortMergeJoin node is the child of the root node, e.g., the Project in the above example.

This patch fixes the logic in the CollapseCodegenStages rule.

How was this patch tested?

Unit test and manual verification in our cluster.

@DonnyZone
Contributor Author

Hi @cloud-fan, @vanzin, could you help take a look?

@viirya
Member

viirya commented Jul 18, 2017

Will CodegenFallback be used in wholestage codegen? I think it's not supported.

@DonnyZone
Contributor Author

Yeah, CodegenFallback just provides a fallback mode.
However, in this case, SortMergeJoinExec passes an incomplete row as input to a Hive UDF that implements CodegenFallback.

@viirya
Member

viirya commented Jul 18, 2017

No. I meant if there's a CodegenFallback expression, wholestage codegen should not be enabled.

@DonnyZone
Contributor Author

That's interesting. I will take a look at why codegen is enabled.

@DonnyZone
Contributor Author

I notice that the CollapseCodegenStages rule still enables codegen for SortMergeJoinExec without checking for CodegenFallback expressions.

The logic in insertInputAdapter seems to skip validating SortMergeJoinExec.

Actually, I'm not familiar with this part, so please correct me if I have gotten something wrong.

private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
    case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen =>
      // The children of SortMergeJoin should do codegen separately.
      j.copy(left = InputAdapter(insertWholeStageCodegen(left)),
        right = InputAdapter(insertWholeStageCodegen(right)))
      ......

@viirya
Member

viirya commented Jul 18, 2017

I think the check for SortMergeJoinExec in insertInputAdapter should be corrected to:

private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
  case p if !supportCodegen(p) =>
    // collapse them recursively
    InputAdapter(insertWholeStageCodegen(p))
  case j @ SortMergeJoinExec(_, _, _, _, left, right) =>
    // The children of SortMergeJoin should do codegen separately.
    j.copy(left = InputAdapter(insertWholeStageCodegen(left)),
      right = InputAdapter(insertWholeStageCodegen(right)))
  case p =>
    p.withNewChildren(p.children.map(insertInputAdapter))
}

Can you try it? Thanks.
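To illustrate why the order of the cases matters, here is a minimal, self-contained sketch. It uses toy classes rather than Spark's real ones: `Plan`, `Leaf`, `Join`, `Adapter`, `joinFirst`, and `checkFirst` are all hypothetical names, and `hasFallbackExpr` simply stands in for "the join condition contains a CodegenFallback expression". It assumes, as in the snippet quoted earlier, that the join case used to be tried before the general eligibility check.

```scala
// Toy stand-ins for physical plan nodes; every name here is hypothetical.
sealed trait Plan
case class Leaf(name: String) extends Plan
// hasFallbackExpr models a join condition containing a CodegenFallback expression.
case class Join(left: Plan, right: Plan, hasFallbackExpr: Boolean) extends Plan
// Adapter models a boundary that keeps its child out of the current codegen stage.
case class Adapter(child: Plan) extends Plan

object MatchOrderSketch {
  // Models a rule-level check that rejects nodes carrying fallback expressions.
  def supportCodegen(p: Plan): Boolean = p match {
    case Join(_, _, fallback) => !fallback
    case _                    => true
  }

  // Join case first: the eligibility check is never consulted for joins.
  def joinFirst(p: Plan): Plan = p match {
    case j: Join                         => j.copy(left = Adapter(j.left), right = Adapter(j.right))
    case other if !supportCodegen(other) => Adapter(other)
    case other                           => other
  }

  // Eligibility check first: an ineligible join is wrapped as a whole.
  def checkFirst(p: Plan): Plan = p match {
    case other if !supportCodegen(other) => Adapter(other)
    case j: Join                         => j.copy(left = Adapter(j.left), right = Adapter(j.right))
    case other                           => other
  }

  def main(args: Array[String]): Unit = {
    val join = Join(Leaf("df1"), Leaf("df2"), hasFallbackExpr = true)
    println(joinFirst(join))  // the join itself stays inside the stage
    println(checkFirst(join)) // the join is placed behind an Adapter
  }
}
```

Scala tries `match` cases top to bottom, so in `joinFirst` a join with a fallback expression is handled by the join case and never reaches the eligibility check. In `checkFirst` the same join is caught by the first case and kept out of the generated stage.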

@DonnyZone
Contributor Author

Great! I'm also considering disabling codegen for SortMergeJoinExec with CodegenFallback expressions.
Thanks for your advice. I will work on it and validate it in our environment.

Moreover, I wonder whether the current pattern order in insertInputAdapter was specifically designed to generate code for SortMergeJoinExec in all cases.

Could you give any ideas? @davies

@DonnyZone
Contributor Author

I have validated both cases for SortMergeJoinExec, with and without CodegenFallback expressions.
The fix works well.

@@ -489,13 +489,13 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
   * Inserts an InputAdapter on top of those that do not support codegen.
   */
  private def insertInputAdapter(plan: SparkPlan): SparkPlan = plan match {
    case p if !supportCodegen(p) =>
      // collapse them recursively
      InputAdapter(insertWholeStageCodegen(p))
    case j @ SortMergeJoinExec(_, _, _, _, left, right) if j.supportCodegen =>
Member

The previous pattern case already validates j.supportCodegen, so we don't need to verify it again.

Contributor Author

SortMergeJoinExec.supportCodegen checks whether joinType.isInstanceOf[InnerLike].

Contributor Author

Therefore, I think we should still verify it.

Member

supportCodegen will call CodegenSupport.supportCodegen, so SortMergeJoinExec.supportCodegen is called then.
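The point about delegation can be sketched with a toy model. The names `Node`, `ToyJoin`, and `RuleCheckSketch` are hypothetical and only mirror the shape described in this thread: a rule-level check that first consults the node's own `supportCodegen` and then also rejects fallback expressions. Under that assumption, any node that gets past `case p if !supportCodegen(p)` has already passed its own check, so a second `if j.supportCodegen` guard adds nothing.

```scala
// Toy model; the trait and class names are hypothetical, not Spark's.
trait Node {
  def supportCodegen: Boolean        // node-level check
  def hasFallbackExpression: Boolean // models expressions implementing CodegenFallback
}

case class ToyJoin(joinType: String, hasFallbackExpression: Boolean) extends Node {
  // Models a node-level check that only accepts inner-like joins.
  def supportCodegen: Boolean = joinType == "inner"
}

object RuleCheckSketch {
  // Models the rule-level check: the node's own check is consulted first,
  // then nodes carrying fallback expressions are rejected as well.
  def supportCodegen(n: Node): Boolean =
    n.supportCodegen && !n.hasFallbackExpression

  def main(args: Array[String]): Unit = {
    val candidates = for {
      joinType <- Seq("inner", "left_outer")
      fallback <- Seq(true, false)
    } yield ToyJoin(joinType, fallback)

    // Every node accepted by the rule-level check also passes its own check.
    candidates.filter(n => supportCodegen(n)).foreach(n => assert(n.supportCodegen))
    candidates.foreach(n => println(s"$n -> ${supportCodegen(n)}"))
  }
}
```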

@viirya
Member

viirya commented Jul 19, 2017

Btw, can you also add a test for this? Thanks.

@viirya
Member

viirya commented Jul 19, 2017

And please also add SQL tag to the PR title, e.g., [SPARK-21441][SQL]. Thanks.

@DonnyZone DonnyZone changed the title from "[SPARK-21441]Incorrect Codegen in SortMergeJoinExec results failures in some cases" to "[SPARK-21441][SQL]Incorrect Codegen in SortMergeJoinExec results failures in some cases" Jul 19, 2017
@DonnyZone
Contributor Author

Thanks for reviewing, I will add a test later.

@viirya
Member

viirya commented Jul 19, 2017

@cloud-fan Can you help trigger the Jenkins test for this? Thanks.

@cloud-fan
Contributor

ok to test

@cloud-fan
Contributor

LGTM, can you update the PR description?

@SparkQA

SparkQA commented Jul 19, 2017

Test build #79738 has finished for PR 18656 at commit 1161ffd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Jul 19, 2017

LGTM for the code change. But I think we'd better have a test for this.

@SparkQA

SparkQA commented Jul 19, 2017

Test build #79749 has finished for PR 18656 at commit e2d8cef.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 19, 2017

Test build #79751 has finished for PR 18656 at commit b53bf1c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 6b6dd68 Jul 19, 2017
asfgit pushed a commit that referenced this pull request Jul 19, 2017
…lures in some cases

## What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441

This issue can be reproduced by the following example:

```
val spark = SparkSession
   .builder()
   .appName("smj-codegen")
   .master("local")
   .config("spark.sql.autoBroadcastJoinThreshold", "1")
   .getOrCreate()
val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str")
val df = df1.join(df2, df1("key") === df2("key"))
   .filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1")
   .select("int")
   df.show()
```

To conclude, the issue happens when:
(1) SortMergeJoin condition contains CodegenFallback expressions.
(2) In PhysicalPlan tree, SortMergeJoin node  is the child of root node, e.g., the Project in above example.

This patch fixes the logic in `CollapseCodegenStages` rule.

## How was this patch tested?
Unit test and manual verification in our cluster.

Author: donnyzone <wellfengzhu@gmail.com>

Closes #18656 from DonnyZone/Fix_SortMergeJoinExec.

(cherry picked from commit 6b6dd68)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to master/2.2/2.1!

asfgit pushed a commit that referenced this pull request Jul 19, 2017
@DonnyZone DonnyZone deleted the Fix_SortMergeJoinExec branch July 25, 2017 06:23
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018