In [None]:
%pip install seaborn

In [4]:
import os
import pandas as pd
import scipy.io as spio
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal
from scipy.integrate import simps
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn import tree
from sklearn.preprocessing import label_binarize
from sklearn.svm import SVC

In [5]:
sns.set(font_scale=1.2)

## Утилитарные функции

In [6]:
def emotional_labeling(arousal, valence):
    emotions_set = []
    arousal = arousal - 4.5
    valence = valence - 4.5
    for index in range(arousal.size):
        if arousal[index] > 0 and valence[index] > 0:
            # happy
            emotions_set.append(1)
        elif arousal[index] > 0 > valence[index]:
            # angry
            emotions_set.append(2)
        elif arousal[index] < 0 < valence[index]:
            # calm
            emotions_set.append(3)
        elif arousal[index] < 0 and valence[index] < 0:
            # sad
            emotions_set.append(4)
        else:
            emotions_set.append(0)
    return emotions_set

In [8]:
def create_labels(eeg_band_data):
    data_with_labels = pd.DataFrame({'Valence': eeg_band_data[:, 0] - 4.5, 'Arousal': eeg_band_data[:, 1] - 4.5,
                                     'Emotion': emotional_labeling(eeg_band_data[:, 0], eeg_band_data[:, 1])})
    data_with_labels.info()
    data_with_labels.describe()
    np.save("eeg_labels", data_with_labels)

    create_circumplex_model(data_with_labels)

    return data_with_labels

In [10]:
def create_circumplex_model(dataframe):
    # Установите общей фигуры и осей
    fig, ax = plt.subplots(figsize=(12, 8))

    # Установка лимита для осей
    ax.set_xlim(-4.6, 4.6)
    ax.set_ylim(-4.6, 4.6)
    print(dataframe)
    # Установка точек и параметров для соответствующих эмоций
    for index, row in dataframe.iterrows():
        valence = row['Valence']
        arousal = row['Arousal']

        # Соотношение значений валентности и возбуждения с координатами в модели
        x = valence
        y = arousal
        color = 'white'
        marker = "x"
        label = "neutral"
        if y > 0 and x > 0:
            color = 'red'
            marker = "*"
            label = "happy"
        elif y > 0 > x:
            color = 'black'
            marker = "v"
            label = "angry"
        elif y < 0 < x:
            color = 'blue'
            marker = "D"
            label = "calm"
        elif y < 0 and x < 0:
            color = 'green'
            marker = "."
            label = "sad"

        ax.scatter(x, y, s=11, color=color, label=label, alpha=0.5, marker=marker)

    ax.set_xlabel('Valence')
    ax.set_ylabel('Arousal')
    ax.grid(True)

    legend_elements = [
        plt.Line2D([0], [0], marker='*', color='w', markerfacecolor='red', markersize=8, label='Happy'),
        plt.Line2D([0], [0], marker='v', color='w', markerfacecolor='black', markersize=8, label='Angry'),
        plt.Line2D([0], [0], marker='D', color='w', markerfacecolor='blue', markersize=8, label='Calm'),
        plt.Line2D([0], [0], marker='.', color='w', markerfacecolor='green', markersize=8, label='Sad')
    ]
    ax.legend(handles=legend_elements, loc="lower left")
    plt.show()

In [11]:
def bandpower(data, sf, band):
    band = np.asarray(band)
    # Define delta lower and upper limits
    low, high = band
    # Define window length
    window = (2 / low) * sf
    freqs, psd = signal.welch(data, sf, nperseg=window)
    # Построение графика периодограммы Уэльса
    sns.set(font_scale=1.2, style='white')
    
    plt.figure(figsize=(8, 4))
    plt.plot(freqs, psd, color='k', lw=2)
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Power spectral density (uV^2 / Hz)')
    plt.ylim([0, psd.max() * 1.1])
    plt.title("Welch's periodogram")
    plt.xlim([0, freqs.max()])
    sns.despine()
    # ------------------------------------------------------
    freq_res = freqs[1] - freqs[0]
    # Find intersecting values in frequency vector
    idx_band = np.logical_and(freqs >= low, freqs <= high)
    #  Прежде чем вычислять среднюю мощность дельта-диапазона, нужно найти диапазоны частот,
    #  которые пересекают дельта-диапазон частот.
    
    plt.figure(figsize=(7, 4))
    plt.plot(freqs, psd, lw=2, color='k')
    plt.fill_between(freqs, psd, where=idx_band, color='skyblue')
    plt.xlabel('Frequency (Hz)')
    plt.ylabel('Power spectral density (uV^2 / Hz)')
    plt.xlim([0, 10])
    plt.ylim([0, psd.max() * 1.1])
    plt.title("Welch's periodogram")
    sns.despine()
    
    # ------------------------------------------------------
    band_power = simps(psd[idx_band], dx=freq_res)
    return band_power

