<a href="https://colab.research.google.com/github/Raz0rGithub/CSIRE/blob/main/Image_Classification_ResNet50_x_Waste_Classification_Data_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Setup**: imports, seeds, & dataset downloading

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.optim.lr_scheduler import _LRScheduler
import torch.utils.data as data

import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

from sklearn import decomposition
from sklearn import manifold
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np

import copy
from collections import namedtuple
import os
import random
import shutil
import time

# Pick the first GPU when CUDA is available, otherwise fall back to the CPU,
# and print a one-line banner naming the torch build and the accelerator.
_has_cuda = torch.cuda.is_available()
device = torch.device('cuda:0' if _has_cuda else 'cpu')
_accelerator_name = torch.cuda.get_device_properties(0).name if _has_cuda else 'CPU'
print(f"Setup complete. Using torch {torch.__version__} ({_accelerator_name})")

Setup complete. Using torch 1.12.0+cu113 (Tesla T4)


In [None]:
# Seed every random number generator the notebook touches so runs are repeatable.
SEED = 1234

for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_fn(SEED)

# Ask cuDNN to pick deterministic kernels.
torch.backends.cudnn.deterministic = True

In [None]:
os.environ['KAGGLE_USERNAME'] = 'victorckaggle'
os.environ['KAGGLE_KEY'] = '9336857b8bc61c71ca7a7db2df7f364a'

!pip install kaggle
!kaggle datasets download sapal6/waste-classification-data-v2 --unzip

ROOT = '/content'

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Downloading waste-classification-data-v2.zip to /content
 97% 218M/226M [00:04<00:00, 44.6MB/s]
100% 226M/226M [00:04<00:00, 57.9MB/s]


# **Dataset**

In [None]:
# Locations of the unpacked dataset splits.
data_dir = os.path.join(ROOT, 'DATASET')
train_dir = os.path.join(data_dir, 'TRAIN')
test_dir = os.path.join(data_dir, 'TEST')
images_dir = os.path.join(data_dir, 'TRAIN')

# Class names, one per sub-directory of the training split.
# ImageFolder assigns label indices to class folders in sorted order, whereas
# os.listdir returns entries in arbitrary order; sorting (and skipping stray
# files) keeps classes[i] aligned with label i in the plots further down.
classes = sorted(entry.name for entry in os.scandir(images_dir) if entry.is_dir())

In [None]:
# train_data = datasets.ImageFolder(root = train_dir, transform = transforms.ToTensor())

# means = torch.zeros(3)
# stds = torch.zeros(3)

# for img, label in train_data:
#     means += torch.mean(img, dim = (1,2))
#     stds += torch.std(img, dim = (1,2))

# means /= len(train_data)
# stds /= len(train_data)
    
# print(f'Calculated means: {means}')
# print(f'Calculated stds: {stds}')

Pretrained values

In [None]:
# Input resolution and per-channel normalisation statistics.
# NOTE(review): these means/stds are not the usual ImageNet values; they look
# like the output of the commented-out dataset-statistics cell — confirm.
pretrained_size = 224
pretrained_means = [0.6892, 0.6416, 0.5667]
pretrained_stds= [0.2142, 0.2238, 0.2451]

# Tail shared by both pipelines: PIL image -> tensor -> channel-wise normalisation.
_to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize(mean = pretrained_means, std = pretrained_stds),
]

# Training pipeline: resize, then light augmentation (small rotation,
# horizontal flip, padded random crop back to the target size).
train_transforms = transforms.Compose([
    transforms.Resize(pretrained_size),
    transforms.RandomRotation(5),
    transforms.RandomHorizontalFlip(0.5),
    transforms.RandomCrop(pretrained_size, padding = 10),
    *_to_normalized_tensor,
])

# Evaluation pipeline: deterministic resize + centre crop, no augmentation.
test_transforms = transforms.Compose([
    transforms.Resize(pretrained_size),
    transforms.CenterCrop(pretrained_size),
    *_to_normalized_tensor,
])

In [None]:
# One ImageFolder per split; each applies the transform pipeline built for it.
train_data = datasets.ImageFolder(train_dir, transform = train_transforms)
test_data = datasets.ImageFolder(test_dir, transform = test_transforms)

In [None]:
# Despite its name, VALID_RATIO is the fraction of the TRAIN folder that is
# kept for training; the remainder becomes the validation split.
VALID_RATIO = 0.9

n_train_examples = int(VALID_RATIO * len(train_data))
n_valid_examples = len(train_data) - n_train_examples

# Note: this rebinds train_data, so re-running the cell splits the already
# reduced training subset again.
train_data, valid_data = data.random_split(
    train_data,
    lengths = [n_train_examples, n_valid_examples],
)

In [None]:
# Give the validation split its own copy of the underlying dataset before
# swapping in the evaluation transforms, so the change is made on the copy
# rather than on the object train_data came from.
valid_data = copy.deepcopy(valid_data)
valid_data.dataset.transform = test_transforms

In [None]:
# Report the size of every split and the overall number of images.
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')
total = sum(len(split) for split in (test_data, train_data, valid_data))
print(f'Total: {total}')

Number of training examples: 20307
Number of validation examples: 2257
Number of testing examples: 2908
Total: 25472


In [None]:
BATCH_SIZE = 64

# Only the training loader shuffles; validation and test keep dataset order.
train_iterator = data.DataLoader(train_data, batch_size = BATCH_SIZE, shuffle = True)
valid_iterator = data.DataLoader(valid_data, batch_size = BATCH_SIZE)
test_iterator = data.DataLoader(test_data, batch_size = BATCH_SIZE)

