From 51e61621469392c3b357781230ef2909cf98b7a8 Mon Sep 17 00:00:00 2001
From: scwf
Date: Sat, 4 Oct 2014 23:09:18 +0800
Subject: [PATCH] add hiveconf when parse hive ql

---
 .../spark/sql/hive/ExtendedHiveQlParser.scala      | 17 +++++++++++++++--
 .../org/apache/spark/sql/hive/HiveContext.scala    |  4 ++--
 .../org/apache/spark/sql/hive/HiveQl.scala         | 16 +++++++++++-----
 .../org/apache/spark/sql/hive/TestHive.scala       |  2 +-
 .../sql/hive/api/java/JavaHiveContext.scala        |  4 ++--
 .../apache/spark/sql/hive/StatisticsSuite.scala    |  2 +-
 6 files changed, 32 insertions(+), 13 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
index e7e1cb980c2ae..6e0131a6c824d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.language.implicitConversions
 import scala.util.parsing.combinator.syntactical.StandardTokenParsers
 import scala.util.parsing.combinator.PackratParsers
+import org.apache.hadoop.hive.conf.HiveConf
 
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
@@ -27,8 +28,20 @@ import org.apache.spark.sql.catalyst.SqlLexical
 /**
  * A parser that recognizes all HiveQL constructs together with several Spark SQL specific
  * extensions like CACHE TABLE and UNCACHE TABLE.
  */
-private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers {
+private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers {
+  var hiveConf: HiveConf = _
+  var initialized: Boolean = false
+
+  def initialize(hcf: HiveConf): Unit = {
+    synchronized {
+      if (!initialized) {
+        initialized = true
+        hiveConf = hcf
+      }
+    }
+  }
+
   def apply(input: String): LogicalPlan = {
     // Special-case out set commands since the value fields can be
     // complex to handle without RegexParsers. Also this approach
@@ -84,7 +97,7 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr
 
   protected lazy val hiveQl: Parser[LogicalPlan] =
     remainingQuery ^^ {
-      case r => HiveQl.createPlan(r.trim())
+      case r => HiveQl.createPlan(r.trim(), hiveConf)
     }
 
   /** It returns all remaining query */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8bcc098bbb620..a1d36c04411c2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -95,7 +95,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     if (dialect == "sql") {
       super.sql(sqlText)
     } else if (dialect == "hiveql") {
-      new SchemaRDD(this, HiveQl.parseSql(sqlText))
+      new SchemaRDD(this, HiveQl.parseSql(sqlText, hiveconf))
     } else {
       sys.error(s"Unsupported SQL dialect: $dialect. Try 'sql' or 'hiveql'")
     }
   }
@@ -103,7 +103,7 @@
 
   @deprecated("hiveql() is deprecated as the sql function now parses using HiveQL by default. " +
     s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
-  def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
+  def hiveql(hqlQuery: String): SchemaRDD = new SchemaRDD(this, HiveQl.parseSql(hqlQuery, hiveconf))
 
   @deprecated("hql() is deprecated as the sql function now parses using HiveQL by default. " +
     s"The SQL dialect for parsing can be set using ${SQLConf.DIALECT}", "1.1")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 32c9175f181bb..c97f0cc15a119 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
 import org.apache.hadoop.hive.ql.plan.PlanUtils
+import org.apache.hadoop.hive.ql.Context
 
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
@@ -215,15 +217,19 @@ private[hive] object HiveQl {
   /**
    * Returns the AST for the given SQL string.
    */
-  def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql))
+  def getAst(sql: String, hiveConf: HiveConf): ASTNode =
+    ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, new Context(hiveConf)))
 
   /** Returns a LogicalPlan for a given HiveQL string. */
-  def parseSql(sql: String): LogicalPlan = hiveSqlParser(sql)
+  def parseSql(sql: String, hiveConf: HiveConf): LogicalPlan = {
+    hiveSqlParser.hiveConf = hiveConf
+    hiveSqlParser(sql)
+  }
 
   /** Creates LogicalPlan for a given HiveQL string. */
-  def createPlan(sql: String) = {
+  def createPlan(sql: String, hiveConf: HiveConf) = {
     try {
-      val tree = getAst(sql)
+      val tree = getAst(sql, hiveConf)
       if (nativeCommands contains tree.getText) {
         NativeCommand(sql)
       } else {
@@ -237,7 +243,7 @@
       case e: NotImplementedError => sys.error(
         s"""
           |Unsupported language features in query: $sql
-          |${dumpTree(getAst(sql))}
+          |${dumpTree(getAst(sql, hiveConf))}
         """.stripMargin)
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index c0e69393cc2e3..069e76add7bff 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -142,7 +142,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   val describedTable = "DESCRIBE (\\w+)".r
 
   protected[hive] class HiveQLQueryExecution(hql: String) extends this.QueryExecution {
-    lazy val logical = HiveQl.parseSql(hql)
+    lazy val logical = HiveQl.parseSql(hql, hiveconf)
     def hiveExec() = runSqlHive(hql)
     override def toString = hql + "\n" + super.toString
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
index a201d2349a2ef..966649f387e15 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
@@ -34,7 +34,7 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
     if (sqlContext.dialect == "sql") {
       super.sql(sqlText)
     } else if (sqlContext.dialect == "hiveql") {
-      new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText))
+      new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText, sqlContext.hiveconf))
     } else {
       sys.error(s"Unsupported SQL dialect: ${sqlContext.dialect}. Try 'sql' or 'hiveql'")
     }
@@ -45,5 +45,5 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(spa
    */
   @Deprecated
   def hql(hqlQuery: String): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
+    new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery, sqlContext.hiveconf))
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a35c40efdc207..e47d17bbce67c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -34,7 +34,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
   test("parse analyze commands") {
     def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
-      val parsed = HiveQl.parseSql(analyzeCommand)
+      val parsed = HiveQl.parseSql(analyzeCommand, TestHive.hiveconf)
       val operators = parsed.collect {
         case a: AnalyzeTable => a
        case o => o
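
After this change, every entry point that parses HiveQL must supply a HiveConf: the conf is
handed to ExtendedHiveQlParser, threaded into HiveQl.createPlan, and finally wrapped in a
ql.Context so that ParseDriver.parse(sql, context) runs against the session's configuration
instead of the context-less parse(sql). A minimal sketch of the new call path, modeled on
the StatisticsSuite change above (the object name and the freshly constructed HiveConf are
illustrative only, not part of the patch; the sketch sits in org.apache.spark.sql.hive
because HiveQl is private[hive]):

    package org.apache.spark.sql.hive

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    object ParseWithConfSketch {
      def main(args: Array[String]): Unit = {
        // Build a HiveConf the same way HiveContext builds its hiveconf field.
        val conf = new HiveConf(classOf[SessionState])
        // parseSql now takes the conf along with the query text; getAst wraps
        // it in new Context(conf) before calling ParseDriver.parse.
        val plan = HiveQl.parseSql("SELECT key, value FROM src", conf)
        println(plan)
      }
    }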