From e03e7761679502a98d11e3c254cc62efcc0eb36b Mon Sep 17 00:00:00 2001
From: Sunitha Kambhampati
Date: Wed, 16 Mar 2016 14:44:03 -0700
Subject: [PATCH 1/3] SPARK-13774 - Improve error message for non-existent
 paths and add tests

---
 .../apache/spark/deploy/SparkHadoopUtil.scala |  2 +
 .../execution/datasources/DataSource.scala    | 12 ++-
 .../org/apache/spark/sql/DataFrameSuite.scala | 16 +++
 .../org/apache/spark/sql/SQLQuerySuite.scala  |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala  | 97 ++++++++++---------
 5 files changed, 81 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 270ca84e24ae4..e8b617b645537 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -237,6 +237,8 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
+  def pathExists( p: Path, config: Configuration): Boolean = p.getFileSystem(config).exists(p)
+
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index e65a771202bce..5f95d069df563 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -205,7 +205,17 @@ case class DataSource(
         val hdfsPath = new Path(path)
         val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        SparkHadoopUtil.get.globPathIfNecessary(qualified)
+        val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
+
+        if (globPath.isEmpty) {
+          throw new AnalysisException(s"Path does not exist: $qualified")
+        }
+        // Sufficient to check head of the globPath seq for non-glob scenario
+        if (! (SparkHadoopUtil.
+          get.pathExists(globPath.head, sqlContext.sparkContext.hadoopConfiguration))) {
+          throw new AnalysisException("Path does not exist: " + globPath.head)
+        }
+        globPath
       }.toArray
 
       // If they gave a schema, then we try and figure out the types of the partition columns
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 199e138abfdc2..c9007ac87fb0e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1376,4 +1376,20 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     assert(e.getStackTrace.head.getClassName != classOf[QueryExecution].getName)
   }
+
+  test("SPARK-13774: Check error message for non-existent path without globbed paths") {
+    val e = intercept[AnalysisException] (sqlContext.read.format("csv").
+      load("/xyz/file2", "/xyz/file21", "/abc/files555", "a")).getMessage()
+    assert(e.startsWith("Path does not exist"))
+  }
+
+  test("SPARK-13774: Check error message for non-existent globbed paths") {
+    val e = intercept[AnalysisException] (sqlContext.read.format("text").
+      load( "/xyz/*")).getMessage()
+    assert(e.startsWith("Path does not exist"))
+
+    val e1 = intercept[AnalysisException] (sqlContext.read.json("/mnt/*/*-xyz.json").rdd).
+      getMessage()
+    assert(e1.startsWith("Path does not exist"))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3efe984c09eb8..936dd0af59b43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1742,7 +1742,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val e3 = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("Unable to infer schema"))
+    assert(e3.message.contains("Path does not exist"))
   }
 
   test("SortMergeJoin returns wrong results when using UnsafeRows") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 81420fea116bd..eacb82092b8ce 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -693,23 +693,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("SPARK-6024 wide schema support") {
     withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
       withTable("wide_schema") {
-        // We will need 80 splits for this schema if the threshold is 4000.
-        val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
-
-        // Manually create a metastore data source table.
-        sessionState.catalog.createDataSourceTable(
-          tableIdent = TableIdentifier("wide_schema"),
-          userSpecifiedSchema = Some(schema),
-          partitionColumns = Array.empty[String],
-          bucketSpec = None,
-          provider = "json",
-          options = Map("path" -> "just a dummy path"),
-          isExternal = false)
-
-        invalidateTable("wide_schema")
-
-        val actualSchema = table("wide_schema").schema
-        assert(schema === actualSchema)
+        withTempDir( tempDir => {
+          // We will need 80 splits for this schema if the threshold is 4000.
+          val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
+
+          // Manually create a metastore data source table.
+          sessionState.catalog.createDataSourceTable(
+            tableIdent = TableIdentifier("wide_schema"),
+            userSpecifiedSchema = Some(schema),
+            partitionColumns = Array.empty[String],
+            bucketSpec = None,
+            provider = "json",
+            options = Map("path" -> tempDir.getCanonicalPath),
+            isExternal = false)
+
+          invalidateTable("wide_schema")
+
+          val actualSchema = table("wide_schema").schema
+          assert(schema === actualSchema)
+        })
       }
     }
   }
@@ -900,35 +902,38 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     sqlContext.sql("""drop database if exists testdb8156 CASCADE""")
   }
 
+
   test("skip hive metadata on table creation") {
-    val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
-
-    sessionState.catalog.createDataSourceTable(
-      tableIdent = TableIdentifier("not_skip_hive_metadata"),
-      userSpecifiedSchema = Some(schema),
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
-      provider = "parquet",
-      options = Map("path" -> "just a dummy path", "skipHiveMetadata" -> "false"),
-      isExternal = false)
-
-    // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
-    // each column of the table is of native type StringType.
-    assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
-      .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
-
-    sessionState.catalog.createDataSourceTable(
-      tableIdent = TableIdentifier("skip_hive_metadata"),
-      userSpecifiedSchema = Some(schema),
-      partitionColumns = Array.empty[String],
-      bucketSpec = None,
-      provider = "parquet",
-      options = Map("path" -> "just a dummy path", "skipHiveMetadata" -> "true"),
-      isExternal = false)
-
-    // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
-    // the table has a column type as array of StringType.
+    withTempDir(tempPath => {
+      val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
+
+      sessionState.catalog.createDataSourceTable(
+        tableIdent = TableIdentifier("not_skip_hive_metadata"),
+        userSpecifiedSchema = Some(schema),
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
+        provider = "parquet",
+        options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false"),
+        isExternal = false)
+
+      // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
+      // each column of the table is of native type StringType.
+      assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
+        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
+
+      sessionState.catalog.createDataSourceTable(
+        tableIdent = TableIdentifier("skip_hive_metadata"),
+        userSpecifiedSchema = Some(schema),
+        partitionColumns = Array.empty[String],
+        bucketSpec = None,
+        provider = "parquet",
+        options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "true"),
+        isExternal = false)
+
+      // As a proxy for verifying that the table was stored in SparkSQL format, we verify that
+      // the table has a column type as array of StringType.
+      assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
+        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
+    })
   }
 }

From 99503d3126b983b0b949bd452a04536887fe6faf Mon Sep 17 00:00:00 2001
From: Sunitha Kambhampati
Date: Thu, 17 Mar 2016 10:47:12 -0700
Subject: [PATCH 2/3] Address review comments - pass scalastyle checks, use
 string interpolation for the error message, and remove the pathExists method
 in favor of an inline check

---
 .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala   | 2 --
 .../spark/sql/execution/datasources/DataSource.scala      | 7 +++----
 .../apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 4 ++--
 3 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e8b617b645537..270ca84e24ae4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -237,8 +237,6 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  def pathExists( p: Path, config: Configuration): Boolean = p.getFileSystem(config).exists(p)
-
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5f95d069df563..50b85f2813c5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -211,10 +211,9 @@ case class DataSource(
           throw new AnalysisException(s"Path does not exist: $qualified")
         }
         // Sufficient to check head of the globPath seq for non-glob scenario
-        if (! (SparkHadoopUtil.
-          get.pathExists(globPath.head, sqlContext.sparkContext.hadoopConfiguration))) {
-          throw new AnalysisException("Path does not exist: " + globPath.head)
-        }
+        if (! fs.exists(globPath.head)) {
+          throw new AnalysisException(s"Path does not exist: ${globPath.head}")
+        }
         globPath
       }.toArray
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index eacb82092b8ce..99277a24b0b3c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -916,8 +916,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         options = Map("path" -> tempPath.getCanonicalPath, "skipHiveMetadata" -> "false"),
         isExternal = false)
 
-      // As a proxy for verifying that the table was stored in Hive compatible format, we verify that
-      // each column of the table is of native type StringType.
+      // As a proxy for verifying that the table was stored in Hive compatible format,
+      // we verify that each column of the table is of native type StringType.
       assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
         .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
 

From 02b8e74b4a402f8a9bba37cd6d88e37fe80cc731 Mon Sep 17 00:00:00 2001
From: Sunitha Kambhampati
Date: Fri, 18 Mar 2016 10:05:04 -0700
Subject: [PATCH 3/3] Remove space

---
 .../org/apache/spark/sql/execution/datasources/DataSource.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 50b85f2813c5c..56457184dc0f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -211,7 +211,7 @@ case class DataSource(
           throw new AnalysisException(s"Path does not exist: $qualified")
         }
         // Sufficient to check head of the globPath seq for non-glob scenario
-        if (! fs.exists(globPath.head)) {
+        if (!fs.exists(globPath.head)) {
           throw new AnalysisException(s"Path does not exist: ${globPath.head}")
         }
         globPath
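
For reference, the net effect of the three patches on the path handling in
DataSource.resolveRelation can be read as the following self-contained sketch.
The standalone helper name checkAndGlobPath and the explicit hadoopConf
parameter are illustrative only; in the patches this logic lives inline in the
flatMap over the user-supplied paths:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.spark.deploy.SparkHadoopUtil
  import org.apache.spark.sql.AnalysisException

  def checkAndGlobPath(path: String, hadoopConf: Configuration): Seq[Path] = {
    val hdfsPath = new Path(path)
    val fs = hdfsPath.getFileSystem(hadoopConf)
    val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

    // Expands the pattern only if it contains glob characters; a glob that
    // matches nothing comes back as an empty sequence.
    val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
    if (globPath.isEmpty) {
      throw new AnalysisException(s"Path does not exist: $qualified")
    }
    // A non-glob path is returned as-is whether or not it exists, so checking
    // the head of the sequence covers the non-glob case.
    if (!fs.exists(globPath.head)) {
      throw new AnalysisException(s"Path does not exist: ${globPath.head}")
    }
    globPath
  }

With this in place, a read against a missing path or an unmatched glob fails
fast with "AnalysisException: Path does not exist: ...", which is what the new
DataFrameSuite tests assert and why the SQLQuerySuite expectation changes from
"Unable to infer schema" to "Path does not exist".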