# Plot Images

In [None]:
def normalize_image(image):
    """Rescale a tensor to roughly the [0, 1) range for display.

    The result is ``(image - min) / (max - min + 1e-5)``; the small epsilon
    avoids a division by zero for constant images.

    The computation is out-of-place: the input tensor is left untouched.
    The previous in-place version modified whatever storage was passed in,
    which could be a view of model weights or of a cached image batch.

    Args:
        image: tensor of any shape.

    Returns:
        A new tensor with the same shape as ``image``.
    """
    image_min = image.min()
    image_max = image.max()
    return (image - image_min) / (image_max - image_min + 1e-5)

In [None]:
# def plot_images(images, labels, classes, normalize = True):

#     n_images = len(images)

#     rows = int(np.sqrt(n_images))
#     cols = int(np.sqrt(n_images))

#     fig = plt.figure(figsize = (15, 15))

#     for i in range(rows*cols):

#         ax = fig.add_subplot(rows, cols, i+1)
        
#         image = images[i]

#         if normalize:
#             image = normalize_image(image)

#         ax.imshow(image.permute(1, 2, 0).cpu().numpy())
#         label = classes[labels[i]]
#         ax.set_title(label)
#         ax.axis('off')

In [None]:
# N_IMAGES = 25

# images, labels = zip(*[(image, label) for image, label in 
#                            [train_data[i] for i in range(N_IMAGES)]])

# classes = test_data.classes

# plot_images(images, labels, classes)

In [None]:
# REFORMATING
# def format_label(label):
#     label = label.split('.')[-1]
#     label = label.replace('_', ' ')
#     label = label.title()
#     label = label.replace(' ', '')
#     return label

# test_data.classes = [format_label(c) for c in test_data.classes]

# classes = test_data.classes

# plot_images(images, labels, classes)

# Architecture


In [None]:
class ResNet(nn.Module):
    """ResNet assembled from a ``(block, n_blocks, channels)`` configuration.

    The network is a 7x7 stem convolution with batch norm, ReLU and max
    pooling, followed by four residual stages, global average pooling and a
    linear classifier.

    Args:
        config: 3-tuple ``(block, n_blocks, channels)``; ``block`` is the
            residual block class (must expose ``expansion``), ``n_blocks``
            and ``channels`` are length-4 sequences, one entry per stage.
        output_dim: number of outputs of the final linear layer.
    """

    def __init__(self, config, output_dim):
        super().__init__()

        block, n_blocks, channels = config
        # Running input width of the next stage; updated by get_resnet_layer.
        self.in_channels = channels[0]

        assert len(n_blocks) == len(channels) == 4

        # Stem.
        self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size = 7, stride = 2, padding = 3, bias = False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace = True)
        self.maxpool = nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)

        # Residual stages layer1..layer4: the first keeps the resolution,
        # the remaining three downsample with stride 2.
        for stage_idx, stage_stride in enumerate((1, 2, 2, 2)):
            stage = self.get_resnet_layer(block, n_blocks[stage_idx], channels[stage_idx],
                                          stride = stage_stride)
            setattr(self, f'layer{stage_idx + 1}', stage)

        # Head.
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, output_dim)

    def get_resnet_layer(self, block, n_blocks, channels, stride = 1):
        """Build one stage of ``n_blocks`` residual blocks as an nn.Sequential.

        Only the first block receives ``stride``, and it gets a projection
        shortcut when the stage's output width differs from the current
        input width. ``self.in_channels`` is advanced to the stage's output
        width afterwards.
        """
        stage_width = block.expansion * channels
        needs_projection = self.in_channels != stage_width

        blocks = [block(self.in_channels, channels, stride, needs_projection)]
        blocks.extend(block(stage_width, channels) for _ in range(1, n_blocks))

        self.in_channels = stage_width

        return nn.Sequential(*blocks)

    def forward(self, x):
        """Return ``(logits, h)`` where ``h`` is the flattened pooled feature vector."""
        stem = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        feature_map = self.layer1(stem)
        feature_map = self.layer2(feature_map)
        feature_map = self.layer3(feature_map)
        feature_map = self.layer4(feature_map)

        pooled = self.avgpool(feature_map)
        h = pooled.view(pooled.shape[0], -1)

        return self.fc(h), h

