In [16]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from sklearn.linear_model import LogisticRegression
import optuna
from sklearn.metrics import classification_report, accuracy_score, f1_score
import warnings
warnings.filterwarnings('ignore')
from collections import Counter
from imblearn.over_sampling import SMOTE

In [17]:
# выбор модели
keras_optuna = True
logistic_regression_optuna = False

In [18]:
# Define the function to read OMG data from a CSV file
def read_omg_csv(path_palm_data: str, 
                 n_omg_channels: int, 
                 n_acc_channels: int = 0, 
                 n_gyr_channels: int = 0, 
                 n_mag_channels: int = 0, 
                 n_enc_channels: int = 0,
                 button_ch: bool = True, 
                 sync_ch: bool = True, 
                 timestamp_ch: bool = True) -> pd.DataFrame:
    
    df_raw = pd.read_csv(path_palm_data, sep=' ', 
                         header=None, 
                         skipfooter=1, 
                         skiprows=1, 
                         engine='python')
    columns = np.arange(n_omg_channels).astype('str').tolist()
    
    for label, label_count in zip(['ACC', 'GYR', 'MAG', 'ENC'], 
                                  [n_acc_channels, n_gyr_channels, n_mag_channels, n_enc_channels]):
        columns = columns + ['{}{}'.format(label, i) for i in range(label_count)]
        
    if button_ch:
        columns = columns + ['BUTTON']
        
    if sync_ch:
        columns = columns + ['SYNC']
        
    if timestamp_ch:
        columns = columns + ['ts']
        
    df_raw.columns = columns
    
    return df_raw

In [19]:
def prepare_training_data(path_palm_data, path_protocol_data, path_meta_data, 
                          n_omg_channels=50, n_acc_channels=3, n_gyr_channels=3, 
                          n_mag_channels=0, n_enc_channels=6, 
                          standardize=True, normalize=True,
                          DO_REPLACE_TO_MOVING_AVERAGE=True, 
                          DO_CALCULATE_DERIVATIVE=True,
                          DO_SHIFT_GESTURE=True,
                          selected_channels='ALL'):
    """
    Подготовка данных для обучения и тестирования из файлов данных palm, protocol и meta.
    
    Аргументы:
    path_palm_data (str): Путь к файлу данных palm.
    path_protocol_data (str): Путь к файлу данных protocol.
    path_meta_data (str): Путь к файлу данных meta.
    n_omg_channels, n_acc_channels, и т.д. (int): Количество каналов сенсоров.
    standardize (bool): Если True, стандартизирует признаки.
    normalize (bool): Если True, нормализует признаки.
    DO_REPLACE_TO_MOVING_AVERAGE (bool): Если True, применяет скользящее среднее к данным OMG.
    DO_CALCULATE_DERIVATIVE (bool): Если True, вычисляет производные данных OMG.
    DO_SHIFT_GESTURE (bool): Если True, смещает целевой признак на максимальный скачок в данных.
    selected_channels (str): Выбор каналов данных ('OMG', 'ACC_GYR', 'ALL').
    
    Возвращает:
    tuple: Кортеж, содержащий данные для обучения и тестирования.
    """
    # Чтение данных OMG
    omg_data = read_omg_csv(path_palm_data, n_omg_channels, n_acc_channels, n_gyr_channels, 
                            n_mag_channels, n_enc_channels)
    
    # Чтение данных протокола и кодирование жестов
    gestures_protocol = pd.read_csv(path_protocol_data)
    le = LabelEncoder()
    gestures_protocol['gesture'] = le.fit_transform(
        gestures_protocol[[
            "Thumb", "Index", "Middle", "Ring", "Pinky",
            'Thumb_stretch', 'Index_stretch', 'Middle_stretch', 'Ring_stretch', 'Pinky_stretch'
        ]].apply(lambda row: str(tuple(row)), axis=1)
    )
    
    # Чтение метаинформации
    df_meta = pd.read_csv(path_meta_data)
    palm_file = path_palm_data.split('/')[-1]
    last_train_idx = df_meta[df_meta['montage'] == palm_file].to_dict(orient='records')[0]['last_train_idx']
    
    # Синхронизация меток жестов с данными OMG, используя канал SYNC
    y_cmd = np.array([gestures_protocol['gesture'].loc[s] for s in omg_data['SYNC'].values])
    
    # Подготовка названий признаков для данных OMG
    OMG_CH = [str(i) for i in range(n_omg_channels)]
    ACC_CH = ['ACC0', 'ACC1', 'ACC2']
    GYR_CH = ['GYR0', 'GYR1', 'GYR2']
    ALL_CH = OMG_CH + ACC_CH + GYR_CH

    # Выбор каналов в соответствии с параметром selected_channels
    if selected_channels == 'OMG':
        selected_features = OMG_CH
    elif selected_channels == 'ACC_GYR':
        selected_features = ACC_CH + GYR_CH
    else:
        selected_features = ALL_CH
    
    if DO_REPLACE_TO_MOVING_AVERAGE:
        # Замена на скользящее среднее
        for col in selected_features:
            omg_data[col] = omg_data[col].rolling(window=5).mean().bfill()
    
    if DO_CALCULATE_DERIVATIVE:
        # Вычисление производных данных
        OMG_DERIV = [f'{col}_deriv' for col in OMG_CH]
        for col in OMG_CH:
            omg_data[f'{col}_next'] = omg_data[col].shift(-1).ffill()
            omg_data[f'{col}_deriv'] = omg_data[f'{col}_next'] - omg_data[col]
        selected_features += OMG_DERIV

    if DO_SHIFT_GESTURE:
        # Смещение целевого признака
        id_max = 0
        cur_gesture = 0
        for i in range(y_cmd.shape[0]):
            if i < id_max:  # Пропускаем все значения до id_max
                continue
            prev_gesture = cur_gesture  # предыдущий жест
            cur_gesture = y_cmd[i]  # текущий жест
            if cur_gesture != prev_gesture:  # Если сменился жест
                id_max = omg_data[OMG_DERIV][i:i+35].abs().sum(axis=1).idxmax()  # Нахождение максимального скачка
                y_cmd[i:id_max] = prev_gesture  # Замена всех значений до id_max на предыдущий жест
    
    # Разделение данных на обучающие и тестовые наборы
    X_train = omg_data[selected_features].iloc[:last_train_idx].values
    y_train = y_cmd[:last_train_idx]
    X_test = omg_data[selected_features].iloc[last_train_idx:].values
    y_test = y_cmd[last_train_idx:]
    
    # Стандартизация и нормализация
    if standardize:
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
        
    if normalize:
        scaler = MinMaxScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    
    return (X_train, y_train), (X_test, y_test)

