[SPARK-25474][SQL][DOCS] Update the docs for spark.sql.statistics.fallBackToHdfs #24715
Conversation
```scala
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  case logicalRelation @ LogicalRelation(_, _, catalogTable, _) if catalogTable.nonEmpty &&
      catalogTable.forall(DDLUtils.isDatasourceTable) && catalogTable.forall(_.stats.isEmpty) =>
    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
```
Looks like this code will duplicate #24712. Are there more places where similar patterns occur? Let's do that PR first, and deduplicate if there are more places to deduplicate.
I moved `DetermineTableStats` from `HiveStrategies` to `DataSourceStrategy` to reduce duplication.
Test build #105808 has finished for PR 24715 at commit
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
#	sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
Test build #106995 has finished for PR 24715 at commit
Test build #107010 has finished for PR 24715 at commit
Test build #107034 has finished for PR 24715 at commit
retest this please
Test build #107035 has finished for PR 24715 at commit
Test build #107080 has finished for PR 24715 at commit
The idea LGTM, can you rebase this PR?
# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
#	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
I did some benchmarking. Prepare the data:

```scala
spark.range(100000000).repartition(10000).write.saveAsTable("test_non_partition_10000")
spark.range(100000000).repartition(300000).write.saveAsTable("test_non_partition_300000")
spark.range(100000000).selectExpr("id", "id % 5000 as c2", "id as c3").repartition(org.apache.spark.sql.functions.col("c2")).write.partitionBy("c2").saveAsTable("test_partition_5000")
spark.range(100000000).selectExpr("id", "id % 10000 as c2", "id as c3").repartition(org.apache.spark.sql.functions.col("c2")).write.partitionBy("c2").saveAsTable("test_partition_10000")
```

Add these lines to `LogicalRelation.computeStats`:

```scala
val time1 = System.currentTimeMillis()
val relationSize = relation.sizeInBytes
val time2 = System.currentTimeMillis()
val fallBackToHdfsSize = CommandUtils.getSizeInBytesFallBackToHdfs(relation.sqlContext.sparkSession, catalogTable.get)
val time3 = System.currentTimeMillis()
// scalastyle:off
println(s"Get size from relation: $relationSize, time: ${time2 - time1}")
println(s"Get size fall back to HDFS: $fallBackToHdfsSize, time: ${time3 - time2}")
// scalastyle:on
```

Non-partitioned table benchmark result:
Partitioned table benchmark result:
Partitioned table with
@wangyum do you mean
```scala
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  // For the data source table, we only recalculate the table statistics when it creates
  // the CatalogFileIndex using defaultSizeInBytes. See SPARK-25474 for more details.
```
`when it creates the CatalogFileIndex using defaultSizeInBytes` -> `when the table stats are not available`
Done
```scala
 */
class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {

  private val sessionConf = session.sessionState.conf
```
nit: just call it `conf`
Done
```scala
val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
logical.copy(catalogTable = Some(withStats))
```
```scala
case relation: HiveTableRelation
```
shall we catch `InsertIntoTable(HiveTableRelation)` as well?
@advancedxy Already worked on this: c86a27b
```scala
// Non-partitioned table
withTempDir { dir =>
  Seq(false, true).foreach { fallBackToHDFSForStats =>
    withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> s"$fallBackToHDFSForStats") {
```
Why does this config have no effect in this test?
```scala
// fallBackToHDFSForStats = true: The table stats will be recalculated by DetermineTableStats
// fallBackToHDFSForStats = false: The table stats will be recalculated by FileIndex
```
```scala
    }

// Partitioned table
Seq(false, true).foreach { fallBackToHDFSForStats =>
```
please create a test case for it, e.g. `test("partitioned data source tables support fallback to HDFS for size estimation")`
Done
Test build #109415 has finished for PR 24715 at commit
```scala
  }
}

test("External partitioned data source table does not support fallback to HDFS " +
```
how is this implemented?
We do not support it:

```scala
if (fallBackToHDFS) {
  assert(relation.stats.sizeInBytes === 0)
} else {
  assert(relation.stats.sizeInBytes === conf.defaultSizeInBytes)
}
```
```scala
val relation = spark.table("spark_25474").queryExecution.analyzed.children.head
assert(spark.table("spark_25474").count() === 5)
if (fallBackToHDFS) {
  assert(relation.stats.sizeInBytes === 0)
```
We have a serious issue here: the wrong stats may mislead Spark into broadcasting a very large table and causing an OOM.
I think we can only fall back to the HDFS size for non-partitioned tables.
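To make the proposal concrete, here is a minimal, self-contained sketch of the decision being discussed. The object and method names (`FallbackSketch`, `estimatedSize`) are illustrative only, not the actual Spark API:

```scala
// Illustrative sketch only: when metastore stats are missing, a non-partitioned
// table may fall back to the HDFS size, while a partitioned table always gets
// defaultSizeInBytes so it is never mistakenly considered broadcastable.
object FallbackSketch {
  val defaultSizeInBytes: Long = Long.MaxValue // matches the spark.sql.defaultSizeInBytes default

  def estimatedSize(
      statsFromMetastore: Option[Long],
      isPartitioned: Boolean,
      hdfsSize: => Long, // by-name: listing files on HDFS is expensive, only do it if needed
      fallBackToHdfs: Boolean): Long =
    statsFromMetastore.getOrElse {
      if (!isPartitioned && fallBackToHdfs) hdfsSize else defaultSizeInBytes
    }

  def main(args: Array[String]): Unit = {
    // Non-partitioned table without stats, fallback enabled: the HDFS size is used.
    println(estimatedSize(None, isPartitioned = false, hdfsSize = 12345L, fallBackToHdfs = true))
    // Partitioned table without stats: defaultSizeInBytes regardless of the flag.
    println(estimatedSize(None, isPartitioned = true, hdfsSize = 12345L, fallBackToHdfs = true))
  }
}
```

With `Long.MaxValue` as the default, a table with missing stats can never fall under `spark.sql.autoBroadcastJoinThreshold`, which is the safe behavior being asked for here.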
+1. Partitioned tables are usually very large.
@shahidki31 @dongjoon-hyun What do you think?
+1 for disabling it for partitioned tables.
This is the `master` branch status, isn't it? Previously, Spark safely returned 8EB in this case.
I think non-partitioned data source tables already get the correct statistics. I am not sure we need to support falling back to HDFS for the size of non-partitioned tables.
@wangyum Why do we need to revert that? You can revert the functional part here and keep the test code.
For me, this PR already contains the revert here (#24715 (comment)).
Personally, I'm -1 for removing the existing test cases.
@dongjoon-hyun It is expensive to support partitioned tables with external partitions. Please see this test case; its data size is incorrect.
Related discussion:
#24715 (comment)
#24715 (comment)
So we plan not to fall back to the HDFS size for partitioned tables.
+1. We need to document it.
@wangyum Maybe in this PR itself you can do both, if we don't support the fallback config.
```scala
  s"PARTITIONED BY(a) LOCATION '${dir.toURI}'")

withTempDir { partitionDir =>
  spark.range(5).write.mode(SaveMode.Overwrite).parquet(partitionDir.getCanonicalPath)
```
`spark.range(5).toDF("b")` instead of `spark.range(5)`?
# Conflicts:
#	sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
#	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
#	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
```scala
    "determining if a table is small enough to use auto broadcast joins. " +
    "For non-partitioned data source table, it will be automatically recalculated if table " +
    "statistics are not available. For partitioned data source table, It is " +
    s"'${DEFAULT_SIZE_IN_BYTES.key}' if table statistics are not available.")
  .booleanConf
```
@shahidki31 I have updated the documentation. Please take a look.
Thanks @wangyum. Looks good. Shall we add it to configuration.md as well? It seems these configs are not there.
Hi, @wangyum. We had better always minimize the patch diff. Please keep the original location of this conf. I don't see any difference for the following three lines.

```scala
val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = buildConf("spark.sql.statistics.fallBackToHdfs")
  .booleanConf
  .createWithDefault(false)
```
This is because we added `DEFAULT_SIZE_IN_BYTES` to `ENABLE_FALL_BACK_TO_HDFS_FOR_STATS`'s doc. So we need to move `DEFAULT_SIZE_IN_BYTES` before `ENABLE_FALL_BACK_TO_HDFS_FOR_STATS`, otherwise:

```
[error] [warn] /home/jenkins/workspace/SparkPullRequestBuilder/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:1224: Reference to uninitialized value DEFAULT_SIZE_IN_BYTES
[error] [warn] s"'${DEFAULT_SIZE_IN_BYTES.key}' if table statistics are not available.")
[error] [warn]
[warn] 8 warnings found
```
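That warning comes from Scala's top-to-bottom initialization of `val`s inside an object. A tiny self-contained example (unrelated to Spark; `InitOrder`, `message`, and `Default` are made-up names) showing the same pitfall:

```scala
// vals in an object initialize in declaration order; referencing a val that is
// declared later still compiles, but reads an uninitialized (null) value during
// initialization -- the "Reference to uninitialized value" situation above.
object InitOrder {
  val message: String = s"default is $Default" // Default is not yet initialized here
  val Default: String = "Long.MaxValue"

  def main(args: Array[String]): Unit = {
    println(message) // "default is null" -- Default was null when message was built
  }
}
```

Reordering the declarations so `Default` comes first makes `message` see the initialized value, which is exactly why `DEFAULT_SIZE_IN_BYTES` has to move above `ENABLE_FALL_BACK_TO_HDFS_FOR_STATS`.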
@shahidki31 Yes. Running the `SET -v` command will show the entire list of SQL configurations:
http://spark.apache.org/docs/latest/configuration.html#spark-sql
Test build #109645 has finished for PR 24715 at commit
Test build #109647 has finished for PR 24715 at commit
```scala
@@ -1230,6 +1224,16 @@ object SQLConf {
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(Long.MaxValue)

  val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = buildConf("spark.sql.statistics.fallBackToHdfs")
    .doc("This flag is effective only if it is Hive table. When true, it will fall back to HDFS " +
```
`Hive table` -> `Non-partitioned Hive table`?
Currently the Hive partitioned table is supported. Do you think we need to disable it?

spark/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala, lines 116 to 136 in 07c4b9b:

```scala
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  case relation: HiveTableRelation
      if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
    val table = relation.tableMeta
    val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
      try {
        val hadoopConf = session.sessionState.newHadoopConf()
        val tablePath = new Path(table.location)
        val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
        fs.getContentSummary(tablePath).getLength
      } catch {
        case e: IOException =>
          logWarning("Failed to get table size from hdfs.", e)
          session.sessionState.conf.defaultSizeInBytes
      }
    } else {
      session.sessionState.conf.defaultSizeInBytes
    }
    val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
    relation.copy(tableMeta = withStats)
```
Yea I think so. Can you send a new PR to fix it?
OK.
```scala
@@ -1230,6 +1224,16 @@ object SQLConf {
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(Long.MaxValue)

  val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = buildConf("spark.sql.statistics.fallBackToHdfs")
    .doc("When true, it will fall back to HDFS if the table statistics are not available from " +
      "table metadata. This is useful in determining if a table is small enough to use auto " +
```
`auto broadcast join`? Maybe just say `broadcast join`.
```scala
val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS = buildConf("spark.sql.statistics.fallBackToHdfs")
  .doc("When true, it will fall back to HDFS if the table statistics are not available from " +
    "table metadata. This is useful in determining if a table is small enough to use auto " +
    "broadcast joins. This flag is effective only if it is non-partitioned Hive table. " +
```
only for non-partitioned Hive tables
Test build #109850 has finished for PR 24715 at commit
Test build #109851 has finished for PR 24715 at commit
retest this please
Test build #109853 has finished for PR 24715 at commit
thanks, merging to master!
What changes were proposed in this pull request?

This PR updates the doc for `spark.sql.statistics.fallBackToHdfs`.

Related code:

Non-partitioned data source table:
`SizeInBytesOnlyStatsPlanVisitor.default()` -> `LogicalRelation.computeStats()` -> `HadoopFsRelation.sizeInBytes()` -> `PartitioningAwareFileIndex.sizeInBytes()`
`PartitioningAwareFileIndex.sizeInBytes()` is calculated by `allFiles().map(_.getLen).sum` if table statistics are not available.

Partitioned data source table:
`SizeInBytesOnlyStatsPlanVisitor.default()` -> `LogicalRelation.computeStats()` -> `CatalogFileIndex.sizeInBytes`
`CatalogFileIndex.sizeInBytes`
is `spark.sql.defaultSizeInBytes` if table statistics are not available.

How was this patch tested?
N/A
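The two code paths described in the PR summary can be sketched with a simplified, self-contained model. This is not the real Spark implementation; `StatsPaths`, `nonPartitionedSize`, and `partitionedSize` are illustrative names:

```scala
// Simplified model of the two size-estimation paths when table stats are absent:
// - non-partitioned: PartitioningAwareFileIndex sums the length of every data file
// - partitioned: CatalogFileIndex just reports spark.sql.defaultSizeInBytes
object StatsPaths {
  val defaultSizeInBytes: Long = Long.MaxValue // spark.sql.defaultSizeInBytes default

  // Mirrors allFiles().map(_.getLen).sum for a non-partitioned table.
  def nonPartitionedSize(fileLengths: Seq[Long]): Long = fileLengths.sum

  // Mirrors CatalogFileIndex.sizeInBytes when statistics are unavailable.
  def partitionedSize: Long = defaultSizeInBytes

  def main(args: Array[String]): Unit = {
    println(nonPartitionedSize(Seq(100L, 200L, 300L))) // 600
    println(partitionedSize)
  }
}
```

This is why the doc distinguishes the two cases: the non-partitioned path yields a real on-disk size, while the partitioned path yields the (deliberately huge) default.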