In [None]:
import os

import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from tensorflow.keras.layers import LSTM, Dense, LayerNormalization, Dropout, Input, InputLayer
from tensorflow.keras.models import Sequential, load_model, Model

from motorica.utils import *

In [None]:
def load_montage_data(montage_name, subdir='marked/'):
    """Read the train, test and fully marked CSV files of one montage.

    The three files are expected at ``<subdir>/<montage_name>.train``,
    ``<subdir>/<montage_name>.test`` and ``<subdir>/<montage_name>.marked``.

    Args:
        montage_name: Base file name of the montage (without extension).
        subdir: Directory that holds the files. A trailing path separator
            is optional.

    Returns:
        Tuple ``(data_train, data_test, data_full)`` of DataFrames. The
        train and test frames use the first CSV column as index; the full
        frame keeps a default integer index.
    """
    # os.path.join keeps 'marked/' working exactly as before and also
    # accepts 'marked' (no trailing separator), which plain string
    # concatenation silently turned into a wrong path.
    base_path = os.path.join(subdir, montage_name)
    data_train = pd.read_csv(base_path + ".train", index_col=0)
    data_test = pd.read_csv(base_path + ".test", index_col=0)
    data_full = pd.read_csv(base_path + ".marked", index_col=None)
    return data_train, data_test, data_full


In [None]:
def preprocess_data(montage_name, features, subdir='marked/'):
    """Load one montage and min-max scale its feature columns.

    The scaler is fitted on the training split only and then applied to the
    test split, so no test statistics leak into training.

    Args:
        montage_name: Base file name of the montage, passed to
            ``load_montage_data``.
        features: Column names to use as model inputs.
        subdir: Directory that holds the montage files.

    Returns:
        Tuple ``(X_train_scaled, X_test_scaled, y_train, y_test)``. The
        scaled frames keep the row index of the source data, so they stay
        aligned with the label Series taken from the ``'act_label_ext'``
        column.

    Raises:
        KeyError: If a requested feature is missing, or if the label column
            itself is listed in ``features``.
    """
    label_column = 'act_label_ext'
    data_train, data_test, _ = load_montage_data(montage_name, subdir)

    # Dropping the label before selecting makes a label accidentally listed
    # in `features` fail loudly instead of leaking into the inputs.
    X_train = data_train.drop(label_column, axis=1)[features]
    y_train = data_train[label_column]
    X_test = data_test.drop(label_column, axis=1)[features]
    y_test = data_test[label_column]

    scaler = MinMaxScaler()
    # The scaler returns bare arrays; re-attach both the column names and the
    # original index. Without the index the frames would get a fresh
    # RangeIndex and no longer line up with y_train / y_test.
    X_train_scaled = pd.DataFrame(
        scaler.fit_transform(X_train),
        columns=X_train.columns,
        index=X_train.index
    )
    X_test_scaled = pd.DataFrame(
        scaler.transform(X_test),
        columns=X_test.columns,
        index=X_test.index
    )

    return X_train_scaled, X_test_scaled, y_train, y_test


In [None]:
def create_sequences(data, labels, timesteps=2):
    """Cut a series into overlapping sliding windows for a sequence model.

    Window ``i`` holds rows ``i .. i + timesteps - 1`` of ``data`` and is
    labelled with the label of its last row.

    Args:
        data: Array-like whose first axis is time (for example a 2-D array
            of samples by features, or a DataFrame).
        labels: Array-like with one label per row of ``data``.
        timesteps: Window length; must be at least 1.

    Returns:
        Tuple ``(X, y)`` of numpy arrays. ``X`` has shape
        ``(n_windows, timesteps, *data.shape[1:])`` and ``y`` has
        ``n_windows`` entries, where ``n_windows = len(data) - timesteps + 1``.
        When ``data`` is shorter than ``timesteps`` both arrays are empty but
        still carry the trailing dimensions.

    Raises:
        ValueError: If ``timesteps`` is smaller than 1.
    """
    if timesteps < 1:
        raise ValueError(f"timesteps must be >= 1, got {timesteps}")

    # Work on plain arrays so indexing is always positional, even when a
    # pandas object with a non-default index is passed in.
    data = np.asarray(data)
    labels = np.asarray(labels)

    n_windows = len(data) - timesteps + 1
    if n_windows <= 0:
        # Keep the rank the caller expects instead of returning shape (0,).
        empty_X = np.empty((0, timesteps) + data.shape[1:], dtype=data.dtype)
        empty_y = np.empty((0,) + labels.shape[1:], dtype=labels.dtype)
        return empty_X, empty_y

    X = np.stack([data[start:start + timesteps] for start in range(n_windows)])
    y = np.array([labels[start + timesteps - 1] for start in range(n_windows)])
    return X, y


