From 317bd1b63cc38e82bc1b09c53a68d4dd202aa4e8 Mon Sep 17 00:00:00 2001
From: Andrey Taptunov
Date: Thu, 13 Jul 2017 12:50:42 +0200
Subject: [PATCH 1/2] [SPARK-21374][CORE] Fix reading globbed paths from S3 into DF with disabled FS cache

---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala     | 12 ++++++++++--
 .../spark/sql/execution/datasources/DataSource.scala  |  4 ++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6afe58bff5229..f113c04b90d7f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -226,14 +226,22 @@ class SparkHadoopUtil extends Logging {
   }
 
   def globPath(pattern: Path): Seq[Path] = {
-    val fs = pattern.getFileSystem(conf)
+    globPath(pattern, conf)
+  }
+
+  def globPath(pattern: Path, hadoopConf: Configuration): Seq[Path] = {
+    val fs = pattern.getFileSystem(hadoopConf)
     Option(fs.globStatus(pattern)).map { statuses =>
       statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
     }.getOrElse(Seq.empty[Path])
   }
 
   def globPathIfNecessary(pattern: Path): Seq[Path] = {
-    if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
+    globPathIfNecessary(pattern, conf)
+  }
+
+  def globPathIfNecessary(pattern: Path, hadoopConf: Configuration): Seq[Path] = {
+    if (isGlobPath(pattern)) globPath(pattern, hadoopConf) else Seq(pattern)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d36a04f1fff8e..1da8f371f87a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,7 +132,7 @@ case class DataSource(
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(qualified)
+        SparkHadoopUtil.get.globPathIfNecessary(qualified, hadoopConf)
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
@@ -364,7 +364,7 @@ case class DataSource(
           val hdfsPath = new Path(path)
           val fs = hdfsPath.getFileSystem(hadoopConf)
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
+          val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified, hadoopConf)
 
           if (globPath.isEmpty) {
             throw new AnalysisException(s"Path does not exist: $qualified")

From 9d9b57a68d6d0504f4b58fd3fc99d6f678e4f213 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Fri, 4 Aug 2017 14:05:17 -0700
Subject: [PATCH 2/2] Clean up

---
 .../apache/spark/deploy/SparkHadoopUtil.scala | 12 ++---
 .../execution/datasources/DataSource.scala    | 45 ++++++++++++-------
 2 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f113c04b90d7f..550bd6854f67e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -226,22 +226,22 @@ class SparkHadoopUtil extends Logging {
   }
 
   def globPath(pattern: Path): Seq[Path] = {
-    globPath(pattern, conf)
+    val fs = pattern.getFileSystem(conf)
+    globPath(fs, pattern)
   }
 
-  def globPath(pattern: Path, hadoopConf: Configuration): Seq[Path] = {
-    val fs = pattern.getFileSystem(hadoopConf)
+  def globPath(fs: FileSystem, pattern: Path): Seq[Path] = {
     Option(fs.globStatus(pattern)).map { statuses =>
       statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
     }.getOrElse(Seq.empty[Path])
   }
 
   def globPathIfNecessary(pattern: Path): Seq[Path] = {
-    globPathIfNecessary(pattern, conf)
+    if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
   }
 
-  def globPathIfNecessary(pattern: Path, hadoopConf: Configuration): Seq[Path] = {
-    if (isGlobPath(pattern)) globPath(pattern, hadoopConf) else Seq(pattern)
+  def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = {
+    if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 1da8f371f87a1..6e88e275738aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.{existentials, implicitConversions}
 import scala.util.{Failure, Success, Try}
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -132,7 +133,7 @@ case class DataSource(
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(hadoopConf)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(qualified, hadoopConf)
+        SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
       }.toArray
       new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
     }
@@ -360,22 +361,8 @@ case class DataSource(
       case (format: FileFormat, _) =>
        val allPaths = caseInsensitiveOptions.get("path") ++ paths
        val hadoopConf = sparkSession.sessionState.newHadoopConf()
-        val globbedPaths = allPaths.flatMap { path =>
-          val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(hadoopConf)
-          val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-          val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified, hadoopConf)
-
-          if (globPath.isEmpty) {
-            throw new AnalysisException(s"Path does not exist: $qualified")
-          }
-          // Sufficient to check head of the globPath seq for non-glob scenario
-          // Don't need to check once again if files exist in streaming mode
-          if (checkFilesExist && !fs.exists(globPath.head)) {
-            throw new AnalysisException(s"Path does not exist: ${globPath.head}")
-          }
-          globPath
-        }.toArray
+        val globbedPaths = allPaths.flatMap(
+          DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray
 
        val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
        val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)
@@ -638,4 +625,28 @@ object DataSource extends Logging {
     CatalogStorageFormat.empty.copy(
       locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
   }
+
+  /**
+   * If `path` is a file pattern, return all the files that match it. Otherwise, return itself.
+   * If `checkFilesExist` is `true`, also check the file existence.
+   */
+  private def checkAndGlobPathIfNecessary(
+      hadoopConf: Configuration,
+      path: String,
+      checkFilesExist: Boolean): Seq[Path] = {
+    val hdfsPath = new Path(path)
+    val fs = hdfsPath.getFileSystem(hadoopConf)
+    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    val globPath = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+
+    if (globPath.isEmpty) {
+      throw new AnalysisException(s"Path does not exist: $qualified")
+    }
+    // Sufficient to check head of the globPath seq for non-glob scenario
+    // Don't need to check once again if files exist in streaming mode
+    if (checkFilesExist && !fs.exists(globPath.head)) {
+      throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+    }
+    globPath
+  }
 }
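
For context, and not part of the patch series itself: a minimal sketch of the kind of job this fix targets. The bucket, path, and credential values are placeholders; the S3A keys are real Hadoop options. The point of the series is that when the FileSystem cache is disabled, the glob must be expanded through a FileSystem built from the same Hadoop configuration the read itself will use. Before these patches, SparkHadoopUtil.globPath built that FileSystem from its own default configuration, so settings visible only to the session's Hadoop configuration (for example, credentials applied at runtime as below) could be missing while the glob was resolved.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("spark-21374-sketch")
      .getOrCreate()

    // S3A settings applied to the session's Hadoop configuration at runtime.
    // The keys are real S3A options; the values and bucket below are placeholders.
    val hc = spark.sparkContext.hadoopConfiguration
    hc.set("fs.s3a.impl.disable.cache", "true")
    hc.set("fs.s3a.access.key", "<access-key>")
    hc.set("fs.s3a.secret.key", "<secret-key>")

    // A globbed load goes through DataSource -> SparkHadoopUtil.globPathIfNecessary.
    // With the cache disabled, expanding the glob creates a fresh FileSystem; after
    // this series it is the FileSystem derived from the same hadoopConf used for the
    // read, so the settings above are in effect while the glob is expanded.
    val df = spark.read.json("s3a://some-bucket/logs/2017/*/*.json")
    df.printSchema()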