In [None]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions (used by the 18/34 configs).

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution (and of the projection).
        downsample: when true, the shortcut is a 1x1 convolution + batch
            norm instead of the identity.
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3,
                               stride = stride, padding = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3,
                               stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.relu = nn.ReLU(inplace = True)

        # Projection shortcut, or None for a plain identity connection.
        self.downsample = (
            nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size = 1,
                          stride = stride, bias = False),
                nn.BatchNorm2d(out_channels),
            )
            if downsample else None
        )

    def forward(self, x):
        """Apply conv-bn-relu-conv-bn, add the shortcut, then a final ReLU."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

In [None]:
# Architecture description consumed by ResNet: block class, number of blocks
# per stage, and base channel width per stage.
ResNetConfig = namedtuple('ResNetConfig', 'block n_blocks channels')

In [None]:
# Shallow variants built from BasicBlock; they differ only in blocks per stage.
resnet18_config = ResNetConfig(BasicBlock, [2,2,2,2], [64, 128, 256, 512])
resnet34_config = ResNetConfig(BasicBlock, [3,4,6,3], [64, 128, 256, 512])

In [None]:
class Bottleneck(nn.Module):
    """Residual block of 1x1 -> 3x3 -> 1x1 convolutions (used by the 50/101/152 configs).

    The last 1x1 convolution widens the output to
    ``expansion * out_channels``.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: width of the first two convolutions.
        stride: stride of the 3x3 convolution (and of the projection).
        downsample: when true, the shortcut is a 1x1 convolution + batch
            norm to the expanded width instead of the identity.
    """

    expansion = 4

    def __init__(self, in_channels, out_channels, stride = 1, downsample = False):
        super().__init__()

        expanded_channels = self.expansion * out_channels

        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 1,
                               stride = 1, bias = False)
        self.bn1 = nn.BatchNorm2d(out_channels)

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3,
                               stride = stride, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.conv3 = nn.Conv2d(out_channels, expanded_channels, kernel_size = 1,
                               stride = 1, bias = False)
        self.bn3 = nn.BatchNorm2d(expanded_channels)

        self.relu = nn.ReLU(inplace = True)

        # Projection shortcut, or None for a plain identity connection.
        self.downsample = (
            nn.Sequential(
                nn.Conv2d(in_channels, expanded_channels, kernel_size = 1,
                          stride = stride, bias = False),
                nn.BatchNorm2d(expanded_channels),
            )
            if downsample else None
        )

    def forward(self, x):
        """Apply the three conv-bn stages, add the shortcut, then a final ReLU."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)

In [None]:
# Deep variants built from Bottleneck; they differ only in blocks per stage.
resnet50_config = ResNetConfig(Bottleneck, [3, 4, 6, 3], [64, 128, 256, 512])
resnet101_config = ResNetConfig(Bottleneck, [3, 4, 23, 3], [64, 128, 256, 512])
resnet152_config = ResNetConfig(Bottleneck, [3, 8, 36, 3], [64, 128, 256, 512])

# Active Model

In [None]:
# Load torchvision's ImageNet-pretrained ResNet-50.
# `pretrained=True` is deprecated since torchvision 0.13 (see the warning this
# cell used to print); the `weights` enum selects the same checkpoint.
# Older torchvision releases without the enum keep the legacy call.
if hasattr(models, 'ResNet50_Weights'):
    pretrained_model = models.resnet50(weights = models.ResNet50_Weights.IMAGENET1K_V1)
else:
    pretrained_model = models.resnet50(pretrained = True)

  f"The parameter '{pretrained_param}' is deprecated since 0.13 and will be removed in 0.15, "
Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


  0%|          | 0.00/97.8M [00:00<?, ?B/s]

In [None]:
# Dump the layer structure of the torchvision model for inspection.
print(pretrained_model)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [None]:
# New classification head: same input width as the pretrained head, one
# output per class folder found in the test split.
OUTPUT_DIM = len(test_data.classes)
IN_FEATURES = pretrained_model.fc.in_features

fc = nn.Linear(in_features = IN_FEATURES, out_features = OUTPUT_DIM)

In [None]:
# Replace the pretrained classifier head with the freshly created `fc` layer.
pretrained_model.fc = fc

In [None]:
# Instantiate the notebook's own ResNet with the ResNet-50 layout; unlike the
# torchvision model, its forward returns a (logits, features) pair.
model = ResNet(resnet50_config, OUTPUT_DIM)

In [None]:
# Freeze everything except parameters whose name mentions "fc" or "avgpool",
# so only the classification head is trained.
for name, param in model.named_parameters():
  param.requires_grad = ("fc" in name) or ("avgpool" in name)

In [None]:
# Copy the pretrained weights (including the replaced head) into the custom
# model; this relies on both models using identical parameter names.
model.load_state_dict(pretrained_model.state_dict())

<All keys matched successfully>

In [None]:
def count_parameters(model):
    """Return the total number of elements in parameters that require gradients."""
    trainable = 0
    for param in model.parameters():
        if param.requires_grad:
            trainable += param.numel()
    return trainable

# Report how many parameters are still trainable (i.e. have requires_grad set).
print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 6,147 trainable parameters


# Learning Rate

In [None]:
# START_LR = 1e-7

# optimizer = optim.Adam(model.parameters(), lr=START_LR)

# Move the model and the loss to the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = model.to(device)
criterion = nn.CrossEntropyLoss().to(device)

In [None]:
# class LRFinder:
#     def __init__(self, model, optimizer, criterion, device):
        
#         self.optimizer = optimizer
#         self.model = model
#         self.criterion = criterion
#         self.device = device
        
#         torch.save(model.state_dict(), 'init_params.pt')

#     def range_test(self, iterator, end_lr = 10, num_iter = 100, 
#                    smooth_f = 0.05, diverge_th = 5):
        
#         lrs = []
#         losses = []
#         best_loss = float('inf')

#         lr_scheduler = ExponentialLR(self.optimizer, end_lr, num_iter)
        
#         iterator = IteratorWrapper(iterator)
        
#         for iteration in range(num_iter):

#             loss = self._train_batch(iterator)

#             #update lr
#             lr_scheduler.step()
            
#             lrs.append(lr_scheduler.get_lr()[0])

#             if iteration > 0:
#                 loss = smooth_f * loss + (1 - smooth_f) * losses[-1]
                
#             if loss < best_loss:
#                 best_loss = loss

#             losses.append(loss)
            
#             if loss > diverge_th * best_loss:
#                 print("Stopping early, the loss has diverged")
#                 break
                       
