Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-15915][SQL] Logical plans should use canonicalized plan when override sameResult. #13638

Closed
wants to merge 5 commits into from

Conversation

ueshin
Copy link
Member

@ueshin ueshin commented Jun 13, 2016

What changes were proposed in this pull request?

DataFrame with plan overriding sameResult but not using canonicalized plan to compare can't cacheTable.

The example is like:

    val localRelation = Seq(1, 2, 3).toDF()
    localRelation.createOrReplaceTempView("localRelation")

    spark.catalog.cacheTable("localRelation")
    assert(
      localRelation.queryExecution.withCachedData.collect {
        case i: InMemoryRelation => i
      }.size == 1)

and this will fail as:

ArrayBuffer() had size 0 instead of expected size 1

The reason is that when do spark.catalog.cacheTable("localRelation"), CacheManager tries to cache for the plan wrapped by SubqueryAlias but when planning for the DataFrame localRelation, CacheManager tries to find cached table for the not-wrapped plan because the plan for DataFrame localRelation is not wrapped.
Some plans like LocalRelation, LogicalRDD, etc. override sameResult method, but not use canonicalized plan to compare so the CacheManager can't detect the plans are the same.

This pr modifies them to use canonicalized plan when override sameResult method.

How was this patch tested?

Added a test to check if DataFrame with plan overriding sameResult but not using canonicalized plan to compare can cacheTable.

@SparkQA
Copy link

SparkQA commented Jun 13, 2016

Test build #60389 has finished for PR 13638 at commit 52641f4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

this looks reasonable to me, cc @marmbrus is there any corner cases?

cachedData.foreach {
case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty =>
case data if data.plan.collect { case p if p.sameResult(canonicalized) => p }.nonEmpty =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is redundant, sameResult already compares the canonicalized plan.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so.
For example, if the cached plan is LocalRelation (which is canonicalized) and the plan argument is SubqueryAlias(LocalRelation) (which is not canonicalized), it will fail to find the same-result plan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the implementation of sameResult:

def sameResult(plan: PlanType): Boolean = {
    val left = this.canonicalized
    val right = plan.canonicalized
    ......

Seems we do canonicalize plans?

Copy link
Member Author

@ueshin ueshin Jun 13, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some plans like LocalRelation, LogicalRDD, etc. override the sameResult and they don't use canonicalized plan to compare.

For example, LocalRelation overrides like as follows:

  override def sameResult(plan: LogicalPlan): Boolean = plan match {
    case LocalRelation(otherOutput, otherData) =>
      otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
    case _ => false
  }

If the plan is SubqueryAlias(LocalRelation), the method returns false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just fix those implementations?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yes, we should for now, but we can't force to use canonicalized plan and it is difficult to check all implementations so I thought CacheManager should canonicalize plans.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I would prefer to fix in in sameResult if possible so that all uses of that function will benefit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I'll fix them, too.

@marmbrus
Copy link
Contributor

Seems reasonable. Is this a regression from 1.6?

@@ -87,7 +87,7 @@ private[sql] class CacheManager extends Logging {
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
val planToCache = query.queryExecution.analyzed.canonicalized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need these changes? LogicalPlan.canonicalized is a lazy val and we don't have performance penalty even we use un-canonicalized plan as key.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't need them for now.
I'll revert the changes.

@ueshin ueshin changed the title [SPARK-15915][SQL] CacheManager should use canonicalized plan for planToCache. [SPARK-15915][SQL] Logical plans should use canonicalized plan when override sameResult. Jun 14, 2016
@ueshin
Copy link
Member Author

ueshin commented Jun 14, 2016

Updated pr title, description and a test name.

@SparkQA
Copy link

SparkQA commented Jun 14, 2016

Test build #60451 has finished for PR 13638 at commit 11dc433.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Copy link
Member Author

ueshin commented Jun 14, 2016

@marmbrus It seems this is not a regression from 1.6.
The test modified for 1.6 also fails with the same error.

@SparkQA
Copy link

SparkQA commented Jun 14, 2016

Test build #60458 has finished for PR 13638 at commit 3d27607.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jun 14, 2016

Test build #60459 has finished for PR 13638 at commit 0c9ec86.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

cloud-fan commented Jun 14, 2016

LGTM, @marmbrus should we backport it to 1.6?

@marmbrus
Copy link
Contributor

Yeah, sounds reasonable. Merging to master, 2.0 and 1.6.

asfgit pushed a commit that referenced this pull request Jun 14, 2016
…verride sameResult.

## What changes were proposed in this pull request?

`DataFrame` with plan overriding `sameResult` but not using canonicalized plan to compare can't cacheTable.

The example is like:

```
    val localRelation = Seq(1, 2, 3).toDF()
    localRelation.createOrReplaceTempView("localRelation")

    spark.catalog.cacheTable("localRelation")
    assert(
      localRelation.queryExecution.withCachedData.collect {
        case i: InMemoryRelation => i
      }.size == 1)
```

and this will fail as:

```
ArrayBuffer() had size 0 instead of expected size 1
```

The reason is that when do `spark.catalog.cacheTable("localRelation")`, `CacheManager` tries to cache for the plan wrapped by `SubqueryAlias` but when planning for the DataFrame `localRelation`, `CacheManager` tries to find cached table for the not-wrapped plan because the plan for DataFrame `localRelation` is not wrapped.
Some plans like `LocalRelation`, `LogicalRDD`, etc. override `sameResult` method, but not use canonicalized plan to compare so the `CacheManager` can't detect the plans are the same.

This pr modifies them to use canonicalized plan when override `sameResult` method.

## How was this patch tested?

Added a test to check if DataFrame with plan overriding sameResult but not using canonicalized plan to compare can cacheTable.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13638 from ueshin/issues/SPARK-15915.

(cherry picked from commit c5b7355)
Signed-off-by: Michael Armbrust <michael@databricks.com>
@marmbrus
Copy link
Contributor

Hmmm, does not apply cleanly to 1.6. @ueshin if you have time it might be nice to backport.

@asfgit asfgit closed this in c5b7355 Jun 14, 2016
@ueshin
Copy link
Member Author

ueshin commented Jun 14, 2016

@marmbrus Thank you for merging this.
Sure, I'll send a backport pr for branch-1.6.

@ueshin
Copy link
Member Author

ueshin commented Jun 14, 2016

@marmbrus I sent a backport pr #13668.
But is branch-1.6 build broken now?
Looks working on #13630.

asfgit pushed a commit that referenced this pull request Jun 15, 2016
…n when override sameResult.

## What changes were proposed in this pull request?

This pr is a backport of #13638 for `branch-1.6`.

## How was this patch tested?

Added the same test as #13638 modified for `branch-1.6`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13668 from ueshin/issues/SPARK-15915_1.6.
zzcclp pushed a commit to zzcclp/spark that referenced this pull request Jun 16, 2016
…n when override sameResult.

## What changes were proposed in this pull request?

This pr is a backport of apache#13638 for `branch-1.6`.

## How was this patch tested?

Added the same test as apache#13638 modified for `branch-1.6`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes apache#13668 from ueshin/issues/SPARK-15915_1.6.

(cherry picked from commit cffc080)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants