Commit 93e87f5

fix code review
zuotingbing committed Mar 5, 2018
1 parent d965ba2 commit 93e87f5
Showing 2 changed files with 32 additions and 5 deletions.
File 1 of 2

@@ -672,10 +672,7 @@ private void cleanupPipeoutFile() {
     String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);

     File[] fileAry = new File(lScratchDir).listFiles(
-        file -> {
-          String name = file.getName();
-          return (name.startsWith(sessionID) && name.endsWith(".pipeout"));
-        });
+        (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));

     for (File file : fileAry) {
       try {
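The hunk above swaps a java.io.FileFilter lambda for the equivalent java.io.FilenameFilter lambda, which receives the parent directory and the bare file name instead of a full File object. Below is a minimal Scala sketch of the two File.listFiles overloads involved, not part of this commit; scratchDir and sessionID are placeholders standing in for the local scratch directory and Hive session id used by cleanupPipeoutFile():

import java.io.{File, FileFilter, FilenameFilter}

object ListFilesOverloadsSketch {
  val scratchDir = new File(System.getProperty("java.io.tmpdir"))   // placeholder directory
  val sessionID = "00000000-0000-0000-0000-000000000000"            // placeholder session id

  // Old form: FileFilter hands the callback a File, so the name must be pulled out of it.
  val viaFileFilter: Array[File] = scratchDir.listFiles(new FileFilter {
    override def accept(file: File): Boolean = {
      val name = file.getName
      name.startsWith(sessionID) && name.endsWith(".pipeout")
    }
  })

  // New form: FilenameFilter hands the callback the parent dir and the file name directly.
  val viaFilenameFilter: Array[File] = scratchDir.listFiles(new FilenameFilter {
    override def accept(dir: File, name: String): Boolean =
      name.startsWith(sessionID) && name.endsWith(".pipeout")
  })
}

Both overloads select the same files here; the FilenameFilter form simply filters on the name without touching each File entry.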
File 2 of 2

@@ -17,10 +17,11 @@

 package org.apache.spark.sql.hive.thriftserver

-import java.io.File
+import java.io.{File, FilenameFilter}
 import java.net.URL
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, DriverManager, SQLException, Statement}
+import java.util.UUID

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -613,6 +614,28 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       bufferSrc.close()
     }
   }
+
+  test("SPARK-23547 Cleanup the .pipeout file when the Hive Session closed") {
+    def pipeoutFileList(sessionID: UUID): Array[File] = {
+      lScratchDir.listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.startsWith(sessionID.toString) && name.endsWith(".pipeout")
+        }
+      })
+    }
+
+    withCLIServiceClient { client =>
+      val user = System.getProperty("user.name")
+      val sessionHandle = client.openSession(user, "")
+      val sessionID = sessionHandle.getSessionId
+
+      assert(pipeoutFileList(sessionID).length == 1)
+
+      client.closeSession(sessionHandle)
+
+      assert(pipeoutFileList(sessionID).length == 0)
+    }
+  }
 }

 class SingleSessionSuite extends HiveThriftJdbcTest {
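The new test opens a CLI session, checks that exactly one .pipeout file for that session id appears in the server's local scratch directory, closes the session, and checks that the file is gone. For reference, here is a self-contained Scala sketch of that same before/after check, not part of this commit and not using the suite's lScratchDir or a live HiveServer2; the scratch directory, UUID, and .pipeout file below are local stand-ins:

import java.io.{File, FilenameFilter}
import java.nio.file.Files
import java.util.UUID

object PipeoutLifecycleSketch {
  def pipeoutFiles(dir: File, sessionID: UUID): Array[File] =
    dir.listFiles(new FilenameFilter {
      override def accept(d: File, name: String): Boolean =
        name.startsWith(sessionID.toString) && name.endsWith(".pipeout")
    })

  def main(args: Array[String]): Unit = {
    val scratch = Files.createTempDirectory("pipeout-sketch").toFile
    val id = UUID.randomUUID()

    // Stand-in for the file HiveServer2 writes when a session is opened ...
    val pipeout = new File(scratch, s"$id.pipeout")
    pipeout.createNewFile()
    assert(pipeoutFiles(scratch, id).length == 1)

    // ... and for the cleanup expected when the session is closed.
    pipeout.delete()
    assert(pipeoutFiles(scratch, id).length == 0)
  }
}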
@@ -807,6 +830,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
   private val pidDir: File = Utils.createTempDir("thriftserver-pid")
   protected var logPath: File = _
   protected var operationLogPath: File = _
+  protected var lScratchDir: File = _
   private var logTailingProcess: Process = _
   private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]

@@ -844,6 +868,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
        | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
        | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
        | --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
+       | --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
        | --hiveconf $portConf=$port
        | --driver-class-path $driverClassPath
        | --driver-java-options -Dlog4j.debug
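The added --hiveconf line points HiveServer2's local scratch directory, which is where the per-session .pipeout files land, at the temp directory the suite owns, so the new test can inspect it. A rough sketch of what that flag expands to, assuming ConfVars.LOCALSCRATCHDIR renders as its Hive variable name hive.exec.local.scratchdir; the path is a made-up placeholder:

object LocalScratchDirFlagSketch {
  // Placeholder path; the suite substitutes a fresh temp directory instead.
  val lScratchDir = new java.io.File("/tmp/spark-1234abcd")
  val flag = s"--hiveconf hive.exec.local.scratchdir=$lScratchDir"
  // flag == "--hiveconf hive.exec.local.scratchdir=/tmp/spark-1234abcd"
}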
@@ -873,6 +898,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
     metastorePath.delete()
     operationLogPath = Utils.createTempDir()
     operationLogPath.delete()
+    lScratchDir = Utils.createTempDir()
+    lScratchDir.delete()
     logPath = null
     logTailingProcess = null

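The createTempDir()/delete() pairing above mirrors what the suite already does for operationLogPath: it reserves a unique path and immediately removes the directory, presumably so the server can create the directory itself on startup. A minimal sketch of the same pattern using java.nio.file in place of Spark's Utils.createTempDir, under that assumption:

import java.nio.file.Files

object ReservePathSketch {
  // Create a unique directory, then remove it so only the unused path is kept;
  // the server process is expected to create the directory itself.
  val lScratchDir: java.io.File = Files.createTempDirectory("thriftserver-scratch").toFile
  lScratchDir.delete()
}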
@@ -956,6 +983,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
     operationLogPath.delete()
     operationLogPath = null

+    lScratchDir.delete()
+    lScratchDir = null
+
     Option(logPath).foreach(_.delete())
     logPath = null
