From b49665d16be723a7abe9fdfa9ea600bd7be349df Mon Sep 17 00:00:00 2001
From: jinxing
Date: Thu, 10 May 2018 16:00:24 +0800
Subject: [PATCH 1/2] [SPARK-24240] Add a config to control whether
 InMemoryFileIndex should update its cache on refresh.

---
 .../apache/spark/sql/internal/SQLConf.scala | 11 ++++++++++
 .../datasources/InMemoryFileIndex.scala     | 20 ++++++++++++++++++-
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3729bd5293eca..8b1d59b6b5fa1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1238,6 +1238,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val UPDATE_CACHE_WHEN_REFRESH_MEMORY_FILE_INDEX =
+    buildConf("spark.sql.updateCacheWhenRefreshMemoryFileIndex")
+      .internal()
+      .doc("When true, InMemoryFileIndex updates its cache on refresh; " +
+        "otherwise it only marks the cache as outdated.")
+      .booleanConf
+      .createWithDefault(true)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -1406,6 +1414,9 @@ class SQLConf extends Serializable with Logging {
 
   def sortBeforeRepartition: Boolean = getConf(SORT_BEFORE_REPARTITION)
 
+  def updateCacheWhenRefreshMemoryFileIndex: Boolean =
+    getConf(UPDATE_CACHE_WHEN_REFRESH_MEMORY_FILE_INDEX)
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 739d1f456e3ec..bda54177dab67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -60,6 +60,7 @@ class InMemoryFileIndex(
   override val rootPaths =
     rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))
 
+  @volatile private var cacheOutDated: Boolean = true
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
   @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
   @volatile private var cachedPartitionSpec: PartitionSpec = _
@@ -67,6 +68,7 @@ class InMemoryFileIndex(
   refresh0()
 
   override def partitionSpec(): PartitionSpec = {
+    refreshIfCacheOutdated()
     if (cachedPartitionSpec == null) {
       cachedPartitionSpec = inferPartitioning()
     }
@@ -75,16 +77,31 @@ class InMemoryFileIndex(
   }
 
   override protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
+    refreshIfCacheOutdated()
     cachedLeafFiles
   }
 
   override protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = {
+    refreshIfCacheOutdated()
     cachedLeafDirToChildrenFiles
   }
 
   override def refresh(): Unit = {
     fileStatusCache.invalidateAll()
-    refresh0()
+    invalidateCache()
+    if (sparkSession.sessionState.conf.updateCacheWhenRefreshMemoryFileIndex) {
+      refresh0()
+    }
+  }
+
+  private def invalidateCache(): Unit = {
+    cacheOutDated = true
+  }
+
+  private def refreshIfCacheOutdated(): Unit = {
+    if (cacheOutDated) {
+      refresh0()
+    }
   }
 
   private def refresh0(): Unit = {
@@ -93,6 +110,7 @@ class InMemoryFileIndex(
     val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
     cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
     cachedPartitionSpec = null
+    cacheOutDated = false
   }
 
   override def equals(other: Any): Boolean = other match {

From 5cabfd6d9d5189b3669cf25f0ade5e8df9029d9d Mon Sep 17 00:00:00 2001
From: jinxing
Date: Tue, 15 May 2018 17:36:44 +0800
Subject: [PATCH 2/2] Add a test

---
 .../datasources/InMemoryFileIndex.scala |  8 ++++++-
 .../datasources/FileIndexSuite.scala    | 23 +++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index bda54177dab67..68bf33114865b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -104,7 +104,13 @@ class InMemoryFileIndex(
     }
   }
 
-  private def refresh0(): Unit = {
+  // Exposed for testing.
+  def ifCacheOutDated(): Boolean = {
+    cacheOutDated
+  }
+
+  // Exposed for testing.
+  protected def refresh0(): Unit = {
     val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 18bb4bfe661ce..0c8bf4f8d8421 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.File
 import java.net.URI
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable
 import scala.language.reflectiveCalls
@@ -248,6 +249,28 @@ class FileIndexSuite extends SharedSQLContext {
       assert(spark.read.parquet(path.getAbsolutePath).schema.exists(_.name == colToUnescape))
     }
   }
+
+  test("Mark cache as outdated when spark.sql.updateCacheWhenRefreshMemoryFileIndex is false") {
+    withSQLConf("spark.sql.updateCacheWhenRefreshMemoryFileIndex" -> "false") {
+      withTempDir { dir =>
+        var refreshedTimes = 0
+        val catalog =
+          new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath())), Map.empty, None) {
+            override protected def refresh0(): Unit = {
+              super.refresh0()
+              refreshedTimes += 1
+            }
+          }
+        assert(refreshedTimes === 1)
+        catalog.refresh()
+        assert(refreshedTimes === 1)
+        assert(catalog.ifCacheOutDated())
+        catalog.allFiles()
+        assert(!catalog.ifCacheOutDated())
+        assert(refreshedTimes === 2)
+      }
+    }
+  }
 }
 
 class FakeParentPathFileSystem extends RawLocalFileSystem {
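Usage sketch (not part of the patches above): assuming both patches are applied, the new
internal flag could be exercised from a session as below. The config key and the
mark-outdated/lazy re-listing behavior come from the patches; the session setup, table
path, and the refreshByPath call are illustrative assumptions only, not part of this change.

    import org.apache.spark.sql.SparkSession

    object RefreshWithoutEagerRelist {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("InMemoryFileIndex lazy refresh sketch")
          .getOrCreate()

        // With the flag off, refresh() only marks the InMemoryFileIndex cache as outdated;
        // the expensive file re-listing is deferred until the index is read again.
        spark.conf.set("spark.sql.updateCacheWhenRefreshMemoryFileIndex", "false")

        val path = "/tmp/example_table"  // hypothetical path, for illustration only
        spark.range(10).write.mode("overwrite").parquet(path)
        val df = spark.read.parquet(path)
        df.count()

        // After new files land under `path`, refreshing the path no longer re-lists
        // eagerly; the listing is rebuilt on the next query that touches the index.
        spark.catalog.refreshByPath(path)

        spark.stop()
      }
    }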