
[SPARK-16360][SQL] Speed up SQL query performance by removing redundant executePlan call #14044

Closed
dongjoon-hyun wants to merge 2 commits into apache:master from dongjoon-hyun:SPARK-16360

Conversation


@dongjoon-hyun dongjoon-hyun commented Jul 4, 2016

What changes were proposed in this pull request?

Currently, there are a few reports of a Spark 2.0 query performance regression on large queries.

This PR speeds up SQL query processing by removing a redundant, consecutive executePlan call in the Dataset.ofRows function and Dataset instantiation. Specifically, this PR aims to reduce the overhead of SQL query execution plan generation, not the actual query execution, so the improvement does not show up in the Spark Web UI. Please use the following query script instead. The result improves from 25.78 sec to 12.36 sec, as expected.
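For context, here is a minimal sketch of the redundant-call pattern this PR removes. This is a toy model, not the Spark source: the class bodies and the println are illustrative stand-ins, but the constructor shape mirrors the diff shown below, where Dataset.ofRows builds a QueryExecution and then calls a Dataset constructor that rebuilds one from the raw logical plan.

// Toy model of the duplicated work (hypothetical stand-ins, not Spark code).
class QueryExecution(plan: String) {
  println(s"executePlan($plan)")          // stands in for the expensive analysis
}

class Dataset(val qe: QueryExecution) {   // primary constructor takes a built QueryExecution
  def this(plan: String) =                // convenience constructor rebuilds it
    this(new QueryExecution(plan))
}

def ofRowsBefore(plan: String): Dataset = {
  val qe = new QueryExecution(plan)       // first executePlan
  new Dataset(plan)                       // second executePlan, hidden in the constructor
}

def ofRowsAfter(plan: String): Dataset = {
  val qe = new QueryExecution(plan)       // the only executePlan
  new Dataset(qe)                         // reuse it; this is the one-word fix
}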

Sample Query

// Build a wide query over n columns to stress plan generation.
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

// Measure wall-clock time of evaluating the by-name block.
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  println("Elapsed time: " + ((System.nanoTime - t0) / 1e9) + "s")
  result
}

Before

scala> time(sql(query))
Elapsed time: 30.138142577s  // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 25.787751452s  // Let's compare this one.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]

After

scala> time(sql(query))
Elapsed time: 17.500279659s  // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 12.364812255s  // This shows the real difference. The speed up is about 2 times.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]

How was this patch tested?

Manually, using the script above.

 val qe = sparkSession.sessionState.executePlan(logicalPlan)
 qe.assertAnalyzed()
-new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
+new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema), skipAnalysis = true)
Member Author

@dongjoon-hyun dongjoon-hyun Jul 4, 2016


Here are two optimizations.

  • By using qe, sparkSession.sessionState.executePlan(logicalPlan) is not called again.
  • By using skipAnalysis = true, qe.assertAnalyzed() is not called again.

Contributor
@cloud-fan


How about we remove the qe.assertAnalyzed() in ofRows? Then we don't need the skipAnalysis flag.

Member Author

@dongjoon-hyun dongjoon-hyun Jul 4, 2016


Thank you for the review, @cloud-fan.
It is needed because of RowEncoder(qe.analyzed.schema), isn't it?

Contributor

@cloud-fan cloud-fan Jul 5, 2016


Can we test how much we can speed up by avoiding the duplicated check analysis? I think avoiding duplicated analysis is necessary, but the check analysis itself doesn't seem like a big deal. For example, let's remove the skipAnalysis flag and run it again.

Member Author


I think I wrote the result in the PR description. Is that not what you mean?

Member Author

@dongjoon-hyun dongjoon-hyun Jul 5, 2016


Oh, I misunderstood your point.
You mean 1) changing logicalPlan to qe, but 2) leaving skipAnalysis = false.
Okay. I'll report soon.

@dongjoon-hyun
Member Author

Hi, @liancheng and @rxin.
Could you review this PR?
This code path was introduced during the Dataset/DataFrame merge.

@dongjoon-hyun
Member Author

cc @cloud-fan, too.

@SparkQA

SparkQA commented Jul 4, 2016

Test build #61714 has finished for PR 14044 at commit 1402a9d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@naliazheli

LGTM

@dongjoon-hyun
Member Author

Thank you for the review, @naliazheli.

@hvanhovell
Contributor

Any idea what causes the regression? 5 seconds seems way too long for analysis...

@dongjoon-hyun
Member Author

dongjoon-hyun commented Jul 4, 2016

Thank you for the review, @hvanhovell. BTW, it's over 12 seconds for a single analysis.

Elapsed time: 25.787751452s --> Elapsed time: 12.364812255s.

The reason I executed time(sql(query)) twice is that the SQL parser and other initialization overhead exist only in the first run.

@hvanhovell
Contributor

@dongjoon-hyun my point is that analysis should not be taking 12 seconds at all. You can see how much time is spent in each rule if you add the following lines of code to your example:

import org.apache.spark.sql.catalyst.rules.RuleExecutor
println(RuleExecutor.dumpTimeSpent)

This yields the following result (timing in ns):

org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences                 18784486408
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions         505619796
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PropagateTypes                195027905
org.apache.spark.sql.catalyst.analysis.Analyzer$FixNullability                    118882430
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveMissingReferences          74401505
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics          40068476
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveDeserializer               32929965
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator                  30524660
org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts             30453770
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions                  28383135
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowFrame                26168955
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder                25736499
org.apache.spark.sql.catalyst.analysis.TimeWindowing                              24807670
org.apache.spark.sql.catalyst.analysis.DecimalPrecision                           24000260
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveSubquery                   21653219
org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion                  20830229
org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings                19183636
org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion    17849664
org.apache.spark.sql.catalyst.analysis.TypeCoercion$BooleanEquality               15186886
org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion                    13994296
org.apache.spark.sql.catalyst.analysis.TypeCoercion$Division                      13929023
org.apache.spark.sql.catalyst.analysis.TypeCoercion$DateTimeOperations            13468710
org.apache.spark.sql.catalyst.analysis.CleanupAliases                             13210810
org.apache.spark.sql.catalyst.analysis.TypeCoercion$StringToIntegralCasts         13191046
org.apache.spark.sql.catalyst.analysis.Analyzer$PullOutNondeterministic           11310837
org.apache.spark.sql.catalyst.analysis.Analyzer$HandleNullInputsForUDF            10712897
org.apache.spark.sql.catalyst.analysis.TypeCoercion$CaseWhenCoercion              10589030
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases                    7172334
org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions          5994564
org.apache.spark.sql.catalyst.analysis.Analyzer$CTESubstitution                   5914136
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOrdinalInOrderByAndGroupBy 5303578
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin        4060244
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot                      3174805
org.apache.spark.sql.catalyst.analysis.EliminateUnions                            2787433
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGenerate                   2731683
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations                  2624228
org.apache.spark.sql.catalyst.analysis.Analyzer$GlobalAggregates                  2417768
org.apache.spark.sql.catalyst.analysis.Analyzer$WindowsSubstitution               2368503
org.apache.spark.sql.execution.datasources.PreprocessTableInsertion               2126155
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNewInstance                2059795
org.apache.spark.sql.execution.datasources.DataSourceAnalysis                     1944978
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast                     1912039
org.apache.spark.sql.execution.datasources.ResolveDataSource                      1896232
org.apache.spark.sql.catalyst.analysis.TypeCoercion$WidenSetOperationTypes        1623414
org.apache.spark.sql.execution.datasources.FindDataSourceTable                    1623004

I think we should take a look at ResolveReferences. I do think your PR has merit; we really shouldn't be analyzing the same plan twice.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Jul 4, 2016

Oh, I see. And thank you for the advice about dumpTimeSpent; I hadn't looked at it that way.
These days, I'm trying to investigate large-query situations.
This analysis is very helpful for me. Thank you so much, @hvanhovell.

@dongjoon-hyun
Member Author

Interesting result. We definitely need to take a look at ResolveReferences-related stuff.

@hvanhovell
Contributor

hvanhovell commented Jul 4, 2016

ResolveReferences delegates to LogicalPlan.resolve(...), which uses a linear search to resolve columns. This is pretty bad if you are trying to look up 4000 columns 4 times (filter, project, aggregate, sort): 4000 * (4000 / 2) * 4 = 32,000,000 lookups.
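A minimal sketch of that cost model (toy code, not LogicalPlan.resolve itself; Attr and both helper names are hypothetical):

case class Attr(name: String)

// Linear resolution: each lookup scans the attribute list, O(n) per column.
def resolveLinear(attrs: Seq[Attr], name: String): Option[Attr] =
  attrs.find(_.name == name)

val attrs = (1 to 4000).map(i => Attr(s"column$i"))
// Resolving 4000 columns scans ~2000 attributes on average per lookup:
// 4000 lookups * 2000 comparisons * 4 operators = 32,000,000 comparisons.

// Indexing by name once makes each subsequent lookup effectively O(1).
val byName: Map[String, Attr] = attrs.map(a => a.name -> a).toMap
def resolveIndexed(name: String): Option[Attr] = byName.get(name)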

@dongjoon-hyun
Member Author

Yep, I agree.
Could you make a PR for that? I think we also have some optimization opportunities there.

@liancheng
Contributor

Agree with @hvanhovell. Analysis should never take so long for such a simple query. We should avoid duplicated analysis work, but fixing the performance issue(s) within the analyzer seems more fruitful.

@dongjoon-hyun
Member Author

Thank you for the review, @liancheng.
I'm sure the Analyzer's performance needs to be improved, but in any case the cost of analysis cannot be zero, so we should still skip the redundant analysis. IMO, that idea is orthogonal to this PR, which is why I asked @hvanhovell to make a PR for it.

@dongjoon-hyun
Member Author

Hi, @cloud-fan, @hvanhovell, @liancheng.

Following @cloud-fan's advice, after making the change below, it turns out that the difference is not noticeable.

-    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema), skipAnalysis = true)
+    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))

Exactly as you all said, the second call to qe.assertAnalyzed() is not the root cause. The only difference comes from sparkSession.sessionState.executePlan(logicalPlan).

I'll update the PR.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-16360][SQL] Speed up SQL query performance by removing redundant analysis in Dataset [SPARK-16360][SQL] Speed up SQL query performance by removing redundant executePlan call Jul 5, 2016
@dongjoon-hyun
Member Author

dongjoon-hyun commented Jul 5, 2016

I have now updated the title and description of the PR and JIRA.
The only patch in this PR is the following one-word change.

-    new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
+    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))

Thank you all for the fast review and advice. In the first commit, I thought it was important to remove all of the repeated logic, but now only the minimal meaningful code change remains.

@liancheng
Contributor

LGTM pending Jenkins.

@SparkQA

SparkQA commented Jul 5, 2016

Test build #61744 has finished for PR 14044 at commit 45eb28a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM

@asfgit asfgit closed this in 7f7eb39 Jul 5, 2016
@liancheng
Contributor

Merged to master. Thanks!

@dongjoon-hyun
Member Author

Thank you for the review and merge, @liancheng, @cloud-fan, @hvanhovell, and @naliazheli!

@dongjoon-hyun dongjoon-hyun deleted the SPARK-16360 branch July 20, 2016 07:42