## MicroPython ESP32 Algorithm Development

### Establishing connection to target board
First, make sure you've got the right serial port. On unix-based systems, you can run `ls /dev/tty.*` to see your available serial devices. Replace as necessary below.

This will allow Jupyter (your host computer) to run commands and send/receive information to/from your target board in real time using the MicroPython REPL.

In [2]:
%serialconnect to --port="/dev/tty.usbserial-02U1W54L" --baud=115200
# %serialconnect to --port="/dev/tty.usbserial-0001" --baud=115200

[34mConnecting to --port=/dev/tty.usbserial-02U1W54L --baud=115200 [0m
MicroPython d8a7bf8-dirty on 2022-02-09; ESP32 module with ESP32
Type "help()" for more information.
>>>[reboot detected 0]repl is in normal command mode
[\r\x03\x03] b'\r\n>>> '
[\r\x01] b'\r\n>>> \r\nraw REPL; CTRL-B to exit\r\n>' [34mReady.
[0m

In [18]:
%sendtofile lib/computation.py --source lib/computation.py

Sent 181 lines (5258 bytes) to lib/computation.py.


In [86]:
%sendtofile lib/decoding.py --source lib/decoding.py

Sent 234 lines (8715 bytes) to lib/decoding.py.


## Experimentation

In [5]:
import urandom
from ulab import numpy as np

def synth_x(f, Ns, noise_power=0.5, fs=250):
    """
    generate a synthetic signal vector
    
    args:
    Ns [int]: number of samples (time samples)
    noise_power [float]: variance of WGN noise distribution
    """
    t = np.arange(0, Ns/fs, 1/fs)
    return np.sin(t*2*np.pi*f)*(1+urandom.random()*noise_power)

def synth_X(f, Nc, Ns, Nt=1, noise_power=0.5, fs=200, f_std=0.02):
    """
    Generate a matrix of several variations of the same target signal. This is used
    to simulate the measurement of a common signal over multiple EEG channels 
    that have different SNR characteristics.
    
    args:
    f [float]: target frequency of synthetic signal (Hz)
    Nc [int]: number of channels
    Ns [int]: number of samples (time samples)
    noise_power [float]: variance of WGN noise distribution
    fs [float]: sampling frequency (Hz)
    f_std [float]: standard dev. of freq. in generated signal across channels to simulate interference from other frequency components over different channels
    """
    def _synth():
        X = []
        for i in range(Nc): # simulate noisy sinusoids with varying SNR across Nc channels
            f_i = f*(1+urandom.random()*f_std)
            x = synth_x(f_i, Ns, noise_power=noise_power)

            X.append(x)

        return np.array(X)
    
    if Nt <= 1:
        return _synth()
    else:
        trials = []
        for i in range(Nt):
            trials.append(_synth().flatten())

        return np.array(trials)

In [6]:
from lib.decoding import harmonic_reference

X = X_test
Y = harmonic_reference(7, 200, np.max(X_test.shape), Nh=2, standardise_out=True)

In [7]:
from lib.decoding import harmonic_reference
from lib.computation import max_eig

X = X_test
Y = harmonic_reference(7, 200, np.max(X_test.shape), Nh=2, standardise_out=True)

Cxx = np.dot(X, X.transpose()) # auto correlation matrix
Cyy = np.dot(Y, Y.transpose()) 
Cxy = np.dot(X, Y.transpose()) # cross correlation matrix
Cyx = np.dot(Y, X.transpose()) # same as Cxy.T

M1 = np.dot(np.linalg.inv(Cxx), Cxy) # intermediate result
M2 = np.dot(np.linalg.inv(Cyy), Cyx)

lam, _ = max_eig(np.dot(M1, M2), 20)

In [8]:
print(lam)

0.3064166166077461


In [11]:
from lib.decoding import CCA, SingleChannelMsetCCA, SingleChannelGCCA

class DecoderSSVEP():
    
    decoding_algos = ['CCA', 'MsetCCA', 'GCCA']
    
    def __init__(self, stim_freqs, fs, algo):
                    
        self.stim_freqs = stim_freqs 
        self.fs = fs
        self.algo = algo
        
        self.decoder_stack = {}
        
        for f in self.stim_freqs:
            if algo == 'CCA':
                decoder_f = CCA(f, self.fs, Nh=1)
            elif algo == 'MsetCCA':
                decoder_f = SingleChannelMsetCCA()
            elif algo == 'GCCA':
                decoder_f = SingleChannelGCCA(f, self.fs, Nh=1)
            else:
                raise ValueError("Invalid algorithm. Must be one of {}".format(decoding_algos))
            
            self.decoder_stack[f] = decoder_f
    
    @property
    def requires_calibration(self):
        return self.algo in ['MsetCCA', 'GCCA']
    
    @property
    def is_calibrated(self):
        return all([d.is_calibrated for d in self.decoder_stack.values()])
    
    def calibrate(self, calibration_data_map):
        
        if not self.requires_calibration:
            print("Warning: trying to fit data with an algorithm that doesn't require calibration")
            return
        
        for freq, cal_data in calibration_data_map.items():
            if freq not in self.stim_freqs:
                raise ValueError("Invalid stimulus frequency supplied")
            self.decoder_stack[freq].fit(cal_data)
            
    def classify(self, X_test):
        result = {}
        for f, decoder_f in self.decoder_stack.items():
            if self.requires_calibration and not decoder_f.is_calibrated:
                print("Warning: decoder has not been calibrated for {}Hz stimulus frequency".format(f))
                result[f] = np.nan
            else:    
                result[f] = decoder_f.compute_corr(X_test)
        return result    

In [12]:
stim_freqs = [7, 10, 12]
fs = 256
algo = 'GCCA'
Nt = 3
Ns = 100
Nc = 1

decoder = DecoderSSVEP(stim_freqs, fs, algo)
cal_data = {f:synth_X(f, Nc, Ns, Nt=Nt) for f in stim_freqs}

In [16]:
decoder.calibrate(cal_data)

In [None]:
X_test = synth_X(12, 1, 100, Nt=1)

result = decoder.compute_corr(X_test)
print(result)

{12: 0.9630586064983724, 10: -0.01418800263155016, 7: 0.001627946057185693}


In [27]:
X_train = synth_X(7, 1, 100, Nt=3)
X_test = synth_X(10, 1, 100, Nt=1)

mcca = SingleChannelMsetCCA()
mcca.fit(X_train)

print(mcca.compute_corr(X_test))

0.1412296513724207


In [21]:
from ulab import numpy as np

class SingleChannelMsetCCA(): 
    """
    Multiset CCA algorithm for SSVEP decoding.
    Computes optimised reference signal set based on historical observations
    and uses ordinary CCA for final correlation computation given a new test
    signal.
    Note: this is a 1 channel implementation (Nc=1)
    """
    def __init__(self):
        self.Ns, self.Nt = None, None
        
    def fit(self, X, compress_ref=True): 
        """
        Expects a training matrix X of shape Nt x Ns. If `compress_ref=True`, the `Nt` components in optimised reference signal Y will be averaged to form a single reference vector. This can be used for memory optimisation but will likely degrade performance slightly.
        """
        if X.shape[0] > X.shape[1]:
            print("Warning: received more trials than samples. This is unusual behaviour: check X")
        
        R = np.dot(X, X.transpose()) # inter trial covariance matrix
        S = np.eye(len(R))*np.diag(R) # intra-trial diag covariance matrix
        lam, V = solve_gen_eig_prob((R-S), S) # solve generalised eig problem
        w = V[:, np.argmax(lam)] # find eigenvector corresp to largest eigenvalue
        Y = np.array([x*w[i] for i, x in enumerate(X)]) # store optimised reference vector Nt x Ns self.Y = Y
        
        if compress_ref:
            self.Y = np.mean(Y, axis=0).reshape((1, max(Y.shape))) # this will average Nt components in Y: Nc x Nt -> 1 x Nt
    
    def compute(self, X_test):
        if self.Y is None:
            raise ValueError("Reference matrix Y must be computed using  fit  before computing corr")
        if len(X_test.shape) == 1:
            X_test = X_test.reshape((1, len(X_test)))
        return CCA.cca_eig(X_test, self.Y)[0] # use ordinary CCA with optimised ref. Y

In [188]:
def col_concat(*mats):
    cols = sum([mat.shape[1] for mat in mats])
    rows = mats[0].shape[0]
    out = np.zeros((rows, cols))
    j = 0
    for mat in mats:
        mat_cols = mat.shape[1]
        out[:, j:j+mat_cols] = mat
        j += mat_cols
        
    return out

def zeros_like(A):
    return np.zeros(A.shape)

def block_diag(X, Y, reverse=False):
    if not reverse:
        X = np.concatenate((X, zeros_like(X)), axis=1)
        Y = np.concatenate((zeros_like(Y), Y), axis=1)
    else:
        X = np.concatenate((zeros_like(X), X), axis=1)
        Y = np.concatenate((Y, zeros_like(Y)), axis=1)
    return np.concatenate((X, Y), axis=0)

def sign(x):
    x+1 # arb operation to raise an error if non-numeric arg given.
    return 1 if x >=0 else -1

In [215]:
from lib.decoding import harmonic_reference
from lib.computation import solve_gen_eig_prob, corr

class SingleChannelGCCA(): 
    """
    Generalised canonical component analysis for Nc=1.
    Expects the target frequency at `f_ssvep`. `fs` is the sampling rate used and `Nh` the number of harmonics for the harmonic r
    Ref: 'Improving SSVEP Identification Accuracy via Generalized Canonical Correlation Analysis' Sun, Chen et al
    """
    def __init__(self, f_ssvep, fs, Nh=1, name=None):
        self.Ns, self.Nt = None, None
        self.Nh = Nh
        self.w = None
        self.X_bar = None
        self.fs = fs
        self.f_ssvep = f_ssvep
        self.name = name or "gcca_{0}hz".format(f_ssvep)
    
    def fit(self, X): 
        """
        Fit against training data.
        X should be a matrix of dim (Nt x Ns)
        """
        self.Nt, self.Ns = X.shape

        # template signal
        X_bar = np.mean(X, axis=0).reshape((1, Ns))
        Y = harmonic_reference(self.f_ssvep, self.fs, self.Ns)

        # form concatenated matrices (vectors for Nc=1)
        X_c = X.reshape((1, self.Ns*self.Nt))
        
        X_bar_c = col_concat(*[X_bar for i in range(self.Nt)])
        X_bar_c = X_bar_c.reshape((1, self.Ns*self.Nt))
        
        Y_c = col_concat(*[Y for i in range(self.Nt)])
        
        X_comb = col_concat(X_c.T, X_bar_c.T, Y_c.T).T
        
        D1 = np.dot(X_c, X_c.T)
        D2 = np.dot(X_bar_c, X_bar_c.T)
        D3 = np.dot(Y_c, Y_c.T)
        
        D = block_diag(block_diag(D1, D2), D3)
        
        lam, W_eig = solve_gen_eig_prob(np.dot(X_comb, X_comb.T), D)

        self.w = W_eig[:, np.argmax(lam)] # optimal spatial filter vector with dim (2*Nc + 2*Nh)
        self.X_bar = X_bar
        
    def compute(self, X_test):
        """
        Compute output correlation for a test observation with dim. (1 x Ns)
        """
        if self.w is None:
            raise ValueError("call .fit(X_train) before performing classification.")
            
        if len(X_test.shape) == 1:
            X_test = X_test.reshape((len(X_test), 1))
        else:
            X_test = X_test.T 

        w_X = self.w[0:1]
        w_X_bar = self.w[1:2] # second weight correspond to Nc (Nc=1) template channels
        w_Y = self.w[2:] # final 2*Nh weights correspond to ref sinusoids with harmonics

        # regenerate these instead of storing from the `fit` function since
        # computationally cheap to generate but expensive to store in memory
        Y = harmonic_reference(self.f_ssvep, self.fs, self.Ns)

        X_test_image = np.dot(X_test, w_X)
        rho1 = corr(X_test_image, np.dot(self.X_bar.T, w_X_bar))
        rho2 = corr(X_test_image, np.dot(Y.T, w_Y))
        
        return sum([sign(rho_i)*rho_i**2 for rho_i in [rho1, rho2]])/2

In [216]:
Nt = 3
Ns = 200
fs = 256
f = 7

gcca = UnivariateGCCA(f, fs)

In [217]:
# Form trial matrix

X = []
for i in range(Nt):
    X.append(synth_X(f, 1, Ns).flatten())
    
X = np.array(X)
print(X)

array([[0.0, 0.2397766696779567, 0.4720847434558566, ..., -0.3453172962436833, -0.5719038352766238, -0.7806766368633139],
       [0.0, 0.2432302656913529, 0.4788798286886903, ..., -0.3641402916327024, -0.5931155041545074, -0.8036052198676307],
       [0.0, 0.260550645021267, 0.5127782211117632, ..., -0.9419911747589055, -1.126363397439811, -1.274754898609826]], dtype=float64)


In [218]:
gcca.fit(X)

In [224]:
X_test = synth_X(7, 1, Ns)

rho = gcca.compute(X_test)
print(rho)

0.9452395145148026


In [97]:
from lib.decoding import harmonic_reference

# template signal
X_bar = np.mean(X, axis=0)
print(X_bar)

Y = harmonic_reference(f, fs, Ns)
print(Y)

# form concatenated matrices (vectors for Nc=1)
X_c = X.reshape((1, Ns*Nt))
X_bar_c = np.ones(X_bar.shape)*X_bar
Y_c = np.ones(Y.shape)*Y

print(Y_c)

for i in range(1, Nt):
    Y_c = np.concatenate((Y_c, Y))
    X_bar_c = np.concatenate((X_bar_c, X_bar))

Y_c = Y_c.transpose()
X_bar_c = X_bar_c.reshape((1, Ns*Nt))

print(X_bar_c)

array([0.0, 0.2205129133879401, 0.434135087407134], dtype=float64)
array([[0.1709618887603012, 0.33688985339222, 0.492898192229784],
       [0.9852776423889413, 0.9415440651830208, 0.8700869911087114]], dtype=float64)
array([[0.1709618887603012, 0.33688985339222, 0.492898192229784],
       [0.9852776423889413, 0.9415440651830208, 0.8700869911087114]], dtype=float64)
array([[0.0, 0.2205129133879401, 0.434135087407134, 0.0, 0.2205129133879401, 0.434135087407134, 0.0, 0.2205129133879401, 0.434135087407134]], dtype=float64)


In [98]:
X_comb = np.concatenate((X_c, X_bar_c))
# X_comb = np.concatenate((X_comb, Y_c))
print(Y)

array([[0.1709618887603012, 0.33688985339222, 0.492898192229784],
       [0.9852776423889413, 0.9415440651830208, 0.8700869911087114]], dtype=float64)


In [121]:
self = gcca
# template signal
X_bar = np.mean(X, axis=0).reshape((1, Ns))
Y = harmonic_reference(self.f_ssvep, self.fs, self.Ns)

# form concatenated matrices (vectors for Nc=1)
X_c = X.reshape((1, self.Ns*self.Nt))

X_bar_c = col_concat(*[X_bar for i in range(self.Nt)])
X_bar_c = X_bar_c.reshape((1, self.Ns*self.Nt))

Y_c = col_concat(*[Y for i in range(self.Nt)])

X_comb = col_concat(X_c.T, X_bar_c.T, Y_c.T).T

print(Y_c)

array([[0.1709618887603012, 0.33688985339222, 0.492898192229784, 0.1709618887603012, 0.33688985339222, 0.492898192229784, 0.1709618887603012, 0.33688985339222, 0.492898192229784],
       [0.9852776423889413, 0.9415440651830208, 0.8700869911087114, 0.9852776423889413, 0.9415440651830208, 0.8700869911087114, 0.9852776423889413, 0.9415440651830208, 0.8700869911087114]], dtype=float64)


In [122]:
print(X_comb)

array([[0.0, 0.2550657913825401, 0.5019427873289922, 0.0, 0.2556158388150989, 0.5032690942866478, 0.0, 0.2483340087006462, 0.4887535655813716],
       [0.0, 0.2530052129660951, 0.4979884823990039, 0.0, 0.2530052129660951, 0.4979884823990039, 0.0, 0.2530052129660951, 0.4979884823990039],
       [0.1709618887603012, 0.33688985339222, 0.492898192229784, 0.1709618887603012, 0.33688985339222, 0.492898192229784, 0.1709618887603012, 0.33688985339222, 0.492898192229784],
       [0.9852776423889413, 0.9415440651830208, 0.8700869911087114, 0.9852776423889413, 0.9415440651830208, 0.8700869911087114, 0.9852776423889413, 0.9415440651830208, 0.8700869911087114]], dtype=float64)


In [105]:
D1 = np.dot(X_c, X_c.T)
print(D1)

D2 = np.dot(X_bar_c, X_bar_c.T)
print(D2)

D3 = np.dot(Y_c, Y_c.T)
print(D3)

D = block_diag(block_diag(D1, D2), D3)
print(D)

array([[0.7140238263023425]], dtype=float64)
array([[0.7112976572665111]], dtype=float64)
array([[1.157014105891531, 2.743517621334207],
       [2.743517621334207, 7.84298589410847]], dtype=float64)
array([[0.7140238263023425, 0.0, 0.0, 0.0],
       [0.0, 0.7112976572665111, 0.0, 0.0],
       [0.0, 0.0, 1.157014105891531, 2.743517621334207],
       [0.0, 0.0, 2.743517621334207, 7.84298589410847]], dtype=float64)


In [175]:
from lib.computation import solve_gen_eig_prob

lam, W_eig = solve_gen_eig_prob(np.dot(X_comb, X_comb.T), D)

w = W_eig[:, np.argmax(lam)] # optimal spatial filter vector with dim (2*Nc + 2*Nh)
w_X = w[0]
w_X_bar = w[1] # second Nc weights correspond to Nc template channels w_Y_n = w[2*self.Nc:] # final 2*Nh weights correspond to ref sinusoids with harmonics
w_Y = w[2:]

In [50]:
%ls --recursive lib

Listing directory 'lib'.
       27    lib/__init__.mpy
     1643    lib/computation.mpy
      558    lib/config.mpy
     1065    lib/core.mpy
      722    lib/decoding.mpy
     8703    lib/mqtt_as.mpy
      790    lib/networking.mpy
     2504    lib/peripherals.mpy
     1052    lib/scheduling.mpy
     1174    lib/signal.mpy
     2601    lib/umqtt.mpy
      614    lib/utils.mpy
      641    lib/websockets.mpy


In [35]:
%sendtofile --source lib/.env .env  --binary

Sent 3116 bytes in 104 chunks to .env.


In [154]:
%sendtofile lib/computation.py --source lib/computation.py

Sent 144 lines (4234 bytes) to lib/computation.py.


In [155]:
%rebootdevice

repl is in normal command mode
[\r\x03\x03] b'\r\nMicroPython d8a7bf8-dirty on 2022-02-09; ESP32 module with ESP32\r\nType "help()" for more information.\r\n>>> \r\n>>> \r\nMPY: soft reboot\r\nMicroPython d8a7bf8-dirty on 2022-02-09; ESP32 module with ESP32\r\nType "help()" for more information.\r\n>>> \r\n>>> \r\n>>> '
[\r\x01] b'\r\n>>> \r\nraw REPL; CTRL-B to exit\r\n>'

In [12]:
%lsmagic

%capture [--quiet] [--QUIET] outputfilename
    records output to a file

%comment
    print this into output

%disconnect [--raw]
    disconnects from web/serial connection

%esptool [--port PORT] {erase,esp32,esp8266} [binfile]
    commands for flashing your esp-device

%fetchfile [--binary] [--print] [--load] [--quiet] [--QUIET]
                  sourcefilename [destinationfilename]
    fetch and save a file from the device

%ls [--recurse] [dirname]
    list files on the device

%lsmagic
    list magic commands

%mpy-cross [--set-exe SET_EXE] [pyfile]
    cross-compile a .py file to a .mpy file

%readbytes [--binary]
    does serial.read_all()

%rebootdevice
    reboots device

%sendtofile [--append] [--mkdir] [--binary] [--execute] [--source [SOURCE]] [--quiet]
                   [--QUIET]
                   [destinationfilename]
    send cell contents or file/direcectory to the device

%serialconnect [--raw] [--port PORT] [--baud BAUD] [--verbose]
    connects to a device over US