In [4]:
import numpy as np
import matplotlib.pyplot as plt

from moabb.paradigms import MotorImagery
from moabb.datasets import Liu2024
from pyriemann.estimation import Covariances
from pyriemann.tangentspace import TangentSpace
from sklearn.svm import SVC
from moabb.pipelines.features import AugmentedDataset
import logging
from datetime import datetime
from sklearn.metrics import accuracy_score
import logging

from sklearn.pipeline import Pipeline
import warnings

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)

In [21]:
# Step 2: Create multichannel trajectory matrix
def create_trajectory_matrix(signals, window_length):
    L = window_length
    N = signals.shape[1]
    K = N - L + 1
    trajectory_blocks = [np.column_stack([signals[ch, i:i + L] for i in range(K)]) for ch in range(signals.shape[0])]
    return np.vstack(trajectory_blocks)

# Step 3: Apply SVD and keep top r components
def mssa_denoise(signals, window_length=50, rank=2):
    traj_matrix = create_trajectory_matrix(signals, window_length)
    U, s, Vt = np.linalg.svd(traj_matrix, full_matrices=False)
    S = np.diag(s[:rank])
    U_r = U[:, :rank]
    Vt_r = Vt[:rank, :]
    traj_recon = U_r @ S @ Vt_r

    print(S.sum()/s.sum())
    return traj_recon

# Step 4: Diagonal averaging
def diagonal_averaging(H):
    L, K = H.shape
    N = L + K - 1
    result = np.zeros(N)
    count = np.zeros(N)
    for i in range(L):
        for j in range(K):
            result[i + j] += H[i, j]
            count[i + j] += 1
    return result / count

# Step 5: Reconstruct each channel
def reconstruct_channels(traj_recon, n_channels, window_length, signal_length):
    L = window_length
    K = signal_length - L + 1
    recon_signals = []
    for i in range(n_channels):
        H = traj_recon[i * L:(i + 1) * L, :]
        recon = diagonal_averaging(H)
        recon_signals.append(recon)
    return np.array(recon_signals)

# Run MSSA



In [None]:
traj_recon = mssa_denoise(signals, window_length, rank)
reconstructed_signals = reconstruct_channels(traj_recon, n_channels, window_length, n_samples)

# Plot original vs reconstructed for comparison
plt.figure(figsize=(12, 8))
for i in range(n_channels):
    plt.subplot(n_channels, 1, i + 1)
    plt.plot(t, signals[i], label='Original', alpha=0.5)
    plt.plot(t, reconstructed_signals[i], label='Reconstructed', linewidth=2)
    plt.title(f'Channel {i+1}')
    plt.legend()
plt.tight_layout()
plt.show()


In [67]:
fmin, fmax = 8, 35
tmin, tmax = 0, None
events = ['left_hand', 'right_hand']

window_length = 30
rank = 12
dataset = Liu2024()
paradigm = MotorImagery(events=events, n_classes=len(events), fmin=fmin, fmax=fmax, tmax=tmax)
X, y, metadata = paradigm.get_data(dataset, subjects = [3])
acm_pipeline = Pipeline(
    steps = [('augmenteddataset',AugmentedDataset(order=8,lag=9)),
    ('covariances',Covariances(estimator='cov')),
    ('tangentspace',TangentSpace(metric='riemann')),
    ('svc',SVC(C=1.0, kernel='linear'))]
)

subjects = metadata.subject.unique()

for subject in subjects:
    print(subject)

    s_index = (metadata.subject == subject).values
    X_subject, y_subject = X[s_index], y[s_index]

    X_size = len(X_subject)
    training_portion = int(X_size/2)
    
    X_rec = []
    
    for X_i in X_subject:
        traj_recon = mssa_denoise(X_i, window_length, rank)
        reconstructed_signals = reconstruct_channels(traj_recon, 29, window_length, 2001)

        X_rec.append(reconstructed_signals)

    X_subject = np.array(X_rec)
    

    X_subject_train, X_subject_test = X_subject[:training_portion], X_subject[training_portion:]
    y_subject_train, y_subject_test = y_subject[:training_portion], y_subject[training_portion:]

    acm_pipeline.fit(X_subject_train, y_subject_train)
    
    y_predict_train = acm_pipeline.predict(X_subject_train)
    y_predict_test = acm_pipeline.predict(X_subject_test)

    testing_accuracy = accuracy_score(y_subject_test, y_predict_test)
    
    training_accuracy = accuracy_score(y_subject_train, y_predict_train)

    time_now = datetime.now().strftime('%y-%m-%d %H:%M:%S')
    print(f'{time_now}    subject: {subject},  trainingAcc: {training_accuracy},   testingAcc: {testing_accuracy}')
    

3
0.395000140362775
0.419080688473475
0.42107732597333286
0.3580555713785242
0.38652568270279203
0.3705539353522533
0.3575462380690537
0.3640055349762235
0.37546681105518886
0.3795521851176756
0.38023421278378566
0.375767022495243
0.4103316192376074
0.43291341498072233
0.4025347992051347
0.42947711673355
0.4086128970355712
0.39968282400961863
0.3765940988488058
0.35840613692574197
0.40211021819345255
0.4360033590082804
0.43450427602847225
0.43155524432431047
0.421901775265033
0.39755612008981944
0.4231913791148887
0.4983263403317651
0.46965161153750307
0.3652394275234186
0.37465277385775586
0.38244844523427424
0.38872218077323334
0.3900912546833989
0.3797506974139066
0.38163327000994957
0.374490735258222
0.3596183230760829
0.39781021660284427
0.4293594914930543


ValueError: Matrices must be positive definite. Add regularization to avoid this error.

In [57]:
acm_pipeline = Pipeline(
    steps = [('augmenteddataset',AugmentedDataset(order=8,lag=9)),
    ('covariances',Covariances(estimator='cov')),
    ('tangentspace',TangentSpace(metric='riemann')),
    ('svc',SVC(C=1, kernel='linear'))]
)

acm_pipeline.fit(X_subject_train, y_subject_train)

y_predict_train = acm_pipeline.predict(X_subject_train)
y_predict_test = acm_pipeline.predict(X_subject_test)

testing_accuracy = accuracy_score(y_subject_test, y_predict_test)

training_accuracy = accuracy_score(y_subject_train, y_predict_train)

time_now = datetime.now().strftime('%y-%m-%d %H:%M:%S')
print(f'{time_now}    subject: {subject},  trainingAcc: {training_accuracy},   testingAcc: {testing_accuracy}')


ValueError: Matrices must be positive definite. Add regularization to avoid this error.