Commit 202d48e
Merge branch 'master' of github.com:apache/spark into get-or-create-metrics
Andrew Or committed Jan 18, 2016
2 parents e99b9af + 4f11e3f commit 202d48e
Showing 16 changed files with 34 additions and 147 deletions.
4 changes: 3 additions & 1 deletion project/MimaExcludes.scala
@@ -53,7 +53,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jdbc"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jsonFile"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jsonRDD"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.load")
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.load"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.dialectClassName"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.getSQLDialect")
) ++ Seq(
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
CatalystQl.scala
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.random.RandomSampler

/**
- * This class translates a HQL String to a Catalyst [[LogicalPlan]] or [[Expression]].
+ * This class translates SQL to Catalyst [[LogicalPlan]]s or [[Expression]]s.
*/
private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends ParserDialect {
object Token {
ParserDialect.scala
@@ -17,16 +17,12 @@

package org.apache.spark.sql.catalyst

- import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
- * Root class of SQL Parser Dialect, and we don't guarantee the binary
- * compatibility for the future release, let's keep it as the internal
- * interface for advanced user.
+ * Interface for a parser.
*/
- @DeveloperApi
trait ParserDialect {
/** Creates LogicalPlan for a given SQL string. */
def parsePlan(sqlText: String): LogicalPlan
Analyzer.scala
@@ -147,7 +147,7 @@ class Analyzer(
private def assignAliases(exprs: Seq[NamedExpression]) = {
exprs.zipWithIndex.map {
case (expr, i) =>
- expr transform {
+ expr transformUp {
case u @ UnresolvedAlias(child, optionalAliasName) => child match {
case ne: NamedExpression => ne
case e if !e.resolved => u
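Note on the change above: Catalyst's transform rewrites a tree top-down, while transformUp rewrites it bottom-up, so nested UnresolvedAlias nodes (which the new top-level Cast handling in Column.scala below can produce) are rewritten before their parents inspect them. A standalone sketch of the difference, using stand-in classes rather than Catalyst's (assumption: illustrative only, not Spark code):

// Stand-in tree with a rule that collapses Alias(Alias(x, _), n) into Alias(x, n).
// Top-down application can leave nested matches behind; bottom-up fully collapses.
sealed trait Expr {
  def mapChildren(f: Expr => Expr): Expr = this match {
    case Alias(c, n) => Alias(f(c), n)
    case leaf => leaf
  }
  // parent first, like Catalyst's transform/transformDown
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr =
    rule.applyOrElse(this, identity[Expr]).mapChildren(_.transformDown(rule))
  // children first, like Catalyst's transformUp
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr =
    rule.applyOrElse(mapChildren(_.transformUp(rule)), identity[Expr])
}
case class Leaf(name: String) extends Expr
case class Alias(child: Expr, name: String) extends Expr

object TransformDemo extends App {
  val collapse: PartialFunction[Expr, Expr] = {
    case Alias(Alias(c, _), n) => Alias(c, n)
  }
  val tree = Alias(Alias(Alias(Leaf("i"), "a"), "b"), "c")
  println(tree.transformDown(collapse)) // Alias(Alias(Leaf(i),a),c): a nested pair survives
  println(tree.transformUp(collapse))   // Alias(Leaf(i),c): fully collapsed
}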
catalyst/errors/package.scala
@@ -38,8 +38,6 @@ package object errors {
}
}

- class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause)
/**
* Wraps any exceptions that are thrown while executing `f` in a
* [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
17 changes: 10 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -133,6 +133,15 @@ class Column(protected[sql] val expr: Expression) extends Logging {

case func: UnresolvedFunction => UnresolvedAlias(func, Some(func.prettyString))

+ // If we have a top level Cast, there is a chance to give it a better alias, if there is a
+ // NamedExpression under this Cast.
+ case c: Cast => c.transformUp {
+   case Cast(ne: NamedExpression, to) => UnresolvedAlias(Cast(ne, to))
+ } match {
+   case ne: NamedExpression => ne
+   case other => Alias(expr, expr.prettyString)()
+ }

case expr: Expression => Alias(expr, expr.prettyString)()
}

@@ -921,13 +930,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
* @group expr_ops
* @since 1.3.0
*/
- def cast(to: DataType): Column = withExpr {
-   expr match {
-     // keeps the name of expression if possible when do cast.
-     case ne: NamedExpression => UnresolvedAlias(Cast(expr, to))
-     case _ => Cast(expr, to)
-   }
- }
+ def cast(to: DataType): Column = withExpr { Cast(expr, to) }

/**
* Casts the column to a different data type, using the canonical string representation
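With the two Column.scala changes above, a top-level Cast now looks for a NamedExpression underneath and aliases the result with that name, so casting no longer mangles column names and cast itself needs no special case. A small usage sketch mirroring the tests later in this diff (assumption: a Spark 1.6-era sqlContext and its implicits are in scope):

import org.apache.spark.sql.types.{IntegerType, StringType}
import sqlContext.implicits._  // assumes an in-scope SQLContext named sqlContext

val df = (1 to 10).map(Tuple1.apply).toDF("i")
// The column keeps its source name through a cast...
df.select($"i".cast(StringType)).columns.head                   // "i", not "cast(i as string)"
// ...and, per the Analyzer/Column changes, even through chained casts.
df.select($"i".cast(StringType).cast(IntegerType)).columns.head // still "i"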
20 changes: 0 additions & 20 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -278,11 +278,6 @@ private[spark] object SQLConf {
doc = "When true, common subexpressions will be eliminated.",
isPublic = false)

- val DIALECT = stringConf(
-   "spark.sql.dialect",
-   defaultValue = Some("sql"),
-   doc = "The default SQL dialect to use.")

val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
defaultValue = Some(true),
doc = "Whether the query analyzer should be case sensitive or not.")
@@ -524,21 +519,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserConf {
new java.util.HashMap[String, String]())

/** ************************ Spark SQL Params/Hints ******************* */
- // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
-
- /**
-  * The SQL dialect that is used when parsing queries. This defaults to 'sql' which uses
-  * a simple SQL parser provided by Spark SQL. This is currently the only option for users of
-  * SQLContext.
-  *
-  * When using a HiveContext, this value defaults to 'hiveql', which uses the Hive 0.12.0 HiveQL
-  * parser. Users can change this to 'sql' if they want to run queries that aren't supported by
-  * HiveQL (e.g., SELECT 1).
-  *
-  * Note that the choice of dialect does not affect things like what tables are available or
-  * how query execution is performed.
-  */
- private[spark] def dialect: String = getConf(DIALECT)

private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED)

35 changes: 3 additions & 32 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
- import scala.util.control.NonFatal

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -33,13 +32,11 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.{execution => sparkexecution}
import org.apache.spark.sql.SQLConf.SQLConfEntry
- import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
+ import org.apache.spark.sql.catalyst.{InternalRow, _}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders.encoderFor
- import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
- import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
@@ -206,30 +203,10 @@ class SQLContext private[sql](
protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)

@transient
- protected[sql] val ddlParser = new DDLParser(sqlParser)
+ protected[sql] val sqlParser: ParserDialect = new SparkSQLParser(new SparkQl(conf))

@transient
- protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect())
-
- protected[sql] def getSQLDialect(): ParserDialect = {
-   try {
-     val clazz = Utils.classForName(dialectClassName)
-     clazz.getConstructor(classOf[ParserConf])
-       .newInstance(conf)
-       .asInstanceOf[ParserDialect]
-   } catch {
-     case NonFatal(e) =>
-       // Since we didn't find the available SQL Dialect, it will fail even for SET command:
-       // SET spark.sql.dialect=sql; Let's reset as default dialect automatically.
-       val dialect = conf.dialect
-       // reset the sql dialect
-       conf.unsetConf(SQLConf.DIALECT)
-       // throw out the exception, and the default sql dialect will take effect for next query.
-       throw new DialectException(
-         s"""Instantiating dialect '$dialect' failed.
-            |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
-   }
- }
+ protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser)

protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

@@ -239,12 +216,6 @@
protected[sql] def executePlan(plan: LogicalPlan) =
new sparkexecution.QueryExecution(this, plan)

- protected[sql] def dialectClassName = if (conf.dialect == "sql") {
-   classOf[SparkQl].getCanonicalName
- } else {
-   conf.dialect
- }

/**
* Add a jar to SQLContext
*/
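With the dialect machinery gone, every SQLContext gets the same fixed parser chain: DDLParser handles DDL and falls back to SparkSQLParser, which handles commands such as SET and delegates ordinary SQL to SparkQl (HiveContext swaps ExtendedHiveQlParser into the innermost slot, as shown further down). A standalone sketch of this fallback pattern with simplified stand-in classes (assumption: illustrative only, not the Spark implementations):

trait ParserDialect { def parsePlan(sql: String): String }

// Innermost layer: parses ordinary SQL (stands in for SparkQl/CatalystQl).
class SparkQl extends ParserDialect {
  def parsePlan(sql: String) = s"LogicalPlan($sql)"
}
// Middle layer: recognizes commands such as SET, delegates the rest.
class SparkSQLParser(fallback: ParserDialect) extends ParserDialect {
  def parsePlan(sql: String) =
    if (sql.trim.toUpperCase.startsWith("SET ")) s"SetCommand($sql)"
    else fallback.parsePlan(sql)
}
// Outermost layer: recognizes DDL, delegates the rest.
class DDLParser(fallback: ParserDialect) extends ParserDialect {
  def parsePlan(sql: String) =
    if (sql.trim.toUpperCase.startsWith("CREATE ")) s"DDLCommand($sql)"
    else fallback.parsePlan(sql)
}

object ParserChainDemo extends App {
  val parser = new DDLParser(new SparkSQLParser(new SparkQl))
  println(parser.parsePlan("SELECT 1"))       // LogicalPlan(SELECT 1)
  println(parser.parsePlan("SET a=b"))        // SetCommand(SET a=b)
  println(parser.parsePlan("CREATE TABLE t")) // DDLCommand(CREATE TABLE t)
}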
execution/commands.scala
@@ -201,13 +201,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand
case Some((key, None)) =>
val runFunc = (sqlContext: SQLContext) => {
val value =
-   try {
-     if (key == SQLConf.DIALECT.key) {
-       sqlContext.conf.dialect
-     } else {
-       sqlContext.getConf(key)
-     }
-   } catch {
+   try sqlContext.getConf(key) catch {
case _: NoSuchElementException => "<undefined>"
}
Seq(Row(key, value))
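With the SQLConf.DIALECT special case removed, SET <key> reduces to a plain conf lookup, and unknown keys surface as "<undefined>" instead of raising. A behavior sketch (assumption: a sqlContext is in scope; outputs in comments are illustrative):

sqlContext.sql("SET spark.sql.caseSensitive").collect()
// Array(Row("spark.sql.caseSensitive", "true"))   <- default per the SQLConf diff above
sqlContext.sql("SET never.set.key").collect()
// Array(Row("never.set.key", "<undefined>"))      <- NoSuchElementException swallowed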
DDLParser.scala (sql/core)
@@ -37,9 +37,10 @@ class DDLParser(fallback: => ParserDialect)

override def parseExpression(sql: String): Expression = fallback.parseExpression(sql)

- override def parseTableIdentifier(sql: String): TableIdentifier =
+ override def parseTableIdentifier(sql: String): TableIdentifier = {
    fallback.parseTableIdentifier(sql)
+ }

def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
parsePlan(input)
functions.scala
@@ -1064,7 +1064,7 @@ object functions extends LegacyFunctions {
* @group normal_funcs
*/
def expr(expr: String): Column = {
- val parser = SQLContext.getActive().map(_.getSQLDialect()).getOrElse(new CatalystQl())
+ val parser = SQLContext.getActive().map(_.sqlParser).getOrElse(new CatalystQl())
Column(parser.parseExpression(expr))
}

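expr now reuses the active context's sqlParser and only falls back to a bare CatalystQl when no SQLContext is active; user-facing behavior is unchanged. A small usage sketch (assumption: df is a hypothetical DataFrame with an integer column i):

import org.apache.spark.sql.functions._

// Both expressions are parsed by SQLContext.getActive().map(_.sqlParser),
// or by a fresh CatalystQl if no SQLContext is active.
df.select(expr("i + 1"), expr("abs(i - 5)"))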
DataFrameSuite.scala
@@ -1007,6 +1007,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
test("SPARK-10743: keep the name of expression if possible when do cast") {
val df = (1 to 10).map(Tuple1.apply).toDF("i").as("src")
assert(df.select($"src.i".cast(StringType)).columns.head === "i")
assert(df.select($"src.i".cast(StringType).cast(IntegerType)).columns.head === "i")
}

test("SPARK-11301: fix case sensitivity for filter on partitioned columns") {
@@ -1228,4 +1229,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
checkAnswer(df.withColumn("col.a", lit("c")), Row("c", "b"))
checkAnswer(df.withColumn("col.c", lit("c")), Row("a", "b", "c"))
}

test("SPARK-12841: cast in filter") {
checkAnswer(
Seq(1 -> "a").toDF("i", "j").filter($"i".cast(StringType) === "1"),
Row(1, "a"))
}
}
20 changes: 0 additions & 20 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -23,7 +23,6 @@ import java.sql.Timestamp
import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.catalyst.CatalystQl
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
- import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.sql.execution.{aggregate, SparkQl}
import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin}
@@ -32,8 +31,6 @@ import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._

- /** A SQL Dialect for testing purpose, and it can not be nested type */
- class MyDialect(conf: ParserConf) extends CatalystQl(conf)

class SQLQuerySuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -148,23 +145,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
.count(), Row(24, 1) :: Row(14, 1) :: Nil)
}

test("SQL Dialect Switching to a new SQL parser") {
val newContext = new SQLContext(sparkContext)
newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
assert(newContext.getSQLDialect().getClass === classOf[MyDialect])
assert(newContext.sql("SELECT 1").collect() === Array(Row(1)))
}

test("SQL Dialect Switch to an invalid parser with alias") {
val newContext = new SQLContext(sparkContext)
newContext.sql("SET spark.sql.dialect=MyTestClass")
intercept[DialectException] {
newContext.sql("SELECT 1")
}
// test if the dialect set back to DefaultSQLDialect
assert(newContext.getSQLDialect().getClass === classOf[SparkQl])
}

test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
checkAnswer(
sql("SELECT a FROM testData2 SORT BY a"),
HiveContext.scala
@@ -542,16 +542,12 @@ class HiveContext private[hive](
}

protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
}

- protected[sql] override def getSQLDialect(): ParserDialect = {
-   if (conf.dialect == "hiveql") {
-     new ExtendedHiveQlParser(this)
-   } else {
-     super.getSQLDialect()
-   }
+ @transient
+ protected[sql] override val sqlParser: ParserDialect = {
+   new SparkSQLParser(new ExtendedHiveQlParser(this))
}

@transient
TestHive.scala
@@ -120,8 +120,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
new this.QueryExecution(plan)

protected[sql] override lazy val conf: SQLConf = new SQLConf {
- // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
- override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)

clear()
SQLQuerySuite.scala (sql/hive)
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry}
- import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -57,8 +56,6 @@ case class WindowData(
month: Int,
area: String,
product: Int)
- /** A SQL Dialect for testing purpose, and it can not be nested type */
- class MyDialect(conf: ParserConf) extends HiveQl(conf)

/**
* A collection of hive query tests where we generate the answers ourselves instead of depending on
@@ -337,42 +334,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("SQL dialect at the start of HiveContext") {
val hiveContext = new HiveContext(sqlContext.sparkContext)
val dialectConf = "spark.sql.dialect"
checkAnswer(hiveContext.sql(s"set $dialectConf"), Row(dialectConf, "hiveql"))
assert(hiveContext.getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
}

test("SQL Dialect Switching") {
assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
assert(getSQLDialect().getClass === classOf[MyDialect])
assert(sql("SELECT 1").collect() === Array(Row(1)))

// set the dialect back to the DefaultSQLDialect
sql("SET spark.sql.dialect=sql")
assert(getSQLDialect().getClass === classOf[SparkQl])
sql("SET spark.sql.dialect=hiveql")
assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])

// set invalid dialect
sql("SET spark.sql.dialect.abc=MyTestClass")
sql("SET spark.sql.dialect=abc")
intercept[Exception] {
sql("SELECT 1")
}
// test if the dialect set back to HiveQLDialect
getSQLDialect().getClass === classOf[ExtendedHiveQlParser]

sql("SET spark.sql.dialect=MyTestClass")
intercept[DialectException] {
sql("SELECT 1")
}
// test if the dialect set back to HiveQLDialect
assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
}

test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(
