In [1]:
import numpy as np
import pandas as pd
import pyedflib
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Bidirectional, LSTM, Dropout, Flatten, Dense, Conv1D, MaxPooling1D

In [2]:
def get_dataset(name):
    #Загружаем датасет
    def read_dataset(file_path):
        edf_file = pyedflib.EdfReader(file_path)
        n_signals = edf_file.signals_in_file
        signal_labels = edf_file.getSignalLabels()
        signals = [edf_file.readSignal(i) for i in range(n_signals)]
        edf_file.close()
        print('Сигналов обнаружено: ', n_signals)
        return signal_labels, signals

    def read_txt_markers(file_path):
        with open(file_path, 'r') as file:
            labels = file.read().splitlines()
        return labels
    
    def convert_to_sec(time: str):
        s = list(map(int, time.split(':')))
        return s[0]*3600 + s[1] * 60 + s[2]

    def get_markered_dataset(data_file_path, labels_file_path):
        signal_labels, signals = read_dataset(data_file_path)
        signals = np.array(signals)
        data = pd.DataFrame(signals).T.rename(columns={i: signal_labels[i] for i in range(len(signal_labels))})
        
        markers = read_txt_markers(labels_file_path)[1:]
        markers = [line.split('\t') for line in markers]
        markers = [[line[0], convert_to_sec(line[1])*400, line[2]] for line in markers]
        
        markers_df = pd.DataFrame(markers, columns=['id', 'time', 'marker'])
        markers_df['time'] = markers_df['time'].astype(int)
    
        data['target'] = None
    
        for i in range(0, len(markers_df), 2):
            start_marker = markers_df.iloc[i]
            end_marker = markers_df.iloc[i + 1]
            if start_marker['marker'].startswith('ds'):
                mask = list(range(start_marker.time, end_marker.time))
                data.loc[mask, 'target'] = 'ds'
            elif start_marker['marker'].startswith('is'):
                mask = list(range(start_marker.time, end_marker.time))
                data.loc[mask, 'target'] = 'is'
            elif start_marker['marker'].startswith('swd'):
                mask = list(range(start_marker.time, end_marker.time))
                data.loc[mask, 'target'] = 'swd'
        percentage_marked = data[data['target'].notna()].shape[0]/data.shape[0]
        print(f'Размечено {round(percentage_marked, 2)} данных')
        return data

    def load_marked_dataset(file, folder='ECoG_fully_marked_(4+2 files, 6 h each)', base_path=r"Downloads/train_dataset_minzdrav_train"):
        dataset_file_path = fr"{base_path}/{folder}/{file}.edf"
        markers_file_path = fr"{base_path}/{folder}/{file}.txt"
        dataset = get_markered_dataset(dataset_file_path, markers_file_path)
        return dataset

    dataset = load_marked_dataset(name)
    dataset = dataset[dataset['target'].notna()]
    return dataset

In [3]:
dataset_names = ['Ati4x1_15m_BL_6h', 
                 'Ati4x1_15m_Dex003(Pharm!)_6h', 
                 'Ati4x1_15m_H2O_6h', 
                 'Ati4x3_12m_BL_6h',
                 'Ati4x6_14m_BL_6h']
#'Ati4x3_9m_Xyl01(Pharm!)_6h', 
dataset_list = []
for dataset_name in dataset_names:
    dataset = get_dataset(dataset_name)
    dataset = dataset[dataset['target'].notna()]
    dataset_list.append(dataset)
    
all_datas = pd.concat(dataset_list, ignore_index=True)
all_datas

Сигналов обнаружено:  3
Размечено 0.42 данных
Сигналов обнаружено:  3
Размечено 0.25 данных
Сигналов обнаружено:  3
Размечено 0.22 данных
Сигналов обнаружено:  3
Размечено 0.23 данных
Сигналов обнаружено:  3
Размечено 0.34 данных


Unnamed: 0,FrL,FrR,OcR,target
0,-0.167625,-0.125625,-0.089688,ds
1,-0.175687,-0.100250,-0.038562,ds
2,-0.176687,-0.123250,-0.039438,ds
3,-0.174375,-0.127812,-0.014688,ds
4,-0.138375,-0.085625,0.032250,ds
...,...,...,...,...
12613995,-0.090625,0.000937,-0.033250,swd
12613996,-0.059188,0.032938,0.000438,swd
12613997,-0.056688,0.031000,-0.014563,swd
12613998,-0.047563,0.041250,-0.027938,swd


