In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

In [None]:
# Load the data
# NOTE(review): pickle.load can execute arbitrary code while unpickling —
# only open "cnn_outputs.pkl" when it comes from a trusted source.
with open("cnn_outputs.pkl", "rb") as f:
    data = pickle.load(f)

# Pull the training and validation arrays out of the loaded dictionary.
X_train, y_train = data["train_outputs"], data["train_labels"]
X_valid, y_valid = data["valid_outputs"], data["valid_labels"]

In [None]:
# 1. Determine and visualize the distribution of the data

def analyze_distribution(X, y, title):
    """Plot per-class feature histograms and print a normality test per feature.

    One subplot is drawn for every column of ``X``; inside it, a histogram
    with a KDE overlay is drawn for each distinct label in ``y``. Subplots are
    laid out three per row, adding rows as needed, so any number of features
    can be shown (up to three features gives the single-row layout).

    After the figure is shown, the p-value of ``scipy.stats.normaltest`` is
    printed for every feature column (computed over all classes together).

    Args:
        X: 2-D array indexed as ``X[sample, feature]``.
        y: 1-D array of class labels, one per row of ``X``.
        title: Figure-level title.
    """
    n_features = X.shape[1]
    n_cols = 3
    # Ceiling division; always keep at least one row so the figure is valid.
    n_rows = max(1, -(-n_features // n_cols))
    class_labels = np.unique(y)

    plt.figure(figsize=(15, 5 * n_rows))
    for i in range(n_features):
        plt.subplot(n_rows, n_cols, i + 1)
        for class_label in class_labels:
            sns.histplot(X[y == class_label, i], kde=True, label=f'Class {class_label}')
        plt.title(f'Distribution of Feature {i+1} by Class')
        plt.legend()
    plt.suptitle(title)
    plt.tight_layout()
    plt.show()

    for i in range(n_features):
        _, p_value = stats.normaltest(X[:, i])
        print(f"Feature {i+1} normality test p-value: {p_value}")

analyze_distribution(X_train, y_train, title="Original Training Data Distribution")

In [None]:
# 2. Create more data based on the distribution (only for training set)


def create_more_data(X, y, multiplier=2, random_state=None):
    """Augment a labelled dataset with per-class Gaussian samples.

    For every distinct label in ``y`` the mean vector and covariance matrix of
    that class's rows are estimated, and ``(multiplier - 1)`` times as many
    synthetic rows are drawn from the corresponding multivariate normal. The
    original rows are kept, so each class ends up ``multiplier`` times its
    original size. The output is grouped by class (in ``np.unique`` order),
    with the original rows of a class preceding its synthetic rows.

    Args:
        X: 2-D array indexed as ``X[sample, feature]``.
        y: 1-D array of class labels, one per row of ``X``.
        multiplier: Integer >= 1; final size of each class relative to its
            original size. ``1`` returns only the original rows.
        random_state: Optional seed or ``numpy`` ``RandomState``/``Generator``
            for reproducible sampling. ``None`` keeps SciPy's default
            (global NumPy random state).

    Returns:
        Tuple ``(new_X, new_y)`` of the augmented features and labels.

    Raises:
        ValueError: If ``multiplier`` is not an integer >= 1, or if a class
            that needs synthetic samples has fewer than two rows (its
            covariance cannot be estimated).
    """
    if int(multiplier) != multiplier or multiplier < 1:
        raise ValueError("multiplier must be an integer >= 1")
    multiplier = int(multiplier)

    X = np.asarray(X)
    y = np.asarray(y)

    # Build ONE random source up front so that successive classes continue the
    # same stream instead of each restarting from the same integer seed.
    if random_state is None or isinstance(
        random_state, (np.random.RandomState, np.random.Generator)
    ):
        rng = random_state
    else:
        rng = np.random.default_rng(random_state)

    new_X = []
    new_y = []
    for class_label in np.unique(y):
        class_X = X[y == class_label]
        n_original = class_X.shape[0]
        n_new = n_original * (multiplier - 1)

        parts = [class_X]
        if n_new > 0:
            if n_original < 2:
                raise ValueError(
                    f"Class {class_label} has fewer than 2 samples; "
                    "cannot estimate its covariance"
                )
            class_mean = np.mean(class_X, axis=0)
            class_cov = np.cov(class_X.T)

            new_samples = stats.multivariate_normal.rvs(
                mean=class_mean, cov=class_cov, size=n_new, random_state=rng
            )
            # rvs squeezes singleton dimensions (one feature or one sample),
            # so restore the (n_new, n_features) shape before stacking.
            parts.append(np.reshape(new_samples, (n_new, -1)))

        new_X.append(np.vstack(parts))
        new_y.extend([class_label] * (n_original * multiplier))

    return np.vstack(new_X), np.array(new_y)


# Grow every class of the training set to 200x its original size.
X_train_augmented, y_train_augmented = create_more_data(
    X_train, y_train, multiplier=200
)

# Re-run the distribution analysis on the augmented data for comparison.
analyze_distribution(
    X_train_augmented,
    y_train_augmented,
    title="Augmented Training Data Distribution",
)

In [None]:
# 3. Prepare data for training

# Normalize the data
# The scaler is fitted ONCE, on the original training set, and those same
# statistics are applied to the validation and augmented sets. Re-fitting it
# on the augmented data would scale the augmented training inputs differently
# from X_valid_scaled (which is already transformed) and would silently
# change `scaler` for any later use.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_valid_scaled = scaler.transform(X_valid)
X_train_aug_scaled = scaler.transform(X_train_augmented)

# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.LongTensor(y_train)
X_valid_tensor = torch.FloatTensor(X_valid_scaled)
y_valid_tensor = torch.LongTensor(y_valid)

X_train_aug_tensor = torch.FloatTensor(X_train_aug_scaled)
y_train_aug_tensor = torch.LongTensor(y_train_augmented)

# Create DataLoaders
# Training loaders shuffle each epoch; the validation loader keeps a fixed order.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=120, shuffle=True)

train_aug_dataset = TensorDataset(X_train_aug_tensor, y_train_aug_tensor)
train_aug_loader = DataLoader(train_aug_dataset, batch_size=120, shuffle=True)

valid_dataset = TensorDataset(X_valid_tensor, y_valid_tensor)
valid_loader = DataLoader(valid_dataset, batch_size=120, shuffle=False)

In [None]:
# 4. Define the model


class H97_ANN(nn.Module):
    """Feed-forward classifier with two ReLU-activated hidden layers.

    Args:
        input_size: Number of input features per sample.
        hidden_size: Width of both hidden layers.
        num_classes: Number of output scores produced per sample.
    """

    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        # Attribute names double as the keys of the saved state_dict.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return the raw output scores for ``x`` (no softmax is applied)."""
        hidden = self.relu(self.fc1(x))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
# 5. Train the model with early stopping


def train_model(
    model, train_loader, valid_loader, criterion, optimizer, num_epochs=100, patience=10, model_name="best_model_H97_ANN.pth"
):
    """Train ``model`` with early stopping and return it with its best weights.

    Each epoch runs one optimisation pass over ``train_loader`` and one
    gradient-free pass over ``valid_loader``. Whenever the validation loss
    improves on the best seen so far, the model's ``state_dict`` is saved to
    ``model_name``. Training stops early once ``patience`` consecutive epochs
    pass without improvement.

    The best checkpoint is restored before returning in every case — both when
    early stopping fires and when all ``num_epochs`` epochs run — so the
    returned model always corresponds to the lowest recorded validation loss.

    Args:
        model: The ``nn.Module`` to train (modified in place).
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        valid_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        patience: Number of consecutive non-improving epochs that triggers
            early stopping.
        model_name: File path the best ``state_dict`` is written to.

    Returns:
        Tuple ``(model, train_losses, valid_losses)`` where the loss lists
        hold one value per completed epoch, each the mean of the per-batch
        loss values of that epoch.
    """
    train_losses = []
    valid_losses = []
    best_valid_loss = float("inf")
    epochs_no_improve = 0
    # Tracks whether THIS call wrote a checkpoint, so a stale or missing file
    # is never loaded (e.g. when the validation loss is NaN from the start).
    checkpoint_saved = False

    for epoch in range(num_epochs):
        # --- training pass ---
        model.train()
        train_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)
        train_losses.append(train_loss)

        # --- validation pass (no gradients) ---
        model.eval()
        valid_loss = 0.0
        with torch.no_grad():
            for inputs, labels in valid_loader:
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                valid_loss += loss.item()
        valid_loss /= len(valid_loader)
        valid_losses.append(valid_loss)

        print(
            f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}"
        )

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            epochs_no_improve = 0
            torch.save(model.state_dict(), model_name)
            checkpoint_saved = True
        else:
            epochs_no_improve += 1

        if epochs_no_improve == patience:
            print("Early stopping!")
            break

    # Restore the best weights regardless of how the loop ended; previously
    # this only happened on early stopping, so a full run returned the
    # last-epoch weights instead of the best ones.
    if checkpoint_saved:
        model.load_state_dict(torch.load(model_name))

    return model, train_losses, valid_losses

