<a href="https://colab.research.google.com/github/MuhammadUmairHaider/Neural-Network-Pruning-Through-Constrained-Reinforcement-Learning/blob/main/PyTorch_Compression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torchvision import models
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torch.optim as optim

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(device)

# Human-readable class names; presumably ordered by CIFAR-10 label index
# (the visible cells never index into this tuple) -- TODO confirm.
classes = (
    'plane', 'car', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

cuda


In [None]:
# Preprocessing applied to every image: convert to a tensor, then normalise each of
# the three channels with mean 0.5 and std 0.5, i.e. (x - 0.5) / 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Mini-batch size shared by the training and the test loader.
batch_size = 200

# Training split: downloaded into ./data on first use and reshuffled every epoch.
trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size, shuffle=True, num_workers=2)

# Test split: iterated in a fixed order (no shuffling).
testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batch_size, shuffle=False, num_workers=2)

# NOTE: the `classes` tuple is defined once in the device/config cell earlier in the
# notebook; the identical copy that used to live here was removed.

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


  0%|          | 0/170498071 [00:00<?, ?it/s]

Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified


In [None]:
class Gates(nn.Module):
    """Learnable per-channel soft gate for 4-D feature maps.

    Every channel owns one trainable scalar in ``self.filter`` (initialised to 1).
    The multiplicative gate derived from it is::

        g = filter**2 / (filter**2 + epsilon)

    which is 0 when ``filter`` is 0 and approaches 1 as ``|filter|`` grows
    (for ``epsilon > 0``).

    Args:
        size: Number of channels to gate; must match dimension 1 of the input
            passed to :meth:`forward`.
        epsilon: Constant added to the denominator of the gate expression.
            Defaults to 0.01.
    """

    def __init__(self, size, epsilon=0.01):
        super().__init__()
        self.size = size
        # One raw gate parameter per channel, all starting at 1.
        self.filter = nn.Parameter(torch.ones(size))
        self.epsilon = epsilon

    def forward(self, x):
        """Scale ``x`` channel-wise by the current gate values.

        Args:
            x: Tensor that broadcasts against shape ``(1, size, 1, 1)``.

        Returns:
            ``x`` multiplied by the gate values broadcast over the other dimensions.
        """
        # Reshape to (1, C, 1, 1) so the gate broadcasts over the remaining dimensions.
        return x * torch.reshape(self.get_g(), (1, self.size, 1, 1))

    def get_g(self):
        """Return the 1-D tensor of gate values ``filter**2 / (filter**2 + epsilon)``."""
        squared = self.filter ** 2
        return squared / (squared + self.epsilon)

    def get_epsilon(self):
        """Return the current ``epsilon`` value."""
        # Previously returned the bound method itself instead of the stored value.
        return self.epsilon

    def set_epsilon(self, e):
        """Replace ``epsilon`` with ``e``; affects all subsequent gate evaluations."""
        self.epsilon = e

In [None]:
def conv_layer(chann_in, chann_out, k_size, p_size):
    """Build one Conv2d -> BatchNorm2d -> ReLU stage.

    Args:
        chann_in: Input channel count of the convolution.
        chann_out: Output channel count (also the BatchNorm feature count).
        k_size: Convolution kernel size.
        p_size: Convolution padding.

    Returns:
        An ``nn.Sequential`` holding the three modules in that order.
    """
    convolution = nn.Conv2d(chann_in, chann_out, kernel_size=k_size, padding=p_size)
    normalisation = nn.BatchNorm2d(chann_out)
    return nn.Sequential(convolution, normalisation, nn.ReLU())

def vgg_conv_block(in_list, out_list, k_list, p_list, pooling_k, pooling_s):
    """Stack several conv stages followed by a single max-pooling layer.

    The i-th stage is ``conv_layer(in_list[i], out_list[i], k_list[i], p_list[i])``;
    the number of stages is taken from ``len(in_list)``.

    Args:
        in_list: Input channel count of each stage.
        out_list: Output channel count of each stage.
        k_list: Kernel size of each stage.
        p_list: Padding of each stage.
        pooling_k: Kernel size of the trailing ``nn.MaxPool2d``.
        pooling_s: Stride of the trailing ``nn.MaxPool2d``.

    Returns:
        An ``nn.Sequential`` of the conv stages with the pooling layer appended.
    """
    stages = []
    for idx in range(len(in_list)):
        stages.append(conv_layer(in_list[idx], out_list[idx], k_list[idx], p_list[idx]))
    stages.append(nn.MaxPool2d(kernel_size=pooling_k, stride=pooling_s))
    return nn.Sequential(*stages)

