In [66]:
import pyriemann
from filenames_and_paths import *
from sklearn.model_selection import cross_val_score
import numpy as np
import mne
import matplotlib.pyplot as plt
from scipy.linalg import eigh


In [67]:
raw = mne.io.read_raw_brainvision(folders.raw_data + path014 + filenames014[0] + '.vhdr', preload=True)
raw = raw.drop_channels(["EOG", "BIP1", "M1", "M2"])

In [98]:
# Параметры данных
sfreq = 2048  # Частота дискретизации
time = 20
times = np.arange(0, time, 1/sfreq)  # 180 секунд
n_channels = len(raw.ch_names)  # Количество каналов
n_samples = len(times)
eeg_data = raw.get_data().T[:sfreq*time].T
raw = mne.io.RawArray(eeg_data, raw.info)
raw.filter(1, 40)
print(raw.get_data().shape)

## Чистые данные

In [99]:
raw_clean = mne.io.read_raw_eeglab(folders.preprocessed_data + path014 + filenames014[1] + '.set', preload=True)

# Шаг 2: Разделение данных на эпохи

In [100]:
events = mne.make_fixed_length_events(raw, id=1, duration=2.0)
epochs = mne.Epochs(raw, events, event_id=1, tmin=0, tmax=2.0, baseline=None, preload=True)

events_clean = mne.make_fixed_length_events(raw_clean, id=1, duration=2.0)
epochs_clean = mne.Epochs(raw_clean, events_clean, event_id=1, tmin=0, tmax=2.0, baseline=None, preload=True)

# Шаг 3: Вычисление ковариационных матриц

In [101]:
def calculate_covariances(epochs):
    n_epochs, n_channels, n_samples = epochs.shape
    covariances = np.zeros((n_epochs, n_channels, n_channels))
    for i in range(n_epochs):
        covariances[i] = np.cov(epochs[i])
    return covariances

covariances = calculate_covariances(epochs.get_data())
covariances_clean = calculate_covariances(epochs_clean.get_data())

# Шаг 4: Обучение Riemannian Potato

In [102]:
covariances.shape

In [122]:
# from scipy.linalg import eigvalsh, solve
# 
# def _check_inputs(A, B):
#     if not isinstance(A, np.ndarray) or not isinstance(B, np.ndarray):
#         raise ValueError("Inputs must be ndarrays")
#     if not A.shape == B.shape:
#         raise ValueError("Inputs must have equal dimensions")
#     if A.ndim < 2:
#         raise ValueError("Inputs must be at least a 2D ndarray")
# 
# 
# def _recursive(fun, A, B, *args, **kwargs):
#     """Recursive function with two inputs."""
#     if A.ndim == 2:
#         return fun(A, B, *args, **kwargs)
#     else:
#         return np.asarray(
#             [_recursive(fun, a, b, *args, **kwargs) for a, b in zip(A, B)]
#         )
#     
# def distance_riemann(A, B, squared=False):
#     _check_inputs(A, B)
#     d2 = (np.log(_recursive(eigvalsh, A, B))**2).sum(axis=-1)
#     return d2 if squared else np.sqrt(d2)

In [129]:
# Функция для вычисления риманова расстояния
def riemannian_distance(C1, C2):
    eigvals = eigh(C1, C2, eigvals_only=True)
    return np.sqrt(np.sum(np.log(eigvals ** 2)))

def train_potato(covariances, n_train=20):
    mean_cov = np.mean(covariances[:n_train], axis=0)
    distances = np.array([riemannian_distance(mean_cov, cov) for cov in covariances[:n_train]])
    threshold = np.percentile(distances, 95)  # Установим порог на уровне 95-го перцентиля
    return mean_cov, threshold

mean_cov, threshold = train_potato(covariances_clean, n_train=200)

In [124]:
covariances_clean

# Шаг 5: Оценка качества новых данных

In [125]:
def evaluate_potato(covariances, mean_cov, threshold):
    distances = np.array([riemannian_distance(mean_cov, cov) for cov in covariances])
    predictions = distances < threshold
    return predictions, distances

predictions, distances = evaluate_potato(covariances, mean_cov, threshold)

In [128]:
np.log([-1, 1, 0] ** 2)

In [126]:
distances

# Шаг 6: Визуализация результатов

In [88]:
plt.figure(figsize=(10, 5))
plt.plot(distances, 'o')
plt.axhline(y=threshold, color='r', linestyle='--', label='Threshold')
plt.xlabel('Сегмент')
plt.ylabel('Расстояние')
plt.title('Оценка качества данных ЭЭГ с использованием Riemannian Potato')
plt.legend()
plt.show()

plt.figure(figsize=(12, 6))
for i in range(3):
    plt.plot(times, eeg_data[i]*10000 + i, label=f'EEG{i+1}')
plt.xlabel('Время (с)')
plt.ylabel('Амплитуда (смешано)')
# plt.legend(loc='upper right')
plt.show()