In [None]:
# 6. Train and evaluate models

def train_and_evaluate(X_train, y_train, X_valid, y_valid, title):
    """Train an ``H97_ANN`` on the given data, then report and plot its results.

    Steps:
      1. Build a model with 97 hidden units and one output per distinct
         label in ``y_train``, trained with cross-entropy loss and Adam.
      2. Train via ``train_model``; the best weights are saved to
         ``best_model_H97_ANN_{title}.pth``.
      3. Print the validation accuracy.
      4. Scatter-plot the predicted probability of every ordered pair of
         distinct classes, coloured by the true validation label.
      5. Plot the training and validation loss curves.

    The number of classes in the probability plot is taken from the model's
    output width, so the function works for any class count (not only three).

    Args:
        X_train: 2-D array of training features (already scaled by the caller).
        y_train: 1-D array of integer training labels.
        X_valid: 2-D array of validation features.
        y_valid: 1-D array of integer validation labels.
        title: Label used in printed output, figure titles and the checkpoint
            file name.

    Returns:
        None. Results are printed and plotted.
    """
    model = H97_ANN(X_train.shape[1], 97, len(np.unique(y_train)))
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters())

    X_valid_tensor = torch.FloatTensor(X_valid)
    y_valid_tensor = torch.LongTensor(y_valid)

    train_dataset = TensorDataset(torch.FloatTensor(X_train), torch.LongTensor(y_train))
    train_loader = DataLoader(train_dataset, batch_size=120, shuffle=True)
    valid_dataset = TensorDataset(X_valid_tensor, y_valid_tensor)
    valid_loader = DataLoader(valid_dataset, batch_size=120, shuffle=False)

    model, train_losses, valid_losses = train_model(model, train_loader, valid_loader, criterion, optimizer, model_name=f"best_model_H97_ANN_{title}.pth")

    # Evaluate the model
    model.eval()
    with torch.no_grad():
        valid_outputs = model(X_valid_tensor)
        predicted = valid_outputs.argmax(dim=1)
        accuracy = (predicted == y_valid_tensor).float().mean()

    print(f"{title} - Validation accuracy: {accuracy:.4f}")

    # Visualize predictions
    y_pred = nn.Softmax(dim=1)(valid_outputs).numpy()
    n_classes = y_pred.shape[1]
    plt.figure(figsize=(10, 8))
    scatter = None
    for i in range(n_classes):
        for j in range(n_classes):
            # Diagonal cells (a class against itself) are left empty.
            if i != j:
                plt.subplot(n_classes, n_classes, i * n_classes + j + 1)
                scatter = plt.scatter(y_pred[:, i], y_pred[:, j], c=y_valid, cmap='viridis', alpha=0.5)
                plt.xlabel(f'Class {i} probability')
                plt.ylabel(f'Class {j} probability')
    # With a single class there is no off-diagonal panel, hence no scatter.
    if scatter is not None:
        plt.colorbar(scatter)
    plt.suptitle(f'{title} - Prediction Probabilities')
    plt.tight_layout()
    plt.show()

    # Plot training and validation loss
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(valid_losses, label='Validation Loss')
    plt.title(f'{title} - Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# Train and evaluate two models against the same validation set:
# first without augmentation, then with augmentation.
for run_X_train, run_y_train, run_title in (
    (X_train_scaled, y_train, "Model_without_Augmentation"),
    (X_train_aug_scaled, y_train_augmented, "Model_with_Augmentation"),
):
    train_and_evaluate(run_X_train, run_y_train, X_valid_scaled, y_valid, run_title)