[SPARK-15515][SQL] Error Handling in Running SQL Directly On Files
#### What changes were proposed in this pull request?
This PR addresses the following issues:

- **ISSUE 1:** For the ORC source format, we report a confusing error message when Hive support is not enabled:
```SQL
SQL Example:
  select id from `org.apache.spark.sql.hive.orc`.`file_path`
Error Message:
  Table or view not found: `org.apache.spark.sql.hive.orc`.`file_path`
```
Instead, we should issue an error message like:
```
Expected Error Message:
   The ORC data source must be used with Hive support enabled
```
- **ISSUE 2:** For the Avro format, we report a confusing error message. The example queries and the current error are:
```SQL
SQL Example:
  select id from `avro`.`file_path`
  select id from `com.databricks.spark.avro`.`file_path`
Error Message:
  Table or view not found: `com.databricks.spark.avro`.`file_path`
```
The desired message should be:
```
Expected Error Message:
  Failed to find data source: avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro
```

- ~~**ISSUE 3:** Unable to detect incompatible libraries for Spark 2.0 in Data Source Resolution. We report a strange error message.~~

**Update**: The latest code changes contain:
- For the JDBC format, we added an extra check in the rule `ResolveRelations` of `Analyzer`. Without this PR, Spark returns an error like `Option 'url' not specified`; now we report `Unsupported data source type for direct query on files: jdbc` (see the sketch after this list).
- Made the data source format name case-insensitive so that error handling behaves consistently with the normal cases.
- Added test cases for all the supported formats.
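
To make the list above concrete, here is a minimal sketch (not part of this commit) of how the new errors surface to a user running SQL directly on files. The local-mode session setup and the `/tmp/...` paths are illustrative assumptions:
```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object DirectFileQueryErrors {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("direct-file-query-errors")
      .getOrCreate()

    // ORC without Hive support now fails fast with a clear AnalysisException.
    try {
      spark.sql("select id from orc.`/tmp/some_file`").show()
    } catch {
      case e: AnalysisException =>
        // expected: The ORC data source must be used with Hive support enabled
        println(e.getMessage)
    }

    // jdbc is not a file-based source, so direct queries on files are rejected up front.
    try {
      spark.sql("select id from jdbc.`/tmp/some_file`").show()
    } catch {
      case e: AnalysisException =>
        // expected: Unsupported data source type for direct query on files: jdbc
        println(e.getMessage)
    }

    spark.stop()
  }
}
```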

#### How was this patch tested?
Added test cases to cover all of the above issues.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13283 from gatorsmile/runSQLAgainstFile.
gatorsmile authored and zsxwing committed Jun 2, 2016
1 parent 8900c8d commit 9aff6f3
Showing 6 changed files with 134 additions and 34 deletions.

`DataSource.scala` (`org.apache.spark.sql.execution.datasources`):

```diff
@@ -132,28 +132,20 @@ case class DataSource(
         // Found the data source using fully qualified path
         dataSource
       case Failure(error) =>
-        if (error.isInstanceOf[ClassNotFoundException]) {
-          val className = error.getMessage
-          if (spark2RemovedClasses.contains(className)) {
-            throw new ClassNotFoundException(s"$className is removed in Spark 2.0. " +
-              "Please check if your library is compatible with Spark 2.0")
-          }
-        }
-        if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-          throw new ClassNotFoundException(
-            "The ORC data source must be used with Hive support enabled.", error)
+        if (provider.toLowerCase == "orc" ||
+          provider.startsWith("org.apache.spark.sql.hive.orc")) {
+          throw new AnalysisException(
+            "The ORC data source must be used with Hive support enabled")
+        } else if (provider.toLowerCase == "avro" ||
+          provider == "com.databricks.spark.avro") {
+          throw new AnalysisException(
+            s"Failed to find data source: ${provider.toLowerCase}. Please use Spark " +
+              "package http://spark-packages.org/package/databricks/spark-avro")
         } else {
-          if (provider == "avro" || provider == "com.databricks.spark.avro") {
-            throw new ClassNotFoundException(
-              s"Failed to find data source: $provider. Please use Spark package " +
-                "http://spark-packages.org/package/databricks/spark-avro",
-              error)
-          } else {
-            throw new ClassNotFoundException(
-              s"Failed to find data source: $provider. Please find packages at " +
-                "http://spark-packages.org",
-              error)
-          }
+          throw new ClassNotFoundException(
+            s"Failed to find data source: $provider. Please find packages at " +
+              "http://spark-packages.org",
+            error)
         }
       }
     } catch {
```
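
The change above follows a lookup-with-fallback pattern: try to resolve the provider class, and on failure translate the raw `ClassNotFoundException` into a targeted, user-facing error. Below is a condensed, Spark-free sketch of that pattern; `IllegalArgumentException` stands in for Spark's `AnalysisException` so the sketch runs without Spark on the classpath:
```scala
import scala.util.{Failure, Success, Try}

object ProviderLookupSketch {
  // Resolve a data source provider by class name; on failure, raise a
  // targeted error rather than surfacing the raw ClassNotFoundException.
  def lookup(provider: String): Class[_] =
    Try(Class.forName(provider)) match {
      case Success(cls) => cls
      case Failure(error) =>
        if (provider.toLowerCase == "orc") {
          // stand-in for AnalysisException
          throw new IllegalArgumentException(
            "The ORC data source must be used with Hive support enabled")
        } else if (provider.toLowerCase == "avro") {
          throw new IllegalArgumentException(
            s"Failed to find data source: ${provider.toLowerCase}. Please use Spark " +
              "package http://spark-packages.org/package/databricks/spark-avro")
        } else {
          throw new ClassNotFoundException(
            s"Failed to find data source: $provider. Please find packages at " +
              "http://spark-packages.org", error)
        }
    }

  def main(args: Array[String]): Unit = {
    println(lookup("java.lang.String")) // resolves normally
    try lookup("avro") catch { case e: Exception => println(e.getMessage) }
  }
}
```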

`rules.scala` (`ResolveDataSource` rule, `org.apache.spark.sql.execution.datasources`):

```diff
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.datasources

+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -28,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}

 /**
- * Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
  */
 private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
@@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
           sparkSession,
           paths = u.tableIdentifier.table :: Nil,
           className = u.tableIdentifier.database.get)
+
+        val notSupportDirectQuery = try {
+          !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+        } catch {
+          case NonFatal(e) => false
+        }
+        if (notSupportDirectQuery) {
+          throw new AnalysisException("Unsupported data source type for direct query on files: " +
+            s"${u.tableIdentifier.database.get}")
+        }
         val plan = LogicalRelation(dataSource.resolveRelation())
         u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
       } catch {
```
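
The guard added to this rule is plain JVM reflection: a direct file query is allowed only when the provider's providing class is a `FileFormat`. Here is a self-contained sketch of that check, with local stand-in types replacing Spark's `FileFormat` and `AnalysisException` (assumptions, not Spark's actual classes):
```scala
import scala.util.control.NonFatal

// Local stand-ins for Spark's FileFormat trait and provider classes.
trait FileFormat
class ParquetLikeFormat extends FileFormat
class JdbcLikeProvider // not a FileFormat, so direct file queries are rejected

object DirectQueryGuard {
  def check(providerName: String, providingClass: Class[_]): Unit = {
    // Mirror of the rule: reject providers whose class is not a FileFormat.
    val notSupportDirectQuery =
      try !classOf[FileFormat].isAssignableFrom(providingClass)
      catch { case NonFatal(_) => false }
    if (notSupportDirectQuery) {
      // stand-in for AnalysisException
      throw new IllegalArgumentException(
        s"Unsupported data source type for direct query on files: $providerName")
    }
  }

  def main(args: Array[String]): Unit = {
    check("parquet", classOf[ParquetLikeFormat]) // passes silently
    try check("jdbc", classOf[JdbcLikeProvider])
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```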

`sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala` (47 additions, 6 deletions):

```diff
@@ -1838,20 +1838,61 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         df)
     })

-    val e1 = intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
       sql("select * from in_valid_table")
     }
-    assert(e1.message.contains("Table or view not found"))
+    assert(e.message.contains("Table or view not found"))

-    val e2 = intercept[AnalysisException] {
+    e = intercept[AnalysisException] {
       sql("select * from no_db.no_table").show()
     }
-    assert(e2.message.contains("Table or view not found"))
+    assert(e.message.contains("Table or view not found"))

-    val e3 = intercept[AnalysisException] {
+    e = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("Path does not exist"))
+    assert(e.message.contains("Path does not exist"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`")
+    }
+    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `com.databricks.spark.avro`.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: com.databricks.spark.avro. " +
+      "Please use Spark package http://spark-packages.org/package/databricks/spark-avro"))
+
+    // data source type is case insensitive
+    e = intercept[AnalysisException] {
+      sql(s"select id from Avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
+    }
+    assert(e.message.contains("Table or view not found: " +
+      "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `Jdbc`.`file_path`")
+    }
+    assert(e.message.contains("Unsupported data source type for direct query on files: Jdbc"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.execution.datasources.jdbc`.`file_path`")
+    }
+    assert(e.message.contains("Unsupported data source type for direct query on files: " +
+      "org.apache.spark.sql.execution.datasources.jdbc"))
   }

   test("SortMergeJoin returns wrong results when using UnsafeRows") {
```

`DDLSourceLoadSuite.scala` (`org.apache.spark.sql.sources`):

```diff
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.sources

-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType}

@@ -42,9 +42,10 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
   }

   test("should fail to load ORC without Hive Support") {
-    intercept[ClassNotFoundException] {
+    val e = intercept[AnalysisException] {
       spark.read.format("orc").load()
     }
+    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
   }
 }
```

`ResolvedDataSourceSuite.scala` (`org.apache.spark.sql.sources`):

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources

 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.DataSource

 class ResolvedDataSourceSuite extends SparkFunSuite {
@@ -60,13 +61,22 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
       classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
   }

+  test("csv") {
+    assert(
+      getProvidingClass("csv") ===
+        classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+    assert(
+      getProvidingClass("com.databricks.spark.csv") ===
+        classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+  }
+
   test("error message for unknown data sources") {
-    val error1 = intercept[ClassNotFoundException] {
+    val error1 = intercept[AnalysisException] {
       getProvidingClass("avro")
     }
     assert(error1.getMessage.contains("spark-packages"))

-    val error2 = intercept[ClassNotFoundException] {
+    val error2 = intercept[AnalysisException] {
       getProvidingClass("com.databricks.spark.avro")
     }
     assert(error2.getMessage.contains("spark-packages"))
```

`SQLQuerySuite.scala` (sql/hive, `TestHiveSingleton` tests):

```diff
@@ -1247,11 +1247,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }

-  test("run sql directly on files") {
+  test("run sql directly on files - parquet") {
    val df = spark.range(100).toDF()
     withTempPath(f => {
       df.write.parquet(f.getCanonicalPath)
-      checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from Parquet.`${f.getCanonicalPath}`"),
         df)
       checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
         df)
@@ -1260,6 +1261,49 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     })
   }

+  test("run sql directly on files - orc") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.orc(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from ORC.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.hive.orc`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from orc.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
+  test("run sql directly on files - csv") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.csv(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select cast(_c0 as int) id from CSV.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(
+        sql(s"select cast(_c0 as int) id from `com.databricks.spark.csv`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select cast(a._c0 as int) id from csv.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
+  test("run sql directly on files - json") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from jsoN.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
   test("SPARK-8976 Wrong Result for Rollup #1") {
     checkAnswer(sql(
       "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),
```
