Skip to content

Commit 809ea2a

Browse files
committed
[KYUUBI #2296] Fix operation log file handler leak
### _Why are the changes needed?_ If a session is empty which means that it does not contain any operation, then the OperationLog will never be closed. This bug will happen if we enable `LaunchEngine` ### _How was this patch tested?_ - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request Closes #2296 from ulysses-you/fix-file-handler-leak. Closes #2296 22d07e1 [ulysses-you] fix bb83b1a [ulysses-you] fix Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Fei Wang <fwang12@ebay.com> (cherry picked from commit e5834ae) Signed-off-by: ulysses-you <ulyssesyou@apache.org>
1 parent d3e25f0 commit 809ea2a

File tree

4 files changed

+32
-18
lines changed

4 files changed

+32
-18
lines changed

kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,10 @@ object OperationLog extends Logging {
8383

8484
class OperationLog(path: Path) {
8585

86-
private val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
87-
private val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
86+
private lazy val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
87+
private lazy val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
8888

89-
private val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
89+
private lazy val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
9090

9191
def addExtraLog(path: Path): Unit = synchronized {
9292
try {
@@ -151,11 +151,23 @@ class OperationLog(path: Path) {
151151
}
152152

153153
def close(): Unit = synchronized {
154-
try {
155-
closeExtraReaders()
154+
closeExtraReaders()
155+
156+
trySafely {
156157
reader.close()
158+
}
159+
trySafely {
157160
writer.close()
161+
}
162+
163+
trySafely {
158164
Files.delete(path)
165+
}
166+
}
167+
168+
private def trySafely(f: => Unit): Unit = {
169+
try {
170+
f
159171
} catch {
160172
case e: IOException =>
161173
// Printing log here may cause a deadlock. The lock order of OperationLog.write

kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ class OperationLogSuite extends KyuubiFunSuite {
5757
val operationLog = OperationLog.createOperationLog(session, oHandle)
5858
val logFile =
5959
Paths.get(operationLogRoot, sHandle.identifier.toString, oHandle.identifier.toString)
60-
assert(Files.exists(logFile))
60+
// lazy create
61+
assert(!Files.exists(logFile))
6162

6263
OperationLog.setCurrentOperationLog(operationLog)
6364
assert(OperationLog.getCurrentOperationLog === operationLog)
@@ -66,6 +67,8 @@ class OperationLogSuite extends KyuubiFunSuite {
6667
assert(OperationLog.getCurrentOperationLog === null)
6768

6869
operationLog.write(msg1 + "\n")
70+
assert(Files.exists(logFile))
71+
6972
val tRowSet1 = operationLog.read(1)
7073
assert(tRowSet1.getColumns.get(0).getStringVal.getValues.get(0) === msg1)
7174
val tRowSet2 = operationLog.read(1)
@@ -145,8 +148,10 @@ class OperationLogSuite extends KyuubiFunSuite {
145148
val oHandle = OperationHandle(
146149
OperationType.EXECUTE_STATEMENT,
147150
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
148-
val log = OperationLog.createOperationLog(session, oHandle)
149-
assert(log === null)
151+
intercept[Exception] {
152+
val log = OperationLog.createOperationLog(session, oHandle)
153+
log.read(1)
154+
}
150155
logRoot.delete()
151156

152157
OperationLog.createOperationLogRootDirectory(session)

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/ExecuteStatement.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,4 @@ class ExecuteStatement(
176176
super.setState(newState)
177177
EventLogging.onEvent(KyuubiOperationEvent(this))
178178
}
179-
180-
override def close(): Unit = {
181-
super.close()
182-
}
183179
}

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperation.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,14 @@ abstract class KyuubiOperation(opType: OperationType, session: Session)
119119
if (!isClosedOrCanceled) {
120120
setState(OperationState.CLOSED)
121121
MetricsSystem.tracing(_.decCount(MetricRegistry.name(OPERATION_OPEN, opTypeName)))
122+
try {
123+
// For launch engine operation, we use OperationLog to pass engine submit log but
124+
// at that time we do not have remoteOpHandle
125+
getOperationLog.foreach(_.close())
126+
} catch {
127+
case e: IOException => error(e.getMessage, e)
128+
}
122129
if (_remoteOpHandle != null) {
123-
try {
124-
getOperationLog.foreach(_.close())
125-
} catch {
126-
case e: IOException => error(e.getMessage, e)
127-
}
128-
129130
try {
130131
client.closeOperation(_remoteOpHandle)
131132
} catch {

0 commit comments

Comments
 (0)