In [2]:
# BLOCK 1

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import KFold
from tqdm import tqdm
import os
from datetime import datetime
import optuna


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# BLOCK 2

import numpy as np
import os

def load_data(directory):
    """Load every ``.npz`` file in ``directory`` into one feature array and one label array.

    Each archive is read for two entries: ``metrics`` (an iterable of mappings
    with the keys ``'probabilities'``, ``'entropy'``, ``'similarity'`` and
    ``'lengths'``) and ``labels``. Files are processed in sorted filename order
    so the resulting row order is deterministic.

    Args:
        directory: Path of the folder that holds the ``.npz`` files.

    Returns:
        A tuple ``(metrics_combined, labels_combined)``.
        ``metrics_combined`` is the concatenation along axis 0 of one array per
        file; each per-file array stacks the four metrics along a new last axis
        in the order probabilities, entropy, similarity, lengths.
        ``labels_combined`` is an ``int32`` array with the labels of all files
        in the same order.

    Raises:
        ValueError: If ``directory`` contains no ``.npz`` files.
    """
    files = sorted(f for f in os.listdir(directory) if f.endswith('.npz'))
    if not files:
        # Fail early with a message that names the folder instead of the
        # opaque "need at least one array to concatenate" from NumPy.
        raise ValueError(f"No .npz files found in '{directory}'")

    # Order matters: it defines the layout of the last axis of the result.
    metric_keys = ('probabilities', 'entropy', 'similarity', 'lengths')
    metrics_list = []
    labels_list = []

    for file in files:
        # SECURITY: allow_pickle=True executes arbitrary code embedded in a
        # malicious archive. Only point this function at trusted files.
        # The context manager closes the underlying zip file handle.
        with np.load(os.path.join(directory, file), allow_pickle=True) as data:
            metrics = data['metrics']
            labels = data['labels']

        # One (num_entries, ...) array per metric, then joined on a new last axis.
        per_metric = [np.stack([entry[key] for entry in metrics]) for key in metric_keys]
        metrics_list.append(np.stack(per_metric, axis=-1))
        labels_list.extend(labels)

    metrics_combined = np.concatenate(metrics_list, axis=0)
    labels_combined = np.array(labels_list, dtype=np.int32)

    return metrics_combined, labels_combined

In [4]:
# BLOCK 3

import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNModel(nn.Module):
    """Three-stage convolutional classifier over a stack of per-position metrics.

    The network applies three ``Conv2d`` layers with ``(5, 1)`` kernels (padding
    keeps the length unchanged), each followed by ReLU and a ``(2, 1)`` max-pool
    that halves the length, then a hidden linear layer with dropout and a final
    linear layer producing one logit per class.

    Input is expected as ``(batch, in_channels, input_length, 1)``; the size of
    ``fc1`` is computed for a trailing width of exactly 1.

    Args:
        conv1_filters: Output channels of the first convolution.
        conv2_filters: Output channels of the second convolution.
        conv3_filters: Output channels of the third convolution.
        fc1_units: Width of the hidden fully connected layer.
        dropout_rate: Dropout probability applied after ``fc1``.
        in_channels: Number of input channels (defaults to the 4 stacked metrics).
        input_length: Length of the input along dimension 2 (default 256).
        num_classes: Number of output logits (default 2).

    Raises:
        ValueError: If ``input_length`` is too short to survive three halvings.
    """

    # Number of (2, 1) max-pool applications in forward().
    _NUM_POOLS = 3

    def __init__(self, conv1_filters=32, conv2_filters=64, conv3_filters=128, fc1_units=128, dropout_rate=0.5,
                 in_channels=4, input_length=256, num_classes=2):
        super(CNNModel, self).__init__()

        # Each pool floors length / 2, so three pools give input_length // 8.
        final_dim = input_length // (2 ** self._NUM_POOLS)
        if final_dim < 1:
            raise ValueError(
                f"input_length must be at least {2 ** self._NUM_POOLS}, got {input_length}"
            )

        # Attribute names and creation order are kept stable so saved state
        # dicts and seeded initialisation match earlier versions of the class.
        self.conv1 = nn.Conv2d(in_channels, conv1_filters, kernel_size=(5, 1), padding=(2, 0))
        self.pool = nn.MaxPool2d((2, 1))
        self.conv2 = nn.Conv2d(conv1_filters, conv2_filters, kernel_size=(5, 1), padding=(2, 0))
        self.conv3 = nn.Conv2d(conv2_filters, conv3_filters, kernel_size=(5, 1), padding=(2, 0))

        self.fc1 = nn.Linear(conv3_filters * final_dim * 1, fc1_units)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(fc1_units, num_classes)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, num_classes)`` for ``x``."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten everything except the batch dimension for the linear layers.
        x = torch.flatten(x, 1)
        x = self.dropout(F.relu(self.fc1(x)))
        x = self.fc2(x)
        return x


In [5]:
# BLOCK 4

from datetime import datetime

