Backport [SPARK-30616][SQL] Introduce TTL config option for SQL Metadata Cache (#14)
Luan, Xuedong authored and mingmwang committed Aug 13, 2020
1 parent 9a22915 · commit ad6d800
Showing 6 changed files with 107 additions and 10 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import java.net.URI
 import java.util.{Locale, UUID}
 import java.util.concurrent.Callable
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -147,7 +148,16 @@ class SessionCatalog(
 
   private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     val cacheSize = conf.tableRelationCacheSize
-    CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
+    val cacheTTL = conf.metadataCacheTTL
+
+    var builder = CacheBuilder.newBuilder()
+      .maximumSize(cacheSize)
+
+    if (cacheTTL > 0) {
+      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+    }
+
+    builder.build[QualifiedTableName, LogicalPlan]()
   }
 
   /** This method provides a way to get a cached plan. */
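The change above keeps the size bound on the relation cache and layers a TTL on top only when one is configured. A minimal, self-contained sketch of the Guava behavior being relied on (toy key/value types, not code from this commit):

import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder}

val cache: Cache[String, String] = CacheBuilder.newBuilder()
  .maximumSize(100)
  .expireAfterWrite(1, TimeUnit.SECONDS) // the call the patch makes conditional
  .build[String, String]()

cache.put("db.table", "resolved plan")
assert(cache.getIfPresent("db.table") != null)
Thread.sleep(1500)
// After the TTL, Guava treats the entry as gone on read.
assert(cache.getIfPresent("db.table") == null)

Note that Guava expires entries lazily, dropping them on access or during housekeeping rather than at the exact deadline, which is presumably why the tests later in this diff poll with eventually instead of asserting right at the boundary.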
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -3166,6 +3166,8 @@ class SQLConf extends Serializable with Logging {
   def maxNumberForTemporaryTablesPerSession: Long =
     getConf(StaticSQLConf.TEMPORARY_TABLE_MAX_NUM_PER_SESSION)
 
+  def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.internal
 
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.util.Utils
@@ -233,6 +234,18 @@ object StaticSQLConf {
     .intConf
     .createWithDefault(100)
 
+  val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds")
+    .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " +
+      "session catalog cache. This configuration only takes effect when it is set to a " +
+      "positive value (> 0). For the TTL to apply to the partition file metadata cache, " +
+      "it also requires setting " +
+      s"'${StaticSQLConf.CATALOG_IMPLEMENTATION.key}' to `hive`, " +
+      s"'${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}' to a value > 0 and " +
+      s"'${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key}' to `true`.")
+    .version("3.1.0")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefault(-1)
+
   val SPARK_SCRATCH_DIR =
     buildStaticConf("spark.scratchdir")
       .doc("Scratch space for Spark temporary table and so on. Similar with hive.exec.scratchdir")
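Because spark.sql.metadataCacheTTLSeconds is a static conf, it has to be set before the session starts. A hedged usage sketch — the two Hive keys are assumed to be the usual upstream names for the entries the .doc() text above references, and they matter only for the partition file metadata cache (the session catalog cache needs just the TTL):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .enableHiveSupport() // sets spark.sql.catalogImplementation=hive; needs Hive classes on the classpath
  .config("spark.sql.metadataCacheTTLSeconds", "60")
  .config("spark.sql.hive.manageFilesourcePartitions", "true")
  .config("spark.sql.hive.filesourcePartitionFileCacheSize", "262144000") // any value > 0
  .getOrCreate()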
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, Range, SubqueryAlias, View}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types._
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
@@ -45,7 +49,7 @@ class InMemorySessionCatalogSuite extends SessionCatalogSuite {
  * signatures but do not extend a common parent. This is largely by design but
  * unfortunately leads to very similar test code in two places.
  */
-abstract class SessionCatalogSuite extends AnalysisTest {
+abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
   protected val utils: CatalogTestUtils
 
   protected val isHiveExternalCatalog = false
@@ -70,6 +74,16 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       catalog.reset()
     }
   }
+
+  private def withConfAndEmptyCatalog(conf: SQLConf)(f: SessionCatalog => Unit): Unit = {
+    val catalog = new SessionCatalog(newEmptyCatalog(), new SimpleFunctionRegistry(), conf)
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    try {
+      f(catalog)
+    } finally {
+      catalog.reset()
+    }
+  }
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -1641,4 +1655,27 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       assert(cause.cause.get.getMessage.contains("Actual error"))
     }
   }
+
+  test("expire table relation cache if TTL is configured") {
+    case class TestCommand() extends Command
+
+    val conf = new SQLConf()
+    conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+    withConfAndEmptyCatalog(conf) { catalog =>
+      val table = QualifiedTableName(catalog.getCurrentDatabase, "test")
+
+      // First, make sure the test table is not cached.
+      assert(catalog.getCachedTable(table) === null)
+
+      catalog.cacheTable(table, TestCommand())
+      assert(catalog.getCachedTable(table) !== null)
+
+      // Wait until the cache expiration.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(catalog.getCachedTable(table) === null)
+      }
+    }
+  }
 }
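The new test leans on ScalaTest's Eventually: the block is retried until it passes or the timeout elapses, which absorbs the nondeterminism of lazy cache expiry. A standalone sketch of the pattern (toy flag, not from this commit; assumes ScalaTest 3.x, where scala.concurrent.duration values convert implicitly to Span):

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually._

val flag = new AtomicBoolean(false)
new Thread(() => { Thread.sleep(500); flag.set(true) }).start()

// Retries the assertion (by default roughly every 150 ms) until it holds or 3s pass.
eventually(timeout(3.seconds)) {
  assert(flag.get, "flag should eventually flip")
}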
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala

@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
@@ -44,7 +45,9 @@ object FileStatusCache {
       session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
       if (sharedCache == null) {
         sharedCache = new SharedInMemoryCache(
-          session.sqlContext.conf.filesourcePartitionFileCacheSize)
+          session.sqlContext.conf.filesourcePartitionFileCacheSize,
+          session.sqlContext.conf.metadataCacheTTL
+        )
       }
       sharedCache.createForNewClient()
     } else {
@@ -89,7 +92,7 @@ abstract class FileStatusCache {
  *
  * @param maxSizeInBytes max allowable cache size before entries start getting evicted
  */
-private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
+private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging {
 
   // Opaque object that uniquely identifies a shared cache user
   private type ClientId = Object
@@ -129,11 +132,17 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
         }
       }
     }
-    CacheBuilder.newBuilder()
+
+    var builder = CacheBuilder.newBuilder()
       .weigher(weigher)
       .removalListener(removalListener)
       .maximumWeight(maxSizeInBytes / weightScale)
-      .build[(ClientId, Path), Array[FileStatus]]()
+
+    if (cacheTTL > 0) {
+      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+    }
+
+    builder.build[(ClientId, Path), Array[FileStatus]]()
  }
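SharedInMemoryCache combines weight-based eviction with the same optional TTL. A standalone sketch of that combination (toy types, plain Guava; not code from this commit):

import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder, Weigher}

def buildCache(maxWeightBytes: Long, ttlSeconds: Long): Cache[String, Array[Byte]] = {
  // Charge each entry by payload size, so maximumWeight caps total bytes, not entry count.
  val weigher = new Weigher[String, Array[Byte]] {
    override def weigh(key: String, value: Array[Byte]): Int = value.length
  }
  var builder = CacheBuilder.newBuilder()
    .weigher(weigher)
    .maximumWeight(maxWeightBytes)
  // Mirror the patch: layer a TTL only when it is positive; otherwise eviction stays weight-only.
  if (ttlSeconds > 0) {
    builder = builder.expireAfterWrite(ttlSeconds, TimeUnit.SECONDS)
  }
  builder.build[String, Array[Byte]]()
}

With a non-positive TTL the builder is unchanged, so existing deployments that never set the conf keep the old weight-only behavior.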
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

@@ -21,6 +21,7 @@ import java.io.{File, FileNotFoundException}
 import java.net.URI
 
 import scala.collection.mutable
+import scala.concurrent.duration._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
@@ -33,7 +34,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.KnownSizeEstimation
@@ -488,6 +489,31 @@ class FileIndexSuite extends SharedSparkSession {
     val fileIndex = new TestInMemoryFileIndex(spark, path)
     assert(fileIndex.leafFileStatuses.toSeq == statuses)
   }
+
+  test("expire FileStatusCache if TTL is configured") {
+    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+    try {
+      // Using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime.
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+      val path = new Path("/dummy_tmp", "abc")
+      val files = (1 to 3).map(_ => new FileStatus())
+
+      FileStatusCache.resetForTesting()
+      val fileStatusCache = FileStatusCache.getOrCreate(spark)
+      fileStatusCache.putLeafFiles(path, files.toArray)
+
+      // Exactly 3 files are cached.
+      assert(fileStatusCache.getLeafFiles(path).get.length === 3)
+      // Wait until the cache expiration.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(fileStatusCache.getLeafFiles(path).isEmpty === true)
+      }
+    } finally {
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
+    }
+  }
 }
 
 object DeletionRaceFileSystem {
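The test above saves, overrides, and restores the static conf by hand because, as its own comment notes, withSQLConf cannot modify static entries at runtime. A hypothetical helper (not part of this commit, and usable only inside Spark's own test tree since ConfigEntry is Spark-internal) that factors that dance into a loan method:

import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.internal.SQLConf

// Hypothetical: temporarily override a conf entry, always restoring the previous value.
def withOverriddenConf[T](entry: ConfigEntry[T], value: T)(body: => Unit): Unit = {
  val previous = SQLConf.get.getConf(entry)
  SQLConf.get.setConf(entry, value)
  try body finally SQLConf.get.setConf(entry, previous)
}

// e.g. withOverriddenConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L) { ... }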