From 6e98bbd39457e8ba04ad242fd0c45c0168648a39 Mon Sep 17 00:00:00 2001 From: pq Date: Mon, 1 Dec 2025 19:33:19 -0800 Subject: [PATCH] [analytics] introduce `DTDProcess` --- .../com/jetbrains/lang/dart/dtd/DTDProcess.kt | 241 ++++++++++++++++++ 1 file changed, 241 insertions(+) create mode 100644 third_party/src/main/java/com/jetbrains/lang/dart/dtd/DTDProcess.kt diff --git a/third_party/src/main/java/com/jetbrains/lang/dart/dtd/DTDProcess.kt b/third_party/src/main/java/com/jetbrains/lang/dart/dtd/DTDProcess.kt new file mode 100644 index 000000000..9360fd855 --- /dev/null +++ b/third_party/src/main/java/com/jetbrains/lang/dart/dtd/DTDProcess.kt @@ -0,0 +1,241 @@ +/* + * Copyright 2025 The Chromium Authors. All rights reserved. + * Use of this source code is governed by a BSD-style license that can be + * found in the LICENSE file. + */ + +package com.jetbrains.lang.dart.dtd + +import com.google.gson.JsonObject +import com.google.gson.JsonParser +import com.intellij.execution.configurations.GeneralCommandLine +import com.intellij.execution.process.KillableProcessHandler +import com.intellij.execution.process.ProcessEvent +import com.intellij.execution.process.ProcessListener +import com.intellij.openapi.Disposable +import com.intellij.openapi.application.ApplicationManager +import com.intellij.openapi.diagnostic.logger +import com.intellij.openapi.util.Key +import com.intellij.openapi.util.io.FileUtil +import com.intellij.util.EventDispatcher +import com.intellij.util.io.BaseOutputReader +import com.jetbrains.lang.dart.ide.toolingDaemon.DartToolingDaemonConsumer +import com.jetbrains.lang.dart.ide.toolingDaemon.DartToolingDaemonListener +import com.jetbrains.lang.dart.ide.toolingDaemon.DartToolingDaemonRequestHandler +import com.jetbrains.lang.dart.sdk.DartSdk +import com.jetbrains.lang.dart.sdk.DartSdkUtil +import de.roderick.weberknecht.WebSocket +import de.roderick.weberknecht.WebSocketEventHandler +import de.roderick.weberknecht.WebSocketException +import de.roderick.weberknecht.WebSocketMessage +import java.net.URI +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicInteger + +class DTDProcess { + companion object { + private val logger = logger() + } + + private lateinit var osProcessHandler: KillableProcessHandler + + private val consumerMap: MutableMap = mutableMapOf() + + private var processRunning = false + + var uri: String? = null + private set + private var secret: String? = null + + private val nextRequestId = AtomicInteger() + + private lateinit var webSocket: WebSocket + var webSocketReady: Boolean = false + private set + private val servicesMap: MutableMap = mutableMapOf() + + private val eventDispatcher: EventDispatcher = + EventDispatcher.create(DartToolingDaemonListener::class.java) + + var listener: DTDProcessListener? = null + set + private fun connectToWebSocket(uri: String) { + try { + webSocket = WebSocket(URI(uri)) + webSocket.eventHandler = object : WebSocketEventHandler { + override fun onClose() { + listener?.onWebSocketClose() + } + + override fun onMessage(message: WebSocketMessage) { + val text = message.text + handleWebSocketMessage(text) + listener?.onWebSocketMessage(text) + } + + override fun onOpen() { + webSocketReady = true + listener?.onWebSocketOpen() + } + + override fun onPing() {} + + override fun onPong() {} + } + webSocket.connect() + } catch (e: Exception) { + logger.error("Failed to connect to Dart Tooling Daemon, uri: $uri", e) + } + } + + // TODO (pq): call from DartToolingDaemonService + fun addToolingDaemonListener(listener: DartToolingDaemonListener, parentDisposable: Disposable): Unit = + eventDispatcher.addListener(listener, parentDisposable) + + private fun handleWebSocketMessage(text: String) { + logger.debug("<-- $text") + + val json: JsonObject = try { + JsonParser.parseString(text) as JsonObject + } catch (e: Exception) { + logger.warn("Failed to parse message, error: ${e.message}, message: $text") + return + } + + val method = json["method"]?.asString + val serviceConsumer = servicesMap[method] + if (method == "streamNotify") { + val params = json["params"].asJsonObject + val streamId = params["streamId"].asString + eventDispatcher.multicaster.received(streamId, json) + } else if (serviceConsumer != null) { + val params = json["params"].asJsonObject + val id = json["id"].asString + ApplicationManager.getApplication().executeOnPooledThread { + val response = serviceConsumer.handleRequest(params) + sendResponse(id, response.result, response.error) + } + } else { + val id = json["id"].asString + val consumer = consumerMap.remove(id) + consumer?.received(json) + } + } + + fun start(sdk: DartSdk) { + val commandLine = GeneralCommandLine().withWorkDirectory(sdk.homePath) + commandLine.exePath = FileUtil.toSystemDependentName(DartSdkUtil.getDartExePath(sdk)) + commandLine.charset = StandardCharsets.UTF_8 + commandLine.addParameter("tooling-daemon") + commandLine.addParameter("--machine") + + osProcessHandler = object : KillableProcessHandler(commandLine) { + override fun readerOptions(): BaseOutputReader.Options = BaseOutputReader.Options.forMostlySilentProcess() + } + + osProcessHandler.addProcessListener(OSProcessListener()) + osProcessHandler.startNotify() + } + + fun terminate() { + if (::osProcessHandler.isInitialized && !osProcessHandler.isProcessTerminated) { + osProcessHandler.killProcess() + } + } + + private fun onProcessStarted(uri: String?, secret: String?) { + processRunning = true + this.uri = uri + this.secret = secret + + uri?.let { + connectToWebSocket(it) + } + + listener?.onProcessStarted(uri) + } + + private fun sendResponse(id: String, result: JsonObject?, error: JsonObject? = null) { + if (!webSocketReady) { + logger.warn("sendResponse(\"$id\", $result) called when the socket is not ready") + return + } + + val response = JsonObject() + response.addProperty("jsonrpc", "2.0") + response.addProperty("id", id) + if (error == null) { + response.add("result", result) + } else { + response.add("error", error) + } + + val responseString = response.toString() + logger.debug("--> $responseString") + webSocket.send(responseString) + } + + @Throws(WebSocketException::class) + fun sendRequest(method: String, params: JsonObject, includeSecret: Boolean, consumer: DartToolingDaemonConsumer) { + if (!webSocketReady) { + logger.warn("sendRequest(\"$method\") called when the socket is not ready") + return + } + + val request = JsonObject() + request.addProperty("jsonrpc", "2.0") + request.addProperty("method", method) + + val id = nextRequestId.incrementAndGet().toString() + request.addProperty("id", id) + secret?.takeIf { includeSecret }?.let { params.addProperty("secret", it) } + request.add("params", params) + + consumerMap[id] = consumer + + val requestString = request.toString() + logger.debug("--> $requestString") + webSocket.send(requestString) + } + + private inner class OSProcessListener : ProcessListener { + + override fun processTerminated(event: ProcessEvent) { + processRunning = false + webSocketReady = false + uri = null + logger.info("DTD process terminated, exit code: ${event.exitCode}") + } + + override fun onTextAvailable(event: ProcessEvent, outputType: Key<*>) { + logger.debug("DTD output: ${event.text}") + + if (processRunning) return + + // The first line of text is the command issued, which can be ignored. + val text = event.text.trim().takeUnless { it.endsWith(" tooling-daemon --machine") } + ?: return + + var uri: String? = null + var secret: String? = null + try { + val json = JsonParser.parseString(text) as JsonObject + val details = json["tooling_daemon_details"].asJsonObject + uri = details["uri"].asString + secret = details["trusted_client_secret"].asString + } catch (e: Exception) { + logger.warn("Failed to parse DTD init message. Error: ${e.message}. DTD message: $text") + } finally { + onProcessStarted(uri, secret) + } + } + } +} + +interface DTDProcessListener { + fun onWebSocketMessage(text: String) + fun onWebSocketClose() + fun onWebSocketOpen() + fun onProcessStarted(uri: String?) +} +