From 5015fa0ee08b05a39261bef1ed0b152fedb5029e Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sun, 12 Jun 2016 01:36:15 -0700
Subject: [PATCH 1/4] Filters out metadata files while doing partition discovery

---
 .../datasources/ListingFileCatalog.scala      |  7 +++---
 .../PartitioningAwareFileCatalog.scala        |  5 +++-
 .../ParquetPartitionDiscoverySuite.scala      | 24 +++++++++++++++++++
 3 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index dd3c96a792357..18b0a57be7668 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -83,8 +83,9 @@ class ListingFileCatalog(
     val statuses: Seq[FileStatus] = paths.flatMap { path =>
       val fs = path.getFileSystem(hadoopConf)
       logInfo(s"Listing $path on driver")
-      Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
-        getOrElse(Array.empty)
+      Try {
+        HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
+      }.getOrElse(Array.empty)
     }
     mutable.LinkedHashSet(statuses: _*)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 406d2e8e81f49..fcf914a8866e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -96,7 +96,10 @@ abstract class PartitioningAwareFileCatalog(
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.keys.toSeq
+    val leafDirs = leafDirToChildrenFiles.filterNot {
+      // SPARK-15895: Metadata files like Parquet summary files should not be counted as data files.
+      case (_, files) => files.forall(_.getPath.getName.startsWith("_"))
+    }.keys.toSeq
     partitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index e19345529e93b..f7353ec5ea6b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -25,11 +25,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -890,4 +892,26 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-15895 summary files in non-leaf partition directories") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+        spark.range(3).write.parquet(s"$path/p0=0/p1=0")
+      }
+
+      val p0 = new File(path, "p0=0")
+      val p1 = new File(p0, "p1=0")
+
+      Files.copy(new File(p1, "_metadata"), new File(p0, "_metadata"))
+      Files.copy(new File(p1, "_common_metadata"), new File(p0, "_common_metadata"))
+
+      checkAnswer(spark.read.parquet(s"$path"), Seq(
+        Row(0, 0, 0),
+        Row(1, 0, 0),
+        Row(2, 0, 0)
+      ))
+    }
+  }
 }
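The gist of this first patch: a leaf directory whose children are all "_"-prefixed metadata files (e.g. Parquet's _metadata and _common_metadata summary files, or Spark's _SUCCESS marker) should not feed partition discovery; otherwise a summary file sitting in a non-leaf partition directory such as p0=0 makes that directory look like a leaf, and partition inference then sees directories at conflicting depths. A toy sketch of the predicate in plain Scala, with a Map of strings standing in for the real leafDirToChildrenFiles map of FileStatus values (all paths and file names here are made up):

    // Toy stand-in for PartitioningAwareFileCatalog.leafDirToChildrenFiles:
    // leaf directory -> names of its child files.
    val leafDirToChildrenFiles: Map[String, Seq[String]] = Map(
      "/table/p0=0"      -> Seq("_metadata", "_common_metadata"),   // summary files only
      "/table/p0=0/p1=0" -> Seq("part-00000.parquet", "_metadata")  // contains real data
    )

    // Same shape as the patch: keep a leaf directory only if at least one
    // child is not an "_"-prefixed metadata file.
    val leafDirs = leafDirToChildrenFiles.filterNot {
      case (_, files) => files.forall(_.startsWith("_"))
    }.keys.toSeq

    println(leafDirs)  // List(/table/p0=0/p1=0)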
From 91faf0139f5c06e79bd92f637db078aa8ca0aa09 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sun, 12 Jun 2016 11:15:45 -0700
Subject: [PATCH 2/4] Also excludes dot-files

---
 .../datasources/PartitioningAwareFileCatalog.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index fcf914a8866e5..ee68feb3dbcbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -96,9 +96,13 @@ abstract class PartitioningAwareFileCatalog(
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.filterNot {
-      // SPARK-15895: Metadata files like Parquet summary files should not be counted as data files.
-      case (_, files) => files.forall(_.getPath.getName.startsWith("_"))
+    val leafDirs = leafDirToChildrenFiles.filterNot { case (_, files) =>
+      // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+      // counted as data files, so that they shouldn't participate in partition discovery.
+      files.forall { file =>
+        val name = file.getPath.getName
+        name.startsWith("_") || name.startsWith(".")
+      }
     }.keys.toSeq
     partitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
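This second patch widens the same predicate to dot-files, so hidden temporary files (for example Hadoop's .crc checksum files, or the .dummy file touched by the final test in this series) are ignored as well. Continuing the toy sketch above with plain string names (the file list is made up):

    // Widened check from the second patch: "_"-prefixed metadata files and
    // hidden dot-files both fail to count as data.
    def isMetadataOrHidden(name: String): Boolean =
      name.startsWith("_") || name.startsWith(".")

    val children = Seq("_common_metadata", ".part-00000.parquet.crc", "part-00000.parquet")

    // false: part-00000.parquet counts as data, so its directory is kept.
    println(children.forall(isMetadataOrHidden))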
From ee438e466e9b5368f821e5cac580393ecf8921ef Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 13 Jun 2016 13:48:44 -0700
Subject: [PATCH 3/4] Consolidates similar code

---
 .../datasources/PartitioningAwareFileCatalog.scala | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index ee438e466e9b5..df525a62e43c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -50,14 +50,14 @@ abstract class PartitioningAwareFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
+      Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+              existingDir.filter(f => isDataPath(f.getPath))
 
             case None =>
               // Directory does not exist, or has no children files
@@ -99,10 +99,7 @@ abstract class PartitioningAwareFileCatalog(
     val leafDirs = leafDirToChildrenFiles.filterNot { case (_, files) =>
       // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
      // counted as data files, so that they shouldn't participate in partition discovery.
-      files.forall { file =>
-        val name = file.getPath.getName
-        name.startsWith("_") || name.startsWith(".")
-      }
+      files.forall(f => !isDataPath(f.getPath))
     }.keys.toSeq
     partitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
@@ -204,4 +201,9 @@ abstract class PartitioningAwareFileCatalog(
       if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath
     }.toSet
   }
+
+  private def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !(name.startsWith("_") || name.startsWith("."))
+  }
 }
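This third patch deduplicates the three name checks (two in listFiles, one in inferPartitioning) into a single isDataPath helper; the fourth patch below then flips filterNot plus forall(!p) into the logically equivalent (by De Morgan's laws) filter plus exists(p), and pins the listing fallback to Array.empty[FileStatus] so the array's element type is explicit. A standalone sketch of the shared predicate, assuming hadoop-common is on the classpath for org.apache.hadoop.fs.Path:

    import org.apache.hadoop.fs.Path

    // Consolidated predicate: a path counts as data unless its name starts
    // with "_" (metadata/summary files) or "." (hidden files).
    def isDataPath(path: Path): Boolean = {
      val name = path.getName
      !(name.startsWith("_") || name.startsWith("."))
    }

    println(isDataPath(new Path("/t/p0=0/part-00000.parquet")))  // true
    println(isDataPath(new Path("/t/p0=0/_common_metadata")))    // false
    println(isDataPath(new Path("/t/p0=0/.dummy")))              // false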
From b345d56c38d90295db3957f198a766dbb6e58795 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 14 Jun 2016 10:13:59 -0700
Subject: [PATCH 4/4] Addresses PR comments

---
 .../datasources/ListingFileCatalog.scala      |  2 +-
 .../PartitioningAwareFileCatalog.scala        |  4 ++--
 .../ParquetPartitionDiscoverySuite.scala      | 20 +++++++++++++++++++
 3 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 18b0a57be7668..ff0adb0befca5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -85,7 +85,7 @@ class ListingFileCatalog(
       logInfo(s"Listing $path on driver")
       Try {
         HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
-      }.getOrElse(Array.empty)
+      }.getOrElse(Array.empty[FileStatus])
     }
     mutable.LinkedHashSet(statuses: _*)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index df525a62e43c3..811e96c99a96d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -96,10 +96,10 @@ abstract class PartitioningAwareFileCatalog(
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.filterNot { case (_, files) =>
+    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
       // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
       // counted as data files, so that they shouldn't participate in partition discovery.
-      files.forall(f => !isDataPath(f.getPath))
+      files.exists(f => isDataPath(f.getPath))
     }.keys.toSeq
     partitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f7353ec5ea6b5..133ffedf12812 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -904,8 +904,28 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       val p0 = new File(path, "p0=0")
       val p1 = new File(p0, "p1=0")
 
+      // Builds the following directory layout by:
+      //
+      // 1. copying Parquet summary files we just wrote into `p0=0`, and
+      // 2. touching a dot-file `.dummy` under `p0=0`.
+      //
+      //
+      // +- p0=0
+      //    |- _metadata
+      //    |- _common_metadata
+      //    |- .dummy
+      //    +- p1=0
+      //       |- _metadata
+      //       |- _common_metadata
+      //       |- part-00000.parquet
+      //       |- part-00001.parquet
+      //       +- ...
+      //
+      // The summary files and the dot-file under `p0=0` should not fail partition discovery.
+
       Files.copy(new File(p1, "_metadata"), new File(p0, "_metadata"))
       Files.copy(new File(p1, "_common_metadata"), new File(p0, "_common_metadata"))
+      Files.touch(new File(p0, ".dummy"))
 
       checkAnswer(spark.read.parquet(s"$path"), Seq(
         Row(0, 0, 0),