Skip to content

Commit

Permalink
[SPARK-11624][SPARK-11972][SQL] fix commands that need hive to exec
Browse files Browse the repository at this point in the history
In SparkSQLCLI, we have created a `CliSessionState`, but then we call `SparkSQLEnv.init()`, which will start another `SessionState`. This would lead to exception because `processCmd` need to get the `CliSessionState` instance by calling `SessionState.get()`, but the return value would be a instance of `SessionState`. See the exception below.

spark-sql> !echo "test";
Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.hive.ql.session.SessionState cannot be cast to org.apache.hadoop.hive.cli.CliSessionState
	at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:112)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:301)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:242)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:691)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #9589 from adrian-wang/clicommand.

(cherry picked from commit 5d80fac)
Signed-off-by: Michael Armbrust <michael@databricks.com>

Conflicts:
	sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
  • Loading branch information
adrian-wang authored and marmbrus committed Feb 23, 2016
1 parent 85e6a22 commit f7898f9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,11 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
if (cmd_lower.equals("quit") ||
cmd_lower.equals("exit") ||
tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
cmd_lower.equals("exit")) {
sessionState.close()
System.exit(0)
}
if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
cmd_trimmed.startsWith("!") ||
tokens(0).toLowerCase.equals("list") ||
isRemoteMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
-> "Error in query: Table not found: nonexistent_table;"
)
}

test("SPARK-11624 Spark SQL CLI should set sessionState only once") {
runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))(
"" -> "This is a test for Spark-11624")
}
}
6 changes: 6 additions & 0 deletions sql/hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-cli</artifactId>
</dependency>
<!--
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.language.reflectiveCalls

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.cli.CliSessionState
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
import org.apache.hadoop.hive.metastore.{TableType => HTableType}
Expand Down Expand Up @@ -172,29 +173,39 @@ private[hive] class ClientWrapper(
}

val ret = try {
val initialConf = new HiveConf(classOf[SessionState])
// HiveConf is a Hadoop Configuration, which has a field of classLoader and
// the initial value will be the current thread's context class loader
// (i.e. initClassLoader at here).
// We call initialConf.setClassLoader(initClassLoader) at here to make
// this action explicit.
initialConf.setClassLoader(initClassLoader)
config.foreach { case (k, v) =>
if (k.toLowerCase.contains("password")) {
logDebug(s"Hive Config: $k=xxx")
} else {
logDebug(s"Hive Config: $k=$v")
// originState will be created if not exists, will never be null
val originalState = SessionState.get()
if (originalState.isInstanceOf[CliSessionState]) {
// In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
// which contains information like configurations from command line. Later
// we call `SparkSQLEnv.init()` there, which would run into this part again.
// so we should keep `conf` and reuse the existing instance of `CliSessionState`.
originalState
} else {
val initialConf = new HiveConf(classOf[SessionState])
// HiveConf is a Hadoop Configuration, which has a field of classLoader and
// the initial value will be the current thread's context class loader
// (i.e. initClassLoader at here).
// We call initialConf.setClassLoader(initClassLoader) at here to make
// this action explicit.
initialConf.setClassLoader(initClassLoader)
config.foreach { case (k, v) =>
if (k.toLowerCase.contains("password")) {
logDebug(s"Hive Config: $k=xxx")
} else {
logDebug(s"Hive Config: $k=$v")
}
initialConf.set(k, v)
}
initialConf.set(k, v)
}
val state = new SessionState(initialConf)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
val state = new SessionState(initialConf)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
SessionState.start(state)
state.out = new PrintStream(outputBuffer, true, "UTF-8")
state.err = new PrintStream(outputBuffer, true, "UTF-8")
state
}
SessionState.start(state)
state.out = new PrintStream(outputBuffer, true, "UTF-8")
state.err = new PrintStream(outputBuffer, true, "UTF-8")
state
} finally {
Thread.currentThread().setContextClassLoader(original)
}
Expand Down

4 comments on commit f7898f9

@preecet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've recently noticed java.lang.NoClassDefFoundError: org.apache.hadoop.hive.cli.CliSessionState running the org.apache.spark.sql.hive.client.VersionsSuite tests on the 1.6 branch.

This change looks the most likely reason, although I'm not sure why I am seeing this and no-one else.

@adrian-wang
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@preecet I cannot reproduce your issue here.

@adrian-wang
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you provide your error log?

@preecet
Copy link
Contributor

@preecet preecet commented on f7898f9 Mar 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adrian-wang
I put the details in SPARK-13648 and also in the corresponding pull request.

Please sign in to comment.