diff --git a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelInfoResolver.kt b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelInfoResolver.kt index 81dc61ba0..b285c5874 100644 --- a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelInfoResolver.kt +++ b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelInfoResolver.kt @@ -12,7 +12,7 @@ class BazelInfoResolver( } private fun bazelInfoFromBazel(): BazelInfo { - val processResult = bazelRunner.commandBuilder().info().executeBazelCommand().waitAndGetResult() + val processResult = bazelRunner.commandBuilder().info().executeBazelCommand().waitAndGetResult(true) return parseBazelInfo(processResult).also { storage.store(it) } } diff --git a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelProcess.kt b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelProcess.kt index f0cb6d124..10741d3a7 100644 --- a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelProcess.kt +++ b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/BazelProcess.kt @@ -3,6 +3,8 @@ package org.jetbrains.bsp.bazel.bazelrunner import java.time.Duration import org.apache.logging.log4j.LogManager import org.jetbrains.bsp.bazel.bazelrunner.outputs.AsyncOutputProcessor +import org.jetbrains.bsp.bazel.bazelrunner.outputs.OutputProcessor +import org.jetbrains.bsp.bazel.bazelrunner.outputs.SyncOutputProcessor import org.jetbrains.bsp.bazel.commons.Format import org.jetbrains.bsp.bazel.commons.Stopwatch import org.jetbrains.bsp.bazel.logger.BspClientLogger @@ -13,9 +15,11 @@ class BazelProcess internal constructor( private val originId: String? ) { - fun waitAndGetResult(): BazelProcessResult { - val outputProcessor = AsyncOutputProcessor(process, logger.withOriginId(originId)::message, LOGGER::info) + fun waitAndGetResult(ensureAllOutputRead: Boolean = false): BazelProcessResult { val stopwatch = Stopwatch.start() + val outputProcessor: OutputProcessor = + if (ensureAllOutputRead) SyncOutputProcessor(process, logger::message, LOGGER::info) + else AsyncOutputProcessor(process, logger::message, LOGGER::info) val exitCode = outputProcessor.waitForExit() val duration = stopwatch.stop() diff --git a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/AsyncOutputProcessor.kt b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/AsyncOutputProcessor.kt index 7122ab8d8..ec7763f53 100644 --- a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/AsyncOutputProcessor.kt +++ b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/AsyncOutputProcessor.kt @@ -1,61 +1,27 @@ package org.jetbrains.bsp.bazel.bazelrunner.outputs -import java.io.BufferedReader -import java.io.IOException -import java.io.InputStream -import java.io.InputStreamReader -import java.util.concurrent.Executors -import java.util.concurrent.Future import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicBoolean class AsyncOutputProcessor( - private val process: Process, + process: Process, vararg loggers: OutputHandler -) { - private val executorService = Executors.newCachedThreadPool() - private val runningProcessors = mutableListOf>() +) : OutputProcessor(process, *loggers) { - val stdoutCollector = OutputCollector() - val stderrCollector = OutputCollector() + private val isRunning = AtomicBoolean(true) - init { - start(process.inputStream, stdoutCollector, *loggers) - start(process.errorStream, stderrCollector, *loggers) - } + override fun isRunning(): Boolean = isRunning.get() - private fun start(inputStream: InputStream, vararg handlers: OutputHandler) { - val runnable = Runnable { + override fun shutdown() { + isRunning.set(false) + runningProcessors.forEach { try { - BufferedReader(InputStreamReader(inputStream)).use { reader -> - var prevLine: String? = null - - while (!Thread.currentThread().isInterrupted) { - val line = reader.readLine() ?: return@Runnable - if (line == prevLine) continue - prevLine = line - handlers.forEach { it.onNextLine(line) } - } - } - } catch (e: IOException) { - if (Thread.currentThread().isInterrupted) return@Runnable - throw RuntimeException(e) + it.get(500, TimeUnit.MILLISECONDS) // Output handles should not be _that_ heavy + } catch (_: TimeoutException) { + // ignore it } } - - executorService.submit(runnable).also { runningProcessors.add(it) } - } - - fun waitForExit(): Int { - val exitCode = process.waitFor() - shutdown() - return exitCode - } - - private fun shutdown() { - runningProcessors.forEach { - it.get(1, TimeUnit.MINUTES) // Output handles should not be _that_ heavy - } - executorService.shutdown() + super.shutdown() } } diff --git a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/OutputProcessor.kt b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/OutputProcessor.kt new file mode 100644 index 000000000..10bf455bd --- /dev/null +++ b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/OutputProcessor.kt @@ -0,0 +1,59 @@ +package org.jetbrains.bsp.bazel.bazelrunner.outputs + +import java.io.BufferedReader +import java.io.IOException +import java.io.InputStream +import java.io.InputStreamReader +import java.util.concurrent.Executors +import java.util.concurrent.Future + +abstract class OutputProcessor(private val process: Process, vararg loggers: OutputHandler) { + val stdoutCollector = OutputCollector() + val stderrCollector = OutputCollector() + + private val executorService = Executors.newCachedThreadPool() + protected val runningProcessors = mutableListOf>() + + init { + start(process.inputStream, stdoutCollector, *loggers) + start(process.errorStream, stderrCollector, *loggers) + } + + protected open fun shutdown() { + executorService.shutdown() + } + + protected abstract fun isRunning(): Boolean + + protected fun start(inputStream: InputStream, vararg handlers: OutputHandler) { + val runnable = Runnable { + try { + BufferedReader(InputStreamReader(inputStream)).use { reader -> + var prevLine: String? = null + + while (!Thread.currentThread().isInterrupted) { + val line = reader.readLine() ?: return@Runnable + if (line == prevLine) continue + prevLine = line + if (isRunning()) { + handlers.forEach { it.onNextLine(line) } + } else { + break + } + } + } + } catch (e: IOException) { + if (Thread.currentThread().isInterrupted) return@Runnable + throw RuntimeException(e) + } + } + + executorService.submit(runnable).also { runningProcessors.add(it) } + } + + fun waitForExit(): Int { + val exitCode = process.waitFor() + shutdown() + return exitCode + } +} diff --git a/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/SyncOutputProcessor.kt b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/SyncOutputProcessor.kt new file mode 100644 index 000000000..0351b0f51 --- /dev/null +++ b/bazelrunner/src/main/java/org/jetbrains/bsp/bazel/bazelrunner/outputs/SyncOutputProcessor.kt @@ -0,0 +1,18 @@ +package org.jetbrains.bsp.bazel.bazelrunner.outputs + +import java.util.concurrent.TimeUnit + +class SyncOutputProcessor( + process: Process, + vararg loggers: OutputHandler +) : OutputProcessor(process, *loggers) { + + override fun isRunning(): Boolean = true + + override fun shutdown() { + runningProcessors.forEach { + it.get(1, TimeUnit.MINUTES) // Output handles should not be _that_ heavy + } + super.shutdown() + } +} diff --git a/server/src/main/java/org/jetbrains/bsp/bazel/server/bsp/managers/BazelBspCompilationManager.java b/server/src/main/java/org/jetbrains/bsp/bazel/server/bsp/managers/BazelBspCompilationManager.java index 56b56e04e..d56763a7b 100644 --- a/server/src/main/java/org/jetbrains/bsp/bazel/server/bsp/managers/BazelBspCompilationManager.java +++ b/server/src/main/java/org/jetbrains/bsp/bazel/server/bsp/managers/BazelBspCompilationManager.java @@ -28,7 +28,7 @@ public BepBuildResult buildTargetsWithBep( .withFlags(extraFlags.asJava()) .withTargets(targetSpecs) .executeBazelBesCommand(originId) - .waitAndGetResult(); + .waitAndGetResult(false); return new BepBuildResult(result, bepServer.getBepOutput()); }