diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java index c8d2ad7..3bf7a2e 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/debug/TelemetryDebugger.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.common.util.concurrent.RateLimiter; import com.nbarraille.jjsonrpc.JJsonPeer; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.ice4j.ice.Candidate; import org.ice4j.ice.CandidatePair; @@ -33,8 +34,10 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; @Slf4j public class TelemetryDebugger implements Debugger { @@ -42,6 +45,9 @@ public class TelemetryDebugger implements Debugger { private final ObjectMapper objectMapper; private final Map peerRateLimiter = new ConcurrentHashMap<>(); + private final BlockingQueue messageQueue = new LinkedBlockingQueue<>(); + + private final Thread sendingLoopThread; public TelemetryDebugger(String telemetryServer, int gameId, int playerId) { Debug.register(this); @@ -79,14 +85,38 @@ public void onError(Exception ex) { objectMapper = new ObjectMapper(); objectMapper.registerModule(new JavaTimeModule()); + + sendingLoopThread = new Thread(this::sendingLoop, "sendingLoop"); + sendingLoopThread.setUncaughtExceptionHandler((t, e) -> log.error("Thread sendingLoop crashed unexpectedly", e)); + sendingLoopThread.start(); } private void sendMessage(OutgoingMessageV1 message) { try { - String json = objectMapper.writeValueAsString(message); - websocketClient.send(json); - } catch (IOException e) { - log.error("Error on serialising message object: {}", message, e); + messageQueue.put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @SneakyThrows + private void sendingLoop() { + while (true) { + var message = messageQueue.take(); + try { + String json = objectMapper.writeValueAsString(message); + + if(websocketClient.isClosed()) { + log.warn("Telemetry websocket is closed"); + websocketClient.reconnectBlocking(); + log.info("Telemetry websocket reconnected"); + } + + log.trace("Sending telemetry message: {}", json); + websocketClient.send(json); + } catch (Exception e) { + log.error("Error on sending message object: {}", message, e); + } } } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java index 2f97a36..7819e6c 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/GameSession.java @@ -76,6 +76,7 @@ public void disconnectFromPeer(int remotePlayerId) { * Stops the connection to all peers and all ice agents */ public void close() { + log.info("Closing gameSession"); peers.values().forEach(Peer::close); peers.clear(); } diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java index a2f0bb7..a4f3369 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/Peer.java @@ -94,6 +94,8 @@ public void close() { return; } + log.info("Closing peer for player {}", getRemoteId()); + closing = true; if(faSocket != null) { faSocket.close(); diff --git a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java index 749dbc3..9ee3a9a 100644 --- a/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java +++ b/ice-adapter/src/main/java/com/faforever/iceadapter/ice/PeerConnectivityCheckerModule.java @@ -39,10 +39,15 @@ synchronized void start() { averageRTT = 0.0f; lastPacketReceived = System.currentTimeMillis(); - checkerThread = new Thread(this::checkerThread); + checkerThread = new Thread(this::checkerThread, getThreadName()); + checkerThread.setUncaughtExceptionHandler((t, e) -> log.error("Thread {} crashed unexpectedly", t.getName(), e)); checkerThread.start(); } + private String getThreadName() { + return "connectivityChecker-"+ice.getPeer().getRemoteId(); + } + synchronized void stop() { if (!running) { return; @@ -82,6 +87,8 @@ void echoReceived(byte[] data, int offset, int length) { private void checkerThread() { while (running) { + log.trace("Running connectivity checker"); + byte[] data = new byte[9]; data[0] = 'e'; @@ -104,5 +111,7 @@ private void checkerThread() { return; } } + + log.info(getThreadName()+" stopped gracefully"); } }