In [13]:
def get_band_power(people, channel, band, eeg_data):
    bd = (0, 0)
    if band == "delta":
        bd = (0.5, 4)
    if band == "theta":
        bd = (4, 8)
    elif band == "alpha":
        bd = (8, 12)
    elif band == "beta":
        bd = (12, 30)
    elif band == "gamma":
        bd = (30, 64)
    return bandpower(eeg_data[people, channel], 128, bd)

## Основные функции

In [7]:
def read_preprocessed_data():
    directory = "C:/Users/alash/Desktop/4 курс/Diplom/program/datasets/DEAP/data_preprocessed_matlab/"
    data = []
    labels = []
    files = os.listdir(directory)
    for file in files:
        current_file = spio.loadmat(directory + file)

        keys = [key for key, values in current_file.items() if
                key != '__header__' and key != '__version__' and key != '__globals__']

        labels.append(current_file[keys[0]])
        data.append(current_file[keys[1]])

    labels = np.array(labels)
    data = np.array(data)

    print(files)
    print(labels.shape)
    print(data.shape)

    labels = labels.reshape(1280, 4)
    data = data.reshape(1280, 40, 8064)

    print(labels.shape)
    print(data.shape)

    return labels, data

In [14]:
def feature_extraction(labels, data):
    eeg_data = data[:, :32, :]
    labels = labels[:, :2]
    print(labels.shape)
    print(eeg_data.shape)

    eeg_band = []
    for i in range(len(eeg_data)):
        for j in range(len(eeg_data[0])):
            eeg_band.append(get_band_power(i, j, "delta", eeg_data))
            eeg_band.append(get_band_power(i, j, "theta", eeg_data))
            eeg_band.append(get_band_power(i, j, "alpha", eeg_data))
            eeg_band.append(get_band_power(i, j, "beta", eeg_data))
            eeg_band.append(get_band_power(i, j, "gamma", eeg_data))

    eeg_band = (np.array(eeg_band))
    eeg_band = eeg_band.reshape((1280, 160))
    print(eeg_band.shape)

    np.save("eeg_band.npy", eeg_band)

    return eeg_band

In [15]:
def svm_classifier(data, labels):
    """
    Функция для обучения классификатора SVM на наборе данных и выполнения классификации.

    Аргументы:
    - data: np.array, набор данных размером (1280, 160) с EEG измерениями.
    - labels: np.array, метки классов размером (1280, 3) в формате [arousal (float), valence (float), emotion (string)].

    Возвращает:
    - accuracy: float, точность классификации на тестовых данных.
    - report: str, отчет о классификации (precision, recall, f1-score) на тестовых данных.
    """

    # Извлечение отдельных столбцов меток классов
    arousal = labels[:, 0]
    valence = labels[:, 1]
    emotion = labels[:, 2]

    y = label_binarize(emotion, classes=[0, 1, 2, 3])
    n_classes = y.shape[1]

    # Разделение данных на тренировочную и тестовую выборки
    # data - матрица объектов признаков и emotion - вектор ответов
    X_train, X_test, y_train, y_test = train_test_split(data, emotion, test_size=0.3, random_state=42)

    # Создание и обучение классификатора SVM
    classifier = SVC(decision_function_shape='ovr')
    classifier.fit(X_train, y_train)
    # Прогнозирование меток классов на тестовых данных
    predicted = classifier.predict(X_test)
    # Оценка точности классификации
    accuracy = accuracy_score(predicted, y_test)
    # Генерация отчета о классификации
    report = classification_report(y_test, predicted, zero_division=1)
    print("Точность классификации 4х эмоций: " + str(accuracy))
    print(report)

    return accuracy