In [None]:
def prepare_sequences(X_train, X_test, y_train, y_test, timesteps=2):
    """Window the train/test data and one-hot encode the window labels.

    Args:
        X_train: Training features; its ``.values`` array is windowed.
        X_test: Test features; its ``.values`` array is windowed.
        y_train: Training labels, one per row of ``X_train``.
        y_test: Test labels, one per row of ``X_test``.
        timesteps: Window length forwarded to ``create_sequences``.

    Returns:
        Tuple ``(X_train_seq, X_test_seq, y_train_encoded, y_test_encoded,
        encoder)``. The encoder is fitted on the training window labels only
        and then reused to transform the test window labels.
    """
    train_windows, train_labels = create_sequences(
        X_train.values, y_train.values, timesteps
    )
    test_windows, test_labels = create_sequences(
        X_test.values, y_test.values, timesteps
    )

    # The encoder expects a 2-D column, hence the reshape to (n, 1).
    train_label_column = train_labels.reshape(-1, 1)
    test_label_column = test_labels.reshape(-1, 1)

    encoder = OneHotEncoder(sparse_output=False)
    train_onehot = encoder.fit_transform(train_label_column)
    test_onehot = encoder.transform(test_label_column)

    return train_windows, test_windows, train_onehot, test_onehot, encoder


In [None]:
from sklearn.model_selection import KFold

def _build_lstm_classifier(timesteps, input_shape, output_shape):
    """Create and compile a fresh, untrained stacked-LSTM softmax classifier."""
    model = Sequential([
        # An explicit Input layer replaces the `input_shape=` layer kwarg.
        Input(shape=(timesteps, input_shape)),
        LSTM(64, return_sequences=True),
        LayerNormalization(),
        Dropout(0.2),
        LSTM(64, return_sequences=True),
        LayerNormalization(),
        Dropout(0.2),
        LSTM(64, return_sequences=False),
        LayerNormalization(),
        Dense(32, activation='relu'),
        Dense(output_shape, activation='softmax')
    ])
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model


def build_and_train_model(X_train_seq, y_train_encoded, X_test_seq, y_test_encoded, timesteps, input_shape, output_shape):
    """Cross-validate the LSTM classifier, then train a final model.

    A fresh model is trained and evaluated on each of 5 shuffled folds and
    the mean validation accuracy is printed. Afterwards a new model is
    trained from scratch on the full training set; the test set is passed
    as ``validation_data`` so its metrics are reported during training.

    Args:
        X_train_seq: Training windows, indexed along the first axis.
        y_train_encoded: One-hot training labels, one row per window.
        X_test_seq: Test windows used as validation data for the final fit.
        y_test_encoded: One-hot test labels.
        timesteps: Window length (second dimension of the model input).
        input_shape: Number of features per timestep.
        output_shape: Number of classes (width of the softmax layer).

    Returns:
        The model trained on the full training set.
    """
    # NOTE(review): the folds are shuffled over overlapping windows, so
    # neighbouring windows that share rows can land in different folds and
    # make the validation accuracy optimistic.
    kf = KFold(n_splits=5, shuffle=True, random_state=42)
    accuracies = []

    for train_index, val_index in kf.split(X_train_seq):
        X_val_fold = X_train_seq[val_index]
        y_val_fold = y_train_encoded[val_index]

        fold_model = _build_lstm_classifier(timesteps, input_shape, output_shape)
        fold_model.fit(
            X_train_seq[train_index], y_train_encoded[train_index],
            validation_data=(X_val_fold, y_val_fold),
            epochs=20, batch_size=32, verbose=1
        )

        _, val_accuracy = fold_model.evaluate(X_val_fold, y_val_fold, verbose=0)
        accuracies.append(val_accuracy)

    print(f"Average validation accuracy: {np.mean(accuracies)}")

    # Train the final model on all training data. It is built from scratch:
    # continuing to fit the last fold's model would return a network that
    # was already trained for 20 epochs on an arbitrary 4/5 of the data.
    model = _build_lstm_classifier(timesteps, input_shape, output_shape)
    model.fit(
        X_train_seq, y_train_encoded,
        validation_data=(X_test_seq, y_test_encoded),
        epochs=20, batch_size=32, verbose=1
    )

    return model


