Skip to content

Commit

Permalink
[SPARK-31957][SQL] Cleanup hive scratch dir for the developer api sta…
Browse files Browse the repository at this point in the history
…rtWithContext
  • Loading branch information
yaooqinn committed Jun 10, 2020
1 parent 43063e2 commit d055d60
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 13 deletions.
Expand Up @@ -52,14 +52,17 @@ object HiveThriftServer2 extends Logging {
*/
@DeveloperApi
def startWithContext(sqlContext: SQLContext): HiveThriftServer2 = {
val server = new HiveThriftServer2(sqlContext)

val executionHive = HiveUtils.newClientForExecution(
sqlContext.sparkContext.conf,
sqlContext.sessionState.newHadoopConf())

// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(executionHive.conf)
val server = new HiveThriftServer2(sqlContext)

server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
createListenerAndUI(server, sqlContext.sparkContext)
server
}
Expand Down Expand Up @@ -97,18 +100,8 @@ object HiveThriftServer2 extends Logging {
uiTab.foreach(_.detach())
}

val executionHive = HiveUtils.newClientForExecution(
SparkSQLEnv.sqlContext.sparkContext.conf,
SparkSQLEnv.sqlContext.sessionState.newHadoopConf())

try {
// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(executionHive.conf)
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
createListenerAndUI(server, SparkSQLEnv.sparkContext)
startWithContext(SparkSQLEnv.sqlContext)
// If application was killed before HiveThriftServer2 start successfully then SparkSubmit
// process can not exit, so check whether if SparkContext was stopped.
if (SparkSQLEnv.sparkContext.stopped.get()) {
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.sql.{DriverManager, Statement}

import scala.collection.JavaConverters._
Expand All @@ -27,12 +28,20 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.thrift.ThriftCLIService

import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

trait SharedThriftServer extends SharedSparkSession {

private var hiveServer2: HiveThriftServer2 = _
private var serverPort: Int = 0

protected val tempScratchDir: File = {
val dir = Utils.createTempDir()
dir.setWritable(true, false)
Utils.createTempDir(dir.getAbsolutePath)
dir
}

def mode: ServerMode.Value

override def beforeAll(): Unit = {
Expand Down Expand Up @@ -85,6 +94,9 @@ trait SharedThriftServer extends SharedSparkSession {
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
sqlContext.setConf(ConfVars.SCRATCHDIR.varname, tempScratchDir.getAbsolutePath)
sqlContext.setConf(ConfVars.HIVE_START_CLEANUP_SCRATCHDIR.varname, "true")
assert(tempScratchDir.exists())

try {
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
Expand All @@ -94,6 +106,9 @@ trait SharedThriftServer extends SharedSparkSession {
logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
case _ =>
}
// the scratch dir will be recreated after the probe sql `SELECT 1` executed, so we
// check it here first.
assert(!tempScratchDir.exists())

// Wait for thrift server to be ready to serve the query, via executing simple query
// till the query succeeds. See SPARK-30345 for more details.
Expand Down
Expand Up @@ -19,6 +19,10 @@ package org.apache.spark.sql.hive.thriftserver

trait ThriftServerWithSparkContextSuite extends SharedThriftServer {

test("the scratch dir will be deleted during server start but recreated with new operation") {
assert(tempScratchDir.exists())
}

test("SPARK-29911: Uncache cached tables when session closed") {
val cacheManager = spark.sharedState.cacheManager
val globalTempDB = spark.sharedState.globalTempViewManager.database
Expand Down

0 comments on commit d055d60

Please sign in to comment.