From b40586138d662ef9bd68928ec2abcb16f3127c78 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Mon, 4 Jul 2016 11:13:34 +0900
Subject: [PATCH] Add a new interface to filter files in FileFormat

---
 .../ml/source/libsvm/LibSVMRelation.scala     |  7 ++-
 .../datasources/CatalogFileIndex.scala        |  4 +-
 .../execution/datasources/DataSource.scala    |  9 ++--
 .../execution/datasources/FileFormat.scala    | 35 ++++++++++++
 .../datasources/InMemoryFileIndex.scala       |  3 +-
 .../PartitioningAwareFileIndex.scala          | 41 ++++++--------
 .../parquet/ParquetFileFormat.scala           |  9 +++-
 .../streaming/FileStreamSource.scala          |  5 +-
 .../streaming/MetadataLogFileIndex.scala      |  4 +-
 .../datasources/FileFormatSuite.scala         | 46 ++++++++++++++++
 .../datasources/FileIndexSuite.scala          | 19 ++-----
 .../datasources/FileSourceStrategySuite.scala | 54 ++++++++++++++++++-
 12 files changed, 182 insertions(+), 54 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatSuite.scala

diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index f68847a664b69..2a9866e2f2eaa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -90,10 +90,9 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
     val libSVMOptions = new LibSVMOptions(options)
     val numFeatures: Int = libSVMOptions.numFeatures.getOrElse {
       // Infers number of features if the user doesn't specify (a valid) one.
-      val dataFiles = files.filterNot(_.getPath.getName startsWith "_")
-      val path = if (dataFiles.length == 1) {
-        dataFiles.head.getPath.toUri.toString
-      } else if (dataFiles.isEmpty) {
+      val path = if (files.length == 1) {
+        files.head.getPath.toUri.toString
+      } else if (files.isEmpty) {
         throw new IOException("No input path specified for libsvm data")
       } else {
         throw new IOException("Multiple input paths are not supported for libsvm data.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index db0254f8d5581..7bc20428f8eef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -83,7 +83,8 @@ class CatalogFileIndex(
         sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
     } else {
       new InMemoryFileIndex(
-        sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
+        sparkSession, rootPaths, table.storage.properties, partitionSchema = None,
+        PathFilter.defaultPathFilter)
     }
   }
 
@@ -117,4 +118,5 @@ private class PrunedInMemoryFileIndex(
     partitionSpec.partitions.map(_.path),
     Map.empty,
     Some(partitionSpec.partitionColumns),
+    PathFilter.defaultPathFilter,
     fileStatusCache)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c9384e44255b8..e541f0e1ce388 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -125,7 +125,8 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
        SparkHadoopUtil.get.globPathIfNecessary(qualified)
      }.toArray
-      new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
+      new InMemoryFileIndex(sparkSession, globbedPaths, options, None,
+        format.getPathFilter(options), fileStatusCache)
     }
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
@@ -316,7 +317,8 @@ case class DataSource(
           caseInsensitiveOptions.get("path").toSeq ++ paths,
           sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
+        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
+          format.getPathFilter(options))
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
@@ -369,7 +371,8 @@ case class DataSource(
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
         } else {
           new InMemoryFileIndex(
-            sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
+            sparkSession, globbedPaths, options, Some(partitionSchema),
+            format.getPathFilter(options), fileStatusCache)
         }
 
         HadoopFsRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index dacf462953520..a1293968675bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -30,6 +30,34 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
+/**
+ * A serializable filter used to select qualified paths when listing files, possibly in parallel.
+ */
+abstract class PathFilter extends Serializable {
+  final def accept(path: Path): Boolean = isDataPath(path) || isMetaDataPath(path)
+  def isDataPath(path: Path): Boolean = false
+  def isMetaDataPath(path: Path): Boolean = false
+}
+
+object PathFilter {
+
+  /** The default path filter; it accepts everything except hidden and temporary files. */
+  val defaultPathFilter = new PathFilter {
+
+    override def isDataPath(path: Path): Boolean = {
+      // We filter out the following paths:
+      // 1. everything that starts with `_` or `.` (names containing `=` are kept so partition
+      //    directories whose column name starts with `_` still work); format-specific metadata
+      //    files such as Parquet's `_metadata` are accepted via `isMetaDataPath` instead.
+      // 2. everything that ends with `._COPYING_`, an intermediate state of a file being
+      //    copied, which we skip to avoid reading it twice.
+      val name = path.getName
+      !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".") ||
+        name.endsWith("._COPYING_"))
+    }
+  }
+}
+
 /**
  * Used to read and write data stored in files to/from the [[InternalRow]] format.
  */
@@ -74,6 +102,13 @@ trait FileFormat {
     false
   }
 
+  /**
+   * Returns a serializable `PathFilter` used to select the qualified files for this format.
+   */
+  def getPathFilter(options: Map[String, String]): PathFilter = {
+    PathFilter.defaultPathFilter
+  }
+
   /**
    * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index ee4d0863d9771..a698d37343068 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -39,9 +39,10 @@ class InMemoryFileIndex(
     override val rootPaths: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType],
+    pathFilter: PathFilter = PathFilter.defaultPathFilter,
    fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
-    sparkSession, parameters, partitionSchema, fileStatusCache) {
+    sparkSession, parameters, partitionSchema, pathFilter, fileStatusCache) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
   @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 71500a010581e..d945d7095df8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -46,6 +46,7 @@ abstract class PartitioningAwareFileIndex(
     sparkSession: SparkSession,
     parameters: Map[String, String],
     userPartitionSchema: Option[StructType],
+    pathFilter: PathFilter,
     fileStatusCache: FileStatusCache = NoopCache)
   extends FileIndex with Logging {
   import PartitioningAwareFileIndex.BASE_PATH_PARAM
@@ -237,9 +238,14 @@ abstract class PartitioningAwareFileIndex(
 
   // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
   // counted as data files, so that they shouldn't participate partition discovery.
-  private def isDataPath(path: Path): Boolean = {
-    val name = path.getName
-    !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+  private def isDataPath(path: Path): Boolean = pathFilter.isDataPath(path)
+
+  private def mergePathFilter(
+      filter: PathFilter,
+      subFilter: Option[org.apache.hadoop.fs.PathFilter])
+    : Path => Boolean = subFilter match {
+    case Some(sf) => (path: Path) => filter.accept(path) && sf.accept(path)
+    case None => (path: Path) => filter.accept(path)
   }
 
   /**
@@ -261,9 +267,9 @@ abstract class PartitioningAwareFileIndex(
         pathsToFetch += path
       }
     }
-    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+    val hadoopFilter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
-      pathsToFetch, hadoopConf, filter, sparkSession)
+      pathsToFetch, hadoopConf, mergePathFilter(pathFilter, Option(hadoopFilter)), sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -305,7 +311,7 @@ object PartitioningAwareFileIndex extends Logging {
   private def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      filter: PathFilter,
+      filter: Path => Boolean,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
@@ -392,11 +398,10 @@ object PartitioningAwareFileIndex extends Logging {
   private def listLeafFiles(
       path: Path,
       hadoopConf: Configuration,
-      filter: PathFilter,
+      filter: Path => Boolean,
       sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
-    val name = path.getName.toLowerCase
 
     // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
     // Note that statuses only include FileStatus for the files and dirs directly under path,
@@ -407,7 +412,7 @@ object PartitioningAwareFileIndex extends Logging {
       Array.empty[FileStatus]
     }
 
-    val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
+    val filteredStatuses = statuses.filter(status => filter(status.getPath))
 
     val allLeafStatuses = {
       val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
@@ -418,10 +423,10 @@ object PartitioningAwareFileIndex extends Logging {
         dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
       }
       val allFiles = topLevelFiles ++ nestedFiles
-      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
+      if (filter != null) allFiles.filter(f => filter(f.getPath)) else allFiles
     }
 
-    allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
+    allLeafStatuses.filter(status => filter(status.getPath)).map {
       case f: LocatedFileStatus =>
         f
 
@@ -447,18 +452,4 @@ object PartitioningAwareFileIndex extends Logging {
       lfs
     }
   }
-
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter follow paths:
-    // 1. everything that starts with _ and ., except _common_metadata and _metadata
-    // because Parquet needs to find those metadata files from leaf files returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
-    // should skip this file in case of double reading.
-    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
-      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
-    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
-    exclude && !include
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 062aa5c8ea624..111dd80787bff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -255,7 +255,7 @@ class ParquetFileFormat
 
   private def isSummaryFile(file: Path): Boolean = {
     file.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE ||
-      file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
+        file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
   /**
@@ -275,6 +275,13 @@ class ParquetFileFormat
     true
   }
 
+  override def getPathFilter(options: Map[String, String]): PathFilter = {
+    new PathFilter {
+      override def isDataPath(path: Path): Boolean = PathFilter.defaultPathFilter.isDataPath(path)
+      override def isMetaDataPath(path: Path): Boolean = isSummaryFile(path)
+    }
+  }
+
   override def buildReaderWithPartitionValues(
       sparkSession: SparkSession,
       dataSchema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a9e64c640042a..12e38b0be8d6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation, PathFilter}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -195,7 +195,8 @@ class FileStreamSource(
   private def allFilesUsingMetadataLogFileIndex() = {
     // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
     // non-glob path
-    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+    new MetadataLogFileIndex(sparkSession, qualifiedBasePath, PathFilter.defaultPathFilter)
+      .allFiles()
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index aeaa134736937..880480325c0ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -29,8 +29,8 @@ import org.apache.spark.sql.execution.datasources._
  * A [[FileIndex]] that generates the list of files to processing by reading them from the
  * metadata log files generated by the [[FileStreamSink]].
  */
-class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
-  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
+class MetadataLogFileIndex(sparkSession: SparkSession, path: Path, pathFilter: PathFilter)
+  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None, pathFilter) {
 
   private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatSuite.scala
new file mode 100644
index 0000000000000..14d954aa662f4
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+
+class FileFormatSuite extends SparkFunSuite {
+
+  test("default file filtering") {
+    val defaultPathFilter = PathFilter.defaultPathFilter
+    assert(defaultPathFilter.accept(new Path("abcd")))
+    assert(!defaultPathFilter.accept(new Path(".ab")))
+    assert(!defaultPathFilter.accept(new Path("_cd")))
+    assert(!defaultPathFilter.accept(new Path("a._COPYING_")))
+  }
+
+  test("parquet file filtering") {
+    val parquetPathFilter = new ParquetFileFormat().getPathFilter(Map.empty)
+    assert(parquetPathFilter.accept(new Path("abcd")))
+    assert(!parquetPathFilter.accept(new Path(".ab")))
+    assert(!parquetPathFilter.accept(new Path("_cd")))
+    assert(!parquetPathFilter.accept(new Path("a._COPYING_")))
+    assert(parquetPathFilter.accept(new Path("_metadata")))
+    assert(parquetPathFilter.accept(new Path("_common_metadata")))
+    assert(!parquetPathFilter.accept(new Path("_ab_metadata")))
+    assert(!parquetPathFilter.accept(new Path("_cd_common_metadata")))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 7ea4064927576..269ebca6d205f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -38,7 +38,8 @@ class FileIndexSuite extends SharedSQLContext {
       stringToFile(file, "text")
 
       val path = new Path(file.getCanonicalPath)
-      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
+      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None,
+          PathFilter.defaultPathFilter) {
        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
      }
@@ -135,21 +136,10 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
-  test("PartitioningAwareFileIndex - file filtering") {
-    assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
-    assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
-    assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
-    assert(PartitioningAwareFileIndex.shouldFilterOut("a._COPYING_"))
-  }
-
   test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
     class MockCatalog(
       override val rootPaths: Seq[Path])
-      extends PartitioningAwareFileIndex(spark, Map.empty, None) {
+      extends PartitioningAwareFileIndex(spark, Map.empty, None, PathFilter.defaultPathFilter) {
 
       override def refresh(): Unit = {}
 
@@ -201,7 +191,8 @@ class FileIndexSuite extends SharedSQLContext {
       val dirPath = new Path(dir.getAbsolutePath)
       val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
       val catalog =
-        new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) {
+        new InMemoryFileIndex(
+          spark, Seq(dirPath), Map.empty, None, PathFilter.defaultPathFilter, fileStatusCache) {
           def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
           def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index f36162858bf7a..ca95f36bbd0ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.zip.GZIPOutputStream
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, PathFilter, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.Job
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -487,6 +487,50 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     }
   }
 
+  test("filter out invalid files in a driver") {
+    withSQLConf(
+        "fs.file.impl" -> classOf[MockDistributedFileSystem].getName,
+        SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "3") {
+      val table =
+        createTable(
+          files = Seq(
+            "p1=1/file1" -> 1,
+            "p1=1/file2" -> 1,
+            "p1=2/file3" -> 1,
+            "p1=2/.temp" -> 1,
+            "p1=2/_temp" -> 1,
+            "p1=2/temp._COPYING_" -> 1,
+            "p1=2/invalid_file" -> 1))
+
+      checkScan(table.select('c1)) { partitions =>
+        assert(partitions.size == 1)
+        assert(partitions.head.files.size == 3)
+      }
+    }
+  }
+
+  test("filter out invalid files in parallel") {
+    withSQLConf(
+        "fs.file.impl" -> classOf[MockDistributedFileSystem].getName,
+        SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "2") {
+      val table =
+        createTable(
+          files = Seq(
+            "p1=1/file1" -> 1,
+            "p1=1/file2" -> 1,
+            "p1=2/file3" -> 1,
+            "p1=2/.temp" -> 1,
+            "p1=2/_temp" -> 1,
+            "p1=2/temp._COPYING_" -> 1,
+            "p1=2/invalid_file" -> 1))
+
+      checkScan(table.select('c1)) { partitions =>
+        assert(partitions.size == 1)
+        assert(partitions.head.files.size == 3)
+      }
+    }
+  }
+
   // Helpers for checking the arguments passed to the FileFormat.
 
   protected val checkPartitionSchema =
@@ -600,6 +644,14 @@ class TestFileFormat extends TextBasedFileFormat {
       .add("c1", IntegerType)
       .add("c2", IntegerType))
 
+  override def getPathFilter(options: Map[String, String]): PathFilter = {
+    new PathFilter {
+      override def isDataPath(path: Path): Boolean = {
+        PathFilter.defaultPathFilter.isDataPath(path) && !path.getName.startsWith("invalid_")
+      }
+    }
+  }
+
   /**
    * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
    * be put here. For example, user defined output committer can be configured here
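
Note on usage (not part of the diff): the sketch below shows how a data source could plug into the new FileFormat.getPathFilter hook, mirroring the ParquetFileFormat and TestFileFormat overrides above. The object name, the "tmp_" prefix, and the "_manifest" file are hypothetical; only PathFilter, defaultPathFilter, isDataPath, isMetaDataPath, and accept come from this patch.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.PathFilter

object ManifestAwarePathFilterExample {
  // Keeps the default data-file rules, additionally skips files prefixed with "tmp_",
  // and treats a "_manifest" file as metadata so it is still returned by the listing.
  val manifestAwareFilter: PathFilter = new PathFilter {
    override def isDataPath(path: Path): Boolean =
      PathFilter.defaultPathFilter.isDataPath(path) && !path.getName.startsWith("tmp_")
    override def isMetaDataPath(path: Path): Boolean = path.getName == "_manifest"
  }

  def main(args: Array[String]): Unit = {
    assert(manifestAwareFilter.accept(new Path("part-00000")))       // ordinary data file
    assert(!manifestAwareFilter.accept(new Path("tmp_part-00000")))  // extra exclusion
    assert(manifestAwareFilter.accept(new Path("_manifest")))        // kept via isMetaDataPath
    assert(!manifestAwareFilter.accept(new Path("_SUCCESS")))        // default rules still apply
  }
}

A FileFormat implementation would typically return such a filter from getPathFilter(options), so the driver-side and the parallel (distributed) listing paths in PartitioningAwareFileIndex apply the same rules.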