In [1]:
import os
from pathlib import Path
from typing import Dict, Any, List, Type, Union, Tuple

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.layers import (
    Input, Conv1D, BatchNormalization, ReLU,
    MaxPooling1D, Dropout, Bidirectional, LSTM,
    Dense, Multiply, Softmax, Lambda
)
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.utils import to_categorical
from sklearn.metrics import (
    balanced_accuracy_score, matthews_corrcoef,
    roc_auc_score, average_precision_score
)
from sklearn.model_selection import ParameterGrid
import tensorflow.keras.backend as K

In [2]:
# Шляхи до даних
DATA_DIR = Path("../NN Datasets")
RESULTS_PATH = Path("grid_search_val_test_metrics.csv")

In [3]:
WINDOW_SIZE = 20
NUM_FEATURES = 34  # кількість ознак у X
NUM_CLASSES = 7    # кількість класів для класифікації

In [4]:
PARAM_GRID: Dict[str, List[Any]] = {
    'filters': [32, 64, 128],
    'kernel_size': [3, 5, 7],
    'pool_size': [2, 3],
    'lstm_units': [64, 128, 256],
    'lstm_layers': [1, 2],
    'dropout_rate': [0.2, 0.3, 0.5],
    'recurrent_dropout': [0.1, 0.2],
    'activation': ['relu', 'selu'],
    'kernel_initializer': ['he_uniform', 'glorot_uniform'],
    'optimizer': [Adam, RMSprop],
    'optimizer__learning_rate': [1e-2, 1e-3, 1e-4],
    'batch_size': [64, 128, 256],
    'epochs': [10, 20, 50]
}

In [5]:
CALLBACKS = [
    EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6)
]

In [6]:
def load_csv(path: Path, drop_cols: List[str]) -> pd.DataFrame:
    """
    Завантажує CSV у DataFrame та видаляє непотрібні стовпці.

    :param path: шлях до файлу CSV
    :param drop_cols: список назв колонок для видалення
    :return: очищений DataFrame
    """
    df = pd.read_csv(path)
    df = df.drop(columns=drop_cols, errors='ignore')
    return df

