[SPARK-25474][SQL] Support spark.sql.statistics.fallBackToHdfs in data source tables #22502
Conversation
@shahidki31 The code doesn't look specific to the Parquet data source. If so, please remove "in case of parquet datasource table" from the title.
@dongjoon-hyun Thanks for the comment. I have modified the title. Kindly review the PR.
Hi @cloud-fan, could you please review the code?
@shahidki31 thanks for fixing it! Do you know where we read …
@cloud-fan Thanks. I will check and update the PR.
Test build #101276 has finished for PR 22502 at commit …
```scala
}

private def sizeInBytesFallBackToHdfs: Long = {
```
Rather than repeat the 'compression' part here, you could inline this method, return from the try block, and ignore the exception, falling through to a default return value with the 'compression' logic.
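A minimal standalone sketch of that shape, assuming hypothetical parameter names (in the PR these values come from the relation and its conf, not from arguments):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch only: try HDFS first, return from the try block, and on failure
// fall through to the compression-scaled default. All parameter names here
// are assumptions for illustration.
def estimatedSizeInBytes(
    tablePath: Path,
    hadoopConf: Configuration,
    locationSizeInBytes: Long,
    compressionFactor: Double,
    fallBackToHdfs: Boolean): Long = {
  if (fallBackToHdfs) {
    try {
      val fs = tablePath.getFileSystem(hadoopConf)
      return fs.getContentSummary(tablePath).getLength
    } catch {
      case _: java.io.IOException => // ignore and fall through to the default
    }
  }
  (locationSizeInBytes * compressionFactor).toLong
}
```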
I have updated the PR, based on the above comments
…tes is coming as default size in bytes (8.0 EB)
Test build #106905 has finished for PR 22502 at commit …
```diff
  val compressionFactor = sqlContext.conf.fileCompressionFactor
- (location.sizeInBytes * compressionFactor).toLong
+ val defaultSize = (location.sizeInBytes * compressionFactor).toLong
```
```scala
location match {
  case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
    DDLUtils...
  case _ => defaultSize
}
```
maybe?
Does it make sense to push the check for sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled into the method?
Thanks. Updated.

> Does it make sense to push the check for sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled into the method?

The method sizeInBytesFallBackToHdfs is supposed to get sizeInBytes from HDFS if the user enables the fallback configuration. I am not sure about moving the configuration check into sizeInBytesFallBackToHdfs.
```scala
location match {
  case _: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
    DDLUtils.sizeInBytesFallBackToHdfs(sparkSession,
      new Path(location.asInstanceOf[CatalogFileIndex].table.location), defaultSize)
```
If you name the case match variable, you already have what the cast gives you here. But yeah, this is cleaner.
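For reference, a sketch of the named-variable form being described, using the same calls as the snippet above:

```scala
// Naming the matched variable (cfi) removes the need for asInstanceOf:
location match {
  case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
    DDLUtils.sizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location), defaultSize)
  case _ => defaultSize
}
```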
Yes. Updated.
Test build #106935 has finished for PR 22502 at commit …
Test build #106940 has finished for PR 22502 at commit …
Looks reasonable to me. @dongjoon-hyun ?
I think the correct approach should be to add a new rule (#24715) if the issue occurs at the table level.
For example, after #24715:

```
[root@spark-3267648 spark]# bin/spark-shell --conf spark.sql.statistics.fallBackToHdfs=true
Spark context Web UI available at http://spark-3267648.lvs02.dev.ebayc3.com:4040
Spark context available as 'sc' (master = local[*], app id = local-1561652081851).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_211)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("create table table1 (id int, name string) using parquet partitioned by (name)")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("insert into table1 values (1, 'a')")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("explain cost select * from table1").show(false)
== Optimized Logical Plan ==
Relation[id#2,name#3] parquet, Statistics(sizeInBytes=421.0 B)

== Physical Plan ==
*(1) FileScan parquet default.table1[id#2,name#3] Batched: true, DataFilters: [], Format: Parquet, Location: CatalogFileIndex[file:/root/opensource/spark/spark-warehouse/table1], PartitionCount: 1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```
@wangyum The issue seems to happen only with CatalogFileIndex data source tables. In the InMemoryFileIndex case, sizeInBytes is already estimated from HDFS. That is why, in the PR, I put the condition only for CatalogFileIndex (line 103 in a7e1619).
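For context, a toy illustration of why an in-memory file index doesn't need the fallback: it already knows each listed file's length, so its total is real rather than a seeded default (a simplification, not the actual Spark source):

```scala
// Toy model: an in-memory index sums the lengths of the files it listed,
// while a catalog-backed index is seeded with a default when no stats exist.
final case class ListedFile(path: String, length: Long)

def inMemorySizeInBytes(listedFiles: Seq[ListedFile]): Long =
  listedFiles.map(_.length).sum

def catalogSeededSizeInBytes(catalogStats: Option[Long], defaultSize: Long): Long =
  catalogStats.getOrElse(defaultSize) // 8.0 EB default when stats are absent
```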
Very sorry for the delay, @srowen and @shahidki31. I left a few comments. I'll review again after the PR is rebased and updated.
Thanks @srowen @dongjoon-hyun @maropu for the review comments. I have updated the code.
Test build #108271 has finished for PR 22502 at commit …
Test build #108272 has finished for PR 22502 at commit …
Test build #108277 has finished for PR 22502 at commit …
sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)") | ||
sql("INSERT INTO t1 VALUES (1, 'a')") | ||
// Analyze command updates the statistics of table `t1` | ||
sql("analyze table t1 compute statistics") |
nit.
```diff
- // Analyze command updates the statistics of table `t1`
- sql("analyze table t1 compute statistics")
+ sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
```
@shahidki31 . I'll fix this during merging~
```scala
      }
    }
  }
  assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
```
Ur, if the fallback logic returns the same value as ANALYZE TABLE t1 COMPUTE STATISTICS, this assertion doesn't prove anything. Never mind.
```diff
@@ -1484,4 +1484,44 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
     }
   }
 }
+
+  test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
+    withSQLConf("spark.sql.statistics.fallBackToHdfs" -> "true") {
```
nit. Use SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key.
```scala
var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
Seq(true, false).foreach { fallBackToHdfs =>
  withSQLConf("spark.sql.statistics.fallBackToHdfs" -> fallBackToHdfs.toString) {
```
nit. Use SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key.
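Piecing together the diff fragments quoted in this review, the new test is roughly shaped as below. This is only a sketch: the lines that actually capture sizeInBytes from the relation are not quoted in this thread, so they are indicated by a comment.

```scala
test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
  var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
  Seq(true, false).foreach { fallBackToHdfs =>
    withSQLConf("spark.sql.statistics.fallBackToHdfs" -> fallBackToHdfs.toString) {
      sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
      sql("INSERT INTO t1 VALUES (1, 'a')")
      // Analyze command updates the statistics of table `t1`
      sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
      // ... capture the relation's sizeInBytes into one of the two vars ...
    }
  }
  assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
}
```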
+1, LGTM. Thank you, @shahidki31, @maropu, @srowen, and @wangyum.
Merged to master.
Could you make two backporting PRs to …?
cc @gatorsmile and @cloud-fan
Thank you @dongjoon-hyun for merging. Sure, I will create PRs for backporting.
Thanks!
```diff
- (location.sizeInBytes * compressionFactor).toLong
+ val defaultSize = (location.sizeInBytes * compressionFactor).toLong
+ location match {
+   case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
```
We should only fall back to HDFS if the table stats are not available from the table metadata. How can we know the table stats are not available here?
I checked the code in DataSource.resolveRelation; the CatalogFileIndex is created as:

```scala
val index = new CatalogFileIndex(
  sparkSession,
  catalogTable.get,
  catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
```
Nice catch! Then, shall we compare location.sizeInBytes with spark.sql.defaultSizeInBytes in order to check that?
SGTM
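A sketch of the check being agreed on here: only fall back when the index's size still equals the spark.sql.defaultSizeInBytes seed, i.e. when no real stats came from the catalog. The guard is an assumption; the helper name is taken from the comment below.

```scala
// Fall back to HDFS only when (a) the user enabled it and (b) the
// CatalogFileIndex still carries the defaultSizeInBytes seed, which means
// no stats were available in the table metadata.
val conf = sparkSession.sessionState.conf
location match {
  case cfi: CatalogFileIndex
      if conf.fallBackToHdfsForStatsEnabled && cfi.sizeInBytes == conf.defaultSizeInBytes =>
    CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location), defaultSize)
  case _ => defaultSize
}
```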
I think (location.sizeInBytes * compressionFactor).toLong is enough, and it is faster than CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location), defaultSize), which needs a round trip to the file system. That is why I only recalculate table statistics if we go down this code path:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala, lines 383 to 387 in c30b529:

```scala
val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
val index = new CatalogFileIndex(
  sparkSession,
  catalogTable.get,
  catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
```

https://github.com/apache/spark/pull/24715/files#diff-d99813bd5bbc18277e4090475e4944cfR643-R646
I am not sure there is any issue in this PR. As per this code, only if the table doesn't have any statistics will it come to the sizeInBytes method. Maybe we can add the extra check mentioned above.

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala, lines 42 to 46 in 0526529:

```scala
override def computeStats(): Statistics = {
  catalogTable
    .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
    .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
}
```
IIUC, the issue in this PR is that we always fall back to HDFS stats even if table stats are available.
@cloud-fan I have created a follow-up PR to add the extra condition. Thanks.
Yes. I have prepared some tests to illustrate this issue. These tests pass before this commit: …
@wangyum The first and third will pass after the PR. The second test is a bug that was fixed in the commit. Btw, the first and third tests are not CatalogFileIndex, so they won't come to this flow anyway.
Hi, guys. As @cloud-fan mentioned, since there is a regression case, I'll revert this from the release branches. cc @kiszk, since he is the release manager for 2.3.4.
What changes were proposed in this pull request?
In the case of a CatalogFileIndex data source table, sizeInBytes always comes back as the default size in bytes, which is 8.0 EB, even when the user sets fallBackToHdfsForStatsEnabled=true. So a data source table backed by a CatalogFileIndex always prefers SortMergeJoin over BroadcastJoin, even when its size is below the broadcast join threshold.
With this PR, when fallBackToHdfsForStatsEnabled=true for a CatalogFileIndex table, computeStatistics gets sizeInBytes from HDFS, so we get the actual size of the table. Hence, during a join, when the table size is below the broadcast threshold, BroadcastHashJoin is preferred instead of SortMergeJoin.
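For scale, 8.0 EB is the spark.sql.defaultSizeInBytes default (Long.MaxValue, about 9.22 × 10^18 bytes ≈ 8 EiB). A spark-shell sketch of the intended effect, mirroring the session quoted earlier in this thread (the table name is illustrative):

```scala
// With the fallback enabled, a CatalogFileIndex-backed table reports its
// real HDFS size instead of the 8.0 EB default, so a small table can fall
// under spark.sql.autoBroadcastJoinThreshold and be broadcast in joins.
spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
spark.sql("CREATE TABLE table1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
spark.sql("INSERT INTO table1 VALUES (1, 'a')")
// Expect a small real sizeInBytes in the optimized plan, not 8.0 EB.
spark.sql("EXPLAIN COST SELECT * FROM table1").show(false)
```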
How was this patch tested?
Added UT