def train_and_evaluate(X, y, fold_count=7, conv1_filters=32, conv2_filters=64, conv3_filters=128, fc1_units=128, dropout_rate=0.5,
                       epochs=20, batch_size=16, learning_rate=0.001):
    """Train a fresh ``CNNModel`` on every K-fold split and report validation metrics.

    For each fold the arrays are converted to tensors, a new model is trained
    with Adam and cross-entropy loss, and accuracy plus macro-averaged
    precision, recall and F1 are computed on the held-out part.

    Args:
        X: 3-D array of features; each fold is permuted from
            ``(samples, a, b)`` to ``(samples, b, a, 1)`` before being fed to
            the model.
        y: Integer class labels aligned with the first axis of ``X``.
        fold_count: Number of K-fold splits.
        conv1_filters, conv2_filters, conv3_filters, fc1_units, dropout_rate:
            Forwarded positionally to ``CNNModel``.
        epochs: Training epochs per fold (default 20).
        batch_size: Mini-batch size for both loaders (default 16).
        learning_rate: Adam learning rate (default 0.001).

    Returns:
        A tuple ``(best_model, results)``. ``best_model`` is the ``state_dict``
        of the fold model with the highest validation F1; ``results`` is a list
        with one ``(accuracy, precision, recall, f1)`` tuple per fold.
    """
    # NOTE(review): KFold is used without shuffling, so folds are contiguous
    # slices of X. If rows are grouped (e.g. by source file or label) the folds
    # may be unbalanced -- confirm this is intended.
    kf = KFold(n_splits=fold_count)
    results = []
    best_model = None
    # Start below any reachable score so a fold whose F1 is exactly 0.0 is
    # still recorded and a state dict is always returned.
    best_f1 = float('-inf')

    for fold, (train_index, val_index) in enumerate(kf.split(X)):
        X_train, X_val = X[train_index], X[val_index]
        y_train, y_val = y[train_index], y[val_index]

        print(f"Fold {fold + 1}:")
        print(f"  X_train shape: {X_train.shape}")
        print(f"  y_train shape: {y_train.shape}")
        print(f"  X_val shape: {X_val.shape}")
        print(f"  y_val shape: {y_val.shape}")

        # Move the last axis into the channel position and append a width-1 axis.
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32).permute(0, 2, 1).unsqueeze(3)
        y_train_tensor = torch.tensor(y_train, dtype=torch.long)
        X_val_tensor = torch.tensor(X_val, dtype=torch.float32).permute(0, 2, 1).unsqueeze(3)
        y_val_tensor = torch.tensor(y_val, dtype=torch.long)

        train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(TensorDataset(X_val_tensor, y_val_tensor), batch_size=batch_size, shuffle=False)

        # A new model per fold keeps the folds independent of each other.
        model = CNNModel(conv1_filters, conv2_filters, conv3_filters, fc1_units, dropout_rate)
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

        model.train()
        for epoch in tqdm(range(epochs), desc=f'Training fold {fold + 1}'):
            for inputs, labels in train_loader:
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

        # eval() disables dropout; no_grad() skips gradient bookkeeping.
        model.eval()
        val_predictions = []
        val_true = []
        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                val_predictions.extend(predicted.numpy())
                val_true.extend(labels.numpy())

        accuracy = accuracy_score(val_true, val_predictions)
        precision = precision_score(val_true, val_predictions, average='macro')
        recall = recall_score(val_true, val_predictions, average='macro')
        f1 = f1_score(val_true, val_predictions, average='macro')

        print(f"  Results for fold {fold + 1}:")
        print(f"    Accuracy: {accuracy}")
        print(f"    Precision: {precision}")
        print(f"    Recall: {recall}")
        print(f"    F1 Score: {f1}")
        results.append((accuracy, precision, recall, f1))

        if f1 > best_f1:
            best_f1 = f1
            # The state dict references this fold's parameters; that is safe
            # because the model is not trained any further after this point.
            best_model = model.state_dict()

    avg_results = np.mean(results, axis=0)
    print("Average across folds:")
    print(f"  Accuracy: {avg_results[0]}")
    print(f"  Precision: {avg_results[1]}")
    print(f"  Recall: {avg_results[2]}")
    print(f"  F1 Score: {avg_results[3]}")
    print("Training and cross-validation completed.")

    return best_model, results



In [6]:
# BLOCK 5

# Define global variables for the data
# Placeholders only: the data-loading cell assigns the sampled training split.
# Re-running this cell afterwards resets them to None again.
X_train_sample, y_train_sample = None, None