path_palm_data = 'data/2023-05-31_17-14-41.palm'
path_protocol_data = 'data/2023-05-31_17-14-41.palm.protocol.csv'
path_meta_data = 'data/meta_information.csv'

(X_train, y_train), (X_test, y_test) = prepare_training_data(path_palm_data, path_protocol_data, path_meta_data, standardize=True, normalize=False)
X_train.shape, y_train.shape, X_test.shape, y_test.shape

((15679, 106), (15679,), (3889, 106), (3889,))

In [20]:
# Посчитаем, как распределены классы
class_counts_train = Counter(y_train)
class_counts_test = Counter(y_test)

class_counts_train, class_counts_test

(Counter({0: 10677, 5: 1052, 3: 1032, 4: 1020, 1: 952, 2: 946}),
 Counter({0: 2608, 2: 279, 5: 258, 3: 252, 1: 246, 4: 246}))

Как видно, классы не сбалансированы: класс 0 значительно превосходит по количеству остальные классы в обоих наборах данных. 

In [21]:
if keras_optuna:
    def build_and_train_model(X_train, y_train, X_test, y_test, trial):
        # Параметры для SMOTE
        k_neighbors = trial.suggest_int('k_neighbors', 2, 10)
        sampling_strategy = trial.suggest_categorical('sampling_strategy', ['auto', 'minority', 'not majority', 'all'])
    
        # Применяем SMOTE
        smote = SMOTE(k_neighbors=k_neighbors, sampling_strategy=sampling_strategy, n_jobs=-1)
        X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
    
        # Параметры модели
        learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-1)
        dropout_rate = trial.suggest_uniform('dropout_rate', 0.0, 0.7)
        batch_size = trial.suggest_categorical('batch_size', [32, 64, 128, 256])
        epochs = 100

        num_classes = len(np.unique(y_resampled))
    
    # Создание модели
        model = Sequential([
            Dense(128, activation='relu', input_shape=(X_resampled.shape[1],)),
            BatchNormalization(),
            Dropout(dropout_rate),
            Dense(128, activation='relu'),
            BatchNormalization(),
            Dropout(dropout_rate),
            Dense(num_classes, activation='softmax')
        ])
    
        # Компиляция модели
        model.compile(optimizer=Adam(learning_rate=learning_rate), loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    
        # Обучение модели
        history = model.fit(X_resampled, y_resampled, epochs=epochs, batch_size=batch_size, validation_split=0.2, verbose=0)
    
        # Оценка модели
        test_loss, test_accuracy = model.evaluate(X_test, y_test, verbose=0)
        return test_accuracy

    def objective(trial):
        return build_and_train_model(X_train, y_train, X_test, y_test, trial)

    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=50, n_jobs=-1)

    print("Лучшие параметры:")
    print(study.best_trial)

