diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ccd3c34bb5c8c..4430bfd3b0380 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -53,7 +53,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jdbc"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jsonFile"),
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jsonRDD"),
-        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.load")
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.load"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.dialectClassName"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.getSQLDialect")
       ) ++ Seq(
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
index c1591ecfe2b4d..4bc721d0b2074 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
 import org.apache.spark.util.random.RandomSampler
 
 /**
- * This class translates a HQL String to a Catalyst [[LogicalPlan]] or [[Expression]].
+ * This class translates SQL to Catalyst [[LogicalPlan]]s or [[Expression]]s.
  */
 private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends ParserDialect {
   object Token {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
index 7d9fbf2f12ee6..3aa141af63079 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.catalyst
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /**
- * Root class of SQL Parser Dialect, and we don't guarantee the binary
- * compatibility for the future release, let's keep it as the internal
- * interface for advanced user.
+ * Interface for a parser.
  */
-@DeveloperApi
 trait ParserDialect {
   /** Creates LogicalPlan for a given SQL string. */
   def parsePlan(sqlText: String): LogicalPlan
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index dadea6b54a946..9257fba60e36c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -147,7 +147,7 @@ class Analyzer(
     private def assignAliases(exprs: Seq[NamedExpression]) = {
       exprs.zipWithIndex.map {
         case (expr, i) =>
-          expr transform {
+          expr transformUp {
            case u @ UnresolvedAlias(child, optionalAliasName) => child match {
              case ne: NamedExpression => ne
              case e if !e.resolved => u
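A note on the change above from "expr transform" to "expr transformUp": Catalyst's transform rewrites a node before descending into its children (top-down), while transformUp rewrites the children first. As I read it, the chained-cast case exercised by the new DataFrameSuite test later in this patch produces one UnresolvedAlias nested inside another, so the inner alias must be rewritten before the enclosing node is examined, which is what the bottom-up order provides. The sketch below only illustrates the two traversal orders on a toy tree; it is not Catalyst's TreeNode API, just a self-contained Scala illustration.

    // Toy stand-ins for expression nodes; not Spark classes.
    sealed trait Node { def children: Seq[Node]; def withChildren(cs: Seq[Node]): Node }
    case class Leaf(name: String) extends Node {
      def children: Seq[Node] = Nil
      def withChildren(cs: Seq[Node]): Node = this
    }
    case class Wrap(child: Node) extends Node {
      def children: Seq[Node] = Seq(child)
      def withChildren(cs: Seq[Node]): Node = Wrap(cs.head)
    }

    // Top-down: apply the rule to the node, then recurse into the children of the result.
    def transformDown(n: Node)(rule: PartialFunction[Node, Node]): Node = {
      val applied = rule.applyOrElse(n, identity[Node])
      applied.withChildren(applied.children.map(transformDown(_)(rule)))
    }

    // Bottom-up: rewrite the children first, then apply the rule to the rebuilt node.
    def transformUp(n: Node)(rule: PartialFunction[Node, Node]): Node = {
      val rebuilt = n.withChildren(n.children.map(transformUp(_)(rule)))
      rule.applyOrElse(rebuilt, identity[Node])
    }

    // A rule that only fires once its child has already been rewritten to a Leaf.
    val rule: PartialFunction[Node, Node] = { case Wrap(Leaf(n)) => Leaf(n + "!") }
    val tree = Wrap(Wrap(Leaf("a")))
    transformDown(tree)(rule)  // Wrap(Leaf("a!")) -- the outer Wrap was visited too early
    transformUp(tree)(rule)    // Leaf("a!!")      -- the inner rewrite enables the outer one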
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
index d2a90a50c89f4..0d44d1dd9650b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
@@ -38,8 +38,6 @@ package object errors {
     }
   }
 
-  class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause)
-
   /**
    * Wraps any exceptions that are thrown while executing `f` in a
    * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 97bf7a0cc4514..2ab091e40a073 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -133,6 +133,15 @@ class Column(protected[sql] val expr: Expression) extends Logging {
 
     case func: UnresolvedFunction => UnresolvedAlias(func, Some(func.prettyString))
 
+    // If we have a top level Cast, there is a chance to give it a better alias, if there is a
+    // NamedExpression under this Cast.
+    case c: Cast => c.transformUp {
+      case Cast(ne: NamedExpression, to) => UnresolvedAlias(Cast(ne, to))
+    } match {
+      case ne: NamedExpression => ne
+      case other => Alias(expr, expr.prettyString)()
+    }
+
     case expr: Expression => Alias(expr, expr.prettyString)()
   }
 
@@ -921,13 +930,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    * @group expr_ops
    * @since 1.3.0
    */
-  def cast(to: DataType): Column = withExpr {
-    expr match {
-      // keeps the name of expression if possible when do cast.
-      case ne: NamedExpression => UnresolvedAlias(Cast(expr, to))
-      case _ => Cast(expr, to)
-    }
-  }
+  def cast(to: DataType): Column = withExpr { Cast(expr, to) }
 
   /**
    * Casts the column to a different data type, using the canonical string representation
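Taken together, the two Column hunks above move the "keep the column name" logic out of cast and into the aliasing done when the column is turned into a named expression, so chained casts keep the original name and a cast used purely inside a predicate is no longer wrapped in an UnresolvedAlias. A quick usage sketch mirroring the tests added further down in this patch (assumes a test session with sqlContext.implicits._ in scope):

    import org.apache.spark.sql.types.{IntegerType, StringType}

    val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")

    // Single and chained casts both keep the underlying column name (SPARK-10743).
    df.select($"src.i".cast(StringType)).columns.head                    // "i"
    df.select($"src.i".cast(StringType).cast(IntegerType)).columns.head  // "i"

    // A cast used inside a filter predicate keeps working (SPARK-12841).
    Seq(1 -> "a").toDF("i", "j").filter($"i".cast(StringType) === "1").collect()  // Array(Row(1, "a"))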
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 4c1eb0b30b25e..2d664d3ee691b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -278,11 +278,6 @@ private[spark] object SQLConf {
     doc = "When true, common subexpressions will be eliminated.",
     isPublic = false)
 
-  val DIALECT = stringConf(
-    "spark.sql.dialect",
-    defaultValue = Some("sql"),
-    doc = "The default SQL dialect to use.")
-
   val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
     defaultValue = Some(true),
     doc = "Whether the query analyzer should be case sensitive or not.")
@@ -524,21 +519,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserCon
     new java.util.HashMap[String, String]())
 
   /** ************************ Spark SQL Params/Hints ******************* */
-  // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
-
-  /**
-   * The SQL dialect that is used when parsing queries. This defaults to 'sql' which uses
-   * a simple SQL parser provided by Spark SQL. This is currently the only option for users of
-   * SQLContext.
-   *
-   * When using a HiveContext, this value defaults to 'hiveql', which uses the Hive 0.12.0 HiveQL
-   * parser. Users can change this to 'sql' if they want to run queries that aren't supported by
-   * HiveQL (e.g., SELECT 1).
-   *
-   * Note that the choice of dialect does not affect things like what tables are available or
-   * how query execution is performed.
-   */
-  private[spark] def dialect: String = getConf(DIALECT)
 
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 18ddffe1be211..b8c8c78b91a5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference
 import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
-import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -33,13 +32,11 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.{execution => sparkexecution}
 import org.apache.spark.sql.SQLConf.SQLConfEntry
-import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
+import org.apache.spark.sql.catalyst.{InternalRow, _}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
-import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.parser.ParserConf
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
@@ -206,30 +203,10 @@ class SQLContext private[sql](
   protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)
 
   @transient
-  protected[sql] val ddlParser = new DDLParser(sqlParser)
+  protected[sql] val sqlParser: ParserDialect = new SparkSQLParser(new SparkQl(conf))
 
   @transient
-  protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect())
-
-  protected[sql] def getSQLDialect(): ParserDialect = {
-    try {
-      val clazz = Utils.classForName(dialectClassName)
-      clazz.getConstructor(classOf[ParserConf])
-        .newInstance(conf)
-        .asInstanceOf[ParserDialect]
-    } catch {
-      case NonFatal(e) =>
-        // Since we didn't find the available SQL Dialect, it will fail even for SET command:
-        // SET spark.sql.dialect=sql; Let's reset as default dialect automatically.
-        val dialect = conf.dialect
-        // reset the sql dialect
-        conf.unsetConf(SQLConf.DIALECT)
-        // throw out the exception, and the default sql dialect will take effect for next query.
-        throw new DialectException(
-          s"""Instantiating dialect '$dialect' failed.
-             |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
-    }
-  }
+  protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser)
 
   protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
 
@@ -239,12 +216,6 @@ class SQLContext private[sql](
   protected[sql] def executePlan(plan: LogicalPlan) =
     new sparkexecution.QueryExecution(this, plan)
 
-  protected[sql] def dialectClassName = if (conf.dialect == "sql") {
-    classOf[SparkQl].getCanonicalName
-  } else {
-    conf.dialect
-  }
-
   /**
    * Add a jar to SQLContext
    */
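With spark.sql.dialect, dialectClassName, and getSQLDialect() gone, the parser chain inside SQLContext is fixed at construction time instead of being looked up per query. A rough sketch of the wiring this hunk leaves in place (the declarations are the ones visible in the diff; the comments give my reading of the fallback order and are not new API):

    // How SQL text is parsed after this change:
    //   parseSql(sql) -> ddlParser.parse(sql, exceptionOnError = false)
    //     - data source DDL is handled by DDLParser itself;
    //     - anything else falls through to sqlParser (SparkSQLParser, which picks up
    //       commands such as SET) and from there to SparkQl(conf) for plain SQL.
    protected[sql] val sqlParser: ParserDialect = new SparkSQLParser(new SparkQl(conf))
    protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser)
    protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)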
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 2e2ce88211a08..3cfa3dfd9c7ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -201,13 +201,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
     case Some((key, None)) =>
       val runFunc = (sqlContext: SQLContext) => {
         val value =
-          try {
-            if (key == SQLConf.DIALECT.key) {
-              sqlContext.conf.dialect
-            } else {
-              sqlContext.getConf(key)
-            }
-          } catch {
+          try sqlContext.getConf(key) catch {
            case _: NoSuchElementException => ""
          }
        Seq(Row(key, value))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index 10655a85ccf89..4dea947f6a849 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -37,9 +37,10 @@ class DDLParser(fallback: => ParserDialect)
 
   override def parseExpression(sql: String): Expression = fallback.parseExpression(sql)
 
-  override def parseTableIdentifier(sql: String): TableIdentifier =
-
+  override def parseTableIdentifier(sql: String): TableIdentifier = {
     fallback.parseTableIdentifier(sql)
+  }
+
   def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
     try {
       parsePlan(input)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 8c2530fd684a7..3a27466176a20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1064,7 +1064,7 @@ object functions extends LegacyFunctions {
    * @group normal_funcs
    */
   def expr(expr: String): Column = {
-    val parser = SQLContext.getActive().map(_.getSQLDialect()).getOrElse(new CatalystQl())
+    val parser = SQLContext.getActive().map(_.sqlParser).getOrElse(new CatalystQl())
     Column(parser.parseExpression(expr))
   }
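functions.expr now borrows the active context's sqlParser instead of instantiating a dialect class, falling back to a bare CatalystQl when no context is active; the user-facing behaviour is unchanged. A small usage sketch (assumes org.apache.spark.sql.functions._ is imported and df has an integer column i):

    import org.apache.spark.sql.functions._

    df.select(expr("i + 1"))                    // parsed by the active SQLContext's sqlParser
    df.filter(expr("cast(i as string) = '1'"))  // the same parser handles expressions in filters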
columns") { @@ -1228,4 +1229,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(df.withColumn("col.a", lit("c")), Row("c", "b")) checkAnswer(df.withColumn("col.c", lit("c")), Row("a", "b", "c")) } + + test("SPARK-12841: cast in filter") { + checkAnswer( + Seq(1 -> "a").toDF("i", "j").filter($"i".cast(StringType) === "1"), + Row(1, "a")) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bdb9421cc19d7..d7f182352b4c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -23,7 +23,6 @@ import java.sql.Timestamp import org.apache.spark.AccumulatorSuite import org.apache.spark.sql.catalyst.CatalystQl import org.apache.spark.sql.catalyst.analysis.FunctionRegistry -import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.catalyst.parser.ParserConf import org.apache.spark.sql.execution.{aggregate, SparkQl} import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin} @@ -32,8 +31,6 @@ import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} import org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ -/** A SQL Dialect for testing purpose, and it can not be nested type */ -class MyDialect(conf: ParserConf) extends CatalystQl(conf) class SQLQuerySuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -148,23 +145,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { .count(), Row(24, 1) :: Row(14, 1) :: Nil) } - test("SQL Dialect Switching to a new SQL parser") { - val newContext = new SQLContext(sparkContext) - newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName()) - assert(newContext.getSQLDialect().getClass === classOf[MyDialect]) - assert(newContext.sql("SELECT 1").collect() === Array(Row(1))) - } - - test("SQL Dialect Switch to an invalid parser with alias") { - val newContext = new SQLContext(sparkContext) - newContext.sql("SET spark.sql.dialect=MyTestClass") - intercept[DialectException] { - newContext.sql("SELECT 1") - } - // test if the dialect set back to DefaultSQLDialect - assert(newContext.getSQLDialect().getClass === classOf[SparkQl]) - } - test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") { checkAnswer( sql("SELECT a FROM testData2 SORT BY a"), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 7bdca52200305..dd536b78c7242 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -542,16 +542,12 @@ class HiveContext private[hive]( } protected[sql] override lazy val conf: SQLConf = new SQLConf { - override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false) } - protected[sql] override def getSQLDialect(): ParserDialect = { - if (conf.dialect == "hiveql") { - new ExtendedHiveQlParser(this) - } else { - super.getSQLDialect() - } + @transient + protected[sql] override val sqlParser: ParserDialect = { + new SparkSQLParser(new ExtendedHiveQlParser(this)) } @transient diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 033746d42f557..a33223af24370 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -120,8 +120,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     new this.QueryExecution(plan)
 
   protected[sql] override lazy val conf: SQLConf = new SQLConf {
-    // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
-    override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
     override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
 
     clear()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d5aa7ae6b31..f7d8d395ba648 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry}
-import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.parser.ParserConf
 import org.apache.spark.sql.execution.SparkQl
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -57,8 +56,6 @@ case class WindowData(
     month: Int,
     area: String,
     product: Int)
-/** A SQL Dialect for testing purpose, and it can not be nested type */
-class MyDialect(conf: ParserConf) extends HiveQl(conf)
 
 /**
 * A collection of hive query tests where we generate the answers ourselves instead of depending on
@@ -337,42 +334,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("SQL dialect at the start of HiveContext") {
-    val hiveContext = new HiveContext(sqlContext.sparkContext)
-    val dialectConf = "spark.sql.dialect"
-    checkAnswer(hiveContext.sql(s"set $dialectConf"), Row(dialectConf, "hiveql"))
-    assert(hiveContext.getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
-  }
-
-  test("SQL Dialect Switching") {
-    assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
-    setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
-    assert(getSQLDialect().getClass === classOf[MyDialect])
-    assert(sql("SELECT 1").collect() === Array(Row(1)))
-
-    // set the dialect back to the DefaultSQLDialect
-    sql("SET spark.sql.dialect=sql")
-    assert(getSQLDialect().getClass === classOf[SparkQl])
-    sql("SET spark.sql.dialect=hiveql")
-    assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
-
-    // set invalid dialect
-    sql("SET spark.sql.dialect.abc=MyTestClass")
-    sql("SET spark.sql.dialect=abc")
-    intercept[Exception] {
-      sql("SELECT 1")
-    }
-    // test if the dialect set back to HiveQLDialect
-    getSQLDialect().getClass === classOf[ExtendedHiveQlParser]
-
-    sql("SET spark.sql.dialect=MyTestClass")
-    intercept[DialectException] {
-      sql("SELECT 1")
-    }
-    // test if the dialect set back to HiveQLDialect
-    assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
-  }
-
   test("CTAS with serde") {
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
     sql(