In [4]:
all_datas['target'] = all_datas['target'].replace({'ds': 0, 'is': 1, 'swd': 2})

  all_datas['target'] = all_datas['target'].replace({'ds': 0, 'is': 1, 'swd': 2})


In [25]:
 window_size = 400

num_samples = (len(all_datas) // window_size) * window_size
all_datas = all_datas.iloc[:num_samples]  # Обрезаем лишние строки

X = all_datas.drop(columns=['target']).values.reshape(-num_samples, window_size, 3)  # Преобразуем в форму (Batch Size, 800, 3)
y = all_datas['target'].values[:num_samples:window_size]  # Каждое значение 'target' соответствует одному окну из 800

# Проверим итоговые формы X и y
print("Форма X:", X.shape)  # Должно быть (Количество выборок, 800, 3)
print("Форма y:", y.shape)  # Должно быть (Количество выборок,)

# Разделение на обучающую и валидационную выборки
from sklearn.model_selection import train_test_split
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

Форма X: (63070, 200, 3)
Форма y: (63070,)


In [19]:
model = Sequential([
    Input(shape=(400, 3)),  # Входной размер (window_size, num_channels)
    
    # CNN блок
    Conv1D(filters=32, kernel_size=50, activation='relu'),
    MaxPooling1D(pool_size=2),
    Dropout(0.3),
    Conv1D(filters=64, kernel_size=50, activation='relu'),
    MaxPooling1D(pool_size=2),
    Dropout(0.3),
    
    # LSTM слой
    LSTM(128, return_sequences=False),
    
    # Полносвязный блок
    Dropout(0.4),
    Flatten(),
    Dense(3, activation='softmax')  # 3 класса для классификации
])

# Компиляция модели
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model.summary()
# Обучение модели
history = model.fit(X_train, y_train, epochs=10, batch_size=32, validation_data=(X_val, y_val))

# Оценка модели
model.evaluate(X_val, y_val)

Epoch 1/10
[1m789/789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 37ms/step - accuracy: 0.8976 - loss: 0.3358 - val_accuracy: 0.9191 - val_loss: 0.2417
Epoch 2/10
[1m789/789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 41ms/step - accuracy: 0.9173 - loss: 0.2480 - val_accuracy: 0.9188 - val_loss: 0.2178
Epoch 3/10
[1m789/789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 43ms/step - accuracy: 0.9208 - loss: 0.2219 - val_accuracy: 0.9209 - val_loss: 0.2044
Epoch 4/10
[1m789/789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 43ms/step - accuracy: 0.9254 - loss: 0.2038 - val_accuracy: 0.9222 - val_loss: 0.2118
Epoch 5/10
[1m789/789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 42ms/step - accuracy: 0.9266 - loss: 0.1951 - val_accuracy: 0.9236 - val_loss: 0.1986
Epoch 6/10
[1m789/789[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 44ms/step - accuracy: 0.9278 - loss: 0.1896 - val_accuracy: 0.9252 - val_loss: 0.1963
Epoch 7/10
[1m7

[0.1910426914691925, 0.9253210425376892]

In [23]:
model.save('cnnlstm_model.keras')

In [20]:
def get_dataset(name):
    #Загружаем датасет
    def read_dataset(file_path):
        edf_file = pyedflib.EdfReader(file_path)
        n_signals = edf_file.signals_in_file
        signal_labels = edf_file.getSignalLabels()
        signals = [edf_file.readSignal(i) for i in range(n_signals)]
        edf_file.close()
        print('Сигналов обнаружено: ', n_signals)
        return signal_labels, signals

    def read_txt_markers(file_path):
        with open(file_path, 'r') as file:
            labels = file.read().splitlines()
        return labels
    
    def convert_to_sec(time: str):
        s = list(map(int, time.split(':')))
        return s[0]*3600 + s[1] * 60 + s[2]

    def get_markered_dataset(data_file_path, labels_file_path):
        signal_labels, signals = read_dataset(data_file_path)
        signals = np.array(signals)
        data = pd.DataFrame(signals).T.rename(columns={i: signal_labels[i] for i in range(len(signal_labels))})
        
        markers = read_txt_markers(labels_file_path)[1:]
        markers = [line.split('\t') for line in markers]
        markers = [[line[0], convert_to_sec(line[2])*400, line[1]] for line in markers]
        
        markers_df = pd.DataFrame(markers, columns=['id', 'time', 'marker'])
        markers_df['time'] = markers_df['time'].astype(int)
    
        data['target'] = None
    
        for i in range(0, len(markers_df), 2):
            start_marker = markers_df.iloc[i]
            end_marker = markers_df.iloc[i + 1]
            if start_marker['marker'].startswith('ds'):
                mask = list(range(start_marker.time, end_marker.time))
                data.loc[mask, 'target'] = 'ds'
            elif start_marker['marker'].startswith('is'):
                mask = list(range(start_marker.time, end_marker.time))
                data.loc[mask, 'target'] = 'is'
            elif start_marker['marker'].startswith('swd'):
                mask = list(range(start_marker.time, end_marker.time))
                data.loc[mask, 'target'] = 'swd'
        percentage_marked = data[data['target'].notna()].shape[0]/data.shape[0]
        print(f'Размечено {round(percentage_marked, 2)} данных')
        return data
    
    def load_marked_dataset(file, folder='ECoG_fully_marked_(4+2 files, 6 h each)', base_path=r"Downloads/train_dataset_minzdrav_train"):
        dataset_file_path = fr"{base_path}/{folder}/{file}.edf"
        markers_file_path = fr"{base_path}/{folder}/{file}.txt"
        dataset = get_markered_dataset(dataset_file_path, markers_file_path)
        return dataset

    dataset = load_marked_dataset(name)
    dataset = dataset[dataset['target'].notna()]
    return dataset

test = get_dataset('Ati4x3_9m_Xyl01(Pharm!)_6h')
test['target'] = test['target'].replace({'ds': 0, 'is': 1, 'swd': 2})
X_test = test.drop(columns=['target'])
y_test = test['target'].values

Сигналов обнаружено:  3
Размечено 0.59 данных


  test['target'] = test['target'].replace({'ds': 0, 'is': 1, 'swd': 2})


In [21]:
window_size = 400
num_channels = 3

# Убедимся, что количество строк в тестовом наборе делится на размер окна
num_samples = (len(test) // window_size) * window_size
test = test.iloc[:num_samples]  # Обрезаем лишние строки

# Преобразуем тестовые данные в форму (количество окон, 800, 3)
X_test = test.drop(columns=['target']).values.reshape(-1, window_size, num_channels)
y_test = test['target'].values[:num_samples:window_size]  # Берем метки для каждого окна

# Оценка модели на всём тестовом наборе
test_loss, test_accuracy = model.evaluate(X_test, y_test, batch_size=32)
print(f"Точность на всём тестовом наборе: {test_accuracy * 100:.2f}%")
print(f"Потери (loss) на всём тестовом наборе: {test_loss:.4f}")

[1m400/400[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 9ms/step - accuracy: 0.9671 - loss: 0.1869
Точность на всём тестовом наборе: 92.05%
Потери (loss) на всём тестовом наборе: 0.4597


In [22]:
df = pd.read_csv('valid_file.csv')
df = df.drop(['Unnamed: 0'], axis=1)

# Размер окна и количество каналов, как в обучении
window_size = 200
num_channels = 3

# Убедимся, что количество строк в тестовом наборе делится на размер окна
num_samples = (len(df) // window_size) * window_size
df = df.iloc[:num_samples]  # Обрезаем лишние строки

# Преобразуем тестовые данные в форму (количество окон, 800, 3)
X_test = df.drop(columns=['target']).values.reshape(-1, window_size, num_channels)
y_test = df['target'].values[:num_samples:window_size]  # Берем метки для каждого окна

# Оценка модели на всём тестовом наборе
test_loss, test_accuracy = model.evaluate(X_test, y_test, batch_size=32)
print(f"Точность на всём тестовом наборе: {test_accuracy * 100:.2f}%")
print(f"Потери (loss) на всём тестовом наборе: {test_loss:.4f}")

[1m267/267[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step - accuracy: 0.9522 - loss: 0.2300
Точность на всём тестовом наборе: 92.48%
Потери (loss) на всём тестовом наборе: 0.2969
