Skip to content

Commit

Permalink
[SPARK-11089][SQL] Adds option for disabling multi-session in Thrift …
Browse files Browse the repository at this point in the history
…server

This PR adds a new option `spark.sql.hive.thriftServer.singleSession` for disabling multi-session support in the Thrift server.

Note that this option is added as a Spark configuration (retrieved from `SparkConf`) rather than Spark SQL configuration (retrieved from `SQLConf`). This is because all SQL configurations are session-ized. Since multi-session support is by default on, no JDBC connection can modify global configurations like the newly added one.

Author: Cheng Lian <lian@databricks.com>

Closes #9740 from liancheng/spark-11089.single-session-option.

(cherry picked from commit 7b1407c)
Signed-off-by: Michael Armbrust <michael@databricks.com>
  • Loading branch information
liancheng authored and marmbrus committed Nov 17, 2015
1 parent a3f0c77 commit 167ea61
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 2 deletions.
14 changes: 14 additions & 0 deletions docs/sql-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2051,6 +2051,20 @@ options.

# Migration Guide

## Upgrading From Spark SQL 1.5 to 1.6

- From Spark 1.6, by default the Thrift server runs in multi-session mode. Which means each JDBC/ODBC
connection owns a copy of their own SQL configuration and temporary function registry. Cached
tables are still shared though. If you prefer to run the Thrift server in the old single-session
mode, please set option `spark.sql.hive.thriftServer.singleSession` to `true`. You may either add
this option to `spark-defaults.conf`, or pass it to `start-thriftserver.sh` via `--conf`:

{% highlight bash %}
./sbin/start-thriftserver.sh \
--conf spark.sql.hive.thriftServer.singleSession=true \
...
{% endhighlight %}

## Upgrading From Spark SQL 1.4 to 1.5

- Optimized execution using manually managed memory (Tungsten) is now enabled by default, along with
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
val session = super.getSession(sessionHandle)
HiveThriftServer2.listener.onSessionCreated(
session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
val ctx = hiveContext.newSession()
val ctx = if (hiveContext.hiveThriftServerSingleSession) {
hiveContext
} else {
hiveContext.newSession()
}
ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
sessionHandle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.apache.thrift.transport.TSocket
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkFunSuite}
Expand Down Expand Up @@ -510,6 +509,53 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}

class SingleSessionSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.binary

override protected def extraConf: Seq[String] =
"--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil

test("test single session") {
withMultipleConnectionJdbcStatement(
{ statement =>
val jarPath = "../hive/src/test/resources/TestUDTF.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

// Configurations and temporary functions added in this session should be visible to all
// the other sessions.
Seq(
"SET foo=bar",
s"ADD JAR $jarURL",
s"""CREATE TEMPORARY FUNCTION udtf_count2
|AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
""".stripMargin
).foreach(statement.execute)
},

{ statement =>
val rs1 = statement.executeQuery("SET foo")

assert(rs1.next())
assert(rs1.getString(1) === "foo")
assert(rs1.getString(2) === "bar")

val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")

assert(rs2.next())
assert(rs2.getString(1) === "Function: udtf_count2")

assert(rs2.next())
assertResult("Class: org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
rs2.getString(1)
}

assert(rs2.next())
assert(rs2.getString(1) === "Usage: To be added.")
}
)
}
}

class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.http

Expand Down Expand Up @@ -600,6 +646,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
private var logTailingProcess: Process = _
private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]

protected def extraConf: Seq[String] = Nil

protected def serverStartCommand(port: Int) = {
val portConf = if (mode == ServerMode.binary) {
ConfVars.HIVE_SERVER2_THRIFT_PORT
Expand Down Expand Up @@ -635,6 +683,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
| --driver-class-path $driverClassPath
| --driver-java-options -Dlog4j.debug
| --conf spark.ui.enabled=false
| ${extraConf.mkString("\n")}
""".stripMargin.split("\\s+").toSeq
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ class HiveContext private[hive](
*/
protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)

protected[hive] def hiveThriftServerSingleSession: Boolean =
sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean

@transient
protected[sql] lazy val substitutor = new VariableSubstitution()

Expand Down

0 comments on commit 167ea61

Please sign in to comment.