# Inference Optimization for Convolutional Networks
### Part 1: Model fusion, quantization

reference: https://towardsdatascience.com/inference-optimization-for-convolutional-neural-networks-e63b51b0b519


In [None]:
# Import packages
from torch import nn
from torchsummary import summary
import torch
import os

### Notebook overview
- Create CNN model and the quantized version of the same model
- Compare difference in size and latency of two models
- Fuse several blocks into one
- Compare fused and quantized version with only fused version

### Create simple CNN

In [None]:
class Net(nn.Module):
    """Small CNN: two conv/ReLU/max-pool stages, two linear layers, softmax.

    The first linear layer expects 50 * 53 * 53 flattened features, which is
    what the two 5x5 convolutions and 2x2 poolings produce for 224x224 inputs.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 3 -> 20 channels
        self.conv1 = nn.Conv2d(3, 20, kernel_size=(5, 5))
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # Stage 2: 20 -> 50 channels
        self.conv2 = nn.Conv2d(20, 50, kernel_size=(5, 5))
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # Classifier head: hidden layer of 500 units, then 10 outputs
        self.fc1 = nn.Linear(50 * 53 * 53, 500)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(500, 10)
        self.Softmax = nn.Softmax(1)

    def forward(self, x):
        """Return softmax outputs over dim 1 for the input batch ``x``."""
        # Two convolutional stages
        features = self.maxpool1(self.relu1(self.conv1(x)))
        features = self.maxpool2(self.relu2(self.conv2(features)))

        # Keep dim 0, collapse everything else, then apply the hidden layer
        hidden = self.relu3(self.fc1(features.flatten(1)))

        # Final linear layer followed by softmax
        return self.Softmax(self.fc2(hidden))

## Create quantized version of CNN

In [None]:
# changes in network

class NetQuant(nn.Module):
    """Same layers as ``Net``, wrapped between a QuantStub and a DeQuantStub.

    The input passes through ``quant`` first; the output of ``fc2`` passes
    through ``dequant`` before the softmax is applied.
    """

    def __init__(self):
        super().__init__()

        # Entry point for quantization
        self.quant = torch.quantization.QuantStub()

        # Stage 1: 3 -> 20 channels
        self.conv1 = nn.Conv2d(3, 20, kernel_size=(5, 5))
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # Stage 2: 20 -> 50 channels
        self.conv2 = nn.Conv2d(20, 50, kernel_size=(5, 5))
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # Classifier head: hidden layer of 500 units, then 10 outputs
        self.fc1 = nn.Linear(50 * 53 * 53, 500)
        self.relu3 = nn.ReLU()
        self.fc2 = nn.Linear(500, 10)
        self.Softmax = nn.Softmax(1)

        # Exit point for dequantization
        self.dequant = torch.quantization.DeQuantStub()

    def forward(self, x):
        """Return softmax outputs over dim 1 for the input batch ``x``."""
        quantized = self.quant(x)

        # Two convolutional stages
        features = self.maxpool1(self.relu1(self.conv1(quantized)))
        features = self.maxpool2(self.relu2(self.conv2(features)))

        # Keep dim 0, collapse everything else, then apply the hidden layer
        hidden = self.relu3(self.fc1(features.flatten(1)))

        # Dequantize the fc2 output before the softmax
        logits = self.dequant(self.fc2(hidden))
        return self.Softmax(logits)

In [None]:
# Build the original and the quantization-ready model and put both in eval mode

net = Net().eval()
net_quant = NetQuant()
# Kept as the last expression so the notebook displays the module structure
net_quant.eval()

NetQuant(
  (quant): QuantStub()
  (conv1): Conv2d(3, 20, kernel_size=(5, 5), stride=(1, 1))
  (relu1): ReLU()
  (maxpool1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(20, 50, kernel_size=(5, 5), stride=(1, 1))
  (relu2): ReLU()
  (maxpool2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=140450, out_features=500, bias=True)
  (relu3): ReLU()
  (fc2): Linear(in_features=500, out_features=10, bias=True)
  (Softmax): Softmax(dim=1)
  (dequant): DeQuantStub()
)

In [None]:
# Prepare model quantization and convert to quantized version
torch.backends.quantized.engine = "fbgemm"
net_quant.qconfig = torch.quantization.get_default_qconfig("fbgemm")

# Insert observers into a copy of the model
net_quant = torch.quantization.prepare(net_quant.cpu(), inplace=False)

# Calibration pass: the observers need to see at least one forward pass before
# convert() can derive quantization parameters from them. Random data in the
# benchmark input shape is enough for the size/latency comparison here; use
# representative samples when accuracy of the quantized model matters.
with torch.no_grad():
    net_quant(torch.rand(8, 3, 224, 224))

# Swap the observed float modules for their quantized counterparts
net_quant = torch.quantization.convert(net_quant, inplace=False)



### Check size

In [None]:
# Check model size
def print_model_size(mdl):
    """Return the size of ``mdl.state_dict()`` saved to disk, in whole MB.

    The state dict is written with ``torch.save`` into a private temporary
    directory, so an existing ``tmp.pt`` in the working directory is never
    overwritten or deleted, and the scratch file is cleaned up even when
    saving or measuring raises.

    Args:
        mdl: object exposing ``state_dict()`` (e.g. an ``nn.Module``).

    Returns:
        File size in bytes divided by 1e6 and rounded to the nearest integer.
    """
    import tempfile

    with tempfile.TemporaryDirectory() as scratch_dir:
        # Same file name as before; only the directory changed.
        path = os.path.join(scratch_dir, "tmp.pt")
        torch.save(mdl.state_dict(), path)
        return round(os.path.getsize(path) / 1e6)

# Measure the float model first, then the quantized one
net_size, quant_size = (print_model_size(model) for model in (net, net_quant))

# Report both sizes and how many times larger the float model is
print(f'Size without quantization: {net_size} MB \n Size with quantization: {quant_size} MB')
print(f'Size ratio: {round(net_size/quant_size, 2)}')

Size without quantization: 281 MB 
 Size with quantization: 70 MB
Size ratio: 4.01


So the size of the model without quantization is 4 times the size with quantization.

## Latency

In [None]:
# input for the model
inpp = torch.rand(32, 3, 224, 224)

# compare the performance
print("Floating point FP32")
%timeit net(inpp)

print("Quantized INT8")
%timeit net_quant(inpp)


Floating point FP32
1.42 s ± 193 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Quantized INT8
741 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Fusion

In [None]:
# Start again from fresh float and quantization-ready models for the fusion experiment

net = Net()
net_quant = NetQuant()

# eval() returns the module itself, so printing it shows the layer structure
print(net.eval())
print(net_quant.eval())

Net(
  (conv1): Conv2d(3, 20, kernel_size=(5, 5), stride=(1, 1))
  (relu1): ReLU()
  (maxpool1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(20, 50, kernel_size=(5, 5), stride=(1, 1))
  (relu2): ReLU()
  (maxpool2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=140450, out_features=500, bias=True)
  (relu3): ReLU()
  (fc2): Linear(in_features=500, out_features=10, bias=True)
  (Softmax): Softmax(dim=1)
)
NetQuant(
  (quant): QuantStub()
  (conv1): Conv2d(3, 20, kernel_size=(5, 5), stride=(1, 1))
  (relu1): ReLU()
  (maxpool1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(20, 50, kernel_size=(5, 5), stride=(1, 1))
  (relu2): ReLU()
  (maxpool2): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=140450, out_features=500, bias=True)
  (relu3): Re

In [None]:
# Prepare blocks for the fusion: each layer is paired with the ReLU applied to its output

moduls_to_fuse = [
    list(pair)
    for pair in (
        ('conv1', 'relu1'),
        ('conv2', 'relu2'),
        ('fc1', 'relu3'),
    )
]

# Fuse the same groups in the float model and in the quantization-ready model
net_fused = torch.quantization.fuse_modules(net, moduls_to_fuse)
net_quant_fused = torch.quantization.fuse_modules(net_quant, moduls_to_fuse)

In [None]:
# Prepare and quantize the model

torch.backends.quantized.engine = "fbgemm"
net_quant_fused.qconfig = torch.quantization.get_default_qconfig("fbgemm")

# Insert observers into a copy of the fused model
net_quant_fused = torch.quantization.prepare(net_quant_fused.cpu(), inplace=False)

# Calibration pass: the observers need to see at least one forward pass before
# convert() can derive quantization parameters from them. Random data in the
# benchmark input shape is enough for a latency comparison; use representative
# samples when accuracy of the quantized model matters.
with torch.no_grad():
    net_quant_fused(torch.rand(8, 3, 224, 224))

# Swap the observed float modules for their quantized counterparts
net_quant_fused = torch.quantization.convert(net_quant_fused, inplace=False)

In [None]:
print("Fused and quantized model latency")
%timeit net_quant_fused(inpp)

print("Fused model latency")
%timeit net_fused(inpp)

Fused and quantized model latency
760 ms ± 79 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Fused model latency
1.45 s ± 220 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## New PyTorch model

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score

In [2]:
# Data transformations
# Training: random flip and padded random crop, then tensor conversion and normalization
transform_train = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(),
        transforms.RandomCrop(32, padding=4),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# Test: no augmentation, only tensor conversion and the same normalization
transform_test = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

In [3]:
# CIFAR-10 training split (downloaded into ./data if missing), shuffled batches of 64
trainset = torchvision.datasets.CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=transform_train,
)
trainloader = DataLoader(dataset=trainset, batch_size=64, shuffle=True, num_workers=4)

# CIFAR-10 test split, batches of 64 in dataset order
testset = torchvision.datasets.CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform_test,
)
testloader = DataLoader(dataset=testset, batch_size=64, shuffle=False, num_workers=4)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:04<00:00, 41925787.77it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data




Files already downloaded and verified


In [10]:
class Net(nn.Module):
    """One conv/ReLU/max-pool stage followed by two linear layers.

    ``forward`` returns the raw ``fc2`` outputs (no softmax is applied).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        # 64 channels of 14x14 after the 5x5 conv and 2x2 pooling of a 32x32 input
        self.fc1 = nn.Linear(in_features=64 * 14 * 14, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=10)

    def forward(self, x):
        pooled = self.pool(torch.relu(self.conv1(x)))
        # Reshape to rows of 64 * 14 * 14 features; the row count is inferred
        flat = pooled.view(-1, 64 * 14 * 14)
        hidden = torch.relu(self.fc1(flat))
        return self.fc2(hidden)


