# Построим модель из признаков модели VAR

### Для начала разобьем DEAP на трейн и тест:

In [1]:
import os
import numpy as np
import pandas as pd
import pickle
import random
from typing import List
import matplotlib.pyplot as plt

In [2]:
# путь к DEAP
prepocessed_DEAP_path = '' # путь к предобработанному DEAP
deap_lstdir = os.listdir(prepocessed_DEAP_path)

In [3]:
# создаем общий датасет, который после будем разделять

# проходим по всем испытуемым
subject_dicts = []
for subject_path in deap_lstdir:
    path = os.path.join(prepocessed_DEAP_path, subject_path)
    with open(path, 'rb') as file:
        subject = pickle.load(file, encoding='latin1')
        subject_dicts.append(subject)

In [5]:
def quadrant_fill(val_arous_tuple: tuple):
    valence, arousal = val_arous_tuple
    if valence == 'high' and arousal == 'high':
        return 'HAHV'
    elif valence == 'low' and arousal == 'high':
        return 'HALV'
    elif valence == 'low' and arousal == 'low':
        return 'LALV'
    elif valence == 'high' and arousal == 'low':
        return 'LAHV'

def build_deap_dataframe(subject_dicts) -> pd.DataFrame:
    """
    subject_dicts: список словарей (по одному на каждого испытуемого),
                   каждый словарь должен иметь:
                       'data'   -> shape (40, 40, 8064)
                       'labels' -> shape (40, 4)  (valence, arousal, dominance, liking)

    Возвращает: pd.DataFrame со столбцами:
      [subject_id, video_id, data, valence, arousal, dominance, liking]
    """

    all_rows = []

    for subject_id, subject_dict in enumerate(subject_dicts, start=1):
        data_3d = subject_dict['data']    # shape = (40, 40, 8064)
        labels_2d = subject_dict['labels']  # shape = (40, 4)

        n_trials, n_channels, n_points = data_3d.shape
        n_channels = 32 # берем только ЭЭГ каналы
        fs = 128
        seg_length = 10 * fs  # 10 секунд * 128 Гц = 1280

        for trial_id in range(n_trials):
            # Извлекаем (40, 8064)
            trial_signal = data_3d[trial_id, :n_channels, :]
            
            # Извлекаем метки (valence, arousal, dominance, liking) для данного триала
            valence, arousal, dominance, liking = labels_2d[trial_id]

            # Формируем строку будущего DataFrame
            row_dict = {
                'subject_id': subject_id,
                'video_id': trial_id + 1,
                'data': trial_signal,  
                'valence': valence,
                'arousal': arousal,
                'dominance': dominance,
                'liking': liking
            }
            all_rows.append(row_dict)

    # Создаём DataFrame 
    df = pd.DataFrame(all_rows)
    # сделаем задачу классификации
    df['Low/high valence'] = df['valence'].apply(lambda x: 'low' if x <= 5 else 'high')
    df['Low/high arousal'] = df['arousal'].apply(lambda x: 'low' if x <= 5 else 'high')
    df['quadrant'] = df.apply(lambda x: quadrant_fill((x['Low/high valence'], x['Low/high arousal'])), axis=1)
    return df

def deap_train_test_split(deap_df: pd.DataFrame) -> List[pd.DataFrame]:
    random.seed(42)
    subject_idx = random.sample(range(1, 33),6)
    video_idx = random.sample(range(1, 41),8)

    train_df =  deap_df[(~deap_df['video_id'].isin(video_idx)) & (~deap_df['subject_id'].isin(subject_idx))]
    test_df = deap_df[(deap_df['video_id'].isin(video_idx)) & (deap_df['subject_id'].isin(subject_idx))]
    
    test_new_subj_old_vid = deap_df[(~deap_df['video_id'].isin(video_idx) & (deap_df['subject_id'].isin(subject_idx)))]
    test_new_vid_old_subj = deap_df[(~deap_df['subject_id'].isin(subject_idx) & (deap_df['video_id'].isin(video_idx)))]

    return train_df, test_df, test_new_subj_old_vid, test_new_vid_old_subj # len: 832, 48, 1024, 1040


In [6]:
df = build_deap_dataframe(subject_dicts)
df.head()

