From 5b37d27eca0b9e05ff5fb9f842fc3e4c735edbce Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Wed, 18 Dec 2024 23:52:39 -0600 Subject: [PATCH 1/8] Experiment with ktor websockets again, hopefully to simplify disconnection reasons. --- build.gradle.kts | 1 + gradle/libs.versions.toml | 9 + .../networking/steam3/WebSocketCMClient.kt | 100 ----------- .../networking/steam3/WebSocketConnection.kt | 165 ++++++++++-------- 4 files changed, 102 insertions(+), 173 deletions(-) delete mode 100644 src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketCMClient.kt diff --git a/build.gradle.kts b/build.gradle.kts index 23b99c58..0df7ceda 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -122,6 +122,7 @@ dependencies { implementation(libs.okHttp) implementation(libs.xz) implementation(libs.protobuf.java) + implementation(libs.bundles.ktor) testImplementation(libs.bundles.testing) } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index e9b0bae4..3a064c06 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -44,6 +44,9 @@ protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "p protobuf-protoc = { module = "com.google.protobuf:protoc", version.ref = "protobuf" } qrCode = { module = "pro.leaco.qrcode:console-qrcode", version.ref = "qrCode" } xz = { module = "org.tukaani:xz", version.ref = "xz" } +ktor-client-core = { module = "io.ktor:ktor-client-core", version = "3.0.2" } +ktor-client-cio = { module = "io.ktor:ktor-client-cio", version = "3.0.2" } +ktor-client-websocket = { module = "io.ktor:ktor-client-websockets", version = "3.0.2" } test-commons-codec = { module = "commons-codec:commons-codec", version.ref = "commonsCodec" } test-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit5" } @@ -71,3 +74,9 @@ testing = [ "test-mockito-core", "test-mockito-jupiter", ] + +ktor = [ + "ktor-client-core", + "ktor-client-cio", + "ktor-client-websocket", +] diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketCMClient.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketCMClient.kt deleted file mode 100644 index c30cf2fb..00000000 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketCMClient.kt +++ /dev/null @@ -1,100 +0,0 @@ -package `in`.dragonbra.javasteam.networking.steam3 - -import okhttp3.OkHttpClient -import okhttp3.Request -import okhttp3.Response -import okhttp3.WebSocket -import okhttp3.WebSocketListener -import okio.ByteString -import java.net.URI -import java.util.concurrent.TimeUnit - -class WebSocketCMClient( - timeout: Int, - private val serverUrl: URI, - private val listener: WSListener, -) : WebSocketListener() { - - companion object { - // private val logger = LogManager.getLogger(WebSocketCMClient::class.java) - } - - private val client = OkHttpClient.Builder() - .readTimeout(timeout.toLong(), TimeUnit.MILLISECONDS) - .build() - - private var webSocket: WebSocket? = null - - /** - * Invoked when a web socket has been accepted by the remote peer and may begin transmitting - * messages. - */ - override fun onOpen(webSocket: WebSocket, response: Response) { - listener.onOpen(response) - response.close() - } - - /** Invoked when a text (type `0x1`) message has been received. */ - override fun onMessage(webSocket: WebSocket, text: String) { - listener.onTextData(text) - } - - /** Invoked when a binary (type `0x2`) message has been received. */ - override fun onMessage(webSocket: WebSocket, bytes: ByteString) { - listener.onData(bytes.toByteArray()) - } - - /** - * Invoked when the remote peer has indicated that no more incoming messages will be transmitted. - */ - override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { - listener.onClosing(code, reason) - } - - /** - * Invoked when both peers have indicated that no more messages will be transmitted and the - * connection has been successfully released. No further calls to this listener will be made. - */ - override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { - listener.onClose(code, reason) - this.webSocket = null - } - - /** - * Invoked when a web socket has been closed due to an error reading from or writing to the - * network. Both outgoing and incoming messages may have been lost. No further calls to this - * listener will be made. - */ - override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { - listener.onError(t) - response?.close() - this.webSocket = null - } - - fun connect() { - val request = Request.Builder().url(serverUrl.toString()).build() - webSocket = client.newWebSocket(request, this) - } - - fun send(data: ByteArray) { - webSocket?.send(ByteString.of(*data)) - } - - fun close() { - webSocket?.close(1000, null) - - // Shutdown the okhttp client to prevent hanging. - client.dispatcher.executorService.shutdown() - client.connectionPool.evictAll() - client.cache?.close() - } - - interface WSListener { - fun onTextData(data: String) - fun onData(data: ByteArray) - fun onClose(code: Int, reason: String) - fun onClosing(code: Int, reason: String) - fun onError(t: Throwable) - fun onOpen(response: Response) - } -} diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt index 9dce1967..1d64dd6b 100644 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt +++ b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt @@ -2,104 +2,123 @@ package `in`.dragonbra.javasteam.networking.steam3 import `in`.dragonbra.javasteam.util.log.LogManager import `in`.dragonbra.javasteam.util.log.Logger -import okhttp3.Response +import io.ktor.client.HttpClient +import io.ktor.client.engine.cio.CIO +import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession +import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.plugins.websocket.webSocketSession +import io.ktor.http.URLProtocol +import io.ktor.http.path +import io.ktor.websocket.Frame +import io.ktor.websocket.close +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout import java.net.InetAddress import java.net.InetSocketAddress -import java.net.URI -import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.CoroutineContext class WebSocketConnection : Connection(), - WebSocketCMClient.WSListener { + CoroutineScope { companion object { private val logger: Logger = LogManager.getLogger(WebSocketConnection::class.java) - - private fun constructUri(address: InetSocketAddress): URI = - URI.create("wss://${address.hostString}:${address.port}/cmsocket/") } - private val client = AtomicReference(null) - - private var socketEndPoint: InetSocketAddress? = null - - override fun connect(endPoint: InetSocketAddress, timeout: Int) { - logger.debug("Connecting to $endPoint...") + private var client: DefaultClientWebSocketSession? = null - val serverUri = constructUri(endPoint) - val newClient = WebSocketCMClient(timeout, serverUri, this) - val oldClient = client.getAndSet(newClient) + private var currentTimeout: Long = 5000L - oldClient?.let { oldClient -> - logger.debug("Attempted to connect while already connected. Closing old connection...") - oldClient.close() - onDisconnected(false) - } + private var currentEndpoint: InetSocketAddress? = null - socketEndPoint = endPoint + private val job = SupervisorJob() - newClient.connect() + private val ktorClient = HttpClient(CIO) { + install(WebSockets) } - override fun disconnect(userInitiated: Boolean) { - disconnectCore(userInitiated) - } + override val coroutineContext: CoroutineContext + get() = Dispatchers.IO + job + CoroutineName("WebSocketConnection") - override fun send(data: ByteArray) { - try { - client.get()?.send(data) - } catch (e: Exception) { - logger.debug("Exception while sending data", e) - disconnectCore(false) + override fun connect(endPoint: InetSocketAddress, timeout: Int) { + currentEndpoint = endPoint + currentTimeout = timeout.toLong() + launch { + try { + val session = withTimeout(currentTimeout) { + ktorClient.webSocketSession { + url { + protocol = URLProtocol.WSS + host = endPoint.hostString + port = endPoint.port + path("cmsocket/") + } + } + } + + client = session + onConnected() + + logger.debug("Connected to ${endPoint.hostString}:${endPoint.port}") + + for (frame in session.incoming) { + when (frame) { + is Frame.Binary -> { + val event = NetMsgEventArgs(frame.data, currentEndpoint) + onNetMsgReceived(event) + } + + is Frame.Close -> { + disconnect(false) + break + } + + else -> Unit // Ignore other frames + } + } + } catch (e: Exception) { + logger.debug("An error occurred in the WebSocket connection", e) + onDisconnected(false) + } } } - override fun getLocalIP(): InetAddress? = InetAddress.getByAddress(byteArrayOf(0, 0, 0, 0)) - - override fun getCurrentEndPoint(): InetSocketAddress? = socketEndPoint - - override fun getProtocolTypes(): ProtocolTypes = ProtocolTypes.WEB_SOCKET - - private fun disconnectCore(userInitiated: Boolean) { - logger.debug("User initiated disconnection: $userInitiated") - - val oldClient = client.getAndSet(null) - oldClient?.close() - - onDisconnected(userInitiated) - - socketEndPoint = null - } - - override fun onTextData(data: String) { - // Ignore string messages - logger.debug("Got string message: $data") - } - - override fun onData(data: ByteArray) { - if (data.isNotEmpty()) { - onNetMsgReceived(NetMsgEventArgs(data, getCurrentEndPoint())) + override fun disconnect(userInitiated: Boolean) { + logger.debug("Disconnecting from $currentEndpoint, userInitiated: $userInitiated") + launch { + try { + client?.close() + } finally { + client = null + currentEndpoint = null + onDisconnected(userInitiated) + } } } - override fun onClose(code: Int, reason: String) { - logger.debug("Connection closed") + override fun send(data: ByteArray?) { + launch { + if (client == null) { + logger.debug("Attempted to send data while not connected") + return@launch + } + + if (data != null && data.isNotEmpty()) { + withTimeout(currentTimeout) { + val frame = Frame.Binary(true, data) + client?.send(frame) + } + } + } } - override fun onClosing(code: Int, reason: String) { - logger.debug("Closing connection: $code, reason: ${reason.ifEmpty { "No reason given" }}") - // Steam can close a connection if there is nothing else it wants to send. - // For example: AccountLoginDeniedNeedTwoFactor, InvalidPassword, etc. - disconnectCore(code == 1000) - } + override fun getLocalIP(): InetAddress? = InetAddress.getLocalHost() - override fun onError(t: Throwable) { - logger.error("Error in websocket", t) - disconnectCore(false) - } + override fun getCurrentEndPoint(): InetSocketAddress? = currentEndpoint - override fun onOpen(response: Response) { - logger.debug("WebSocket connected to $socketEndPoint using TLS: ${response.handshake?.tlsVersion}") - onConnected() - } + override fun getProtocolTypes(): ProtocolTypes? = ProtocolTypes.WEB_SOCKET } From 9c41c70211951dde750a26a050a443d996b46e5c Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Thu, 19 Dec 2024 16:20:20 -0600 Subject: [PATCH 2/8] fix non-null getter. --- .../javasteam/networking/steam3/WebSocketConnection.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt index 1d64dd6b..30111bbd 100644 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt +++ b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt @@ -120,5 +120,5 @@ class WebSocketConnection : override fun getCurrentEndPoint(): InetSocketAddress? = currentEndpoint - override fun getProtocolTypes(): ProtocolTypes? = ProtocolTypes.WEB_SOCKET + override fun getProtocolTypes(): ProtocolTypes = ProtocolTypes.WEB_SOCKET } From 0c2ed14639de9d1ddb8471aba99dc11d5804da94 Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Fri, 3 Jan 2025 00:22:46 -0600 Subject: [PATCH 3/8] Revamp websocket connection more, trying to be reliable when a local or remote disconnection or error occurs. --- gradle/libs.versions.toml | 6 +- .../networking/steam3/WebSocketConnection.kt | 220 +++++++++++++----- 2 files changed, 166 insertions(+), 60 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3a064c06..6310548f 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -44,9 +44,9 @@ protobuf-java = { module = "com.google.protobuf:protobuf-java", version.ref = "p protobuf-protoc = { module = "com.google.protobuf:protoc", version.ref = "protobuf" } qrCode = { module = "pro.leaco.qrcode:console-qrcode", version.ref = "qrCode" } xz = { module = "org.tukaani:xz", version.ref = "xz" } -ktor-client-core = { module = "io.ktor:ktor-client-core", version = "3.0.2" } -ktor-client-cio = { module = "io.ktor:ktor-client-cio", version = "3.0.2" } -ktor-client-websocket = { module = "io.ktor:ktor-client-websockets", version = "3.0.2" } +ktor-client-core = { module = "io.ktor:ktor-client-core", version = "3.0.3" } +ktor-client-cio = { module = "io.ktor:ktor-client-cio", version = "3.0.3" } +ktor-client-websocket = { module = "io.ktor:ktor-client-websockets", version = "3.0.3" } test-commons-codec = { module = "commons-codec:commons-codec", version.ref = "commonsCodec" } test-jupiter-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit5" } diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt index 30111bbd..ce2d5817 100644 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt +++ b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt @@ -1,124 +1,230 @@ package `in`.dragonbra.javasteam.networking.steam3 import `in`.dragonbra.javasteam.util.log.LogManager -import `in`.dragonbra.javasteam.util.log.Logger import io.ktor.client.HttpClient import io.ktor.client.engine.cio.CIO -import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession import io.ktor.client.plugins.websocket.WebSockets import io.ktor.client.plugins.websocket.webSocketSession import io.ktor.http.URLProtocol import io.ktor.http.path +import io.ktor.websocket.DefaultWebSocketSession import io.ktor.websocket.Frame import io.ktor.websocket.close -import kotlinx.coroutines.CoroutineName +import io.ktor.websocket.readBytes import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import java.net.InetAddress import java.net.InetSocketAddress -import kotlin.coroutines.CoroutineContext +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference +import kotlin.time.Duration.Companion.milliseconds -class WebSocketConnection : - Connection(), - CoroutineScope { +class WebSocketConnection : Connection() { - companion object { - private val logger: Logger = LogManager.getLogger(WebSocketConnection::class.java) + private val logger = LogManager.getLogger(WebSocketConnection::class.java) + + private val client = HttpClient(CIO) { + install(WebSockets) } - private var client: DefaultClientWebSocketSession? = null + private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) - private var currentTimeout: Long = 5000L + private var session: DefaultWebSocketSession? = null - private var currentEndpoint: InetSocketAddress? = null + private var heartbeatJob: Job? = null - private val job = SupervisorJob() + private var receiveJob: Job? = null - private val ktorClient = HttpClient(CIO) { - install(WebSockets) - } + private var sendJob: Job? = null + + private val isConnected = AtomicBoolean(false) + + private val currentEndPointRef = AtomicReference() - override val coroutineContext: CoroutineContext - get() = Dispatchers.IO + job + CoroutineName("WebSocketConnection") + private val isClosing = AtomicBoolean(false) + + companion object { + private const val HEARTBEAT_INTERVAL_MS = 30_000L + } override fun connect(endPoint: InetSocketAddress, timeout: Int) { - currentEndpoint = endPoint - currentTimeout = timeout.toLong() - launch { + if (isConnected.get()) { + logger.debug("Already connected, disconnect first") + return + } + + logger.debug("Connecting to ${endPoint.hostName}:${endPoint.port}") + + currentEndPointRef.set(endPoint) + + scope.launch { try { - val session = withTimeout(currentTimeout) { - ktorClient.webSocketSession { + withTimeout(timeout.milliseconds) { + val session = client.webSocketSession { url { - protocol = URLProtocol.WSS - host = endPoint.hostString + host = endPoint.hostName port = endPoint.port + protocol = URLProtocol.WSS path("cmsocket/") } } + + this@WebSocketConnection.session = session + isConnected.set(true) + startHeartbeat() + startReceiving() + + logger.debug("Connected to ${endPoint.hostName}:${endPoint.port}") + onConnected() } + } catch (e: Exception) { + logger.error("Failed to connect: ${e.message}", e) + handleDisconnection(false) + } + } + } - client = session - onConnected() + private fun startHeartbeat() { + heartbeatJob?.cancel() + heartbeatJob = scope.launch { + logger.debug("Starting heartbeat") + while (isActive) { + delay(HEARTBEAT_INTERVAL_MS) + if (!isConnected.get()) break + + try { + val currentSession = session + if (currentSession == null || !currentSession.isActive) { + logger.debug("Session became inactive during heartbeat check") + handleDisconnection(false) + break + } - logger.debug("Connected to ${endPoint.hostString}:${endPoint.port}") + withTimeout(5000) { + try { + currentSession.send(Frame.Text("")) + } catch (e: Exception) { + logger.error("Failed to send keepalive frame", e) + throw e + } + } + } catch (e: Exception) { + logger.error("Error during heartbeat: ${e.message}", e) + handleDisconnection(false) + break + } + } + } + } - for (frame in session.incoming) { + private fun startReceiving() { + receiveJob?.cancel() + receiveJob = scope.launch { + logger.debug("Starting receive loop") + try { + val incoming = session?.incoming ?: return@launch + for (frame in incoming) { when (frame) { is Frame.Binary -> { - val event = NetMsgEventArgs(frame.data, currentEndpoint) - onNetMsgReceived(event) + val data = frame.readBytes() + onNetMsgReceived(NetMsgEventArgs(data, currentEndPointRef.get())) } - is Frame.Close -> { - disconnect(false) - break - } - - else -> Unit // Ignore other frames + else -> logger.debug("Received non-binary frame: $frame") } } } catch (e: Exception) { - logger.debug("An error occurred in the WebSocket connection", e) - onDisconnected(false) + if (!isClosing.get()) { + logger.error("Error in receive loop: ${e.message}", e) + handleDisconnection(false) + } } } } override fun disconnect(userInitiated: Boolean) { - logger.debug("Disconnecting from $currentEndpoint, userInitiated: $userInitiated") - launch { + if (isClosing.getAndSet(true)) { + logger.debug("Disconnect already in progress") + return + } + + scope.launch { try { - client?.close() + logger.debug("Disconnecting... (user initiated: $userInitiated)") + + heartbeatJob?.cancelAndJoin() + receiveJob?.cancelAndJoin() + sendJob?.cancelAndJoin() + + val currentSession = session + + session = null + isConnected.set(false) + + try { + currentSession?.close() + } catch (e: Exception) { + logger.error("Error closing session", e) + } + + val oldEndPoint = currentEndPointRef.get() + + if (oldEndPoint != null) { + onDisconnected(userInitiated) + } + + currentEndPointRef.set(null) + + logger.debug("Disconnected successfully") + } catch (e: Exception) { + logger.error("Error during disconnect: ${e.message}", e) } finally { - client = null - currentEndpoint = null - onDisconnected(userInitiated) + isClosing.set(false) } } } - override fun send(data: ByteArray?) { - launch { - if (client == null) { - logger.debug("Attempted to send data while not connected") - return@launch - } + private fun handleDisconnection(userInitiated: Boolean) { + disconnect(userInitiated) + } - if (data != null && data.isNotEmpty()) { - withTimeout(currentTimeout) { - val frame = Frame.Binary(true, data) - client?.send(frame) + override fun send(data: ByteArray) { + if (!isConnected.get()) { + logger.error("Cannot send: not connected") + return + } + + sendJob?.cancel() + sendJob = scope.launch { + try { + val currentSession = session + if (currentSession == null) { + logger.debug("Cannot send: session is null") + handleDisconnection(false) + return@launch } + + withTimeout(5000) { + currentSession.send(Frame.Binary(true, data)) + logger.debug("Sent ${data.size} bytes") + } + } catch (e: Exception) { + logger.error("Error sending data: ${e.message}", e) + handleDisconnection(false) } } } - override fun getLocalIP(): InetAddress? = InetAddress.getLocalHost() + override fun getLocalIP(): InetAddress = InetAddress.getLocalHost() - override fun getCurrentEndPoint(): InetSocketAddress? = currentEndpoint + override fun getCurrentEndPoint(): InetSocketAddress? = currentEndPointRef.get() override fun getProtocolTypes(): ProtocolTypes = ProtocolTypes.WEB_SOCKET } From 07315f21206d6bc07bfec30d101cea3ae4005af6 Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Tue, 14 Jan 2025 20:16:44 -0600 Subject: [PATCH 4/8] Rework websocket again for a timeout option when the steam servers go down. Found ClientHeartBeat can ask for a reply which should help greatly. --- .../networking/steam3/WebSocketConnection.kt | 262 +++++++----------- .../dragonbra/javasteam/steam/CMClient.java | 7 +- 2 files changed, 108 insertions(+), 161 deletions(-) diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt index ce2d5817..ac7e26bb 100644 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt +++ b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt @@ -4,227 +4,169 @@ import `in`.dragonbra.javasteam.util.log.LogManager import io.ktor.client.HttpClient import io.ktor.client.engine.cio.CIO import io.ktor.client.plugins.websocket.WebSockets +import io.ktor.client.plugins.websocket.pingInterval import io.ktor.client.plugins.websocket.webSocketSession import io.ktor.http.URLProtocol import io.ktor.http.path -import io.ktor.websocket.DefaultWebSocketSession import io.ktor.websocket.Frame +import io.ktor.websocket.WebSocketSession import io.ktor.websocket.close import io.ktor.websocket.readBytes +import io.ktor.websocket.readText import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import kotlinx.coroutines.withTimeout import java.net.InetAddress import java.net.InetSocketAddress -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicReference -import kotlin.time.Duration.Companion.milliseconds +import kotlin.coroutines.CoroutineContext +import kotlin.time.DurationUnit +import kotlin.time.toDuration -class WebSocketConnection : Connection() { +class WebSocketConnection : + Connection(), + CoroutineScope { - private val logger = LogManager.getLogger(WebSocketConnection::class.java) - - private val client = HttpClient(CIO) { - install(WebSockets) + companion object { + private val logger = LogManager.getLogger(WebSocketConnection::class.java) } - private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) - - private var session: DefaultWebSocketSession? = null - - private var heartbeatJob: Job? = null + private val job = SupervisorJob() - private var receiveJob: Job? = null + private var monitorJob: Job? = null - private var sendJob: Job? = null + private var client: HttpClient? = null - private val isConnected = AtomicBoolean(false) + private var session: WebSocketSession? = null - private val currentEndPointRef = AtomicReference() + private var endpoint: InetSocketAddress? = null - private val isClosing = AtomicBoolean(false) - - companion object { - private const val HEARTBEAT_INTERVAL_MS = 30_000L - } + override val coroutineContext: CoroutineContext = Dispatchers.IO + job override fun connect(endPoint: InetSocketAddress, timeout: Int) { - if (isConnected.get()) { - logger.debug("Already connected, disconnect first") - return - } - - logger.debug("Connecting to ${endPoint.hostName}:${endPoint.port}") + launch { + try { + endpoint = endPoint - currentEndPointRef.set(endPoint) + client = HttpClient(CIO) { + install(WebSockets) { + pingInterval = timeout.toDuration(DurationUnit.SECONDS) + } + } - scope.launch { - try { - withTimeout(timeout.milliseconds) { - val session = client.webSocketSession { - url { - host = endPoint.hostName - port = endPoint.port - protocol = URLProtocol.WSS - path("cmsocket/") - } + val session = client?.webSocketSession { + url { + host = endPoint.hostName + port = endPoint.port + protocol = URLProtocol.WSS + path("cmsocket/") } + } - this@WebSocketConnection.session = session - isConnected.set(true) - startHeartbeat() - startReceiving() + this@WebSocketConnection.session = session - logger.debug("Connected to ${endPoint.hostName}:${endPoint.port}") - onConnected() - } - } catch (e: Exception) { - logger.error("Failed to connect: ${e.message}", e) - handleDisconnection(false) - } - } - } + startConnectionMonitoring() - private fun startHeartbeat() { - heartbeatJob?.cancel() - heartbeatJob = scope.launch { - logger.debug("Starting heartbeat") - while (isActive) { - delay(HEARTBEAT_INTERVAL_MS) - if (!isConnected.get()) break - - try { - val currentSession = session - if (currentSession == null || !currentSession.isActive) { - logger.debug("Session became inactive during heartbeat check") - handleDisconnection(false) - break - } + launch { + try { + session?.incoming?.consumeEach { frame -> + when (frame) { + is Frame.Binary -> { + val data = frame.readBytes() + onNetMsgReceived(NetMsgEventArgs(data, currentEndPoint)) + } - withTimeout(5000) { - try { - currentSession.send(Frame.Text("")) - } catch (e: Exception) { - logger.error("Failed to send keepalive frame", e) - throw e + is Frame.Close -> disconnect(false) + is Frame.Ping -> logger.debug("Received pong") + is Frame.Pong -> logger.debug("Received pong") + is Frame.Text -> logger.debug("Received plain text ${frame.readText()}") + } } + } catch (e: Exception) { + logger.error("An error occurred while receiving data", e) + disconnect(false) } - } catch (e: Exception) { - logger.error("Error during heartbeat: ${e.message}", e) - handleDisconnection(false) - break } - } - } - } - - private fun startReceiving() { - receiveJob?.cancel() - receiveJob = scope.launch { - logger.debug("Starting receive loop") - try { - val incoming = session?.incoming ?: return@launch - for (frame in incoming) { - when (frame) { - is Frame.Binary -> { - val data = frame.readBytes() - onNetMsgReceived(NetMsgEventArgs(data, currentEndPointRef.get())) - } - else -> logger.debug("Received non-binary frame: $frame") - } - } + logger.debug("Connected to ${endPoint.hostName}:${endPoint.port}") + onConnected() } catch (e: Exception) { - if (!isClosing.get()) { - logger.error("Error in receive loop: ${e.message}", e) - handleDisconnection(false) - } + logger.error("An error occurred setting up the web socket client", e) + disconnect(false) } } } override fun disconnect(userInitiated: Boolean) { - if (isClosing.getAndSet(true)) { - logger.debug("Disconnect already in progress") - return - } - - scope.launch { + launch { try { - logger.debug("Disconnecting... (user initiated: $userInitiated)") - - heartbeatJob?.cancelAndJoin() - receiveJob?.cancelAndJoin() - sendJob?.cancelAndJoin() - - val currentSession = session - + monitorJob?.cancel() + session?.close() + client?.close() + } finally { session = null - isConnected.set(false) - - try { - currentSession?.close() - } catch (e: Exception) { - logger.error("Error closing session", e) - } + client = null + endpoint = null - val oldEndPoint = currentEndPointRef.get() - - if (oldEndPoint != null) { - onDisconnected(userInitiated) + if (!userInitiated) { + job.cancelChildren() } - - currentEndPointRef.set(null) - - logger.debug("Disconnected successfully") - } catch (e: Exception) { - logger.error("Error during disconnect: ${e.message}", e) - } finally { - isClosing.set(false) } } - } - private fun handleDisconnection(userInitiated: Boolean) { - disconnect(userInitiated) + onDisconnected(userInitiated) } override fun send(data: ByteArray) { - if (!isConnected.get()) { - logger.error("Cannot send: not connected") - return - } - - sendJob?.cancel() - sendJob = scope.launch { + launch { try { - val currentSession = session - if (currentSession == null) { - logger.debug("Cannot send: session is null") - handleDisconnection(false) - return@launch - } - - withTimeout(5000) { - currentSession.send(Frame.Binary(true, data)) - logger.debug("Sent ${data.size} bytes") - } + session?.send(Frame.Binary(true, data)) } catch (e: Exception) { - logger.error("Error sending data: ${e.message}", e) - handleDisconnection(false) + logger.error("An error occurred while sending data", e) + disconnect(false) } } } override fun getLocalIP(): InetAddress = InetAddress.getLocalHost() - override fun getCurrentEndPoint(): InetSocketAddress? = currentEndPointRef.get() + override fun getCurrentEndPoint(): InetSocketAddress? = endpoint override fun getProtocolTypes(): ProtocolTypes = ProtocolTypes.WEB_SOCKET + + /** + * Rudimentary watchdog + */ + private fun startConnectionMonitoring() { + monitorJob = launch { + var lastFrameTime = System.currentTimeMillis() + + launch { + session?.incoming?.consumeEach { frame -> + lastFrameTime = System.currentTimeMillis() + } + } + + while (isActive) { + delay(5000) + val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime + + // logger.debug("Watchdog status: $timeSinceLastFrame") + when { + timeSinceLastFrame > 30000 -> { + logger.error("Watchdog: No response for 30 seconds. Disconnecting from steam") + disconnect(false) + break + } + timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds") + timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds") + } + } + } + } } diff --git a/src/main/java/in/dragonbra/javasteam/steam/CMClient.java b/src/main/java/in/dragonbra/javasteam/steam/CMClient.java index 1491de56..aa778935 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/CMClient.java +++ b/src/main/java/in/dragonbra/javasteam/steam/CMClient.java @@ -117,7 +117,12 @@ public CMClient(SteamConfiguration configuration) { this.configuration = configuration; - heartBeatFunc = new ScheduledFunction(() -> send(new ClientMsgProtobuf(CMsgClientHeartBeat.class, EMsg.ClientHeartBeat)), 5000); + heartBeatFunc = new ScheduledFunction(() -> { + var heartbeat = new ClientMsgProtobuf( + CMsgClientHeartBeat.class, EMsg.ClientHeartBeat); + heartbeat.getBody().setSendReply(true); // Ping Pong + send(heartbeat); + }, 5000); } /** From 535226af1bad25eb8cbfbabf1e54666685ec8854 Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Wed, 15 Jan 2025 09:40:47 -0600 Subject: [PATCH 5/8] Simplify web socket lifecycle and fixed two consumers fighting for incoming packets. --- .../networking/steam3/WebSocketConnection.kt | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt index ac7e26bb..aafb8f5b 100644 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt +++ b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt @@ -36,9 +36,7 @@ class WebSocketConnection : private val logger = LogManager.getLogger(WebSocketConnection::class.java) } - private val job = SupervisorJob() - - private var monitorJob: Job? = null + private val job: Job = SupervisorJob() private var client: HttpClient? = null @@ -46,6 +44,8 @@ class WebSocketConnection : private var endpoint: InetSocketAddress? = null + private var lastFrameTime = System.currentTimeMillis() + override val coroutineContext: CoroutineContext = Dispatchers.IO + job override fun connect(endPoint: InetSocketAddress, timeout: Int) { @@ -77,8 +77,9 @@ class WebSocketConnection : session?.incoming?.consumeEach { frame -> when (frame) { is Frame.Binary -> { - val data = frame.readBytes() - onNetMsgReceived(NetMsgEventArgs(data, currentEndPoint)) + logger.debug("on Binary ${frame.data.size}") + lastFrameTime = System.currentTimeMillis() + onNetMsgReceived(NetMsgEventArgs(frame.readBytes(), currentEndPoint)) } is Frame.Close -> disconnect(false) @@ -105,7 +106,6 @@ class WebSocketConnection : override fun disconnect(userInitiated: Boolean) { launch { try { - monitorJob?.cancel() session?.close() client?.close() } finally { @@ -113,9 +113,7 @@ class WebSocketConnection : client = null endpoint = null - if (!userInitiated) { - job.cancelChildren() - } + job.cancelChildren() } } @@ -125,7 +123,8 @@ class WebSocketConnection : override fun send(data: ByteArray) { launch { try { - session?.send(Frame.Binary(true, data)) + val frame = Frame.Binary(true, data) + session?.send(frame) } catch (e: Exception) { logger.error("An error occurred while sending data", e) disconnect(false) @@ -143,17 +142,8 @@ class WebSocketConnection : * Rudimentary watchdog */ private fun startConnectionMonitoring() { - monitorJob = launch { - var lastFrameTime = System.currentTimeMillis() - - launch { - session?.incoming?.consumeEach { frame -> - lastFrameTime = System.currentTimeMillis() - } - } - + launch { while (isActive) { - delay(5000) val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime // logger.debug("Watchdog status: $timeSinceLastFrame") @@ -166,6 +156,8 @@ class WebSocketConnection : timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds") timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds") } + + delay(5000) } } } From 94c7914c2429572a125f5e71eb67066d18baafbb Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Tue, 21 Jan 2025 21:02:08 -0600 Subject: [PATCH 6/8] Fix null connection when trying to reconnect after failed succession. Skip tryMark if endPoint is null. --- .../javasteam/networking/steam3/WebSocketConnection.kt | 5 ++++- .../javasteam/steam/discovery/SmartCMServerList.kt | 9 +++++++-- .../javasteam/steam/discovery/SmartCMServerListTest.java | 7 +++++++ 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt index aafb8f5b..bcb9a2e7 100644 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt +++ b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt @@ -50,6 +50,8 @@ class WebSocketConnection : override fun connect(endPoint: InetSocketAddress, timeout: Int) { launch { + logger.debug("Trying connection to ${endPoint.hostName}:${endPoint.port}") + try { endpoint = endPoint @@ -111,7 +113,6 @@ class WebSocketConnection : } finally { session = null client = null - endpoint = null job.cancelChildren() } @@ -153,7 +154,9 @@ class WebSocketConnection : disconnect(false) break } + timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds") + timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds") } diff --git a/src/main/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerList.kt b/src/main/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerList.kt index b2e1129a..9c8df9d7 100644 --- a/src/main/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerList.kt +++ b/src/main/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerList.kt @@ -193,10 +193,15 @@ class SmartCMServerList(private val configuration: SteamConfiguration) { } } - fun tryMark(endPoint: InetSocketAddress, protocolTypes: ProtocolTypes, quality: ServerQuality): Boolean = + fun tryMark(endPoint: InetSocketAddress?, protocolTypes: ProtocolTypes, quality: ServerQuality): Boolean = tryMark(endPoint, EnumSet.of(protocolTypes), quality) - fun tryMark(endPoint: InetSocketAddress, protocolTypes: EnumSet, quality: ServerQuality): Boolean { + fun tryMark(endPoint: InetSocketAddress?, protocolTypes: EnumSet, quality: ServerQuality): Boolean { + if (endPoint == null) { + logger.error("Couldn't mark an endpoint ${quality.name}, skipping it") + return false + } + val serverInfos: List if (quality == ServerQuality.GOOD) { diff --git a/src/test/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerListTest.java b/src/test/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerListTest.java index fd83daca..ef0e04e3 100644 --- a/src/test/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerListTest.java +++ b/src/test/java/in/dragonbra/javasteam/steam/discovery/SmartCMServerListTest.java @@ -285,4 +285,11 @@ var record = ServerRecord.createSocketServer(new InetSocketAddress(InetAddress.g var marked = serverList.tryMark(new InetSocketAddress(InetAddress.getLoopbackAddress(), 27016), record.getProtocolTypes(), ServerQuality.GOOD); Assertions.assertFalse(marked); } + + @Test + public void testNullConnection_ShouldReturnFalse() { + var result = serverList.tryMark(null, ProtocolTypes.WEB_SOCKET, ServerQuality.BAD); + + Assertions.assertFalse(result); + } } From a327a929b73a6f85c7638a93136fcca00e1dbe96 Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Fri, 24 Jan 2025 21:44:02 -0600 Subject: [PATCH 7/8] Add another timeout message. Use watchdog to check if Client or Session are still active, otherwise disconnect. --- .../networking/steam3/WebSocketConnection.kt | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt index bcb9a2e7..0c6f0b01 100644 --- a/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt +++ b/src/main/java/in/dragonbra/javasteam/networking/steam3/WebSocketConnection.kt @@ -79,7 +79,7 @@ class WebSocketConnection : session?.incoming?.consumeEach { frame -> when (frame) { is Frame.Binary -> { - logger.debug("on Binary ${frame.data.size}") + // logger.debug("on Binary ${frame.data.size}") lastFrameTime = System.currentTimeMillis() onNetMsgReceived(NetMsgEventArgs(frame.readBytes(), currentEndPoint)) } @@ -106,6 +106,7 @@ class WebSocketConnection : } override fun disconnect(userInitiated: Boolean) { + logger.debug("Disconnect called: $userInitiated") launch { try { session?.close() @@ -145,16 +146,23 @@ class WebSocketConnection : private fun startConnectionMonitoring() { launch { while (isActive) { + if (client?.isActive == false || session?.isActive == false) { + logger.error("Client or Session is no longer active") + disconnect(userInitiated = false) + } + val timeSinceLastFrame = System.currentTimeMillis() - lastFrameTime // logger.debug("Watchdog status: $timeSinceLastFrame") when { timeSinceLastFrame > 30000 -> { logger.error("Watchdog: No response for 30 seconds. Disconnecting from steam") - disconnect(false) + disconnect(userInitiated = false) break } + timeSinceLastFrame > 25000 -> logger.debug("Watchdog: No response for 25 seconds") + timeSinceLastFrame > 20000 -> logger.debug("Watchdog: No response for 20 seconds") timeSinceLastFrame > 15000 -> logger.debug("Watchdog: No response for 15 seconds") From e3502d216b6b8ccec9713338bfcd3a87b057380a Mon Sep 17 00:00:00 2001 From: LossyDragon Date: Tue, 4 Feb 2025 17:44:56 -0600 Subject: [PATCH 8/8] Promote version --- build.gradle.kts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle.kts b/build.gradle.kts index 0df7ceda..74bba141 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,7 +18,7 @@ plugins { allprojects { group = "in.dragonbra" - version = "1.6.0-SNAPSHOT" + version = "1.6.0" } repositories {