# Plotting using ESP32

In [1]:
import time, sys
from platform import python_version
print("Running Python:",python_version())

import pyqtgraph as pg
from PyQt6 import QtCore, QtWidgets
#from pyqtgraph.Qt import QtCore, QtGui, QtWidgets

try:
    from PyQt6 import QtCore, QtWidgets
except Exception:
    from PyQt6.QtCore import Qt, QtWidgets

import numpy as np
from scipy import signal
from scipy.fft import fft, fftfreq, fftshift

import esp32imu

import csv

sampling_freq = 100


Running Python: 3.9.9


In [2]:
class IMUAnalyzer:
    # Number of samples used to calculate DFT via FFT
    
    # Window function is used since we are looking at small chunks of signal
    # as they arrive in the buffer. This makes the signal look "locally staionary".
    WINDOW = 'hann'

    # How many seconds of time-domain samples to plot
    SAMPLE_WINDOW_SEC = 5

    # Plotting frequency for time-domain signals
    SAMPLE_PLOT_FREQ_HZ = 10

    def __init__(self, port, sensor='accel'):

        # Which sensor to analyze
        self.sensor = sensor # 'accel' or 'gyro'
        self.FFT_SIZE = 1024*4

        #
        # FFT setup
        #

        self.wind = signal.windows.get_window(self.WINDOW, self.FFT_SIZE)

        # data
        self.Fs = 1
        self.last_t_us = 0
        self.t = []
        self.sensx = []
        self.sensy = []
        self.sensz = []
        self.buf_sensx = []
        self.buf_sensy = []
        self.buf_sensz = []

        #
        # Plotting setup
        #

        sens = "Accelerometer" if self.sensor == 'accel' else "Gyro"

        # initialize Qt gui application and window
        self.app = QtWidgets.QApplication([])
        self.window = QtWidgets.QWidget()
        self.window.setWindowTitle(f"{sens} Analyzer")
        #self.window.resize(1200, 800)
        self.window.setGeometry( 0, 50, 1200, 800)
        # self.window.setBackground('k')

        # create plots
        self.pw = pg.PlotWidget(title=sens)
        self.pw2 = pg.PlotWidget(title="Spectrum")

        # create the layout and add widgets
        self.layout = QtWidgets.QGridLayout()
        self.window.setLayout(self.layout)
        self.layout.addWidget(self.pw, 0, 0)
        self.layout.addWidget(self.pw2, 1, 0)

        self.window.show()

        if 0: # old way
            # initialize Qt gui application and window
            self.default_window_size = (2000, 1000)
            self.app = pg.QtGui.QApplication([])
            self.pgwin = pg.GraphicsWindow(title="{} Analyzer".format(sens))
            self.pgwin.resize(*self.default_window_size)
            self.pgwin.setBackground('k')

            self.pw = self.pgwin.addPlot(row=0, col=0, title=sens)
            self.pw2 = self.pgwin.addPlot(row=1, col=0, title="Spectrum")
        
        #
        # Plotting loop
        #
        
        self.timer = pg.QtCore.QTimer()
        self.timer.timeout.connect(self._timer_cb)
        self.timer.start(int(1e3/self.SAMPLE_PLOT_FREQ_HZ)) # ms

        self.driver = esp32imu.SerialDriver(port, 115200)
        time.sleep(0.1) # wait for everything to initialize
        self.driver.sendRate(sampling_freq)

        msg = esp32imu.RGBLedCmdMsg()
        msg.r = 0
        msg.g = 0
        msg.b = 255
        msg.brightness = 100
        self.driver.sendRGBLedCmd(msg)
        print(msg)

        # Connect an IMU callback that will fire when a sample arrives
        self.driver.registerCallbackIMU(self._imu_cb)

        # Block on application window
        QtGui.QApplication.instance().exec_()

        # clean up to prevent error or resource deadlock
        self.driver.unregisterCallbacks()

    def _timer_cb(self):
        self.pw.plot(self.t, self.sensx, pen='y', clear=True)
        self.pw.plot(self.t, self.sensy, pen='g')
        self.pw.plot(self.t, self.sensz, pen='r')

        # always keep x-axis auto range based on SAMPLE_WINDOW_SEC
        self.pw.enableAutoRange(axis=pg.ViewBox.XAxis)
        self.pw.showGrid(x=True,y=True)
        self.pw.setRange(yRange=[-12,12])

        (f, mag) = self._calcSpectrum(self.buf_sensx); 
        self.pw2.plot(f[int(self.FFT_SIZE/2)+1:self.FFT_SIZE], mag[int(self.FFT_SIZE/2)+1:self.FFT_SIZE], pen=pg.mkPen(color='y', width=2), name='X', clear=True)
        (f, mag) = self._calcSpectrum(self.buf_sensy); 
        self.pw2.plot(f[int(self.FFT_SIZE/2)+1:self.FFT_SIZE], mag[int(self.FFT_SIZE/2)+1:self.FFT_SIZE], pen=pg.mkPen(color='g', width=2), name='Y')
        (f, mag) = self._calcSpectrum(self.buf_sensz); 
        self.pw2.plot(f[int(self.FFT_SIZE/2)+1:self.FFT_SIZE], mag[int(self.FFT_SIZE/2)+1:self.FFT_SIZE], pen=pg.mkPen(color='r', width=2), name='Z')

        self.pw2.enableAutoRange(axis=pg.ViewBox.XAxis)
        self.pw2.addLegend(True)
        self.pw2.setLogMode(x=True,y=True)
        self.pw2.showGrid(x=True,y=True)
        self.pw2.setRange(xRange=[-1,np.log10(self.Fs)/2.0])
        self.pw2.setRange(yRange=[-3.5,1])

        # "drawnow"
        self.app.processEvents()

    def _imu_cb(self, msg):
        global writer
        dt = (msg.t_us - self.last_t_us) * 1e-6 # us to s
        self.last_t_us = msg.t_us 
        hz = 1./dt
        self.Fs = hz
        #print('Got IMU at {} us ({:.0f} Hz): {:.2f}, {:.2f}, {:.2f}, \t {:.2f}, {:.2f}, {:.2f}'
        #        .format(msg.t_us, hz,
        #                msg.accel_x, msg.accel_y, msg.accel_z,
        #                msg.gyro_x, msg.gyro_y, msg.gyro_z))

        if self.sensor == 'accel':
            sensx = msg.gyro_x
            sensy = msg.accel_y 
            #sensy = msg.gyro_y
            sensz = msg.accel_z
        else:
            sensx = msg.gyro_x
            sensy = msg.gyro_y
            sensz = msg.gyro_z

        # FIFO buffer for time-domain plotting
        self.t.append(msg.t_us * 1e-6)
        self.sensx.append(sensx)
        self.sensy.append(sensy)
        self.sensz.append(sensz)

        data = [msg.t_us, sensx, sensy, sensz]
        writer.writerow(data)

        if len(self.t) > hz*self.SAMPLE_WINDOW_SEC:
            self.t.pop(0)
            self.sensx.pop(0)
            self.sensy.pop(0)
            self.sensz.pop(0)

        # FIFO buffer for FFT
        self.buf_sensx.append(sensx)
        self.buf_sensy.append(sensy)
        self.buf_sensz.append(sensz)

        if len(self.buf_sensx) > self.FFT_SIZE:
            self.buf_sensx.pop(0)
            self.buf_sensy.pop(0)
            self.buf_sensz.pop(0)


    def _calcSpectrum(self, buf):

        # compute frequency bins
        f = fftshift(fftfreq(self.FFT_SIZE, 1./self.Fs))

        if len(buf) < self.FFT_SIZE:
            return f, f

        # get rid of DC
        buf = np.array(buf) - np.mean(np.array(buf))

        # window the data for a better behaved short-time FT style DFT
        data = np.array(buf[0:self.FFT_SIZE]) * self.wind

        # compute DFT via FFT
        Y = fft(data)

        # make DFT look as you'd expect, plotting real part
        Y = (np.abs(fftshift(Y))/self.FFT_SIZE)

        return f, Y


In [None]:
port = '/dev/cu.usbserial-1410'
sensor = 'accel' # 'accel' or 'gyro'
csvfile = open('results.csv', 'w')
writer = csv.writer(csvfile)
analyzer = IMUAnalyzer(port, sensor)