In [None]:
# imports
import numpy as np
import pandas as pd
from torch.nn import functional as F
import torch.nn as nn
import torch
import torch.optim as optim
import torchvision
import torchvision.models as models
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
from copy import deepcopy
import math
from torch.nn import init
from tqdm import tqdm
import json
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# loading the data
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
batch_size = 64

trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [None]:
# showing sample images from data
def imshow(img):
    img = img / 2 + 0.5
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

dataiter = iter(trainloader)
images, labels = next(dataiter)

imshow(torchvision.utils.make_grid(images))
print(' '.join(f'{classes[labels[j]]:5s}' for j in range(batch_size)))

In [None]:
# code to calculate leading digit distribution and subsequently calculating MLH
benford = [math.log10(1 + 1 / i) for i in range(1, 10)]
benford = [np.round(i, 4) for i in benford]
benford_th = torch.FloatTensor(benford)

def non_bias(m):
    if isinstance(m, torch.nn.Conv2d) or isinstance(m, torch.nn.Linear) or isinstance(m, torch.nn.BatchNorm2d):
        return m.weight
    return None

def bincount(tensor):
    counts = torch.zeros(10)
    for i in range(10):
        counts[i] = torch.count_nonzero(tensor == i)
    return counts

@torch.no_grad()
def bin_percent(tensor):
    tensor = tensor.abs() * 1e10
    tensor = tensor // 10 ** torch.log10(tensor).long()
    tensor = bincount(tensor.long())
    return tensor

def block_bincount(model):
    bins = torch.zeros(10)
    total_params = 0
    for m in model.modules():
        if list(m.children()) == []:
            weights = non_bias(m)
            if weights is not None:
                total_params += weights.numel()
                tensor = bin_percent(weights.view(-1).detach())
                for i in range(10):
                    bins[i] += tensor[i]
    return bins / bins.sum()

def calculate_mlh(bin_percents):
    return pearsonr(benford_th, bin_percents[1:])[0]

def benford_comparison(model):
    bins = block_bincount(deepcopy(model))
    return calculate_mlh(bins)

# using Xavier Uniform initialization method to initialize weights for given model
def init_params(model):
    for m in model.modules():
        if isinstance(m, torch.nn.Conv2d) or isinstance(m, torch.nn.Linear):
            init.xavier_uniform_(m.weight)
            if m.bias is not None:
                init.constant_(m.bias, 0)
        elif isinstance(m, torch.nn.BatchNorm2d):
            init.constant_(m.weight, 1)
            init.constant_(m.bias, 0)

In [None]:
# normal training epoch. If optimizer is None, then can be used for validation and test data
def epoch(model, criterion, loader, optimizer=None):
    total_loss = 0.
    total_err = 0.
    for X, y in loader:
        X, y = X.to(device), y.to(device)
        y_hat = model(X)
        loss = criterion(y_hat, y)
        if optimizer:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        total_err += (y_hat.max(dim=1)[1] != y).sum().item()
        total_loss += loss.item() * X.shape[0]
    return total_err / len(loader.dataset), total_loss / len(loader.dataset)

# implementing PGD attack under l_infinity norm
def pgd_linf(model, X, y, epsilon=0.1, alpha=0.01, num_iter=20, randomize=False):
    if randomize:
        delta = torch.rand_like(X, requires_grad=True)
        delta.data = delta.data * 2 * epsilon - epsilon
    else:
        delta = torch.zeros_like(X, requires_grad=True)
    
    for t in range(num_iter):
        loss = F.cross_entropy(model(X + delta), y)
        loss.backward()
        delta.data = (delta + alpha * delta.grad.detach().sign()).clamp(-epsilon, epsilon)
        delta.grad.zero_()
    return delta.detach()

# adversarial training epoch. If optimizer is None, then can be used for validation and test data
def epoch_adv(model, criterion, loader, optimizer=None, **kwargs):
    total_loss = 0.
    total_err = 0.
    for X, y in loader:
        X, y = X.to(device), y.to(device)
        delta = pgd_linf(model, X, y, **kwargs)
        y_hat = model(X + delta)
        loss = criterion(y_hat, y)
        if optimizer:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        total_err += (y_hat.max(dim=1)[1] != y).sum().item()
        total_loss += loss.item() * X.shape[0]
    return total_err / len(loader.dataset), total_loss / len(loader.dataset)

