From ad6d8000a6acdb7de539f2103663497af8900940 Mon Sep 17 00:00:00 2001
From: "Luan, Xuedong"
Date: Thu, 13 Aug 2020 21:41:48 +0800
Subject: [PATCH] Backport [SPARK-30616][SQL] Introduce TTL config option for SQL Metadata Cache (#14)
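
Adds a new static SQL config, spark.sql.metadataCacheTTLSeconds, so that entries in
the session catalog's table relation cache and in the shared FileStatusCache expire
a fixed number of seconds after they are written, instead of living for the lifetime
of the session. The default of -1 (and any value <= 0) keeps the existing behavior
of never expiring.

Illustrative usage sketch only (the one-hour TTL below is an arbitrary example, not
part of this patch); the config is static, so it must be set before the SparkSession
is created:

    import org.apache.spark.sql.SparkSession

    // Expire cached catalog and file-listing metadata one hour after it is cached.
    val spark = SparkSession.builder()
      .config("spark.sql.metadataCacheTTLSeconds", "3600")
      .enableHiveSupport()
      .getOrCreate()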
---
 .../sql/catalyst/catalog/SessionCatalog.scala | 12 ++++-
 .../apache/spark/sql/internal/SQLConf.scala   |  2 +
 .../spark/sql/internal/StaticSQLConf.scala    | 13 ++++++
 .../catalog/SessionCatalogSuite.scala         | 45 +++++++++++++++++--
 .../datasources/FileStatusCache.scala         | 17 +++++--
 .../datasources/FileIndexSuite.scala          | 28 +++++++++++-
 6 files changed, 107 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8e1f7763f1db1..37a0890058990 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import java.net.URI
 import java.util.{Locale, UUID}
 import java.util.concurrent.Callable
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -147,7 +148,16 @@ class SessionCatalog(
 
   private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     val cacheSize = conf.tableRelationCacheSize
-    CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
+    val cacheTTL = conf.metadataCacheTTL
+
+    var builder = CacheBuilder.newBuilder()
+      .maximumSize(cacheSize)
+
+    if (cacheTTL > 0) {
+      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+    }
+
+    builder.build[QualifiedTableName, LogicalPlan]()
   }
 
   /** This method provides a way to get a cached plan. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f379e1ed48e04..14f8b159f8a3c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3166,6 +3166,8 @@ class SQLConf extends Serializable with Logging {
   def maxNumberForTemporaryTablesPerSession: Long =
     getConf(StaticSQLConf.TEMPORARY_TABLE_MAX_NUM_PER_SESSION)
 
+  def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 872d054258a90..0136cd572e434 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.internal
 
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.util.Utils
@@ -233,6 +234,18 @@ object StaticSQLConf {
       .intConf
       .createWithDefault(100)
 
+  val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds")
+    .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " +
+      "session catalog cache. This configuration only has an effect when it is set to " +
+      "a positive value (> 0). It also requires setting " +
+      s"'${StaticSQLConf.CATALOG_IMPLEMENTATION.key}' to `hive`, setting " +
+      s"'${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' > 0 and setting " +
+      s"'${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key}' to `true` " +
+      "to be applied to the partition file metadata cache.")
+    .version("3.1.0")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefault(-1)
+
   val SPARK_SCRATCH_DIR = buildStaticConf("spark.scratchdir")
     .doc("Scratch space for Spark temporary table and so on. Similar with hive.exec.scratchdir")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index abaaa1a882d21..84151060f8443 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, Range, SubqueryAlias, View}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types._
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
@@ -45,7 +49,7 @@ class InMemorySessionCatalogSuite extends SessionCatalogSuite {
  * signatures but do not extend a common parent. This is largely by design but
  * unfortunately leads to very similar test code in two places.
  */
-abstract class SessionCatalogSuite extends AnalysisTest {
+abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
   protected val utils: CatalogTestUtils
 
   protected val isHiveExternalCatalog = false
@@ -70,6 +74,16 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       catalog.reset()
     }
   }
+
+  private def withConfAndEmptyCatalog(conf: SQLConf)(f: SessionCatalog => Unit): Unit = {
+    val catalog = new SessionCatalog(newEmptyCatalog(), new SimpleFunctionRegistry(), conf)
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    try {
+      f(catalog)
+    } finally {
+      catalog.reset()
+    }
+  }
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -1641,4 +1655,27 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       assert(cause.cause.get.getMessage.contains("Actual error"))
     }
   }
+
+  test("expire table relation cache if TTL is configured") {
+    case class TestCommand() extends Command
+
+    val conf = new SQLConf()
+    conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+    withConfAndEmptyCatalog(conf) { catalog =>
+      val table = QualifiedTableName(catalog.getCurrentDatabase, "test")
+
+      // First, make sure the test table is not cached.
+      assert(catalog.getCachedTable(table) === null)
+
+      catalog.cacheTable(table, TestCommand())
+      assert(catalog.getCachedTable(table) !== null)
+
+      // Wait until the cache expiration.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(catalog.getCachedTable(table) === null)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index aea27bd4c4d7f..b5d800f02862e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
@@ -44,7 +45,9 @@ object FileStatusCache {
       session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
       if (sharedCache == null) {
         sharedCache = new SharedInMemoryCache(
-          session.sqlContext.conf.filesourcePartitionFileCacheSize)
+          session.sqlContext.conf.filesourcePartitionFileCacheSize,
+          session.sqlContext.conf.metadataCacheTTL
+        )
       }
       sharedCache.createForNewClient()
     } else {
@@ -89,7 +92,7 @@ abstract class FileStatusCache {
  *
  * @param maxSizeInBytes max allowable cache size before entries start getting evicted
  */
-private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
+private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging {
 
   // Opaque object that uniquely identifies a shared cache user
   private type ClientId = Object
@@ -129,11 +132,17 @@
         }
       }
     }
-    CacheBuilder.newBuilder()
+
+    var builder = CacheBuilder.newBuilder()
       .weigher(weigher)
       .removalListener(removalListener)
       .maximumWeight(maxSizeInBytes / weightScale)
-      .build[(ClientId, Path), Array[FileStatus]]()
+
+    if (cacheTTL > 0) {
+      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+    }
+
+    builder.build[(ClientId, Path), Array[FileStatus]]()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index ea15f1891b006..040996276063b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileNotFoundException}
 import java.net.URI
 
 import scala.collection.mutable
+import scala.concurrent.duration._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
@@ -33,7 +34,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.KnownSizeEstimation
@@ -488,6 +489,31 @@ class FileIndexSuite extends SharedSparkSession {
     val fileIndex = new TestInMemoryFileIndex(spark, path)
     assert(fileIndex.leafFileStatuses.toSeq == statuses)
   }
+
+  test("expire FileStatusCache if TTL is configured") {
+    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+    try {
+      // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+      val path = new Path("/dummy_tmp", "abc")
+      val files = (1 to 3).map(_ => new FileStatus())
+
+      FileStatusCache.resetForTesting()
+      val fileStatusCache = FileStatusCache.getOrCreate(spark)
+      fileStatusCache.putLeafFiles(path, files.toArray)
+
+      // Exactly 3 files are cached.
+      assert(fileStatusCache.getLeafFiles(path).get.length === 3)
+      // Wait until the cache expiration.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(fileStatusCache.getLeafFiles(path).isEmpty === true)
+      }
+    } finally {
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
+    }
+  }
 }
 
 object DeletionRaceFileSystem {