Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix engine log does not be overwrite #408

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,30 @@ trait ProcBuilder {
// Visible for test
private[kyuubi] var logCaptureThread: Thread = _

private lazy val engineLog: File = ProcBuilder.synchronized {
private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
val currentTime = System.currentTimeMillis()
val processLogPath = workingDir
Copy link
Member

@pan3793 pan3793 Mar 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala support SAM since 2.12, we can simplify below code as

val totalExistsFile = processLogPath.toFile.listFiles { (_, name) => name.startsWith(module) }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

magic !

val totalExistsFile = processLogPath.toFile.listFiles(new FilenameFilter() {
override def accept(dir: File, name: String): Boolean = {
name.startsWith(module)
}
})
val totalExistsFile = processLogPath.toFile.listFiles { (_, name) => name.startsWith(module) }
val sorted = totalExistsFile.sortBy(_.getName.split("\\.").last.toInt)
val nextIndex = if (sorted.isEmpty) {
0
} else {
sorted.last.getName.split("\\.").last.toInt + 1
}
val file = sorted.find(_.lastModified() < currentTime - engineLogTimeout)
.map { existsFile =>
try {
// Here we want to overwrite the exists log file
existsFile.delete()
existsFile.createNewFile()
existsFile
} catch {
case e: Exception =>
warn(s"failed to delete engine log file: ${existsFile.getAbsolutePath}", e)
null
}
}
.getOrElse {
Files.createDirectories(processLogPath)
val newLogFile = new File(processLogPath.toFile, s"$module.log.$nextIndex")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.kyuubi.engine.spark

import java.io.File
import java.nio.file.{Files, Path, Paths}
import java.nio.file.{Files, Path, Paths, StandardOpenOption}
import java.time.Duration
import java.util.concurrent.{Executors, TimeUnit}

import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.{KerberizedTestHelper, KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.ENGINE_LOG_TIMEOUT
import org.apache.kyuubi.service.ServiceUtils

class SparkProcessBuilderSuite extends KerberizedTestHelper {
Expand Down Expand Up @@ -179,6 +181,29 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper {
atomicTest()
}
}

test("overwrite log file should cleanup before write") {
val fakeWorkDir = Files.createTempDirectory("fake")
val conf = KyuubiConf()
conf.set(ENGINE_LOG_TIMEOUT, Duration.ofDays(1).toMillis)
val builder1 = new FakeSparkProcessBuilder(conf) {
override val workingDir: Path = fakeWorkDir
}
val file1 = builder1.engineLog
Files.write(file1.toPath, "a".getBytes(), StandardOpenOption.APPEND)
assert(file1.length() == 1)
Files.write(file1.toPath, "a".getBytes(), StandardOpenOption.APPEND)
assert(file1.length() == 2)
file1.setLastModified(System.currentTimeMillis() - Duration.ofDays(1).toMillis - 1000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we explicitly set ENGINE_LOG_TIMEOUT rather than depends on default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, use the config exsplicitly.


val builder2 = new FakeSparkProcessBuilder(conf) {
override val workingDir: Path = fakeWorkDir
}
val file2 = builder2.engineLog
assert(file1.getAbsolutePath == file2.getAbsolutePath)
Files.write(file2.toPath, "a".getBytes(), StandardOpenOption.APPEND)
assert(file2.length() == 1)
}
}

class FakeSparkProcessBuilder(config: KyuubiConf)
Expand Down