In [3]:
%load_ext autoreload
%autoreload 2

# How to run
There are three section in this notebook. 
The first one is transition matrix estimation. The second one relates to CNN and the last one relates to MLP.

### Installation
<code>pip install -r requirements.txt</code>

### Usage
1. copy the dataset file under **/data** folder
2. unzip the dataset zip file
3. the structure of **/data** folder should be 

<code>
data/
└── 2024_A2_datasets/
    ├── CIFAR10.npz
    ├── FashionMNIST0.3.npz
    └── FashionMNIST0.6.npz
</code>

4. Run /Assignment2_final.ipynb



# Transition matrix estimation

In [1]:
from utils.est.load import load_data
import numpy as np
from utils.est.simpleCNN import simple_cnn
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from collections import OrderedDict
from utils.est.modeltrain import train
from utils.est.modeltest import test
from utils.est.help import seed_torch, accuracy, AverageMeter, load_model
from utils.est.Dataloader import get_loader

In [2]:
cifar10X_tr, cifar10y_tr,cifar10X_ts,cifar10y_ts = load_data("data/2024_A2_datasets/CIFAR10.npz")
cifar10X_tr.shape

(20000, 3, 32, 32)

In [3]:
train_loader, test_loader = get_loader(data=(cifar10X_tr, cifar10y_tr, cifar10X_ts, cifar10y_ts))
model = simple_cnn(in_channels=3, num_classes=4)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
import torch.optim as optim

# define model, loss function, optimizer
model = simple_cnn(in_channels=3, num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# set epoch
num_epochs = 10

# train and validation
for epoch in range(1, num_epochs + 1):
    print(f"Epoch {epoch}/{num_epochs}")

    train(epoch, model, optimizer, criterion, train_loader)

    val_accuracy = test(epoch, model, criterion, test_loader, is_test=False)

print("Evaluating on test set...")
test_accuracy = test(num_epochs, model, criterion, test_loader, is_test=True)

print(f"Final Test Accuracy: {test_accuracy:.2f}%")

Epoch 1/10
Train epoch  1  Accuracy  52.7
Validate epoch  1  Accuracy  67.425
Epoch 2/10
Train epoch  2  Accuracy  64.58
Validate epoch  2  Accuracy  72.55
Epoch 3/10
Train epoch  3  Accuracy  67.29
Validate epoch  3  Accuracy  76.3
Epoch 4/10
Train epoch  4  Accuracy  69.315
Validate epoch  4  Accuracy  78.125
Epoch 5/10
Train epoch  5  Accuracy  71.26
Validate epoch  5  Accuracy  80.6
Epoch 6/10
Train epoch  6  Accuracy  72.28
Validate epoch  6  Accuracy  79.9
Epoch 7/10
Train epoch  7  Accuracy  74.04
Validate epoch  7  Accuracy  80.725
Epoch 8/10
Train epoch  8  Accuracy  75.085
Validate epoch  8  Accuracy  81.725
Epoch 9/10
Train epoch  9  Accuracy  76.465
Validate epoch  9  Accuracy  82.15
Epoch 10/10
Train epoch  10  Accuracy  77.83
Validate epoch  10  Accuracy  81.875
Evaluating on test set...
Test epoch  10  Accuracy  81.875
Final Test Accuracy: 81.88%


In [5]:
from utils.est.estimate_transition_matrix import estimate_transition_matrix
from utils.est.create_anchor_loader import create_anchor_loader

In [6]:
anchor_loaders = create_anchor_loader(train_loader, model, threshold=0.6)

transition_matrix = estimate_transition_matrix(model, anchor_loaders, num_classes=4)
print("Estimated Transition Matrix:")
print(transition_matrix)

Estimated Transition Matrix:
[[0.86536193 0.09298614 0.02862846 0.01302442]
 [0.02364569 0.88456053 0.09023315 0.00156059]
 [0.03252944 0.01227329 0.85053104 0.10466633]
 [0.1318707  0.00474575 0.07453028 0.78885317]]


In [7]:
from PIL import Image

def preprocess_images(X):
    X_resized = []
    for img in X:
        #from (1, 28, 28) to (28, 28)
        img = img.squeeze(0)
        
        # grey to RGB (28, 28) -> (28, 28, 3)
        img_rgb = np.stack([img] * 3, axis=-1) 
        
        # PIL Image
        img_rgb_pil = Image.fromarray(img_rgb.astype(np.uint8))
        img_resized_pil = img_rgb_pil.resize((32, 32), Image.BILINEAR)  #resize
        
        img_resized = np.array(img_resized_pil)
        
        img_resized = np.transpose(img_resized, (2, 0, 1))  # 转换为 (3, 32, 32)
        
        X_resized.append(img_resized)
    
    return np.array(X_resized)

In [8]:
f3X_tr, f3y_tr,f3X_ts,f3y_ts = load_data("data/2024_A2_datasets/FashionMNIST0.3.npz")
print(f3X_tr.shape)

(24000, 1, 28, 28)


In [9]:
f3Xtr_resized = preprocess_images(f3X_tr)
f3X_ts_resized = preprocess_images(f3X_ts)

In [10]:
train_loader2, test_loader2 = get_loader(data=(f3Xtr_resized, f3y_tr, f3X_ts_resized, f3y_ts))
model2 = simple_cnn(in_channels=3, num_classes=4)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [11]:
import torch.optim as optim

# define model, loss function
model2 = simple_cnn(in_channels=3, num_classes=4).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model2.parameters(), lr=0.001)

