## Обработка датасета

In [1]:
# parse_file takes the path to an EDF file and returns:
# - data           : np.ndarray with one row per sample; each row holds the 3 signal channels
#                    (FrL, FrR, OcR) followed by one extra column with the class label of that sample
# - swd_annotation : list of (start, end) tuples of epileptic discharges (swd)
# - is_annotation  : list of (start, end) tuples of the intermediate sleep phase (is)
# - ds_annotation  : list of (start, end) tuples of the deep sleep phase (ds)

import numpy as np
import mne
from typing import Tuple, List


def parse_file(file_path: str) -> Tuple[np.ndarray, list, list, list]:
    """Read an EDF recording and label every sample with its annotated class.

    Class ids written into the label column:
    0 - no class, 1 - swd, 2 - is, 3 - ds.

    Annotations are expected to come in consecutive pairs: a start marker
    (``swd1`` / ``is1`` / ``ds1``) immediately followed by the annotation that
    closes the interval. Any other description is skipped.

    Args:
        file_path: Path to the EDF file.

    Returns:
        A tuple ``(data_with_classes, swd_annotation, is_annotation,
        ds_annotation)`` where ``data_with_classes`` is the transposed signal
        matrix with the class label appended as the last column, and the three
        lists hold ``(start, end)`` sample-index tuples for each class.

    Raises:
        ValueError: If the sampling frequency is not 400 Hz, or if a start
            marker is the last annotation and therefore has no end marker.
    """
    edf = mne.io.read_raw_edf(file_path)

    # All hackathon recordings are assumed to be sampled at 400 Hz.
    sampling_frequency = edf.info['sfreq']
    if sampling_frequency != 400:
        raise ValueError('Sampling frequency is not 400Hz')

    data = edf.get_data().T

    # One label per sample; 0 means "no class".
    classes = np.zeros((data.shape[0],), dtype=int)

    # Start marker -> class id. One table replaces three copy-pasted branches.
    class_by_start_marker = {'swd1': 1, 'is1': 2, 'ds1': 3}
    intervals = {class_id: [] for class_id in class_by_start_marker.values()}

    annotations = edf.annotations
    n_annotations = len(annotations)

    def to_sample(annotation) -> int:
        # Round instead of truncating: the floating-point product can land
        # just below the intended integer, and int() alone would then return
        # an index that is one sample too low.
        return int(round(annotation['onset'] * sampling_frequency))

    i = 0
    while i < n_annotations:
        description = annotations[i]['description']
        class_id = class_by_start_marker.get(description)

        if class_id is None:
            # Not a start marker: nothing to label.
            i += 1
            continue

        if i + 1 >= n_annotations:
            # Previously this surfaced as an opaque indexing error.
            raise ValueError(
                f"Annotation '{description}' at index {i} has no matching end marker"
            )

        onset = to_sample(annotations[i])
        offset = to_sample(annotations[i + 1])
        intervals[class_id].append((onset, offset))
        classes[onset:offset] = class_id
        i += 2  # skip the end marker that was just consumed

    data_with_classes = np.column_stack((data, classes))
    return data_with_classes, intervals[1], intervals[2], intervals[3]


In [None]:
import os
import pandas as pd