def vgg_fc_layer(size_in, size_out):
    """Build one Linear -> BatchNorm1d -> ReLU stage.

    Args:
        size_in: Number of input features of the linear layer.
        size_out: Number of output features (also the BatchNorm feature count).

    Returns:
        An ``nn.Sequential`` holding the three modules in that order.
    """
    projection = nn.Linear(size_in, size_out)
    normalisation = nn.BatchNorm1d(size_out)
    return nn.Sequential(projection, normalisation, nn.ReLU())

class VGG16(nn.Module):
    """VGG-16 style network with BatchNorm after every conv and hidden FC layer.

    Five conv blocks (each ending in a 2x2, stride-2 max-pool) are followed by two
    hidden fully connected stages and a final linear classifier.

    Args:
        n_classes: Output size of the final linear layer.
        fc_in_features: Number of values per sample after flattening the output of
            the last conv block. The 3x3 / padding-1 convolutions keep the spatial
            size and the five poolings each halve it, so this must equal
            ``512 * (H // 32) * (W // 32)`` for ``H x W`` inputs. The default of
            2048 corresponds to 64x64 inputs; 32x32 inputs need 512.
    """

    def __init__(self, n_classes=1000, fc_in_features=2048):
        super(VGG16, self).__init__()

        # Conv blocks (BatchNorm + ReLU activation added in each block)
        self.layer1 = vgg_conv_block([3,64], [64,64], [3,3], [1,1], 2, 2)
        self.layer2 = vgg_conv_block([64,128], [128,128], [3,3], [1,1], 2, 2)
        self.layer3 = vgg_conv_block([128,256,256], [256,256,256], [3,3,3], [1,1,1], 2, 2)
        self.layer4 = vgg_conv_block([256,512,512], [512,512,512], [3,3,3], [1,1,1], 2, 2)
        self.layer5 = vgg_conv_block([512,512,512], [512,512,512], [3,3,3], [1,1,1], 2, 2)

        # FC layers
        self.layer6 = vgg_fc_layer(fc_in_features, 4096)
        self.layer7 = vgg_fc_layer(4096, 4096)

        # Final layer
        self.layer8 = nn.Linear(4096, n_classes)

    def forward(self, x):
        """Run the network.

        Args:
            x: Image batch with 3 channels in dimension 1.

        Returns:
            A tuple ``(vgg16_features, out)``: the output of the last conv block and
            the ``n_classes`` scores produced by the final linear layer.
        """
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        vgg16_features = self.layer5(out)
        # Flatten the features themselves (keeping the batch dimension) instead of
        # sizing the view from the previous block's output.
        out = torch.flatten(vgg16_features, start_dim=1)
        out = self.layer6(out)
        out = self.layer7(out)
        out = self.layer8(out)

        return vgg16_features, out

      
# Build the 10-class model and move its parameters to the selected device.
net = VGG16(n_classes=10).to(device)

In [None]:
class VGG16(nn.Module):
    """VGG-16 style CNN (no BatchNorm) with a ``Gates`` module registered per conv layer.

    Sub-modules are registered as ``conv{b}_{i}`` / ``gate{b}_{i}`` for block ``b`` and
    position ``i``, followed by ``maxpool``, ``flatten``, ``fc1``, ``fc2`` and ``fc3``.

    In this version of ``forward`` the gates are registered but not applied, and
    max-pooling runs only after blocks 3, 4 and 5. ``fc1`` expects ``512*4*4`` inputs,
    i.e. a 4x4 map after those three halvings (a 32x32 input image).
    """

    # (block number, channels entering the block, channels produced, convs in block)
    _CONV_PLAN = (
        (1, 3, 64, 2),
        (2, 64, 128, 2),
        (3, 128, 256, 3),
        (4, 256, 512, 3),
        (5, 512, 512, 3),
    )
    # Blocks whose output is max-pooled in forward().
    _POOLED_BLOCKS = (3, 4, 5)

    def __init__(self):
        super().__init__()
        for block_no, block_in, block_out, depth in self._CONV_PLAN:
            for conv_no in range(1, depth + 1):
                # Only the first conv of a block changes the channel count.
                fan_in = block_in if conv_no == 1 else block_out
                setattr(
                    self,
                    f"conv{block_no}_{conv_no}",
                    nn.Conv2d(in_channels=fan_in, out_channels=block_out, kernel_size=3, padding=1),
                )
                setattr(self, f"gate{block_no}_{conv_no}", Gates(block_out))

        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(512*4*4, 4096)
        self.fc2 = nn.Linear(4096, 4096)
        self.fc3 = nn.Linear(4096, 10)

    def forward(self, x):
        """Return the 10 class scores for an image batch ``x`` (3 input channels)."""
        for block_no, _, _, depth in self._CONV_PLAN:
            for conv_no in range(1, depth + 1):
                conv = getattr(self, f"conv{block_no}_{conv_no}")
                x = F.relu(conv(x))
                # The matching gate{block_no}_{conv_no} is intentionally not applied here.
            if block_no in self._POOLED_BLOCKS:
                x = self.maxpool(x)

        x = self.flatten(x)
        # No dropout between the fully connected layers in this version.
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
# Instantiate the gated VGG16 and move it to the selected device.
net = VGG16().to(device)

