[SPARK-32765][SQL] EliminateJoinToEmptyRelation should respect exchange behavior when canChangeNumPartitions == false #29614

Closed
wants to merge 6 commits into from

leanken-zz
Contributor

What changes were proposed in this pull request?

Currently, the EliminateJoinToEmptyRelation rule converts a Join into an EmptyRelation in some cases when AQE is on. However, if the streamedSide of the Join is a ShuffleQueryStage with canChangeNumPartitions == false, meaning its Exchange was produced by a repartition or by SinglePartition, converting the Join into an EmptyRelation loses the user-specified number of partitions for downstream operators, which is incorrect.

Why are the changes needed?

The number of partitions is incorrect when the streamedSide is a repartition plan or a SinglePartition plan.
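
To make the concern concrete, here is a minimal sketch in plain Scala. It is a toy model only: `Scan`, `Repartition`, `SemiJoin`, `EmptyRelation` and `eliminate` are hypothetical stand-ins rather than Spark's classes, and treating an empty relation as having zero partitions is an assumption made for illustration.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark's plan nodes.
object LostPartitioningSketch {
  sealed trait Plan { def numPartitions: Int }

  // A leaf scan with a fixed partition count.
  final case class Scan(numPartitions: Int) extends Plan

  // Models a user-specified repartition: the partition count is pinned.
  final case class Repartition(numPartitions: Int, child: Plan) extends Plan

  // Models a left semi join whose output partitioning follows the streamed side.
  final case class SemiJoin(streamed: Plan, buildIsEmpty: Boolean) extends Plan {
    def numPartitions: Int = streamed.numPartitions
  }

  // Assumption: an empty relation is modelled as having zero partitions.
  case object EmptyRelation extends Plan { val numPartitions: Int = 0 }

  // Naive elimination: a semi join against an empty build side returns no rows,
  // so the whole join is replaced by an empty relation.
  def eliminate(plan: Plan): Plan = plan match {
    case SemiJoin(_, true) => EmptyRelation
    case other             => other
  }

  def main(args: Array[String]): Unit = {
    val join = SemiJoin(Repartition(77, Scan(200)), buildIsEmpty = true)
    // prints "before: 77, after: 0"
    println(s"before: ${join.numPartitions}, after: ${eliminate(join).numPartitions}")
  }
}
```

In this model the rows are the same before and after (none), but the partition count seen by a downstream operator changes from 77 to 0, which is the behaviour the PR description objects to.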

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a test case to AdaptiveQueryExecSuite.

…nChangeNumPartitions == false.

Change-Id: I3f528d4c2d150521cccc7f59674c9f1379d3c908
@leanken-zz
Contributor Author

After some code investigation and manual testing, I found that certain optimizer rules move the Repartition above LeftAnti and LeftSemi joins. As the unit tests show, LeftAnti and LeftSemi joins can therefore still be converted from Join to EmptyRelation.

FYI, @cloud-fan

NotInSubQuery NAAJ streamedSide repartition

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery ===
Repartition 77, true                          
 +- Project [a#218]                           
!   +- Filter NOT a#218 IN (list#230 [])      
!      :  +- Project [c#220]                  
!      :     +- LocalRelation [c#220, d#221]  
!      +- LocalRelation [a#218, b#219]        

Repartition 77, true
 +- Project [a#218]
    +- Join LeftAnti, ((a#218 = c#220) OR isnull((a#218 = c#220)))
       :- LocalRelation [a#218, b#219]
       +- Project [c#220]
          +- LocalRelation [c#220, d#221]

hand-rewritten NAAJ streamedSide repartition

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
Project [a#218]                                                  
!+- Join LeftAnti, ((a#218 = c#220) OR isnull((a#218 = c#220)))  
!   :- Repartition 77, true                                      
!   :  +- Project [a#218, b#219]                                 
!   :     +- LocalRelation [a#218, b#219]                        
!   +- Project [c#220, d#221]                                    
!      +- LocalRelation [c#220, d#221]                           

Project [a#218]
 +- Repartition 77, true
    +- Project [a#218, b#219]
       +- Join LeftAnti, ((a#218 = c#220) OR isnull((a#218 = c#220)))
          :- LocalRelation [a#218, b#219]
          +- Project [c#220, d#221]
             +- LocalRelation [c#220, d#221]

LEFT SEMI streamedSide repartition

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin ===
+- Project [a#218]
!   +- Join LeftSemi, (a#218 = c#220)
!      :- Repartition 77, true
!      :  +- Project [a#218, b#219]
!      :     +- LocalRelation [a#218, b#219]
!      +- Project [c#220, d#221]
!         +- Project [c#220, d#221]
!            +- Filter (cast(d#221 as decimal(4,1)) > cast(cast(100 as decimal(3,0)) as decimal(4,1)))
!               +- LocalRelation [c#220, d#221]

+- Project [a#218]
    +- Repartition 77, true
       +- Project [a#218, b#219]
          +- Join LeftSemi, (a#218 = c#220)
             :- LocalRelation [a#218, b#219]
             +- Project [c#220, d#221]
                +- Project [c#220, d#221]
                   +- Filter (cast(d#221 as decimal(4,1)) > cast(cast(100 as decimal(3,0)) as decimal(4,1)))
                      +- LocalRelation [c#220, d#221]
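
The rewrite visible in the PushDownLeftSemiAntiJoin traces above can be sketched as a toy rule in plain Scala. The types below are hypothetical stand-ins, not Catalyst's classes, and the sketch ignores the intermediate Project nodes and any push-down conditions the real rule checks.

```scala
// Toy model only: hypothetical stand-ins, not Catalyst's logical plan nodes.
object PushDownSketch {
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Repartition(numPartitions: Int, child: Plan) extends Plan
  final case class SemiOrAntiJoin(left: Plan, right: Plan) extends Plan

  // Single-step rewrite: when the left (streamed) side of a semi/anti join is a
  // Repartition, move the join below it so the Repartition ends up on top.
  def pushDown(plan: Plan): Plan = plan match {
    case SemiOrAntiJoin(Repartition(n, child), right) =>
      Repartition(n, SemiOrAntiJoin(child, right))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val before = SemiOrAntiJoin(Repartition(77, Relation("left")), Relation("right"))
    println(pushDown(before))
  }
}
```

Once the Repartition sits above the join, the join's streamed side no longer contains it, which is consistent with the observation that these joins can still be converted to EmptyRelation in the unit tests.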

@cloud-fan
Contributor

ok to test

@cloud-fan
Contributor

add to whitelist

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128183 has finished for PR 29614 at commit f21fd5c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128189 has finished for PR 29614 at commit b0a40fa.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@leanken-zz
Contributor Author

retest this please

Since we can't enumerate every possible pattern for the streamedSide,
if the streamedSide plan tree contains any ShuffleQueryStageExec(canChangeNumPartitions == false),
it is considered invalid to convert the Join to an EmptyRelation.

Change-Id: Iacb3e9955af48ea0a331b31448ce28e5be4445a0
Change-Id: I3a64fed65e03965210e5a689096704c8cbbb8d9d
Change-Id: Icd235d876e9bb2b0c63bd01ecde20ba18bec97ea
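
The conservative check described in the commit message above (refuse the conversion if any fixed-partition shuffle stage appears anywhere in the streamed side) can be sketched as follows. This is a toy model in plain Scala under stated assumptions: `Leaf`, `Unary`, `ShuffleStage`, `containsImmutableStage` and `canEliminate` are hypothetical names, not the PR's actual code.

```scala
// Toy model only: hypothetical stand-ins, not Spark's plan or query-stage classes.
object ImmutableStageGuardSketch {
  sealed trait Plan { def children: Seq[Plan] }
  final case class Leaf(name: String) extends Plan {
    val children: Seq[Plan] = Nil
  }
  final case class Unary(name: String, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }
  // Models a shuffle stage; canChangeNumPartitions == false means the
  // partition count was fixed by the user and must be preserved.
  final case class ShuffleStage(canChangeNumPartitions: Boolean, child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // True if any node in the tree is a shuffle stage with a fixed partition count.
  def containsImmutableStage(plan: Plan): Boolean = plan match {
    case ShuffleStage(false, _) => true
    case other                  => other.children.exists(containsImmutableStage)
  }

  // The join may be replaced only when no such stage exists on the streamed
  // side and the build side is known to be empty.
  def canEliminate(streamed: Plan, buildSideIsEmpty: Boolean): Boolean =
    !containsImmutableStage(streamed) && buildSideIsEmpty

  def main(args: Array[String]): Unit = {
    val pinned = Unary("Project", ShuffleStage(canChangeNumPartitions = false, Leaf("t")))
    val free   = Unary("Project", ShuffleStage(canChangeNumPartitions = true, Leaf("t")))
    println(s"pinned: ${canEliminate(pinned, true)}, free: ${canEliminate(free, true)}")
  }
}
```

Searching the whole subtree rather than matching specific shapes trades some missed optimizations for not having to list every plan pattern that could sit between the join and the shuffle stage.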
@SparkQA

SparkQA commented Sep 2, 2020

Test build #128192 has finished for PR 29614 at commit b68533a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 2, 2020

Test build #128202 has finished for PR 29614 at commit c67477a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Change-Id: I4906c4fd68815b8ba526780fbef106cb239ed62b
Change-Id: I8cace0531a151a4453c8cc8d45761ccbef5f3d22
@SparkQA

SparkQA commented Sep 3, 2020

Test build #128209 has finished for PR 29614 at commit 82f6cc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// If streamedSide of the Join contains ShuffleQueryStageExec(canChangeNumPartitions== false)
// it can't be rewritten to EmptyRelation because the conversion might lose the user-specified
// number of partitions.
val immutablePartitionStageExists = streamedPlan.collect {
Member

nit: can we use collectFirst instead?

    val immutablePartitionStageExists = streamedPlan.collectFirst {
      case LogicalQueryStage(_, physicalPlan: SparkPlan)
        if physicalPlan.collectFirst {
          case s: ShuffleQueryStageExec if !s.shuffle.canChangeNumPartitions => s
        }.nonEmpty => true
    }.isDefined
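
For readers unfamiliar with the distinction behind this nit, the sketch below contrasts the two idioms on an ordinary Scala `Seq`. `Stage` is a hypothetical stand-in for a shuffle stage; the suggestion above applies the same idea to plan trees.

```scala
// Illustration on a plain Scala collection; Stage is a hypothetical stand-in.
object CollectFirstSketch {
  final case class Stage(id: Int, canChangeNumPartitions: Boolean)

  // Builds the full collection of matches, then checks whether it is empty.
  def existsViaCollect(stages: Seq[Stage]): Boolean =
    stages.collect { case s if !s.canChangeNumPartitions => s }.nonEmpty

  // Returns an Option holding the first match and stops looking after it.
  def existsViaCollectFirst(stages: Seq[Stage]): Boolean =
    stages.collectFirst { case s if !s.canChangeNumPartitions => s }.isDefined

  def main(args: Array[String]): Unit = {
    val stages = Seq(
      Stage(1, canChangeNumPartitions = true),
      Stage(2, canChangeNumPartitions = false),
      Stage(3, canChangeNumPartitions = false))
    println(existsViaCollect(stages) == existsViaCollectFirst(stages))
  }
}
```

Both forms answer the same yes/no question; `collectFirst` simply avoids materializing every match when only existence matters.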

streamedPlan: LogicalPlan,
buildPlan: LogicalPlan,
relation: HashedRelation): Boolean = {
// If streamedSide of the Join contains ShuffleQueryStageExec(canChangeNumPartitions== false)
Member

nit: canChangeNumPartitions== false => canChangeNumPartitions == false


if (immutablePartitionStageExists) {
false
} else {
Member

nit format:

    !immutablePartitionStageExists && {
      buildPlan match {
        case LogicalQueryStage(_, stage: BroadcastQueryStageExec)
          if stage.resultOption.get().isDefined
            && stage.broadcast.relationFuture.get().value == relation => true
        case _ => false
      }
    }

?

withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
// exclude ConvertToLocalRelation rule make it easier for Test.
SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) {
Member

We already exclude this in SharedSparkSession?

// Disable ConvertToLocalRelation for better test coverage. Test cases built on
// LocalRelation will exercise the optimization rules better by disabling it as
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)

@cloud-fan
Contributor

Actually, PropagateEmptyRelation doesn't respect a repartition on the join's stream side either, so maybe this behavior is also fine in AQE.

@leanken-zz
Contributor Author

@cloud-fan @maropu If this is no longer considered a bug, I will close this PR and the JIRA ticket. Is that OK?

@leanken-zz leanken-zz closed this Sep 14, 2020
4 participants