From bbd4ce7c156c76b9d06bca89b71f125d360ac533 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 6 Jul 2015 19:11:47 +0800
Subject: [PATCH 01/10] Add config to enable/disable merging part-files when merging parquet schema.

---
 .../scala/org/apache/spark/sql/SQLConf.scala  |  6 ++++++
 .../apache/spark/sql/parquet/newParquet.scala | 14 ++++++++++++-
 .../spark/sql/parquet/ParquetQuerySuite.scala | 21 +++++++++++++++++++
 3 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 6005d35f015a9..48389652440de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -233,6 +233,12 @@ private[spark] object SQLConf {
       "otherwise the schema is picked from the summary file or a random data file " +
       "if no summary file is available.")
 
+  val PARQUET_SCHEMA_SKIP_MERGE_PARTFILES = booleanConf("spark.sql.parquet.skipMergePartFiles",
+    defaultValue = Some(false),
+    doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
+      "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+      "false, which is the default, we will merge all part-files.")
+
   val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
     defaultValue = Some(false),
     doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 5ac3e9a44e6fe..afc5b8c82e8c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -119,6 +119,9 @@ private[sql] class ParquetRelation2(
     .map(_.toBoolean)
     .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
+  private val skipMergePartFiles =
+    sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES)
+
   private val maybeMetastoreSchema = parameters
     .get(ParquetRelation2.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
@@ -413,7 +416,16 @@ private[sql] class ParquetRelation2(
       val filesToTouch =
         if (shouldMergeSchemas) {
           // Also includes summary files, 'cause there might be empty partition directories.
-          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+
+          // If skipMergePartFiles config is true, we assume that all part-files are the same for
+          // their schema with summary files, so we ignore them when merging schema.
+          // If the config is false, which is the default setting, we merge all part-files.
+          val needMerged: Seq[FileStatus] = if (skipMergePartFiles) {
+            Seq()
+          } else {
+            dataStatuses
+          }
+          (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
         } else {
           // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
           // don't have this.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index a0a81c4309c0f..24e089b22b7e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -124,6 +124,27 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
     }
   }
 
+  test("Enabling/disabling merging partfiles when merging parquet schema") {
+    def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+      }
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES.key -> "true") {
+      testSchemaMerging(3)
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES.key -> "false") {
+      testSchemaMerging(3)
+    }
+  }
+
   test("Enabling/disabling schema merging") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>

From 8bbebcb5799540f7aa147c19c57a73e52dbe8104 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 7 Jul 2015 00:03:52 +0800
Subject: [PATCH 02/10] Figure out how to test the config.

---
 .../org/apache/spark/sql/parquet/ParquetQuerySuite.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 24e089b22b7e2..4e93e311644e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.util.Utils
 
 /**
  * A test suite that tests various Parquet queries.
@@ -130,13 +133,16 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
         val basePath = dir.getCanonicalPath
         sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
         sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        // delete summary files, so if we don't merge part-files, one column will not be included.
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
         assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
       }
     }
 
     withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
       SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES.key -> "true") {
-      testSchemaMerging(3)
+      testSchemaMerging(2)
     }
 
     withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",

From 4bdd7e0ce9ad7437cc719649c4408beb36f5b234 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 7 Jul 2015 17:56:54 +0800
Subject: [PATCH 03/10] Don't read footer files if we can skip them.
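
With the part-file skipping config enabled, schema merging now reads footers only from the
summary files (_metadata / _common_metadata) instead of from every part-file. A rough usage
sketch of the intended behavior, illustrative only (it assumes an existing SQLContext named
sqlContext and a placeholder table path, neither of which is part of this patch):

    // Hypothetical usage sketch; the two keys are spelled out as plain strings here.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.setConf("spark.sql.parquet.skipMergePartFiles", "true")
    // With both flags on, only summary-file footers are read when merging the schema.
    val df = sqlContext.read.parquet("/path/to/partitioned/table")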
---
 .../scala/org/apache/spark/sql/parquet/newParquet.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index afc5b8c82e8c0..bed0aa461b353 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -356,8 +356,13 @@ private[sql] class ParquetRelation2(
       val conf = SparkHadoopUtil.get.conf
       val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
       val rawFooters = if (shouldMergeSchemas) {
+        val leavesToMerge = if (skipMergePartFiles) {
+          metadataStatuses ++ commonMetadataStatuses
+        } else {
+          leaves
+        }
         ParquetFileReader.readAllFootersInParallel(
-          conf, seqAsJavaList(leaves), taskSideMetaData)
+          conf, seqAsJavaList(leavesToMerge), taskSideMetaData)
       } else {
         ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
           conf, seqAsJavaList(leaves), taskSideMetaData)

From 3b6be5baa44d7d30fb80d1d413cfd5dbd84cc2e2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 7 Jul 2015 18:12:28 +0800
Subject: [PATCH 04/10] Fix key not found.

---
 .../main/scala/org/apache/spark/sql/parquet/newParquet.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bed0aa461b353..a8e15b7b4e2dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -254,7 +254,7 @@ private[sql] class ParquetRelation2(
     // Create the function to set input paths at the driver side.
     val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
-    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
+    val footers = inputFiles.flatMap(f => metadataCache.footers.get(f.getPath))
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.

From 0e734e09f6f9aadef32ace2cc3f2033d1d80425f Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 7 Jul 2015 18:37:38 +0800
Subject: [PATCH 05/10] Use correct API.

---
 .../org/apache/spark/sql/parquet/newParquet.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index a8e15b7b4e2dd..8b2d05dae2290 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -254,7 +254,7 @@ private[sql] class ParquetRelation2(
     // Create the function to set input paths at the driver side.
     val setInputPaths = ParquetRelation2.initializeDriverSideJobFunc(inputFiles) _
 
-    val footers = inputFiles.flatMap(f => metadataCache.footers.get(f.getPath))
+    val footers = inputFiles.map(f => metadataCache.footers(f.getPath))
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       // TODO Stop using `FilteringParquetRowInputFormat` and overriding `getPartition`.
@@ -356,13 +356,13 @@ private[sql] class ParquetRelation2(
       val conf = SparkHadoopUtil.get.conf
       val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
       val rawFooters = if (shouldMergeSchemas) {
-        val leavesToMerge = if (skipMergePartFiles) {
-          metadataStatuses ++ commonMetadataStatuses
+        if (skipMergePartFiles) {
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
         } else {
-          leaves
+          ParquetFileReader.readAllFootersInParallel(
+            conf, seqAsJavaList(leaves), taskSideMetaData)
         }
-        ParquetFileReader.readAllFootersInParallel(
-          conf, seqAsJavaList(leavesToMerge), taskSideMetaData)
       } else {
         ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
           conf, seqAsJavaList(leaves), taskSideMetaData)

From a57be0e90d267b51fffd07dce9c627d59e355183 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 22 Jul 2015 22:16:22 +0800
Subject: [PATCH 06/10] Merge part-files if there are no summary files.

---
 .../scala/org/apache/spark/sql/SQLConf.scala  |  2 +-
 .../apache/spark/sql/parquet/newParquet.scala | 46 ++++++++++++++++---
 .../spark/sql/parquet/ParquetQuerySuite.scala |  8 ++--
 3 files changed, 45 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index d91502f539b0d..fd5724159dac7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -247,7 +247,7 @@ private[spark] object SQLConf {
       "otherwise the schema is picked from the summary file or a random data file " +
       "if no summary file is available.")
 
-  val PARQUET_SCHEMA_SKIP_MERGE_PARTFILES = booleanConf("spark.sql.parquet.skipMergePartFiles",
+  val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
     defaultValue = Some(false),
     doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
       "summary files and we will ignore them when merging schema. Otherwise, if this is " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index c330c2ea0731b..318d049ec0bcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -42,7 +42,7 @@ import org.apache.spark.rdd.RDD._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
-import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.execution.datasources.{PartitionSpec, PartitioningUtils}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -123,8 +123,8 @@ private[sql] class ParquetRelation2(
     .map(_.toBoolean)
     .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
-  private val skipMergePartFiles =
-    sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES)
+  private val mergeRespectSummaries =
+    sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
 
   private val maybeMetastoreSchema = parameters
     .get(ParquetRelation2.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
@@ -348,6 +348,28 @@ private[sql] class ParquetRelation2(
     // Schema of the whole table, including partition columns.
     var schema: StructType = _
 
+    def filterDataStatusesWithoutSummaries(
+        leaves: Seq[FileStatus],
+        dataStatuses: Seq[FileStatus]): Seq[FileStatus] = {
+
+      // Get the partitions that have summary files
+      val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
+      val summariesPaths = metadataStatuses.map(_.getPath.getParent()) ++
+        commonMetadataStatuses.map(_.getPath.getParent())
+      val summariesPartitions = PartitioningUtils.parsePartitions(summariesPaths,
+        PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference).partitions.toSet
+
+      dataStatuses.filterNot { d =>
+        val part = PartitioningUtils.parsePartitions(Seq(d.getPath.getParent()),
+          PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference)
+        if (summariesPartitions.size > 0 && part.partitions.length > 0) {
+          summariesPartitions.contains(part.partitions(0))
+        } else {
+          false
+        }
+      }
+    }
+
     /**
      * Refreshes `FileStatus`es, footers, partition spec, and table schema.
      */
@@ -414,12 +436,24 @@ private[sql] class ParquetRelation2(
         if (shouldMergeSchemas) {
           // Also includes summary files, 'cause there might be empty partition directories.
 
-          // If skipMergePartFiles config is true, we assume that all part-files are the same for
+          // If mergeRespectSummaries config is true, we assume that all part-files are the same for
           // their schema with summary files, so we ignore them when merging schema.
           // If the config is false, which is the default setting, we merge all part-files.
+
+          // mergeRespectSummaries is useful when dealing with partitioned tables, where each
+          // partition directory contains its own summary files.
+          // Basically in this mode, we only need to merge schemas contained in all those summary
+          // files. For non-partitioned tables, mergeRespectSummaries essentially disables
+          // shouldMergeSchema because all part-files will be ignored.
+          // You should enable this configuration ony if you are very sure that all partition
+          // directories contain the summary files with consistent schema with its part-files.
+
           val needMerged: Seq[FileStatus] =
-            if (skipMergePartFiles) {
-              Seq()
+            if (mergeRespectSummaries) {
+              // If we want to merge parquet schema and only respect summary files,
+              // we still need to merge these part-files without summaries files.
+              filterDataStatusesWithoutSummaries(metadataStatuses ++
+                commonMetadataStatuses, dataStatuses)
             } else {
               dataStatuses
             }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index dd13d51f51c02..929a776e744ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -133,7 +133,7 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
         val basePath = dir.getCanonicalPath
         sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
         sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
-        // delete summary files, so if we don't merge part-files, one column will not be included.
+        // delete summary files, we still merge part-files without summary files.
         Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
         Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
         assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
       }
     }
 
     withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES.key -> "true") {
-      testSchemaMerging(2)
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
+      testSchemaMerging(3)
     }
 
     withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES.key -> "false") {
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
       testSchemaMerging(3)
     }
   }

From ea8f6e5ed8a05019a94fd4bb5aa48766578ab1dd Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 22 Jul 2015 22:23:44 +0800
Subject: [PATCH 07/10] Correct the code comments.

---
 .../scala/org/apache/spark/sql/parquet/newParquet.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 318d049ec0bcd..ab2b182a00c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -442,10 +442,11 @@ private[sql] class ParquetRelation2(
       // mergeRespectSummaries is useful when dealing with partitioned tables, where each
       // partition directory contains its own summary files.
-      // Basically in this mode, we only need to merge schemas contained in all those summary
-      // files. For non-partitioned tables, mergeRespectSummaries essentially disables
-      // shouldMergeSchema because all part-files will be ignored.
-      // You should enable this configuration ony if you are very sure that all partition
+      // In this mode, we only need to merge schemas contained in all those summary files.
+      // For non-partitioned tables, or the partition directories that don't contain
+      // summary files, we still merge their part-files because it is possible their schemas
+      // are different with other summary files.
+      // You should enable this configuration only if you are very sure that all partition
       // directories contain the summary files with consistent schema with its part-files.

From 4eb2f007f0caf2c8481c99b357a91f9d8624dc89 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 24 Jul 2015 17:20:39 +0800
Subject: [PATCH 08/10] Use given parameter.

---
 .../main/scala/org/apache/spark/sql/parquet/newParquet.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index ab2b182a00c94..0f4842d179777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -354,8 +354,7 @@ private[sql] class ParquetRelation2(
 
       // Get the partitions that have summary files
       val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
-      val summariesPaths = metadataStatuses.map(_.getPath.getParent()) ++
-        commonMetadataStatuses.map(_.getPath.getParent())
+      val summariesPaths = leaves.map(_.getPath.getParent())
       val summariesPartitions = PartitioningUtils.parsePartitions(summariesPaths,
         PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference).partitions.toSet

From df430274cc1acf574d878dd91ae81e9a6738c8b9 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 24 Jul 2015 18:43:00 +0800
Subject: [PATCH 09/10] Get dataStatuses' partitions based on all paths.

---
 .../apache/spark/sql/parquet/newParquet.scala | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 0f4842d179777..a7278a4bf3435 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -352,20 +352,22 @@ private[sql] class ParquetRelation2(
       leaves: Seq[FileStatus],
       dataStatuses: Seq[FileStatus]): Seq[FileStatus] = {
 
-      // Get the partitions that have summary files
+      // Get the paths that have summary files
+      val directoriesWithSummaries = leaves.map(_.getPath.getParent.toString).toSet
+
       val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
-      val summariesPaths = leaves.map(_.getPath.getParent())
-      val summariesPartitions = PartitioningUtils.parsePartitions(summariesPaths,
-        PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference).partitions.toSet
-
-      dataStatuses.filterNot { d =>
-        val part = PartitioningUtils.parsePartitions(Seq(d.getPath.getParent()),
-          PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference)
-        if (summariesPartitions.size > 0 && part.partitions.length > 0) {
-          summariesPartitions.contains(part.partitions(0))
-        } else {
-          false
+      val dataPaths = dataStatuses.map(_.getPath.getParent)
+      val dataPathsWithoutSummaries = PartitioningUtils.parsePartitions(dataPaths,
+        PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference).partitions
+        .map(_.path).filterNot(p => directoriesWithSummaries.contains(p)).toSet
+
+      if (dataPathsWithoutSummaries.size > 0) {
+        dataStatuses.filter { d =>
+          val path = d.getPath.getParent.toString
+          dataPathsWithoutSummaries.contains(path)
         }
+      } else {
+        dataStatuses
       }
     }

From 8816f445a1f69bd2b188fc1567ef6111eaea4263 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 30 Jul 2015 11:37:14 +0800
Subject: [PATCH 10/10] For comments.
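
This also reverts to the simpler behavior: when the option is on, part-file footers are skipped
entirely, so the flag is documented as expert-only. A minimal sketch of the final intended use,
illustrative only (assumes an existing SQLContext named sqlContext and a placeholder path):

    // Expert-only switch: it assumes every part-file is covered by a consistent summary file.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.setConf("spark.sql.parquet.respectSummaryFiles", "true")
    // Schema merging then reads only the _metadata / _common_metadata footers.
    val df = sqlContext.read.parquet("/path/to/table")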
---
 .../scala/org/apache/spark/sql/SQLConf.scala  |  3 +-
 .../spark/sql/parquet/ParquetRelation.scala   | 39 ++-----------------
 .../spark/sql/parquet/ParquetQuerySuite.scala |  4 +-
 3 files changed, 8 insertions(+), 38 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 0eca45f54c38a..2564bbd2077bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -251,7 +251,8 @@ private[spark] object SQLConf {
     defaultValue = Some(false),
     doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
       "summary files and we will ignore them when merging schema. Otherwise, if this is " +
-      "false, which is the default, we will merge all part-files.")
+      "false, which is the default, we will merge all part-files. This should be considered " +
+      "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
 
   val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
     defaultValue = Some(false),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index e012b065a8fc2..5f175352adcc6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -333,28 +333,6 @@ private[sql] class ParquetRelation(
     }
   }
 
-  def filterDataStatusesWithoutSummaries(
-      leaves: Seq[FileStatus],
-      dataStatuses: Seq[FileStatus]): Seq[FileStatus] = {
-    // Get the paths that have summary files
-    val directoriesWithSummaries = leaves.map(_.getPath.getParent.toString).toSet
-
-    val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
-    val dataPaths = dataStatuses.map(_.getPath.getParent)
-    val dataPathsWithoutSummaries = PartitioningUtils.parsePartitions(dataPaths,
-      PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference).partitions
-      .map(_.path).filterNot(p => directoriesWithSummaries.contains(p)).toSet
-
-    if (dataPathsWithoutSummaries.size > 0) {
-      dataStatuses.filter { d =>
-        val path = d.getPath.getParent.toString
-        dataPathsWithoutSummaries.contains(path)
-      }
-    } else {
-      dataStatuses
-    }
-  }
-
   private class MetadataCache {
     // `FileStatus` objects of all "_metadata" files.
     private var metadataStatuses: Array[FileStatus] = _
@@ -450,23 +428,14 @@ private[sql] class ParquetRelation(
 
       // If mergeRespectSummaries config is true, we assume that all part-files are the same for
      // their schema with summary files, so we ignore them when merging schema.
-      // If the config is false, which is the default setting, we merge all part-files.
-
-      // mergeRespectSummaries is useful when dealing with partitioned tables, where each
-      // partition directory contains its own summary files.
+      // If the config is disabled, which is the default setting, we merge all part-files.
       // In this mode, we only need to merge schemas contained in all those summary files.
-      // For non-partitioned tables, or the partition directories that don't contain
-      // summary files, we still merge their part-files because it is possible their schemas
-      // are different with other summary files.
-      // You should enable this configuration only if you are very sure that all partition
-      // directories contain the summary files with consistent schema with its part-files.
+      // You should enable this configuration only if you are very sure that for the parquet
+      // part-files to read there are corresponding summary files containing correct schema.
 
       val needMerged: Seq[FileStatus] =
         if (mergeRespectSummaries) {
-          // If we want to merge parquet schema and only respect summary files,
-          // we still need to merge these part-files without summaries files.
-          filterDataStatusesWithoutSummaries(metadataStatuses ++
-            commonMetadataStatuses, dataStatuses)
+          Seq()
         } else {
           dataStatuses
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 48b43d67a7714..a95f70f2bba69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -132,7 +132,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
         val basePath = dir.getCanonicalPath
         sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
         sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
-        // delete summary files, we still merge part-files without summary files.
+        // delete summary files, so if we don't merge part-files, one column will not be included.
         Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
         Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
         assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
@@ -141,7 +141,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
 
     withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
       SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
-      testSchemaMerging(3)
+      testSchemaMerging(2)
     }
 
     withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",