In [None]:
%load_ext autoreload
%autoreload 2

import csv
import numpy as np
import iisignature
import copy
import torch
import torch.nn as nn
from sklearn.metrics import accuracy_score, roc_auc_score
from torch.utils.data import Dataset, TensorDataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, StratifiedKFold
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
import os
from sktime.datasets import load_from_tsfile_to_dataframe
from sklearn.preprocessing import LabelEncoder

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
class NN(nn.Module):
    """Fully connected classifier: a stack of hidden blocks followed by a linear head.

    Each hidden block is Linear -> BatchNorm1d -> LeakyReLU(0.01) -> Dropout.
    The forward pass returns raw logits (no softmax is applied).
    """

    def __init__(self, input_dim, hidden_layers, dropout_rate, num_classes):
        """
        input_dim: int - size of the input vector
        hidden_layers: list[int] - widths of the hidden layers, e.g. [128, 64]
        dropout_rate: float - dropout probability used after every hidden block
        num_classes: int - number of output classes (>= 2)
        """
        super().__init__()

        # Consecutive pairs of this list are the (in, out) sizes of the hidden Linear layers.
        widths = [input_dim, *hidden_layers]

        blocks = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            dense = nn.Linear(fan_in, fan_out)
            # He initialisation matched to the LeakyReLU slope used right after.
            nn.init.kaiming_normal_(dense.weight, a=0.01, nonlinearity='leaky_relu')
            blocks += [
                dense,
                nn.BatchNorm1d(fan_out),
                nn.LeakyReLU(negative_slope=0.01),
                nn.Dropout(p=dropout_rate),
            ]

        self.hidden = nn.Sequential(*blocks)
        # With no hidden layers the head is fed the input directly (widths[-1] == input_dim).
        self.output = nn.Linear(widths[-1], num_classes)
        nn.init.xavier_normal_(self.output.weight, gain=nn.init.calculate_gain('linear'))

    def forward(self, x):
        """Return the class logits for a batch `x`."""
        return self.output(self.hidden(x))

In [None]:
class EarlyStopper:
    """Signals when validation accuracy has stopped improving.

    Every recorded accuracy is kept in `history`. A stop is requested when none
    of the last `patience` values improves, in relative terms, by at least
    `min_delta` over the value recorded `patience` steps before the latest one.
    """

    def __init__(self, patience: int, min_delta: float):
        """
        patience - how many epochs to wait before stopping
        min_delta - minimum relative improvement (e.g. 0.01 = 1%)
        """
        self.patience = patience
        self.min_delta = min_delta
        self.history = []

    def step(self, accuracy):
        """Record `accuracy`; return True when training should stop."""
        self.history.append(accuracy)

        # Not enough history yet to compare against a value `patience` steps back.
        if len(self.history) <= self.patience:
            return False

        window = self.history[-self.patience - 1:]
        reference, later = window[0], window[1:]
        # Floor the denominator so a zero reference accuracy does not divide by zero.
        denom = max(reference, 1e-6)

        return not any(
            (value - reference) / denom >= self.min_delta
            for value in later
        )

