[SPARK-31957][SQL] Cleanup hive scratch dir for the developer api startWithContext #28784

@@ -52,14 +52,17 @@ object HiveThriftServer2 extends Logging {
*/
@DeveloperApi
def startWithContext(sqlContext: SQLContext): HiveThriftServer2 = {
val server = new HiveThriftServer2(sqlContext)

val executionHive = HiveUtils.newClientForExecution(
sqlContext.sparkContext.conf,
sqlContext.sessionState.newHadoopConf())

// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(executionHive.conf)
val server = new HiveThriftServer2(sqlContext)

server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
createListenerAndUI(server, sqlContext.sparkContext)
server
}
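
For reviewers, a minimal sketch (not part of this patch) of how an application using the developer API would pick up the new behaviour. Mirroring the test changes below, the cleanup is gated on `hive.start.cleanup.scratchdir` and removes the directory configured as `hive.exec.scratchdir`; the object name, app name, and scratch path here are illustrative only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

// Illustrative embedding of the Thrift server via the developer API.
object EmbeddedThriftServerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("embedded-thriftserver")        // assumed app name
      .enableHiveSupport()
      .getOrCreate()

    // Opt in to the startup cleanup, mirroring SharedThriftServer below.
    spark.sqlContext.setConf("hive.exec.scratchdir", "/tmp/embedded-scratch") // assumed path
    spark.sqlContext.setConf("hive.start.cleanup.scratchdir", "true")

    // With this patch, any leftover scratch dir is removed before the server starts.
    HiveThriftServer2.startWithContext(spark.sqlContext)
  }
}
```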
@@ -97,18 +100,8 @@ object HiveThriftServer2 extends Logging {
uiTab.foreach(_.detach())
}

val executionHive = HiveUtils.newClientForExecution(
SparkSQLEnv.sqlContext.sparkContext.conf,
SparkSQLEnv.sqlContext.sessionState.newHadoopConf())

try {
// Cleanup the scratch dir before starting
ServerUtils.cleanUpScratchDir(executionHive.conf)
val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
server.init(executionHive.conf)
server.start()
logInfo("HiveThriftServer2 started")
createListenerAndUI(server, SparkSQLEnv.sparkContext)
startWithContext(SparkSQLEnv.sqlContext)
// If the application was killed before HiveThriftServer2 started successfully, the SparkSubmit
// process cannot exit, so check whether the SparkContext was stopped.
if (SparkSQLEnv.sparkContext.stopped.get()) {
@@ -17,6 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.sql.{DriverManager, Statement}

import scala.collection.JavaConverters._
@@ -28,12 +29,20 @@ import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftCLIService

import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils

trait SharedThriftServer extends SharedSparkSession {

private var hiveServer2: HiveThriftServer2 = _
private var serverPort: Int = 0

protected val tempScratchDir: File = {
val dir = Utils.createTempDir()
dir.setWritable(true, false)
Utils.createTempDir(dir.getAbsolutePath)
dir
}

def mode: ServerMode.Value

override def beforeAll(): Unit = {
@@ -87,6 +96,9 @@ trait SharedThriftServer extends SharedSparkSession {
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
sqlContext.setConf(ConfVars.SCRATCHDIR.varname, tempScratchDir.getAbsolutePath)
sqlContext.setConf(ConfVars.HIVE_START_CLEANUP_SCRATCHDIR.varname, "true")
assert(tempScratchDir.exists())

try {
hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
@@ -96,6 +108,9 @@ trait SharedThriftServer extends SharedSparkSession {
logInfo(s"Started HiveThriftServer2: mode=$mode, port=$serverPort, attempt=$attempt")
case _ =>
}
// The scratch dir will be recreated after the probe SQL `SELECT 1` is executed, so we
// check it here first.
assert(!tempScratchDir.exists())

// Wait for thrift server to be ready to serve the query, via executing simple query
// till the query succeeds. See SPARK-30345 for more details.
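A standalone sketch of what the `tempScratchDir` helper above sets up, approximating Spark's internal `Utils.createTempDir` with plain JDK calls (an assumption, not the real implementation): a scratch root writable by all users, with a pre-created child directory so that server startup has something to clean up.

```scala
import java.io.File
import java.nio.file.Files

object ScratchDirSketch {
  def main(args: Array[String]): Unit = {
    // Create the scratch root and make it writable by all users, since Hive
    // creates per-user session directories underneath the scratch dir.
    val scratchRoot: File = Files.createTempDirectory("spark-scratch-").toFile
    scratchRoot.setWritable(true, /* ownerOnly = */ false)

    // Pre-create a child directory so the startup cleanup has content to delete.
    new File(scratchRoot, "stale-session-dir").mkdir()

    // Before the server starts: the directory exists, as the test setup asserts.
    assert(scratchRoot.exists())
  }
}
```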
@@ -19,6 +19,10 @@ package org.apache.spark.sql.hive.thriftserver

trait ThriftServerWithSparkContextSuite extends SharedThriftServer {

test("the scratch dir will be deleted during server start but recreated with new operation") {
assert(tempScratchDir.exists())
}
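
The assertion above holds because SharedThriftServer's startup probe (`SELECT 1`) has already run by the time the test executes, recreating the scratch dir. A hypothetical client-side snippet showing the same effect over JDBC; the URL, port, and credentials are assumptions, and the Hive JDBC driver must be on the classpath.

```scala
import java.sql.DriverManager

// Hypothetical client-side view: running any statement through the server
// recreates the scratch dir that was deleted at startup.
def runProbe(port: Int): Unit = {
  val url = s"jdbc:hive2://localhost:$port/default" // assumed host/port
  val conn = DriverManager.getConnection(url, System.getProperty("user.name"), "")
  try {
    val stmt = conn.createStatement()
    try stmt.execute("SELECT 1") finally stmt.close()
  } finally {
    conn.close()
  }
}
```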

test("SPARK-29911: Uncache cached tables when session closed") {
val cacheManager = spark.sharedState.cacheManager
val globalTempDB = spark.sharedState.globalTempViewManager.database