# set epoch
num_epochs = 10

# train and validation
for epoch in range(1, num_epochs + 1):
    print(f"Epoch {epoch}/{num_epochs}")

    train(epoch, model2, optimizer, criterion, train_loader2)

    val_accuracy = test(epoch, model2, criterion, test_loader2, is_test=False)

print("Evaluating on test set...")
test_accuracy = test(num_epochs, model2, criterion, test_loader2, is_test=True)

print(f"Final Test Accuracy: {test_accuracy:.2f}%")

Epoch 1/10
Train epoch  1  Accuracy  43.7125
Validate epoch  1  Accuracy  62.05
Epoch 2/10
Train epoch  2  Accuracy  47.670833333333334
Validate epoch  2  Accuracy  63.1
Epoch 3/10
Train epoch  3  Accuracy  48.425
Validate epoch  3  Accuracy  63.375
Epoch 4/10
Train epoch  4  Accuracy  49.0875
Validate epoch  4  Accuracy  63.05
Epoch 5/10
Train epoch  5  Accuracy  49.67916666666667
Validate epoch  5  Accuracy  63.7
Epoch 6/10
Train epoch  6  Accuracy  50.09583333333333
Validate epoch  6  Accuracy  62.275
Epoch 7/10
Train epoch  7  Accuracy  50.71666666666667
Validate epoch  7  Accuracy  61.75
Epoch 8/10
Train epoch  8  Accuracy  51.67916666666667
Validate epoch  8  Accuracy  62.3
Epoch 9/10
Train epoch  9  Accuracy  52.35
Validate epoch  9  Accuracy  57.925
Epoch 10/10
Train epoch  10  Accuracy  53.90833333333333
Validate epoch  10  Accuracy  60.075
Evaluating on test set...
Test epoch  10  Accuracy  60.075
Final Test Accuracy: 60.08%


In [12]:
from utils.est.estimate_transition_matrix import estimate_transition_matrix
from utils.est.create_anchor_loader import create_anchor_loader

In [13]:
anchor_loaders = create_anchor_loader(train_loader2, model2, threshold=0.6)


transition_matrix2 = estimate_transition_matrix(model2, anchor_loaders, num_classes=4)
print("Estimated Transition Matrix:")
print(transition_matrix2)

Estimated Transition Matrix:
[[0.77741247 0.12019797 0.0281337  0.0742563 ]
 [0.02998707 0.74413919 0.21209586 0.01377792]
 [0.02160717 0.03702152 0.78858835 0.15278149]
 [0.11052448 0.01863763 0.05449284 0.81634617]]


In [14]:
import numpy as np

# Define the two matrices
true_matrix = np.array([
    [0.7, 0.3, 0, 0],
    [0, 0.7, 0.3, 0],
    [0, 0, 0.7, 0.3],
    [0.3, 0, 0, 0.7]
])

estimated_matrix = np.array([
    [0.77019763, 0.10626329, 0.03162862, 0.09191043],
    [0.02642049, 0.75073814, 0.2105207,  0.0123205 ],
    [0.01650259, 0.01779101, 0.81544185, 0.15026474],
    [0.18038471, 0.01821926, 0.05068507, 0.75071126]
])

# Calculate Mean Squared Error and Mean Absolute Error
mse = np.mean((true_matrix - estimated_matrix) ** 2)
mae = np.mean(np.abs(true_matrix - estimated_matrix))

mse, mae