In [None]:
def predict(model, X_test_seq, encoder, y_test_encoded=None):
    """Run the model on windows and, when labels are given, score it.

    Args:
        model: Object with a ``predict`` method returning one row of class
            scores per window.
        X_test_seq: Windows to classify.
        encoder: Fitted one-hot label encoder. Its ``categories_[0]`` is
            used to map predicted class indices back to labels when no
            ground truth is supplied.
        y_test_encoded: Optional one-hot ground-truth labels, one row per
            window in ``X_test_seq``.

    Returns:
        The accuracy score when ``y_test_encoded`` is given (a
        classification report is printed as well). Otherwise an array with
        the predicted label of every window.
    """
    y_pred_encoded = model.predict(X_test_seq)
    y_pred = np.argmax(y_pred_encoded, axis=1)

    if y_test_encoded is None:
        # Without ground truth there is nothing to score against. The true
        # labels cannot be recovered from the feature windows, so return
        # the decoded predictions instead.
        return np.asarray(encoder.categories_[0])[y_pred]

    # Column positions of the one-hot labels follow the encoder's category
    # order, the same order the softmax outputs were trained against.
    y_test_actual = np.argmax(y_test_encoded, axis=1)

    print(classification_report(y_test_actual, y_pred, zero_division=0))

    return accuracy_score(y_test_actual, y_pred)


In [None]:
def fine_tune_model(model_path, new_montage_name, features, subdir='marked/', timesteps=2, save_path=None):
    """Fine-tune a saved model on a new montage with a new output layer.

    The saved model's layers are reused (weights included) up to, but not
    including, its final classification layer. A new softmax layer sized to
    the classes found in the new training data is attached on top, and the
    resulting model is trained and saved.

    Args:
        model_path: Path of the pre-trained model to load.
        new_montage_name: Montage whose data is used for fine-tuning.
        features: Feature column names, passed to ``preprocess_data``.
        subdir: Directory that holds the montage files.
        timesteps: Window length of the input sequences.
        save_path: Where to save the fine-tuned model. Defaults to
            ``model_path``, which overwrites the loaded model.

    Returns:
        The fine-tuned model.

    Raises:
        ValueError: If the loaded model has no layers left to reuse below
            its output layer.
    """
    # Load the pre-trained model
    model = load_model(model_path)

    # Load and preprocess the new data
    new_X_train_scaled, new_X_test_scaled, new_y_train, new_y_test = preprocess_data(new_montage_name, features, subdir)

    # Build the windows and one-hot encode the labels
    new_X_train_seq, new_X_test_seq, new_y_train_encoded, new_y_test_encoded, _ = prepare_sequences(
        new_X_train_scaled, new_X_test_scaled, new_y_train, new_y_test, timesteps
    )

    # Replace only the last (classification) layer. Input layers are skipped
    # explicitly because a model saved by this function is a functional
    # Model that lists its InputLayer in `layers`. A fixed slice would treat
    # that InputLayer as a regular layer and strip one more trained layer
    # on every fine-tuning run.
    backbone_layers = [layer for layer in model.layers if not isinstance(layer, InputLayer)][:-1]
    if not backbone_layers:
        raise ValueError("Loaded model has no layers to reuse below its output layer")

    input_layer = Input(shape=(timesteps, new_X_train_seq.shape[2]))
    x = input_layer
    for layer in backbone_layers:
        x = layer(x)

    # Name the new head explicitly: an auto-generated name such as 'dense'
    # can clash with a reused layer loaded from disk.
    taken_names = {layer.name for layer in backbone_layers}
    head_name = 'fine_tune_output'
    suffix = 0
    while head_name in taken_names:
        suffix += 1
        head_name = f'fine_tune_output_{suffix}'
    x = Dense(new_y_train_encoded.shape[1], activation='softmax', name=head_name)(x)

    fine_tuned_model = Model(inputs=input_layer, outputs=x)

    fine_tuned_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # Train on the new data
    fine_tuned_model.fit(
        new_X_train_seq, new_y_train_encoded,
        validation_data=(new_X_test_seq, new_y_test_encoded),
        epochs=20, batch_size=32, verbose=1
    )

    # Save the fine-tuned model
    fine_tuned_model.save(model_path if save_path is None else save_path)

    return fine_tuned_model


