From 132d12ee1457c41a0bec56516ab5a41d36d8ac1f Mon Sep 17 00:00:00 2001 From: xuanyuanking Date: Tue, 20 Dec 2016 18:50:03 +0800 Subject: [PATCH 1/3] SPARK-18700: Add StripedLock for each table's relation in cache --- .../spark/metrics/source/StaticSources.scala | 26 ++++ .../datasources/fileSourceInterfaces.scala | 2 + .../spark/sql/hive/HiveMetastoreCatalog.scala | 147 ++++++++++-------- .../sql/hive/HiveMetadataCacheSuite.scala | 46 ++++++ 4 files changed, 156 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala index 6bba259acc391..1b1094cf0e736 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -60,3 +60,29 @@ object CodegenMetrics extends Source { val METRIC_GENERATED_METHOD_BYTECODE_SIZE = metricRegistry.histogram(MetricRegistry.name("generatedMethodSize")) } + +/** + * :: Experimental :: + * Metrics for access to the hive external catalog. + */ +@Experimental +object HiveCatalogMetrics extends Source { + override val sourceName: String = "HiveExternalCatalog" + override val metricRegistry: MetricRegistry = new MetricRegistry() + + /** + * Tracks the total number of Spark jobs launched for parallel file listing. + */ + val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter( + MetricRegistry.name("parallelListingJobCount")) + + /** + * Resets the values of all metrics to zero. This is useful in tests. + */ + def reset(): Unit = { + METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount()) + } + + // clients can use these to avoid classloader issues with the codahale classes + def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index ea614e55b540a..6d44f696e5bd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ @@ -443,6 +444,7 @@ object HadoopFsRelation extends Logging { ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = { assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") + HiveCatalogMetrics.incrementParallelListingJobCount(1) val sparkContext = sparkSession.sparkContext val serializableConfiguration = new SerializableConfiguration(hadoopConf) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e7d1ed34f5abb..670400f8a973c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging @@ -65,6 +66,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log t.identifier.table.toLowerCase) } + /** These locks guard against multiple attempts to instantiate a table, which wastes memory. */ + private val tableCreationLocks = Striped.lazyWeakLock(100) + + /** Acquires a lock on the table cache for the duration of `f`. */ + private def withTableCreationLock[A](tableName: QualifiedTableName, f: => A): A = { + val lock = tableCreationLocks.get(tableName) + lock.lock() + try f finally { + lock.unlock() + } + } + /** A cache of Spark SQL data source tables that have been accessed. */ protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { @@ -274,77 +287,81 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log partitionPaths } - val cached = getCached( - tableIdentifier, - paths, - metastoreRelation, - metastoreSchema, - fileFormatClass, - bucketSpec, - Some(partitionSpec)) - - val hadoopFsRelation = cached.getOrElse { - val fileCatalog = new MetaStorePartitionedTableFileCatalog( - sparkSession, - new Path(metastoreRelation.catalogTable.storage.locationUri.get), - partitionSpec) - - val inferredSchema = if (fileType.equals("parquet")) { - val inferredSchema = - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) - inferredSchema.map { inferred => - ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) - }.getOrElse(metastoreSchema) - } else { - defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get - } + withTableCreationLock(tableIdentifier, { + val cached = getCached( + tableIdentifier, + paths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + Some(partitionSpec)) + + val hadoopFsRelation = cached.getOrElse { + val fileCatalog = new MetaStorePartitionedTableFileCatalog( + sparkSession, + new Path(metastoreRelation.catalogTable.storage.locationUri.get), + partitionSpec) + + val inferredSchema = if (fileType.equals("parquet")) { + val inferredSchema = + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) + inferredSchema.map { inferred => + ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) + }.getOrElse(metastoreSchema) + } else { + defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get + } - val relation = HadoopFsRelation( - location = fileCatalog, - partitionSchema = partitionSchema, - dataSchema = inferredSchema, - bucketSpec = bucketSpec, - fileFormat = defaultSource, - options = options)(sparkSession = sparkSession) - - val created = LogicalRelation( - relation, - metastoreTableIdentifier = - Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) - cachedDataSourceTables.put(tableIdentifier, created) - created - } + val relation = HadoopFsRelation( + location = fileCatalog, + partitionSchema = partitionSchema, + dataSchema = inferredSchema, + bucketSpec = bucketSpec, + fileFormat = defaultSource, + options = options)(sparkSession = sparkSession) + + val created = 
LogicalRelation( + relation, + metastoreTableIdentifier = + Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) + cachedDataSourceTables.put(tableIdentifier, created) + created + } - hadoopFsRelation + hadoopFsRelation + }) } else { val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) - val cached = getCached(tableIdentifier, - paths, - metastoreRelation, - metastoreSchema, - fileFormatClass, - bucketSpec, - None) - val logicalRelation = cached.getOrElse { - val created = - LogicalRelation( - DataSource( - sparkSession = sparkSession, - paths = paths, - userSpecifiedSchema = Some(metastoreRelation.schema), - bucketSpec = bucketSpec, - options = options, - className = fileType).resolveRelation(), - metastoreTableIdentifier = - Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) - - - cachedDataSourceTables.put(tableIdentifier, created) - created - } + withTableCreationLock(tableIdentifier, { + val cached = getCached(tableIdentifier, + paths, + metastoreRelation, + metastoreSchema, + fileFormatClass, + bucketSpec, + None) + val logicalRelation = cached.getOrElse { + val created = + LogicalRelation( + DataSource( + sparkSession = sparkSession, + paths = paths, + userSpecifiedSchema = Some(metastoreRelation.schema), + bucketSpec = bucketSpec, + options = options, + className = fileType).resolveRelation(), + metastoreTableIdentifier = + Some(TableIdentifier(tableIdentifier.name, Some(tableIdentifier.database)))) + + + cachedDataSourceTables.put(tableIdentifier, created) + created + } - logicalRelation + logicalRelation + }) } result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala index 3414f5e0409a1..fd4849a5ec2f7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.hive +import java.io.File +import java.util.concurrent.{Executors, TimeUnit} + import org.apache.hadoop.fs.Path import org.apache.spark.SparkException +import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.QueryTest import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils @@ -29,6 +33,20 @@ import org.apache.spark.sql.test.SQLTestUtils */ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { + private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = { + spark.range(scale).selectExpr("id as fieldone", "id as partCol1", "id as partCol2").write + .partitionBy("partCol1", "partCol2") + .mode("overwrite") + .parquet(dir.getAbsolutePath) + + spark.sql(s""" + |create external table $tableName (fieldone long) + |partitioned by (partCol1 int, partCol2 int) + |stored as parquet + |location "${dir.getAbsolutePath}"""".stripMargin) + spark.sql(s"msck repair table $tableName") + } + test("SPARK-16337 temporary view refresh") { withTempView("view_refresh") { withTable("view_table") { @@ -59,4 +77,32 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi } } } + + test("SPARK-18700: table loaded only once even when resolved concurrently") { + withTable("test") { + withTempDir { dir => + HiveCatalogMetrics.reset() + 
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        setupPartitionedHiveTable("test", dir, 50)
+        // query the table from multiple threads
+        val executorPool = Executors.newFixedThreadPool(10)
+        (1 to 10).map(threadId => {
+          val runnable = new Runnable {
+            override def run(): Unit = {
+              spark.sql("select * from test where partCol1 = 999").count()
+            }
+          }
+          executorPool.execute(runnable)
+          None
+        })
+        executorPool.shutdown()
+        executorPool.awaitTermination(30, TimeUnit.SECONDS)
+        // Check the cache hit: we use METRIC_FILES_DISCOVERED and
+        // METRIC_PARALLEL_LISTING_JOB_COUNT for this. While the lock takes effect, only one
+        // thread can actually do the build, so the listing job count is 2: one from the
+        // build and one from cache.load. Also METRIC_FILES_DISCOVERED is $partition_num * 2.
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+      }
+    }
+  }
 }

From 80b86646e0f1af8fb99d78aaf3f16dc7e752a99d Mon Sep 17 00:00:00 2001
From: xuanyuanking
Date: Tue, 20 Dec 2016 21:52:39 +0800
Subject: [PATCH 2/3] Fix UT test and comments

---
 .../spark/sql/hive/HiveMetadataCacheSuite.scala | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index fd4849a5ec2f7..e32e63c6c64c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -79,11 +79,11 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
   }
 
   test("SPARK-18700: table loaded only once even when resolved concurrently") {
-    withTable("test") {
+    withTable("test_table") {
       withTempDir { dir =>
         HiveCatalogMetrics.reset()
         assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
-        setupPartitionedHiveTable("test", dir, 50)
+        setupPartitionedHiveTable("test_table", dir, 50)
         // query the table from multiple threads
         val executorPool = Executors.newFixedThreadPool(10)
         (1 to 10).map(threadId => {
@@ -97,11 +97,9 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
         })
         executorPool.shutdown()
         executorPool.awaitTermination(30, TimeUnit.SECONDS)
-        // Check the cache hit: we use METRIC_FILES_DISCOVERED and
-        // METRIC_PARALLEL_LISTING_JOB_COUNT for this. While the lock takes effect, only one
-        // thread can actually do the build, so the listing job count is 2: one from the
-        // build and one from cache.load. Also METRIC_FILES_DISCOVERED is $partition_num * 2.
-        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 2)
+        // Check the cache hit: we use METRIC_PARALLEL_LISTING_JOB_COUNT for this. While the
+        // lock takes effect, only one thread can actually do the build.
+ assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1) } } } From 8dd01693c5fca8a724fe0e9f1ada0f7bdaf1f5f6 Mon Sep 17 00:00:00 2001 From: xuanyuanking Date: Wed, 21 Dec 2016 12:11:23 +0800 Subject: [PATCH 3/3] Drop the UT test and metrics --- .../spark/metrics/source/StaticSources.scala | 26 ----------- .../datasources/fileSourceInterfaces.scala | 2 - .../sql/hive/HiveMetadataCacheSuite.scala | 44 ------------------- 3 files changed, 72 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala index 1b1094cf0e736..6bba259acc391 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala @@ -60,29 +60,3 @@ object CodegenMetrics extends Source { val METRIC_GENERATED_METHOD_BYTECODE_SIZE = metricRegistry.histogram(MetricRegistry.name("generatedMethodSize")) } - -/** - * :: Experimental :: - * Metrics for access to the hive external catalog. - */ -@Experimental -object HiveCatalogMetrics extends Source { - override val sourceName: String = "HiveExternalCatalog" - override val metricRegistry: MetricRegistry = new MetricRegistry() - - /** - * Tracks the total number of Spark jobs launched for parallel file listing. - */ - val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter( - MetricRegistry.name("parallelListingJobCount")) - - /** - * Resets the values of all metrics to zero. This is useful in tests. - */ - def reset(): Unit = { - METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount()) - } - - // clients can use these to avoid classloader issues with the codahale classes - def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 6d44f696e5bd2..ea614e55b540a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -28,7 +28,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ @@ -444,7 +443,6 @@ object HadoopFsRelation extends Logging { ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = { assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") - HiveCatalogMetrics.incrementParallelListingJobCount(1) val sparkContext = sparkSession.sparkContext val serializableConfiguration = new SerializableConfiguration(hadoopConf) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala index e32e63c6c64c4..3414f5e0409a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -17,13 +17,9 @@
 package org.apache.spark.sql.hive
 
-import java.io.File
-import java.util.concurrent.{Executors, TimeUnit}
-
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
-import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 
@@ -33,20 +29,6 @@ import org.apache.spark.sql.test.SQLTestUtils
  */
 class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
-  private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
-    spark.range(scale).selectExpr("id as fieldone", "id as partCol1", "id as partCol2").write
-      .partitionBy("partCol1", "partCol2")
-      .mode("overwrite")
-      .parquet(dir.getAbsolutePath)
-
-    spark.sql(s"""
-      |create external table $tableName (fieldone long)
-      |partitioned by (partCol1 int, partCol2 int)
-      |stored as parquet
-      |location "${dir.getAbsolutePath}"""".stripMargin)
-    spark.sql(s"msck repair table $tableName")
-  }
-
   test("SPARK-16337 temporary view refresh") {
     withTempView("view_refresh") {
       withTable("view_table") {
@@ -77,30 +59,4 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
       }
     }
   }
-
-  test("SPARK-18700: table loaded only once even when resolved concurrently") {
-    withTable("test_table") {
-      withTempDir { dir =>
-        HiveCatalogMetrics.reset()
-        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
-        setupPartitionedHiveTable("test_table", dir, 50)
-        // query the table from multiple threads
-        val executorPool = Executors.newFixedThreadPool(10)
-        (1 to 10).map(threadId => {
-          val runnable = new Runnable {
-            override def run(): Unit = {
-              spark.sql("select * from test where partCol1 = 999").count()
-            }
-          }
-          executorPool.execute(runnable)
-          None
-        })
-        executorPool.shutdown()
-        executorPool.awaitTermination(30, TimeUnit.SECONDS)
-        // Check the cache hit: we use METRIC_PARALLEL_LISTING_JOB_COUNT for this. While the
-        // lock takes effect, only one thread can actually do the build.
-        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
-      }
-    }
-  }
 }
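
Note: the core idea in PATCH 1/3 is to guard each table's cache load with a per-table lock taken
from a Guava Striped pool, so that concurrent resolutions of the same table do not all rebuild
(and re-list) the relation. The sketch below is a minimal, self-contained illustration of that
pattern, not the actual HiveMetastoreCatalog code: it assumes Guava on the classpath, and the
cache, key type, getOrBuild name, and buildRelation argument are illustrative stand-ins.

import java.util.concurrent.ConcurrentHashMap

import com.google.common.util.concurrent.Striped

object StripedTableCacheSketch {
  // 100 stripes bounds the number of lock objects while keeping contention between
  // *different* table names unlikely; lazyWeakLock lets unused locks be garbage collected.
  private val tableCreationLocks = Striped.lazyWeakLock(100)

  // Stand-in for cachedDataSourceTables; the real code uses a Guava LoadingCache.
  private val cache = new ConcurrentHashMap[String, AnyRef]()

  private def withTableCreationLock[A](tableName: String)(f: => A): A = {
    val lock = tableCreationLocks.get(tableName)
    lock.lock()
    try f finally lock.unlock()
  }

  /** Returns the cached relation for `tableName`, building it at most once under the lock. */
  def getOrBuild(tableName: String)(buildRelation: => AnyRef): AnyRef =
    withTableCreationLock(tableName) {
      // Re-check inside the lock: threads that were blocked see the value the winner cached.
      Option(cache.get(tableName)).getOrElse {
        val relation = buildRelation
        cache.put(tableName, relation)
        relation
      }
    }
}

This mirrors the patch's withTableCreationLock(tableIdentifier, ...) wrapper around the
getCached / cachedDataSourceTables.put sequence, which is why the test expects the relation to be
built only once even with ten concurrent queries.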