net = Net()


In [8]:
# # Define the neural network model
# class Net(nn.Module):
#     def __init__(self):
#         super(Net, self).__init__()
#         self.conv1 = nn.Conv2d(3, 64, kernel_size=5)
#         self.pool = nn.MaxPool2d(2, 2)
#         self.fc1 = nn.Linear(64 * 5 * 5, 256)
#         self.fc2 = nn.Linear(256, 10)

#     def forward(self, x):
#       x = self.pool(torch.relu(self.conv1(x)))
#       x = x.view(-1, 64 * 14 * 14)  # Adjust the size here
#       x = torch.relu(self.fc1(x))
#       x = self.fc2(x)
#       return x


    # def forward(self, x):
    #     x = self.pool(torch.relu(self.conv1(x)))
    #     # x = x.view(-1, 64 * 5 * 5)
    #     print(x.size())  # Add this line to check the size of x
    #     # x = x.view(-1, correct_size)

    #     x = torch.relu(self.fc1(x))
    #     x = self.fc2(x)
    #     return x

# net = Net()

In [11]:
# Cross-entropy loss, optimized with SGD (lr 0.001, momentum 0.9)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# Train for 10 epochs; adjust the number of epochs as needed
for epoch in range(10):
    net.train()
    running_loss = 0.0

    for inputs, labels in trainloader:
        # Clear gradients left over from the previous step
        optimizer.zero_grad()
        loss = criterion(net(inputs), labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Report the mean per-batch loss of this epoch
    print(f'Epoch {epoch+1}, Loss: {running_loss / len(trainloader)}')

Epoch 1, Loss: 1.9152058277593549
Epoch 2, Loss: 1.622684334702504
Epoch 3, Loss: 1.4952651908635484
Epoch 4, Loss: 1.4250439022050794
Epoch 5, Loss: 1.3720774700879441
Epoch 6, Loss: 1.3248352408409119
Epoch 7, Loss: 1.2803676947760765
Epoch 8, Loss: 1.2369671256645867
Epoch 9, Loss: 1.200069046919913
Epoch 10, Loss: 1.1693911524989722


In [21]:
# Score the trained model on the test set
net.eval()
all_predictions, true_labels = [], []

with torch.no_grad():
    for images, labels in testloader:
        outputs = net(images)
        # Index of the largest output along dim 1 is the predicted class
        predicted = outputs.max(dim=1).indices
        all_predictions += predicted.tolist()
        true_labels += labels.tolist()

accuracy = accuracy_score(true_labels, all_predictions)
print(f'Test Accuracy: {accuracy * 100:.2f}%')




Test Accuracy: 62.35%


In [22]:
import torch

# Save the model's state_dict to a file named 'original_model.pth'
# (the file is read back later in the notebook to report its size)
torch.save(net.state_dict(), 'original_model.pth')

In [14]:
# Script the trained model with torch.jit.script and save it as a TorchScript module
# ("optimized_model.pt" is read back below and later measured for its file size)
scripted_model = torch.jit.script(net)
scripted_model.save("optimized_model.pt")

# Load the saved TorchScript model for inference
loaded_model = torch.jit.load("optimized_model.pt")

In [15]:
# Score the reloaded TorchScript model on the same test set
loaded_model.eval()
all_predictions, true_labels = [], []

with torch.no_grad():
    for images, labels in testloader:
        outputs = loaded_model(images)
        # Index of the largest output along dim 1 is the predicted class
        predicted = outputs.max(dim=1).indices
        all_predictions += predicted.tolist()
        true_labels += labels.tolist()

accuracy = accuracy_score(true_labels, all_predictions)
print(f'Test Accuracy (Optimized Model): {accuracy * 100:.2f}%')




Test Accuracy (Optimized Model): 62.35%


In [17]:
import time
# net
# scripted_model

# Define the number of inference runs
num_runs = 100


def _time_inference(model, batch, runs, warmup=5):
    """Return the wall-clock duration in seconds of each ``model(batch)`` call.

    Args:
        model: callable applied to ``batch``.
        batch: input passed unchanged to every call.
        runs: number of timed calls.
        warmup: number of untimed calls made first, so one-off start-up work
            on the first calls is not counted in the measurements.

    Returns:
        A list of ``runs`` durations.
    """
    durations = []
    with torch.no_grad():
        for _ in range(warmup):
            model(batch)
        for _ in range(runs):
            # perf_counter is the clock intended for measuring short intervals
            start = time.perf_counter()
            model(batch)
            durations.append(time.perf_counter() - start)
    return durations


# `images` is the last batch left over from the test-set evaluation loop
# Measure inference time for the PyTorch model
pytorch_inference_times = _time_inference(net, images, num_runs)

# Measure inference time for the TorchScript-optimized model
torchscript_inference_times = _time_inference(scripted_model, images, num_runs)

# Calculate and print average inference times
avg_pytorch_inference_time = sum(pytorch_inference_times) / num_runs
avg_torchscript_inference_time = sum(torchscript_inference_times) / num_runs

print(f"Average PyTorch Inference Time: {avg_pytorch_inference_time:.5f} seconds")
print(f"Average TorchScript Inference Time: {avg_torchscript_inference_time:.5f} seconds")


Average PyTorch Inference Time: 0.01923 seconds
Average TorchScript Inference Time: 0.01789 seconds


In [18]:
from torchvision.datasets import CIFAR10

# Transformations for the CIFAR-10 test images: tensor conversion, then normalization
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# CIFAR-10 test split
testset = CIFAR10(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# Index of the sample to inspect; change it to pick another image
image_index = 0

# Fetch that sample as an (image, label) pair and show its class label
input_image, label = testset[image_index]
print(f"Label: {label}")

Files already downloaded and verified
Label: 3


In [23]:
import os

# input_image is a single dataset sample; add an explicit leading batch
# dimension so both models receive a batch of one.
batched_image = input_image.unsqueeze(0)

# Measure latency for the PyTorch model
with torch.no_grad():
    net(batched_image)  # untimed warm-up so first-call overhead is not measured
    start_time = time.perf_counter()
    output = net(batched_image)
    end_time = time.perf_counter()
    latency_pytorch = end_time - start_time

# Measure latency for the TorchScript-optimized model
with torch.no_grad():
    scripted_model(batched_image)  # untimed warm-up, same as above
    start_time = time.perf_counter()
    output = scripted_model(batched_image)
    end_time = time.perf_counter()
    latency_torchscript = end_time - start_time

# Compare model file sizes (files written earlier in the notebook)
original_model_size = os.path.getsize('original_model.pth')
torchscript_model_size = os.path.getsize('optimized_model.pt')

print(f"Latency (PyTorch): {latency_pytorch:.5f} seconds")
print(f"Latency (TorchScript): {latency_torchscript:.5f} seconds")
# Sizes are reported in units of 1024 * 1024 bytes
print(f"Original Model Size: {original_model_size / (1024 * 1024):.2f} MB")
print(f"TorchScript Model Size: {torchscript_model_size / (1024 * 1024):.2f} MB")

Latency (PyTorch): 0.01892 seconds
Latency (TorchScript): 0.00291 seconds
Original Model Size: 12.28 MB
TorchScript Model Size: 12.29 MB
