<a href="https://colab.research.google.com/github/BharathSShankar/DSA4212_Assignments/blob/bharath-exp/BayesianNeuralNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install medmnist
!python -m medmnist download

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting medmnist
  Downloading medmnist-2.2.1-py3-none-any.whl (21 kB)
Collecting fire
  Downloading fire-0.5.0.tar.gz (88 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m88.3/88.3 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: fire
  Building wheel for fire (setup.py) ... [?25l[?25hdone
  Created wheel for fire: filename=fire-0.5.0-py2.py3-none-any.whl size=116952 sha256=6da8b7bbc87c339f612ba3df53897bb5fce68a006b7aba31caac2042ddc877ff
  Stored in directory: /root/.cache/pip/wheels/f7/f1/89/b9ea2bf8f80ec027a88fef1d354b3816b4d3d29530988972f6
Successfully built fire
Installing collected packages: fire, medmnist
Successfully installed fire-0.5.0 medmnist-2.2.1
Downloading pathmnist...
Downloading https://zenodo.org/record/6496656/files/pathmnist.npz?download=1 to /root/.

In [2]:
from tqdm import tqdm
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision.transforms as transforms
import torch.nn.functional as F
from torchvision import datasets, transforms
from tqdm import tqdm, trange

In [3]:
from google.colab import drive
drive.mount('/content/drive')
!cd "/content/drive/MyDrive/DSA4212/Assignment 3"

Mounted at /content/drive


In [4]:
import medmnist
from medmnist import INFO, Evaluator

In [5]:
data_flag = 'pathmnist'
# data_flag = 'breastmnist'
download = True

NUM_EPOCHS = 3
BATCH_SIZE = 128
lr = 0.001

info = INFO[data_flag]
task = info['task']
n_channels = info['n_channels']
n_classes = len(info['label'])

DataClass = getattr(medmnist, info['python_class'])


In [6]:
# preprocessing
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[.5], std=[.5])
])

# load the data
train_dataset = DataClass(split='train', transform=data_transform, download=download)
test_dataset = DataClass(split='test', transform=data_transform, download=download)

pil_dataset = DataClass(split='train', download=download)

# encapsulate data into dataloader form
train_loader = data.DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)
train_loader_at_eval = data.DataLoader(dataset=train_dataset, batch_size=2*BATCH_SIZE, shuffle=False)
test_loader = data.DataLoader(dataset=test_dataset, batch_size=2*BATCH_SIZE, shuffle=False)

Using downloaded and verified file: /root/.medmnist/pathmnist.npz
Using downloaded and verified file: /root/.medmnist/pathmnist.npz
Using downloaded and verified file: /root/.medmnist/pathmnist.npz


In [7]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PI = 0.5
SIGMA_1 = torch.FloatTensor([np.exp(1)]).to(DEVICE)
SIGMA_2 = torch.FloatTensor([np.exp(-4)]).to(DEVICE)

In [8]:
class Gaussian(object):
    def __init__(self, mu, rho):
        super().__init__()
        self.mu = mu
        self.rho = rho
        self.normal = torch.distributions.Normal(0,1)
    
    @property
    def sigma(self):
        return torch.log1p(torch.exp(self.rho))
    
    def sample(self):
        epsilon = self.normal.sample(self.rho.size()).to(DEVICE)
        return self.mu + self.sigma * epsilon
    
    def log_prob(self, input):
        return (-np.log(np.sqrt(2 * np.pi))
                - torch.log(self.sigma)
                - ((input - self.mu) ** 2) / (2 * self.sigma ** 2)).sum()

In [9]:
class ScaleMixtureGaussian(object):
    def __init__(self, pi, sigma1, sigma2):
        super().__init__()
        self.pi = pi
        self.sigma1 = sigma1
        self.sigma2 = sigma2
        self.gaussian1 = torch.distributions.Normal(0,sigma1)
        self.gaussian2 = torch.distributions.Normal(0,sigma2)
    
    def log_prob(self, input):
        prob1 = torch.exp(self.gaussian1.log_prob(input))
        prob2 = torch.exp(self.gaussian2.log_prob(input))
        return (torch.log(self.pi * prob1 + (1-self.pi) * prob2)).sum()