def grid_search_kfold_and_finetune(X, y, model_params_base, train_params, hidden_layer_grid, sig_len_grid):
    '''
    Grid-search the NN architecture and the signature prefix length with stratified
    k-fold cross-validation, then fine-tune the selected model on the whole dataset.

    For every (sig_len, hidden_layers) pair a fresh NN is trained on each fold with
    AdamW + StepLR, using a StandardScaler fitted on that fold's training part only.
    The configuration with the highest mean of the per-fold best validation accuracies
    wins. The weights and scaler of that configuration's best-scoring fold are then
    used as the starting point for `finetune_epochs` epochs on all of (X, y).
    The scaler is NOT refitted on the full dataset.

    Parameters:
        X: 2-D array of features; only the first `sig_len` columns are used per configuration.
        y: 1-D array of integer class labels.
        model_params_base: dict with key "dropout_rate".
        train_params: dict with keys "folds", "num_epochs", "lr", "batch_size",
            "weight_decay", "patience", "min_delta", "num_classes", "finetune_epochs".
        hidden_layer_grid: iterable of hidden-layer width lists to try.
        sig_len_grid: iterable of feature-prefix lengths to try; values < 1 are skipped.

    Returns:
        (best_params, best_mean_acc, final_model, best_scaler) where best_params is
        {"sig_len": ..., "hidden_layers": ...} and final_model is returned in eval mode.

    Raises:
        ValueError: if no configuration could be evaluated (empty or invalid grids).
    '''
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    folds = train_params["folds"]
    num_epochs = train_params["num_epochs"]
    lr = train_params["lr"]
    batch_size = train_params["batch_size"]
    weight_decay = train_params["weight_decay"]
    patience = train_params["patience"]
    min_delta = train_params["min_delta"]
    num_classes = train_params["num_classes"]
    finetune_epochs = train_params["finetune_epochs"]

    # Start below any reachable accuracy so the first evaluated configuration is
    # always recorded, even if its mean accuracy is exactly 0.
    best_mean_acc = float("-inf")
    best_params = None
    best_model_state = None
    best_scaler = None

    for sig_len in sig_len_grid:
        if sig_len < 1:
            # A zero-width feature matrix cannot be scaled or fed to the network.
            print(f"Skipping sig_len={sig_len}: at least one signature term is required")
            continue

        X_subset = X[:, :sig_len]

        for hidden_layers in hidden_layer_grid:
            acc_list = []
            # Best fold of this configuration: its weights and its scaler travel together.
            config_top_acc = None
            config_state = None
            config_scaler = None
            skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=42)

            for train_idx, val_idx in skf.split(X_subset, y):
                X_train_fold, X_val_fold = X_subset[train_idx], X_subset[val_idx]
                y_train_fold, y_val_fold = y[train_idx], y[val_idx]

                # Fit the scaler on the training part only to avoid leaking validation statistics.
                scaler = StandardScaler()
                X_train_scaled = scaler.fit_transform(X_train_fold)
                X_val_scaled = scaler.transform(X_val_fold)

                X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32).to(device)
                y_train_tensor = torch.tensor(y_train_fold, dtype=torch.long).to(device)
                X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32).to(device)
                y_val_tensor = torch.tensor(y_val_fold, dtype=torch.long).to(device)

                # drop_last=True discards a trailing incomplete training batch.
                train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=batch_size, shuffle=True, drop_last=True)
                val_loader = DataLoader(TensorDataset(X_val_tensor, y_val_tensor), batch_size=batch_size, shuffle=False)

                model = NN(sig_len, hidden_layers, model_params_base["dropout_rate"], num_classes).to(device)
                criterion = nn.CrossEntropyLoss()
                optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
                # The learning rate is multiplied by 0.8 roughly 20 times over num_epochs.
                scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=max(1, num_epochs // 20), gamma=0.8)
                early_stopper = EarlyStopper(patience, min_delta)

                best_fold_acc = 0
                best_state = None
                for _ in range(num_epochs):
                    model.train()
                    for batch_X, batch_y in train_loader:
                        outputs = model(batch_X)
                        loss = criterion(outputs, batch_y)
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                    scheduler.step()

                    model.eval()
                    all_outputs = []
                    all_labels = []
                    with torch.no_grad():
                        for batch_X, batch_y in val_loader:
                            outputs = model(batch_X)
                            preds = outputs.argmax(dim=1)
                            all_outputs.extend(preds.cpu().numpy())
                            all_labels.extend(batch_y.cpu().numpy())

                    acc = accuracy_score(all_labels, all_outputs)

                    # `best_state is None` guarantees a snapshot even if accuracy never exceeds 0.
                    if best_state is None or acc > best_fold_acc:
                        best_fold_acc = acc
                        best_state = copy.deepcopy(model.state_dict())

                    if early_stopper.step(acc):
                        break

                if best_state is None:
                    # num_epochs == 0: fall back to the freshly initialised weights.
                    best_state = copy.deepcopy(model.state_dict())

                acc_list.append(best_fold_acc)

                if config_top_acc is None or best_fold_acc > config_top_acc:
                    config_top_acc = best_fold_acc
                    config_state = best_state
                    config_scaler = scaler

            mean_acc = np.mean(acc_list)
            print(f"sig_len={sig_len}, hidden_layers={hidden_layers} → mean_acc={mean_acc:.4f}")

            if mean_acc > best_mean_acc:
                best_mean_acc = mean_acc
                best_params = {
                    "sig_len": sig_len,
                    "hidden_layers": hidden_layers
                }
                best_model_state = config_state
                best_scaler = copy.deepcopy(config_scaler)

    if best_params is None:
        raise ValueError(
            "No configuration was evaluated: hidden_layer_grid and sig_len_grid "
            "must each contain at least one valid entry"
        )

    print("\nFine-tuning best model on full dataset...")

    # Fine-tuning on the full dataset, scaled with the selected fold's scaler.
    X_full_subset = X[:, :best_params["sig_len"]]
    X_full_scaled = best_scaler.transform(X_full_subset)
    X_full_tensor = torch.tensor(X_full_scaled, dtype=torch.float32).to(device)
    y_tensor = torch.tensor(y, dtype=torch.long).to(device)

    full_loader = DataLoader(TensorDataset(X_full_tensor, y_tensor), batch_size=batch_size, shuffle=True, drop_last=True)

    # Rebuild the winning architecture and load the selected weights.
    final_model = NN(best_params["sig_len"], best_params["hidden_layers"], model_params_base["dropout_rate"], num_classes).to(device)
    final_model.load_state_dict(best_model_state)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(final_model.parameters(), lr=lr, weight_decay=weight_decay)

    final_model.train()
    for _ in range(finetune_epochs):
        for batch_X, batch_y in full_loader:
            outputs = final_model(batch_X)
            loss = criterion(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    # Hand the model back in inference mode so Dropout/BatchNorm behave deterministically.
    final_model.eval()

    return best_params, best_mean_acc, final_model, best_scaler

In [None]:
def evaluate_with_bootstrap(model, X_test, y_test, n):
    """
    Evaluate the model on the test set and on n bootstrap resamples of it.

    Parameters:
        model: a torch module returning class logits; it is switched to eval mode.
        X_test: array-like of already-scaled test features (converted to float32).
        y_test: array-like of integer class labels (converted to long).
        n: number of bootstrap resamples, each drawn with replacement at the
           test-set size via np.random.choice. Use n >= 2; the sample variance
           (ddof=1) is NaN otherwise.

    Returns:
        (mean bootstrap accuracy * 100, bootstrap variance * 100,
         accuracy on the original test set * 100), each rounded to 1 decimal.

    Raises:
        ValueError: if the test set is empty.
    """
    model.eval()
    accuracies = []

    device = next(model.parameters()).device

    X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_test = torch.tensor(y_test, dtype=torch.long).to(device)

    N = len(X_test)
    if N == 0:
        raise ValueError("X_test is empty: cannot evaluate on zero samples")

    # Everything below is inference only, so no autograd graph is needed anywhere.
    with torch.no_grad():
        outputs = model(X_test)  # logits
        preds = torch.argmax(outputs, dim=1)
        # Exact fraction of matches, computed on-device (integer count / N).
        acc_on_original_test = (preds == y_test).sum().item() / N

        for _ in range(n):
            indices = np.random.choice(N, N, replace=True)
            X_sample = X_test[indices]
            y_sample = y_test[indices]

            outputs = model(X_sample)
            preds = torch.argmax(outputs, dim=1)

            accuracies.append((preds == y_sample).sum().item() / N)

    mean_acc = np.mean(accuracies)
    var_acc = np.var(accuracies, ddof=1)

    # NOTE(review): the variance of the accuracy fraction is scaled by 100, not 100**2,
    # so it is not in "percent squared" units. Kept as-is for backward compatibility.
    return round(mean_acc * 100, 1), round(var_acc*100, 1), round(acc_on_original_test*100,1)

In [None]:
def TEST_TS_LENGTH_EQUALITY(X: pd.DataFrame) -> bool:
    """
    Return True when every cell of X holds a sequence of the same length.

    Cells are measured with len(), so each cell must be sized (e.g. a pd.Series).
    An empty DataFrame is treated as trivially equal-length. Iterating the cells
    directly avoids DataFrame.map, which only exists in newer pandas releases.
    """
    distinct_lengths = {len(cell) for _, column in X.items() for cell in column}
    return len(distinct_lengths) <= 1

In [None]:
def sktime_df_to_numpy(X):
    """
    Convert an sktime-style DataFrame whose cells are Series into a float32
    numpy array of shape (samples, timesteps, features).

    The number of timesteps is taken from the first cell; every other cell is
    expected to have that same length.
    """
    n_rows, n_dims = X.shape
    n_steps = len(X.iloc[0, 0])

    cube = np.zeros((n_rows, n_steps, n_dims), dtype=np.float32)

    # Walk the cells row by row and copy each series into its feature slot.
    for row, dim in np.ndindex(n_rows, n_dims):
        cube[row, :, dim] = X.iat[row, dim].values

    return cube

def sktime_df_to_variable_length_list(X):
    """
    Convert an sktime-style DataFrame whose cells are Series into a list of
    variable-length numpy arrays.

    Returns: list of float32 arrays of shape (seq_len_i, n_features), one per row.
    """
    n_rows, n_dims = X.shape

    return [
        # Stack the row's per-feature series side by side -> (seq_len, n_features).
        np.stack(
            [X.iat[row, dim].to_numpy(dtype=np.float32) for dim in range(n_dims)],
            axis=-1,
        )
        for row in range(n_rows)
    ]

In [None]:
def AddTimeline(X):
    '''
    Prepend a time channel to every sample.

    X: array of shape (samples, steps, features)
    Returns: array of shape (samples, steps, features + 1) whose first feature
    is an evenly spaced timeline from 0 to 1.
    '''
    n_samples, n_steps, _ = X.shape

    ticks = np.linspace(0, 1, n_steps)  # shape: (steps,)
    # Same timeline for every sample -> shape: (samples, steps, 1)
    time_channel = np.repeat(ticks[np.newaxis, :, np.newaxis], n_samples, axis=0)

    return np.concatenate((time_channel, X), axis=2)

def AddTimeline_variable(X_list):
    """
    Prepend a 0..1 time channel to each variable-length sample.

        X_list: list of np.ndarray of shape (seq_len_i, n_features)

    Return:
        list of np.ndarray of shape (seq_len_i, n_features + 1)
    """
    return [
        # Column vector of evenly spaced times, glued in front of the sample's features.
        np.concatenate((np.linspace(0, 1, sample.shape[0]).reshape(-1, 1), sample), axis=1)
        for sample in X_list
    ]

In [None]:
def sig_data(X, sig_level):
    """
    Compute the truncated path signature of every sample in a dense array.

    X: array of shape (samples, steps, features)
    sig_level: truncation level passed to iisignature
    Returns: float32 array of shape (samples, iisignature.siglength(features, sig_level))
    """
    n_samples, _, n_features = X.shape
    if n_features == 1:
        print('Warning: only 1 feature detected, adding timeline might be needed')

    signatures = np.zeros(
        (n_samples, iisignature.siglength(n_features, sig_level)),
        dtype=np.float32,
    )
    for idx, path in enumerate(X):
        signatures[idx] = iisignature.sig(path, sig_level)
    return signatures

def sig_variable_length_data(X, sig_level):
    """
    Compute the truncated path signature of every sample in a list of
    variable-length paths.

    X: sequence of arrays of shape (seq_len_i, features); the feature count is read from X[0]
    sig_level: truncation level passed to iisignature
    Returns: float32 array of shape (len(X), iisignature.siglength(features, sig_level))
    """
    n_features = X[0].shape[1]
    if n_features == 1:
        print('Warning: only 1 feature detected, adding timeline might be needed')

    signatures = np.zeros(
        (len(X), iisignature.siglength(n_features, sig_level)),
        dtype=np.float32,
    )
    for idx in range(len(X)):
        signatures[idx] = iisignature.sig(X[idx], sig_level)
    return signatures

In [None]:
def encode_labels(y_train : pd.Series, y_test : pd.Series):
    """
    Integer-encode class labels with a LabelEncoder fitted on the training labels.

    Returns: (encoded train labels, encoded test labels, array of unique classes).
    Raises: ValueError if y_test contains a class that is absent from y_train.
    """
    encoder = LabelEncoder().fit(y_train)

    # Fail early on classes in y_test that the encoder has never seen.
    unknown = set(y_test).difference(encoder.classes_)
    if unknown:
        raise ValueError(f"Неизвестные классы в y_test: {unknown}")

    encoded_train, encoded_test = (
        np.array(encoder.transform(labels), dtype=int)
        for labels in (y_train, y_test)
    )
    return encoded_train, encoded_test, encoder.classes_  # array of unique classes

In [None]:
def write_results_to_csv(filename, dataset_name, acc_on_original_test, acc, var, best_params):
    '''
    Write one result row to a CSV file, overwriting the dataset's row if the
    dataset is already present in the file.

    Parameters:
        filename: path of the CSV file; created with a header row if missing or empty.
        dataset_name: value of the first column, used as the row key.
        acc_on_original_test: accuracy on the original test set (formatted with 1 decimal).
        acc, var: bootstrap mean accuracy and variance, written as "acc ± var".
        best_params: dict of hyperparameters; keys extend the header, list/tuple
            values are joined with '-'.

    Blank lines found in an existing file are dropped on rewrite. The header of
    an existing file is kept as-is, even if best_params has different keys.
    '''
    acc_var_str = f"{acc:.1f} ± {var:.1f}"

    params_items = [
        '-'.join(str(x) for x in v) if isinstance(v, (list, tuple)) else str(v)
        for v in best_params.values()
    ]

    headers = ["Dataset", "Accuracy on orig test", "Mean Accuracy ± Variance"] + list(best_params.keys())
    new_row = [dataset_name, f"{acc_on_original_test:.1f}", acc_var_str] + params_items

    rows = []
    if os.path.exists(filename):
        with open(filename, newline='', encoding='utf-8') as f:
            # csv.reader yields [] for blank lines; skip them so row[0] below is always valid.
            rows = [row for row in csv.reader(f) if row]

    if not rows:
        rows.append(headers)

    # Overwrite the row if the dataset is already in the file, otherwise append it.
    for i, row in enumerate(rows[1:], start=1):
        if row[0] == dataset_name:
            rows[i] = new_row
            break
    else:
        rows.append(new_row)

    with open(filename, 'w', newline='', encoding='utf-8') as f:
        csv.writer(f).writerows(rows)

In [None]:
# Folder that holds one sub-folder per dataset (placeholder, set before running).
PATH_TO_DATASETS_FOLDER = "/PATH/TO/DATASETS/FOLDER"

# Names of the available datasets.
DATASETS = [
    'EthanolConcentration',
    'FaceDetection',
    'Handwriting',
    'Heartbeat',
    'JapaneseVowels',
    'PEMS-SF',
    'SelfRegulationSCP1',
    'SelfRegulationSCP2',
    'SpokenArabicDigits',
    'UWaveGestureLibrary',
]

# Dataset processed by the cells below.
DATASET_NAME = 'Heartbeat'

Загружаем данные из .ts в DataFrame

In [None]:
# Read the TRAIN and TEST splits of the selected dataset from its .ts files.
X_train_df, y_train_df = load_from_tsfile_to_dataframe(
    f'{PATH_TO_DATASETS_FOLDER}/{DATASET_NAME}/{DATASET_NAME}_TRAIN.ts'
)
X_test_df, y_test_df = load_from_tsfile_to_dataframe(
    f'{PATH_TO_DATASETS_FOLDER}/{DATASET_NAME}/{DATASET_NAME}_TEST.ts'
)

# Later cells branch on these flags to pick the dense or the variable-length code path.
TRAIN_LENGTHS_EQUAL, TEST_LENGTHS_EQUAL = (
    TEST_TS_LENGTH_EQUALITY(frame) for frame in (X_train_df, X_test_df)
)
print(f"Train dataset has all lengths equal: {TRAIN_LENGTHS_EQUAL}\nTest dataset has all lengths equal: {TEST_LENGTHS_EQUAL}")

In [None]:
# Show only the first rows: a preview is enough to inspect the nested layout.
display(X_train_df.head())

Переводим в numpy массивы

In [None]:
# Equal-length splits become one dense 3-D array; ragged splits become a list of 2-D arrays.
X_train_np = (
    sktime_df_to_numpy(X_train_df)
    if TRAIN_LENGTHS_EQUAL
    else sktime_df_to_variable_length_list(X_train_df)
)
X_test_np = (
    sktime_df_to_numpy(X_test_df)
    if TEST_LENGTHS_EQUAL
    else sktime_df_to_variable_length_list(X_test_df)
)

# Integer-encode the labels; `classes` holds the unique class values.
y_train_np, y_test_np, classes = encode_labels(y_train_df, y_test_df)

Добавляем фичу времени

In [None]:
# Prepend the 0..1 time channel, using the helper that matches each split's layout.
X_train_np_t = (
    AddTimeline(X_train_np)
    if TRAIN_LENGTHS_EQUAL
    else AddTimeline_variable(X_train_np)
)
X_test_np_t = (
    AddTimeline(X_test_np)
    if TEST_LENGTHS_EQUAL
    else AddTimeline_variable(X_test_np)
)

In [None]:
# Number of samples per split.
train_size, test_size = len(X_train_np_t), len(X_test_np_t)

# Feature count (including the time channel), read from the first sample of each split.
train_features_amount = X_train_np_t[0].shape[1]
test_features_amount = X_test_np_t[0].shape[1]

print(f"Размер train: {train_size}, итоговое количество фичей:{train_features_amount}")
print(f"Размер test: {test_size}, итоговое количество фичей:{test_features_amount}")

Выбор уровня сигнатуры (первый аргумент `iisignature.siglength` — количество фичей, второй — уровень)

In [None]:
# Truncation level of the path signature used by the cells below.
SIG_LEVEL = 2
# Signature length for the train feature count (time channel included) at this level.
print("Длина сигнатуры:", iisignature.siglength(train_features_amount, SIG_LEVEL))

In [None]:
# Compute the level-SIG_LEVEL signatures with the routine that matches each split's layout.
X_train_sig = (
    sig_data(X_train_np_t, SIG_LEVEL)
    if TRAIN_LENGTHS_EQUAL
    else sig_variable_length_data(X_train_np_t, SIG_LEVEL)
)
X_test_sig = (
    sig_data(X_test_np_t, SIG_LEVEL)
    if TEST_LENGTHS_EQUAL
    else sig_variable_length_data(X_test_np_t, SIG_LEVEL)
)

In [None]:
# Report the final (samples, signature length) shape of each split.
print("Итоговая форма train:", X_train_sig.shape)
print("Итоговая форма test:", X_test_sig.shape)

In [None]:
model_params = {
    "dropout_rate" : 0.2
}

train_params = {
    "num_epochs": 500,        # Also sets the LR decay speed, see grid_search_kfold_and_finetune()
    "num_classes" : len(classes),
    "folds" : 5,
    "lr": 0.001,
    "batch_size": 4,
    "weight_decay": 1e-4,
    "patience" : 50,         # How many epochs EarlyStopper waits for accuracy to improve by min_delta
    "min_delta" : 0.01,
    "finetune_epochs" : 10   # Fine-tuning on the whole dataset after validation
}

# StandardScaler() is ALWAYS applied to the data

hidden_layer_grid = [ [128, 64], [256, 128, 64], [512, 256, 128, 64], [1024, 512, 256, 128, 64] ]       # Hyperparameter grid

full_sig_length = X_train_sig.shape[1]
# Hyperparameter grid: the full signature and its 1/2, 1/4 and 1/8 prefixes.
# Each length is clamped to >= 1 and duplicates are removed, so a short signature
# cannot produce a zero-width or repeated configuration.
sig_len_grid = sorted(
    {max(1, full_sig_length // divisor) for divisor in (1, 2, 4, 8)},
    reverse=True,
)

In [None]:
bootstrap_amount = 100

# Seed the RNGs used by training (torch) and by bootstrap resampling (numpy) so that
# Restart & Run All reproduces the run. GPU kernels may still be non-deterministic.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

best_params, best_mean_acc, final_model, best_scaler = grid_search_kfold_and_finetune(X_train_sig, y_train_np,
                                                     model_params_base = model_params, train_params=train_params,
                                                     hidden_layer_grid = hidden_layer_grid,
                                                     sig_len_grid = sig_len_grid)

# Apply the same column prefix and the same scaler to the test signatures as were used in training.
X_test = best_scaler.transform(X_test_sig[:, :best_params['sig_len']])
mean_acc_on_bootstrap, var_on_bootstrap, acc_on_original_test = evaluate_with_bootstrap(final_model,  X_test, y_test_np, bootstrap_amount)

print("Лучшие параметры:", best_params, "Mean accuracy on validation:", best_mean_acc)
print("Accuracy на тесте:", acc_on_original_test)
print(f"Mean accuracy +- variance on bootstrap: {mean_acc_on_bootstrap} +- {var_on_bootstrap}")

In [None]:
# Destination CSV for the results table (placeholder, set before running).
PATH_TO_CSV = "/PATH/TO/CSV.csv"

# Insert or overwrite this dataset's row with the metrics computed above.
write_results_to_csv(PATH_TO_CSV, DATASET_NAME, acc_on_original_test, mean_acc_on_bootstrap, var_on_bootstrap, best_params)