From a79c0f277e301a125ac347b8354247757e5710ad Mon Sep 17 00:00:00 2001
From: Yana Kadiyska
Date: Thu, 17 Dec 2015 12:05:05 -0500
Subject: [PATCH 1/2] Bugfix and test

---
 .../apache/spark/sql/DataFrameReader.scala    | 20 +++++++++++++-------
 .../org/apache/spark/sql/DataFrameSuite.scala | 10 ++++++++++
 2 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3ed1e55adec6d..0eaa5651a4a8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -36,6 +36,8 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
 
+import scala.util.{Success, Try}
+
 /**
  * :: Experimental ::
  * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
@@ -306,19 +308,23 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
     if (paths.isEmpty) {
       sqlContext.emptyDataFrame
     } else {
-      val globbedPaths = paths.flatMap { path =>
+      val globbedPaths = paths.map { path =>
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(qualified)
-      }.toArray
-
-      sqlContext.baseRelationToDataFrame(
-        new ParquetRelation(
-          globbedPaths.map(_.toString), userSpecifiedSchema, None, extraOptions.toMap)(sqlContext))
+        Try(SparkHadoopUtil.get.globPathIfNecessary(qualified))
+      }.collect { case Success(s) =>s }.flatten.toArray
+
+      if (globbedPaths.isEmpty) {
+        sqlContext.emptyDataFrame
+      } else
+        sqlContext.baseRelationToDataFrame(
+          new ParquetRelation(
+            globbedPaths.map(_.toString), userSpecifiedSchema, None, extraOptions.toMap)(sqlContext))
     }
   }
 
+
   /**
    * Loads an ORC file and returns the result as a [[DataFrame]].
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 0644bdaaa35ce..68d1e0baab79f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -523,6 +523,16 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-12369: Missing parquet files") {
+    withTempPath { path =>
+      Seq((2012,"a","b")).toDF("year", "vala","valb").write.partitionBy("year","vala").parquet(path.getAbsolutePath)
+      val df = sqlContext.read.parquet(s"${path.getAbsolutePath}/year=2015/*/*.parquet")
+      assert(df.inputFiles.isEmpty)
+      val df1 = sqlContext.read.parquet(s"${path.getAbsolutePath}/year=2012/*/*.parquet")
+      assert(df1.inputFiles.nonEmpty)
+    }
+  }
+
   ignore("show") {
     // This test case is intended ignored, but to make sure it compiles correctly
     testData.select($"*").show()

From 25c0f41686eb3e9c646321bff6874efa49a46ed3 Mon Sep 17 00:00:00 2001
From: Yana
Date: Fri, 18 Dec 2015 08:21:39 -0500
Subject: [PATCH 2/2] Scalastyle fixes

---
 .../scala/org/apache/spark/sql/DataFrameReader.scala | 10 +++++++---
 .../scala/org/apache/spark/sql/DataFrameSuite.scala  |  3 ++-
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0eaa5651a4a8c..82ec5f06f0286 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -313,14 +313,18 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
         val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         Try(SparkHadoopUtil.get.globPathIfNecessary(qualified))
-      }.collect { case Success(s) =>s }.flatten.toArray
+      }.collect { case Success(s) => s }.flatten.toArray
 
       if (globbedPaths.isEmpty) {
         sqlContext.emptyDataFrame
-      } else
+      } else {
         sqlContext.baseRelationToDataFrame(
           new ParquetRelation(
-            globbedPaths.map(_.toString), userSpecifiedSchema, None, extraOptions.toMap)(sqlContext))
+            globbedPaths.map(_.toString),
+            userSpecifiedSchema,
+            None,
+            extraOptions.toMap)(sqlContext))
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 68d1e0baab79f..0000039c52e96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -525,7 +525,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
 
   test("SPARK-12369: Missing parquet files") {
     withTempPath { path =>
-      Seq((2012,"a","b")).toDF("year", "vala","valb").write.partitionBy("year","vala").parquet(path.getAbsolutePath)
+      Seq((2012, "a", "b")).toDF("year", "vala", "valb")
+        .write.partitionBy("year", "vala").parquet(path.getAbsolutePath)
       val df = sqlContext.read.parquet(s"${path.getAbsolutePath}/year=2015/*/*.parquet")
       assert(df.inputFiles.isEmpty)
       val df1 = sqlContext.read.parquet(s"${path.getAbsolutePath}/year=2012/*/*.parquet")
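
Note for reviewers: the core of both patches is one pattern. Each glob expansion is wrapped in a Try, only the successes are kept, and an entirely empty result falls back to sqlContext.emptyDataFrame, so a path that matches nothing (e.g. a partition directory that does not exist) no longer aborts the whole read. The sketch below shows that pattern in isolation, outside Spark; resolveGlob is a hypothetical stand-in for SparkHadoopUtil.get.globPathIfNecessary, which can throw when a path cannot be resolved.

import scala.util.{Success, Try}

object GlobFallbackSketch {

  // Hypothetical stand-in for SparkHadoopUtil.get.globPathIfNecessary:
  // resolves a glob to concrete files, throwing if the base path is missing.
  def resolveGlob(path: String): Seq[String] =
    if (path.contains("missing")) throw new java.io.FileNotFoundException(path)
    else Seq(path + "/part-00000.parquet")

  // The pattern from the patch: wrap each fallible resolution in Try,
  // keep only the successes, and flatten into the resolved file list.
  def resolveAll(paths: Seq[String]): Array[String] =
    paths
      .map(p => Try(resolveGlob(p)))
      .collect { case Success(files) => files }
      .flatten
      .toArray

  def main(args: Array[String]): Unit = {
    val resolved = resolveAll(Seq("/data/year=2012", "/data/missing/year=2015"))
    // A nonexistent partition contributes nothing instead of failing the read;
    // an empty result corresponds to the emptyDataFrame branch in the patch.
    if (resolved.isEmpty) println("no inputs -> emptyDataFrame")
    else println("building relation over: " + resolved.mkString(", "))
  }
}

One caveat this sketch mirrors from the patch itself: Try swallows every non-fatal exception, so a transient filesystem error is indistinguishable from a genuinely missing partition.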