def find_marked_files(directory):
    """Recursively collect the paths of files whose name contains 'marked'.

    Walks ``directory`` with ``os.walk`` and returns a list of paths, each
    built by joining the containing folder with the file name. Only file
    names are checked; folder names are not.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(directory)
        for name in names
        if 'marked' in name
    ]

# Path to the directory that holds the annotated recordings.
directory_path = 'ECoG_fully_marked_(4+2 files, 6 h each)'
files = find_marked_files(directory_path)

# Three signal channels followed by the per-sample class label.
columns = ['1', '2', '3', 'target']

# Collect the per-file frames and concatenate once at the end: concatenating
# inside the loop re-copies everything accumulated so far on every iteration,
# and seeding the result with an empty object-dtype frame makes the final
# dtypes depend on how pandas treats empty inputs.
frames = []
for f in files:
    data = parse_file(f)[0]
    data_df = pd.DataFrame(data, columns=columns)
    # Show the class balance of every file as it is loaded.
    display(data_df['target'].value_counts())
    frames.append(data_df)

# pd.concat rejects an empty list, so fall back to an empty frame when no
# annotated files were found.
if frames:
    all_data = pd.concat(frames, ignore_index=True)
else:
    all_data = pd.DataFrame(columns=columns)

In [3]:
import numpy as np
np.set_printoptions(suppress=True)
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.io import arff
from sklearn.model_selection import train_test_split
import matplotlib
matplotlib.rcParams["figure.figsize"] = (6, 4)
plt.style.use("ggplot")
import tensorflow as tf
from tensorflow import data
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import mae
from tensorflow.keras import layers
from tensorflow import keras
from keras.utils import to_categorical 

from sklearn.metrics import accuracy_score, recall_score, precision_score, confusion_matrix, f1_score, classification_report

In [4]:
from sklearn.preprocessing import StandardScaler

# Segmentation parameters.
segment_size = 12000
k = 1  # number of channels

# Standardise the single selected channel (column '1') over the whole dataset.
standard_scaler = StandardScaler()
X_data = standard_scaler.fit_transform(all_data[['1']].to_numpy())
y_data = np.array(all_data['target'])

# Cut both arrays into whole segments; samples that do not fill a complete
# segment at the end are dropped.
X_data_total = X_data
num_segments = len(X_data_total) // segment_size
_usable_len = num_segments * segment_size
X_data_total = X_data_total[:_usable_len].reshape(-1, segment_size, k)
y_data_total = y_data[:_usable_len].reshape(-1, segment_size, 1)


In [6]:
# Sanity check: the segmented signal should be (num_segments, segment_size, k).
X_data_total.shape

(4320, 12000, 1)

In [7]:
# Sanity check: the segmented labels should be (num_segments, segment_size, 1).
y_data_total.shape

(4320, 12000, 1)

In [10]:
# Drop the same segments (first 1150 entries of remove_list_third) from the
# signals and from the labels so that the two arrays stay aligned.
# NOTE(review): remove_list_third is not defined in the visible cells - confirm where it comes from.
_segments_to_drop = remove_list_third[:1150]
X_data_total, y_data_total = (
    np.delete(segments, _segments_to_drop, axis=0)
    for segments in (X_data_total, y_data_total)
)
    

In [None]:
sleep_set = X_data_total

# One-hot encode the per-sample labels. The class count is fixed at 4
# (0 - no class, 1 - swd, 2 - is, 3 - ds) instead of being inferred from the
# data, so the label depth always matches the 4-channel model output even if
# a class is missing from the remaining segments.
y_data_total_cat = to_categorical(y_data_total, num_classes=4)

# 70 / 30 split of whole segments.
training_data, testing_data, training_labels, testing_labels = train_test_split(
    sleep_set, y_data_total_cat, test_size=0.3
)

dataset_train = tf.data.Dataset.from_tensor_slices((training_data, training_labels))
dataset_test = tf.data.Dataset.from_tensor_slices((testing_data, testing_labels))

BATCH_SIZE = 30
SHUFFLE_BUFFER_SIZE = 10000

# Shuffle the training segments before batching. SHUFFLE_BUFFER_SIZE was
# defined but never applied, so every epoch saw the batches in the same order.
dataset_train = dataset_train.shuffle(SHUFFLE_BUFFER_SIZE).batch(BATCH_SIZE)

# The evaluation data keeps its order.
dataset_test = dataset_test.batch(BATCH_SIZE)

## Обучение модели

In [115]:
import tensorflow as tf

# Encoder-decoder 1D CNN built with the Sequential API. It takes a
# (12000, 3) segment and emits 4 sigmoid scores per time step; the encoder
# output is flattened to 125 * 100 values before the dense bottleneck and the
# decoder upsamples 125 steps by 3 * 4 * 4 * 2 back to 12000.
# NOTE(review): the data-preparation cell builds segments with k = 1 channel,
# while the input declared here has 3 channels - confirm which is intended.
_L = tf.keras.layers


def _encoder_stage(filters, pool_size):
    # Strided convolution -> batch norm (no scale) -> max pooling -> dropout.
    return [
        _L.Conv1D(filters=filters, kernel_size=10, strides=2, padding='same', activation='relu'),
        _L.BatchNormalization(center=True, scale=False),
        _L.MaxPool1D(pool_size=pool_size, padding='same'),
        _L.Dropout(0.20),
    ]


def _decoder_stage(strides):
    # Transposed convolution (upsampling by `strides`) -> batch norm -> dropout.
    return [
        _L.Conv1DTranspose(filters=100, kernel_size=10, strides=strides, padding='same', activation='relu'),
        _L.BatchNormalization(center=True, scale=False),
        _L.Dropout(0.20),
    ]


CNN_model = tf.keras.Sequential([
    _L.Input(shape=(12000, 3)),

    # Encoder: three convolutional stages.
    *_encoder_stage(50, 2),
    *_encoder_stage(100, 2),
    *_encoder_stage(100, 3),

    # Dense bottleneck.
    _L.Flatten(),
    _L.Dropout(0.20),
    _L.Dense(300, activation='relu'),

    # Decoder: expand back to a (125, 100) feature map and upsample.
    _L.Dense(12500, activation='relu'),
    _L.Reshape((125, 100)),
    *_decoder_stage(3),
    *_decoder_stage(4),

    # Output head: two sigmoid transposed convolutions, the last one with
    # 4 filters (one per class).
    _L.Conv1DTranspose(filters=100, kernel_size=100, strides=4, padding='same', activation='sigmoid'),
    _L.Conv1DTranspose(filters=4, kernel_size=50, strides=2, padding='same', activation='sigmoid'),
])

CNN_model.compile(
    loss=tf.keras.losses.Dice(),
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001),
    metrics=['accuracy', 'recall'],
)

# Print the layer table.
CNN_model.summary()


In [116]:
# Per-class weights keyed by class id (0 - no class, 1 - swd, 2 - is, 3 - ds).
class_weights = dict(zip(range(4), (1., 10., 15., 1)))


In [24]:
# Callback that writes the full model (not only the weights) to
# checkpoint_path during training and logs each save.
checkpoint_path = "model/cp_new_third.keras"
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    verbose=1,
    save_weights_only=False,
)

In [None]:
# Train for 50 epochs with class weighting and checkpointing.
# NOTE(review): the held-out test split doubles as validation data here.
history = CNN_model.fit(
    dataset_train,
    validation_data=dataset_test,
    epochs=50,
    class_weight=class_weights,
    callbacks=[cp_callback],
)

[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 363ms/step


In [34]:
# Save the trained model to a .keras file.
# NOTE(review): the loading cell reads 'clown-net-new-finak-one.keras', not this file - confirm which is intended.
CNN_model.save('clown-net-new-finak-third.keras')

In [13]:
def load_model(path_for_model):
    """Load a saved Keras model from ``path_for_model`` and return it."""
    # Imported under an alias so it does not collide with this wrapper's name.
    from tensorflow.keras.models import load_model as _keras_load_model
    return _keras_load_model(path_for_model)

In [None]:
# Load a previously saved model for evaluation.
# NOTE(review): this reads the '-one' file, while the training cell saves '-third' - confirm which is intended.
model = load_model('clown-net-new-finak-one.keras')


In [15]:
# Class scores for every segment, then the index of the highest score along
# axis 2 as the predicted class.
# NOTE(review): X_data_total is the full set the training split was drawn from,
# so these predictions include segments the model may have been trained on.
pred = model.predict(X_data_total)
pred_max = pred.argmax(axis=2)


[1m28/28[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 349ms/step


In [16]:
# Collapse predictions and ground truth to 1-D label vectors for the metrics.
y_pred, y_true = pred_max.flatten(), y_data_total.flatten()

In [17]:
# Per-sample accuracy plus support-weighted precision and recall.
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred, average='weighted')
recall = recall_score(y_true, y_pred, average='weighted')

# Emit the three scores in one call, one per line, rounded to two decimals.
print('\n'.join((
    f'Accuracy: {accuracy:.2f}',
    f'Precision: {precision:.2f}',
    f'Recall: {recall:.2f}',
)))

Accuracy: 0.87
Precision: 0.84
Recall: 0.87
