In [1]:
import numpy as np
import random

import os

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torch.nn.functional as F
from torch.utils.data import DataLoader

from netcal.metrics import ECE

import torchbnn as bnn

import matplotlib.pyplot as plt
from tqdm import tqdm

## Setting

In [2]:
b_size = 64 

random.seed(1)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## Load Data

In [3]:
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=0.5):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

train = torchvision.datasets.MNIST('./data/', train=True, download=True,
								transform=torchvision.transforms.Compose([
										torchvision.transforms.ToTensor(),
										torchvision.transforms.Normalize((0.1307,), (0.3081,))
								]))

test = torchvision.datasets.MNIST('./data/', train=False, download=True,
								transform=torchvision.transforms.Compose([
										torchvision.transforms.ToTensor(),
										torchvision.transforms.Normalize((0.1307,), (0.3081,)),
                                        AddGaussianNoise(0., .5)
								]))

train_loader = DataLoader(train, batch_size=b_size)
test_loader = DataLoader(test, batch_size=len(test))

## Model Architecture

In [4]:
class BayesianMnistNet(nn.Module):
    def __init__(self):
        super(BayesianMnistNet, self).__init__()
        self.conv1 = bnn.BayesConv2d(prior_mu=0., prior_sigma=.1, in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = bnn.BayesConv2d(prior_mu=0., prior_sigma=.1, in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.fc1 = bnn.BayesLinear(prior_mu=0., prior_sigma=.1, in_features=64 * 7 * 7, out_features=128)
        self.fc2 = bnn.BayesLinear(prior_mu=0., prior_sigma=.1, in_features=128, out_features=10)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()
        self.flatten = nn.Flatten()

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [5]:
class NonBayesianMnistNet(nn.Module):
    def __init__(self):
        super(NonBayesianMnistNet, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(in_features=64 * 7 * 7, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.relu = nn.ReLU()
        self.flatten = nn.Flatten()

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.pool(x)
        x = self.relu(self.conv2(x))
        x = self.pool(x)
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

## Training

In [6]:
bnn_model = BayesianMnistNet().to(device)
ce = nn.CrossEntropyLoss()
kl = bnn.BKLLoss(reduction='mean', last_layer_only=False)
kl_weight = 1.
n_epoch = 10

optimizer = optim.Adam(bnn_model.parameters(), lr=1e-2)

In [7]:
ce_losses = []
kl_losses = []
losses = []
bnn_epoch_losses = []

for epoch in range(n_epoch):
    bnn_model.train()
    epoch_loss = 0.
    with tqdm(total=len(train_loader), desc=f'Epoch {epoch+1}/{n_epoch}', leave=True) as pbar:
            for batch_id, sampl in enumerate(train_loader):
                    imgs, labels = sampl
                    imgs, labels = imgs.to(device), labels.to(device)

                    pred = bnn_model(imgs)
                    ce_loss = ce(pred, labels)
                    kl_loss = kl(bnn_model)
                    total_loss = ce_loss + kl_weight*kl_loss
                    
                    ce_loss_cpu = ce_loss.detach().cpu().item()
                    kl_loss_cpu = kl_loss.detach().cpu().item()
                    total_loss_cpu = total_loss.detach().cpu().item()
                    kl_losses.append(kl_loss_cpu)
                    ce_losses.append(ce_loss_cpu)
                    losses.append(total_loss_cpu)         

                    optimizer.zero_grad()
                    total_loss.backward()
                    optimizer.step()

                    epoch_loss += total_loss.item()
                    pbar.set_postfix({'kl Loss': f'{kl_loss.detach().cpu().item():.4f}',
									  'Total Loss': f'{total_loss.detach().cpu().item():.4f}'})
                    pbar.update(1)
					
    print(f'Epoch {epoch+1}/{n_epoch} - Epoch Loss: {epoch_loss/len(train_loader)}')
    bnn_epoch_losses.append(epoch_loss/len(train_loader))

models = []
models.append(bnn_model)
torch.save(bnn_model.state_dict(), f'./checkpoint/noisydata/bnn_model.pth')


Epoch 1/10: 100%|██████████| 938/938 [00:13<00:00, 68.83it/s, kl Loss=0.1253, Total Loss=1.7407]


Epoch 1/10 - Epoch Loss: 2.7643949362133613


Epoch 2/10: 100%|██████████| 938/938 [00:15<00:00, 62.37it/s, kl Loss=0.1195, Total Loss=0.3926]


Epoch 2/10 - Epoch Loss: 0.7217386314736755


Epoch 3/10:  99%|█████████▉| 929/938 [00:14<00:00, 63.35it/s, kl Loss=0.0975, Total Loss=0.5120]


KeyboardInterrupt: 

In [None]:
nonbnn_model = NonBayesianMnistNet().to(device)
optimizer = optim.Adam(nonbnn_model.parameters(), lr=1e-2)

nonbnn_epoch_losses = []

for epoch in range(n_epoch):
    nonbnn_model.train()
    epoch_loss = 0.
    with tqdm(total=len(train_loader), desc=f'Epoch {epoch+1}/{n_epoch}', leave=True) as pbar:
            for batch_id, sampl in enumerate(train_loader):
                    imgs, labels = sampl
                    imgs, labels = imgs.to(device), labels.to(device)

                    pred = nonbnn_model(imgs)
                    ce_loss = ce(pred, labels)

                    ce_loss_cpu = ce_loss.detach().cpu().item()
                    ce_losses.append(ce_loss_cpu)

                    optimizer.zero_grad()
                    ce_loss.backward()
                    optimizer.step()

                    epoch_loss += ce_loss.item()
                    pbar.set_postfix({'Total Loss': f'{ce_loss.detach().cpu().item():.4f}'})
                    pbar.update(1)

    print(f'Epoch {epoch+1}/{n_epoch} - Epoch Loss: {epoch_loss/len(train_loader)}')
    nonbnn_epoch_losses.append(epoch_loss/len(train_loader))

torch.save(nonbnn_model.state_dict(), f'./checkpoint/noisydata/nonbnn_model.pth')

Epoch 1/10: 100%|██████████| 938/938 [00:13<00:00, 67.09it/s, Total Loss=0.0080]


Epoch 1/10 - Epoch Loss: 0.23186591703721238


Epoch 2/10: 100%|██████████| 938/938 [00:13<00:00, 70.63it/s, Total Loss=0.0040]


Epoch 2/10 - Epoch Loss: 0.12146881685562697


Epoch 3/10: 100%|██████████| 938/938 [00:13<00:00, 71.59it/s, Total Loss=0.0031]


Epoch 3/10 - Epoch Loss: 0.10241588324611751


Epoch 4/10: 100%|██████████| 938/938 [00:13<00:00, 71.50it/s, Total Loss=0.0220]


Epoch 4/10 - Epoch Loss: 0.09357685144922236


Epoch 5/10: 100%|██████████| 938/938 [00:13<00:00, 71.87it/s, Total Loss=0.0046]


Epoch 5/10 - Epoch Loss: 0.09353900067903201


Epoch 6/10: 100%|██████████| 938/938 [00:13<00:00, 70.76it/s, Total Loss=0.0004]


Epoch 6/10 - Epoch Loss: 0.08771455064255054


Epoch 7/10: 100%|██████████| 938/938 [00:13<00:00, 70.28it/s, Total Loss=0.0026]


Epoch 7/10 - Epoch Loss: 0.08654229805859413


Epoch 8/10: 100%|██████████| 938/938 [00:13<00:00, 71.82it/s, Total Loss=0.0005]


Epoch 8/10 - Epoch Loss: 0.08089353621512009


Epoch 9/10: 100%|██████████| 938/938 [00:13<00:00, 69.84it/s, Total Loss=0.0670]


Epoch 9/10 - Epoch Loss: 0.07730662154477587


Epoch 10/10: 100%|██████████| 938/938 [00:13<00:00, 68.96it/s, Total Loss=0.0001]

Epoch 10/10 - Epoch Loss: 0.07914280973125211





## Evaluation

In [None]:
from sklearn.calibration import calibration_curve

# Evaluate models and calculate ECE using sklearn
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    all_probs = []
    all_labels = []
    with torch.no_grad():
        for imgs, labels in test_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            probs = F.softmax(outputs, dim=1)
            all_probs.append(probs.cpu().numpy())
            all_labels.append(labels.cpu().numpy())
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    all_probs = np.concatenate(all_probs)
    all_labels = np.concatenate(all_labels)
    accuracy = 100 * correct / total
    return accuracy, all_probs, all_labels

test_loader = DataLoader(test, batch_size=len(test), shuffle=False)

bnn_model = BayesianMnistNet()
nonbnn_model = NonBayesianMnistNet()
bnn_state_dict_path = f'./checkpoint/noisydata/bnn_model.pth'
nonbnn_state_dict_path = f'./checkpoint/noisydata/nonbnn_model.pth'
bnn_model.load_state_dict(torch.load(bnn_state_dict_path))
nonbnn_model.load_state_dict(torch.load(nonbnn_state_dict_path))

# Evaluate BNN
bnn_accuracy, bnn_probs, bnn_labels = evaluate_model(bnn_model, test_loader)
print(f'Bayesian Model - Accuracy: {bnn_accuracy:.2f}%')

# Evaluate Non-BNN
nonn_accuracy, nonn_probs, nonn_labels = evaluate_model(nonbnn_model, test_loader)
print(f'Non-Bayesian Model - Accuracy: {nonn_accuracy:.2f}%')

# Plot calibration curve
def plot_calibration_curve(probs1, labels1, probs2, labels2, title1, title2):
    prob_true1, prob_pred1 = calibration_curve(labels1, probs1.max(axis=1), n_bins=15)
    prob_true2, prob_pred2 = calibration_curve(labels2, probs2.max(axis=1), n_bins=15)

    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    plt.plot(prob_pred1, prob_true1, marker='o', label='Bayesian Model')
    plt.plot([0, 1], [0, 1], linestyle='--')
    plt.title(title1)
    plt.xlabel('Predicted Probability')
    plt.ylabel('True Probability')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(prob_pred2, prob_true2, marker='o', label='Non-Bayesian Model')
    plt.plot([0, 1], [0, 1], linestyle='--')
    plt.title(title2)
    plt.xlabel('Predicted Probability')
    plt.ylabel('True Probability')
    plt.legend()

    plt.tight_layout()
    plt.show()

plot_calibration_curve(bnn_probs, bnn_labels, nonn_probs, nonn_labels, 
                       'Bayesian Model Calibration Curve', 'Non-Bayesian Model Calibration Curve')


RuntimeError: Input type (torch.cuda.FloatTensor) and weight type (torch.FloatTensor) should be the same

In [None]:
from sklearn.calibration import calibration_curve
from sklearn.preprocessing import label_binarize

# Evaluate models and calculate ECE using sklearn
def evaluate_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    all_probs = []
    all_labels = []
    with torch.no_grad():
        for imgs, labels in test_loader:
            imgs, labels = imgs.to(device), labels.to(device)
            outputs = model(imgs)
            probs = F.softmax(outputs, dim=1)
            all_probs.append(probs.cpu().numpy())
            all_labels.append(labels.cpu().numpy())
            _, predicted = torch.max(probs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    all_probs = np.concatenate(all_probs)
    all_labels = np.concatenate(all_labels)
    accuracy = 100 * correct / total
    return accuracy, all_probs, all_labels

test_loader = DataLoader(test, batch_size=len(test), shuffle=False)

# Evaluate BNN
bnn_accuracy, bnn_probs, bnn_labels = evaluate_model(bnn_model, test_loader)
print(f'Bayesian Model - Accuracy: {bnn_accuracy:.2f}%')

# Evaluate Non-BNN
nonn_accuracy, nonn_probs, nonn_labels = evaluate_model(nonbnn_model, test_loader)
print(f'Non-Bayesian Model - Accuracy: {nonn_accuracy:.2f}%')

# Plot calibration curve
def plot_calibration_curve(probs1, labels1, probs2, labels2, title1, title2):
    n_classes = 10
    labels1_bin = label_binarize(labels1, classes=range(n_classes))
    labels2_bin = label_binarize(labels2, classes=range(n_classes))

    plt.figure(figsize=(10, 5))

    plt.subplot(1, 2, 1)
    for i in range(n_classes):
        prob_true, prob_pred = calibration_curve(labels1_bin[:, i], probs1[:, i], n_bins=20)
        plt.plot(prob_pred, prob_true, marker='o', label=f'Class {i}')
    plt.plot([0, 1], [0, 1], linestyle='--')
    plt.title(title1)
    plt.xlabel('Predicted Probability')
    plt.ylabel('True Probability')
    plt.legend()

    plt.subplot(1, 2, 2)
    for i in range(n_classes):
        prob_true, prob_pred = calibration_curve(labels2_bin[:, i], probs2[:, i], n_bins=20)
        plt.plot(prob_pred, prob_true, marker='o', label=f'Class {i}')
    plt.plot([0, 1], [0, 1], linestyle='--')
    plt.title(title2)
    plt.xlabel('Predicted Probability')
    plt.ylabel('True Probability')
    plt.legend()

    plt.tight_layout()
    plt.show()

plot_calibration_curve(bnn_probs, bnn_labels, nonn_probs, nonn_labels, 
                       'Bayesian Model Calibration Curve', 'Non-Bayesian Model Calibration Curve')


NameError: name 'DataLoader' is not defined