Unnamed: 0,subject_id,video_id,data,valence,arousal,dominance,liking,Low/high valence,Low/high arousal,quadrant
0,1,1,"[[0.948231680995192, 1.65333532651348, 3.01372...",7.71,7.6,6.9,7.83,high,high,HAHV
1,1,2,"[[10.260175049914748, 12.795442725569648, 10.4...",8.1,7.31,7.28,8.47,high,high,HAHV
2,1,3,"[[1.0130495576625123, -1.0678322951836536, 3.9...",8.58,7.54,9.0,7.08,high,high,HAHV
3,1,4,"[[-7.658428424515396, -3.2675578443273143, 0.7...",4.94,6.01,6.12,8.06,low,high,HALV
4,1,5,"[[-1.8111079228929805, -4.7838764286765, -0.52...",6.96,3.92,7.19,6.05,high,low,LAHV


In [15]:
train_df, test_df, test_new_subj_old_vid, test_new_vid_old_subj = deap_train_test_split(df)

### Обучим модель VAR

In [16]:
from statsmodels.tsa.api import VAR

In [17]:
def get_var_features(array, best_p=5, last_sec = 10, fs=128):
    # Для VAR statsmodels нужно (time, channels) => (1280, 40)
    seg_length = last_sec * fs
    last_segment = array[:, -seg_length:]
    
    data_var = last_segment.T  # (1280, 40)

    # Обучаем VAR на выбранном p
    model = VAR(data_var)
    results = model.fit(best_p)

    # Извлекаем коэффициенты (shape = (p, k, k)) и интерсепт (shape = (k,))
    coefs = results.coefs      # матрицы A_i
    intercept = results.intercept  # вектор c
    # Превратим всё в один вектор
    # (p*k*k) + (k) признаков
    var_features = np.concatenate([coefs.flatten(), intercept.flatten()])

    return var_features

In [18]:
train_df['var_features'] = train_df['data'].apply(lambda x: get_var_features(x))
train_df

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  train_df['var_features'] = train_df['data'].apply(lambda x: get_var_features(x))


Unnamed: 0,subject_id,video_id,data,valence,arousal,dominance,liking,Low/high valence,Low/high arousal,quadrant,var_features
40,2,1,"[[-11.11631040230457, 71.81244100609055, 122.8...",9.00,5.03,7.13,6.62,high,high,HAHV,"[-26270538892.56006, -26270538892.99161, -2627..."
43,2,4,"[[-171.79711783177316, -117.17857114425763, -4...",6.05,1.00,5.04,7.03,high,low,LAHV,"[-24367199887.515038, -24367199889.032898, -24..."
44,2,5,"[[1.4713609744647984, -38.936057294245664, -66...",5.04,3.00,3.65,5.04,high,low,LAHV,"[10418081987.315346, 10418081986.42015, 104180..."
47,2,8,"[[15.944453976224564, 11.812813111443923, 7.97...",9.00,9.00,9.00,9.00,high,high,HAHV,"[1.7698641691775852, -0.1805263829047668, -0.0..."
49,2,10,"[[178.77559036713717, 144.71389583087222, 97.0...",4.99,1.00,9.00,1.00,low,low,LALV,"[14366347983.066248, 14366347983.031427, 14366..."
...,...,...,...,...,...,...,...,...,...,...,...
1233,31,34,"[[20.488056516461008, 27.592511136336057, 25.5...",1.95,8.03,2.08,1.00,low,high,HALV,"[54136014418.20242, 54136014417.449615, 541360..."
1235,31,36,"[[0.5274022253354644, -2.4252270719619826, 5.0...",4.97,6.95,1.96,1.96,low,high,HALV,"[0.9553039723074183, 0.041257079745979613, 0.1..."
1237,31,38,"[[-10.197774986718422, -4.8967558966950655, 4....",1.00,9.00,1.00,1.00,low,high,HALV,"[0.9520495133535651, 0.0743042161171885, 0.668..."
1238,31,39,"[[-14.439967892369696, -5.040654886024263, 4.0...",1.00,9.00,1.00,1.01,low,high,HALV,"[16144239026.216217, 16144239025.354963, 16144..."


In [19]:
# test
test_df['var_features'] = test_df['data'].apply(lambda x: get_var_features(x))
test_new_subj_old_vid['var_features'] = test_new_subj_old_vid['data'].apply(lambda x: get_var_features(x))
test_new_vid_old_subj['var_features'] = test_new_vid_old_subj['data'].apply(lambda x: get_var_features(x))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_df['var_features'] = test_df['data'].apply(lambda x: get_var_features(x))
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_new_subj_old_vid['var_features'] = test_new_subj_old_vid['data'].apply(lambda x: get_var_features(x))
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  test_new_vid_old_su