[I 2024-05-08 10:27:27,670] A new study created in memory with name: no-name-89c57789-12d9-4b2f-b2e0-f04461e8aaaf
[I 2024-05-08 10:28:59,166] Trial 15 finished with value: 0.9485728740692139 and parameters: {'k_neighbors': 10, 'sampling_strategy': 'minority', 'learning_rate': 0.0005568173112544624, 'dropout_rate': 0.19979605768010528, 'batch_size': 256}. Best is trial 15 with value: 0.9485728740692139.
[I 2024-05-08 10:29:52,975] Trial 2 finished with value: 0.9547441601753235 and parameters: {'k_neighbors': 2, 'sampling_strategy': 'minority', 'learning_rate': 0.06333489009732164, 'dropout_rate': 0.5921049173591335, 'batch_size': 128}. Best is trial 2 with value: 0.9547441601753235.
[I 2024-05-08 10:29:53,770] Trial 13 finished with value: 0.9454872608184814 and parameters: {'k_neighbors': 6, 'sampling_strategy': 'minority', 'learning_rate': 0.0066654008196855965, 'dropout_rate': 0.48003358676714386, 'batch_size': 128}. Best is trial 2 with value: 0.9547441601753235.
[I 2024-05-08 10:3

Лучшие параметры:
FrozenTrial(number=36, state=1, values=[0.9570583701133728], datetime_start=datetime.datetime(2024, 5, 8, 10, 40, 29, 64403), datetime_complete=datetime.datetime(2024, 5, 8, 11, 3, 1, 886744), params={'k_neighbors': 2, 'sampling_strategy': 'all', 'learning_rate': 0.00010322646860398609, 'dropout_rate': 0.2609382632562011, 'batch_size': 32}, user_attrs={}, system_attrs={}, intermediate_values={}, distributions={'k_neighbors': IntDistribution(high=10, log=False, low=2, step=1), 'sampling_strategy': CategoricalDistribution(choices=('auto', 'minority', 'not majority', 'all')), 'learning_rate': FloatDistribution(high=0.1, log=True, low=1e-05, step=None), 'dropout_rate': FloatDistribution(high=0.7, log=False, low=0.0, step=None), 'batch_size': CategoricalDistribution(choices=(32, 64, 128, 256))}, trial_id=36, value=None)


In [22]:
if keras_optuna:
    best_params = study.best_trial.params
    print("Лучшие параметры:", best_params)

    smote = SMOTE(k_neighbors=best_params['k_neighbors'], sampling_strategy=best_params['sampling_strategy'])
    X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)

    def build_final_model(X_train, y_train):
        model = Sequential([
            Dense(128, activation='relu', input_shape=(X_train.shape[1],)),
            BatchNormalization(),
            Dropout(best_params['dropout_rate']),
            Dense(128, activation='relu'),
            BatchNormalization(),
            Dropout(best_params['dropout_rate']),
            Dense(len(np.unique(y_train)), activation='softmax')
        ])

        model.compile(optimizer=Adam(learning_rate=best_params['learning_rate']),
                    loss='sparse_categorical_crossentropy',
                    metrics=['accuracy'])

        model.fit(X_train, y_train, epochs=100, batch_size=best_params['batch_size'], validation_split=0.2, verbose=1)
        return model

    final_model = build_final_model(X_train_smote, y_train_smote)

    # Предсказания
    y_pred = final_model.predict(X_test)
    y_pred_classes = np.argmax(y_pred, axis=1)

    # Отчёт о классификации
    report = classification_report(y_test, y_pred_classes)
    print(report)

