In [7]:
import psutil

HOST_IP = "192.168.8.134"
APP_PORT = 5010


In [8]:
from abc import ABC, abstractmethod
import numpy as np
import pywt
from scipy.signal import butter, filtfilt

class Denoiser(ABC):
    name: str

    @abstractmethod
    def apply(self, signal: np.ndarray) -> np.ndarray:
        pass

# class GravityRemovalDenoiser(Denoiser):
#     name = "Gravity Removal"

#     def __init__(self, cutoff=0.1, fs=100.0, order=3):
#         """
#         cutoff: cutoff frequency in Hz for high-pass filter to remove gravity
#         fs: sampling frequency in Hz
#         order: filter order
#         """
#         self.cutoff = cutoff
#         self.fs = fs
#         self.order = order

#     def apply(self, signal: np.ndarray) -> np.ndarray:
#         if len(signal) < 3:  # need at least 3 points
#             return signal
#         nyq = 0.5 * self.fs
#         normal_cutoff = self.cutoff / nyq
#         b, a = butter(self.order, normal_cutoff, btype='high', analog=False)
#         return filtfilt(b, a, signal)



class WaveletDenoiser(Denoiser):
    name = "Wavelet"

    def __init__(self, wavelet="db4", level=1):
        self.wavelet = wavelet
        self.level = level

    def apply(self, signal: np.ndarray) -> np.ndarray:
        coeffs = pywt.wavedec(signal, self.wavelet, level=self.level)
        sigma = np.median(np.abs(coeffs[-1])) / 0.6745
        uthresh = sigma * np.sqrt(2 * np.log(len(signal)))
        coeffs = [pywt.threshold(c, uthresh, mode="soft") for c in coeffs]
        return pywt.waverec(coeffs, self.wavelet)[:len(signal)]


class TotalVariationDenoiser(Denoiser):
    name = "TV"

    def __init__(self, weight=0.05, n_iter=50):
        self.weight = weight
        self.n_iter = n_iter

    def apply(self, signal: np.ndarray) -> np.ndarray:
        x = signal.copy()
        for _ in range(self.n_iter):
            x[1:-1] += self.weight * (x[:-2] - 2 * x[1:-1] + x[2:])
        return x


In [9]:
class GravityRemoverDenoiser(Denoiser):
    def __init__(self, alpha: float = 0.8):
        """
        :param alpha: Smoothing factor for the low-pass filter (0 < alpha < 1).
                      Higher alpha values track gravity more slowly.
        """
        self.name = "Gravity Remover (High-Pass Filter)"
        self.alpha = alpha

    def apply(self, signal: np.ndarray) -> np.ndarray:
        """
        Removes gravity from the signal using an IIR High-Pass Filter.
        
        :param signal: A numpy array of shape (N, D) where N is the number 
                       of samples and D is the number of axes (usually 3).
        :return: Denoised signal (Linear Acceleration).
        """
        if signal.size == 0:
            return signal

        # Initialize gravity with the first sample
        # shape (D,)
        gravity = np.copy(signal[0])
        
        # Array to store linear acceleration
        linear_acceleration = np.zeros_like(signal)

        # Apply the filter: 
        # gravity = alpha * gravity + (1 - alpha) * current_sample
        # result = current_sample - gravity
        for i in range(signal.shape[0]):
            gravity = self.alpha * gravity + (1 - self.alpha) * signal[i]
            linear_acceleration[i] = signal[i] - gravity

        return linear_acceleration

In [10]:
import socket, json, threading, queue
from datetime import datetime

class TCPReceiver(threading.Thread):
    def __init__(self, ip, port, output_queue):
        super().__init__(daemon=True)
        self.ip = ip
        self.port = port
        self.queue = output_queue

    def run(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((self.ip, self.port))
        sock.listen(1)

        print(f"[INFO] Listening on {self.ip}:{self.port}")
        conn, addr = sock.accept()
        print(f"[INFO] Connected from {addr}")

        buffer = ""
        while True:
            data = conn.recv(1024)
            if not data:
                break

            buffer += data.decode()
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                try:
                    sample = json.loads(line)
                    sample["timestamp"] = datetime.now().strftime("%H:%M:%S.%f")[:-3]
                    self.queue.put(sample)
                except json.JSONDecodeError:
                    pass


In [11]:
import sys
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets
import random
import queue
import time

class LivePlotter:
    def __init__(self, sensor_keys, denoisers, buffer_size=200, plots_per_row=3):
        self.sensor_keys = sensor_keys
        self.denoisers = denoisers
        self.buffer_size = buffer_size
        self.plots_per_row = plots_per_row
        self.buffers = {k: [] for k in sensor_keys}
        self.curves = {}

        self.app = QtWidgets.QApplication(sys.argv)
        pg.setConfigOptions(antialias=True)

        self.win = pg.GraphicsLayoutWidget(title="Raw vs Denoised Comparison")
        self.win.resize(1200, 800)
        self.win.show()

        self._init_plots()

    def _init_plots(self):
        colors = ['r', 'g', 'b', 'c', 'm', 'y', 'w']
        layout = self.win.addLayout(row=0, col=0)
        row = 0
        col = 0
        for i, key in enumerate(self.sensor_keys):
            p = layout.addPlot(row=row, col=col, title=key.upper())
            p.showGrid(x=True, y=True)
            p.addLegend()
            self.curves[(key, "Raw")] = p.plot(pen=pg.mkPen("y", width=2), name="Raw")
            for j, d in enumerate(self.denoisers):
                color = colors[j % len(colors)]
                pen = pg.mkPen(color=color, width=1)
                self.curves[(key, d.name)] = p.plot(pen=pen, name=d.name)

            col += 1
            if col >= self.plots_per_row:
                row += 1
                col = 0

    def update_data(self, sample):
        for k in self.sensor_keys:
            if k in sample:
                self.buffers[k].append(sample[k])
                if len(self.buffers[k]) > self.buffer_size:
                    self.buffers[k].pop(0)

    def refresh(self):
        for k in self.sensor_keys:
            data = np.array(self.buffers[k])
            if len(data) < 5:
                continue
            self.curves[(k, "Raw")].setData(data)
            for d in self.denoisers:
                try:
                    self.curves[(k, d.name)].setData(d.apply(data))
                except Exception as e:
                    print(f"[{d.name} ERROR]", e)

    def run(self, data_queue):
        timer = QtCore.QTimer()

        def loop():
            while not data_queue.empty():
                self.update_data(data_queue.get())
            self.refresh()

        timer.timeout.connect(loop)
        timer.start(40)  # 25 FPS ~ 40 ms per update
        sys.exit(self.app.exec())


In [12]:
data_queue = queue.Queue()
receiver = TCPReceiver(HOST_IP, APP_PORT, data_queue)
receiver.start()

denoisers = [
    GravityRemoverDenoiser(),
    WaveletDenoiser(),
    TotalVariationDenoiser()
]

plotter = LivePlotter(
    sensor_keys=["ax", "ay", "az", "gx", "gy", "gz"],
    denoisers=denoisers
)

plotter.run(data_queue)


[INFO] Listening on 192.168.8.134:5010
[INFO] Connected from ('192.168.8.120', 34360)
[INFO] Connected from ('192.168.8.120', 34362)




SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