In [18]:
class BayesianDense(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features).uniform_(-0.1, 0.1))
        self.weight_rho = nn.Parameter(torch.Tensor(out_features, in_features).uniform_(-8,-6))
        self.weight = Gaussian(self.weight_mu, self.weight_rho)

        self.bias_mu = nn.Parameter(torch.Tensor(out_features).uniform_(-0.1, 0.1))
        self.bias_rho = nn.Parameter(torch.Tensor(out_features).uniform_(-8,-6))
        self.bias = Gaussian(self.bias_mu, self.bias_rho)

        self.weight_prior = ScaleMixtureGaussian(PI, SIGMA_1, SIGMA_2)
        self.bias_prior = ScaleMixtureGaussian(PI, SIGMA_1, SIGMA_2)
        self.log_prior = 0
        self.log_variational_posterior = 0

    def forward(self, input, sample=False, calculate_log_probs=False):
        if self.training or sample:
            weight = self.weight.sample()
            bias = self.bias.sample()
        else:
            weight = self.weight.mu
            bias = self.bias.mu
        if self.training or calculate_log_probs:
            self.log_prior = self.weight_prior.log_prob(weight) + self.bias_prior.log_prob(bias)
            self.log_variational_posterior = self.weight.log_prob(weight) + self.bias.log_prob(bias)
        else:
            self.log_prior, self.log_variational_posterior = 0, 0

        return F.linear(input, weight, bias)

In [19]:
class BayesianConv2D(nn.Module):
    def __init__(self, in_channels, out_channels, k_size):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.k_size = k_size

        self.weight_mu = nn.Parameter(torch.Tensor(out_channels, in_channels, k_size, k_size).uniform_(-0.1, 0.1))
        self.weight_rho = nn.Parameter(torch.Tensor(out_channels, in_channels, k_size,k_size).uniform_(-8,-6))
        self.weight = Gaussian(self.weight_mu, self.weight_rho)

        self.bias_mu = nn.Parameter(torch.Tensor(out_channels).uniform_(-0.1, 0.1))
        self.bias_rho = nn.Parameter(torch.Tensor(out_channels).uniform_(-8,-6))
        self.bias = Gaussian(self.bias_mu, self.bias_rho)

        self.weight_prior = ScaleMixtureGaussian(PI, SIGMA_1, SIGMA_2)
        self.bias_prior = ScaleMixtureGaussian(PI, SIGMA_1, SIGMA_2)
        self.log_prior = 0
        self.log_variational_posterior = 0

    def forward(self, input, sample=False, calculate_log_probs=False):
        if self.training or sample:
            weight = self.weight.sample()
            bias = self.bias.sample()
        else:
            weight = self.weight.mu
            bias = self.bias.mu
        if self.training or calculate_log_probs:
            self.log_prior = self.weight_prior.log_prob(weight) + self.bias_prior.log_prob(bias)
            self.log_variational_posterior = self.weight.log_prob(weight) + self.bias.log_prob(bias)
        else:
            self.log_prior, self.log_variational_posterior = 0, 0

        return F.conv2d(input, weight, bias)

In [20]:
class BayesianNeuralNetFC(nn.Module):
    def __init__(self, layers_dims, input_size, n_classes):
        super().__init__()
        self.inputLayer = BayesianDense(input_size, layers_dims[0])
        layer_list = []
        for i in range(1, len(layers_dims)):
            layer_list.append(BayesianDense(layers_dims[i - 1], layers_dims[i]))
        self.linears = nn.ModuleList(layer_list)
        self.outputLayer = BayesianDense(layers_dims[-1], n_classes)
    
    def forward(self, input, sample=False, calculate_log_probs=False):
        x = nn.Flatten()(input)
        x = self.inputLayer(x, sample, calculate_log_probs)
        x = F.relu(x)
        x = nn.Dropout(p = 0.1)(x)
        for layer in self.linears:
            x = layer(x, sample, calculate_log_probs)
            x = F.relu(x)
            x = nn.Dropout(p = 0.1)(x)
        x = self.outputLayer(x)
        x = F.softmax(x, dim = 1)
        return x

In [21]:
def train_bayesian_net(net, train_loader, test_loader, n_epochs=20, lr=3e-4, log_interval=10):
    # Define loss function and optimizer
    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), lr=lr)

    # Move model to device
    net.to(DEVICE)

    # Training loop
    for epoch in range(n_epochs):
        # Training mode
        net.train()
        train_loss = 0
        correct = 0
        total = 0
        with tqdm(train_loader, desc=f"Epoch {epoch+1}") as t:
            for batch_idx, (data, target) in enumerate(t):
                data, target = data.to(DEVICE), target.to(DEVICE)

                # Forward pass
                optimizer.zero_grad()
                outputs = net(data)

                # Compute loss and update model
                loss = loss_func(outputs, target.T[0])
                loss.backward()
                optimizer.step()

                # Update training statistics
                train_loss += loss.item()
                _, predicted = outputs.max(1)
                total += data.shape[0]
                correct += predicted.eq(target.T[0]).sum().item()

                # Log training progress
                if batch_idx % log_interval == 0:
                    train_acc = correct / total
                    train_loss /= log_interval
                    t.set_postfix(loss=f"{train_loss:.6f}", accuracy=f"{train_acc:.2f}")
                    train_loss = 0
                    correct = 0
                    total = 0

        # Evaluation mode
        net.eval()
        test_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(DEVICE), target.to(DEVICE)

                # Forward pass
                outputs = net(data)

                # Compute loss and update evaluation statistics
                loss = loss_func(outputs, target.T[0])
                test_loss += loss.item()
                _, predicted = outputs.max(1)
                total += data.shape[0]
                correct += predicted.eq(target.T[0]).sum().item()

        # Log evaluation statistics
        test_acc = 100. * correct / total
        test_loss /= len(test_loader)
        print('Test set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
            test_loss, test_acc))