def objective(trial):
    """Optuna objective: mean validation F1 of a 3-fold run on the sampled data.

    Hyperparameters are drawn from ``trial`` and forwarded to
    ``train_and_evaluate`` together with the module-level ``X_train_sample``
    and ``y_train_sample``.

    Args:
        trial: Object providing ``suggest_int`` and ``suggest_float``.

    Returns:
        The mean of the per-fold F1 scores (fourth element of each result tuple).

    Raises:
        RuntimeError: If the sampled training data has not been assigned yet.
    """
    if X_train_sample is None or y_train_sample is None:
        # Without this guard the None placeholders reach KFold and fail with
        # an error that does not mention the real cause.
        raise RuntimeError(
            "X_train_sample / y_train_sample are not set; run the data-loading cell before optimizing."
        )

    # Keys match the keyword names of train_and_evaluate so the dict can be splatted.
    params = {
        'conv1_filters': trial.suggest_int('conv1_filters', 16, 64, step=16),
        'conv2_filters': trial.suggest_int('conv2_filters', 32, 128, step=32),
        'conv3_filters': trial.suggest_int('conv3_filters', 64, 256, step=64),
        'fc1_units': trial.suggest_int('fc1_units', 64, 256, step=64),
        'dropout_rate': trial.suggest_float('dropout_rate', 0.3, 0.7, step=0.1),
    }

    best_model, results = train_and_evaluate(X_train_sample, y_train_sample, fold_count=3, **params)
    avg_f1 = np.mean([result[3] for result in results])
    return avg_f1

In [7]:
# BLOCK 6

# Load data directly from files
# Each call reads every .npz file in the given folder and returns
# (metrics, labels): the four metrics stacked on the last axis of `metrics`,
# and `labels` as an int32 array in the same row order.
train_metrics, train_labels = load_data('./npz_results/train')
val_metrics, val_labels = load_data('./npz_results/val')
test_metrics, test_labels = load_data('./npz_results/test')

# Cap the data size to 10% for hyperparameter tuning
def sample_data(X, y, sample_fraction=0.1, seed=None):
    """Draw the same random subset of rows, without replacement, from ``X`` and ``y``.

    Args:
        X: Array indexed along its first axis.
        y: Array aligned with ``X``; must have the same length.
        sample_fraction: Fraction of rows to keep; the count is
            ``int(len(X) * sample_fraction)`` (rounded towards zero).
        seed: Optional seed. When given, a private ``RandomState`` is used so
            the draw is reproducible and NumPy's global RNG is left untouched.
            When ``None`` (default) the global ``np.random`` state is used.

    Returns:
        A tuple ``(X_subset, y_subset)`` selected with the same indices.

    Raises:
        ValueError: If ``X`` and ``y`` differ in length.
    """
    if len(X) != len(y):
        raise ValueError(f"X and y must have the same length, got {len(X)} and {len(y)}")

    sample_size = int(len(X) * sample_fraction)
    # np.random and RandomState expose the same choice() signature.
    chooser = np.random if seed is None else np.random.RandomState(seed)
    indices = chooser.choice(len(X), sample_size, replace=False)
    return X[indices], y[indices]

# Seed NumPy's global RNG so the 10% subsets are identical on every
# Restart & Run All; otherwise each run tunes on different rows.
# NOTE: this only fixes the row selection, not model initialisation or batching.
SAMPLE_SEED = 42
np.random.seed(SAMPLE_SEED)

X_train_sample, y_train_sample = sample_data(train_metrics, train_labels, sample_fraction=0.1)
X_val_sample, y_val_sample = sample_data(val_metrics, val_labels, sample_fraction=0.1)
X_test_sample, y_test_sample = sample_data(test_metrics, test_labels, sample_fraction=0.1)

# Full data for final training
# These are aliases of the loaded arrays, not copies.
X_train_full, y_train_full = train_metrics, train_labels
X_val_full, y_val_full = val_metrics, val_labels
X_test_full, y_test_full = test_metrics, test_labels



In [None]:
# BLOCK 7

# Hyperparameter search: run 10 trials and keep the one with the highest
# value returned by `objective` (the mean F1 across its folds).
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=10)

# `params` of the best trial is keyed by the names passed to trial.suggest_*.
best_trial = study.best_trial
print(f"Best trial: {best_trial.values}")
print(f"Best hyperparameters: {best_trial.params}")

# BLOCK 8

# Re-run cross-validation on the full training split with the tuned
# hyperparameters; the parameter names match train_and_evaluate's keywords.
best_hyperparameters = best_trial.params
best_model, results = train_and_evaluate(X_train_full, y_train_full, fold_count=7, **best_hyperparameters)

avg_results = np.mean(results, axis=0)
print("Final training with best hyperparameters:")
print(f"  Accuracy: {avg_results[0]}")
print(f"  Precision: {avg_results[1]}")
print(f"  Recall: {avg_results[2]}")
print(f"  F1 Score: {avg_results[3]}")

# Timestamped filename so repeated runs never overwrite an earlier checkpoint.
now = datetime.now()
timestamp = now.strftime("%Y%m%d_%H%M%S")
filename = f"best_pure_metrics_cnn_model_{timestamp}.pth"
print(f"Model will be saved as: {filename}")

# `best_model` is a state dict (weights only), taken from the best-scoring fold.
if best_model is not None:
    torch.save(best_model, filename)
    print(f"Best model saved to '{filename}'.")
else:
    # Make the "nothing was written" case visible instead of ending silently
    # right after announcing the filename.
    print("No model was saved: train_and_evaluate did not return a best model.")