# calculating bounds
def bounds_propagation(model, initial_bound):
    l, u = initial_bound
    bounds = []
    for m in model.modules():
        if isinstance(m, torch.nn.Linear):
            l_ = (m.weight.clamp(min=0) @ l.t() + m.weight.clamp(max=0) @ u.t() + m.bias[:, None]).t()
            u_ = (m.weight.clamp(min=0) @ u.t() + m.weight.clamp(max=0) @ l.t() + m.bias[:, None]).t()
        elif isinstance(m, torch.nn.Conv2d):
            l_ = F.conv2d(l, m.weight.clamp(min=0), bias=None, stride=m.stride, padding=m.padding, dilation=m.dilation, groups=m.groups) + F.conv2d(u, m.weight.clamp(max=0), bias=None, stride=m.stride, padding=m.padding, dilation=m.dilation, groups=m.groups) + m.bias[None, :, None, None]
            u_ = F.conv2d(u, m.weight.clamp(min=0), bias=None, stride=m.stride, padding=m.padding, dilation=m.dilation, groups=m.groups) + F.conv2d(l, m.weight.clamp(max=0), bias=None, stride=m.stride, padding=m.padding, dilation=m.dilation, groups=m.groups) + m.bias[None, :, None, None]
        elif isinstance(m, torch.nn.ReLU):
            l_ = l.clamp(min=0)
            u_ = u.clamp(min=0)
        
        bounds.append((l_, u_))
        l, u = l_, u_
    return bounds

def interval_based_bound(model, c, bounds, idx):
    cW = c.t() @ list(model.modules())[-1].weight
    cb = c.t() @ list(model.modules())[-1].bias
    
    l,u = bounds[-2]
    return (cW.clamp(min=0) @ l[idx].t() + cW.clamp(max=0) @ u[idx].t() + cb[:,None]).t()  

# upper-bound based adversarial training epoch. If optimizer is None, then can be used for validation and test data 
def epoch_robust_bound(model, criterion, loader, epsilon, optimizer=None):
    total_loss = 0.
    total_err = 0.

    C = [-torch.eye(10).to(device) for _ in range(10)]
    for i in range(10):
        C[i][i, :] += 1

    for X, y in loader:
        X, y = X.to(device), y.to(device)
        initial_bound = (X - epsilon, X + epsilon)
        bounds = bounds_propagation(model, initial_bound)
        loss = 0
        for y0 in range(10):
            if sum(y == y0) > 0:
                lower_bound = interval_based_bound(model, C[y0], bounds, y == y0)
                loss += nn.CrossEntropyLoss(reduction='sum')(-lower_bound, y[y == y0]) / X.shape[0]
                total_err += (lower_bound.min(dim=1)[0] < 0).sum().item()
        total_loss += loss.item() * X.shape[0]
        if optimizer:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    return total_err / len(loader.dataset), total_loss / len(loader.dataset)


In [None]:
# initializing models
clean_model = models.resnet50().to(device)
init_params(clean_model)
pgd_model = models.resnet50().to(device)
pgd_model.load_state_dict(clean_model.state_dict())
bound_model = models.resnet50().to(device)
bound_model.load_state_dict(clean_model.state_dict())

clean_mlh_init = benford_comparison(clean_model)
pgd_mlh_init = benford_comparison(pgd_model)
bound_mlh_init = benford_comparison(bound_model)
print(clean_mlh_init, pgd_mlh_init, bound_mlh_init)

In [None]:
# regular training
optimizer = optim.Adam(clean_model.parameters(), lr=0.003)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50000)
criterion = nn.CrossEntropyLoss()

clean_training_accuracy = []
clean_validation_accuracy = []
clean_adv_accuracy = []
clean_mlh_list = []

for t in tqdm(range(50)):
    train_err, train_loss = epoch(clean_model, criterion, trainloader, optimizer)
    test_err, test_loss = epoch(clean_model, criterion, testloader)
    adv_err, adv_loss = epoch_adv(clean_model, criterion, testloader, epsilon=0.1, alpha=0.05, num_iter=40, randomize=True)
    scheduler.step()
    mlh = benford_comparison(clean_model)
    clean_training_accuracy.append(train_err)
    clean_validation_accuracy.append(test_err)
    clean_adv_accuracy.append(adv_err)
    clean_mlh_list.append(mlh)