#         #reset model to initial parameters
#         model.load_state_dict(torch.load('init_params.pt'))
                    
#         return lrs, losses

#     def _train_batch(self, iterator):
        
#         self.model.train()
        
#         self.optimizer.zero_grad()
        
#         x, y = iterator.get_batch()
        
#         x = x.to(self.device)
#         y = y.to(self.device)
        
#         y_pred, _ = self.model(x)
                
#         loss = self.criterion(y_pred, y)
        
#         loss.backward()
        
#         self.optimizer.step()
        
#         return loss.item()

# class ExponentialLR(_LRScheduler):
#     def __init__(self, optimizer, end_lr, num_iter, last_epoch=-1):
#         self.end_lr = end_lr
#         self.num_iter = num_iter
#         super(ExponentialLR, self).__init__(optimizer, last_epoch)

#     def get_lr(self):
#         curr_iter = self.last_epoch + 1
#         r = curr_iter / self.num_iter
#         return [base_lr * (self.end_lr / base_lr) ** r for base_lr in self.base_lrs]

# class IteratorWrapper:
#     def __init__(self, iterator):
#         self.iterator = iterator
#         self._iterator = iter(iterator)

#     def __next__(self):
#         try:
#             inputs, labels = next(self._iterator)
#         except StopIteration:
#             self._iterator = iter(self.iterator)
#             inputs, labels, *_ = next(self._iterator)

#         return inputs, labels

#     def get_batch(self):
#         return next(self)

In [None]:
# END_LR = 50
# NUM_ITER = 50

# lr_finder = LRFinder(model, optimizer, criterion, device)
# lrs, losses = lr_finder.range_test(train_iterator, END_LR, NUM_ITER)

In [None]:
# def plot_lr_finder(lrs, losses, skip_start = 5, skip_end = 5):
    
#     if skip_end == 0:
#         lrs = lrs[skip_start:]
#         losses = losses[skip_start:]
#     else:
#         lrs = lrs[skip_start:-skip_end]
#         losses = losses[skip_start:-skip_end]
    
#     fig = plt.figure(figsize = (16,8))
#     ax = fig.add_subplot(1,1,1)
#     ax.plot(lrs, losses)
#     ax.set_xscale('log')
#     ax.set_xlabel('Learning rate')
#     ax.set_ylabel('Loss')
#     ax.grid(True, 'both', 'x')
#     plt.show()

In [None]:
# plot_lr_finder(lrs, losses, skip_start = 0, skip_end = 0)

In [None]:
FOUND_LR = 1e-2

# Discriminative learning rates: each sub-module gets FOUND_LR divided by the
# factor below (earlier layers get smaller rates); the head uses the
# optimizer's default rate, FOUND_LR.
_lr_divisors = [
    ('conv1', 10),
    ('bn1', 10),
    ('layer1', 8),
    ('layer2', 6),
    ('layer3', 4),
    ('layer4', 2),
]

params = [
    {'params': getattr(model, module_name).parameters(), 'lr': FOUND_LR / divisor}
    for module_name, divisor in _lr_divisors
]
params.append({'params': model.fc.parameters()})

optimizer = optim.Adam(params, lr = FOUND_LR)

# Train

In [None]:
EPOCHS = 20
STEPS_PER_EPOCH = len(train_iterator)
TOTAL_STEPS = EPOCHS * STEPS_PER_EPOCH

# One peak learning rate per optimizer parameter group, in group order.
MAX_LRS = [group['lr'] for group in optimizer.param_groups]

# The scheduler is stepped once per batch, hence total_steps counts batches.
scheduler = lr_scheduler.OneCycleLR(optimizer, MAX_LRS, total_steps = TOTAL_STEPS)

In [None]:
# Batch size of the dummy input used when tracing the model for ONNX export.
bs = 10

In [None]:
def calculate_topk_accuracy(y_pred, y, k = 2):
    """Compute top-1 and top-k accuracy for one batch.

    Args:
        y_pred: (batch, n_classes) tensor of scores; higher means more likely.
        y: (batch,) tensor of integer class labels.
        k: how many highest-scoring classes count as a hit for the second
            value. Values larger than the number of classes are clamped,
            since ``topk`` cannot return more entries than there are classes.

    Returns:
        ``(acc_1, acc_k)``: two 1-element float tensors holding the fraction
        of the batch whose label is the best prediction / among the top k.
    """
    with torch.no_grad():
        batch_size = y.shape[0]
        k = min(k, y_pred.shape[1])
        _, top_pred = y_pred.topk(k, 1)
        # (k, batch): row r holds each sample's (r+1)-th best class.
        top_pred = top_pred.t()
        correct = top_pred.eq(y.view(1, -1).expand_as(top_pred))
        correct_1 = correct[:1].reshape(-1).float().sum(0, keepdim = True)
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim = True)
        acc_1 = correct_1 / batch_size
        acc_k = correct_k / batch_size
    return acc_1, acc_k

In [None]:
def train(model, iterator, optimizer, criterion, scheduler, device):
    """Run one training epoch.

    Each batch gets one optimizer step followed by one scheduler step.

    Returns:
        ``(loss, acc_1, acc_2)``: per-batch loss, top-1 accuracy and top-2
        accuracy, each averaged over the number of batches.
    """
    model.train()

    loss_sum, acc_1_sum, acc_2_sum = 0, 0, 0

    for x, y in iterator:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()

        # The model returns (logits, features); only the logits are needed here.
        y_pred, _ = model(x)
        loss = criterion(y_pred, y)
        acc_1, acc_2 = calculate_topk_accuracy(y_pred, y)

        loss.backward()
        optimizer.step()
        scheduler.step()

        loss_sum += loss.item()
        acc_1_sum += acc_1.item()
        acc_2_sum += acc_2.item()

    n_batches = len(iterator)
    return loss_sum / n_batches, acc_1_sum / n_batches, acc_2_sum / n_batches

