[SPARK-29987][SQL] Add CatalogTable cache in SessionCatalog to improve performance #26627
ulysses-you wants to merge 9 commits into apache:master
Conversation
@rdblue @dongjoon-hyun Do you have time to review this?
private val validNameFormat = "([\\w_]+)".r
private val cachedCatalogTable = {

This is a cache, not a table, so the variable would be better named catalogTableCache.
cachedCatalogTable.put(qtn, catalogTable)
}
private[sql] def invalidateAllCachedCatalogTable(): Unit = {

This should be plural: invalidateAllCachedCatalogTables
}
}
private[sql] def cacheCatalogTable(qtn: QualifiedTableName, catalogTable: CatalogTable): Unit = {

Caches should accept a Callable so that populating the cache can be combined with a get operation (get or initialize). Instead of cacheCatalogTable, this should be getOrCacheCatalogTable(qtn: QualifiedTableName, init: Callable[CatalogTable]), which calls catalogTableCache.get(qtn, init).

Here getOrCacheCatalogTable is just a simple wrapper that adds nothing. I suggest moving the lambda parameter from the caller into this method; it is in fact part of "get or cache".
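For readers unfamiliar with the pattern, the "get or initialize" suggestion can be sketched in plain Java (illustrative only: the class and method names here are hypothetical, and the PR itself is Scala code using a cache library). The point is that the loader runs only when the key is absent, so lookup and population become a single step instead of a separate get followed by put.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of a "get or initialize" cache: the init callback runs
// only when the key is missing, so callers never have to populate the
// cache and read it back in two separate operations.
public class GetOrCacheDemo {
    private static final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    // Analogous to the suggested getOrCacheCatalogTable(qtn, init):
    // combine lookup and load into one call.
    static String getOrCache(String key, Callable<String> init) {
        return cache.computeIfAbsent(key, k -> {
            try {
                return init.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public static void main(String[] args) {
        // First call runs the loader; the second returns the cached value
        // without invoking its callback.
        String first = getOrCache("db.tbl", () -> "loaded-from-metastore");
        String second = getOrCache("db.tbl", () -> "should-not-run");
        System.out.println(first + " " + second);
    }
}
```

This mirrors what a Callable-taking cache `get(key, loader)` does internally, which is why the reviewer suggests folding the caller's lambda into the method.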
ok to test
Test build #114256 has finished for PR 26627 at commit

Test build #114259 has finished for PR 26627 at commit

Test build #114260 has finished for PR 26627 at commit

Test build #114266 has finished for PR 26627 at commit

Test build #114271 has finished for PR 26627 at commit
.internal()
.doc("The maximum expire seconds time of cache table catalog.")
.intConf
.checkValue(cacheSize => cacheSize >= 0 && cacheSize < 8,

cacheSize seems to have been copied from existing config items; the name and doc need rewording.

buildStaticConf("spark.sql.filesourceTableCatalogCacheExpireSeconds")
  .internal()
  .doc("The maximum expire seconds time of cache table catalog.")
  .intConf
Test build #114291 has finished for PR 26627 at commit
@@ -222,6 +228,7 @@ class SessionCatalog(
  if (cascade && databaseExists(dbName)) {
    listTables(dbName).foreach { t =>
      invalidateCachedTable(QualifiedTableName(dbName, t.table))

The two method names invalidateCachedTable and invalidateCachedCatalogTable are really confusing. I suggest rewording to make the names more intuitive.
invalidateAllCachedCatalogTables()
}

private[sql] def getCachedCatalogTable(qtn: QualifiedTableName): Option[CatalogTable] = {
Add comments here, and to the other new methods as well.
}
}
It seems that StatisticsSuite uses the underlying method
Test build #114298 has finished for PR 26627 at commit
|
|
Test build #114301 has finished for PR 26627 at commit
|
What changes were proposed in this pull request?
This PR is another idea for improving SQL performance; for more details see #26603.
Here is the idea:
Command plans always execute simply and quickly, but the session catalog may be called multiple times for the same table. Caching the CatalogTable for a short period improves this scenario. This PR uses a time-based expiration policy for the CatalogTable cache.
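The time-based policy described above can be illustrated with a minimal Java sketch (hypothetical names and structure; the actual change is Scala and builds on an existing cache implementation with a configurable expire-seconds setting): each entry records when it was written and is treated as absent once a TTL has passed, which bounds how stale a cached CatalogTable can be.

```java
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of a TTL cache: entries older than TTL_MILLIS are
// dropped on lookup, so repeated catalog reads within a short window
// hit the cache while stale metadata is bounded by the TTL.
public class TtlCacheDemo {
    private static final long TTL_MILLIS = 1000;

    private record Entry(String value, long writtenAt) {}

    private static final ConcurrentHashMap<String, Entry> cache = new ConcurrentHashMap<>();

    // Timestamps are passed in explicitly to keep the sketch deterministic.
    static void put(String key, String value, long now) {
        cache.put(key, new Entry(value, now));
    }

    // Returns null when the entry is missing or older than the TTL.
    static String get(String key, long now) {
        Entry e = cache.get(key);
        if (e == null || now - e.writtenAt() > TTL_MILLIS) {
            cache.remove(key);
            return null;
        }
        return e.value();
    }

    public static void main(String[] args) {
        put("db.tbl", "catalog-table", 0);
        System.out.println(get("db.tbl", 500));   // within TTL: cache hit
        System.out.println(get("db.tbl", 2000));  // past TTL: entry evicted
    }
}
```

A short TTL like this trades a small window of possibly stale metadata for avoiding repeated metastore round trips during a single command's execution.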
Why are the changes needed?
Improves SQL performance.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added unit tests.