From 1413744ed250a2135c7beb610302ebb387e07441 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 18 Oct 2016 10:56:33 -0700 Subject: [PATCH 1/2] Revert "Revert "[SPARK-17974] Refactor FileCatalog classes to simplify the inheritance tree"" This reverts commit 1c5a7d7f64993540baa5558be80130ee6911ba3c. --- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../sql/execution/DataSourceScanExec.scala | 4 +- .../execution/datasources/FileCatalog.scala | 66 +++++ .../execution/datasources/FileFormat.scala | 61 ----- .../datasources/HadoopFsRelation.scala | 4 +- .../PartitioningAwareFileCatalog.scala | 217 ++++++++++++++++- .../datasources/PartitioningUtils.scala | 12 +- .../datasources/SessionFileCatalog.scala | 225 ------------------ .../datasources/TableFileCatalog.scala | 11 +- .../datasources/FileCatalogSuite.scala | 10 + .../datasources/SessionFileCatalogSuite.scala | 34 --- .../ParquetPartitionDiscoverySuite.scala | 9 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 13 files changed, 303 insertions(+), 354 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 7dccbbd3f0a5b..073d2b1512b95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} -import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 623d2be55dcec..fdd1fa3648251 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -431,7 +431,7 @@ case class FileSourceScanExec( private def createBucketedReadRDD( bucketSpec: BucketSpec, readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[Partition], + selectedPartitions: Seq[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val bucketed = @@ -463,7 +463,7 @@ case class FileSourceScanExec( */ private def createNonBucketedReadRDD( readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[Partition], + selectedPartitions: Seq[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala new file mode 100644 index 0000000000000..2bc66ceeebdb4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.fs._ + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ + +/** + * A collection of data files from a partitioned relation, along with the partition values in the + * form of an [[InternalRow]]. + */ +case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus]) + +/** + * An interface for objects capable of enumerating the root paths of a relation as well as the + * partitions of a relation subject to some pruning expressions. + */ +trait FileCatalog { + + /** + * Returns the list of root input paths from which the catalog will get files. There may be a + * single root path from which partitions are discovered, or individual partitions may be + * specified by each path. + */ + def rootPaths: Seq[Path] + + /** + * Returns all valid files grouped into partitions when the data is partitioned. If the data is + * unpartitioned, this will return a single partition with no partition values. + * + * @param filters The filters used to prune which partitions are returned. These filters must + * only refer to partition columns and this method will only return files + * where these predicates are guaranteed to evaluate to `true`. Thus, these + * filters will not need to be evaluated again on the returned data. + */ + def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] + + /** + * Returns the list of files that will be read when scanning this relation. This call may be + * very expensive for large tables. 
+ */ + def inputFiles: Array[String] + + /** Refresh any cached file listings */ + def refresh(): Unit + + /** Sum of table file sizes, in bytes */ + def sizeInBytes: Long +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index e7239ef91b326..9d153cec731a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -175,64 +175,3 @@ abstract class TextBasedFileFormat extends FileFormat { codec == null || codec.isInstanceOf[SplittableCompressionCodec] } } - -/** - * A collection of data files from a partitioned relation, along with the partition values in the - * form of an [[InternalRow]]. - */ -case class Partition(values: InternalRow, files: Seq[FileStatus]) - -/** - * An interface for objects capable of enumerating the root paths of a relation as well as the - * partitions of a relation subject to some pruning expressions. - */ -trait BasicFileCatalog { - - /** - * Returns the list of root input paths from which the catalog will get files. There may be a - * single root path from which partitions are discovered, or individual partitions may be - * specified by each path. - */ - def rootPaths: Seq[Path] - - /** - * Returns all valid files grouped into partitions when the data is partitioned. If the data is - * unpartitioned, this will return a single partition with no partition values. - * - * @param filters The filters used to prune which partitions are returned. These filters must - * only refer to partition columns and this method will only return files - * where these predicates are guaranteed to evaluate to `true`. Thus, these - * filters will not need to be evaluated again on the returned data. - */ - def listFiles(filters: Seq[Expression]): Seq[Partition] - - /** Returns the list of files that will be read when scanning this relation. */ - def inputFiles: Array[String] - - /** Refresh any cached file listings */ - def refresh(): Unit - - /** Sum of table file sizes, in bytes */ - def sizeInBytes: Long -} - -/** - * A [[BasicFileCatalog]] which can enumerate all of the files comprising a relation and, from - * those, infer the relation's partition specification. - */ -// TODO: Consider a more descriptive, appropriate name which suggests this is a file catalog for -// which it is safe to list all of its files? -trait FileCatalog extends BasicFileCatalog { - - /** Returns the specification of the partitions inferred from the data. */ - def partitionSpec(): PartitionSpec - - /** Returns all the valid files. */ - def allFiles(): Seq[FileStatus] - - /** Returns the list of files that will be read when scanning this relation. 
*/ - override def inputFiles: Array[String] = - allFiles().map(_.getPath.toUri.toString).toArray - - override def sizeInBytes: Long = allFiles().map(_.getLen).sum -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala index db889edf032d6..afad8898089bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType * Acts as a container for all of the metadata required to read from a datasource. All discovery, * resolution and merging logic for schemas and partitions has been removed. * - * @param location A [[BasicFileCatalog]] that can enumerate the locations of all the files that + * @param location A [[FileCatalog]] that can enumerate the locations of all the files that * comprise this relation. * @param partitionSchema The schema of the columns (if any) that are used to partition the relation * @param dataSchema The schema of any remaining columns. Note that if any partition columns are @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType * @param options Configuration used when reading / writing data. */ case class HadoopFsRelation( - location: BasicFileCatalog, + location: FileCatalog, partitionSchema: StructType, dataSchema: StructType, bucketSpec: Option[BucketSpec], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index b2508115c282f..5c8eff7ec46b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -17,14 +17,21 @@ package org.apache.spark.sql.execution.datasources +import java.io.FileNotFoundException + import scala.collection.mutable -import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.util.SerializableConfiguration /** @@ -38,22 +45,24 @@ import org.apache.spark.sql.types.{StringType, StructType} abstract class PartitioningAwareFileCatalog( sparkSession: SparkSession, parameters: Map[String, String], - partitionSchema: Option[StructType]) - extends SessionFileCatalog(sparkSession) with FileCatalog { + partitionSchema: Option[StructType]) extends FileCatalog with Logging { import PartitioningAwareFileCatalog.BASE_PATH_PARAM - override protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) + /** Returns the specification of the partitions inferred from the data. 
*/ + def partitionSpec(): PartitionSpec + + protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters) protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus] protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] - override def listFiles(filters: Seq[Expression]): Seq[Partition] = { + override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) { - Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil + PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil } else { prunePartitions(filters, partitionSpec()).map { - case PartitionDirectory(values, path) => + case PartitionPath(values, path) => val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match { case Some(existingDir) => // Directory has children files in it, return them @@ -63,14 +72,20 @@ abstract class PartitioningAwareFileCatalog( // Directory does not exist, or has no children files Nil } - Partition(values, files) + PartitionDirectory(values, files) } } logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t")) selectedPartitions } - override def allFiles(): Seq[FileStatus] = { + /** Returns the list of files that will be read when scanning this relation. */ + override def inputFiles: Array[String] = + allFiles().map(_.getPath.toUri.toString).toArray + + override def sizeInBytes: Long = allFiles().map(_.getLen).sum + + def allFiles(): Seq[FileStatus] = { if (partitionSpec().partitionColumns.isEmpty) { // For each of the root input paths, get the list of files inside them rootPaths.flatMap { path => @@ -139,7 +154,7 @@ abstract class PartitioningAwareFileCatalog( private def prunePartitions( predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[PartitionDirectory] = { + partitionSpec: PartitionSpec): Seq[PartitionPath] = { val PartitionSpec(partitionColumns, partitions) = partitionSpec val partitionColumnNames = partitionColumns.map(_.name).toSet val partitionPruningPredicates = predicates.filter { @@ -156,7 +171,7 @@ abstract class PartitioningAwareFileCatalog( }) val selected = partitions.filter { - case PartitionDirectory(values, _) => boundPredicate(values) + case PartitionPath(values, _) => boundPredicate(values) } logInfo { val total = partitions.length @@ -214,8 +229,186 @@ abstract class PartitioningAwareFileCatalog( val name = path.getName !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) } + + /** + * List leaf files of given paths. This method will submit a Spark job to do parallel + * listing whenever the number of paths to list reaches the parallel partition + * discovery threshold. + * + * This is publicly visible for testing. + */ + def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + val files = + if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + PartitioningAwareFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) + } else { + PartitioningAwareFileCatalog.listLeafFilesInSerial(paths, hadoopConf) + } + + HiveCatalogMetrics.incrementFilesDiscovered(files.size) + mutable.LinkedHashSet(files: _*) + } } -object PartitioningAwareFileCatalog { +object PartitioningAwareFileCatalog extends Logging { val BASE_PATH_PARAM = "basePath" + + /** A serializable variant of HDFS's BlockLocation.
*/ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. */ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) + + /** + * List a collection of paths recursively. + */ + private def listLeafFilesInSerial( + paths: Seq[Path], + hadoopConf: Configuration): Seq[FileStatus] = { + // Dummy JobConf used to get at the pathFilter defined in the configuration + val jobConf = new JobConf(hadoopConf, this.getClass) + val filter = FileInputFormat.getInputPathFilter(jobConf) + + paths.flatMap { path => + val fs = path.getFileSystem(hadoopConf) + listLeafFiles0(fs, path, filter) + } + } + + /** + * List a collection of paths recursively in parallel (using Spark executors). + * Each task launched will use [[listLeafFilesInSerial]] to list. + */ + private def listLeafFilesInParallel( + paths: Seq[Path], + hadoopConf: Configuration, + sparkSession: SparkSession): Seq[FileStatus] = { + assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) + logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") + + val sparkContext = sparkSession.sparkContext + val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val serializedPaths = paths.map(_.toString) + + // Cap the degree of parallelism to prevent the following file listing from generating too + // many tasks in case of a large #defaultParallelism. + val numParallelism = Math.min(paths.size, 10000) + + val statuses = sparkContext + .parallelize(serializedPaths, numParallelism) + .mapPartitions { paths => + val hadoopConf = serializableConfiguration.value + listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator + }.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + }.collect() + + // Turn SerializableFileStatus back to FileStatus + statuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)), + blockLocations) + } + } + + /** + * List the leaf files of a single path, in serial. + */ + private def listLeafFiles0( + fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { + logTrace(s"Listing $path") + val name = path.getName.toLowerCase + if (shouldFilterOut(name)) { + Seq.empty[FileStatus] + } else { + // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and do not include anything else recursively.
+ val statuses = try fs.listStatus(path) catch { + case _: FileNotFoundException => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + Array.empty[FileStatus] + } + + val allLeafStatuses = { + val (dirs, files) = statuses.partition(_.isDirectory) + val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) + if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats + } + + allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { + case f: LocatedFileStatus => + f + + // NOTE: + // + // - Although the S3/S3A/S3N file systems can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use `listLeafFilesInParallel` when the number of + // paths exceeds the threshold. + case f => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file systems (RawLocalFileSystem, which launches a + // subprocess and parses the stdout). + val locations = fs.getFileBlockLocations(f, 0, f.getLen) + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + lfs + } + } + } + + /** Checks if we should filter out this path name. */ + def shouldFilterOut(pathName: String): Boolean = { + // We filter everything that starts with _ and ., except _common_metadata and _metadata + // because Parquet needs to find those metadata files from leaf files returned by this method. + // We should refactor this logic to not mix metadata files with data files. + ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && + !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 504464216e5a4..ac6795b9a2e7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -33,8 +33,8 @@ import org.apache.spark.sql.types._ // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling. -object PartitionDirectory { - def apply(values: InternalRow, path: String): PartitionDirectory = +object PartitionPath { + def apply(values: InternalRow, path: String): PartitionPath = apply(values, new Path(path)) } @@ -42,14 +42,14 @@ object PartitionDirectory { * Holds a directory in a partitioned collection of files as well as the partition values * in the form of a Row. Before scanning, the files at `path` need to be enumerated.
*/ -case class PartitionDirectory(values: InternalRow, path: Path) +case class PartitionPath(values: InternalRow, path: Path) case class PartitionSpec( partitionColumns: StructType, - partitions: Seq[PartitionDirectory]) + partitions: Seq[PartitionPath]) object PartitionSpec { - val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionDirectory]) + val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath]) } object PartitioningUtils { @@ -141,7 +141,7 @@ object PartitioningUtils { // Finally, we create `Partition`s based on paths and resolved partition values. val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map { case (PartitionValues(_, literals), (path, _)) => - PartitionDirectory(InternalRow.fromSeq(literals.map(_.value)), path) + PartitionPath(InternalRow.fromSeq(literals.map(_.value)), path) } PartitionSpec(StructType(fields), partitions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala deleted file mode 100644 index 4807a92c2e6b8..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalog.scala +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import java.io.FileNotFoundException - -import scala.collection.mutable - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs._ -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} - -import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.SparkSession -import org.apache.spark.util.SerializableConfiguration - - -/** - * A base class for [[BasicFileCatalog]]s that need a [[SparkSession]] and the ability to find leaf - * files in a list of HDFS paths. - * - * @param sparkSession a [[SparkSession]] - * @param ignoreFileNotFound (see [[ListingFileCatalog]]) - */ -abstract class SessionFileCatalog(sparkSession: SparkSession) - extends BasicFileCatalog with Logging { - protected val hadoopConf: Configuration - - /** - * List leaf files of given paths. This method will submit a Spark job to do parallel - * listing whenever there is a path having more files than the parallel partition discovery - * discovery threshold. - * - * This is publicly visible for testing. 
- */ - def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { - val files = - if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { - SessionFileCatalog.listLeafFilesInParallel(paths, hadoopConf, sparkSession) - } else { - SessionFileCatalog.listLeafFilesInSerial(paths, hadoopConf) - } - - HiveCatalogMetrics.incrementFilesDiscovered(files.size) - mutable.LinkedHashSet(files: _*) - } -} - -object SessionFileCatalog extends Logging { - - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - - /** - * List a collection of path recursively. - */ - private def listLeafFilesInSerial( - paths: Seq[Path], - hadoopConf: Configuration): Seq[FileStatus] = { - // Dummy jobconf to get to the pathFilter defined in configuration - val jobConf = new JobConf(hadoopConf, this.getClass) - val filter = FileInputFormat.getInputPathFilter(jobConf) - - paths.flatMap { path => - val fs = path.getFileSystem(hadoopConf) - listLeafFiles0(fs, path, filter) - } - } - - /** - * List a collection of path recursively in parallel (using Spark executors). - * Each task launched will use [[listLeafFilesInSerial]] to list. - */ - private def listLeafFilesInParallel( - paths: Seq[Path], - hadoopConf: Configuration, - sparkSession: SparkSession): Seq[FileStatus] = { - assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) - logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}") - - val sparkContext = sparkSession.sparkContext - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, 10000) - - val statuses = sparkContext - .parallelize(serializedPaths, numParallelism) - .mapPartitions { paths => - val hadoopConf = serializableConfiguration.value - listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator - }.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - }.collect() - - // Turn SerializableFileStatus back to Status - statuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, new Path(f.path)), - blockLocations) - } - } - - /** - * List a single path, provided as a FileStatus, in serial. 
- */ - private def listLeafFiles0( - fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = { - logTrace(s"Listing $path") - val name = path.getName.toLowerCase - if (shouldFilterOut(name)) { - Seq.empty[FileStatus] - } else { - // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. - val statuses = try fs.listStatus(path) catch { - case _: FileNotFoundException => - logWarning(s"The directory $path was not found. Was it deleted very recently?") - Array.empty[FileStatus] - } - - val allLeafStatuses = { - val (dirs, files) = statuses.partition(_.isDirectory) - val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter)) - if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats - } - - allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map { - case f: LocatedFileStatus => - f - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `listLeafFilesInParallel` when the number of - // paths exceeds threshold. - case f => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). - val locations = fs.getFileBlockLocations(f, 0, f.getLen) - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - lfs - } - } - } - - /** Checks if we should filter out this path name. */ - def shouldFilterOut(pathName: String): Boolean = { - // We filter everything that starts with _ and ., except _common_metadata and _metadata - // because Parquet needs to find those metadata files from leaf files returned by this method. - // We should refactor this logic to not mix metadata files with data files. - ((pathName.startsWith("_") && !pathName.contains("=")) || pathName.startsWith(".")) && - !pathName.startsWith("_common_metadata") && !pathName.startsWith("_metadata") - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala index a5c41b244589b..5648ab480a98a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType /** - * A [[BasicFileCatalog]] for a metastore catalog table. + * A [[FileCatalog]] for a metastore catalog table. 
* * @param sparkSession a [[SparkSession]] * @param db the table's database name @@ -38,10 +38,9 @@ class TableFileCatalog( db: String, table: String, partitionSchema: Option[StructType], - override val sizeInBytes: Long) - extends SessionFileCatalog(sparkSession) { + override val sizeInBytes: Long) extends FileCatalog { - override protected val hadoopConf = sparkSession.sessionState.newHadoopConf + protected val hadoopConf = sparkSession.sessionState.newHadoopConf private val externalCatalog = sparkSession.sharedState.externalCatalog @@ -51,7 +50,7 @@ class TableFileCatalog( override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq - override def listFiles(filters: Seq[Expression]): Seq[Partition] = { + override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { filterPartitions(filters).listFiles(Nil) } @@ -79,7 +78,7 @@ class TableFileCatalog( case Some(schema) => val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters) val partitions = selectedPartitions.map { p => - PartitionDirectory(p.toRow(schema), p.storage.locationUri.get) + PartitionPath(p.toRow(schema), p.storage.locationUri.get) } val partitionSpec = PartitionSpec(schema, partitions) new PrunedTableFileCatalog( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 2695974b84b00..9c43169cbf898 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -81,6 +81,16 @@ class FileCatalogSuite extends SharedSQLContext { } } + test("PartitioningAwareFileCatalog - file filtering") { + assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd")) + assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab")) + assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd")) + assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata")) + assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata")) + assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata")) + assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata")) + } + test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { class MockCatalog( override val rootPaths: Seq[Path]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala deleted file mode 100644 index df509583377ae..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SessionFileCatalogSuite.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import org.apache.spark.SparkFunSuite - -class SessionFileCatalogSuite extends SparkFunSuite { - - test("file filtering") { - assert(!SessionFileCatalog.shouldFilterOut("abcd")) - assert(SessionFileCatalog.shouldFilterOut(".ab")) - assert(SessionFileCatalog.shouldFilterOut("_cd")) - - assert(!SessionFileCatalog.shouldFilterOut("_metadata")) - assert(!SessionFileCatalog.shouldFilterOut("_common_metadata")) - assert(SessionFileCatalog.shouldFilterOut("_ab_metadata")) - assert(SessionFileCatalog.shouldFilterOut("_cd_common_metadata")) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 43357c97c395a..36d4df0015ffd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -30,7 +30,7 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.execution.datasources.{FileCatalog, HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionPath => Partition, PartitioningAwareFileCatalog, PartitioningUtils, PartitionSpec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -626,10 +626,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath) val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { - case LogicalRelation(HadoopFsRelation(location: FileCatalog, _, _, _, _, _), _, _) => - assert(location.partitionSpec === PartitionSpec.emptySpec) + case LogicalRelation( + HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) => + assert(location.partitionSpec() === PartitionSpec.emptySpec) }.getOrElse { - fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution") + fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution") } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 4a2aaa7d4f6ca..16e1e37b2fb02 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{Partition => _, _} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import 
org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.types._ From 699bce560ef77c2801ba8a7e28538f49581f98fe Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 18 Oct 2016 11:19:34 -0700 Subject: [PATCH 2/2] fix it --- .../datasources/parquet/ParquetPartitionDiscoverySuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 36d4df0015ffd..7c50e33a648d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -30,7 +30,8 @@ import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionPath => Partition, PartitioningAwareFileCatalog, PartitioningUtils, PartitionSpec} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext
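
---

For reference: after this series, `FileCatalog` is the single catalog interface. The old `BasicFileCatalog`/`FileCatalog` split and the `SessionFileCatalog` base class are gone, so an implementation needs exactly the five members shown in the sketch below. This is a minimal, illustrative stand-alone example only; `StaticFileCatalog` is a hypothetical name that does not appear in the patch. It serves a fixed, unpartitioned file list, whereas the real implementations (e.g. `TableFileCatalog`) discover files from the file system or the metastore.

import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileCatalog, PartitionDirectory}

// Illustrative only: a FileCatalog over a fixed, unpartitioned set of files.
class StaticFileCatalog(files: Seq[FileStatus]) extends FileCatalog {

  // Treat each file's parent directory as a root path.
  override def rootPaths: Seq[Path] = files.map(_.getPath.getParent).distinct

  // Unpartitioned data: partition filters prune nothing, so return a single
  // PartitionDirectory with an empty partition-values row.
  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] =
    PartitionDirectory(InternalRow.empty, files) :: Nil

  override def inputFiles: Array[String] =
    files.map(_.getPath.toUri.toString).toArray

  // Nothing is cached, so refresh is a no-op.
  override def refresh(): Unit = ()

  override def sizeInBytes: Long = files.map(_.getLen).sum
}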