In [None]:
# Load the `vgg16` entry point from the torchvision v0.10.0 hub repository with
# pretrained weights, then move the model to the selected device.
net = torch.hub.load(
    'pytorch/vision:v0.10.0',
    'vgg16',
    pretrained=True,
).to(device)

Downloading: "https://github.com/pytorch/vision/archive/v0.10.0.zip" to /root/.cache/torch/hub/v0.10.0.zip
Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth


  0%|          | 0.00/528M [00:00<?, ?B/s]

In [None]:
# Split the model's parameters into two optimizer groups:
#   gates_group - the raw `filter` parameter of every Gates module
#   rem_group   - every remaining parameter of the network
# Previously only nn.Conv2d weights/biases reached rem_group, so parameters of
# Linear / BatchNorm layers were never handed to the optimizer.
gates_group = []
rem_group = []
for m in net.modules():
    if isinstance(m, Gates):
        gates_group.append(m.filter)
        continue
    if isinstance(m, nn.Conv2d):
        print(m.out_channels)
    # recurse=False yields only the parameters owned directly by `m`, so a parameter
    # is not added once per enclosing container module.
    rem_group.extend(m.parameters(recurse=False))

64
64
128
128
256
256
256
512
512
512
512
512
512


In [None]:
# Summarise the collected parameters by shape instead of echoing every weight value.
[tuple(p.shape) for p in rem_group]

In [None]:
# Classification loss applied to the network outputs.
criterion = nn.CrossEntropyLoss()

# Plain SGD with one parameter group for the gate parameters and one for the rest,
# so each can be given its own learning rate (both are 0.01 here).
# Single-group alternative: optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
optimizer = optim.SGD([
    dict(params=gates_group, lr=0.01),
    dict(params=rem_group, lr=0.01),
])

In [None]:
#net = vgg16
# --- Training configuration -------------------------------------------------
num_epochs = 2
log_every = 200        # number of mini-batches between progress reports
gate_l1_weight = 0.0   # weight of the L1 penalty on gate parameters; 0 disables it

# reduction='sum' is the current spelling of the deprecated size_average=False.
l1_crit = nn.L1Loss(reduction='sum')

net.train()  # make sure layers such as dropout / batch-norm are in training mode
for epoch in range(num_epochs):  # loop over the dataset multiple times
    correct = 0        # correctly classified training samples so far this epoch
    seen = 0           # training samples processed so far this epoch
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(trainloader, 0):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward pass and classification loss
        outputs = net(inputs)
        loss = criterion(outputs, labels)

        # Optional sparsity penalty: summed absolute value of every gate parameter.
        # Only evaluated when enabled, so the default run does no wasted work.
        if gate_l1_weight > 0:
            reg_loss = sum(
                l1_crit(m.filter, torch.zeros_like(m.filter))
                for m in net.modules()
                if isinstance(m, Gates)
            )
            loss = loss + gate_l1_weight * reg_loss

        # backward + optimize
        loss.backward()
        optimizer.step()

        # running training accuracy (detached: no gradient tracking needed here)
        _, predicted = torch.max(outputs.detach(), 1)
        correct += (predicted == labels).sum().item()
        seen += labels.size(0)

        # print statistics
        running_loss += loss.item()
        if i % log_every == log_every - 1:    # report every `log_every` mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / log_every:.3f}')
            print("Accuracy : ", (correct * 100 / seen))
            running_loss = 0.0

print('Finished Training')



[1,   200] loss: 0.277
Accuracy :  0.0
[2,   200] loss: 0.100
Accuracy :  0.0
Finished Training


In [None]:
correct = 0
total = 0

# Evaluate in inference mode, then restore whatever mode the model was in before.
was_training = net.training
net.eval()

# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for images, labels in testloader:
        # Move the *test* batch to the device (previously the leftover training
        # batch `inputs` was used here, so the test images were never evaluated).
        images = images.to(device)
        labels = labels.to(device)
        # calculate outputs by running images through the network
        outputs = net(images)
        # the class with the highest score is what we choose as prediction
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

net.train(was_training)

print(f'Accuracy of the network on the 10000 test images: {100 * correct // total} %')

RuntimeError: ignored