In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.models import resnet50, ResNet50_Weights
import scipy
import timm
import matplotlib.pyplot as plt
from tqdm.notebook import trange, tqdm
from torchinfo import summary
import timeit
import os
from prettytable import PrettyTable, SINGLE_BORDER

This notebook will demonstrate model compression techniques and export an iOS compatible coreML model to load into mobile devices

In [138]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


Code from https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html

In [139]:
batch_size=128

In [140]:
train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010])
    ])
    
val_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.4914, 0.4822, 0.4465], std=[0.2023, 0.1994, 0.2010])
])

In [None]:
# Dataloader

# Define the directory for the dataset
data_dir = "data"

# Create the directory if it doesn't exist
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

training_data = datasets.CIFAR10(
    root="data",
    train=True,
    download=True,
    transform=train_transform
)

testing_data = datasets.CIFAR10(
    root="data",
    train=False,
    download=True,
    transform=val_transform
)


train_data_loader  = torch.utils.data.DataLoader(training_data,
                                          batch_size=batch_size,
                                          shuffle=True,
                                          num_workers=2)

test_data_loader  = torch.utils.data.DataLoader(testing_data,
                                          batch_size=batch_size,
                                          shuffle=True,
                                          num_workers=2)


classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Files already downloaded and verified
Files already downloaded and verified


In [142]:
epochs = 5

In [143]:
criterion = nn.CrossEntropyLoss()

## Teacher Model

Resnet50 (Modified for CIFAR10)

In [144]:
class ResNet50SmallPretrained(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet50SmallPretrained, self).__init__()
        # Load pretrained ResNet50
        resnet = resnet50(weights=ResNet50_Weights.DEFAULT)
        
        # Method 1: Modify first convolution layer
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        
        # Copy weights from pretrained model with adaptation
        with torch.no_grad():
            # Adapt the weights by averaging over the original kernel size
            original_weights = resnet.conv1.weight
            new_weights = torch.mean(original_weights.view(64, 3, 7*7), dim=2).view(64, 3, 1, 1)
            self.conv1.weight = nn.Parameter(new_weights)
        
        # Remove the original first maxpool layer as it's too aggressive for small images
        self.bn1 = resnet.bn1
        self.relu = resnet.relu
        
        # Keep the rest of the architecture
        self.layer1 = resnet.layer1
        self.layer2 = resnet.layer2
        self.layer3 = resnet.layer3
        self.layer4 = resnet.layer4
        
        # Adjust the final layers
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(2048, num_classes)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        # No maxpool
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        
        return x

In [145]:
teacher_model = ResNet50SmallPretrained().to(device)

In [171]:
summary(teacher_model, input_size=(batch_size, 3, 32, 32))

Layer (type:depth-idx)                   Output Shape              Param #
ResNet50SmallPretrained                  [128, 10]                 --
├─Conv2d: 1-1                            [128, 64, 34, 34]         192
├─BatchNorm2d: 1-2                       [128, 64, 34, 34]         128
├─ReLU: 1-3                              [128, 64, 34, 34]         --
├─Sequential: 1-4                        [128, 256, 34, 34]        --
│    └─Bottleneck: 2-1                   [128, 256, 34, 34]        --
│    │    └─Conv2d: 3-1                  [128, 64, 34, 34]         4,096
│    │    └─BatchNorm2d: 3-2             [128, 64, 34, 34]         128
│    │    └─ReLU: 3-3                    [128, 64, 34, 34]         --
│    │    └─Conv2d: 3-4                  [128, 64, 34, 34]         36,864
│    │    └─BatchNorm2d: 3-5             [128, 64, 34, 34]         128
│    │    └─ReLU: 3-6                    [128, 64, 34, 34]         --
│    │    └─Conv2d: 3-7                  [128, 256, 34, 34]        16,384


In [146]:
# optimizer = optim.SGD(teacher_model.parameters(), lr=0.001, momentum=0.9)
optimizer = optim.Adam(teacher_model.parameters())

In [147]:
for epoch in range(epochs):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(tqdm(train_data_loader), 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device) # send to cuda

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = teacher_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        # if i % 1000 == 999:    # print every 100 mini-batches
        #     print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 1000:.3f}')
        #     running_loss = 0.0

    # Calculate average loss for the epoch
    avg_loss = running_loss / len(train_data_loader)
    # Print average loss for the epoch
    print(f'Epoch [{epoch + 1}/{epochs}], Average Loss: {avg_loss:.4f}')

print('Finished Training')

  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [1/5], Average Loss: 0.4998


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [2/5], Average Loss: 0.2772


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [3/5], Average Loss: 0.2187


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [4/5], Average Loss: 0.1884


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [5/5], Average Loss: 0.1665
Finished Training


## Student Model