### Проведем бутстрап проверку моделей


In [23]:
from typing import Dict

In [20]:
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder

scaler = StandardScaler()
le = LabelEncoder()

# Обучаем scaler на train (все данные объединены в один массив)
X_train = np.stack(train_df['var_features'].values, axis=0)
y_train = train_df['quadrant']
scaler.fit(X_train)
le.fit(y_train)

# Функция для нормализации фичей в DataFrame
def normalize_features(df, scaler, le):
    X = np.stack(df['var_features'].values, axis=0)
    X_scaled = scaler.transform(X)  # Применяем трансформацию
    y_digit = le.transform(df['quadrant'])
    df['var_features'] = list(X_scaled)  # Записываем обратно в DataFrame
    df['quadrant'] = y_digit
    return df

# Применяем scaler к каждому DataFrame
train_df_epochs = normalize_features(train_df, scaler, le)
test_df_epochs = normalize_features(test_df, scaler, le)
test_new_subj_old_vid_epochs = normalize_features(test_new_subj_old_vid, scaler, le)
test_new_vid_old_subj_epochs = normalize_features(test_new_vid_old_subj, scaler, le)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['var_features'] = list(X_scaled)  # Записываем обратно в DataFrame
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['quadrant'] = y_digit
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df['var_features'] = list(X_scaled)  # Записываем обратно в DataFrame
A value is trying to be set on a copy of

In [24]:
def bootstrap_metric(y_true, y_pred, metric_fn, samples_cnt=1000, random_state=42):
    np.random.seed(random_state)
    y_true = np.array(y_true)  # Ensure NumPy array
    y_pred = np.array(y_pred)  # Ensure NumPy array
    b_metric = np.zeros(samples_cnt)
    for i in range(samples_cnt):
        poses = np.random.choice(y_true.shape[0], size=y_true.shape[0], replace=True)

        y_true_boot = y_true[poses]
        y_pred_boot = y_pred[poses]
        m_val = metric_fn(y_true_boot, y_pred_boot)
        b_metric[i] = m_val

    return b_metric

In [25]:
def train_and_test_classifier(models: Dict, train_df_epochs, test_df_epochs,
                              test_new_subj_old_vid_epochs, test_new_vid_old_subj_epochs, verb=True) -> pd.DataFrame:
    """
    Функция берет models: dict, который содержит инициализированные модели, обучает их, 
    а после тестирует с помощью balanced_accuracy. Результат обучения заносится в pd.DataFrame.
    """
    # train
    x_train = np.stack(train_df_epochs['var_features'].values, axis=0)
    y_train = train_df_epochs['quadrant']

    # test
    x_test = np.stack(test_df_epochs['var_features'].values, axis=0)
    y_test = test_df_epochs['quadrant']

    # corner test
    x_test_new_subj_old_vid = np.stack(test_new_subj_old_vid_epochs['var_features'].values, axis=0)
    y_test_new_subj_old_vid = test_new_subj_old_vid_epochs['quadrant']

    x_test_new_vid_old_subj = np.stack(test_new_vid_old_subj_epochs['var_features'].values, axis=0)
    y_test_new_vid_old_subj = test_new_vid_old_subj_epochs['quadrant']

    results_list = []  # Список для хранения всех результатов

    for name, model in models.items():
        model.fit(x_train, y_train)

        # Обычное тестирование
        y_pred = model.predict(x_test)
        test_scores = bootstrap_metric(y_test, y_pred, metric_fn=balanced_accuracy_score)
        test_mean, test_ci_lower, test_ci_upper = np.mean(test_scores), np.percentile(test_scores, 2.5), np.percentile(test_scores, 97.5)

        # "Новые субъекты, старые видео"
        y_pred_new_subj_old_vid = model.predict(x_test_new_subj_old_vid)
        test_new_subj_old_vid_scores = bootstrap_metric(y_test_new_subj_old_vid, y_pred_new_subj_old_vid, 
                                                         metric_fn=balanced_accuracy_score)
        test_new_subj_old_vid_mean, test_new_subj_old_vid_ci_lower, test_new_subj_old_vid_ci_upper = (
            np.mean(test_new_subj_old_vid_scores),
            np.percentile(test_new_subj_old_vid_scores, 2.5),
            np.percentile(test_new_subj_old_vid_scores, 97.5),
        )

        # "Новые видео, старые субъекты"
        y_pred_new_vid_old_subj = model.predict(x_test_new_vid_old_subj)
        test_new_vid_old_subj_scores = bootstrap_metric(y_test_new_vid_old_subj, y_pred_new_vid_old_subj, 
                                                         metric_fn=balanced_accuracy_score)
        test_new_vid_old_subj_mean, test_new_vid_old_subj_ci_lower, test_new_vid_old_subj_ci_upper = (
            np.mean(test_new_vid_old_subj_scores),
            np.percentile(test_new_vid_old_subj_scores, 2.5),
            np.percentile(test_new_vid_old_subj_scores, 97.5),
        )

        results_list.append({
            "model": name,
            "test_mean": test_mean,
            "test_ci_lower": test_ci_lower,
            "test_ci_upper": test_ci_upper,
            "test_new_subj_old_vid_mean": test_new_subj_old_vid_mean,
            "test_new_subj_old_vid_ci_lower": test_new_subj_old_vid_ci_lower,
            "test_new_subj_old_vid_ci_upper": test_new_subj_old_vid_ci_upper,
            "test_new_vid_old_subj_mean": test_new_vid_old_subj_mean,
            "test_new_vid_old_subj_ci_lower": test_new_vid_old_subj_ci_lower,
            "test_new_vid_old_subj_ci_upper": test_new_vid_old_subj_ci_upper
        })

        if verb:
            print(f"Fitted {name} with mean balanced accuracy: {test_mean:.3f}")

    return pd.DataFrame(results_list)