In [22]:
net = BayesianNeuralNetFC([128, 256, 64], 3 * 28 * 28, 9)
train_bayesian_net(net, train_loader, test_loader, 10)

Epoch 1: 100%|██████████| 704/704 [00:37<00:00, 18.79it/s, accuracy=0.47, loss=1.903054]


Test set: Average loss: 1.7715, Accuracy: 59.90%



Epoch 2: 100%|██████████| 704/704 [00:35<00:00, 19.81it/s, accuracy=0.49, loss=1.872848]


Test set: Average loss: 1.7963, Accuracy: 57.21%



Epoch 3: 100%|██████████| 704/704 [00:35<00:00, 19.65it/s, accuracy=0.51, loss=1.849811]


Test set: Average loss: 1.7782, Accuracy: 58.84%



Epoch 4: 100%|██████████| 704/704 [00:35<00:00, 19.65it/s, accuracy=0.53, loss=1.840323]


Test set: Average loss: 1.8137, Accuracy: 55.45%



Epoch 5:  93%|█████████▎| 655/704 [00:33<00:02, 19.78it/s, accuracy=0.54, loss=1.828342]


KeyboardInterrupt: ignored

In [None]:
net= nn.Sequential(
    nn.Flatten(),
    nn.LazyLinear(128),
    nn.ReLU(),
    nn.Dropout(0.1),
    nn.LazyLinear(256),
    nn.ReLU(),
    nn.Dropout(0.1),
    nn.LazyLinear(64),
    nn.ReLU(),
    nn.Dropout(0.1),
    nn.LazyLinear(9),
    nn.Softmax(dim = 1)
).to(DEVICE)
train_bayesian_net(net, train_loader, test_loader, 10)

In [29]:
class BayesianNeuralNetConv(nn.Module):
    def __init__(self, channel_list, input_channels, n_classes):
        super().__init__()
        self.inputLayer = BayesianConv2D(input_channels, channel_list[0], k_size=3)
        layer_list = []
        for i in range(1, len(channel_list)):
            layer_list.append(BayesianConv2D(channel_list[i - 1], channel_list[i], k_size=1))
        self.convs = nn.ModuleList(layer_list)
        self.fc = BayesianDense(channel_list[-1] * 9, n_classes)
    
    def forward(self, input, sample=False, calculate_log_probs=False):
        x = self.inputLayer(input, sample, calculate_log_probs)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        for conv in self.convs:
            x = conv(x, sample, calculate_log_probs)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
        x = nn.Flatten()(x)
        x = self.fc(x, sample, calculate_log_probs)
        x = F.softmax(x, dim=1)
        return x


In [None]:
net = BayesianNeuralNetConv([128, 256, 512], 3, 9)
train_bayesian_net(net, train_loader, test_loader)

Epoch 1: 100%|██████████| 704/704 [00:46<00:00, 15.23it/s, accuracy=0.60, loss=1.778098]


Test set: Average loss: 1.7125, Accuracy: 66.63%



Epoch 2: 100%|██████████| 704/704 [00:45<00:00, 15.38it/s, accuracy=0.65, loss=1.728691]


Test set: Average loss: 1.6676, Accuracy: 71.32%



Epoch 3: 100%|██████████| 704/704 [00:44<00:00, 15.77it/s, accuracy=0.75, loss=1.633482]


Test set: Average loss: 1.6212, Accuracy: 76.74%



Epoch 4: 100%|██████████| 704/704 [00:44<00:00, 15.83it/s, accuracy=0.77, loss=1.608163]


Test set: Average loss: 1.6162, Accuracy: 76.55%



Epoch 5: 100%|██████████| 704/704 [00:44<00:00, 15.73it/s, accuracy=0.80, loss=1.577386]


Test set: Average loss: 1.6046, Accuracy: 78.26%



Epoch 6: 100%|██████████| 704/704 [00:44<00:00, 15.85it/s, accuracy=0.83, loss=1.551932]
