From c5ac3106dadcc53b834c80c316325dbae0b37a2c Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Fri, 21 Aug 2015 17:13:32 +0900
Subject: [PATCH] [SPARK-10152] [SQL] Support Init script for hive-thriftserver

---
 .../hive/thriftserver/HiveThriftServer2.scala | 44 ++++++++++++++++++-
 .../apache/spark/sql/hive/HiveContext.scala   |  6 ++-
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index dd9fef9206d0b..7e0cd7b82193f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicBoolean
+import java.io.{FileReader, BufferedReader}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -35,7 +36,7 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.ShutdownHookManager
 import org.apache.spark.{Logging, SparkContext}
 
 
@@ -67,6 +68,46 @@ object HiveThriftServer2 extends Logging {
     }
   }
 
+  def runScript(sqlContext: HiveContext) {
+    val hiveConf = sqlContext.hiveconf
+    val script = hiveConf.get("hive.server2.init.script", null)
+    if (script != null && !script.isEmpty) {
+      getCommands(script).foreach { c =>
+        try {
+          LOG.warn("Running.. " + c)
+          sqlContext.runSqlHive(c).foreach { o => LOG.warn(o) }
+        } catch {
+          case t: Throwable => LOG.warn("Failed to run command " + c, t)
+        }
+      }
+    }
+  }
+
+  private def getCommands(input: String): Seq[String] = {
+    LOG.warn("Reading script " + input)
+    val reader: BufferedReader = new BufferedReader(new FileReader(input))
+
+    val commands = new mutable.ArrayBuffer[String]
+    val builder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      val next = line.trim.endsWith(";")
+      if (next) {
+        line = line.substring(0, line.lastIndexOf(';'))
+      }
+      builder.append(line).append('\n')
+      if (next) {
+        commands += builder.toString
+        builder.setLength(0)
+      }
+      line = reader.readLine()
+    }
+    if (builder.nonEmpty) {
+      commands += builder.toString
+    }
+    commands
+  }
+
   def main(args: Array[String]) {
     val optionsProcessor = new HiveServerServerOptionsProcessor("HiveThriftServer2")
     if (!optionsProcessor.process(args)) {
@@ -86,6 +127,7 @@ object HiveThriftServer2 extends Logging {
       server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
+      runScript(SparkSQLEnv.hiveContext)
       listener = new HiveThriftServer2Listener(server, SparkSQLEnv.hiveContext.conf)
       SparkSQLEnv.sparkContext.addSparkListener(listener)
       uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 17cc83087fb1d..809807603a228 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -549,9 +549,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
   }
 
   protected[hive] def runSqlHive(sql: String): Seq[String] = {
-    if (sql.toLowerCase.contains("create temporary function")) {
+    val normalized = sql.trim.toLowerCase
+    if (normalized.contains("create temporary function") ||
+        normalized.contains("create temporary macro")) {
       executionHive.runSqlHive(sql)
-    } else if (sql.trim.toLowerCase.startsWith("set")) {
+    } else if (normalized.startsWith("set")) {
       metadataHive.runSqlHive(sql)
       executionHive.runSqlHive(sql)
     } else {