In [28]:
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from xgboost import XGBClassifier
from sklearn.metrics import balanced_accuracy_score, accuracy_score

np.random.seed(42)

# Создаем словарь моделей классификации
models = {
    "Logistic Regression": LogisticRegression(random_state=42),
    "Random Forest": RandomForestClassifier(random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(random_state=42),
    "Support Vector Machine": SVC(),
    "K-Nearest Neighbors": KNeighborsClassifier(),
    "Naive Bayes": GaussianNB(),
    "Decision Tree": DecisionTreeClassifier(random_state=42),
    "XGBoost": XGBClassifier(random_state=42,
                            learning_rate=0.1,
                            n_jobs=-1)
}

In [29]:
# результаты для var_features нормализованные без бейзлайна
df_result = train_and_test_classifier(models, train_df_epochs, test_df_epochs,
                              test_new_subj_old_vid_epochs, test_new_vid_old_subj_epochs)
df_result

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


Fitted Logistic Regression with mean balanced accuracy: 0.186




Fitted Random Forest with mean balanced accuracy: 0.284




Fitted Gradient Boosting with mean balanced accuracy: 0.203
Fitted Support Vector Machine with mean balanced accuracy: 0.238




Fitted K-Nearest Neighbors with mean balanced accuracy: 0.247




Fitted Naive Bayes with mean balanced accuracy: 0.337




Fitted Decision Tree with mean balanced accuracy: 0.317




Fitted XGBoost with mean balanced accuracy: 0.270


Unnamed: 0,model,test_mean,test_ci_lower,test_ci_upper,test_new_subj_old_vid_mean,test_new_subj_old_vid_ci_lower,test_new_subj_old_vid_ci_upper,test_new_vid_old_subj_mean,test_new_vid_old_subj_ci_lower,test_new_vid_old_subj_ci_upper
0,Logistic Regression,0.185562,0.095956,0.284376,0.288696,0.217867,0.356978,0.25431,0.196226,0.313804
1,Random Forest,0.283524,0.163038,0.41276,0.234588,0.184839,0.288498,0.239601,0.186381,0.296367
2,Gradient Boosting,0.202563,0.120802,0.292871,0.213975,0.163287,0.268828,0.240784,0.183451,0.301975
3,Support Vector Machine,0.237538,0.208333,0.25,0.253145,0.238462,0.271752,0.260072,0.24081,0.283584
4,K-Nearest Neighbors,0.247343,0.131129,0.369048,0.220666,0.170254,0.27918,0.240318,0.181554,0.302864
5,Naive Bayes,0.336594,0.204746,0.485814,0.241763,0.195005,0.289365,0.250749,0.190211,0.309006
6,Decision Tree,0.316755,0.175591,0.475069,0.280121,0.221747,0.346798,0.254533,0.192159,0.319317
7,XGBoost,0.270177,0.144214,0.412932,0.24744,0.194054,0.304745,0.267395,0.204169,0.334573
