[SPARK-4103][SQL] Clean up SessionState in HiveContext by using ThreadLocal variable in Hive #2967

Closed
wants to merge 26 commits into from
26 commits
ba14f28
test
zhzhan Aug 8, 2014
f6a8a40
revert
zhzhan Aug 8, 2014
cb53a2c
Merge branch 'master' of https://github.com/apache/spark
zhzhan Aug 30, 2014
789ea21
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 2, 2014
921e914
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 2, 2014
e4c1982
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 5, 2014
af9feb9
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 10, 2014
1ccd7cc
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 16, 2014
2b0d513
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 17, 2014
3ee3b2b
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 22, 2014
68deb11
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 22, 2014
7e0cc36
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 25, 2014
d10bf00
Merge branch 'master' of https://github.com/apache/spark
zhzhan Sep 30, 2014
adf4924
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 1, 2014
cedcc6f
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 1, 2014
301eb4a
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 5, 2014
a72c0d4
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 9, 2014
4a2e36d
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 13, 2014
497b0f4
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 13, 2014
a00f60f
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 16, 2014
a9d372b
Merge branch 'master' of https://github.com/zhzhan/spark
zhzhan Oct 16, 2014
3764505
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 17, 2014
93f3081
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 22, 2014
ce0ca7b
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 24, 2014
12e1be5
Merge branch 'master' of https://github.com/apache/spark
zhzhan Oct 27, 2014
53a26ae
clean up session state in HiveContext. Currently the sessionState in …
zhzhan Oct 27, 2014
@@ -39,8 +39,8 @@ private[hive] object SparkSQLEnv extends Logging {
     sparkContext.addSparkListener(new StatsReportListener())

     hiveContext = new HiveContext(sparkContext) {
-      @transient override lazy val sessionState = {
-        val state = SessionState.get()
+      @transient lazy val sessionState = {
+        val state = getSessionState()
         setConf(state.getConf.getAllProperties)
         state
       }
@@ -230,13 +230,21 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * in the HiveConf.
    */
   @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
-    SessionState.start(ss)
-    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
-    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
+
+  /**
+   * If the thread local sessionstate is not set, start a new SessionState
+   * SessionState.start will put ss to thread local
+   * @return
+   */
+  def getSessionState() = {
+    var ss = SessionState.get
+    if (ss == null) {
+      ss = new SessionState(hiveconf)
+      setConf(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
+      SessionState.start(ss)
+      ss.err = new PrintStream(outputBuffer, true, "UTF-8")
+      ss.out = new PrintStream(outputBuffer, true, "UTF-8")
+    }
Contributor Author

If we use lazy evaluation, we have to rely on SessionState.start(sessionState) to initialize it, and that may be invoked multiple times in various locations. Here we instead rely on checking the ThreadLocal variable and initializing it on the first call to avoid this.
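
For readers following the thread, the check-then-initialize idea in this comment can be sketched without Hive on the classpath. This is a minimal illustration only: `FakeSession` and `SessionHolder` are hypothetical stand-ins invented here, and only the thread-local `get`/`start` behaviour of Hive's SessionState is modelled.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Hive's SessionState (not the real class).
final class FakeSession(val id: Int)

object FakeSession {
  // Holds one session per thread; get returns null until start() runs on that thread.
  private val current = new ThreadLocal[FakeSession]
  def get: FakeSession = current.get()
  def start(s: FakeSession): Unit = current.set(s)
}

// Hypothetical holder mirroring the shape of getSessionState() in the diff.
object SessionHolder {
  private val created = new AtomicInteger(0)

  // Number of sessions created so far, across all threads.
  def createdCount: Int = created.get()

  def getSessionState(): FakeSession = {
    var ss = FakeSession.get
    if (ss == null) {
      // First call on this thread: create the session and register it.
      ss = new FakeSession(created.incrementAndGet())
      FakeSession.start(ss)
    }
    ss
  }
}
```

Under these assumptions, repeated calls on one thread return the same instance, while a call from another thread creates a separate session for that thread.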

Contributor

Several comments here:

  1. HiveContext.hiveconf should be retrieved from the current SessionState if SessionState.get() is non-null. That's why hiveconf and sessionState are always initialized together in #2887
  2. IO redirection and Hive properties propagation also need to be performed when SessionState.get() returns a non-null SessionState started elsewhere. This is required by Spark SQL CLI since SparkSQLCLIDriver creates a CliSessionState before HiveContext initialization.
  3. I guess you're trying to make HiveContext play well with multi-session scenarios by introducing getSessionState() here? I had also considered this issue earlier, but unfortunately, IMO, proper support for multiple Hive sessions may require a major refactoring of HiveContext (e.g. making it fully thread-safe). Also, Spark SQL Hive support will probably be reimplemented against the upcoming foreign data source API, so I'd rather defer this major task until later.

(As a globally used thread local object, Hive SessionState is really annoying...)
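
One possible reading of points 1 and 2 above, sketched with toy classes rather than Hive's. `ToyConf`, `ToySession`, and `ToyContext` are hypothetical names introduced for illustration, and this is not the code from #2887. The sketch adopts a session started elsewhere when one exists, performs property propagation and IO redirection in both cases, and derives the configuration from the session.

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

// Hypothetical stand-ins for HiveConf and SessionState.
final class ToyConf(val props: Map[String, String])

final class ToySession(val conf: ToyConf) {
  var out: PrintStream = null
  var err: PrintStream = null
}

object ToySession {
  // Thread-local slot, null until start() is called on the current thread.
  private val current = new ThreadLocal[ToySession]
  def get: ToySession = current.get()
  def start(s: ToySession): Unit = current.set(s)
}

// Hypothetical context playing the role of HiveContext.
class ToyContext(defaults: Map[String, String]) {
  val outputBuffer = new ByteArrayOutputStream()
  var sqlConf: Map[String, String] = Map.empty

  lazy val sessionState: ToySession = {
    val existing = ToySession.get
    // Adopt a session started elsewhere (e.g. by a CLI driver), else start a new one.
    val ss =
      if (existing != null) existing
      else {
        val fresh = new ToySession(new ToyConf(defaults))
        ToySession.start(fresh)
        fresh
      }
    // Point 2: propagate properties and redirect IO whether adopted or created.
    sqlConf ++= ss.conf.props
    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
    ss
  }

  // Point 1: the configuration comes from whichever session is in use.
  lazy val conf: ToyConf = sessionState.conf
}
```

In this sketch, a session registered before the context is constructed is reused, and its properties and output streams are wired into the context just as they would be for a freshly created one.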

     ss
   }

@@ -283,6 +291,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    */
   protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = {
     try {
+      // invoke getSessionState to initialize session state if not already done
+      val sessionState = getSessionState()
       val cmd_trimmed: String = cmd.trim()
       val tokens: Array[String] = cmd_trimmed.split("\\s+")
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
@@ -51,7 +51,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       tableName: String,
       alias: Option[String]): LogicalPlan = synchronized {
     val (databaseName, tblName) = processDatabaseAndTableName(
-      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+      db.getOrElse(hive.getSessionState
+        .getCurrentDatabase), tableName)
     val table = client.getTable(databaseName, tblName)
     val partitions: Seq[Partition] =
       if (table.isPartitioned) {
@@ -112,7 +113,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

       case CreateTableAsSelect(db, tableName, child) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-        val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+        val databaseName = dbName.getOrElse(hive.getSessionState.getCurrentDatabase)

         CreateTableAsSelect(Some(databaseName), tableName, child)
       }