From e168a6311cf85b00c187cc64eca605ea9e5a8a6d Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 20 Sep 2016 14:36:16 -0700 Subject: [PATCH 1/3] S3A base paths with no '/' at the end return empty DataFrames --- .../PartitioningAwareFileCatalog.scala | 10 ++++- .../datasources/FileCatalogSuite.scala | 45 ++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index d2d5b56c82946..b280decd6603a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -76,7 +76,15 @@ abstract class PartitioningAwareFileCatalog( paths.flatMap { path => // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). val fs = path.getFileSystem(hadoopConf) - val qualifiedPath = fs.makeQualified(path) + val qualifiedPathPre = fs.makeQualified(path) + val qualifiedPath: Path = if (qualifiedPathPre.getParent == null) { + // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories, + // because the `leafFile.getParent` would have returned a path with the separator at the + // end. + new Path(qualifiedPathPre, Path.SEPARATOR) + } else { + qualifiedPathPre + } // There are three cases possible with each path // 1. The path is a directory and has children files in it. Then it must be present in diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala index 0d9ea512729bd..563f34068812c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources import java.io.File +import java.net.URI +import scala.collection.mutable import scala.language.reflectiveCalls -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.test.SharedSQLContext @@ -67,4 +69,45 @@ class FileCatalogSuite extends SharedSQLContext { } } + + test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") { + class MockCatalog( + override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) { + + override def refresh(): Unit = {} + + override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mutable.LinkedHashMap( + new Path("mockFs://some-bucket/file1.json") -> new FileStatus() + ) + + override def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map( + new Path("mockFs://some-bucket/") -> Array(new FileStatus()) + ) + + override def partitionSpec(): PartitionSpec = { + PartitionSpec.emptySpec + } + } + + withSQLConf( + "fs.mockFs.impl" -> classOf[FakeParentPathFileSystem].getName, + "fs.mockFs.impl.disable.cache" -> "true") { + val pathWithSlash = new Path("mockFs://some-bucket/") + assert(pathWithSlash.getParent === null) + val pathWithoutSlash = new Path("mockFs://some-bucket") + assert(pathWithoutSlash.getParent === null) + val catalog1 = new MockCatalog(Seq(pathWithSlash)) + val catalog2 = new MockCatalog(Seq(pathWithoutSlash)) + assert(catalog1.allFiles().nonEmpty) + assert(catalog2.allFiles().nonEmpty) + } + } +} + +class FakeParentPathFileSystem extends RawLocalFileSystem { + override def getScheme: String = "mockFs" + + override def getUri: URI = { + URI.create("mockFs://some-bucket") + } } From 97a85d9970fda23ced2e5fcbab8b8906678ae50e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 20 Sep 2016 16:23:15 -0700 Subject: [PATCH 2/3] Update PartitioningAwareFileCatalog.scala --- .../datasources/PartitioningAwareFileCatalog.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index b280decd6603a..3295207e7e1ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -77,10 +77,10 @@ abstract class PartitioningAwareFileCatalog( // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). val fs = path.getFileSystem(hadoopConf) val qualifiedPathPre = fs.makeQualified(path) - val qualifiedPath: Path = if (qualifiedPathPre.getParent == null) { + val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) { // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories, - // because the `leafFile.getParent` would have returned a path with the separator at the - // end. + // because the `leafFile.getParent` would have returned an absolute path with the separator + // at the end. new Path(qualifiedPathPre, Path.SEPARATOR) } else { qualifiedPathPre From f5a9c12536a16e373c33b8e84bcd07126b77ccaf Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 20 Sep 2016 16:34:20 -0700 Subject: [PATCH 3/3] fix ss --- .../execution/datasources/PartitioningAwareFileCatalog.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala index 3295207e7e1ad..702ba97222e34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala @@ -79,8 +79,8 @@ abstract class PartitioningAwareFileCatalog( val qualifiedPathPre = fs.makeQualified(path) val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) { // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories, - // because the `leafFile.getParent` would have returned an absolute path with the separator - // at the end. + // because the `leafFile.getParent` would have returned an absolute path with the + // separator at the end. new Path(qualifiedPathPre, Path.SEPARATOR) } else { qualifiedPathPre