print(clean_training_accuracy)
print(clean_validation_accuracy)
print(clean_adv_accuracy)
print(clean_mlh_list)
clean_dict = {
    'clean_training_accuracy': clean_training_accuracy,
    'clean_validation_accuracy': clean_validation_accuracy,
    'clean_adv_accuracy': clean_adv_accuracy,
    'clean_mlh_list': clean_mlh_list
}
with open ("clean_resnet50.json", "w") as f:
    json.dump(clean_dict, f)

In [None]:
# adversarial training using PGD adversarial examples
optimizer = optim.Adam(pgd_model.parameters(), lr=0.003)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50000)
criterion = nn.CrossEntropyLoss()

pgd_training_accuracy = []
pgd_validation_accuracy = []
pgd_adv_accuracy = []
pgd_mlh_list = []

iteration = 0
for t in tqdm(range(50)):
    train_err, train_loss = epoch_adv(pgd_model, criterion, trainloader, optimizer)
    test_err, test_loss = epoch(pgd_model, criterion, testloader)
    adv_err, adv_loss = epoch_adv(pgd_model, criterion, testloader)
    scheduler.step()
    mlh = benford_comparison(pgd_model)
    pgd_training_accuracy.append(train_err)
    pgd_validation_accuracy.append(test_err)
    pgd_adv_accuracy.append(adv_err)
    pgd_mlh_list.append(mlh)
    if iteration % 10 == 9:
        print(iteration)
        print(pgd_training_accuracy)
        print(pgd_validation_accuracy)
        print(pgd_adv_accuracy)
        print(pgd_mlh_list)
    iteration += 1
    
print("final")
print(pgd_training_accuracy)
print(pgd_validation_accuracy)
print(pgd_adv_accuracy)
print(pgd_mlh_list)
pgd_dict = {
    'pgd_training_accuracy': pgd_training_accuracy,
    'pgd_validation_accuracy': pgd_validation_accuracy,
    'pgd_adv_accuracy': pgd_adv_accuracy,
    'pgd_mlh_list': pgd_mlh_list
}
with open ("pgd_resnet50.json", "w") as f:
    json.dump(pgd_dict, f)

In [None]:
# adversarial training using upper-bound method
optimizer = optim.Adam(bound_model.parameters(), lr=0.003)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50000)
criterion = nn.CrossEntropyLoss()

bound_training_accuracy = []
bound_validation_accuracy = []
bound_adv_accuracy = []
bound_mlh_list = []

iteration = 0
for t in tqdm(range(50)):
    train_err, train_loss = epoch_robust_bound(bound_model, criterion, trainloader, 0.01, optimizer)
    test_err, test_loss = epoch(bound_model, criterion, testloader)
    adv_err, adv_loss = epoch_robust_bound(bound_model, criterion, testloader, 0.1)
    scheduler.step()
    mlh = benford_comparison(bound_model)
    bound_training_accuracy.append(train_err)
    bound_validation_accuracy.append(test_err)
    bound_adv_accuracy.append(adv_err)
    bound_mlh_list.append(mlh)
    if iteration % 10 == 9:
        print(iteration)
        print(bound_training_accuracy)
        print(bound_validation_accuracy)
        print(bound_adv_accuracy)
        print(bound_mlh_list)
    iteration += 1

print("final")
print(bound_training_accuracy)
print(bound_validation_accuracy)
print(bound_adv_accuracy)
print(bound_mlh_list)
bound_dict = {
    'bound_training_accuracy': bound_training_accuracy,
    'bound_validation_accuracy': bound_validation_accuracy,
    'bound_adv_accuracy': bound_adv_accuracy,
    'bound_mlh_list': bound_mlh_list
}
with open ("bound_resnet50.json", "w") as f:
    json.dump(bound_dict, f)

In [None]:
# early stopping using MLH as criteria
clean_model = models.resnet50().to(device)
init_params(clean_model)

optimizer = optim.Adam(clean_model.parameters(), lr=0.003)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50000)
criterion = nn.CrossEntropyLoss()

for t in tqdm(range(60)):
    train_err, train_loss = epoch(clean_model, criterion, trainloader, optimizer)
    scheduler.step()
    mlh = benford_comparison(clean_model)
    if mlh > 0.998:
        break

test_err, test_loss = epoch(clean_model, criterion, testloader)
mlh_final = benford_comparison(clean_model)
print(test_err, test_loss, mlh_final)

