<a href="https://colab.research.google.com/github/Jayesh-CSE/Data-independent-pruning-using-Coresets/blob/main/Data_independent_pruning_MLP_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
# import torch.nn.utils.prune as prune
from typing import Callable, Tuple, Union
import sys
from matplotlib import pyplot as plt

In [None]:
#cd /content/drive/MyDrive/Data Independent Pruning Coreset

In [None]:
# Mini-batch size shared by the training and the test loader.
batch_size = 128

# Convert each image to a tensor, then standardise it with one scalar mean / std.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.13066, 0.30810),
])

# Both MNIST splits live in (and are downloaded to, if missing) the same folder.
mnist_root = '/content/drive/MyDrive/Data Independent Pruning Coreset/data'

trainset = torchvision.datasets.MNIST(
    root=mnist_root, train=True, download=True, transform=transform)
# Batches are served in a fixed order (shuffle=False) by two worker processes.
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size, shuffle=False, num_workers=2)

testset = torchvision.datasets.MNIST(
    root=mnist_root, train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batch_size, shuffle=False, num_workers=2)

# Class names are the digit labels "0" .. "9".
classes = list(map(str, range(10)))

In [None]:
class Net(nn.Module):
    """Fully connected classifier with two ReLU hidden layers and 10 outputs.

    Args:
        in_size: Number of features per sample after flattening.
        num_n1: Width of the first hidden layer (``fc1``).
        num_n2: Width of the second hidden layer (``fc2``).
        cache_activation: When true, ``forward`` also returns both hidden
            activations instead of the output alone.
    """

    def __init__(self, in_size=28*28, num_n1=10000, num_n2 = 5000, cache_activation=False):
        super().__init__()
        self.cache_activation = cache_activation
        # Layers are created in fc1 -> fc2 -> fc3 order.
        self.fc1 = nn.Linear(in_size, num_n1)
        self.fc2 = nn.Linear(num_n1, num_n2)
        self.fc3 = nn.Linear(num_n2, 10)

    def forward(self, x):
        """Run the network on a batch.

        Returns:
            ``fc3``'s output, or the tuple ``(hidden1, hidden2, output)`` when
            ``cache_activation`` is set.
        """
        # Collapse everything except the batch dimension into one feature axis.
        flat = x.view(x.size(0), -1)
        hidden1 = F.relu(self.fc1(flat))
        hidden2 = F.relu(self.fc2(hidden1))
        logits = self.fc3(hidden2)

        if not self.cache_activation:
            return logits
        return hidden1, hidden2, logits