In [None]:
def get_pilot_montages(meta_info, pilot_id=2):
    """Return the index labels of the metadata rows belonging to one pilot.

    Args:
        meta_info: DataFrame with a ``'pilote_id'`` column.
        pilot_id: Value of ``'pilote_id'`` to select.

    Returns:
        Index of the rows whose ``'pilote_id'`` equals ``pilot_id``.
    """
    belongs_to_pilot = meta_info['pilote_id'].eq(pilot_id)
    return meta_info.loc[belongs_to_pilot].index

In [None]:
# Directory with new data and the file that lists already processed files
working_directory = 'working/files/'
processed_files_path = 'working/processed_files.txt'

# Path to the pre-trained model
model_path = 'motorica/model/lstm_model.h5'

# Load the montage metadata.
# NOTE(review): read_meta_info is not defined in this notebook; presumably it
# comes from `from motorica.utils import *` — confirm.
METAINFO_PATH = 'marked/selected_montages.csv'
meta_info = read_meta_info(METAINFO_PATH)

# Ask for the pilot id and collect all of that pilot's montages.
# The prompt has to go through input(); int() on the prompt text itself
# always raises ValueError.
pilot_id = int(input('Введите id пилота: '))
pilot_montages = get_pilot_montages(meta_info, pilot_id)
if len(pilot_montages) == 0:
    raise ValueError(f"No montages found for pilot id {pilot_id}")

# Model input columns: channels '0'..'49' plus the three ACC columns.
# Defined once, outside the loop, so it also exists for later cells.
features = [str(channel) for channel in range(50)] + [
    'ACC0', 'ACC1', 'ACC2'        # + pronation_0, pronation_1, pronation_2
]

# Accumulators for the per-montage splits
X_train_list, X_test_list, y_train_list, y_test_list = [], [], [], []

# Load and preprocess every montage (each montage is scaled on its own)
for montage in pilot_montages:
    X_train_scaled, X_test_scaled, y_train, y_test = preprocess_data(montage, features)

    X_train_list.append(X_train_scaled)
    X_test_list.append(X_test_scaled)
    y_train_list.append(y_train)
    y_test_list.append(y_test)

# Concatenate the montages.
# NOTE(review): windows are built after concatenation, so a window can span
# the boundary between two montages.
X_train_combined = pd.concat(X_train_list, axis=0).reset_index(drop=True)
X_test_combined = pd.concat(X_test_list, axis=0).reset_index(drop=True)
y_train_combined = pd.concat(y_train_list, axis=0).reset_index(drop=True)
y_test_combined = pd.concat(y_test_list, axis=0).reset_index(drop=True)

# Window length used for the sequences
best_timesteps = 2

# Build the windows and one-hot encode the labels
X_train_seq, X_test_seq, y_train_encoded, y_test_encoded, encoder = prepare_sequences(
    X_train_combined, X_test_combined, y_train_combined, y_test_combined, best_timesteps
)

# Build and train the model using KFold cross-validation
model = build_and_train_model(
    X_train_seq, y_train_encoded, X_test_seq, y_test_encoded, best_timesteps, X_train_seq.shape[2], y_train_encoded.shape[1]
)

# Save the model; create the target directory first so a fresh checkout
# does not fail after the whole training run.
model_dir = os.path.dirname(model_path)
if model_dir:
    os.makedirs(model_dir, exist_ok=True)
model.save(model_path)

```py
# Example: fine-tune the saved model on the data of a new montage.
# `model_path` and `features` are the notebook-level variables defined in the
# training cell above; by default fine_tune_model saves its result back to
# `model_path`.
new_montage_name = "2023-04-18_19-08-47 gestures train.palm"
fine_tuned_model = fine_tune_model(model_path, new_montage_name, features)

# Example: predict on the new data.
# The montage is loaded and windowed again here to obtain the test windows
# and an encoder fitted on this montage's training labels.
new_X_train_scaled, new_X_test_scaled, new_y_train, new_y_test = preprocess_data(new_montage_name, features)
new_X_train_seq, new_X_test_seq, new_y_train_encoded, new_y_test_encoded, new_encoder = prepare_sequences(
    new_X_train_scaled, new_X_test_scaled, new_y_train, new_y_test, best_timesteps
)
# NOTE(review): only the test windows and the encoder are passed to predict();
# the ground-truth labels (new_y_test_encoded) are not — confirm predict()
# can produce its report from these arguments.
predict(fine_tuned_model, new_X_test_seq, new_encoder)
```