Skip to content

Commit

Permalink
Recover connectivity check from telemetry websocket disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
Brutus5000 committed Nov 17, 2023
1 parent 4c79734 commit 9a35fc8
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.util.concurrent.RateLimiter;
import com.nbarraille.jjsonrpc.JJsonPeer;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.ice4j.ice.Candidate;
import org.ice4j.ice.CandidatePair;
Expand All @@ -33,15 +34,20 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

@Slf4j
public class TelemetryDebugger implements Debugger {
private final WebSocketClient websocketClient;
private final ObjectMapper objectMapper;

private final Map<Integer, RateLimiter> peerRateLimiter = new ConcurrentHashMap<>();
private final BlockingQueue<OutgoingMessageV1> messageQueue = new LinkedBlockingQueue<>();

private final Thread sendingLoopThread;

public TelemetryDebugger(String telemetryServer, int gameId, int playerId) {
Debug.register(this);
Expand Down Expand Up @@ -79,14 +85,38 @@ public void onError(Exception ex) {

objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());

sendingLoopThread = new Thread(this::sendingLoop, "sendingLoop");
sendingLoopThread.setUncaughtExceptionHandler((t, e) -> log.error("Thread sendingLoop crashed unexpectedly", e));
sendingLoopThread.start();
}

private void sendMessage(OutgoingMessageV1 message) {
try {
String json = objectMapper.writeValueAsString(message);
websocketClient.send(json);
} catch (IOException e) {
log.error("Error on serialising message object: {}", message, e);
messageQueue.put(message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@SneakyThrows
private void sendingLoop() {
while (true) {
var message = messageQueue.take();
try {
String json = objectMapper.writeValueAsString(message);

if(websocketClient.isClosed()) {
log.warn("Telemetry websocket is closed");
websocketClient.reconnectBlocking();
log.info("Telemetry websocket reconnected");
}

log.trace("Sending telemetry message: {}", json);
websocketClient.send(json);
} catch (Exception e) {
log.error("Error on sending message object: {}", message, e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void disconnectFromPeer(int remotePlayerId) {
* Stops the connection to all peers and all ice agents
*/
public void close() {
log.info("Closing gameSession");
peers.values().forEach(Peer::close);
peers.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public void close() {
return;
}

log.info("Closing peer for player {}", getRemoteId());

closing = true;
if(faSocket != null) {
faSocket.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,15 @@ synchronized void start() {
averageRTT = 0.0f;
lastPacketReceived = System.currentTimeMillis();

checkerThread = new Thread(this::checkerThread);
checkerThread = new Thread(this::checkerThread, getThreadName());
checkerThread.setUncaughtExceptionHandler((t, e) -> log.error("Thread {} crashed unexpectedly", t.getName(), e));
checkerThread.start();
}

private String getThreadName() {
return "connectivityChecker-"+ice.getPeer().getRemoteId();
}

synchronized void stop() {
if (!running) {
return;
Expand Down Expand Up @@ -82,6 +87,8 @@ void echoReceived(byte[] data, int offset, int length) {

private void checkerThread() {
while (running) {
log.trace("Running connectivity checker");

byte[] data = new byte[9];
data[0] = 'e';

Expand All @@ -104,5 +111,7 @@ private void checkerThread() {
return;
}
}

log.info(getThreadName()+" stopped gracefully");
}
}

0 comments on commit 9a35fc8

Please sign in to comment.