From d055d60aab356401f0f7b00b83b76dc76df0c30c Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Wed, 10 Jun 2020 22:09:51 +0800
Subject: [PATCH] [SPARK-31957][SQL] Cleanup hive scratch dir for the developer
 api startWithContext

---
 .../hive/thriftserver/HiveThriftServer2.scala | 19 ++++++-------------
 .../thriftserver/SharedThriftServer.scala     | 15 +++++++++++++++
 .../ThriftServerWithSparkContextSuite.scala   |  4 ++++
 3 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index f9f2ceeed8a7..4e6729faced4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -52,14 +52,17 @@ object HiveThriftServer2 extends Logging {
    */
   @DeveloperApi
   def startWithContext(sqlContext: SQLContext): HiveThriftServer2 = {
-    val server = new HiveThriftServer2(sqlContext)
-
     val executionHive = HiveUtils.newClientForExecution(
       sqlContext.sparkContext.conf,
       sqlContext.sessionState.newHadoopConf())
 
+    // Cleanup the scratch dir before starting
+    ServerUtils.cleanUpScratchDir(executionHive.conf)
+    val server = new HiveThriftServer2(sqlContext)
+
     server.init(executionHive.conf)
     server.start()
+    logInfo("HiveThriftServer2 started")
     createListenerAndUI(server, sqlContext.sparkContext)
     server
   }
@@ -97,18 +100,8 @@ object HiveThriftServer2 extends Logging {
       uiTab.foreach(_.detach())
     }
 
-    val executionHive = HiveUtils.newClientForExecution(
-      SparkSQLEnv.sqlContext.sparkContext.conf,
-      SparkSQLEnv.sqlContext.sessionState.newHadoopConf())
-
     try {
-      // Cleanup the scratch dir before starting
-      ServerUtils.cleanUpScratchDir(executionHive.conf)
-      val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
-      server.init(executionHive.conf)
-      server.start()
-      logInfo("HiveThriftServer2 started")
-      createListenerAndUI(server, SparkSQLEnv.sparkContext)
+      startWithContext(SparkSQLEnv.sqlContext)
       // If application was killed before HiveThriftServer2 start successfully then SparkSubmit
       // process can not exit, so check whether if SparkContext was stopped.
       if (SparkSQLEnv.sparkContext.stopped.get()) {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
index c9e41db52cd5..d082358895fa 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.io.File
 import java.sql.{DriverManager, Statement}
 
 import scala.collection.JavaConverters._
@@ -27,12 +28,20 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.service.cli.thrift.ThriftCLIService
 
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
 
 trait SharedThriftServer extends SharedSparkSession {
 
   private var hiveServer2: HiveThriftServer2 = _
   private var serverPort: Int = 0
 
+  protected val tempScratchDir: File = {
+    val dir = Utils.createTempDir()
+    dir.setWritable(true, false)
+    Utils.createTempDir(dir.getAbsolutePath)
+    dir
+  }
+
   def mode: ServerMode.Value
 
   override def beforeAll(): Unit = {
@@ -85,6 +94,9 @@ trait SharedThriftServer extends SharedSparkSession {
     sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
     sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
     sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname, mode.toString)
+    sqlContext.setConf(ConfVars.SCRATCHDIR.varname, tempScratchDir.getAbsolutePath)
+    sqlContext.setConf(ConfVars.HIVE_START_CLEANUP_SCRATCHDIR.varname, "true")
+    assert(tempScratchDir.exists())
 
     try {
       hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
@@ -94,6 +106,9 @@ trait SharedThriftServer extends SharedSparkSession {
           logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
         case _ =>
       }
+      // the scratch dir will be recreated after the probe sql `SELECT 1` executed, so we
+      // check it here first.
+      assert(!tempScratchDir.exists())
 
       // Wait for thrift server to be ready to serve the query, via executing simple query
       // till the query succeeds. See SPARK-30345 for more details.
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index d6420dee41ad..1382eb2d79f3 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -19,6 +19,10 @@ package org.apache.spark.sql.hive.thriftserver
 
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
+  test("the scratch dir will be deleted during server start but recreated with new operation") {
+    assert(tempScratchDir.exists())
+  }
+
   test("SPARK-29911: Uncache cached tables when session closed") {
     val cacheManager = spark.sharedState.cacheManager
     val globalTempDB = spark.sharedState.globalTempViewManager.database
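
Note: below is a minimal usage sketch, not part of the commit, of the @DeveloperApi
entry point this patch changes: embedding HiveThriftServer2 in an existing Spark
application via startWithContext. The object name and the builder settings are
illustrative assumptions, not taken from the patch.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

    // Hypothetical embedding application, for illustration only.
    object EmbeddedThriftServerExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("embedded-thrift-server")
          .config("spark.sql.catalogImplementation", "hive")
          // Hive only wipes hive.exec.scratchdir at startup when this flag is
          // true; after this patch, startWithContext() honors it the same way
          // the main() entry point already did.
          .config("hive.start.cleanup.scratchdir", "true")
          .getOrCreate()

        // startWithContext() now calls ServerUtils.cleanUpScratchDir on the
        // execution Hive conf before server.init(), so stale scratch dirs left
        // by a previous run are removed here as well.
        val server = HiveThriftServer2.startWithContext(spark.sqlContext)

        // ... serve JDBC/ODBC clients, then shut down:
        server.stop()
        spark.stop()
      }
    }

Without this change, only spark-submit launches through main() cleaned the scratch
dir, so applications embedding the server this way accumulated stale scratch
directories across restarts.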