Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6550][SQL] Add PreAnalyzer to keep logical plan consistent across DataFrame #5203

Closed
wants to merge 2 commits into from

Conversation

viirya
Copy link
Member

@viirya viirya commented Mar 26, 2015

Problems

In some cases, the expressions in a logical plan will be modified to new ones during analysis, e.g. the handling for self-join cases. If some expressions are resolved based on the analyzed plan, they are referring to changed expression ids, not original ids.

But the transformation of DataFrame will use logical plan to construct new DataFrame, e.g. groupBy and aggregation. So in such cases, the expressions in these DataFrames will be inconsistent.

The problems are specified as following:

  • Expression ids in logical plan are possibly inconsistent if expression ids are changed during analysis and some expressions are resolved after that

When we try to run the following codes:

val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").min("y.int")

Because groupBy and min will perform resolving based on the analyzed logical plan, their expression ids refer to analyzed plan, instead of logical plan.

So the logical plan of df2 looks like:

'Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 'Join Inner, Some(('x.str = 'y.str))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]

As you see, the expression ids in Aggregate are different to the expression ids in Subquery y. This is the first problem.

  • The df2 can't be performed

The showing logical plan of df2 can't be performed. Because the expression ids of Subquery y will be modified for self-join handling during analysis, the analyzed plan of df2 becomes:

Aggregate [str#5], [str#5,MIN(int#4) AS MIN(int)#6]
 Join Inner, Some((str#3 = str#8))
  Subquery x
   Project [_1#0 AS int#2,_2#1 AS str#3]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]
  Subquery y
   Project [_1#0 AS int#7,_2#1 AS str#8]
    LocalRelation [_1#0,_2#1], [[1,1],[2,2],[3,3]]

The expressions referred in Aggregate are not matching to these in Subquery y. This is the second problem.

Proposed solution

We try to add a PreAnalyzer. When a logical plan rawPlan is given to SQLContext, it uses PreAnalyzer to modify the logical plan before assigning to QueryExecution.logical. Then later operations will based on the pre-analyzed logical plan, instead of the original rawPlan.

@SparkQA
Copy link

SparkQA commented Mar 26, 2015

Test build #29219 has finished for PR 5203 at commit 77ba3a6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PreAnalyzer(caseSensitive: Boolean = true,
    • protected[sql] class QueryExecution(val rawPlan: LogicalPlan)
    • protected[sql] class QueryExecution(rawPlan: LogicalPlan)

@viirya
Copy link
Member Author

viirya commented Mar 26, 2015

The test failure is caused by another commit and just fixed in #5198. Please test it again.

@SparkQA
Copy link

SparkQA commented Mar 26, 2015

Test build #29236 has finished for PR 5203 at commit a96885a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PreAnalyzer(caseSensitive: Boolean = true,
    • protected[sql] class QueryExecution(val rawPlan: LogicalPlan)
    • protected[sql] class QueryExecution(rawPlan: LogicalPlan)

@marmbrus
Copy link
Contributor

Thanks for finding this issue! I'm hoping there is a simpler solution than adding a whole new analyzer phase though. In #5217 I propose we simply hang on to the analyze plan instead of the logical one. This doesn't add a new phase and reduces the overhead of analyzing a plan over and over again. What do you think?

@viirya
Copy link
Member Author

viirya commented Mar 27, 2015

Looks good and it can solve this. Agree that is better to use simpler way for this bug. Thanks.

@viirya
Copy link
Member Author

viirya commented Mar 27, 2015

I will close this once #5217 is merged.

asfgit pushed a commit that referenced this pull request Mar 27, 2015
This is based on bug and test case proposed by viirya.  See #5203 for a excellent description of the problem.

TLDR; The problem occurs because the function `groupBy(String)` calls `resolve`, which returns an `AttributeReference`.  However, this `AttributeReference` is based on an analyzed plan which is thrown away.  At execution time, we once again analyze the plan.  However, in the case of self-joins, each call to analyze will produce a new tree for the left side of the join, rendering the previously returned `AttributeReference` invalid.

As a fix, I propose we keep the analyzed plan instead of the unresolved plan inside of a `DataFrame`.

Author: Michael Armbrust <michael@databricks.com>

Closes #5217 from marmbrus/preanalyzer and squashes the following commits:

1f98e2d [Michael Armbrust] revert change
dd4dec1 [Michael Armbrust] Use the analyzed plan in DataFrame
089c52e [Michael Armbrust] WIP

(cherry picked from commit 5d9c37c)
Signed-off-by: Michael Armbrust <michael@databricks.com>
asfgit pushed a commit that referenced this pull request Mar 27, 2015
This is based on bug and test case proposed by viirya.  See #5203 for a excellent description of the problem.

TLDR; The problem occurs because the function `groupBy(String)` calls `resolve`, which returns an `AttributeReference`.  However, this `AttributeReference` is based on an analyzed plan which is thrown away.  At execution time, we once again analyze the plan.  However, in the case of self-joins, each call to analyze will produce a new tree for the left side of the join, rendering the previously returned `AttributeReference` invalid.

As a fix, I propose we keep the analyzed plan instead of the unresolved plan inside of a `DataFrame`.

Author: Michael Armbrust <michael@databricks.com>

Closes #5217 from marmbrus/preanalyzer and squashes the following commits:

1f98e2d [Michael Armbrust] revert change
dd4dec1 [Michael Armbrust] Use the analyzed plan in DataFrame
089c52e [Michael Armbrust] WIP
@marmbrus
Copy link
Contributor

Thanks, I've merged #5217.

@viirya viirya closed this Mar 27, 2015
@viirya viirya deleted the pre_analyze branch December 27, 2023 18:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants