<a href="https://colab.research.google.com/github/arvind6599/Opt_ML_Project/blob/main/Quantitative_Evaluation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
%matplotlib inline

import numpy as np
from pprint import pprint

from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import grad
import torchvision
from torchvision import models, datasets, transforms
torch.manual_seed(50)

print(torch.__version__, torchvision.__version__)

2.3.0+cu121 0.18.0+cu121


In [18]:
dst = datasets.CIFAR100("~/.torch", download=True)
tp = transforms.Compose([
    transforms.Resize(32),
    transforms.CenterCrop(32),
    transforms.ToTensor()
])
tt = transforms.ToPILImage()

device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print("Running on %s" % device)

def label_to_onehot(target, num_classes=100):
    target = torch.unsqueeze(target, 1)
    onehot_target = torch.zeros(target.size(0), num_classes, device=target.device)
    onehot_target.scatter_(1, target, 1)
    return onehot_target

def cross_entropy_for_onehot(pred, target):
    return torch.mean(torch.sum(- target * F.log_softmax(pred, dim=-1), 1))

Files already downloaded and verified
Running on cuda


In [None]:
def weights_init(m):
    if hasattr(m, "weight"):
        m.weight.data.uniform_(-0.5, 0.5)
    if hasattr(m, "bias"):
        m.bias.data.uniform_(-0.5, 0.5)

class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        act = nn.Sigmoid
        self.body = nn.Sequential(
            nn.Conv2d(3, 12, kernel_size=5, padding=5//2, stride=2),
            act(),
            nn.Conv2d(12, 12, kernel_size=5, padding=5//2, stride=2),
            act(),
            nn.Conv2d(12, 12, kernel_size=5, padding=5//2, stride=1),
            act(),
            nn.Conv2d(12, 12, kernel_size=5, padding=5//2, stride=1),
            act(),
        )
        self.fc = nn.Sequential(
            nn.Linear(768, 100)
        )

    def forward(self, x):
        out = self.body(x)
        out = out.view(out.size(0), -1)
        # print(out.size())
        out = self.fc(out)
        return out

In [39]:
def quantize(x,input_compress_settings={}):
    compress_settings={'n':6}
    compress_settings.update(input_compress_settings)
    #assume that x is a torch tensor

    n=compress_settings['n']
    #print('n:{}'.format(n))
    x=x.float()
    x_norm=torch.norm(x,p=float('inf'))

    sgn_x=((x>0).float()-0.5)*2

    p=torch.div(torch.abs(x),x_norm)
    renormalize_p=torch.mul(p,n)
    floor_p=torch.floor(renormalize_p)
    compare=torch.rand_like(floor_p)
    final_p=renormalize_p-floor_p
    margin=(compare < final_p).float()
    xi=(floor_p+margin)/n

    Tilde_x=x_norm*sgn_x*xi

    return Tilde_x


def uniform_quantization(x, levels=16):
    '''
    Perform uniform quantization on the input tensor x, with levels number of levels.

    Parameters:
    x (torch.Tensor): The input tensor to be quantized.
    levels (int): The number of levels to quantize the input tensor into.

    '''
    min_val, max_val = x.min(), x.max()
    scale = (max_val - min_val) / (levels - 1)
    quantized = torch.round((x - min_val) / scale) * scale + min_val
    return quantized

def log_quantization(tensor, base=2):
    '''
    Perform log quantization on the input tensor x, with base as the base of the logarithm.

    Parameters:
    tensor (torch.Tensor): The input tensor to be quantized.
    base (int): The base of the logarithm to be used for quantization.
    '''

    sign = torch.sign(tensor)
    log_tensor = torch.log(torch.abs(tensor) + 1e-9) / torch.log(torch.tensor(base))
    quantized = torch.round(log_tensor) * torch.log(torch.tensor(base))
    return sign * torch.exp(quantized)



def kmeans_quantization(tensor, clusters=4):
    '''
    Perform k-means quantization on the input tensor x, with clusters number of clusters.
    Parameters:
    tensor (torch.Tensor): The input tensor to be quantized.
    clusters (int): The number of clusters to quantize the input tensor into.
    '''
    tensor_reshaped = tensor.view(-1, 1).numpy()
    kmeans = KMeans(n_clusters=clusters).fit(tensor_reshaped)
    quantized = torch.tensor(kmeans.cluster_centers_[kmeans.labels_]).view_as(tensor)
    return quantized


def stochastic_rounding(tensor, levels=16):
    '''
    Stochastic rounding involves rounding to the nearest quantized value with a probability proportional to the distance from the exact value, which can preserve more information in expectation

    Parameters:
    tensor (torch.Tensor): The input tensor to be quantized.
    levels (int): The number of levels to quantize the input tensor into.
    '''

    min_val, max_val = tensor.min(), tensor.max()
    scale = (max_val - min_val) / (levels - 1)
    scaled = (tensor - min_val) / scale
    lower = torch.floor(scaled)
    upper = torch.ceil(scaled)
    prob = scaled - lower
    quantized = torch.where(torch.rand_like(tensor) < prob, upper, lower) * scale + min_val
    return quantized

def fixed_point_quantization(tensor, num_bits, fractional_bits):
    '''
    Fixed-point quantization involves scaling the input tensor by a power of 2, rounding to the nearest integer, and then scaling back to the original range.
    '''

    scale = 2 ** fractional_bits
    quantized = torch.round(tensor * scale) / scale
    max_val = 2 ** (num_bits - fractional_bits - 1) - 1 / scale
    min_val = -max_val
    quantized = torch.clamp(quantized, min_val, max_val)
    return quantized


def add_sparsity(x, sparsity_ratio=0.1):
    """
    Adds sparsity to the input tensor by setting a specified percentage of the smallest absolute values to zero.

    Parameters:
    tensor (torch.Tensor): The input tensor.
    sparsity_ratio (float): The ratio of elements to be set to zero, between 0 and 1.

    Returns:
    torch.Tensor: The sparse tensor.
    """
    flat_tensor = x.flatten()
    k = int(sparsity_ratio * flat_tensor.size(0))

    # print("Number of elemnts that will be zeroed out",k)

    if k > 0:
        threshold = flat_tensor.abs().kthvalue(k).values.item()
        mask = flat_tensor.abs() > threshold
        sparse_tensor = flat_tensor * mask.float()
        return sparse_tensor.view_as(x)
    else:
        return x

In [50]:
from tqdm import tqdm

NUM_SEEDS = 10

loss_histories = []
pbar = tqdm(range(NUM_SEEDS))
for _ in pbar:
    net = LeNet().to(device)

    net.apply(weights_init)
    criterion = cross_entropy_for_onehot

    ######### honest partipant #########
    img_index = 25
    gt_data = tp(dst[img_index][0]).to(device)
    gt_data = gt_data.view(1, *gt_data.size())
    gt_label = torch.Tensor([dst[img_index][1]]).long().to(device)
    gt_label = gt_label.view(1, )
    gt_onehot_label = label_to_onehot(gt_label, num_classes=100)

    # compute original gradient
    out = net(gt_data)
    y = criterion(out, gt_onehot_label)
    dy_dx = torch.autograd.grad(y, net.parameters())


    # share the gradients with other clients
    original_dy_dx = list((_.detach().clone() for _ in dy_dx))

    # generate dummy data and label
    dummy_data = torch.randn(gt_data.size()).to(device).requires_grad_(True)
    dummy_label = torch.randn(gt_onehot_label.size()).to(device).requires_grad_(True)

    optimizer = torch.optim.LBFGS([dummy_data, dummy_label] )

    history = []
    loss_history = []
    for iters in range(100):
        def closure():
            optimizer.zero_grad()

            pred = net(dummy_data)
            dummy_onehot_label = F.softmax(dummy_label, dim=-1)
            dummy_loss = criterion(pred, dummy_onehot_label) # TODO: fix the gt_label to dummy_label in both code and slides.
            dummy_dy_dx = torch.autograd.grad(dummy_loss, net.parameters(), create_graph=True)

            grad_diff = 0
            grad_count = 0
            for gx, gy_ in zip(dummy_dy_dx, original_dy_dx): # TODO: fix the variablas here
                gy = add_sparsity(gy_, 0.4)
                grad_diff += ((gx - gy) ** 2).sum()
                grad_count += gx.nelement()
            # grad_diff = grad_diff / grad_count * 1000
            grad_diff.backward()

            return grad_diff

        optimizer.step(closure)
        if iters % 10 == 0:
            current_loss = closure()
            #print(iters, "%.4f" % current_loss.item())
        pbar.set_description(f"Iterations: {iters}/100 | Loss: {current_loss.item()}")
        loss_history.append(current_loss.item())
        history.append(tt(dummy_data[0].cpu()))
    loss_histories.append(loss_history)

Iterations: 99/100 | Loss: 1.3206108808517456: 100%|██████████| 10/10 [07:46<00:00, 46.63s/it]


In [51]:
import numpy
print(np.mean([x[-1] for x in loss_histories]))
print(np.median([x[-1] for x in loss_histories]))

3.0902176022529604
2.956841826438904