In [None]:
def evaluate(model, iterator, criterion, device):
    """Evaluate the model on every batch of ``iterator`` without gradients.

    Returns:
        ``(loss, acc_1, acc_k)``: per-batch loss, top-1 accuracy and top-k
        accuracy (k is calculate_topk_accuracy's default), each averaged over
        the number of batches.
    """
    model.eval()

    loss_sum, acc_1_sum, acc_k_sum = 0, 0, 0

    with torch.no_grad():
        for x, y in iterator:
            x, y = x.to(device), y.to(device)

            # The model returns (logits, features); only the logits are needed here.
            y_pred, _ = model(x)
            loss = criterion(y_pred, y)
            acc_1, acc_k = calculate_topk_accuracy(y_pred, y)

            loss_sum += loss.item()
            acc_1_sum += acc_1.item()
            acc_k_sum += acc_k.item()

    n_batches = len(iterator)
    return loss_sum / n_batches, acc_1_sum / n_batches, acc_k_sum / n_batches

In [None]:
def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (in seconds) into whole minutes and seconds.

    Returns:
        ``(minutes, seconds)`` as ints; fractional seconds are truncated.
    """
    elapsed = end_time - start_time
    whole_minutes = int(elapsed / 60)
    leftover_seconds = int(elapsed - 60 * whole_minutes)
    return whole_minutes, leftover_seconds

In [None]:
# Lowest validation loss seen so far; the checkpoint file always holds the
# weights of the epoch that achieved it.
best_valid_loss = float('inf')

for epoch in range(EPOCHS):

    start_time = time.monotonic()

    train_loss, train_acc_1, train_acc_5 = train(
        model, train_iterator, optimizer, criterion, scheduler, device)
    valid_loss, valid_acc_1, valid_acc_5 = evaluate(
        model, valid_iterator, criterion, device)

    # Checkpoint only when validation loss improves.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut5-model.pt')

    end_time = time.monotonic()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    # The *_acc_5 variables hold the second value returned by train/evaluate,
    # which is reported as top-2 accuracy.
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(
        f'\tTrain Loss: {train_loss:.3f} | Train Acc @1: {train_acc_1*100:6.2f}% | '
        f'Train Acc @2: {train_acc_5*100:6.2f}%'
    )
    print(
        f'\tValid Loss: {valid_loss:.3f} | Valid Acc @1: {valid_acc_1*100:6.2f}% | '
        f'Valid Acc @2: {valid_acc_5*100:6.2f}%'
    )

Epoch: 01 | Epoch Time: 2m 37s
	Train Loss: 0.442 | Train Acc @1:  83.17% | Train Acc @2:  96.10%
	Valid Loss: 0.313 | Valid Acc @1:  87.66% | Valid Acc @2:  97.36%
Epoch: 02 | Epoch Time: 2m 30s
	Train Loss: 0.322 | Train Acc @1:  87.75% | Train Acc @2:  97.70%
	Valid Loss: 0.288 | Valid Acc @1:  89.01% | Valid Acc @2:  97.44%
Epoch: 03 | Epoch Time: 2m 30s
	Train Loss: 0.382 | Train Acc @1:  86.46% | Train Acc @2:  97.21%
	Valid Loss: 0.372 | Valid Acc @1:  87.76% | Valid Acc @2:  97.24%
Epoch: 04 | Epoch Time: 2m 30s
	Train Loss: 0.397 | Train Acc @1:  86.98% | Train Acc @2:  97.36%
	Valid Loss: 0.350 | Valid Acc @1:  88.60% | Valid Acc @2:  97.48%
Epoch: 05 | Epoch Time: 2m 30s
	Train Loss: 0.443 | Train Acc @1:  86.79% | Train Acc @2:  97.40%
	Valid Loss: 0.478 | Valid Acc @1:  87.83% | Valid Acc @2:  97.42%
Epoch: 06 | Epoch Time: 2m 30s
	Train Loss: 0.506 | Train Acc @1:  86.23% | Train Acc @2:  97.25%
	Valid Loss: 0.355 | Valid Acc @1:  89.54% | Valid Acc @2:  97.76%
Epoch: 07 

# Test

In [None]:
# Restore the checkpoint with the best validation loss, then score the test split.
best_state = torch.load('tut5-model.pt')
model.load_state_dict(best_state)

test_loss, test_acc_1, test_acc_5 = evaluate(model, test_iterator, criterion, device)

print(
    f'Test Loss: {test_loss:.3f} | Test Acc @1: {test_acc_1*100:6.2f}% | '
    f'Test Acc @2: {test_acc_5*100:6.2f}%'
)

Test Loss: 0.496 | Test Acc @1:  84.36% | Test Acc @2:  96.55%


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): none of the cells visible in this notebook read from or write
# to that path — confirm the mount is still needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Conversion

In [None]:
# Some standard imports
import io
import numpy as np

from torch import nn
import torch.utils.model_zoo as model_zoo
import torch.onnx
# Put the model in evaluation mode before it is run and exported below.
model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [None]:
# Dummy input used to trace the model. Only the batch axis is exported as
# dynamic, so the spatial size must match what the network is actually fed:
# pretrained_size x pretrained_size images (a 7x7 dummy would bake a 7x7
# input shape into the exported graph).
x = torch.randn(bs, 3, pretrained_size, pretrained_size, requires_grad=True)
model.to('cpu')
torch_out = model(x)

# Export the model. The forward pass returns (logits, features), so both
# outputs are named and both get a dynamic batch axis.
torch.onnx.export(model,               # model being run
                  x,                         # model input (or a tuple for multiple inputs)
                  "output.onnx",   # where to save the model (can be a file or file-like object)
                  export_params=True,        # store the trained parameter weights inside the model file
                  opset_version=10,          # the ONNX version to export the model to
                  do_constant_folding=True,  # whether to execute constant folding for optimization
                  input_names = ['input'],   # the model's input names
                  output_names = ['output', 'features'], # the model's output names
                  dynamic_axes={'input' : {0 : 'batch_size'},    # variable length axes
                                'output' : {0 : 'batch_size'},
                                'features' : {0 : 'batch_size'}})

In [None]:
# torch_out is whatever model(x) returned; for this ResNet that is a
# (logits, features) tuple, so both tensors are printed.
print(torch_out)

(tensor([[ 0.0782,  0.2776,  0.7803],
        [-0.1564,  0.1292,  0.5716],
        [-0.2011,  0.1668,  0.6258],
        [ 0.1239,  0.4531,  0.6995],
        [-0.0147,  0.3615,  0.7478],
        [-0.0012,  0.5690,  0.7569],
        [-0.0857,  0.7378,  0.6964],
        [ 0.0546,  0.3997,  0.6533],
        [ 0.0438,  0.2617,  0.3185],
        [-0.0398,  0.3323,  0.4207]], grad_fn=<AddmmBackward0>), tensor([[0.0000, 0.2091, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
        [0.2360, 0.0000, 0.3891,  ..., 0.0000, 0.1304, 0.1135],
        [0.1077, 0.0000, 0.0000,  ..., 0.0438, 0.0000, 0.0000],
        ...,
        [0.1828, 0.0000, 0.0000,  ..., 0.3704, 0.2031, 0.0000],
        [0.0601, 0.0000, 0.0000,  ..., 0.0843, 0.0687, 0.1060],
        [0.0556, 0.0000, 0.0000,  ..., 0.0275, 0.0000, 0.0000]],
       grad_fn=<ViewBackward0>))


Test ONNX Conversion

In [None]:
#!pip install onnxruntime
import onnxruntime

ort_session = onnxruntime.InferenceSession("output.onnx")

def to_numpy(tensor):
    """Detach a tensor from autograd, move it to the CPU and return it as a NumPy array."""
    return tensor.detach().cpu().numpy()

# compute ONNX Runtime output prediction
ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(x)}
ort_outs = ort_session.run(None, ort_inputs)

# compare ONNX Runtime and PyTorch results
# The model returns a (logits, features) tuple, so each PyTorch output is
# compared with the ONNX Runtime output at the same position; passing the
# tuple itself to to_numpy is what raised the AttributeError.
for torch_tensor, ort_array in zip(torch_out, ort_outs):
    np.testing.assert_allclose(to_numpy(torch_tensor), ort_array, rtol=1e-03, atol=1e-05)

print("Exported model has been tested with ONNXRuntime, and the result looks good!")

AttributeError: ignored

ONNX TO TENSORFLOW

In [None]:
!pip install onnx_tf
from onnx_tf.backend import prepare
import onnx

TF_PATH = "./converted.pb" # where the representation of tensorflow model will be stored
ONNX_PATH = "./converted.onnx" # path to my existing ONNX model
onnx_model = onnx.load(ONNX_PATH)  # load onnx model

# prepare function converts an ONNX model to an internel representation
# of the computational graph called TensorflowRep and returns
# the converted representation.
tf_rep = prepare(onnx_model)  # creating TensorflowRep object

# export_graph function obtains the graph proto corresponding to the ONNX
# model associated with the backend representation and serializes
# to a protobuf file.
tf_rep.export_graph(TF_PATH)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting onnx_tf
  Downloading onnx_tf-1.10.0-py3-none-any.whl (226 kB)
[K     |████████████████████████████████| 226 kB 8.8 MB/s 
[?25hCollecting tensorflow-addons
  Downloading tensorflow_addons-0.17.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
[K     |████████████████████████████████| 1.1 MB 80.4 MB/s 
Installing collected packages: tensorflow-addons, onnx-tf
Successfully installed onnx-tf-1.10.0 tensorflow-addons-0.17.1




In [None]:
import tensorflow as tf

# Build a TFLite converter from the TensorFlow export written to TF_PATH.
converter = tf.lite.TFLiteConverter.from_saved_model(TF_PATH)

# Allow both native TFLite ops and the fallback set of regular TensorFlow ops.
converter.target_spec.supported_ops = [
  tf.lite.OpsSet.TFLITE_BUILTINS,
  tf.lite.OpsSet.SELECT_TF_OPS,
]
tflite_model = converter.convert()

model_name = "classification"

# Write the serialized model next to the notebook as <model_name>.tflite.
with open(f"{model_name}.tflite", "wb") as f:
  f.write(tflite_model)



In [None]:
# Download the converted model through the browser.
# NOTE(review): google.colab is imported earlier in this notebook without an
# install step, so this pip install is presumably redundant — confirm.
!pip install google-colab
from google.colab import files
files.download('classification.tflite')

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Examination

## Confusion Matrix

In [None]:
def get_predictions(model, iterator):
    """Run the model over ``iterator`` and collect inputs, labels and class probabilities.

    Inputs are moved to the device the model's parameters live on rather
    than to the notebook-level ``device``: the model is moved to the CPU for
    ONNX export earlier in the notebook, so the two can disagree.

    Args:
        model: module whose forward returns ``(logits, features)``.
        iterator: iterable of ``(x, y)`` batches.

    Returns:
        ``(images, labels, probs)``: CPU tensors concatenated over all
        batches; ``probs`` is the softmax of the logits over the last axis.
    """
    model.eval()

    # Device of the model's parameters; a parameter-free model runs on the CPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    images = []
    labels = []
    probs = []

    with torch.no_grad():

        for (x, y) in iterator:

            x = x.to(model_device)

            y_pred, _ = model(x)

            y_prob = F.softmax(y_pred, dim = -1)

            images.append(x.cpu())
            labels.append(y.cpu())
            probs.append(y_prob.cpu())

    images = torch.cat(images, dim = 0)
    labels = torch.cat(labels, dim = 0)
    probs = torch.cat(probs, dim = 0)

    return images, labels, probs

In [None]:
# Collect inputs, true labels and predicted class probabilities for the test split.
images, labels, probs = get_predictions(model, test_iterator)

In [None]:
# Predicted class for each sample = index of its highest probability.
pred_labels = torch.argmax(probs, 1)

In [None]:
def plot_confusion_matrix(labels, pred_labels, classes):
    """Draw the confusion matrix of true vs. predicted labels on a new figure.

    Args:
        labels: true class indices.
        pred_labels: predicted class indices.
        classes: tick labels for the matrix axes.
    """
    fig = plt.figure(figsize = (50, 50))
    ax = fig.add_subplot(1, 1, 1)

    matrix_display = ConfusionMatrixDisplay(confusion_matrix(labels, pred_labels),
                                            display_labels = classes)
    matrix_display.plot(values_format = 'd', cmap = 'Blues', ax = ax)

    # The second axes on the figure is the colorbar; remove it.
    fig.delaxes(fig.axes[1])

    plt.xticks(rotation = 90)
    plt.xlabel('Predicted Label', fontsize = 50)
    plt.ylabel('True Label', fontsize = 50)

In [None]:
# Confusion matrix for the test-set predictions; `classes` supplies the tick labels.
plot_confusion_matrix(labels, pred_labels, classes)

# Most Incorrect

In [None]:
# Boolean mask: True where the predicted label equals the true label.
corrects = torch.eq(labels, pred_labels)

In [None]:
# Gather (image, true label, probabilities) for every misclassified sample ...
incorrect_examples = []

for image, label, prob, correct in zip(images, labels, probs, corrects):
    if correct:
        continue
    incorrect_examples.append((image, label, prob))

# ... and order them by the probability of the (wrong) predicted class,
# highest first.
incorrect_examples = sorted(incorrect_examples,
                            key = lambda example: example[2].max(dim = 0).values,
                            reverse = True)

In [None]:
def plot_most_incorrect(incorrect, classes, n_images, normalize = True):
    """Plot the first misclassified examples on a square grid.

    Args:
        incorrect: sequence of ``(image, true_label, probs)`` tuples; image
            is channel-first, probs holds one probability per class.
        classes: class names indexed by label.
        n_images: requested number of images; the grid is
            ``int(sqrt(n_images))`` rows by the same number of columns.
        normalize: rescale each image with normalize_image before display.

    When ``incorrect`` holds fewer examples than the grid has cells, only
    the available examples are drawn instead of raising an IndexError.
    """
    rows = int(np.sqrt(n_images))
    cols = int(np.sqrt(n_images))

    # Never index past the end of the list of misclassified examples.
    n_shown = min(rows * cols, len(incorrect))

    fig = plt.figure(figsize = (25, 20))

    for i in range(n_shown):

        ax = fig.add_subplot(rows, cols, i+1)

        image, true_label, example_probs = incorrect[i]
        # Channel-first -> channel-last, as imshow expects.
        image = image.permute(1, 2, 0)
        true_prob = example_probs[true_label]
        incorrect_prob, incorrect_label = torch.max(example_probs, dim = 0)
        true_class = classes[true_label]
        incorrect_class = classes[incorrect_label]

        if normalize:
            image = normalize_image(image)

        ax.imshow(image.cpu().numpy())
        ax.set_title(f'true label: {true_class} ({true_prob:.3f})\n' \
                     f'pred label: {incorrect_class} ({incorrect_prob:.3f})')
        ax.axis('off')

    fig.subplots_adjust(hspace=0.4)

In [None]:
# Show the 25 most confidently wrong test predictions (5 x 5 grid).
N_IMAGES = 25

plot_most_incorrect(incorrect_examples, classes, n_images = N_IMAGES)

# Representations

In [None]:
def get_representations(model, iterator):
    """Run the model over ``iterator`` and collect its logits and the labels.

    Inputs are moved to the device the model's parameters live on rather
    than to the notebook-level ``device``: the model is moved to the CPU for
    ONNX export earlier in the notebook, so the two can disagree.

    Args:
        model: module whose forward returns ``(logits, features)``.
        iterator: iterable of ``(x, y)`` batches.

    Returns:
        ``(outputs, labels)``: logits (on the CPU) and labels, each
        concatenated over all batches.
    """
    model.eval()

    # Device of the model's parameters; a parameter-free model runs on the CPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device('cpu')

    outputs = []
    labels = []

    with torch.no_grad():

        for (x, y) in iterator:

            x = x.to(model_device)

            y_pred, _ = model(x)

            outputs.append(y_pred.cpu())
            labels.append(y)

    outputs = torch.cat(outputs, dim = 0)
    labels = torch.cat(labels, dim = 0)

    return outputs, labels

In [None]:
# Model outputs and labels for the training split. Note that this rebinds
# `labels`, which previously held the test-set labels.
outputs, labels = get_representations(model, train_iterator)

In [None]:
def get_pca(data, n_components = 2):
    """Fit a PCA on ``data`` and return its projection onto ``n_components`` components."""
    reducer = decomposition.PCA(n_components = n_components)
    return reducer.fit_transform(data)

In [None]:
def plot_representations(data, labels, classes, n_images = None):
    """Scatter-plot the first two columns of ``data``, coloured by label.

    Args:
        data: 2-D array-like of points; columns 0 and 1 are plotted.
        labels: one class index per point, used as the colour value.
        classes: legend entries.
        n_images: when given, only the first ``n_images`` points are drawn.
    """
    if n_images is not None:
        data, labels = data[:n_images], labels[:n_images]

    fig, ax = plt.subplots(figsize = (15, 15))
    points = ax.scatter(data[:, 0], data[:, 1], c = labels, cmap = 'hsv')

    # One legend handle per distinct colour value in the scatter.
    handles, _ = points.legend_elements(num = None)
    plt.legend(handles = handles, labels = classes)

In [None]:
# Project the model outputs onto their first two principal components and plot them by class.
output_pca_data = get_pca(outputs)
plot_representations(output_pca_data, labels, classes)

In [None]:
def get_tsne(data, n_components = 2, n_images = None):
    """Embed ``data`` with t-SNE (fixed random_state of 0).

    Args:
        data: 2-D array-like of points.
        n_components: dimensionality of the embedding.
        n_images: when given, only the first ``n_images`` rows are embedded.
    """
    subset = data if n_images is None else data[:n_images]
    embedder = manifold.TSNE(n_components = n_components, random_state = 0)
    return embedder.fit_transform(subset)

In [None]:
# t-SNE embedding of the model outputs, plotted by class.
output_tsne_data = get_tsne(outputs)
plot_representations(output_tsne_data, labels, classes)

# Filters

In [None]:
def plot_filtered_images(images, filters, n_filters = None, normalize = True):
    """Show each image next to its responses to the given convolution filters.

    One row per image: the original first, then one panel per filter.

    Args:
        images: sequence of same-sized image tensors (channel-first).
        filters: convolution weight tensor applied with ``F.conv2d``.
        n_filters: when given, only the first ``n_filters`` filters are used.
        normalize: rescale every panel with normalize_image before display.
    """
    images = torch.stack(list(images), dim = 0).cpu()
    filters = filters.cpu()

    if n_filters is not None:
        filters = filters[:n_filters]

    n_images = images.shape[0]
    n_filters = filters.shape[0]
    n_cols = n_filters + 1

    filtered_images = F.conv2d(images, filters)

    fig = plt.figure(figsize = (30, 30))

    for row in range(n_images):
        # 1-based subplot index of the first panel in this row.
        row_start = row * n_cols + 1

        original = images[row]
        if normalize:
            original = normalize_image(original)

        ax = fig.add_subplot(n_images, n_cols, row_start)
        ax.imshow(original.permute(1,2,0).numpy())
        ax.set_title('Original')
        ax.axis('off')

        for j in range(n_filters):
            response = filtered_images[row][j]
            if normalize:
                response = normalize_image(response)

            ax = fig.add_subplot(n_images, n_cols, row_start + j + 1)
            ax.imshow(response.numpy(), cmap = 'bone')
            ax.set_title(f'Filter {j+1}')
            ax.axis('off')

    fig.subplots_adjust(hspace = -0.7)

In [None]:
N_IMAGES = 5
N_FILTERS = 7

# First N_IMAGES training images (labels dropped) and the weights of the
# model's first convolution.
images = [train_data[i][0] for i in range(N_IMAGES)]
filters = model.conv1.weight.data

plot_filtered_images(images, filters, n_filters = N_FILTERS)

In [None]:
def plot_filters(filters, normalize = True):
    """Display convolution filters as small images on a square grid.

    Args:
        filters: filter weight tensor; each ``filters[i]`` is shown as a
            channel-first image.
        normalize: rescale each filter with normalize_image before display.

    The grid is ``int(sqrt(n_filters))`` per side, so filters beyond
    rows * cols are not shown.
    """
    # Work on a private copy: `.cpu()` returns the very same tensor when the
    # weights already live on the CPU, and normalising that in place would
    # overwrite the model's parameters.
    filters = filters.detach().cpu().clone()

    n_filters = filters.shape[0]

    rows = int(np.sqrt(n_filters))
    cols = int(np.sqrt(n_filters))

    fig = plt.figure(figsize = (30, 15))

    for i in range(rows*cols):

        image = filters[i]

        if normalize:
            image = normalize_image(image)

        ax = fig.add_subplot(rows, cols, i+1)
        # Channel-first -> channel-last, as imshow expects.
        ax.imshow(image.permute(1, 2, 0))
        ax.axis('off')

    fig.subplots_adjust(wspace = -0.9)

In [None]:
# Visualise the first-layer convolution weights selected above.
plot_filters(filters)