<a href="https://colab.research.google.com/github/VLADpit/MyGoogleColab/blob/main/rhytms.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Импорты и подгрузка необходимых библиотек

In [43]:
!pip install pyedflib numpy scipy



In [80]:
import numpy as np
from scipy.signal import butter, filtfilt, welch, find_peaks, hilbert
import pyedflib
import zipfile

# Функции

функции реализованы с помощью библиотеки для обработки сигналов [scipy.signal](https://docs.scipy.org/doc/scipy/reference/signal.html)

Полосовой фильтр на основе [фильтра Баттерфорта](https://ru.wikipedia.org/wiki/Фильтр_Баттерворта?ysclid=mldttz69kp980612220) ([scipy.signal.butter()](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.butter.html#scipy.signal.butter)) для обрезвния частот до 1-60Гц(физиологические ритмы)


In [45]:
def band_filter(signal, fs, low=1.0, high=60.0, order=4):
    nyq = 0.5 * fs
    low, high = low / nyq, high / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, signal)

Относительная мощность в заданной частотной полосе с помощью [Метода Уэлча](https://en.wikipedia.org/wiki/Welch%27s_method) ([scipy.signal.welch()](https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.welch.html#scipy.signal.welch))

In [115]:
def relative_power(signal, fs, band, total_band=(1.0, 60.0)):
    freqs, psd = welch(signal, fs, nperseg=min(1024, len(signal)), window='hann')
    idx_band = (freqs >= band[0]) & (freqs <= band[1])
    idx_total = (freqs >= total_band[0]) & (freqs <= total_band[1])
    if psd[idx_total].sum() == 0:
        return 0.0
    return psd[idx_band].sum() / psd[idx_total].sum()

[Спектральная плоскостность](https://en.wikipedia.org/wiki/Spectral_flatness) - это то, на сколько равновероятны частоты сигнала или более строго - Энтропия сигнала. При шумном сигнале энтропия высокая - частоты стремяться к равновероятным. Среднее арифметическое(AM) и среднее геометрическое(GM) при деление GM/AM дают приближённую нормировонную энтропию(отображает на отрезок [0;1]). Чем больше разброс, тем меньше GM/AM.
* Для тонального сигнала:  01–0.3.
* Для белого шума:  0.7–1.0.

In [47]:
def spectral_flatness(signal, fs):
    freqs, psd = welch(signal, fs, nperseg=min(512, len(signal)))
    psd = psd + 1e-12
    gm = np.exp(np.mean(np.log(psd)))
    am = np.mean(psd)
    return gm / am

Обнаружение и подсчёт морганий по лобным каналам Fp1/Fp2.



In [86]:
def detect_blinks(o1, o2, fp1, fp2, fs):
    blink_signal = (fp1 + fp2) / 2.0 - (o1 + o2) / 2.0
    blink_filtered = band_filter(blink_signal, fs, low=1.0, high=10.0)
    analitycs = hilbert(blink_filtered)
    threshold = np.percentile(np.abs(blink_filtered), 98)
    threshold = max(threshold, 20.0)
    peaks, _ = find_peaks(
        np.abs(analitycs),
        height=threshold,
        distance=int(1 * fs),
        prominence=0.5 * threshold  )

    return len(peaks)

Длительность самого длинного непрерывного альфа-эпизода (в секундах). Решение: с помощью скользящего окна(1 секунда), оцениваем мощность в каждом окне и если она больше порога, то фиксируем как 1, иначе 0. Таким образом, мы получим бинарный массив в котором нужно найти наибольшую подпоследовательность из 1

In [49]:
# @title Длительность самого длинного непрерывного альфа-эпизода (в секундах). Решение: с помощью скользящего окна(1 секунда), оцениваем мощность в каждом окне и если она больше порога, то фиксируем как 1, иначе 0. Таким образом, мы получим бинарный массив в котором нужно найти наибольшую подпоследовательность из 1
def longest_alpha(o1, o2, fs, min_duration_sec=0.5):
    occ_signal = (o1 + o2) / 2.0
    window_sec = 1.0
    win_samples = int(window_sec * fs)
    step_samples = int(0.1 * fs)

    if len(occ_signal) < win_samples:
        return 0.0

    alpha_power = []
    times = []
    for start in range(0, len(occ_signal) - win_samples + 1, step_samples):
        seg = occ_signal[start:start + win_samples]
        power = relative_power(seg, fs, (8, 14))
        alpha_power.append(power)
        times.append(start / fs)

    alpha_power = np.array(alpha_power)
    threshold = 0.3
    above = alpha_power > threshold

    longest_sec = 0.0
    current_start = None
    for i, is_alpha in enumerate(above):
        if is_alpha and current_start is None:
            current_start = times[i]
        elif not is_alpha and current_start is not None:
            duration = times[i - 1] - current_start + window_sec
            if duration >= min_duration_sec:
                longest_sec = max(longest_sec, duration)
            current_start = None
    if current_start is not None:
        duration = times[-1] - current_start + window_sec
        if duration >= min_duration_sec:
            longest_sec = max(longest_sec, duration)

    return round(longest_sec, 2)

In [95]:
# @title Функция для более "мягкой" классификации шумов
def is_noisy(o1, o2, fp1, fp2, t3, t4, fs, max_allowed_flatness=0.675, max_amp = 300.0):
    signals = [o1, o2, fp1, fp2, t3, t4]
    flatness_vals = []
    high_amp_ratio = []

    for sig in signals:
        if np.abs(sig).max() > max_amp:
          return True
        flat = spectral_flatness(sig, fs)
        flatness_vals.append(flat)
        high_amp = np.abs(sig) > 100.0
        high_amp_ratio.append(np.mean(high_amp))
    if any(f > max_allowed_flatness for f in flatness_vals):
        return True
    if any(ratio > 0.05 for ratio in high_amp_ratio):
        return True
    if all(f > 0.6 for f in flatness_vals):
        return True
    return False

* Порог амплитуды снижен до 100 мкВ, но теперь мы смотрим на долю времени, а не на пик.
  1. Реальный артефакт длится сотни миллисекунд - доля > 5%.
  2. Физиологический пик - кратковременный - доля < 1%.
* Плоскостность проверяется по каждому каналу.
  * Если хотя бы один канал «плоский» (плохой контакт), запись считается шумом.
* Дополнительная проверка: если все каналы имеют плоскостность > 0.6 - возможно, запись - шум.



# Классификация на одном экземпляре

Чтение сигналов из файла

In [51]:
# @title Чтение сигналов из файла {"vertical-output":true}
with pyedflib.EdfReader('1.edf') as f:
    signals = [f.readSignal(i).astype(np.float64) for i in range(6)]
    fs = [f.getSampleFrequency(i) for i in range(6)][0]
    labels = [label.capitalize().strip() for label in f.getSignalLabels()]

Проверка порядка каналов

In [52]:
labels

['O1', 'T3', 'Fp1', 'Fp2', 'T4', 'O2']

распаковка и предобработка нужных каналов

In [53]:
o1, t3, fp1, fp2, t4, o2 = signals
o1_f = band_filter(o1, fs)
o2_f = band_filter(o2, fs)
fp1_f = band_filter(fp1, fs)
fp2_f = band_filter(fp2, fs)

Проверка на артефакты по амплитуде


In [54]:
amp_ok = all(np.abs(sig).max() <= 150.0 for sig in [o1, o2, fp1, fp2])

Спектральная плоскостность (усреднённая)

In [55]:
flatness = np.mean([
        spectral_flatness(o1_f, fs),
        spectral_flatness(o2_f, fs),
        spectral_flatness(fp1_f, fs),
        spectral_flatness(fp2_f, fs)])

Альфа и бета мощности

In [56]:
alpha = np.mean([relative_power(o1_f, fs, (8, 13)), relative_power(o2_f, fs, (8, 13))])
beta = np.mean([relative_power(fp1_f, fs, (13, 30)), relative_power(fp2_f, fs, (13, 30))])

Классификация

In [57]:
if not amp_ok or flatness > 0.7:
      cls = "Шум"
elif alpha > 2.0 * beta:
      cls = "Расслабленное"
else:
      beta_alpha_ratio = beta / (alpha + 1e-8)
      if beta_alpha_ratio > 2.0:
          cls = "Концентрированное"
      else:
          cls = "Нейтральное"
cls

'Расслабленное'

Ответ совпадает с визуальным анализом


In [58]:
f'Cаммый длинный эпизод альфа-ритма {longest_alpha(o1, o2, fs)}'

'Cаммый длинный эпизод альфа-ритма 22.98'

Cаммый длинный эпизод альфа-ритма 13.19

# Классификация всех 50 записей


In [125]:
# @title Функция анализа 1 записи
def analize_once(file, duration_alpha = 5, alpha_beta_prelative = 2.0):
  with pyedflib.EdfReader(file) as f:
    signals = [f.readSignal(i).astype(np.float64) for i in range(6)]
    fs = [f.getSampleFrequency(i) for i in range(6)][0]

  o1, t3, fp1, fp2, t4, o2 = signals
  o1_f = band_filter(o1, fs)
  o2_f = band_filter(o2, fs)
  fp1_f = band_filter(fp1, fs)
  fp2_f = band_filter(fp2, fs)

  amp_ok = all(np.abs(sig).max() <= 150.0 for sig in [o1, o2, fp1, fp2])
  flatness = np.mean([
        spectral_flatness(o1_f, fs),
        spectral_flatness(o2_f, fs),
        spectral_flatness(fp1_f, fs),
        spectral_flatness(fp2_f, fs)])
  alpha_fp = np.mean([relative_power(fp1_f, fs, (8, 14)), relative_power(fp2_f, fs, (8, 14))])
  beta_fp = np.mean([relative_power(fp1_f, fs, (14, 35)), relative_power(fp2_f, fs, (14, 35))])
  beta_o = np.mean([relative_power(o1_f, fs, (14, 35)), relative_power(o2_f, fs, (14, 35))])
  long_alpha = longest_alpha(o1, o2, fs)
  if is_noisy(o1, o2, fp1, fp2, t3, t4, fs):
      cls = 4
  elif long_alpha >= duration_alpha and abs(alpha_fp - beta_o) > 0.01:
        cls = 1
  else:
        if beta_fp / (alpha_fp + 1e-8) > alpha_beta_prelative:
            cls = 3
        else:
            cls = 2
  return {
        'class': cls,
        'blinks': detect_blinks(o1,o2,fp1, fp2, fs) if cls == 3 else None,
        'longest_alpha_sec': long_alpha if cls == 1 else None}

In [96]:
general = {'labels':'',
       'blinks': 0,
       'longest_alpha':0}

with zipfile.ZipFile('signals.zip', 'r') as zipf:
  list_edf = [f for f in zipf.namelist() if f.endswith(".edf")]
  zipf.extractall('signals')

for ff in list_edf:
  res = analize_once(f'signals/{ff}')
  print(res, ff)
  general['labels'] += str(res['class'])
  general['blinks'] += 0 if res['blinks'] is None else res['blinks']
  general['longest_alpha'] = max(0 if res['longest_alpha_sec'] is None else res['longest_alpha_sec'],general['longest_alpha'])
general

{'class': 1, 'blinks': None, 'longest_alpha_sec': 22.98} 1.edf
{'class': 2, 'blinks': None, 'longest_alpha_sec': None} 2.edf
{'class': 3, 'blinks': 8, 'longest_alpha_sec': None} 3.edf
{'class': 4, 'blinks': None, 'longest_alpha_sec': None} 4.edf
{'class': 2, 'blinks': None, 'longest_alpha_sec': None} 5.edf
{'class': 2, 'blinks': None, 'longest_alpha_sec': None} 6.edf
{'class': 1, 'blinks': None, 'longest_alpha_sec': 46.12} 7.edf
{'class': 3, 'blinks': 9, 'longest_alpha_sec': None} 8.edf
{'class': 4, 'blinks': None, 'longest_alpha_sec': None} 9.edf
{'class': 2, 'blinks': None, 'longest_alpha_sec': None} 10.edf
{'class': 4, 'blinks': None, 'longest_alpha_sec': None} 11.edf
{'class': 1, 'blinks': None, 'longest_alpha_sec': 7.43} 12.edf
{'class': 4, 'blinks': None, 'longest_alpha_sec': None} 13.edf
{'class': 4, 'blinks': None, 'longest_alpha_sec': None} 14.edf
{'class': 2, 'blinks': None, 'longest_alpha_sec': None} 15.edf
{'class': 2, 'blinks': None, 'longest_alpha_sec': None} 16.edf
{'cla

{'labels': '12342213424144224144444414214412444244244344411122',
 'blinks': 24,
 'longest_alpha': 104.97}

метки полученные програмным анализом:
1234221342414422414444441421441244424424434441112

метки полученные визуальным анализом:
12342213424144224144444414214412444244244344411123

расхождение только в 50 записи, по неизвестной причине относительные мощности посчитанные алгоритмом и веб-иструментом отличаются, кроме того споры вызва 18 запись, так как по критериям она подходит и под Расслабленное состояние и под Концентрированное состояние, мы предположили, что есть приоритет классов:
Расслабленное имеет приоритет выше чем Концентрированное и Нейтральное