In [148]:
class ResNet50Smaller(nn.Module):
    def __init__(self, num_classes=10):
        super(ResNet50Smaller, self).__init__()
        # Load pretrained ResNet50
        resnet = resnet50()
        
        # Method 1: Modify first convolution layer
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        
        
        # Remove the original first maxpool layer as it's too aggressive for small images
        self.bn1 = resnet.bn1
        self.relu = resnet.relu
        
        # Keep the rest of the architecture
        self.layer1 = resnet.layer1
        self.layer2 = resnet.layer2
        self.layer3 = resnet.layer3
        # Cut layer 4 from original resnet50
        # self.layer4 = resnet.layer4
        
        # Adjust the final layers
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(1024, num_classes)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        
        # No maxpool
        
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        # x = self.layer4(x)
        
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        
        return x

In [149]:
# Load Student Models
# One instance without Knowledge Distillation
# One instance for Knowledge Distillation
# For comparison of effectiveness of KD
student_model_noKD = ResNet50Smaller().to(device)
student_model_KD = ResNet50Smaller().to(device)

In [150]:
summary(student_model_noKD, input_size=(batch_size, 3, 32, 32))

Layer (type:depth-idx)                   Output Shape              Param #
ResNet50Smaller                          [128, 10]                 --
├─Conv2d: 1-1                            [128, 64, 32, 32]         1,728
├─BatchNorm2d: 1-2                       [128, 64, 32, 32]         128
├─ReLU: 1-3                              [128, 64, 32, 32]         --
├─Sequential: 1-4                        [128, 256, 32, 32]        --
│    └─Bottleneck: 2-1                   [128, 256, 32, 32]        --
│    │    └─Conv2d: 3-1                  [128, 64, 32, 32]         4,096
│    │    └─BatchNorm2d: 3-2             [128, 64, 32, 32]         128
│    │    └─ReLU: 3-3                    [128, 64, 32, 32]         --
│    │    └─Conv2d: 3-4                  [128, 64, 32, 32]         36,864
│    │    └─BatchNorm2d: 3-5             [128, 64, 32, 32]         128
│    │    └─ReLU: 3-6                    [128, 64, 32, 32]         --
│    │    └─Conv2d: 3-7                  [128, 256, 32, 32]        16,38

Regular Student Training (on Dataset)

In [151]:
# optimizer = optim.SGD(student_model_noKD.parameters(), lr=0.001, momentum=0.9)
optimizer = optim.Adam(student_model_noKD.parameters())

In [152]:
for epoch in range(epochs):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(tqdm(train_data_loader), 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device) # send to cuda

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = student_model_noKD(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()

    # Calculate average loss for the epoch
    avg_loss = running_loss / len(train_data_loader)
    # Print average loss for the epoch
    print(f'Epoch [{epoch + 1}/{epochs}], Average Loss: {avg_loss:.4f}')

print('Finished Training')

  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [1/5], Average Loss: 1.4047


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [2/5], Average Loss: 0.9068


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [3/5], Average Loss: 0.7048


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [4/5], Average Loss: 0.5997


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [5/5], Average Loss: 0.5245
Finished Training


Student Training from Teacher

Some code taken from: https://pytorch.org/tutorials/beginner/knowledge_distillation_tutorial.html

In [153]:
# optimizer = optim.SGD(student_model_KD.parameters(), lr=0.001, momentum=0.9)
optimizer = optim.Adam(student_model_KD.parameters())

In [154]:
# Set teacher model to evaluation mode to not mess with gradients of teacher model
teacher = teacher_model.eval()

student_model_KD.train() # Student to train mode

# Apply ``train_knowledge_distillation`` with a temperature of 2. Arbitrarily set the weights to 0.75 for CE and 0.25 for distillation loss.
temperature = 2
soft_target_loss_weight = 0.25
ce_loss_weight = 0.75

for epoch in range(epochs):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(tqdm(train_data_loader), 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device) # send to cuda

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        student_logits = student_model_KD(inputs)

        with torch.no_grad():
                teacher_logits = teacher(inputs)
        
        #Soften the student logits by applying softmax first and log() second
        soft_targets = nn.functional.softmax(teacher_logits / temperature, dim=-1)
        soft_prob = nn.functional.log_softmax(student_logits / temperature, dim=-1)

        # Calculate the soft targets loss. Scaled by temperature**2 as suggested by the authors of the paper "Distilling the knowledge in a neural network"
        soft_targets_loss = torch.sum(soft_targets * (soft_targets.log() - soft_prob)) / soft_prob.size()[0] * (temperature**2)

        # Calculate the true label loss
        label_loss = criterion(student_logits, labels)

        # Weighted sum of the two losses
        loss = (soft_target_loss_weight * soft_targets_loss) + (ce_loss_weight * label_loss)

        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
    
    # Calculate average loss for the epoch
    avg_loss = running_loss / len(train_data_loader)
    # Print average loss for the epoch
    print(f'Epoch [{epoch + 1}/{epochs}], Average Loss: {avg_loss:.4f}')

print('Finished Training')

  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [1/5], Average Loss: 2.0814


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [2/5], Average Loss: 1.2638


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [3/5], Average Loss: 0.9330


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [4/5], Average Loss: 0.7500


  0%|          | 0/391 [00:00<?, ?it/s]

Epoch [5/5], Average Loss: 0.6369
Finished Training


## Metrics

In [220]:
def evaluate_model(model, model_name, data_loader, testing_mode=False):
    correct = 0
    total = 0
    # since we're not training, we don't need to calculate the gradients for our outputs
    with torch.no_grad():
        for data in data_loader:
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)
            # calculate outputs by running images through the network
            outputs = model(images)
            # the class with the highest energy is what we choose as prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if not testing_mode:
        return correct / total
        # print(f'Accuracy of {model_name} on the 10000 test images: {100 * correct / total:.3f} %')

