[SPARK-25474][SQL][FOLLOW-UP] fallback to hdfs when relation table stats is not available #25460
Conversation
@@ -72,7 +72,8 @@ case class HadoopFsRelation(
     val compressionFactor = sqlContext.conf.fileCompressionFactor
     val defaultSize = (location.sizeInBytes * compressionFactor).toLong
     location match {
-      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
+      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled
+        && defaultSize == sqlContext.conf.defaultSizeInBytes =>
nit: keep the && at the end of the first line and indent the continuation:

case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled &&
    defaultSize == sqlContext.conf.defaultSizeInBytes =>
I have 2 questions here:
- Is this defaultSize the correct data size?
- If this defaultSize is the correct data size, could we benchmark
  (location.sizeInBytes * compressionFactor).toLong
  against CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location), defaultSize)?
@wangyum
The default size comes in as Long.MaxValue. If it were the correct size, it would not fall back to HDFS. Even if it is the correct size, falling back to HDFS gives the same result. Also, this is not a performance-sensitive path, I think: the flow only reaches here when statistics need to be computed, e.g. during a join operation, and if the table already has statistics the flow will not come here at all.
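As a standalone sketch of the guard being discussed (simplified, hypothetical names mirroring the PR, not the actual Spark classes): when a table has no stats, its reported size arrives as the session default, Long.MaxValue (8.0EB) unless reconfigured, so comparing against the default is how the patch detects "no stats":

```scala
// Simplified sketch of the fallback guard discussed above.
// Illustration only, not the actual Spark implementation.
object FallbackSketch {
  // spark.sql.defaultSizeInBytes defaults to Long.MaxValue (8.0EB)
  val defaultSizeInBytes: Long = Long.MaxValue
  val fallBackToHdfsForStatsEnabled: Boolean = true

  // Fall back to scanning HDFS only when the reported size is still
  // the session default, i.e. no real stats were found.
  def shouldFallBack(locationSizeInBytes: Long): Boolean =
    fallBackToHdfsForStatsEnabled && locationSizeInBytes == defaultSizeInBytes

  def main(args: Array[String]): Unit = {
    println(shouldFallBack(Long.MaxValue)) // no stats: fall back
    println(shouldFallBack(1024L))         // real stats: do not
  }
}
```

This also illustrates the reviewer's concern below: the guard only works while the real size can never equal the configured default.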
Please see benchmark here: #24715 (comment)
@wangyum
(location.sizeInBytes * compressionFactor).toLong is always 8.0EB, even after PR #24715.
I am not sure I understand your comment. If the statistics don't exist, it has to fall back to HDFS, right? From then on it will read from the stats cache.
The number of times we fall back to HDFS after this PR and after #24715 is also the same, right?
We can avoid this when constructing this CatalogFileIndex:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
Lines 383 to 387 in c30b529

val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
val index = new CatalogFileIndex(
  sparkSession,
  catalogTable.get,
  catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
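The getOrElse pattern quoted above can be sketched standalone (simplified types; Stats here is a hypothetical stand-in for Spark's CatalogStatistics):

```scala
// Standalone sketch of the quoted construction: the index size comes
// from the catalog stats when present, else the session default.
object IndexSizeSketch {
  final case class Stats(sizeInBytes: BigInt) // stand-in for CatalogStatistics

  val defaultTableSize: Long = Long.MaxValue

  def indexSize(stats: Option[Stats]): Long =
    stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)
}
```

indexSize(Some(Stats(1024))) yields 1024, while indexSize(None) yields Long.MaxValue, which is exactly the value the HadoopFsRelation guard later compares against.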
btw, shouldn't it be location.sizeInBytes == sqlContext.conf.defaultSizeInBytes rather than defaultSize == sqlContext.conf.defaultSizeInBytes? See: #22502 (comment)
But does this comparison work well even when sqlContext.conf.defaultSizeInBytes is changed by users?
Ah, good point! Basically there is no way to tell whether the table stats are available at this point. sqlContext.conf.defaultSizeInBytes is configurable, and it's possible that the table stats just happen to equal sqlContext.conf.defaultSizeInBytes.
#24715 seems to be able to fix it.
Yea, so how about closing this and moving to #24715 for more discussion about solving this case?
If the table statistics are available here:

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
Lines 387 to 390 in 0ea8db9

val index = new CatalogFileIndex(
  sparkSession,
  catalogTable.get,
  catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
then they should be available here too, right?

spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
Lines 43 to 45 in 0ea8db9

catalogTable
  .flatMap(_.stats.map(_.toPlanStats(output, conf.cboEnabled)))
  .getOrElse(Statistics(sizeInBytes = relation.sizeInBytes))
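The LogicalRelation fallback quoted above can be sketched in isolation (Statistics and CatalogTable here are simplified stand-ins, not the Spark classes, and the toPlanStats conversion is elided):

```scala
// Sketch of the quoted stats resolution: plan stats come from the
// catalog when present, else from the relation's own size estimate
// (which may still be the 8.0EB default).
object PlanStatsSketch {
  final case class Statistics(sizeInBytes: BigInt)
  final case class CatalogTable(stats: Option[Statistics]) // stand-in

  def planStats(catalogTable: Option[CatalogTable], relationSize: BigInt): Statistics =
    catalogTable
      .flatMap(_.stats)
      .getOrElse(Statistics(sizeInBytes = relationSize))
}
```

If the catalog carries stats, planStats returns them; only when they are absent does the relation's own (possibly default) size leak into the plan, which is the point being made here.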
So, ideally the flow shouldn't reach the fallback logic if the table statistics already exist. That is why, even after #24715, location.sizeInBytes is 8.0EB.
@cloud-fan Could you please provide a reproducible test where the issue can happen?
Test build #109146 has finished for PR 25460 at commit
retest this please
Retest this please.
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
Thank you for the update, @shahidki31. Hi, @cloud-fan, @maropu, @wangyum, @srowen. What do you think about that?
I feel ok to do so (I think we should fix the existing issue on master/2.4/2.3...)
To @shahidki31: @maropu and @cloud-fan meant the corner case where the table size is equal to the user-configured value (not 8.0EB). Let's say we set the configuration to 1GB and we have a static table T1 whose size happens to be 1GB. In that case, every query on that table might invoke this function. Although it's a very special case, it's a regression. So, @cloud-fan and @maropu suggested closing this PR and proceeding with #24715. I'm +1 for that suggestion because that is the correct way. I know you are worried that #24715 doesn't resolve the 8.0EB issue. However, that should be covered by your UTs in the previous PR. In the worst case, some of your code might be reverted, but your test cases should survive there. It's your contribution. I believe @wangyum's PR will pass your existing test cases in addition to his new test code. That's the way we make Apache Spark stronger. What do you think about this, @shahidki31? It's a way of collaboration.
@@ -72,7 +72,8 @@ case class HadoopFsRelation(
     val compressionFactor = sqlContext.conf.fileCompressionFactor
     val defaultSize = (location.sizeInBytes * compressionFactor).toLong
     location match {
-      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
+      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled &&
+          location.sizeInBytes == sqlContext.conf.defaultSizeInBytes =>
The point @maropu and I were making is that location.sizeInBytes == sqlContext.conf.defaultSizeInBytes doesn't mean the table stats are not available. sqlContext.conf.defaultSizeInBytes is configurable, and it's possible that the table stats are the same as sqlContext.conf.defaultSizeInBytes, in which case we shouldn't fall back to HDFS.
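The ambiguity can be shown concretely (a hedged sketch, not Spark code): once a user lowers the configured default, a real table of exactly that size becomes indistinguishable from a table with no stats.

```scala
// Sketch of the corner case: size == configured default cannot
// distinguish "stats missing" from "stats happen to equal the default".
object AmbiguitySketch {
  def looksLikeMissingStats(sizeInBytes: Long, defaultSizeInBytes: Long): Boolean =
    sizeInBytes == defaultSizeInBytes

  val oneGiB: Long = 1L << 30
}
```

With defaultSizeInBytes lowered to 1 GiB, a genuine 1 GiB table triggers the HDFS fallback on every query, which is the regression described in this thread.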
Yes, I agree. My point was: if the table statistics are not empty, it will not fall back to HDFS even without the condition, so the PR itself isn't necessary. I will close this in favor of #24715.
Thanks @dongjoon-hyun, @cloud-fan, @maropu for the feedback.
Test build #109157 has finished for PR 25460 at commit
Test build #109180 has finished for PR 25460 at commit
What changes were proposed in this pull request?
When the table relation stats are not empty, do not fall back to HDFS for size estimation.
How was this patch tested?
Existing tests