
Conversation

@sarutak (Member) commented Jul 28, 2019

What changes were proposed in this pull request?

After certain operations are performed on a Dataset which is then persisted, Dataset.explain shows a wrong result.
One of those operations is explain() itself.
Here is an example.

val df = spark.range(10)
df.explain
df.persist
df.explain

The expected result is as follows.

== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [id#7L]
      +- InMemoryRelation [id#7L], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *(1) Range (0, 10, step=1, splits=12)

But this is what I actually got.

== Physical Plan ==
*(1) Range (0, 10, step=1, splits=12)

This issue is caused by withCachedData in QueryExecution being materialized too early when explain() or similar methods are called. This patch prevents that early materialization.

How was this patch tested?

Additional test cases in ExplainSuite.scala.

@sarutak force-pushed the fix-cache-ignored-issue branch from 1114600 to 3b16d9a on July 28, 2019 18:50
@SparkQA commented Jul 28, 2019

Test build #108278 has finished for PR 25280 at commit 3b16d9a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 28, 2019

Test build #108279 has finished for PR 25280 at commit e76e2b3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) commented Jul 29, 2019

Wait, it seems the query works well in v2.4.3:

scala> df.explain
== Physical Plan ==
*(1) Range (0, 10, step=1, splits=4)

scala> df.persist
res1: df.type = [id: bigint]

scala> df.explain
== Physical Plan ==
*(1) InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(1) Range (0, 10, step=1, splits=4)
scala> 

Which commit changed the behaviour?

@cloud-fan (Contributor)

It's by design that a DataFrame can't change its physical plan once the physical plan is materialized. That is, df.persist has no effect if df.explain has been called beforehand.
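This "frozen once materialized" behavior follows from how Scala's lazy vals work. A minimal, Spark-free sketch (the names QueryExecution and executedPlan only mirror Spark's; the logic is purely illustrative):

```scala
// Illustration of why a materialized plan cannot change: a `lazy val` is
// computed once on first access and never recomputed, so later state
// changes (like df.persist) are invisible to it.
object LazyPlanDemo {
  var cached = false // stands in for the session's cache state

  class QueryExecution {
    // Materialized on first read; subsequent reads return the frozen value.
    lazy val executedPlan: String =
      if (cached) "InMemoryTableScan" else "Range"
  }

  def main(args: Array[String]): Unit = {
    val qe = new QueryExecution
    println(qe.executedPlan) // prints "Range" -- this materializes the plan
    cached = true            // analogous to calling df.persist afterwards
    println(qe.executedPlan) // still prints "Range": the lazy val is frozen
  }
}
```

A QueryExecution created after the state change would of course see the cache, which is why a fresh Dataset (e.g. via toDF) picks it up.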

I think the current behavior is correct and v2.4.3's is wrong. I think that problem was fixed by #24654.

If you do want to run the plan with the cached data, maybe you can do val df2 = df.toDF() and execute df2 instead.

@cloud-fan (Contributor) commented Jul 29, 2019

Actually, this does expose a problem. Before #24654, df.explain didn't materialize the physical plan, but now it does. cc @viirya do you have any ideas? I think this one is hard to fix.

@sarutak (Member, Author) commented Jul 29, 2019

@cloud-fan I had already noticed #24654. The problem addressed in that PR is that the pre-analyzed logical plan was always reused and re-analyzed by the explain command, even when the analyzed logical plan was already materialized.

This solution takes that problem into account: if the analyzed logical plan is already materialized, we use it; otherwise, we create one.

@viirya (Member) commented Jul 29, 2019

We actually use the Dataset's query execution to execute it. Suppose we have executed a Dataset, so its physical plan is materialized, and then persist it. In 2.4.3, although df.explain shows a cached plan, I think execution still uses the physical plan without the cache. Does this fix also have that issue?

That said, in 2.4.3, df.explain shows the query plan reflecting the current state (cache, temp views, etc.), which doesn't really match the Dataset's actual execution.

Like:

val df = spark.range(10)
df.explain // show query plan without cache
df.collect() // execution without cache 
df.persist
df.explain // show query plan with cache
df.collect() // still execution without cache

@SparkQA commented Jul 29, 2019

Test build #108331 has finished for PR 25280 at commit 98bcee4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class Plans(

@maropu (Member) commented Jul 29, 2019

Btw, can't we just return a new Dataset from persist, like this?

val df = spark.range(10)
df.explain // show query plan without cache
df.collect // execution without cache

val cachedDf = df.persist

df.explain // show query plan without cache
df.collect // execution without cache

cachedDf.explain // show query plan with cache
cachedDf.collect // execution with cache
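The semantics maropu describes can be sketched in plain Scala (no Spark): persist returns a new dataset-like value with its own, fresh lazy plan, leaving the original untouched. FakeDataset and its members are invented for illustration, not Spark APIs.

```scala
// persist() returns a NEW object whose lazy plan is computed fresh,
// so the original object's frozen plan is unaffected.
final class FakeDataset(cached: Boolean) {
  lazy val plan: String = if (cached) "InMemoryTableScan" else "Range"
  def persist(): FakeDataset = new FakeDataset(cached = true)
}

object ReturnNewDatasetDemo {
  def main(args: Array[String]): Unit = {
    val df = new FakeDataset(cached = false)
    println(df.plan)            // "Range"
    val cachedDf = df.persist() // new object; df itself is unchanged
    println(df.plan)            // still "Range"
    println(cachedDf.plan)      // "InMemoryTableScan"
  }
}
```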

@cloud-fan (Contributor)

@maropu this is a behavior change and could break many queries silently.

@sarutak (Member, Author) commented Jul 31, 2019

@viirya With my change, we get the following result.

val df = spark.range(10)
df.explain  // show query plan without cache
df.collect  // execution without cache
df.persist
df.explain // show query plan without cache
df.collect // execution without cache
df.queryExecution.executedPlan.find(_.isInstanceOf[InMemoryTableScanExec]) // None

After collect, persist is still ignored, but this result differs from 2.4.3's and matches the master branch.
As you mentioned, df.collect and some other operations materialize the executedPlan and cause this problem.

Some operations, including df.show, don't have this problem because they implicitly create a new root plan at execution time, so I wonder whether creating a dummy root plan when executing collect or similar operations would resolve this type of problem.

@felixcheung (Member)

... where are we on this? This seems to be a severe, correctness-impacting issue?

@cloud-fan (Contributor)

This is not a correctness issue. It's just that an undocumented property ("a DataFrame can't change its physical plan once the physical plan is materialized") is causing confusion.

After so many days, I have a fresh idea now: when df.cache is called, reset the cached plan, the optimized plan, and the physical plans. We can implement the lazy evaluation manually to support this, instead of relying on Scala's lazy vals.
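A minimal, Spark-free sketch of this manual-lazy-evaluation idea: a hand-rolled lazy holder that, unlike a built-in `lazy val`, can be reset. The names (ResettableLazy, reset) are invented for illustration, not Spark APIs.

```scala
// A resettable substitute for `lazy val`: compute once on demand,
// but allow invalidation so the value can be recomputed later.
final class ResettableLazy[A](compute: () => A) {
  private var value: Option[A] = None
  def get: A = synchronized {
    if (value.isEmpty) value = Some(compute())
    value.get
  }
  // Called by e.g. df.cache to invalidate the materialized plans.
  def reset(): Unit = synchronized { value = None }
}

object ResettableLazyDemo {
  var cached = false // stands in for the session's cache state
  val executedPlan =
    new ResettableLazy(() => if (cached) "InMemoryTableScan" else "Range")

  def main(args: Array[String]): Unit = {
    println(executedPlan.get) // "Range"
    cached = true             // analogous to df.persist
    executedPlan.reset()      // the key step: drop the frozen value
    println(executedPlan.get) // "InMemoryTableScan"
  }
}
```

With `lazy val`, the second read would still return "Range"; the explicit reset is what lets df.cache take effect on an already-materialized plan.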

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 26, 2019
@github-actions github-actions bot closed this Dec 27, 2019
@maropu (Member) commented Dec 27, 2019

Btw, don't we need to describe this behaviour in the migration guide, even if the current behaviour is the correct one? @gatorsmile @cloud-fan

@HyukjinKwon (Member)

I wouldn't mind documenting it, but it sounds more like a bug fix given my reading, and it seems to only affect a debug API (explain). So I suspect it's fine not to document it for now.

@maropu (Member) commented Dec 28, 2019

Ok, thanks for the check.