Accuracy Testing

In [221]:
teacher_acc = evaluate_model(teacher_model, "teacher model", test_data_loader)

In [222]:
student_noKD_acc = evaluate_model(student_model_noKD, "student model with no Knowledge Distillation", test_data_loader)

In [223]:
student_KD_acc = evaluate_model(student_model_KD, "student model with Knowledge Distillation", test_data_loader)

In [225]:
teacher_acc_percent = teacher_acc * 100
student_noKD_acc_percent = student_noKD_acc * 100
student_KD_acc_percent = student_KD_acc * 100

teacher_to_student_noKD_acc = ((teacher_acc - student_noKD_acc) / teacher_acc) * 100
teacher_to_student_KD_acc = ((teacher_acc - student_KD_acc) / teacher_acc) * 100

# Create a PrettyTable object
acc_table = PrettyTable()
acc_table.set_style(SINGLE_BORDER)

# Define the columns
acc_table.field_names = ["Model", "Accuracy", "% Decrease from Teacher"]
acc_table.add_row(["Teacher Model", f"{teacher_acc_percent:.2f} %", "-"])
acc_table.add_row(["Student Model (No KD)", f"{student_noKD_acc_percent:.2f} %", f"{teacher_to_student_noKD_acc:.2f}%"])
acc_table.add_row(["Student Model (KD)", f"{student_KD_acc_percent:.2f} %", f"{teacher_to_student_KD_acc:.2f}%"])

# Print the table
print(acc_table)

┌───────────────────────┬──────────┬─────────────────────────┐
│         Model         │ Accuracy │ % Decrease from Teacher │
├───────────────────────┼──────────┼─────────────────────────┤
│     Teacher Model     │ 92.03 %  │            -            │
│ Student Model (No KD) │ 79.79 %  │          13.30%         │
│   Student Model (KD)  │ 82.23 %  │          10.65%         │
└───────────────────────┴──────────┴─────────────────────────┘


Evaluation Speed Testing

In [214]:
num_runs = 5

In [None]:
time_teacher = timeit.timeit(lambda: evaluate_model(teacher_model, "teacher model", test_data_loader, testing_mode=True), number=num_runs)

In [None]:
time_student_noKD = timeit.timeit(lambda: evaluate_model(student_model_noKD, "student model with no Knowledge Distillation", test_data_loader, testing_mode=True), number=num_runs)

In [None]:
time_student_KD = timeit.timeit(lambda: evaluate_model(student_model_KD, "student model with Knowledge Distillation", test_data_loader, testing_mode=True), number=num_runs)

In [227]:
teacher_to_student_noKD_time = ((time_teacher - time_student_noKD) / time_teacher) * 100
teacher_to_student_KD_time = ((time_teacher - time_student_KD) / time_teacher) * 100

# Create a PrettyTable object
speed_table = PrettyTable()
speed_table.set_style(SINGLE_BORDER)

# Define the columns
speed_table.field_names = ["Model", f"Time Averaged over {num_runs} runs (seconds)", "% Decrease from Teacher"]
speed_table.add_row(["Teacher Model", f"{time_teacher:.2f}", "-"])
speed_table.add_row(["Student Model (No KD)", f"{time_student_noKD:.2f}", f"{teacher_to_student_noKD_time:.2f}%"])
speed_table.add_row(["Student Model (KD)", f"{time_student_KD:.2f}", f"{teacher_to_student_KD_time:.2f}%"])

# Print the table
print(speed_table)

┌───────────────────────┬─────────────────────────────────────┬─────────────────────────┐
│         Model         │ Time Averaged over 5 runs (seconds) │ % Decrease from Teacher │
├───────────────────────┼─────────────────────────────────────┼─────────────────────────┤
│     Teacher Model     │                18.63                │            -            │
│ Student Model (No KD) │                14.17                │          23.95%         │
│   Student Model (KD)  │                13.69                │          26.54%         │
└───────────────────────┴─────────────────────────────────────┴─────────────────────────┘


The knowledge distilled smaller model is faster than all the models and more accurate than the non knowledge distilled smaller model that was trained regularly using the training dataset

## Exporting Knowledge Distilled Model into CoreML model for iOS

Following Pytorch docs from: https://pytorch.org/executorch/stable/getting-started-setup.html