Лучшие параметры: {'k_neighbors': 2, 'sampling_strategy': 'all', 'learning_rate': 0.00010322646860398609, 'dropout_rate': 0.2609382632562011, 'batch_size': 32}
Epoch 1/100
[1m1602/1602[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 960us/step - accuracy: 0.4399 - loss: 1.6274 - val_accuracy: 0.9336 - val_loss: 0.2354
Epoch 2/100
[1m1602/1602[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 879us/step - accuracy: 0.8130 - loss: 0.5166 - val_accuracy: 0.9758 - val_loss: 0.0917
Epoch 3/100
[1m1602/1602[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 941us/step - accuracy: 0.8970 - loss: 0.2978 - val_accuracy: 0.9769 - val_loss: 0.0679
Epoch 4/100
[1m1602/1602[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 902us/step - accuracy: 0.9348 - loss: 0.1993 - val_accuracy: 0.9858 - val_loss: 0.0405
Epoch 5/100
[1m1602/1602[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 1ms/step - accuracy: 0.9521 - loss: 0.1435 - val_accuracy: 0.9899 - val_loss: 0.0278
Epoch 6

In [23]:
if logistic_regression_optuna:
    def objective(trial):
    
        # Параметры для SMOTE
        smote_k_neighbors = trial.suggest_int('smote_k_neighbors', 2, 15)
        smote_sampling_strategy = trial.suggest_categorical('smote_sampling_strategy', ['auto', 'minority', 'not majority', 'all'])

        # Параметры для логистической регрессии
        C = trial.suggest_loguniform('C', 1e-5, 10)
        max_iter = trial.suggest_int('max_iter', 1000, 10000)
        penalty = trial.suggest_categorical('penalty', ['l1', 'l2', 'elasticnet'])
        
        # Установка совместимого решателя в зависимости от выбранной регуляризации
        if penalty == 'l1':
            solver = 'liblinear' # liblinear поддерживает только l1 и l2
        elif penalty == 'l2':
            solver = trial.suggest_categorical('solver', ['newton-cg', 'lbfgs', 'liblinear', 'sag', 'saga'])
        elif penalty == 'elasticnet':
            solver = 'saga'  # saga - единственный, который поддерживает elasticnet

        # Применение SMOTE
        smote = SMOTE(k_neighbors=smote_k_neighbors, sampling_strategy=smote_sampling_strategy)
        X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

        # Обучение модели логистической регрессии
        model = LogisticRegression(C=C, max_iter=max_iter, penalty=penalty, solver=solver, l1_ratio=0.5 if penalty == 'elasticnet' else None)
        model.fit(X_train_resampled, y_train_resampled)

        # Оценка модели
        score = f1_score(
            y_test, 
            model.predict(X_test), 
            average = 'micro'
        )
        return score

    # Создание исследования
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=100, n_jobs=-1)

    print("Лучшие параметры:", study.best_trial.params)

In [24]:
if logistic_regression_optuna:
    # Извлечение лучших параметров
    best_params = study.best_trial.params
    print("Лучшие параметры:", best_params)

    # Применение SMOTE с лучшими параметрами
    smote = SMOTE(k_neighbors=best_params['smote_k_neighbors'], sampling_strategy=best_params['smote_sampling_strategy'])
    X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

    # Выбор решателя в зависимости от типа регуляризации
    if best_params['penalty'] == 'elasticnet':
        solver = 'saga'  # saga - единственный решатель, поддерживающий elasticnet
    elif best_params['penalty'] == 'l1':
        solver = 'liblinear'  # liblinear - оптимальный выбор для l1 регуляризации
    elif best_params['penalty'] == 'l2':
        solver = best_params['solver'] 
    else:  # 'none'
        solver = 'lbfgs'  # lbfgs хорошо подходит для отсутствия регуляризации

    # Построение и обучение модели логистической регрессии
    model = LogisticRegression(
        C=best_params['C'],
        max_iter=best_params['max_iter'],
        penalty=best_params['penalty'],
        solver=solver,
        l1_ratio=0.5 if best_params['penalty'] == 'elasticnet' else None,
        multi_class='auto',
        class_weight={0: 1, 1: 1, 2: 1, 3: 3, 4: 1, 5: 1}
    )

    model.fit(X_train_resampled, y_train_resampled)

    #Делаем предсказание класса
    y_pred = model.predict(X_test)

    print(f'Метрики на валидационной выборке \n\
    {classification_report(y_test, y_pred)}')