In [None]:
# create validation loader
trainset_size = len(trainset)
trainset_indices = list(range(trainset_size))
np.random.shuffle(trainset_indices)
split = int(np.floor(0.2 * trainset_size))
train_idx, valid_idx = trainset_indices[split:], trainset_indices[:split]
train_sampler = torch.utils.data.sampler.SubsetRandomSampler(train_idx)
valid_sampler = torch.utils.data.sampler.SubsetRandomSampler(valid_idx)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler)
validloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=valid_sampler)

In [None]:
# early stopping using validation accuracy as criteria
optimizer = optim.Adam(clean_model.parameters(), lr=0.003)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50000)
criterion = nn.CrossEntropyLoss()

for t in tqdm(range(60)):
    train_err, train_loss = epoch(clean_model, criterion, trainloader, optimizer)
    valid_err, valid_loss = epoch(clean_model, criterion, validloader)
    scheduler.step()
    mlh = benford_comparison(clean_model)
    print(train_err, valid_err, mlh)
    if valid_err < 0.05:
        break

test_err, test_loss = epoch(clean_model, criterion, testloader)
mlh_final = benford_comparison(clean_model)
print(test_err, test_loss, mlh_final)

In [None]:
# using various initialization methods to initialize weights for given model
def multi_init_params(model, initializer):
    for m in model.modules():
        if isinstance(m, torch.nn.Conv2d) or isinstance(m, torch.nn.Linear):
            initializer(m.weight)
            if m.bias is not None:
                init.constant_(m.bias, 0)
        elif isinstance(m, torch.nn.BatchNorm2d):
            init.constant_(m.weight, 1)
            init.constant_(m.bias, 0)

# initalizing various models and initializers
torch_models = [models.resnet18(), models.resnet34(), models.resnet50(), models.resnet101(), models.resnet152(), models.densenet121(), models.densenet161(), models.densenet169(), models.densenet201(), models.inception_v3(), models.googlenet(), models.vgg11(), models.vgg11_bn(), models.vgg13(), models.vgg13_bn(), models.vgg16(), models.vgg16_bn(), models.vgg19(), models.vgg19_bn(), models.mobilenet_v2()]
torch_models = [model.to(device) for model in torch_models]
print("{} Models loaded".format(len(torch_models)))
initializers = [init.kaiming_uniform_, init.kaiming_normal_, init.xavier_uniform_, init.xavier_normal_, init.orthogonal_, init.normal_, init.uniform_, init.trunc_normal_]
print("{} Initializers loaded".format(len(initializers)))
print("Number of models: ", len(torch_models))
print("Number of initializers: ", len(initializers))

In [None]:
# calculating mlh and bins for every model for every initializer
init_mlh_values = np.zeros((len(torch_models), len(initializers)))
init_bins = np.zeros((len(torch_models), len(initializers), 9))
for i, model in enumerate(torch_models):
    for j, initializer in enumerate(initializers):
        print("Model: ", i, " Initializer: ", j)
        multi_init_params(model, initializer)
        init_mlh_values[i, j], init_bins[i, j] = benford_comparison(model)

In [None]:
# box plot for each initializer
plt.figure(figsize=(15, 6))
colors = sns.color_palette("husl", len(initializers))
plt.boxplot(init_mlh_values, showmeans=True, meanline=True, meanprops={"linestyle": "--", "color": "red"})
plt.xticks(range(1, len(initializers) + 1), ['Kaiming Uniform', 'Kaiming Normal', 'Xavier Uniform', 'Xavier Normal', 'Orthogonal', 'Normal', 'Uniform', 'Truncated Normal'])
plt.xlabel("Initializer")
plt.ylabel("MLH")
plt.title("MLH values for different initializers")
plt.grid(alpha=0.3)
plt.show()

In [None]:
# bar plot for benford distribution vs Kaiming Normal initialized weights distribution
colors = sns.color_palette("husl", len(torch_models) + 1)
plt.bar(range(1, 10), benford_th, color=colors[0], label="Benford Distribution")
for i in range(len(torch_models)):
    plt.plot(range(1, 10), init_bins[i, 1], color=colors[i + 1], label="Model {}".format(torch_models[i].__class__.__name__))
plt.xlabel("Digit")
plt.xticks(range(1, 10))
plt.ylabel("Frequency")
plt.title("Benford Distribution vs Uniform initialized weights distribution")
plt.grid(alpha=0.3)
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
plt.show()