# cifar10 vgg16


In [1]:
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from torchvision import datasets, models, transforms
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
import time
torch.__version__

import os
import sys 
import argparse
import pandas as pd
import csv
import time


In [2]:
torch.manual_seed(123)

<torch._C.Generator at 0x7f97a418c970>

In [3]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'{device}\n')

cuda:0



In [4]:
size = 224
classes = ('0_Rust', '1_Brown_Spots', '2_Sooty_Molds')
stage = 'stage_2'
batch_size = 32

In [5]:
transform_train = transforms.Compose([
    # transforms.RandomCrop(32, padding=4),
    transforms.Resize(size=[size, size]),
    # transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) 
])

transform_test = transforms.Compose([
    transforms.Resize(size=[size, size]),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [6]:
class CustomDataset(Dataset):
    def __init__(self, root, transform=None):
        self.root = root
        self.transform = transform
        self.image_paths = []

        classes = os.listdir(self.root)
        for class_name in classes:
            class_path = os.path.join(self.root, class_name)
            if os.path.isdir(class_path):
                images = os.listdir(class_path)
                for image_name in images:
                    image_path = os.path.join(class_path, image_name)
                    self.image_paths.append((image_path, int(class_name.split('_')[0])))

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path, label = self.image_paths[idx]
        image = Image.open(image_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, label

# Create custom datasets
dataset_folder = r'./dataset/'

train_folder = os.path.join(dataset_folder, f'swatdcnn/data/Augmented/{stage}/train')
test_folder = os.path.join(dataset_folder, f'swatdcnn/data/Augmented/{stage}/validation')

train_dataset = CustomDataset(root=train_folder, transform=transform_train)
test_dataset = CustomDataset(root=test_folder, transform=transform_test)

train_dataset_size = len(train_dataset)
test_dataset_size = len(test_dataset)

print(f"Number of images in train dataset: {train_dataset_size}")
print(f"Number of images in test/validation dataset: {test_dataset_size}")




Number of images in train dataset: 6000
Number of images in test/validation dataset: 625


## Dataloaders

In [7]:
# Create data loaders
trainloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
testloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [8]:
def lookat_dataset(dataset, istensor=False):
  figure = plt.figure(figsize=(8, 8))
  rows, cols = 2, 2
  for i in range(1, 5):
      sample_idx = torch.randint(len(dataset), size=(1,)).item()
      img, label = dataset[sample_idx]
      figure.add_subplot(rows, cols, i)
      plt.title(CATEGORIES[label])
      plt.axis("off")
      if istensor:
        plt.imshow(img.squeeze().permute(1, 2, 0))
      else:
        plt.imshow(img)
  plt.show()

In [9]:
# lookat_dataset(trainloader, True)

# Creating the CNN model

In [10]:
# vit_b_16 = models.vit_b_16(weights='DEFAULT')
net = models.vgg16(weights='DEFAULT')
net

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [None]:
# Freezing the network
for param in vit_b_16.parameters():
    param.requires_grad = True

In [None]:
# Replace the last fully connected layer
num_features = vit_b_16.heads.head.in_features
print('num_features: ', num_features)

vit_b_16.conv_proj = nn.Conv2d(3, 768, kernel_size=(4, 4), stride=(4, 4))

vit_b_16.encoder.layers = nn.Sequential(
    vit_b_16.encoder.layers[0],  # Keep encoder_layer_0
    vit_b_16.encoder.layers[1],  # Keep encoder_layer_1
    vit_b_16.encoder.layers[2],  # Keep encoder_layer_2
    vit_b_16.encoder.layers[3],  # Keep encoder_layer_3
    # vit_b_16.encoder.layers[4],  # Keep encoder_layer_4
    # vit_b_16.encoder.layers[5],  # Keep encoder_layer_5
    # vit_b_16.encoder.layers[6],  # Keep encoder_layer_6
    # vit_b_16.encoder.layers[7],  # Keep encoder_layer_7
)

vit_b_16.heads = nn.Sequential(
    nn.Linear(num_features, 256),
    nn.ReLU(),
    nn.Linear(256, 10),  
    nn.Softmax(dim=1)  
)

In [None]:
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(vit_b_16.parameters())

In [None]:
optimizer = optim.Adam(vit_b_16.parameters())
print('learning rate: ', optimizer.param_groups[0]['lr'])

In [None]:
import os
import datetime

model_description = "vit grad = False, normalized = imagenet mean and std"

def show_log(folder_name, log_str):
    print(log_str)
    
    log_folder = f"logs/{folder_name}"
    os.makedirs(log_folder, exist_ok=True)
    log_filename = f"{log_folder}/log.txt"

    log_file = open(log_filename, 'a')
    log_file.write(log_str)
    log_file.close()

In [None]:
def train_model(model, loss_function, optimizer, epochs=25, patience=10):
    historic = []
    best_accuracy = 0.0
    best_accuracy_epoch = 1
    best_model = None
    early_stopping_counter = 0

    log_folder = "log_" + datetime.datetime.now().strftime("%d-%m-%Y-%H-%M")
    
    show_log(log_folder, f"Description: {model_description}\n")
    show_log(log_folder, f"loss_function: {loss_function}\n")
    show_log(log_folder, f"optimizer: {optimizer}\n")
    show_log(log_folder, f"epochs: {epochs}\n")
    show_log(log_folder, f"patience: {patience}\n")

    for epoch in range(epochs):
        init_epoch = time.time()
        log_str = f"\nEpoch: {epoch+1}/{epochs}\n"
        show_log(log_folder, log_str)

        model.to(device)
        model.train()

        loss_train = 0.0
        accuracy_train = 0.0

        loss_validation = 0.0
        accuracy_validation = 0.0

        for i, (inputs, labels) in enumerate(data_loader_train):
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)

            loss = loss_function(outputs, labels)
            loss.backward()

            optimizer.step()

            loss_train += loss.item() * inputs.size(0)

            values_max, index_values_max = torch.max(outputs.data, 1)
            correct_predictions = index_values_max.eq(labels.data.view_as(index_values_max))

            accuracy = torch.mean(correct_predictions.type(torch.FloatTensor))
            accuracy_train += accuracy.item() * inputs.size(0)

            # log_str = f"Train - batch number {i:03d}, Loss: {loss.item():.4f}, Accuracy: {accuracy.item():.4f}\n"
            # show_log(log_folder, log_str)

        with torch.no_grad():
            model.eval()

            for j, (inputs, labels) in enumerate(data_loader_validation):
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)

                loss = loss_function(outputs, labels)

                loss_validation += loss.item() * inputs.size(0)

                values_max, index_values_max = torch.max(outputs.data, 1)
                correct_predictions = index_values_max.eq(labels.data.view_as(index_values_max))

                accuracy = torch.mean(correct_predictions.type(torch.FloatTensor))
                accuracy_validation += accuracy.item() * inputs.size(0)

                # log_str = "Validation - batch number: {:03d}, Loss: {:.4f}, Accuracy: {:.4f}".format(j, loss.item(), accuracy.item())
                # show_log(log_folder, log_str)

        loss_mean_train = loss_train / train_images_length
        accuracy_mean_train = accuracy_train / train_images_length

        loss_mean_validation = loss_validation / validation_images_length
        accuracy_mean_validation = accuracy_validation / validation_images_length

        historic.append([
            loss_mean_train,
            loss_mean_validation,
            accuracy_mean_train,
            accuracy_mean_validation
        ])

        end_epoch = time.time()

        log_str = f"Epoch: {epoch+1:03d}, Train: Loss: {loss_mean_train:.4f}, Accuracy: {accuracy_mean_train*100:.4f}%,\n\tValidation: Loss: {loss_mean_validation:.4f}, Accuracy: {accuracy_mean_validation*100:.4f}%, Time: {end_epoch-init_epoch:.4f}s\n\tBest accuracy: {best_accuracy:.4f}, Best_accuracy_epoch: {best_accuracy_epoch + 1}\n"
        show_log(log_folder, log_str)
        
        if accuracy_mean_validation > best_accuracy:
            best_accuracy = accuracy_mean_validation
            best_accuracy_epoch = epoch
            torch.save(model, f"logs/{log_folder}/best_model.pth")
            best_model = model
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1
            if early_stopping_counter >= patience:
                log_str = f"\nEarly stopping triggered! No improvement in validation accuracy for {patience} epochs.\n"
                show_log(log_folder, log_str)

                break

    log_file.close()
    return best_model, historic


In [None]:
epochs = 100

trained_model, historic = train_model(vit_b_16, loss_function, optimizer, epochs)

In [None]:
# Run all above

In [None]:
# Run all above

In [None]:
# historic

# Evaluating the CNN model

In [None]:
# learn about Matplotlib with Kizzy: https://youtu.be/iSpi3rKdoLQ
# Matplotlib introduction | Python Graphs | Data analysis #7
def plot_losses(losses):
  fig = plt.figure(figsize=(13, 5))
  ax = fig.gca()
  for loss_name, loss_values in losses.items():
    ax.plot(loss_values, label=loss_name)
  ax.legend(fontsize="16")
  ax.set_xlabel("Iteration", fontsize="16")
  ax.set_ylabel("Loss", fontsize="16")
  ax.set_title("Loss vs iterations", fontsize="16");

In [None]:
def make_confusion_matrix(model, loader, n_classes):
  confusion_matrix = torch.zeros(n_classes, n_classes, dtype=torch.int64)
  with torch.no_grad():
    for i, (imgs, labels) in enumerate(loader):
      imgs = imgs.to(device)
      labels = labels.to(device)
      outputs = model(imgs)
      _, predicted = torch.max(outputs, 1)
      for t, p in zip(torch.as_tensor(labels, dtype=torch.int64).view(-1),
                      torch.as_tensor(predicted, dtype=torch.int64).view(-1)):
        confusion_matrix[t, p] += 1
  return confusion_matrix

In [None]:
def evaluate_accuracy(model, dataloader, classes, verbose=True):
  # prepare to count predictions for each class
  correct_pred = {classname: 0 for classname in classes}
  total_pred = {classname: 0 for classname in classes}

  confusion_matrix = make_confusion_matrix(model, dataloader, len(classes))
  if verbose:
    total_correct = 0.0
    total_prediction = 0.0
    for i, classname in enumerate(classes):
      correct_count = confusion_matrix[i][i].item()
      class_pred = torch.sum(confusion_matrix[i]).item()

      total_correct += correct_count
      total_prediction += class_pred

      accuracy = 100 * float(correct_count) / class_pred
      print("Accuracy for class {:5s} is: {:.1f} %".format(classname,
                                                    accuracy))
  print("Global acccuracy is {:.1f}".format(100 * total_correct/total_prediction))
  return confusion_matrix

In [None]:
losses = {"Train Loss": loss_mean_train, "Test Loss": loss_mean_validation}
plot_losses(losses)

In [None]:
confusion_matrix = evaluate_accuracy(vit_b_16, data_loader_validation, CATEGORIES)

In [None]:
import seaborn as sn
plt.figure(figsize=(12, 12))
sn.set(font_scale=1.4)
sn.heatmap(confusion_matrix.tolist(),
           annot=True, annot_kws={"size": 16}, fmt='d')

# Testing examples

In [None]:
import os
from PIL import Image

# Get the path of the image relative to the current working directory
image_path = os.path.abspath('./images/bird1.png')

# Open and display the image
img = Image.open(image_path)
img

In [None]:
plt.imshow(img_tensor.permute(1,2, 0))

In [None]:
batch = img_tensor.unsqueeze(0).to(device)
net.eval()
output = net(batch)
output

In [None]:
logits = torch.nn.functional.softmax(output, dim=1) * 100
prob_dict = {}
for i, classname in enumerate(CATEGORIES):
  prob = logits[0][i].item()
  print(f"{classname} score: {prob:.2f}")
  prob_dict[classname] = [prob]

In [None]:
import pandas as pd
df_prob = pd.DataFrame.from_dict(prob_dict)
df_prob.plot(kind='barh', figsize=(12, 8))

# Saving the model

In [None]:
# torch.save(model.state_dict(), '/content/drive/MyDrive/Colab Notebooks/some_models/mlp_model_weights.pth')
torch.save(net.state_dict(), './models/cifar10_model_weights.pth')

# Creating the onnx model

In [None]:
!pip install onnx

In [None]:
# Load pretrained model weights
model_url = './models/cifar10_model_weights.pth'
model_to_onnx = ConvolutionalModel()

# Initialize model with the pretrained weights
device = torch.device('cuda')
device
model_to_onnx = ConvolutionalModel()
model_to_onnx.load_state_dict(torch.load(model_url))
# model_to_onnx.load_state_dict(torch.load(model_url, map_location=device))

# set the model to inference mode
model_to_onnx.eval()

In [None]:
import onnx
from torch.autograd import Variable

# dummy_input = torch.randn(1, 3, 32, 32)
dummy_input = torch.randn(1, 3, 32, 32)

# Export the model
torch.onnx.export(model_to_onnx,                 # model being run
                  dummy_input,                         # model input (or a tuple for multiple inputs)
                  "./models/cifar10_onnx_mnist.onnx",   # where to save the model (can be a file or file-like object)
                  export_params=True,        # store the trained parameter weights inside the model file
                  opset_version=10,          # the ONNX version to export the model to
                  do_constant_folding=True,  # whether to execute constant folding for optimization
                  input_names = ['input'],   # the model's input names
                  output_names = ['output'], # the model's output names
                  dynamic_axes={'input' : {0 : 'batch_size'},    # variable length axes
                                'output' : {0 : 'batch_size'}})