
[SPARK-31850][SQL]Prevent DetermineTableStats from computing stats multiple times for same table #28662

Closed
wants to merge 9 commits

Conversation

karuppayya
Contributor

@karuppayya karuppayya commented May 28, 2020

What changes were proposed in this pull request?

  • Changes to avoid computing stats for the same table multiple times

Repro steps

spark.sql("set spark.sql.hive.convertMetastoreParquet=false")
spark.sql("create table c(id INT, name STRING) STORED AS PARQUET")
val df = spark.sql("select count(id) id from c group by name order by id")
df.queryExecution.analyzed

Stacktrace indicating that stats collection happens multiple times:

  at org.apache.spark.sql.hive.DetermineTableStats.hiveTableWithStats(HiveStrategies.scala:121)
  at org.apache.spark.sql.hive.DetermineTableStats$$anonfun$apply$2.applyOrElse(HiveStrategies.scala:150)
  at org.apache.spark.sql.hive.DetermineTableStats$$anonfun$apply$2.applyOrElse(HiveStrategies.scala:147)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$870.808816071.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$869.1113025977.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$4(AnalysisHelper.scala:113)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$872.1354725727.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1144.1492742163.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:113)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$869.1113025977.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$4(AnalysisHelper.scala:113)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$872.1354725727.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1144.1492742163.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:113)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$869.1113025977.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
  at org.apache.spark.sql.hive.DetermineTableStats.apply(HiveStrategies.scala:147)
  at org.apache.spark.sql.hive.DetermineTableStats.apply(HiveStrategies.scala:114)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$863.668742490.apply(Unknown Source:-1)
  at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  at scala.collection.mutable.ArrayBuffer.foldLeft(ArrayBuffer.scala:49)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$862.773865813.apply(Unknown Source:-1)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:176)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions$$anonfun$apply$20.applyOrElse(Analyzer.scala:2139)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions$$anonfun$apply$20.applyOrElse(Analyzer.scala:2116)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$866.1662235713.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$$Lambda$864.152426436.apply(Unknown Source:-1)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:84)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions$.apply(Analyzer.scala:2116)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions$.apply(Analyzer.scala:2115)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$863.668742490.apply(Unknown Source:-1)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:89)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$862.773865813.apply(Unknown Source:-1)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:176)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:170)
  • In the above trace, we can see that DetermineTableStats was triggered as part of executing ResolveAggregateFunctions.
  • ResolveAggregateFunctions belongs to a batch whose execution strategy is FixedPoint, so the computation can happen any number of times, depending on when the fixed point is reached.
  • This is not specific to ResolveAggregateFunctions: any analyzer rule that invokes org.apache.spark.sql.catalyst.analysis.Analyzer#executeSameContext will hit this issue.
  • In the best case, the DetermineTableStats rule runs at least three times during the analysis phase: twice as part of ResolveAggregateFunctions (assuming the fixed point is reached in the first two attempts) and once as part of postHocResolutionRules#DetermineTableStats.

 Note: 

  • There is no log line in DetermineTableStats to indicate that stats computation happened; to observe it, add a log line or use a debugger.

  • The above can be reproduced with the first query on a newly created table.
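
For illustration, here is a minimal, self-contained sketch of the caching idea behind this change, assuming a hypothetical session-scoped cache keyed by the table identifier (RelationSizeCache and its method names are illustrative only, not the actual patch):

import scala.collection.concurrent.TrieMap

// Hypothetical session-scoped cache: table identifier -> size in bytes.
object RelationSizeCache {
  private val cache = TrieMap.empty[String, Long]

  // Compute the table size at most once; later lookups reuse the cached value.
  def getOrCompute(tableIdentifier: String)(compute: => Long): Long =
    cache.getOrElseUpdate(tableIdentifier, compute)

  // Drop the cached size when the table is refreshed or modified.
  def invalidate(tableIdentifier: String): Unit = {
    cache.remove(tableIdentifier)
  }
}

A rule like DetermineTableStats could then route the expensive fs.getContentSummary(tablePath).getLength call through getOrCompute, so repeated invocations of the rule within the same analysis would not hit the file system again.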

Why are the changes needed?

Stats computation can be an expensive operation, especially for a large table.
Once stats are computed for a table, they can be reused.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests added

@AmplabJenkins

Can one of the admins verify this patch?

@karuppayya karuppayya closed this May 28, 2020
@karuppayya karuppayya reopened this May 28, 2020
@karuppayya
Contributor Author

@dongjoon-hyun @cloud-fan @gatorsmile Can anyone please help review this change? Thanks.

@karuppayya karuppayya changed the title [SPARK-31850] Prevent DetermineTableStats from computing stats multiple times for same table [SPARK-31850][SQL]Prevent DetermineTableStats from computing stats multiple times for same table May 29, 2020
@karuppayya
Contributor Author

karuppayya commented May 29, 2020

Tagging a few more committers from the file's git history for review: @HeartSaVioR @holdenk @maropu
Thank you.

fs.getContentSummary(tablePath).getLength
val table = relation.tableMeta
val relationSizeMap = getRelationToSizeMap
if (relationSizeMap.contains(table.identifier)) {
Member


How does this rule pick up stats updates on the base relations with this approach?

Contributor Author


I have now handled this in a more generic way in 0a1fb93.
The way to refresh a table in a Spark session is the REFRESH TABLE <tblname> command or the corresponding DataFrame/catalog APIs (please let me know if this is not correct).
With this change, the cache is invalidated whenever a table refresh is invoked.
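
For reference, using the table c from the repro above, either form should trigger the invalidation (standard Spark APIs, shown here only as a usage example):

// SQL command issued through the session
spark.sql("REFRESH TABLE c")

// or the equivalent catalog API
spark.catalog.refreshTable("c")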

@maropu
Member

maropu commented Jun 1, 2020

Even if #28662 is fixed, do we still need this fix? At least, it seems the issue with the query above will go away once #28662 is merged.

@karuppayya
Contributor Author

@maropu, in the above comment are you referring to the changes in #28686?
Even if #28686 is fixed, the issue will still be there when the flag to convert Parquet/ORC Hive tables to data source tables is disabled.

@maropu
Member

maropu commented Jun 2, 2020

Ur, that's a wrong number. As you said, I meant #28686.

@maropu
Member

maropu commented Jun 2, 2020

Do you disable that option in your use case? I thought most users turn on the flag to convert it to a data source table. Anyway, I just want to know the priority of fixing this issue.

@karuppayya
Contributor Author

karuppayya commented Jun 2, 2020

@maropu In my example I used Parquet as the data format. This can also happen with formats other than Parquet/ORC (like JSON, CSV, etc.).

Member

@viirya viirya left a comment


Can you explain why DetermineTableStats will calculate the statistics multiple times? Do you mean it is caused by df.queryExecution.analyzed, as the PR description shows? Once you finish query analysis of a DataFrame, the analyzed plan is kept as QueryExecution.analyzed. Why would accessing it cause re-calculation?

// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
def readHiveTable(catalog: SessionCatalog, table: CatalogTable): HiveTableRelation = {
Member


I can see readHiveTable is used by the write path (InsertIntoStatement) too. If we just get the cached plan, won't it be dangerous if the cached plan is out of date and Spark writes with incorrect metadata?

Contributor Author


I tried to reuse the cache of data source tables for Hive tables.
InsertIntoStatement for data source tables also fetches from the same cache, and cache invalidation has been taken care of.
From my reading, I didn't find any such cases. Let me know if you find any case that needs special handling; I will also check the code from this perspective again.

@karuppayya
Contributor Author

karuppayya commented Jun 4, 2020

Can you explain why DetermineTableStats will calculate the statistics multiple times?

(See the full stack trace in the PR description above.)

  • In that trace, DetermineTableStats is triggered as part of executing ResolveAggregateFunctions.
  • ResolveAggregateFunctions belongs to a batch whose execution strategy is FixedPoint, so the computation can happen any number of times, depending on when the fixed point is reached.
  • This is not specific to ResolveAggregateFunctions: any analyzer rule that invokes org.apache.spark.sql.catalyst.analysis.Analyzer#executeSameContext will hit this issue.
  • In the best case, the DetermineTableStats rule runs at least three times during analysis: twice as part of ResolveAggregateFunctions (assuming the fixed point is reached in the first two attempts) and once as part of postHocResolutionRules#DetermineTableStats.

Once you finish query analysis of a dataframe, the analyzed plan is kept as QueryExecution.analyzed. Why accessing it will cause re-calculation?

In the description, I included the code that triggers the analysis phase. By the end of the analysis phase, DetermineTableStats will have run multiple times, which can slow down query performance.

@viirya
Member

viirya commented Jun 4, 2020

I see, it is because executeSameContext is called to analyze a logical plan during analysis. I'd say the reproduction steps in the description are confusing, because they don't show any evidence of multiple runs of DetermineTableStats. I think it is better to describe the issue clearly.

Btw, we already calculate statistics and save them into HiveTableRelation in DetermineTableStats. To prevent redundant calculation, is it much easier and simpler to just add a new condition like:

class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case relation: HiveTableRelation
        if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty &&
          relation.tableStats.isEmpty =>
      hiveTableWithStats(relation)
    ...
  }
}

Then I think Spark doesn't re-calculate if HiveTableRelation already has tableStats defined.

@karuppayya
Contributor Author

The above condition is already present.
But we return a copy of the relation (code: https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L137) with the updated table stats at the end of the method.

  • When the ResolveAggregateFunctions rule runs again (to reach the fixed point), it will not be aware of the updated relation; executeSameContext will rerun the stats collection as part of the DetermineTableStats rule.
  • When the DetermineTableStats rule itself runs later in the analysis phase, it will also not be aware of the updated relation (see the sketch after this list).
    @viirya
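
To make the point concrete, here is a minimal, self-contained Scala sketch (not Spark code; Relation, determineStats and expensiveStats are made up for illustration) of why returning a copy of an immutable node does not prevent recomputation when the rule later runs again on a plan that still contains the un-annotated relation:

object CopyDoesNotStick extends App {
  // A stand-in for an immutable plan node such as HiveTableRelation.
  case class Relation(name: String, stats: Option[Long] = None)

  var computeCount = 0
  def expensiveStats(r: Relation): Long = { computeCount += 1; 42L }

  // The rule fills in stats by returning a *new* node, like relation.copy(tableStats = ...).
  def determineStats(r: Relation): Relation =
    if (r.stats.isEmpty) r.copy(stats = Some(expensiveStats(r))) else r

  val original = Relation("c")
  determineStats(original)  // first run: computes stats once
  // A later run that starts from a plan still holding `original`
  // (as executeSameContext effectively does) cannot see the earlier copy.
  determineStats(original)  // computes again
  println(s"stats computed $computeCount times")  // prints: stats computed 2 times
}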

@karuppayya karuppayya requested review from viirya and maropu June 8, 2020 16:34
@karuppayya
Contributor Author

@viirya @maropu Can you please help review this PR?

@karuppayya
Contributor Author

karuppayya commented Jun 17, 2020

@viirya @maropu @HyukjinKwon Can you please help review this PR?

@karuppayya
Contributor Author

#28686 should handle most cases. Closing this PR.

@karuppayya karuppayya closed this Jul 6, 2020