(np.float64(0.007466059336063282), np.float64(0.069070838125))

# Hyperparameter Optimization and Regularization on Fashion-MNIST Classification

In [2]:
import torch
from torch.utils.data import TensorDataset, DataLoader
import numpy as np

# Check if MPS is available
if torch.backends.mps.is_available():
    device = torch.device('mps')
    print("Using MPS device")
elif torch.cuda.is_available():
    device = torch.device('cuda')
    print("Using CUDA device")
else:
    device = torch.device('cpu')


# device = torch.device('cpu')
device

Using MPS device


device(type='mps')

In [3]:
from utils.dataloader.FashionMNISTDataLoader import FashionMNISTDataLoader, CIFAR10DataLoader

path = 'data/2024_A2_datasets/FashionMNIST0.6.npz'
data_loader = FashionMNISTDataLoader(path=path, batch_size=64, sample_size=0.0001, train_percentage=0.8, device=device) 
train_loader, eval_loader, test_loader = data_loader.get_loaders()

print('shape of train data:', data_loader.get_shape_of_sample())   

path_cifar = 'data/2024_A2_datasets/CIFAR10.npz'
data_loader_cifar = CIFAR10DataLoader(path=path_cifar, batch_size=64, sample_size=1, train_percentage=0.8, device=device)

print('shape of train data:', data_loader_cifar.get_shape_of_sample())


shape of train data: (1, 28, 28)
shape X_train torch.Size([16000, 3, 32, 32])
shape X_test torch.Size([4000, 3, 32, 32])
shape of train data: (3, 32, 32)


In [26]:
from utils.tmatrix.transition_matrix import T_MATRIX_MNIST_0_6  # Ensure correct import
from utils.models.hyperparametertuning import OptunaOptimization

# Define path to your dataset
path = 'data/2024_A2_datasets/FashionMNIST0.6.npz'

# Initialize the transition matrix
t_matrix = T_MATRIX_MNIST_0_6()

# Initialize the OptunaOptimization class with a small sample size for testing
optimizer = OptunaOptimization(
    path=path,
    study_name='fashion_mnist_0_6_with_t_matrix',
    device=device,
    t_matrix=t_matrix,
    repetitions=2,
    sample_size=0.8  # 0.5% of the data for quick testing
)

# Run optimization with a reduced number of trials for testing purposes
best_trial = optimizer.run_optimization(n_trials=25)  # Start with 10 trials for testing

[I 2024-10-31 19:28:37,339] Using an existing study with name 'fashion_mnist_0_6_with_t_matrix' instead of creating a new one.
  lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
  dropout = trial.suggest_uniform('dropout', 0.1, 0.5)
  lambda_l1 = trial.suggest_loguniform('lambda_l1', 1e-5, 1e-2)
