2 changes: 1 addition & 1 deletion R/pkg/inst/tests/test_sparkSQL.R
@@ -1428,7 +1428,7 @@ test_that("sampleBy() on a DataFrame", {

test_that("SQL error message is returned from JVM", {
retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
expect_equal(grepl("Table Not Found: blah", retError), TRUE)
expect_equal(grepl("Table not found: blah", retError), TRUE)
})

test_that("Method as.data.frame as a synonym for collect()", {
@@ -257,15 +257,21 @@ class Analyzer(
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"Table Not Found: ${u.tableName}")
u.failAnalysis(s"Table not found: ${u.tableName}")
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
getTable(u)
try {
getTable(u)
} catch {
case _: AnalysisException if u.tableIdentifier.database.isDefined =>
// Delay the failure to CheckAnalysis: the relation may still be resolved as a data source table.
u
}
}
}
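For context, this is the query shape the deferred failure is meant to enable; a minimal usage sketch, assuming a SQLContext named `sqlContext` and a JSON file at a made-up path:

    // `json` parses as the database part of the table identifier. Instead of
    // failing here, the analyzer leaves the relation unresolved so the new
    // ResolveDataSource rule can treat it as provider "json" plus a path.
    val people = sqlContext.sql("SELECT * FROM json.`/tmp/people.json`")
    people.show()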

@@ -49,6 +49,9 @@ trait CheckAnalysis {
plan.foreachUp {
case p if p.analyzed => // Skip already analyzed sub-plans

case u: UnresolvedRelation =>
u.failAnalysis(s"Table not found: ${u.tableIdentifier}")

case operator: LogicalPlan =>
operator transformExpressionsUp {
case a: Attribute if !a.resolved =>
@@ -78,7 +78,7 @@ class AnalysisSuite extends AnalysisTest {

test("resolve relations") {
assertAnalysisError(
UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))

checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)

8 changes: 8 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -432,6 +432,12 @@ private[spark] object SQLConf {
val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
defaultValue = Some(true), doc = "<TODO>")

val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
defaultValue = Some(true),
isPublic = false,
doc = "When true, we could use `datasource`.`path` as table in SQL query"
)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -540,6 +546,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)

private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
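The new flag is internal (`isPublic = false`) but can still be set like any other conf; a minimal sketch of turning the feature off, assuming a `sqlContext` in scope:

    // With the flag off, `datasource`.`path` identifiers are no longer resolved
    // as data sources and fail analysis with "Table not found" instead.
    sqlContext.setConf("spark.sql.runSQLOnFiles", "false")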
@@ -193,7 +193,7 @@ class SQLContext private[sql](
override val extendedResolutionRules =
ExtractPythonUDFs ::
PreInsertCastAndRename ::
Nil
(if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)

override val extendedCheckRules = Seq(
datasources.PreWriteCheck(catalog)
@@ -17,13 +17,37 @@

package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}

/**
* Tries to replace [[UnresolvedRelation]]s with [[ResolvedDataSource]].
*/
private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
try {
val resolved = ResolvedDataSource(
sqlContext,
userSpecifiedSchema = None,
partitionColumns = Array(),
provider = u.tableIdentifier.database.get,
options = Map("path" -> u.tableIdentifier.table))
val plan = LogicalRelation(resolved.relation)
u.alias.map(a => Subquery(a, plan)).getOrElse(plan)
} catch {
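// A ClassNotFoundException means the "database" part is not a data source
// provider; leave the relation unresolved so CheckAnalysis reports it.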
case e: ClassNotFoundException => u
case e: Exception =>
// the provider is valid, but failed to create a logical plan
u.failAnalysis(e.getMessage)
}
}
}
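To make the mapping concrete, a small sketch (a hypothetical helper, not part of the patch) of how ResolveDataSource reinterprets a database-qualified identifier:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // For json.`/tmp/people.json` the parser produces
    // TableIdentifier("/tmp/people.json", Some("json")); ResolveDataSource reads
    // the database part as the provider and the table part as the path.
    def asProviderAndPath(id: TableIdentifier): Option[(String, String)] =
      id.database.map(provider => (provider, id.table))

    // asProviderAndPath(TableIdentifier("/tmp/people.json", Some("json")))
    //   returns Some(("json", "/tmp/people.json"))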

/**
* A rule to do pre-insert data type casting and field renaming. Before we insert into
28 changes: 28 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1782,6 +1782,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("run sql directly on files") {
val df = sqlContext.range(100)
withTempPath(f => {
df.write.json(f.getCanonicalPath)
checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
df)
checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
df)
checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
df)
})

val e1 = intercept[AnalysisException] {
sql("select * from in_valid_table")
}
assert(e1.message.contains("Table not found"))

val e2 = intercept[AnalysisException] {
sql("select * from no_db.no_table")
}
assert(e2.message.contains("Table not found"))

val e3 = intercept[AnalysisException] {
sql("select * from json.invalid_file")
}
assert(e3.message.contains("No input paths specified"))
Contributor:
This error message seems confusing - there was a path specified?

Contributor (Author):
This is the existing behavior; different data sources will have different messages. This error message came from Hadoop, I think.

Contributor:
No valid files found at specified path: '$path'?
}
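A rough sketch of the kind of early path check the last suggestion points at (hypothetical code, not part of the patch; the patch keeps the Hadoop-originated message as is):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Fail fast with a clearer message when the given path does not exist,
    // instead of surfacing Hadoop's "No input paths specified" later on.
    def requireExistingPath(path: String, hadoopConf: Configuration): Unit = {
      val p = new Path(path)
      val fs: FileSystem = p.getFileSystem(hadoopConf)
      require(fs.exists(p), s"No valid files found at specified path: '$path'")
    }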

test("SortMergeJoin returns wrong results when using UnsafeRows") {
// This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
// This bug will be triggered when Tungsten is enabled and there are multiple
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
import org.apache.spark.sql.execution.datasources.{ResolveDataSource, DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
import org.apache.spark.sql.hive.client._
@@ -473,7 +473,7 @@ class HiveContext private[hive](
ExtractPythonUDFs ::
ResolveHiveWindowFunction ::
PreInsertCastAndRename ::
Nil
(if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)

override val extendedCheckRules = Seq(
PreWriteCheck(catalog)
@@ -1281,6 +1281,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("run sql directly on files") {
val df = sqlContext.range(100)
withTempPath(f => {
df.write.parquet(f.getCanonicalPath)
checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
df)
checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
df)
checkAnswer(sql(s"select a.id from parquet.`${f.getCanonicalPath}` as a"),
df)
})
}

test("correctly parse CREATE VIEW statement") {
withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt") {