# WebSockets

In [1]:
import sys
import asyncio
import websockets
import json
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel
from PyQt5.QtCore import QThread, pyqtSignal


class WebSocketThread(QThread):
    """Handles WebSocket connection in a separate thread to avoid freezing the UI.

    The thread runs its own asyncio event loop, reads JSON telemetry messages
    from the server and re-publishes each one through ``telemetry_received``
    as three display-ready strings (status, battery, latency).
    """
    telemetry_received = pyqtSignal(str, str, str)  # Emit status, battery, latency

    @staticmethod
    def _format_telemetry(message):
        """Decode one telemetry message into the strings sent to the UI.

        Args:
            message: Raw text (or bytes) frame containing a JSON object with
                optional ``status``, ``battery`` and ``latency`` keys.

        Returns:
            A ``(status, battery, latency)`` tuple of strings; missing keys
            fall back to ``"N/A"``, ``0`` and ``0`` respectively.

        Raises:
            ValueError: If the message is not valid JSON, is not a JSON
                object, or a numeric field cannot be parsed.
            TypeError: If a numeric field has a non-numeric type (e.g. null).
            OverflowError: If a numeric field is too large for a float.
        """
        data = json.loads(message)
        if not isinstance(data, dict):
            raise ValueError("telemetry payload must be a JSON object")

        # float() accepts ints, floats and numeric strings, and rejects
        # everything else before it reaches the format spec below.
        latency = float(data.get("latency", 0))
        battery = float(data.get("battery", 0))
        # The signal is declared with str arguments, so coerce explicitly.
        status = str(data.get("status", "N/A"))

        return status, f"{battery:.1f}%", f"{latency:.2f} ms"

    async def receive_telemetry(self):
        """Connects to the WebSocket server and receives telemetry data asynchronously.

        Runs until the connection closes or fails. A message that cannot be
        decoded is reported and skipped, so one bad frame does not end the
        whole session.
        """
        uri = "ws://192.168.2.2:5003"  # TI board's IP address

        try:
            async with websockets.connect(uri) as websocket:
                print(f"Connected to {uri}")

                while True:
                    message = await websocket.recv()

                    try:
                        fields = self._format_telemetry(message)
                    except (ValueError, TypeError, OverflowError) as e:
                        print(f"Ignoring malformed telemetry message: {e}")
                        continue

                    self.telemetry_received.emit(*fields)  # Send to UI

        except websockets.ConnectionClosed:
            print("Connection closed")
        except Exception as e:
            # Thread boundary: report the failure instead of letting it
            # propagate out of QThread.run().
            print(f"WebSocket error: {e}")

    def run(self):
        """Runs the WebSocket event loop inside QThread."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self.receive_telemetry())
        finally:
            # Release the loop's resources once the coroutine has finished,
            # and stop advertising a closed loop as this thread's current one.
            asyncio.set_event_loop(None)
            loop.close()


class TelemetryApp(QWidget):
    """Small window that shows the latest status, battery and latency readings."""

    def __init__(self):
        super().__init__()
        self.initUI()

        # Wire the signal to the UI slot before the worker starts running.
        worker = WebSocketThread()
        worker.telemetry_received.connect(self.update_telemetry)
        self.websocket_thread = worker
        worker.start()

    def initUI(self):
        """Create the three read-out labels and stack them vertically."""
        self.status_label = QLabel("Status: N/A", self)
        self.battery_label = QLabel("Battery: N/A", self)
        self.latency_label = QLabel("Latency: N/A", self)

        column = QVBoxLayout()
        for readout in (self.status_label, self.battery_label, self.latency_label):
            column.addWidget(readout)
        self.setLayout(column)

        self.setWindowTitle("Telemetry Monitor")
        self.resize(300, 200)

    def update_telemetry(self, status, battery, latency):
        """Show the given readings in the labels and echo them to stdout."""
        label_texts = (
            (self.status_label, f"Status: {status}"),
            (self.battery_label, f"Battery: {battery}"),
            (self.latency_label, f"Latency: {latency}"),
        )
        for readout, text in label_texts:
            readout.setText(text)

        print(f"Updated UI -> Status: {status}, Battery: {battery}, Latency: {latency}")  # Debugging


if __name__ == "__main__":
    # Script entry point: build the Qt application and show the telemetry window.
    app = QApplication(sys.argv)

    window = TelemetryApp()
    window.show()

    # Run the Qt event loop and use its return value as the process exit status.
    exit_code = app.exec()
    sys.exit(exit_code)


QSocketNotifier: Can only be used with threads started with QThread


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


WebSocket error: timed out during opening handshake


In [None]:
class WebSocketThread(QThread):
    """Handles WebSocket connection in a separate thread to avoid freezing the UI."""
    telemetry_received = pyqtSignal(str, str, str)  # Emit status, battery, latency