[I 2024-10-31 19:31:58,929] Trial 1 finished with value: 0.37623282358156024 and parameters: {'lr': 0.0014847852286384534, 'dropout': 0.3024205954332167, 'kernel_size_conv1': 2, 'conv1_channels': 32, 'conv2_channels': 256, 'fc_size': 128, 'batch_size': 128, 'use_batch_norm': True, 'epochs': 47, 'add_l1': True, 'criterion': 'nf_land_rce', 'lambda_l1': 3.9744845641176954e-05}. Best is trial 1 with value: 0.37623282358156024.
[I 2024-10-31 19:38:18,799] Trial 2 finished with value: 0.2555989583333334 and parameters: {'lr': 8.964048179998964e-05, 'dropout': 0.27441583603393144, 'kernel_size_conv1': 5, 'conv1_channels': 64, 'conv2_channels': 64, 'fc_size': 128, 'batch_size': 32, 'use_batch_norm': Tru

Number of finished trials:  26
Best trial:
  Validation Accuracy: 0.4302
  Best hyperparameters: 
    lr: 3.5027271666254944e-05
    dropout: 0.41989665422347583
    kernel_size_conv1: 4
    conv1_channels: 128
    conv2_channels: 256
    fc_size: 64
    batch_size: 64
    use_batch_norm: True
    epochs: 49
    add_l1: False
    criterion: cross_entropy


In [34]:
from utils.tmatrix.transition_matrix import T_MATRIX_MNIST_0_3  # Ensure correct import
from utils.models.hyperparametertuning import OptunaOptimization

# Define path to your dataset
path = 'data/2024_A2_datasets/FashionMNIST0.3.npz'

# Initialize the transition matrix
t_matrix = T_MATRIX_MNIST_0_3()

# Initialize the OptunaOptimization class with a small sample size for testing
optimizer = OptunaOptimization(
    path=path,
    study_name='fashion_mnist_0_3_with_t_matrix',
    device=device,
    t_matrix=t_matrix,
    repetitions=2,
    sample_size=0.8  # 0.5% of the data for quick testing
)

# Run optimization with a reduced number of trials for testing purposes
best_trial = optimizer.run_optimization(n_trials=25)  # Start with 10 trials for testing

[I 2024-10-31 22:36:00,268] A new study created in RDB with name: fashion_mnist_0_3_with_t_matrix
  lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
  dropout = trial.suggest_uniform('dropout', 0.1, 0.5)
  lambda_l1 = trial.suggest_loguniform('lambda_l1', 1e-5, 1e-2)
[I 2024-10-31 22:38:57,575] Trial 0 finished with value: 0.24716222426470597 and parameters: {'lr': 0.0004300025372090388, 'dropout': 0.2566895741873441, 'kernel_size_conv1': 2, 'conv1_channels': 128, 'conv2_channels': 128, 'fc_size': 256, 'batch_size': 128, 'use_batch_norm': False, 'epochs': 34, 'add_l1': True, 'criterion': 'nf_land_rce', 'lambda_l1': 0.0038553349283987656}. Best is trial 0 with value: 0.24716222426470597.
[I 2024-10-31 22:41:34,630] Trial 1 finished with value: 0.24134114583333338 and parameters: {'lr': 0.00022224689145275783, 'dropout': 0.37136168241193923, 'kernel_size_conv1': 2, 'conv1_channels': 64, 'conv2_channels': 128, 'fc_size': 64, 'batch_size': 128, 'use_batch_norm': False, 'epochs': 44, 'add_l1

Number of finished trials:  25
Best trial:
  Validation Accuracy: 0.6920
  Best hyperparameters: 
    lr: 0.0001148764661772254
    dropout: 0.4129162544473789
    kernel_size_conv1: 2
    conv1_channels: 64
    conv2_channels: 128
    fc_size: 64
    batch_size: 64
    use_batch_norm: True
    epochs: 37
    add_l1: False
    criterion: nf_land_rce


In [42]:
import optuna
from utils.models.CNN import CNNModel
## laod the best model 
study = optuna.load_study(study_name='fashion_mnist_0_3_with_t_matrix', storage='sqlite:///cnn_hyperparameter_tuning.db')

CNN_mnist_0_3 = CNNModel(t_matrix=t_matrix, 
                 device=device, 
                 lr = study.best_params['lr'],
                 dropout=study.best_params['dropout'],
                 kernel_size_conv = study.best_params['kernel_size_conv1'], 
                 conv_channels = [study.best_params['conv1_channels'], study.best_params['conv2_channels']],
                 use_batch_norm=study.best_params['use_batch_norm'],
                 loss = study.best_params['criterion'],
                 fc_layers_sizes=[study.best_params['fc_size']],
                 input_shape=(1, 28, 28),
                 num_classes=4)


CNN_mnist_0_3.load_state_dict(torch.load('weights/model0.6920045045045045.pth'))

# Evaluate the model
CNN_mnist_0_3.eval()  # Set the model to evaluation mode
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = CNN_mnist_0_3(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the model on the test images: {100 * correct / total}%')



  CNN_mnist_0_3.load_state_dict(torch.load('weights/model0.6920045045045045.pth'))


Accuracy of the model on the test images: 94.475%


In [33]:
# get the study load it in with optuna 
import optuna

study = optuna.load_study(study_name='fashion_mnist_0_6_with_t_matrix', storage='sqlite:///cnn_hyperparameter_tuning.db')


print(study.best_params)

# evaluate the model with the best parameters on the test dataset 
from utils.models.CNN import CNNModel

# Initialize the model with the best parameters
model = CNNModel(t_matrix=t_matrix, 
                 device=device, 
                 lr = study.best_params['lr'],
                 dropout=study.best_params['dropout'],
                 kernel_size_conv = study.best_params['kernel_size_conv1'], 
                 conv_channels = [study.best_params['conv1_channels'], study.best_params['conv2_channels']],
                 use_batch_norm=study.best_params['use_batch_norm'],
                 loss = study.best_params['criterion'],
                 fc_layers_sizes=[study.best_params['fc_size']],
                 input_shape=(1, 28, 28),
                 num_classes=4)


# Load the best weights
model.load_state_dict(torch.load('weights/model0.4301525297619047.pth'))

# Evaluate the model
model.eval()  # Set the model to evaluation mode
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the model on the test images: {100 * correct / total}%')


{'lr': 3.5027271666254944e-05, 'dropout': 0.41989665422347583, 'kernel_size_conv1': 4, 'conv1_channels': 128, 'conv2_channels': 256, 'fc_size': 64, 'batch_size': 64, 'use_batch_norm': True, 'epochs': 49, 'add_l1': False, 'criterion': 'cross_entropy'}


  model.load_state_dict(torch.load('weights/model0.4301525297619047.pth'))


Accuracy of the model on the test images: 56.6%


In [38]:
# Import necessary libraries
from utils.dataloader.FashionMNISTDataLoader import FashionMNISTDataLoader
from utils.models.CNN import CNNModel
from utils.tmatrix.transition_matrix import T_MATRIX_CIFAR

path_cifar = 'data/2024_A2_datasets/CIFAR10.npz'
data_loader_cifar = CIFAR10DataLoader(path=path_cifar, batch_size=64, sample_size=1, train_percentage=0.8, device=device)

train_loader, eval_loader, test_loader = data_loader_cifar.get_loaders()

print('smaple size:', data_loader_cifar.get_shape_of_sample())


CNN_Cifar = CNNModel(t_matrix=T_MATRIX_CIFAR(),
                    device=device,
                    num_classes=4,
                    input_shape=data_loader_cifar.get_shape_of_sample())


# CNN_Cifar.fit(train_loader, epochs=20) 




smaple size: (3, 32, 32)


In [12]:
from utils.tmatrix.transition_matrix import T_MATRIX_CIFAR  # Ensure correct import
from utils.models.hyperparametertuning import OptunaOptimization

# Define path to your dataset
path = 'data/2024_A2_datasets/CIFAR10.npz'  

# Initialize the transition matrix
t_matrix = T_MATRIX_CIFAR()

# Initialize the OptunaOptimization class with a small sample size for testing
optimizer = OptunaOptimization(
    path=path,
    study_name='CIFAR10',
    weights_path='weights/CIFAR10_test',
    device=device,
    t_matrix=t_matrix,
    repetitions=3,
    sample_size=1,  # 0.5% of the data for quick testing
    sample_shape = data_loader_cifar.get_shape_of_sample()
)

# Run optimization with a reduced number of trials for testing purposes
best_trial = optimizer.run_optimization(n_trials=50)  # Start with 10 trials for testing


[I 2024-10-31 14:53:35,090] Using an existing study with name 'CIFAR10' instead of creating a new one.
[I 2024-10-31 14:57:08,335] Trial 4 finished with value: 0.772712643678161 and parameters: {'lr': 0.0009536536530709598, 'dropout': 0.10330274730257508, 'kernel_size_conv1': 3, 'conv1_channels': 64, 'conv2_channels': 64, 'fc_size': 64, 'batch_size': 128, 'use_batch_norm': True, 'epochs': 29, 'add_l1': False, 'criterion': 'nf_land_rce'}. Best is trial 4 with value: 0.772712643678161.
[I 2024-10-31 15:08:36,344] Trial 5 finished with value: 0.3469880952380952 and parameters: {'lr': 0.004006172567258268, 'dropout': 0.4976928643440897, 'kernel_size_conv1': 5, 'conv1_channels': 64, 'conv2_channels': 64, 'fc_size': 256, 'batch_size': 32, 'use_batch_norm': True, 'epochs': 49, 'add_l1': False, 'criterion': 'cross_entropy'}. Best is trial 4 with value: 0.772712643678161.
[I 2024-10-31 15:16:05,721] Trial 6 finished with value: 0.7122942708333332 and parameters: {'lr': 3.451659818642591e-05, 'd

KeyboardInterrupt: 

In [39]:
import optuna 

# import the best parameters
study = optuna.load_study(study_name='CIFAR10', storage='sqlite:///cnn_hyperparameter_tuning.db')

print(study.best_params)

# eval the model

# Initialize the model with the best parameters

CNN_Cifar = CNNModel(t_matrix=T_MATRIX_CIFAR(),
                    device=device,
                    num_classes=4,
                    input_shape=data_loader_cifar.get_shape_of_sample(),
                    lr = study.best_params['lr'],
                    dropout=study.best_params['dropout'],
                    kernel_size_conv = study.best_params['kernel_size_conv1'], 
                    conv_channels = [study.best_params['conv1_channels'], study.best_params['conv2_channels']],
                    use_batch_norm=study.best_params['use_batch_norm'],
                    use_transition_matrix=True,
                    loss = study.best_params['criterion'],
                    fc_layers_sizes=[study.best_params['fc_size']])

# Load the best weights
CNN_Cifar.load_state_dict(torch.load('weights/CIFAR10_test/model0.8095746527777777.pth'))

correct = 0
total = 0

CNN_Cifar.eval()  # Set the model to evaluation mode
with torch.no_grad():
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = CNN_Cifar(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Accuracy of the model on the test images: {100 * correct / total}%')

{'lr': 0.00023746676972336207, 'dropout': 0.24209237900577477, 'kernel_size_conv1': 2, 'conv1_channels': 128, 'conv2_channels': 128, 'fc_size': 128, 'batch_size': 128, 'use_batch_norm': True, 'epochs': 48, 'add_l1': False, 'criterion': 'nf_land_rce'}


  CNN_Cifar.load_state_dict(torch.load('weights/CIFAR10_test/model0.8095746527777777.pth'))


Accuracy of the model on the test images: 89.1125%


# MLP

In [9]:
import numpy as np
import pandas as pd
from collections import OrderedDict

from sklearn.metrics import accuracy_score
from sklearn.model_selection import KFold

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data
import torch.optim as optim
from torch.utils.data import Dataset,Subset, DataLoader, TensorDataset

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [11]:
def T_MATRIX_MNIST_0_6():
    return torch.tensor([
        [0.4, 0.2, 0.2, 0.2],
        [0.2, 0.4, 0.2, 0.2],
        [0.2, 0.2, 0.4, 0.2],
        [0.2, 0.2, 0.2, 0.4]
    ]).to(device)

def T_MATRIX_MNIST_0_3():
    return torch.tensor([
        [0.7, 0.3, 0, 0],
        [0, 0.7, 0.3, 0],
        [0, 0, 0.7, 0.3],
        [0.3, 0, 0, 0.7]
    ]).to(device)

def T_MATRIX_CIFAR10_1():
    return torch.tensor([[0.86189902, 0.10806329, 0.02108065, 0.00895717],
 [0.01167643, 0.90845531, 0.07811296, 0.00175436],
 [0.02824813, 0.02348635, 0.85186839, 0.09639635],
 [0.08581234, 0.02207433, 0.06435744, 0.8277564 ]]).to(device)

def T_MATRIX_CIFAR10_2():
    return torch.tensor([[0.75765127, 0.19187541, 0.03330756, 0.0171661 ], 
                         [0.0157136, 0.87488556, 0.10403936, 0.00536188], 
                         [0.03524161, 0.0154408, 0.80679089, 0.14252695], 
                         [0.13659286, 0.01401313, 0.07197482, 0.7774201 ]]).to(device)

In [12]:
class ClassifierMLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_p=0.5):
        super(ClassifierMLP, self).__init__()
        self.input_dim = input_dim
        # Define layers
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, x):
        # Flatten the input tensor to a 1D vector
        x = x.view(-1, self.input_dim)
        x = self.fc1(x)
        x = self.relu(x)
        # Apply dropout
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        return x

def noise_corrected_loss(output, labels, transition_matrix):
    # Create a criterion that calculates the CrossEntropyLoss for each element
    criterion = nn.CrossEntropyLoss(reduction='none')
    # Apply the criterion to get the initial loss values
    loss = criterion(output, labels)

    # Apply softmax to the output to get probability distributions
    softmax_output = torch.softmax(output, dim=-1)
    # Correct the predictions based on the transition matrix
    corrected_output = torch.matmul(softmax_output, transition_matrix)
    # Calculate the corrected loss using negative log likelihood
    corrected_loss = -torch.log(corrected_output[range(len(labels)), labels])
    # Return the mean of the corrected losses
    return torch.mean(corrected_loss)

def train_model(model, train_loader,val_loader, transition_matrix, num_epochs=10, learning_rate=1e-3, device=device):
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    model.train()
    accuracy_list = []
    val_acc_list = []
    loss_list = []
    for epoch in range(num_epochs):
        running_loss = 0.0
        all_predictions = []
        all_labels = []
        for data, labels in train_loader:
            data, labels = data.to(device), labels.to(device)

            output = model(data)
            loss = noise_corrected_loss(output, labels, transition_matrix)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            
            _, predicted = torch.max(output, 1)
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
        
        
        # Calculate average loss for the epoch
        avg_loss = running_loss / len(train_loader)
        loss_list.append(avg_loss)

        # Calculate accuracy for the epoch
        accuracy = accuracy_score(all_labels, all_predictions)
        accuracy_list.append(accuracy)

        val_acc = validate_model(model, val_loader, device)
        val_acc_list.append(val_acc)

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/100:.4f}, val: {val_acc:.4f}")

        

    return accuracy_list, loss_list, val_acc_list

def validate_model(model, val_loader, device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, labels in val_loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    val_accuracy = correct / total
    return val_accuracy

In [13]:
def fashionDataLoader(path, percentage=0.1, batch_size=128, device='cpu' ):
    dataset_ = np.load(path)
    
    X_train_ = dataset_['X_tr']
    S_train_ = dataset_['S_tr']
    X_test_ = dataset_['X_ts']
    Y_test_ = dataset_['Y_ts'] 
    
    output_dim = np.unique(S_train_).shape[0]
    
    np.random.seed(0)
    n_samples_ = int(percentage * X_train_.shape[0])
    train_indices_ = np.random.choice(X_train_.shape[0], n_samples_, replace=False)
    val_indices_ = np.setdiff1d(np.arange(X_train_.shape[0]), train_indices_)

    #train set
    X_train_selected = X_train_[train_indices_]
    S_train_selected = S_train_[train_indices_]

    #validation set
    X_val_selected = X_train_[val_indices_]
    S_val_selected = S_train_[val_indices_]


    # Convert NumPy arrays to PyTorch tensors and move to device
    X_train_tensor = torch.tensor(X_train_selected, dtype=torch.float32).unsqueeze(1).to(device)
    X_train_tensor = (X_train_tensor - X_train_tensor.mean()) / X_train_tensor.std()
    S_train_tensor = torch.tensor(S_train_selected, dtype=torch.long).to(device)

    X_val_tensor = torch.tensor(X_val_selected, dtype=torch.float32).unsqueeze(1).to(device)
    X_val_tensor = (X_val_tensor - X_val_tensor.mean()) / X_val_tensor.std()
    S_val_tensor = torch.tensor(S_val_selected, dtype=torch.long).to(device)
    
    # Create TensorDataset and DataLoader
    train_dataset_ = TensorDataset(X_train_tensor, S_train_tensor)
    train_loader_ = DataLoader(train_dataset_, batch_size=batch_size, shuffle=True)

    val_dataset = TensorDataset(X_val_tensor, S_val_tensor)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    
    ### TEST SET ###
    
    # Convert NumPy arrays to PyTorch tensors and move to device
    X_test_tensor_ = torch.tensor(X_test_, dtype=torch.float32).unsqueeze(1).to(device)
    X_test_tensor_ = (X_test_tensor_ - X_test_tensor_.mean()) / X_test_tensor_.std()
    Y_test_tensor_ = torch.tensor(Y_test_, dtype=torch.long).to(device)
    
    # Create TensorDataset and DataLoader
    test_dataset_ = TensorDataset(X_test_tensor_, Y_test_tensor_)
    test_loader_ = DataLoader(test_dataset_, batch_size=batch_size, shuffle=False)
    
    return train_loader_, test_loader_, val_loader, output_dim

In [14]:
def train_model_fun(input_dim, hidden_dim, output_dim, dropout_p=0.5, train_loader=None, val_loader=None, transition_matrix = None, num_epochs=50, device=device):
    train_loader = train_loader
    val_loader = val_loader
    model = ClassifierMLP(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, dropout_p=dropout_p).to(device)
    accuracy_list, loss_list, val_acc_list = train_model(model, 
                                           train_loader, 
                                           val_loader,
                                           transition_matrix=transition_matrix, 
                                           num_epochs=num_epochs, 
                                           device=device)
    return model, accuracy_list, loss_list, val_acc_list
    
def evaluate_model_fun(model, test_loader, device):
    # Evaluate the model
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    test_accuracy = correct / total
    #print(f'Accuracy of the model on the test images: {100 * correct / total}%')
    return test_accuracy

In [15]:
def run_multiple_experiments(path, num_experiments, input_dim, hidden_dim, dropout_p=0.5, transition_matrix=None, num_epochs=50, percentage=0.8, batch_size=128, device='cpu'):
    all_accuracies = []
    experiment_results = []
    val_accuracy_list = []
    test_accuracy_list = []
    file_name = path.split('/')[-1]
    dataset_name = file_name.split('.')[:2]

    train_loader, test_loader, val_loader, output_dim = fashionDataLoader(path, 
                                                                          percentage=percentage, 
                                                                          batch_size=batch_size, 
                                                                          device=device)
    for i in range(num_experiments):
        
        model, accuracy_list, loss_list, val_acc_list = train_model_fun(
        input_dim = input_dim,
        hidden_dim = hidden_dim,
        output_dim= output_dim,
        dropout_p = dropout_p,
        train_loader = train_loader,
        val_loader= val_loader,
        transition_matrix = transition_matrix,
        num_epochs = num_epochs,
        device = device
        )

        val_acc = np.mean(val_acc_list)

        test_accuracy = evaluate_model_fun(model, test_loader, device)

        test_accuracy_list.append(test_accuracy)
        val_accuracy_list.append(val_acc)

    mean_val_accuracy = np.mean(val_accuracy_list)
    mean_accuracy = np.mean(test_accuracy_list)
    std_accuracy = np.std(test_accuracy_list)

    experiment_results.append({
        'dataset_name': dataset_name,
        'mean_val_accuracy': mean_val_accuracy,
        'mean_accuracy': mean_accuracy,
        'std_accuracy': std_accuracy
    })

    results_df = pd.DataFrame(experiment_results)
    return results_df

In [16]:
results_df_03 = run_multiple_experiments(
    path='data/2024_A2_datasets/FashionMNIST0.3.npz', 
    num_experiments=10,
    input_dim=28*28, 
    hidden_dim=64, 
    transition_matrix=T_MATRIX_MNIST_0_3(), 
    num_epochs=50, 
    device=device
)

print(results_df_03)

results_df_06 = run_multiple_experiments(
    path='data/2024_A2_datasets/FashionMNIST0.6.npz', 
    num_experiments=10,
    input_dim=28*28, 
    hidden_dim=64, 
    transition_matrix=T_MATRIX_MNIST_0_6(), 
    num_epochs=50, 
    device=device
)

print(results_df_06)

results_df_cifa = run_multiple_experiments(
    path='data/2024_A2_datasets/CIFAR10.npz', 
    num_experiments=10,
    input_dim=32*32*3, 
    hidden_dim=256, 
    transition_matrix=T_MATRIX_CIFAR10_2(), 
    num_epochs=50, 
    device=device
)

result = pd.concat([results_df_03, results_df_06, results_df_cifa])

Epoch 1/50, Loss: 1.2642, val: 0.6675
Epoch 2/50, Loss: 1.1030, val: 0.6769
Epoch 3/50, Loss: 1.0822, val: 0.6756
Epoch 4/50, Loss: 1.0672, val: 0.6763
Epoch 5/50, Loss: 1.0573, val: 0.6787
Epoch 6/50, Loss: 1.0434, val: 0.6817
Epoch 7/50, Loss: 1.0334, val: 0.6802
Epoch 8/50, Loss: 1.0316, val: 0.6806
Epoch 9/50, Loss: 1.0175, val: 0.6835
Epoch 10/50, Loss: 1.0081, val: 0.6781
Epoch 11/50, Loss: 1.0047, val: 0.6840
Epoch 12/50, Loss: 0.9958, val: 0.6777
Epoch 13/50, Loss: 0.9953, val: 0.6817
Epoch 14/50, Loss: 0.9889, val: 0.6783
Epoch 15/50, Loss: 0.9851, val: 0.6846
Epoch 16/50, Loss: 0.9812, val: 0.6856
Epoch 17/50, Loss: 0.9745, val: 0.6823
Epoch 18/50, Loss: 0.9710, val: 0.6800
Epoch 19/50, Loss: 0.9699, val: 0.6850
Epoch 20/50, Loss: 0.9675, val: 0.6819
Epoch 21/50, Loss: 0.9633, val: 0.6775
Epoch 22/50, Loss: 0.9575, val: 0.6802
Epoch 23/50, Loss: 0.9544, val: 0.6833
Epoch 24/50, Loss: 0.9510, val: 0.6825
Epoch 25/50, Loss: 0.9471, val: 0.6731
Epoch 26/50, Loss: 0.9475, val: 0.

In [17]:
print(result)

         dataset_name  mean_val_accuracy  mean_accuracy  std_accuracy
0  [FashionMNIST0, 3]           0.681590       0.939250      0.002624
0  [FashionMNIST0, 6]           0.369191       0.818275      0.012256
0      [CIFAR10, npz]           0.611625       0.683350      0.006382
