In [None]:
! git clone https://github.com/OATML/RHO-Loss.git

Cloning into 'RHO-Loss'...
remote: Enumerating objects: 449, done.[K
remote: Counting objects: 100% (449/449), done.[K
remote: Compressing objects: 100% (271/271), done.[K
remote: Total 449 (delta 187), reused 411 (delta 162), pack-reused 0 (from 0)[K
Receiving objects: 100% (449/449), 267.26 KiB | 2.75 MiB/s, done.
Resolving deltas: 100% (187/187), done.


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import time
import os
import numpy as np
from torchsummary import summary
from torch.nn.functional import softmax
import torch.nn.functional as F

In [None]:
!pip install pytorch-lightning



In [None]:
%cd RHO-Loss/

/content/RHO-Loss


In [None]:
from torchvision import transforms

# Normalisation shared by both splits: (x - 0.5) / 0.5 per RGB channel.
normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))

# Training pipeline: the random horizontal flip is data augmentation and
# therefore belongs to the training split only.
transform = transforms.Compose(
    [transforms.RandomHorizontalFlip(),
     transforms.ToTensor(),
     normalize])

# Evaluation pipeline: no random augmentation, so every test image is seen
# unmodified and repeated evaluations of a deterministic model agree.
test_transform = transforms.Compose(
    [transforms.ToTensor(),
     normalize])

trainset = torchvision.datasets.CIFAR100(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128,
                                          shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR100(root='./data', train=False,
                                       download=True, transform=test_transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=128,
                                         shuffle=False, num_workers=2)


In [None]:
import torch
import torch.nn as nn

class BNL(nn.Module):
    """Bayesian Normalization Layer (BNL).

    Stand-in for normalization layers such as BatchNorm, LayerNorm and
    InstanceNorm. The input is standardised with statistics computed from the
    input itself (no running statistics are kept), and the learnable scale
    ``weight`` is multiplied by ``(1 + noise)`` where ``noise`` is standard
    Gaussian and redrawn on every forward pass. The layer never consults
    ``self.training``, so the noise is injected during both training and
    inference.

    The parameters are named ``weight`` and ``bias`` to match the names used by
    PyTorch's BatchNorm, LayerNorm and InstanceNorm layers, which keeps state
    dictionaries compatible for those two entries.

    Args:
        num_features (int, list, tuple): Number of features in the input.
            An int (or a one-element sequence) selects the BatchNorm-like mode:
            statistics are taken over every dimension except dimension 1 and
            the parameters are applied per channel. A longer sequence selects
            the LayerNorm-like mode: statistics are taken over every dimension
            except dimension 0 and the parameters are laid out along the
            dimensions that follow the batch dimension.
    """
    def __init__(self, num_features):
        super(BNL, self).__init__()
        # Normalise to a tuple: forward() concatenates it with other tuples,
        # which would raise TypeError for a list.
        if isinstance(num_features, int):
            num_features = (num_features,)
        else:
            num_features = tuple(num_features)

        self.num_features = num_features
        self.weight = nn.Parameter(torch.ones(num_features))
        self.bias = nn.Parameter(torch.zeros(num_features))
        self.eps = 1e-5

    def forward(self, x):
        """Normalise ``x`` and apply the noisy scale and the bias.

        Args:
            x (Tensor): input with at least two dimensions in the
                BatchNorm-like mode; dimension 1 must match ``num_features``.

        Returns:
            Tensor with the same shape as ``x``.

        Raises:
            ValueError: in the BatchNorm-like mode when ``x`` has fewer than
                two dimensions.
        """
        # One fresh multiplicative perturbation per scale entry.
        noise = torch.randn(self.weight.shape, device=x.device, dtype=self.weight.dtype)
        gamma_noisy = self.weight * (1 + noise)

        if len(self.num_features) == 1:  # Traditional usage like BatchNorm
            if x.dim() < 2:
                raise ValueError(
                    f"BNL expects an input with at least 2 dimensions, got {x.dim()}"
                )
            # Reduce over the batch and every trailing (spatial) dimension:
            # [0] for (N, C), [0, 2] for (N, C, L), [0, 2, 3] for (N, C, H, W).
            reduce_dims = [0] + list(range(2, x.dim()))
            mean = x.mean(reduce_dims, keepdim=True)
            # torch's default (Bessel-corrected) variance, as in the original
            # implementation of this branch.
            var = x.var(reduce_dims, keepdim=True)
            param_shape = (1, -1) + (1,) * (x.dim() - 2)
        else:  # LayerNorm-like usage
            reduce_dims = tuple(range(1, x.dim()))
            mean = x.mean(dim=reduce_dims, keepdim=True)
            var = x.var(dim=reduce_dims, keepdim=True, unbiased=False)
            param_shape = (1,) + self.num_features + (1,) * (x.dim() - len(self.num_features) - 1)

        x_normalized = (x - mean) / torch.sqrt(var + self.eps)

        # Scale and bias are reshaped identically so both broadcast along the
        # same dimensions of the input.
        return gamma_noisy.view(param_shape) * x_normalized + self.bias.view(param_shape)

In [None]:
class ABNNLoss(torch.nn.Module):
    """Three-term training loss: NLL + Gaussian log-prior (L2) + class-weighted CE.

    Args:
        Num_classes (int): number of classes; sizes the per-class weight ``eta``.
        model_parameters (iterable of Tensor): the weights the L2 prior is
            computed over. Any iterable is accepted, including the generator
            returned by ``model.parameters()``.
        Weight_decay (float): multiplier of the L2 prior term.

    Attributes:
        eta: per-class weight of the weighted cross-entropy term, created as an
            ``nn.Parameter`` initialised to ones. While it equals ones the
            weighted term has the same value as the NLL term.
    """
    def __init__(self, Num_classes, model_parameters, Weight_decay=1e-4):
        super(ABNNLoss, self).__init__()
        # Materialise the iterable once. A generator such as model.parameters()
        # would be exhausted by the first forward() call, after which the prior
        # term would silently evaluate to 0. A plain list is not registered by
        # nn.Module, so these tensors do not become parameters of the loss.
        self.model_parameters = list(model_parameters)
        self.Weight_decay = Weight_decay
        self.eta = nn.Parameter(torch.ones(Num_classes))

    def forward(self, outputs, labels):
        """Return the sum of the three loss components.

        Args:
            outputs (Tensor): logits, indexed as (sample, class).
            labels (Tensor): integer class index per sample.
        """
        # Calculate the three loss components
        nll_loss = self.negative_log_likelihood(outputs, labels)
        log_prior_loss = self.negative_log_prior(self.model_parameters, self.Weight_decay)
        custom_ce_loss = self.custom_cross_entropy_loss(outputs, labels, self.eta)

        # Sum up all three components to form the ABNN loss
        total_loss = nll_loss + log_prior_loss + custom_ce_loss
        return total_loss

    @staticmethod
    def negative_log_likelihood(outputs, labels):
        """Mean cross-entropy of the logits against the labels."""
        # Negative Log Likelihood (NLL) or MLE Loss:
        # NLL = -∑ log P(y_i | x_i, ω)
        return torch.nn.functional.cross_entropy(outputs, labels)

    def negative_log_prior(self, model_parameters, Weight_decay=1e-4):
        """``Weight_decay`` times the sum of squared entries of all given tensors.

        ``model_parameters`` is iterated once per call, so pass a re-iterable
        collection if the same object is reused across calls.
        """
        # Negative Log Prior with Gaussian Prior (L2 Regularization):
        # log P(ω) = λ ∑ ω^2 where λ (weight decay) = (1/2σ^2)
        l2_reg = sum(p.pow(2).sum() for p in model_parameters)
        return Weight_decay * l2_reg

    def custom_cross_entropy_loss(self, outputs, labels, eta):
        """Mean over the batch of ``-eta[label] * log_softmax(outputs)[label]``."""
        # Custom Cross-Entropy Loss:
        # E(ω) = -∑ η_i log P(y_i | x_i, ω)
        log_probs = torch.nn.functional.log_softmax(outputs, dim=1)
        weighted_log_probs = eta[labels] * log_probs.gather(1, labels.unsqueeze(1)).squeeze(1)
        return -torch.mean(weighted_log_probs)

In [None]:
# Use the first CUDA device when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")


Using device: cuda:0


In [None]:
"""ResNet in PyTorch.
For Pre-activation ResNet, see 'preact_resnet.py'.
Reference:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Deep Residual Learning for Image Recognition. arXiv:1512.03385
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by BatchNorm.

    Args:
        in_planes: number of channels of the incoming feature map.
        planes: number of channels produced by both convolutions.
        stride: stride of the first convolution and of the projection shortcut.
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # Identity shortcut when the shapes already agree; otherwise a strided
        # 1x1 convolution + BatchNorm projects the input to the output shape.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = self.bn2(self.conv2(hidden))
        hidden += self.shortcut(x)
        return F.relu(hidden)


class Bottleneck(nn.Module):
    """Residual block of three convolutions (1x1, 3x3, 1x1), each followed by BatchNorm.

    The block outputs ``expansion * planes`` channels; ``stride`` is applied by
    the 3x3 convolution and by the projection shortcut.
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Identity shortcut when the shapes already agree; otherwise a strided
        # 1x1 convolution + BatchNorm projects the input to the output shape.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = F.relu(self.bn2(self.conv2(hidden)))
        hidden = self.bn3(self.conv3(hidden))
        hidden += self.shortcut(x)
        return F.relu(hidden)


class ResNet(nn.Module):
    """ResNet with a 3x3 stride-1 stem and four residual stages.

    Args:
        block: residual block class; it is called as
            ``block(in_planes, planes, stride)`` and must expose ``expansion``.
        num_blocks: four integers, the number of blocks in each stage.
        num_classes: size of the final linear layer's output.
    """

    def __init__(self, block, num_blocks, num_classes=100):
        super().__init__()
        # Running channel count; updated by _make_layer as blocks are stacked.
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

        # Re-initialise: Kaiming-normal for convolutions, unit scale and zero
        # shift for BatchNorm/GroupNorm layers.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``."""
        stage = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the logits for a batch of images."""
        features = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)
        # 4x4 average pooling, then flatten everything but the batch dimension.
        features = F.avg_pool2d(features, 4)
        features = features.view(features.size(0), -1)
        return self.linear(features)


def ResNet18():
    """Build a ResNet from ``BasicBlock`` with stage depths 2, 2, 2, 2."""
    depths = [2, 2, 2, 2]
    return ResNet(BasicBlock, depths)


def ResNet34():
    """Build a ResNet from ``BasicBlock`` with stage depths 3, 4, 6, 3."""
    depths = [3, 4, 6, 3]
    return ResNet(BasicBlock, depths)


def ResNet50():
    """Build a ResNet from ``Bottleneck`` with stage depths 3, 4, 6, 3."""
    depths = [3, 4, 6, 3]
    return ResNet(Bottleneck, depths)


def ResNet101():
    """Build a ResNet from ``Bottleneck`` with stage depths 3, 4, 23, 3."""
    depths = [3, 4, 23, 3]
    return ResNet(Bottleneck, depths)


def ResNet152():
    """Build a ResNet from ``Bottleneck`` with stage depths 3, 8, 36, 3."""
    depths = [3, 8, 36, 3]
    return ResNet(Bottleneck, depths)


def test():
    """Smoke test: push one random 3x32x32 input through ResNet18 and print the output size."""
    model = ResNet18()
    sample = torch.randn(1, 3, 32, 32)
    logits = model(sample)
    print(logits.size())


def resnet18_imagenet(pretrained=False, classes=10):
    """torchvision ResNet-18 with a 3x3 stride-1 stem, no max-pool and a new head.

    Args:
        pretrained: forwarded to ``torchvision.models.resnet18``.
        classes: output size of the replacement fully connected layer.
    """
    net = torchvision.models.resnet18(pretrained=pretrained, num_classes=1000)
    # Replace the stem convolution and drop the max-pool.
    stem = nn.Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    net.conv1 = stem
    net.maxpool = nn.Identity()
    # Fresh classification head with `classes` outputs.
    net.fc = nn.Linear(512, classes, bias=True)
    return net

def resnet50_imagenet(pretrained=False, classes=10):
    """torchvision ResNet-50 with its classification head replaced.

    Args:
        pretrained: forwarded to ``torchvision.models.resnet50``.
        classes: output size of the replacement fully connected layer.

    Returns:
        The model with ``fc`` swapped for a freshly initialised linear layer.
    """
    model = torchvision.models.resnet50(pretrained=pretrained, num_classes=1000)
    # Size the new head from the existing one: ResNet-50's pooled features are
    # wider than the 512 of ResNet-18, so a hard-coded 512 fails in forward().
    model.fc = nn.Linear(model.fc.in_features, classes, bias=True)
    return model

In [None]:
class BNLBasicBlock(nn.Module):
    """Residual block of two 3x3 convolutions, each followed by a ``BNL`` layer.

    Args:
        in_planes: number of channels of the incoming feature map.
        planes: number of channels produced by both convolutions.
        stride: stride of the first convolution and of the projection shortcut.
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = BNL(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = BNL(planes)

        # Identity shortcut when the shapes already agree; otherwise a strided
        # 1x1 convolution + BNL projects the input to the output shape.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                BNL(out_planes),
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = self.bn2(self.conv2(hidden))
        # The shortcut is evaluated after the main path, as in the original.
        hidden += self.shortcut(x)
        return F.relu(hidden)


class Bottleneck(nn.Module):
    """Residual block of three convolutions (1x1, 3x3, 1x1), each followed by BatchNorm.

    This definition uses ``nn.BatchNorm2d`` throughout (not ``BNL``). The block
    outputs ``expansion * planes`` channels; ``stride`` is applied by the 3x3
    convolution and by the projection shortcut.
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Identity shortcut when the shapes already agree; otherwise a strided
        # 1x1 convolution + BatchNorm projects the input to the output shape.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = F.relu(self.bn2(self.conv2(hidden)))
        hidden = self.bn3(self.conv3(hidden))
        hidden += self.shortcut(x)
        return F.relu(hidden)


class BNLResNet(nn.Module):
    """ResNet whose stem normalisation is a ``BNL`` layer.

    Args:
        block: residual block class; it is called as
            ``block(in_planes, planes, stride)`` and must expose ``expansion``.
        num_blocks: four integers, the number of blocks in each stage.
        num_classes: size of the final linear layer's output.
    """

    def __init__(self, block, num_blocks, num_classes=100):
        super().__init__()
        # Running channel count; updated by _make_layer as blocks are stacked.
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = BNL(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

        # Kaiming-normal for convolutions; unit scale / zero shift for any
        # BatchNorm2d or GroupNorm module. Modules of other types are left
        # with the values set by their own constructors.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(module, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``."""
        stage = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the logits for a batch of images."""
        features = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)
        # 4x4 average pooling, then flatten everything but the batch dimension.
        features = F.avg_pool2d(features, 4)
        features = features.view(features.size(0), -1)
        return self.linear(features)


def BNLResNet18():
    """Build a ``BNLResNet`` from ``BNLBasicBlock`` with stage depths 2, 2, 2, 2."""
    depths = [2, 2, 2, 2]
    return BNLResNet(BNLBasicBlock, depths)

In [None]:
"""ResNet in PyTorch.
For Pre-activation ResNet, see 'preact_resnet.py'.
Reference:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Deep Residual Learning for Image Recognition. arXiv:1512.03385
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision


class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by BatchNorm.

    Args:
        in_planes: number of channels of the incoming feature map.
        planes: number of channels produced by both convolutions.
        stride: stride of the first convolution and of the projection shortcut.
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        widened = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # A projection (strided 1x1 convolution + BatchNorm) is only needed
        # when the block changes resolution or channel count.
        projection_needed = stride != 1 or in_planes != widened
        if projection_needed:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, widened, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(widened),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        y = F.relu(self.bn1(self.conv1(x)))
        y = self.bn2(self.conv2(y))
        y += self.shortcut(x)
        return F.relu(y)


class Bottleneck(nn.Module):
    """Residual block of three convolutions (1x1, 3x3, 1x1), each followed by BatchNorm.

    The block outputs ``expansion * planes`` channels; ``stride`` is applied by
    the 3x3 convolution and by the projection shortcut.
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        widened = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, widened, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(widened)

        # A projection (strided 1x1 convolution + BatchNorm) is only needed
        # when the block changes resolution or channel count.
        projection_needed = stride != 1 or in_planes != widened
        if projection_needed:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, widened, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(widened),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return ``relu(main_path(x) + shortcut(x))``."""
        y = F.relu(self.bn1(self.conv1(x)))
        y = F.relu(self.bn2(self.conv2(y)))
        y = self.bn3(self.conv3(y))
        y += self.shortcut(x)
        return F.relu(y)


class ResNet(nn.Module):
    """ResNet with a 3x3 stride-1 stem and four residual stages.

    Args:
        block: residual block class; it is called as
            ``block(in_planes, planes, stride)`` and must expose ``expansion``.
        num_blocks: four integers, the number of blocks in each stage.
        num_classes: size of the final linear layer's output.
    """

    def __init__(self, block, num_blocks, num_classes=100):
        super().__init__()
        # Running channel count; updated by _make_layer as blocks are stacked.
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)

        # Re-initialise: Kaiming-normal for convolutions, unit scale and zero
        # shift for BatchNorm/GroupNorm layers.
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(layer, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``."""
        per_block_strides = [stride] + [1] * (num_blocks - 1)
        blocks = []
        for block_stride in per_block_strides:
            blocks.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*blocks)

    def forward(self, x):
        """Return the logits for a batch of images."""
        feats = F.relu(self.bn1(self.conv1(x)))
        feats = self.layer4(self.layer3(self.layer2(self.layer1(feats))))
        # 4x4 average pooling, then flatten everything but the batch dimension.
        feats = F.avg_pool2d(feats, 4)
        feats = feats.view(feats.size(0), -1)
        return self.linear(feats)


def ResNet18():
    """Build a ResNet from ``BasicBlock`` with stage depths 2, 2, 2, 2."""
    depths = [2, 2, 2, 2]
    return ResNet(BasicBlock, depths)

In [None]:
import torch
# Load the checkpoint onto the CPU.
# NOTE(review): weights_only=False unpickles arbitrary Python objects; only
# use it with checkpoint files from a trusted source.
ckpt = torch.load("epoch_167.ckpt", map_location="cpu", weights_only=False)

# The weights live under "state_dict"; strip "large_model." from the keys so
# they match the module names of the network built below.
state_dict = ckpt["state_dict"]
new_state_dict = {key.replace("large_model.", ""): value for key, value in state_dict.items()}

# BNL layers define only `weight` and `bias`, so the BatchNorm bookkeeping
# entries have no counterpart in the target model and are dropped.
batchnorm_buffers = ('running_mean', 'running_var', 'num_batches_tracked')
filtered_state_dict = {
    k: v for k, v in new_state_dict.items()
    if not any(buffer_name in k for buffer_name in batchnorm_buffers)
}

# The filtered dictionary only fits the BNL network: a BatchNorm-based
# ResNet18 would fail the strict load because its running statistics are missing.
model5 = BNLResNet18()
model5.load_state_dict(filtered_state_dict)

FileNotFoundError: [Errno 2] No such file or directory: 'epoch_167.ckpt'

In [None]:
# Pass a concrete list: model5.parameters() is a one-shot generator, while
# ABNNLoss iterates over the parameters it was given on every forward call.
loss_func = ABNNLoss(100, list(model5.parameters())).to(device)
optimizer = optim.SGD(filter(lambda p: p.requires_grad, model5.parameters()), lr=0.0057, momentum=0.9, weight_decay=5e-4)

In [None]:
# Move the model's parameters and buffers to the selected device (nn.Module.to modifies the module in place).
model5.to(device)

BNLResNet(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BNL()
  (layer1): Sequential(
    (0): BNLBasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BNL()
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BNL()
      (shortcut): Sequential()
    )
    (1): BNLBasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BNL()
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BNL()
      (shortcut): Sequential()
    )
  )
  (layer2): Sequential(
    (0): BNLBasicBlock(
      (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (bn1): BNL()
      (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BNL()
      (shortc

In [None]:
import time
print('Start Training')

# Timing the training process
start_time = time.time()

# Select training mode explicitly so the cell behaves the same when it is
# re-run after an evaluation cell has called model5.eval().
model5.train()

# Per-batch loss values across all epochs
train_losses = []
for epoch in range(20):
    # Sum of the per-batch losses of the current epoch
    running_loss = 0.0
    for inputs, labels in trainloader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model5(inputs)
        loss = loss_func(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        train_losses.append(batch_loss)
    print(f'[Epoch {epoch + 1}, Loss: {running_loss}')
    # running_loss is not cleared here: it is reset at the top of every epoch,
    # and after the loop it must still hold the last epoch's total because the
    # checkpoint cell stores it under 'loss'.

end_time = time.time()

print('Finished Training')
print(f'Time taken to train the model: {end_time - start_time:.2f} seconds')

Start Training
[Epoch 1, Loss: 2517.9910378456116
[Epoch 2, Loss: 1976.281992673874
[Epoch 3, Loss: 1849.281532049179
[Epoch 4, Loss: 1774.8414433002472
[Epoch 5, Loss: 1747.916981935501
[Epoch 6, Loss: 1709.4862475395203
[Epoch 7, Loss: 1681.392540693283
[Epoch 8, Loss: 1655.3081946372986
[Epoch 9, Loss: 1640.1109764575958
[Epoch 10, Loss: 1625.9923396110535
[Epoch 11, Loss: 1618.76971077919
[Epoch 12, Loss: 1593.5048666000366
[Epoch 13, Loss: 1586.5120894908905
[Epoch 14, Loss: 1565.612500667572
[Epoch 15, Loss: 1562.6503715515137
[Epoch 16, Loss: 1555.6033828258514
[Epoch 17, Loss: 1551.3173110485077
[Epoch 18, Loss: 1544.7979979515076
[Epoch 19, Loss: 1527.7255206108093
[Epoch 20, Loss: 1524.6597218513489
Finished Training
Time taken to train the model: 1205.05 seconds


In [None]:
# Evaluation mode for the accuracy measurement
model5.eval()

# Running counts of correct and of all predictions
correct = 0
total = 0

# Inference only: no gradients are needed
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)

        # 50 forward passes over the same batch, one hard class prediction
        # per image and pass, stacked as (pass, image)
        predictions = torch.stack(
            [model5(images).max(dim=1).indices for _ in range(50)]
        )

        # Majority vote across the passes
        predicted = torch.mode(predictions, dim=0).values

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Accuracy in percent
accuracy = 100 * correct / total
print(f'Accuracy of the network on the test images: {accuracy:.2f}%')

Accuracy of the network on the test images: 58.01%


In [None]:
model5.eval()

# Per-batch results, concatenated once the loop is done
prob_batches = []
label_batches = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        logits = model5(inputs)
        # Softmax over the class dimension turns logits into probabilities
        prob_batches.append(torch.softmax(logits, dim=1))
        label_batches.append(labels)

# One tensor per quantity, in test-loader order
all_probs = torch.cat(prob_batches, dim=0)            # Shape: [N, C]
true_labels = torch.cat(label_batches, dim=0)

In [None]:
import torch.nn.functional as F

# Log of the predicted probabilities; the 1e-12 offset keeps log() finite
# when a probability is exactly zero
log_probs = (all_probs + 1e-12).log()

# Mean negative log-likelihood of the true labels
nll = F.nll_loss(log_probs, true_labels)
print(f"NLL (Single Model): {nll:.4f}")

NLL (Single Model): 2.1098


In [None]:
# Bundle the last epoch index, the model and optimizer states and the current
# value of running_loss into one checkpoint file.
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model5.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': running_loss,
}
torch.save(checkpoint, 'bnn100.ckpt')

In [None]:
import torch
# Read the checkpoint onto the CPU
ckpt = torch.load("epoch_172.ckpt", map_location="cpu", weights_only=False)

# The weights live under "state_dict"; remove "large_model." from every key
state_dict = ckpt["state_dict"]
new_state_dict = {
    name.replace("large_model.", ""): tensor
    for name, tensor in state_dict.items()
}

# ResNet18 is built from nn.BatchNorm2d layers, so the full dictionary
# (running statistics included) is loaded without filtering.
model6 = ResNet18()
model6.load_state_dict(new_state_dict)

<All keys matched successfully>

In [None]:
# The checkpoint was loaded with map_location="cpu", so the weights are still
# on the CPU. The evaluation cells move their inputs to `device`; without this
# move a CUDA run fails with "Input type ... and weight type ... should be the same".
model6.to(device)
loss_func = nn.CrossEntropyLoss().to(device)
optimizer = optim.SGD(filter(lambda p: p.requires_grad, model6.parameters()), lr=0.0057, momentum=0.9, weight_decay=5e-4)

In [None]:
# Evaluation mode for the accuracy measurement
model6.eval()

# Running counts of correct and of all predictions
correct = 0
total = 0

# Inference only: no gradients are needed
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)

        # 50 forward passes over the same batch, one hard class prediction
        # per image and pass, stacked as (pass, image)
        predictions = torch.stack(
            [model6(images).max(dim=1).indices for _ in range(50)]
        )

        # Majority vote across the passes
        predicted = torch.mode(predictions, dim=0).values

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Accuracy in percent
accuracy = 100 * correct / total
print(f'Accuracy of the network on the test images: {accuracy:.2f}%')

RuntimeError: Input type (torch.cuda.FloatTensor) and weight type (torch.FloatTensor) should be the same

In [None]:
model6.eval()

# Per-batch results, concatenated once the loop is done
prob_batches = []
label_batches = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        logits = model6(inputs)
        # Softmax over the class dimension turns logits into probabilities
        prob_batches.append(torch.softmax(logits, dim=1))
        label_batches.append(labels)

# One tensor per quantity, in test-loader order
all_probs = torch.cat(prob_batches, dim=0)            # Shape: [N, C]
true_labels = torch.cat(label_batches, dim=0)

In [None]:
import torch.nn.functional as F

# Log of the predicted probabilities; the 1e-12 offset keeps log() finite
# when a probability is exactly zero
log_probs = (all_probs + 1e-12).log()

# Mean negative log-likelihood of the true labels
nll = F.nll_loss(log_probs, true_labels)
print(f"NLL (Single Model): {nll:.4f}")

NLL (Single Model): 1.7997


In [None]:
import torch
# Read the checkpoint onto the CPU
ckpt = torch.load("epoch_172.ckpt", map_location="cpu", weights_only=False)

# The weights live under "state_dict"; remove "large_model." from every key
state_dict = ckpt["state_dict"]
new_state_dict = {
    name.replace("large_model.", ""): tensor
    for name, tensor in state_dict.items()
}

# Keep only the entries whose name mentions none of the BatchNorm bookkeeping
# buffers; the BNL layers of the target model define just `weight` and `bias`.
filtered_state_dict = {}
for k, v in new_state_dict.items():
    if 'running_mean' in k or 'running_var' in k or 'num_batches_tracked' in k:
        continue
    filtered_state_dict[k] = v

model7 = BNLResNet18()
model7.load_state_dict(filtered_state_dict)

<All keys matched successfully>

In [None]:
model7.to(device)
# Pass a concrete list: model7.parameters() is a one-shot generator, while
# ABNNLoss iterates over the parameters it was given on every forward call.
loss_func = ABNNLoss(100, list(model7.parameters())).to(device)
optimizer = optim.SGD(filter(lambda p: p.requires_grad, model7.parameters()), lr=0.0057, momentum=0.9, weight_decay=5e-4)

In [None]:

print('Start Training')

# Timing the training process
start_time = time.time()

# Select training mode explicitly so the cell behaves the same when it is
# re-run after an evaluation cell has called model7.eval().
model7.train()

# Per-batch loss values across all epochs
train_losses = []
for epoch in range(20):
    # Sum of the per-batch losses of the current epoch
    running_loss = 0.0
    for inputs, labels in trainloader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model7(inputs)
        loss = loss_func(outputs, labels)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss
        train_losses.append(batch_loss)
    print(f'[Epoch {epoch + 1}, Loss: {running_loss}')
    # running_loss is not cleared here: it is reset at the top of every epoch,
    # and after the loop it must still hold the last epoch's total because the
    # checkpoint cell stores it under 'loss'.

end_time = time.time()

print('Finished Training')
print(f'Time taken to train the model: {end_time - start_time:.2f} seconds')

Start Training


KeyboardInterrupt: 

In [None]:
# Evaluation mode for the accuracy measurement
model7.eval()

# Running counts of correct and of all predictions
correct = 0
total = 0

# Inference only: no gradients are needed
with torch.no_grad():
    for images, labels in testloader:
        images, labels = images.to(device), labels.to(device)

        # 50 forward passes over the same batch, one hard class prediction
        # per image and pass, stacked as (pass, image)
        predictions = torch.stack(
            [model7(images).max(dim=1).indices for _ in range(50)]
        )

        # Majority vote across the passes
        predicted = torch.mode(predictions, dim=0).values

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# Accuracy in percent
accuracy = 100 * correct / total
print(f'Accuracy of the network on the test images: {accuracy:.2f}%')


Accuracy of the network on the test images: 43.22%


In [None]:
model7.eval()

# Per-batch results, concatenated once the loop is done
prob_batches = []
label_batches = []

with torch.no_grad():
    for inputs, labels in testloader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        logits = model7(inputs)
        # Softmax over the class dimension turns logits into probabilities
        prob_batches.append(torch.softmax(logits, dim=1))
        label_batches.append(labels)

# One tensor per quantity, in test-loader order
all_probs = torch.cat(prob_batches, dim=0)            # Shape: [N, C]
true_labels = torch.cat(label_batches, dim=0)

In [None]:
import torch.nn.functional as F

# Log of the predicted probabilities; the 1e-12 offset keeps log() finite
# when a probability is exactly zero
log_probs = (all_probs + 1e-12).log()

# Mean negative log-likelihood of the true labels
nll = F.nll_loss(log_probs, true_labels)
print(f"NLL (Single Model): {nll:.4f}")


NLL (Single Model): 4.0751


In [None]:
# Bundle the last epoch index, the model and optimizer states and the current
# value of running_loss into one checkpoint file.
checkpoint = {
    'epoch': epoch,
    'model_state_dict': model7.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': running_loss,
}
torch.save(checkpoint, 'rhoabnn100.ckpt')

In [None]:
import torch
import time
import random
import numpy as np

# Define a function to set seeds for reproducibility
def set_seed(seed):
    """Seed the global RNGs of PyTorch, NumPy and Python's ``random``.

    Args:
        seed: Integer seed handed unchanged to each of the three libraries.
    """
    # Same order as before: torch first, then NumPy, then the stdlib.
    for seed_fn in (torch.manual_seed, np.random.seed, random.seed):
        seed_fn(seed)

# Train one instance of the model
def train_one_instance(model_id, trainloader, device, loss_func, num_epochs=20):
    """Train one freshly initialised ``BNLResNet18`` and save its weights.

    The global RNGs are seeded with ``42 + model_id`` so every ensemble member
    gets a different (but repeatable) seed. The model is optimised with Adam
    (lr=0.001) and, once training finishes, its ``state_dict`` is written to
    ``bnlresnet18_instance_{model_id}.pth`` in the current directory.

    Args:
        model_id: Integer id of the instance; used for the seed offset, the
            log messages and the output file name.
        trainloader: Iterable of batches where ``batch[0]`` are the inputs and
            ``batch[1]`` the labels.
        device: Device the model and every batch are moved to.
        loss_func: Callable invoked as ``loss_func(outputs, labels)`` that
            returns a scalar loss tensor.
        num_epochs: Number of full passes over ``trainloader``.

    Returns:
        The trained model (left in training mode, on ``device``).

    NOTE(review): the model is created inside this function, so a
    ``loss_func`` that was built around another model's parameters (elsewhere
    in this notebook it is ``ABNNLoss(100, model.parameters())``) never sees
    this instance's weights - confirm that this is intended.
    """
    print(f"Training model {model_id}")

    set_seed(42 + model_id)  # Different seed for each instance

    model = BNLResNet18().to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    model.train()

    start_time = time.time()

    for epoch in range(num_epochs):
        # Sum (not mean) of the per-batch losses over the epoch
        running_loss = 0.0
        for data in trainloader:
            inputs, labels = data[0].to(device), data[1].to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_func(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f'[Model {model_id}, Epoch {epoch + 1}] Loss: {running_loss:.4f}')

    end_time = time.time()
    print(f'Model {model_id} Finished Training in {end_time - start_time:.2f} seconds')

    # Save the model
    torch.save(model.state_dict(), f"bnlresnet18_instance_{model_id}.pth")

    return model

In [None]:

def ensemble_predict(models, dataloader, device):
    """Average the softmax outputs of several models over a dataloader.

    The dataloader is iterated exactly once and every batch is pushed through
    all models before moving on. All members are therefore averaged over the
    very same inputs, even when the dataloader yields a different order or
    different augmentations on each pass, and the data is loaded only once.

    Args:
        models: Iterable of models, already placed on ``device``. Each one is
            switched to eval mode.
        dataloader: Iterable of ``(inputs, labels)`` batches; labels are
            ignored.
        device: Device the inputs are moved to before the forward pass.

    Returns:
        CPU tensor of shape ``[N, C]`` with the ensemble-averaged class
        probabilities, in the order the dataloader yielded the samples.
    """
    models = list(models)
    for model in models:
        model.eval()

    averaged_batches = []
    with torch.no_grad():
        for data in dataloader:
            inputs, _ = data
            inputs = inputs.to(device)
            # [num_models, batch, C] probabilities for this batch
            member_probs = torch.stack(
                [torch.softmax(model(inputs), dim=1) for model in models]
            )
            # Average predictions over the ensemble members
            averaged_batches.append(member_probs.mean(dim=0).cpu())

    ensemble_output = torch.cat(averaged_batches)
    return ensemble_output

In [None]:
%cd RHO-Loss/

[Errno 2] No such file or directory: 'RHO-Loss/'
/content/RHO-Loss


In [None]:
import torch

# Read the Lightning-style checkpoint onto the CPU.
# NOTE(review): weights_only=False makes torch.load unpickle arbitrary Python
# objects - only use it on checkpoint files you trust.
ckpt = torch.load("epoch_172.ckpt", map_location="cpu",weights_only=False)

# The weights live under "state_dict"
state_dict = ckpt["state_dict"]

# Remove the "large_model." wrapper name from every key
new_state_dict = {
    key.replace("large_model.", ""): tensor
    for key, tensor in state_dict.items()
}

# Leave out the normalisation-layer running statistics before loading
# (presumably because BNLResNet18 does not register these buffers - the
# recorded output reports that all keys matched).
BN_BUFFER_MARKERS = ('running_mean', 'running_var', 'num_batches_tracked')
filtered_state_dict = {
    k: v
    for k, v in new_state_dict.items()
    if not any(marker in k for marker in BN_BUFFER_MARKERS)
}

# Build the model and load the remaining weights into it
model = BNLResNet18()
model.load_state_dict(filtered_state_dict)

<All keys matched successfully>

In [None]:
# ABNN loss built from the class count (100) and the parameters of the
# checkpoint-loaded `model`.
# NOTE(review): train_one_instance() creates its own BNLResNet18 and its own
# Adam optimizer, so neither these parameters nor the SGD optimizer below
# belong to the models it trains - confirm this is intended.
loss_func = ABNNLoss(100, model.parameters()).to(device)

# SGD over the trainable parameters only
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.SGD(trainable_params, lr=0.0057, momentum=0.9, weight_decay=5e-4)


In [None]:
# Train the 4 ensemble members one after another (model ids 0..3)
models = []
for i in range(4):
    model = train_one_instance(
        model_id=i,
        trainloader=trainloader,
        device=device,
        loss_func=loss_func,
    )
    models.append(model)

# Averaged class probabilities of all members on the test set
ensemble_probs = ensemble_predict(models, testloader, device)

# Hard labels: most probable class per test sample
ensemble_preds = ensemble_probs.argmax(dim=1)

Training model 0
[Model 0, Epoch 1] Loss: 3398.3284
[Model 0, Epoch 2] Loss: 3041.9074
[Model 0, Epoch 3] Loss: 2861.1012
[Model 0, Epoch 4] Loss: 2703.1394
[Model 0, Epoch 5] Loss: 2570.7232
[Model 0, Epoch 6] Loss: 2465.6284
[Model 0, Epoch 7] Loss: 2373.3586
[Model 0, Epoch 8] Loss: 2288.6973
[Model 0, Epoch 9] Loss: 2202.8708
[Model 0, Epoch 10] Loss: 2118.3165
[Model 0, Epoch 11] Loss: 2052.9063
[Model 0, Epoch 12] Loss: 1983.9764
[Model 0, Epoch 13] Loss: 1914.9001
[Model 0, Epoch 14] Loss: 1856.4559
[Model 0, Epoch 15] Loss: 1802.3951
[Model 0, Epoch 16] Loss: 1762.2225
[Model 0, Epoch 17] Loss: 1712.2112
[Model 0, Epoch 18] Loss: 1675.2091
[Model 0, Epoch 19] Loss: 1647.1781
[Model 0, Epoch 20] Loss: 1618.6209
Model 0 Finished Training in 1204.83 seconds
Training model 1
[Model 1, Epoch 1] Loss: 3369.0899
[Model 1, Epoch 2] Loss: 3058.2545
[Model 1, Epoch 3] Loss: 2874.8103
[Model 1, Epoch 4] Loss: 2729.5507
[Model 1, Epoch 5] Loss: 2597.7319
[Model 1, Epoch 6] Loss: 2496.2852


In [None]:
import torch.nn.functional as F

def compute_NLL(probs, labels):
    """Mean negative log-likelihood of the true classes.

    Args:
        probs: Tensor of shape (N, C) holding class probabilities (e.g. the
            softmax outputs averaged over ensemble members).
        labels: Tensor of shape (N,) with the ground-truth class indices.

    Returns:
        The NLL averaged over the N samples, as a Python float.
    """
    eps = 1e-12  # keeps log() finite for zero probabilities
    log_probs = (probs + eps).log()
    mean_nll = F.nll_loss(log_probs, labels, reduction='mean')
    return mean_nll.item()

In [None]:
# Ground-truth labels in the order `testloader` yields them. That order must
# match the one used to build `ensemble_probs`; testloader is created with
# shuffle=False, so repeated passes return the samples in the same order.
# Concatenating whole label batches avoids converting every label one by one
# through a Python list of 0-d tensors.
true_labels = torch.cat([labels for _, labels in testloader]).to(
    dtype=torch.long, device=ensemble_probs.device
)

In [None]:
# Negative log-likelihood of the ensemble on the test set.
# Expects `ensemble_probs` of shape [N, C], already averaged over the ensemble
# members, and `true_labels` as a 1-D tensor of length N in the same sample order.
# compute_NLL returns a plain Python float (mean over the N samples).

nll = compute_NLL(ensemble_probs, true_labels)
print(f"NLL (Ensemble): {nll:.4f}")

NLL (Ensemble): 1.9288