In [7]:
def prepare_windowed_data(
    X: pd.DataFrame,
    y: pd.DataFrame,
    window_size: int,
    num_classes: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Розбиває послідовність на вікна фіксованого розміру і повертає X у форматі
    (num_windows, window_size, num_features) та one-hot закодовані мітки за останній
    елемент кожного вікна.
    """
    X_arr = X.values
    y_arr = y.values.squeeze()

    n_rows = X_arr.shape[0]
    n_windows = n_rows // window_size
    usable = n_windows * window_size
    if usable < n_rows:
        print(f"Обрізаю {n_rows - usable} рядків до {usable} для {n_windows} вікон")

    X_trim = X_arr[:usable]
    y_trim = y_arr[:usable]

    X_windows = X_trim.reshape(n_windows, window_size, X_arr.shape[1])
    y_last = y_trim.reshape(n_windows, window_size)[:, -1]
    y_windows = to_categorical(y_last, num_classes)
    return X_windows, y_windows

In [13]:
def create_model(
    window_size: int,
    num_features: int,
    num_classes: int,
    params: Dict[str, Any]
) -> Model:
    """
    Будує 1D-CNN + BiLSTM модель з механізмом уваги на основі параметрів.

    :param window_size: довжина вікна часових рядів
    :param num_features: кількість ознак
    :param num_classes: кількість класів для класифікації
    :param params: словник з гіперпараметрами
    :return: скомпільована модель Keras
    """
    inp = Input(shape=(window_size, num_features))
    x = inp

    # Блок CNN
    x = Conv1D(params['filters'], params['kernel_size'], padding='same',
               kernel_initializer=params['kernel_initializer'])(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = Conv1D(params['filters'], params['kernel_size'] + 2, padding='same',
               kernel_initializer=params['kernel_initializer'])(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)
    x = MaxPooling1D(params['pool_size'])(x)
    x = Dropout(params['dropout_rate'])(x)

    # Блок BiLSTM
    for _ in range(params['lstm_layers']):
        x = Bidirectional(
            LSTM(params['lstm_units'], return_sequences=True,
                 dropout=params['dropout_rate'],
                 recurrent_dropout=params['recurrent_dropout'])
        )(x)
    x = Dropout(params['dropout_rate'])(x)

    # Механізм уваги
    attn = Dense(1, activation='tanh')(x)
    attn = Softmax(axis=1)(attn)
    context = Multiply()([x, attn])
    context = Lambda(lambda z: K.sum(z, axis=1))(context)

    # Голова класифікації
    x = Dense(128, activation=params['activation'],
              kernel_initializer=params['kernel_initializer'])(context)
    x = Dropout(params['dropout_rate'])(x)
    out = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=inp, outputs=out)
    optimizer_cls: Union[Type[tf.keras.optimizers.Optimizer], tf.keras.optimizers.Optimizer] = params['optimizer']
    optimizer = optimizer_cls(learning_rate=params['optimizer__learning_rate'])

    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

In [None]:
def evaluate_model(
    model: Model,
    X_test: np.ndarray,
    y_test: np.ndarray
) -> Dict[str, float]:
    """
    Обчислює набір метрик для тестового набору.
    """
    y_prob = model.predict(X_test, verbose=0)
    y_pred = np.argmax(y_prob, axis=1)
    y_true = np.argmax(y_test, axis=1)

    return {
        'accuracy': np.mean(y_pred == y_true),
        'balanced_accuracy': balanced_accuracy_score(y_true, y_pred),
        'mcc': matthews_corrcoef(y_true, y_pred),
        'roc_auc': roc_auc_score(y_true, y_prob, multi_class='ovo', average='macro'),
        'avg_precision': average_precision_score(y_true, y_prob, average='macro')
    }


In [None]:
def run_grid_search():
    """
    Виконує перебір гіперпараметрів за PARAM_GRID,
    навчає та оцінює модель, зберігає результати.
    """
    # Завантаження даних
    X_train_df = load_csv(DATA_DIR / 'x_train.csv',
                           drop_cols=['Unnamed: 0', 'Init_Win_bytes_forward'])
    y_train_df = load_csv(DATA_DIR / 'y_train.csv', drop_cols=['Unnamed: 0'])
    X_val_df = load_csv(DATA_DIR / 'x_val.csv', drop_cols=['Unnamed: 0', 'Init_Win_bytes_forward'])
    y_val_df = load_csv(DATA_DIR / 'y_val.csv', drop_cols=['Unnamed: 0'])
    X_test_df = load_csv(DATA_DIR / 'x_test.csv', drop_cols=['Unnamed: 0', 'Init_Win_bytes_forward'])
    y_test_df = load_csv(DATA_DIR / 'y_test.csv', drop_cols=['Unnamed: 0'])

    # Підготовка вікон
    X_train, y_train = prepare_windowed_data(X_train_df, y_train_df, WINDOW_SIZE, NUM_CLASSES)
    X_val, y_val = prepare_windowed_data(X_val_df, y_val_df, WINDOW_SIZE, NUM_CLASSES)
    X_test, y_test = prepare_windowed_data(X_test_df, y_test_df, WINDOW_SIZE, NUM_CLASSES)

    results: List[Dict[str, Any]] = []

    for params in ParameterGrid(PARAM_GRID):
        print(f"Навчання з параметрами: {params}")
        # Відділяємо аргументи для моделі
        model_params = {k: v for k, v in params.items()
                        if k not in ('batch_size', 'epochs')}
        model = create_model(WINDOW_SIZE, NUM_FEATURES, NUM_CLASSES, model_params)

        # Навчання
        model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            batch_size=params['batch_size'],
            epochs=params['epochs'],
            callbacks=CALLBACKS,
            verbose=1
        )
        # Оцінка
        metrics = evaluate_model(model, X_test, y_test)
        # Збереження рядка результатів
        row = {**params, **{
            'test_accuracy': metrics['accuracy'],
            'test_balanced_accuracy': metrics['balanced_accuracy'],
            'test_mcc': metrics['mcc'],
            'test_roc_auc': metrics['roc_auc'],
            'test_avg_precision': metrics['avg_precision']
        }}
        results.append(row)
        # Очищення сесії для звільнення пам'яті
        tf.keras.backend.clear_session()

    df_results = pd.DataFrame(results)
    df_results.to_csv(RESULTS_PATH, index=False)
    print(f"Збережено результати у {RESULTS_PATH}")

In [None]:
run_grid_search()