In [16]:
def random_forest_classifier(data, labels):
    """
    Функция для обучения классификатора Random Forest на наборе данных и выполнения классификации.

    Аргументы:
    - data: np.array, набор данных размером (1280, 160) с EEG измерениями.
    - labels: np.array, метки классов размером (1280, 3) в формате [arousal (float), valence (float), emotion (string)].

    Возвращает:
    - accuracy: float, точность классификации на тестовых данных.
    - report: str, отчет о классификации (precision, recall, f1-score) на тестовых данных.
    """

    # Извлечение отдельных столбцов меток классов
    arousal = labels[:, 0]
    valence = labels[:, 1]
    emotion = labels[:, 2]

    # Разделение данных на обучающую и тестовую выборки
    X_train, X_test, y_train, y_test = train_test_split(data, emotion, test_size=0.3, random_state=42)

    # Создание и обучение классификатора Random Forest
    classifier = RandomForestClassifier()
    classifier.fit(X_train, y_train)

    # Прогнозирование меток классов на тестовых данных
    y_pred = classifier.predict(X_test)

    # Оценка точности классификации
    accuracy = accuracy_score(y_test, y_pred)

    # Генерация отчета о классификации
    report = classification_report(y_test, y_pred, zero_division=1)

    print("Точность классификации 4х эмоций: " + str(accuracy))
    print(report)

    return accuracy

In [17]:
def classification_knn(data, labels):
    """
    Функция для обучения классификатора  на наборе данных и выполнения классификации.

    Аргументы:
    - data: np.array, набор данных размером (1280, 160) с EEG измерениями.
    - labels: np.array, метки классов размером (1280, 3) в формате [arousal (float), valence (float), emotion (string)].

    Возвращает:
    - accuracy: float, точность классификации на тестовых данных.
    - report: str, отчет о классификации (precision, recall, f1-score) на тестовых данных.
    """

    arousal = labels[:, 0]
    valence = labels[:, 1]
    emotion = labels[:, 2]

    # Разделение данных на обучающую и тестовую выборки
    X_train, X_test, y_train, y_test = train_test_split(data, emotion, test_size=0.3, random_state=42)

    # Создание и обучение классификатора KNN
    knn = KNeighborsClassifier(n_neighbors=7)
    knn.fit(X_train, y_train)

    y_pred = knn.predict(X_test)

    # Оценка точности классификации
    accuracy = accuracy_score(y_test, y_pred)

    # Генерация отчета о классификации
    report = classification_report(y_test, y_pred, zero_division=1)

    print("Точность классификации 4х эмоций: " + str(accuracy))
    print(report)

    return accuracy

In [18]:
# 1 Выделение первоначальных датафреймов для eeg данных и признаков
    # labels_for_feature, data_for_feature = read_preprocessed_data()
    # 2 Выделение признаков из eeg данных на основе диапазона мощности каждого из сигналов
    # eeg_band_data = feature_extraction(labels_for_feature, data_for_feature)
    # 3 Маркировка тренировочного набора arousal, valence соответствующими эмоциями
    # 1 - happy, 2 - angry, 3 - calm, 4 - sad.
    # labels_for_classification = create_labels(labels_for_feature)
    # 4 Классификация различными методам машинного обучения
    # данные берутся из заранее сохраненных результатов предыдущих методов для ускорения работ
    labels_for_classification = 'datasets2/eeg_labels.npy'
    eeg_band_data = 'datasets2/eeg_band.npy'

    # classification with random_forest classificator
    print("Классификации методом random_forest: \n")
    result_rf = random_forest_classifier(np.load(eeg_band_data, allow_pickle=True),
                                         np.load(labels_for_classification, allow_pickle=True))
    # classification with svm classificator
    print("Классификации методом опорных векторов: \n")
    result_svm = svm_classifier(np.load(eeg_band_data, allow_pickle=True),
                                np.load(labels_for_classification, allow_pickle=True))
    # classification with knn classificator
    print("Классификации методом ближайших соседей: \n")
    result_knn = classification_knn(np.load(eeg_band_data, allow_pickle=True),
                                    np.load(labels_for_classification, allow_pickle=True)

IndentationError: unexpected indent (276625320.py, line 10)