In [None]:
def train(epochs, lr=0.01):
    """Train the notebook-level ``model`` on ``trainloader`` with SGD (momentum 0.9).

    After every epoch the epoch number and the accuracy returned by
    ``validate(testloader, model)`` are printed.

    Relies on the notebook globals ``model``, ``trainloader``, ``testloader``
    and ``validate``.

    Args:
        epochs: Number of passes over ``trainloader``.
        lr: SGD learning rate.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    for ep in range(epochs):
        # validate() switches the model to eval mode and never switches back,
        # so training mode has to be re-enabled at the start of every epoch.
        model.train()

        # Stream batches straight from the loader instead of materialising the
        # whole dataset in a list each epoch.
        for x, y in trainloader:
            optimizer.zero_grad()
            output = model(x)
            if model.cache_activation:
                # With cached activations the model returns a tuple whose last
                # element is the final layer's output.
                output = output[-1]
            ls = criterion(output, y)
            ls.backward()
            optimizer.step()

        print(ep+1, validate(testloader, model))

In [None]:
def validate(loader, model):
    """Return the fraction of samples in ``loader`` that ``model`` classifies correctly.

    The model is switched to eval mode and left in that mode. Gradients are
    disabled while predictions are computed.

    Args:
        loader: Iterable of ``(inputs, targets)`` batches.
        model: Module with a ``cache_activation`` flag; when the flag is set the
            last element of the model's returned tuple is used as the output.

    Returns:
        Number of correct predictions divided by the number of targets seen.
    """
    model.eval()
    correct, total = 0, 0

    with torch.no_grad():
        for inp, target in loader:
            logits = model(inp)
            if model.cache_activation:
                logits = logits[-1]
            # torch.max over dim 1 yields (values, indices); the indices are the predictions.
            pred = torch.max(logits, 1)[1]
            correct += (pred == target).sum().item()
            total += len(target)

    return correct / total

In [None]:
# Hidden-layer widths of the unpruned network.
num_n1 = int(1e4)
num_n2 = int(5e3)

# cache_activation=True: forward() returns (x1, x2, x3) rather than x3 alone.
model = Net(28*28, num_n1, num_n2, True)

# To train from scratch instead of loading the checkpoint, run
#   train(10, 0.01)
# and then save model.state_dict() to the path below.

# Restore the pre-trained weights. map_location='cpu' makes the load succeed on
# a CPU-only runtime even if the checkpoint was written from a CUDA session;
# without it torch.load tries to restore tensors onto the device they were
# saved from.
model.load_state_dict(
    torch.load('/content/drive/MyDrive/Data Independent Pruning Coreset/v_fc_{}_{}'.format(num_n1, num_n2),
               map_location='cpu'))

<All keys matched successfully>

In [None]:
# Write the model's current state_dict to the checkpoint path (same file name
# pattern, v_fc_<num_n1>_<num_n2>, that the loading cell reads from).
torch.save(model.state_dict(), '/content/drive/MyDrive/Data Independent Pruning Coreset/v_fc_{}_{}'.format(num_n1, num_n2))

In [None]:
# Report the hidden-layer widths and the test accuracy of the model before any
# layer has been compressed.
print('Epoch', 0, 'Neurons', num_n1, num_n2)
print('Test acc', validate(testloader, model))

Epoch 0 Neurons 10000 5000
Test acc 0.982


In [None]:
# Show the layer structure of the unpruned model.
print(model)

Net(
  (fc1): Linear(in_features=784, out_features=10000, bias=True)
  (fc2): Linear(in_features=10000, out_features=5000, bias=True)
  (fc3): Linear(in_features=5000, out_features=10, bias=True)
)


In [None]:
class Coreset:
    """Importance sampler that selects and re-weights a subset of rows ("points").

    Row ``i`` of ``points`` is paired with row ``i`` of ``weights``. In this
    notebook ``compress_fc_layer`` passes one row per neuron: ``points`` holds the
    neuron's incoming weights with its bias appended, ``weights`` holds its
    outgoing weights.

    Args:
        points: 2-D tensor, one row per point.
        weights: 2-D tensor with the same number of rows as ``points``.
        activation_function: Callable applied to ``upper_bound * ||point||`` when
            computing sensitivities.
        upper_bound: Scalar multiplier (beta) applied to each point norm.

    Attributes:
        indices: Sorted row indices picked by the last ``compute_coreset`` call
            (``None`` before the first call).
    """

    def __init__(self, points, weights, activation_function: Callable, upper_bound: int = 1):
        assert points.shape[0] == weights.shape[0]

        self.__points = points.cpu()
        self.__weights = weights.cpu()
        self.__activation = activation_function
        self.__beta = upper_bound
        self.__sensitivity = None
        self.indices = None

    @property
    def sensitivity(self):
        """Sampling distribution over the points, computed once and cached.

        The unnormalised score of a point is
        ``max_j |weights[i, j]| * |activation(beta * ||points[i]||)|``;
        scores are divided by their sum so that they add up to one.
        """
        if self.__sensitivity is None:
            points_norm = self.__points.norm(dim=1)
            assert points_norm.shape[0] == self.__points.shape[0]
            weights = torch.abs(self.__weights).max(dim=1)[0]  # max returns (values, indices)
            assert weights.shape[0] == self.__points.shape[0]
            scores = weights * torch.abs(self.__activation(self.__beta * points_norm))
            self.__sensitivity = scores / scores.sum()

        return self.__sensitivity

    def compute_coreset(self, coreset_size):
        """Sample points by sensitivity until ``coreset_size`` distinct ones are hit.

        Points are drawn one at a time, with replacement, from the sensitivity
        distribution (``np.random`` global state). Each selected point's weight
        row is scaled by ``times_drawn / (probability * total_draws)``.

        Args:
            coreset_size: Number of distinct points to select.

        Returns:
            ``(coreset, weights)``: the selected rows of ``points`` and the
            re-scaled matching rows of ``weights``, both ordered by row index.

        Raises:
            AssertionError: If ``coreset_size`` exceeds the number of points.
            ValueError: If fewer than ``coreset_size`` points have a positive
                sampling probability. Without this check the sampling loop
                below could never collect enough distinct points and would
                spin forever.
        """
        num_points = self.__points.shape[0]
        assert coreset_size <= num_points
        prob = self.sensitivity.cpu().detach().numpy()

        # Only points with positive probability can ever be drawn (NaN counts
        # as not drawable).
        drawable = int(np.count_nonzero(prob > 0))
        if coreset_size > drawable:
            raise ValueError(
                "coreset_size={} exceeds the {} point(s) with positive sampling "
                "probability".format(coreset_size, drawable))

        counts = np.zeros(num_points, dtype=np.int64)  # times each point was drawn
        distinct = 0
        cnt = 0  # total number of draws
        while distinct < coreset_size:
            i = np.random.choice(a=num_points, size=1, p=prob).tolist()[0]
            if counts[i] == 0:
                distinct += 1
            counts[i] += 1
            cnt += 1

        idxs = np.nonzero(counts)[0]
        self.indices = idxs
        coreset = self.__points[idxs, :]

        # Importance-sampling re-weighting: multiply by the draw count, then
        # divide by (probability * total draws), broadcasting along the rows.
        weights = self.__weights[idxs].t() * torch.tensor(counts[idxs]).float()
        weights = (weights / (torch.tensor(prob[idxs]) * cnt)).t()

        return coreset, weights

In [None]:
def compress_fc_layer(layer1: Tuple[torch.Tensor, torch.Tensor],
                      layer2: Tuple[torch.Tensor, torch.Tensor],
                      compressed_size,
                      activation: Callable,
                      upper_bound,
                      device,
                      compression_type):
    """Shrink the neurons shared by two consecutive fully connected layers.

    Args:
        layer1: ``(weight, bias)`` of the layer whose output neurons are pruned;
            ``weight`` has one row per neuron.
        layer2: ``(weight, bias)`` of the following layer; ``weight`` has one
            column per neuron of ``layer1``.
        compressed_size: Number of neurons to keep.
        activation: Activation handed to ``Coreset`` (used by "Coreset" only).
        upper_bound: Norm multiplier handed to ``Coreset`` (used by "Coreset" only).
        device: Device the "Coreset" results are moved to. The "Uniform" and
            "Top-K" branches return tensors on their original device.
        compression_type: ``"Coreset"`` (sensitivity sampling with re-weighted
            outgoing weights), ``"Uniform"`` (random subset without replacement)
            or ``"Top-K"`` (largest incoming-weight row norms).

    Returns:
        ``(layer1, layer2, indices)``: the compressed ``(weight, bias)`` pairs and
        the kept neuron indices (NumPy array for "Coreset"/"Uniform", tensor for
        "Top-K").

    Exits via ``sys.exit`` when ``compression_type`` is not recognised.
    """
    num_neurons = layer1[1].shape[0]
    if compression_type == "Coreset":
        # One point per neuron: its incoming weights with the bias appended as
        # the last column.
        points = torch.cat(
            (layer1[0].detach().cpu(), layer1[1].detach().cpu().view(num_neurons, 1)),
            dim=1)
        # Transposed so that row i holds the outgoing weights of neuron i.
        weights = layer2[0].t()
        coreset = Coreset(points=points, weights=weights, activation_function=activation, upper_bound=upper_bound)
        points, weights = coreset.compute_coreset(compressed_size)
        indices = coreset.indices
        # Split the points back into weights and bias. The bias is the LAST
        # column (index -1); index 1 would return the second input weight.
        layer1 = (points[:, :-1].to(device), points[:, -1].to(device))
        layer2 = (weights.t().to(device), layer2[1].to(device))
    elif compression_type == "Uniform":
        indices = np.random.choice(num_neurons, size=compressed_size, replace=False)
        layer1 = (layer1[0][indices, :], layer1[1][indices])
        layer2 = (layer2[0][:, indices], layer2[1])
    elif compression_type == "Top-K":
        indices = torch.topk(torch.norm(layer1[0], dim=1), k=compressed_size)[1]
        layer1 = (layer1[0][indices, :], layer1[1][indices])
        layer2 = (layer2[0][:, indices], layer2[1])
    else:
        sys.exit("There is not a compression type: {}".format(compression_type))

    return layer1, layer2, indices

In [None]:
def relu(X):
    """Element-wise ``max(0, X)`` for torch tensors, NumPy arrays and scalars.

    Torch tensors are handled with ``torch.clamp`` so they stay in torch:
    passing them to ``np.maximum`` needs a NumPy conversion, which raises for
    tensors that require grad. Everything else goes through ``np.maximum``.

    Args:
        X: Tensor, array-like or scalar.

    Returns:
        A tensor when ``X`` is a tensor, otherwise the result of ``np.maximum``.
    """
    if isinstance(X, torch.Tensor):
        return torch.clamp(X, min=0)
    return np.maximum(0, X)

In [None]:
# Quick check of relu on a small integer tensor containing a negative entry.
relu(torch.tensor([4,3,-2]))

tensor([4, 3, 0])

In [None]:
# (weight, bias) pairs of the first two linear layers, in the order that
# nn.Linear registers its parameters.
layer1 = (model.fc1.weight, model.fc1.bias)
layer2 = (model.fc2.weight, model.fc2.bias)

In [None]:
from torch import linalg as LA

# L2 norm of every transformed training image, converted to int32 with .int().
data_norm = [LA.vector_norm(image).int() for image, _ in trainset]

In [None]:
#len(data_norm)
#data_norm
#type(data_norm)
# Largest integer-converted image norm over the training set.
max(data_norm)

tensor(48, dtype=torch.int32)

In [None]:
# Number of fc1 neurons to keep, and the norm multiplier passed on to Coreset.
num_core_n = 1000
beta = 1

# Coreset-compress the neurons between fc1 and fc2; results stay on the CPU.
l1, l2, ind = compress_fc_layer(
    layer1=layer1,
    layer2=layer2,
    compressed_size=num_core_n,
    activation=relu,
    upper_bound=beta,
    device="cpu",
    compression_type="Coreset",
)

In [None]:
# Sanity check: first-dimension length of each tensor in l2, the (weight, bias)
# pair returned for the second layer.
[len(a) for a in l2]

[5000, 5000]

In [None]:
# Replace fc1 / fc2 with fresh layers sized after the compressed tensors
# (weight shape is (out_features, in_features)).
model.fc1 = nn.Linear(in_features=l1[0].size(1), out_features=l1[0].size(0))
model.fc2 = nn.Linear(in_features=l2[0].size(1), out_features=l2[0].size(0))

# Overwrite the freshly initialised parameters with the compressed values;
# no_grad keeps the in-place copies out of autograd.
with torch.no_grad():
    for target, source in (
        (model.fc1.weight, l1[0]),
        (model.fc1.bias, l1[1]),
        (model.fc2.weight, l2[0]),
        (model.fc2.bias, l2[1]),
    ):
        target.copy_(source)

In [None]:
# Show the layer structure after fc1 / fc2 were replaced.
print(model)

Net(
  (fc1): Linear(in_features=784, out_features=1000, bias=True)
  (fc2): Linear(in_features=1000, out_features=5000, bias=True)
  (fc3): Linear(in_features=5000, out_features=10, bias=True)
)


In [None]:
# Fine-tune the compressed model for 5 epochs at learning rate 0.01; train()
# prints the test accuracy after each epoch.
train(5, 0.01)

1 0.9681
2 0.9688
3 0.9796
4 0.9809
5 0.9818


In [None]:
# Report the layer widths and test accuracy after compressing fc1.
# NOTE(review): the label prints "Epoch 1" although the preceding cell trained
# for 5 epochs; presumably it counts pruning rounds. Confirm.
print('Epoch', 1 , 'Neurons', num_core_n, num_n2)
print('Test acc', validate(testloader, model))

Epoch 1 Neurons 1000 5000
Test acc 0.9818


In [None]:
# Show the layer structure again before the second layer is compressed.
print(model)

Net(
  (fc1): Linear(in_features=784, out_features=1000, bias=True)
  (fc2): Linear(in_features=1000, out_features=5000, bias=True)
  (fc3): Linear(in_features=5000, out_features=10, bias=True)
)


# Pruning layer 2 (fc2) based on the weights of layer 3 (fc3)

In [None]:
# (weight, bias) pairs of the second and third linear layers, in the order that
# nn.Linear registers its parameters.
layer2 = (model.fc2.weight, model.fc2.bias)
layer3 = (model.fc3.weight, model.fc3.bias)

In [None]:
# Number of fc2 neurons to keep, and the norm multiplier passed on to Coreset.
num_core_n = 500
beta = 1

# Coreset-compress the neurons between fc2 and fc3. Note that here l1 receives
# the compressed fc2 pair and l2 the compressed fc3 pair.
l1, l2, ind = compress_fc_layer(
    layer1=layer2,
    layer2=layer3,
    compressed_size=num_core_n,
    activation=relu,
    upper_bound=beta,
    device="cpu",
    compression_type="Coreset",
)

In [None]:
# Sanity check: first-dimension length of each tensor in l1, which at this point
# holds the (weight, bias) pair returned for fc2.
[len(a) for a in l1]

[500, 500]

In [None]:
# Replace fc2 / fc3 with fresh layers sized after the compressed tensors
# (weight shape is (out_features, in_features)). Here l1 belongs to fc2 and
# l2 to fc3.
model.fc2 = nn.Linear(in_features=l1[0].size(1), out_features=l1[0].size(0))
model.fc3 = nn.Linear(in_features=l2[0].size(1), out_features=l2[0].size(0))

# Overwrite the freshly initialised parameters with the compressed values;
# no_grad keeps the in-place copies out of autograd.
with torch.no_grad():
    for target, source in (
        (model.fc2.weight, l1[0]),
        (model.fc2.bias, l1[1]),
        (model.fc3.weight, l2[0]),
        (model.fc3.bias, l2[1]),
    ):
        target.copy_(source)

In [None]:
# Fine-tune again for 5 epochs at learning rate 0.01 after compressing fc2;
# train() prints the test accuracy after each epoch.
train(5, 0.01)

1 0.9662
2 0.9778
3 0.9792
4 0.9826
5 0.9839


In [None]:
# Report the layer widths and test accuracy after compressing fc2.
# The widths 1000 and 500 are written as literals here rather than read from
# the variables used for the compression calls.
print('Epoch', 1 , 'Neurons', 1000, 500)
print('Test acc', validate(testloader, model))

Epoch 1 Neurons 1000 500
Test acc 0.9839


In [None]:
# Show the final layer structure after both compression steps.
print(model)

Net(
  (fc1): Linear(in_features=784, out_features=1000, bias=True)
  (fc2): Linear(in_features=1000, out_features=500, bias=True)
  (fc3): Linear(in_features=500, out_features=10, bias=True)
)
