# Image Classification (Pokemon Dataset)

### Install necessary components

### Import all model training modules

In [None]:
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, models, transforms

### Set a seed for reproducibility

In [None]:
seed = 42
random.seed(seed)
torch.manual_seed(seed)

print(f"Set a random seed to: {seed}")

### Set device

In [None]:
device = torch.device("cpu")
print(f"default Device: {device}")

### Define dataset path

In [None]:
dataset_path = "data"

### Define transformations (resize and normalize)

In [None]:
image_size = 32

transform = transforms.Compose(
    [
        transforms.Resize(
            (image_size, image_size)
        ),  # Resize images to image_size x image_size
        transforms.ToTensor(),  # Convert to Tensor
        transforms.Normalize((0.5,), (0.5,)),  # Normalize images
    ]
)

### Load dataset

In [None]:
dataset = datasets.ImageFolder(root=dataset_path, transform=transform)

## Visualize

### Import all virtualizing modules

In [None]:
from IPython.display import display
import matplotlib.pyplot as plt
from ipywidgets import Button, Output

### Function to display the image and label

In [None]:
def show_image(index):
    # Get the image and label
    image, label = dataset[index]

    # Convert the image from tensor to numpy format for displaying
    image = image.permute(1, 2, 0).numpy()

    # Clear the output display and plot
    output.clear_output(wait=True)  # type: ignore
    with output:  # type: ignore
        plt.imshow(image)
        plt.title(f"Label: {dataset.classes[label]}")
        plt.axis("off")
        plt.show()

### Button click event to display the next image

In [None]:
index = 0

In [None]:
def on_button_clicked(b):
    global index
    index = (index + random.randint(1, len(dataset) - 1)) % len(
        dataset
    )  # Move to the next image, loop back at the end
    show_image(index)

### Initialize output display for Jupyter notebook

In [None]:
output = Output()
next_button = Button(description="Next Image")
next_button.on_click(on_button_clicked)
display(next_button, output)

## Model

### Split dataset into train and test sets (80-20 split)

In [None]:
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=torch.Generator().manual_seed(seed)
)

### Create DataLoaders for train and test sets

In [None]:
g = torch.Generator()
g.manual_seed(seed)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, generator=g)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

### Define the neural network

In [None]:
class MyNetwork(nn.Module):
    def __init__(
        self, input_size=image_size * image_size * 3, hidden_size=1000, num_classes=10
    ):
        super(MyNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.activation1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Flatten the images
        out = self.fc1(x)
        out = self.activation1(out)
        out = self.fc2(out)
        return out


class MyNetwork(nn.Module):
    def __init__(
        self, input_size=image_size * image_size * 3, hidden_size=1000, num_classes=10
    ):
        super(MyNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.activation1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.activation2 = nn.ReLU()
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.activation3 = nn.ReLU()
        self.fc4 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Flatten the images
        out = self.fc1(x)
        out = self.activation1(out)
        out = self.fc2(out)
        out = self.activation2(out)
        out = self.fc3(out)
        out = self.activation3(out)
        out = self.fc4(out)
        return out


class MyNetwork(nn.Module):
    def __init__(
        self, input_size=image_size * image_size * 3, hidden_size=1000, num_classes=10
    ):
        super(MyNetwork, self).__init__()
        self.vgg19 = models.vgg19(pretrained=True)
        in_features = self.vgg19.classifier[6].in_features
        self.vgg19.classifier[6] = nn.Linear(in_features, num_classes)

    def forward(self, x):
        out = self.vgg19(x)
        return out

### Get the class names

In [None]:
class_names = train_dataset.dataset.classes
num_classes = len(class_names)

### Initialize the model, loss function, and optimizer

In [None]:
model = MyNetwork(input_size=image_size * image_size * 3, num_classes=num_classes).to(
    device
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

### Train the network

In [None]:
num_epochs = 5
for epoch in range(num_epochs):
    batch_num = 0
    for images, labels in train_loader:
        batch_num += 1
        images, labels = images.to(device), labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(
            f"\tEpoch [{epoch+1}/{num_epochs}] Batch #{batch_num}, Loss: {loss.item():.4f}"
        )
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

### Save the model

In [None]:
torch.save(model.state_dict(), "model.pth")

### Load the model

In [None]:
model = MyNetwork(input_size=image_size * image_size * 3, num_classes=num_classes).to(
    device
)

model.load_state_dict(torch.load("model.pth", map_location=device))

### Define test function to evaluate accuracy on the test set

In [None]:
def test(model, data_loader):
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0
    with torch.no_grad():  # No need to calculate gradients
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")

### Run test for evaluating the accuracy

In [None]:
test(model, test_loader)

# Inference

## Inference with PyTorch

### Import necessary modules

In [None]:
import os
import numpy as np
from PIL import Image

### Define the prediction function

In [None]:
def predict_image_pytorch(model, image_path, classes, show_result=False):
    # Load and preprocess the image
    image = Image.open(image_path).convert("RGB")
    image_tensor = transform(image)
    image_tensor = image_tensor.unsqueeze(0)  # Add batch dimension
    image_tensor = image_tensor.to(device)

    # Set the model to evaluation mode and make the prediction
    model.eval()
    with torch.no_grad():
        output = model(image_tensor)
        _, predicted = torch.max(output, axis=1)
        predicted_class = classes[predicted.item()]

    if show_result:
        plt.imshow(image)
        plt.title(f"Predicted Class: {predicted_class}")
        plt.axis("off")
        plt.show()

    return predicted_class

### Define the manual folder path

In [None]:
manual_folder = "manual"

### Predict each image in the "manual" folder

In [None]:
def inference_pytorch(model, show_result=False):
    for image_file in os.listdir(manual_folder):
        image_path = os.path.join(manual_folder, image_file)
        if os.path.isfile(image_path):  # Check if it's a file
            predicted_class = predict_image_pytorch(
                model, image_path, class_names, show_result
            )
            if show_result:
                print(f"Image: {image_file} - Predicted Class: {predicted_class}")


inference_pytorch(model, show_result=True)

## Inference with OpenVINO

In [None]:
import openvino as ov
import cv2

In [None]:
example = torch.randn(1, 3, image_size, image_size)
ov_model = ov.convert_model(
    model,
    example_input=(example,),
    input=[1, 3, image_size, image_size],
    share_weights=False,
)
ov.save_model(ov_model, "ov_model.xml")

core = ov.Core()
compiled_ov_model = core.compile_model(ov_model, "CPU")

In [None]:
def predict_image_openvino(model, image_path, classes, show_result=False):

    # Load and preprocess the image
    image = cv2.cvtColor(cv2.imread(filename=str(image_path)), code=cv2.COLOR_BGR2RGB)

    # Resize to image_size x image_size
    input_image = cv2.resize(src=image, dsize=(image_size, image_size))

    # Normalize an image
    MEAN = 255 * np.array([0.5, 0.5, 0.5])
    STD = 255 * np.array([0.5, 0.5, 0.5])
    input_image = np.array(input_image)
    input_image = input_image.transpose(-1, 0, 1)
    input_image = (input_image - MEAN[:, None, None]) / STD[:, None, None]

    # Add batch dimension
    input_image = input_image.reshape(1, 3, 32, 32)

    # Make the prediction
    output_layer = model.output(0)
    output = model([input_image])[output_layer]
    predicted = np.argmax(output, axis=1)
    predicted_class = classes[predicted.item()]

    if show_result:
        plt.imshow(image)
        plt.title(f"Predicted Class: {predicted_class}")
        plt.axis("off")
        plt.show()

    return predicted_class

In [None]:
def inference_openvino(model, show_result=False):
    for image_file in os.listdir(manual_folder):
        image_path = os.path.join(manual_folder, image_file)
        if os.path.isfile(image_path):  # Check if it's a file
            predicted_class = predict_image_openvino(
                model, image_path, class_names, show_result
            )
            if show_result:
                print(f"Image: {image_file} - Predicted Class: {predicted_class}")


inference_openvino(compiled_ov_model, show_result=True)

In [None]:
import time


def timeit(callback, run=10, **args):
    start_time = time.time()
    for _ in range(run):
        callback(**args)
    end_time = time.time()
    print("Time:", end_time - start_time)

In [None]:
timeit(inference_pytorch, model=model)

In [None]:
timeit(inference_openvino, model=compiled_ov_model)

# Optimize the model

In [None]:
import nncf


def transform_fn(data_item):
    return data_item[0]


# Creating separate dataloader with batch size = 1
# as dataloaders with batches > 1 is not supported yet.
quantization_loader = DataLoader(
    train_dataset, batch_size=1, shuffle=False, num_workers=0, pin_memory=True
)

quantization_dataset = nncf.Dataset(quantization_loader, transform_fn)

In [None]:
quantized_model = nncf.quantize(model, quantization_dataset)

In [None]:
ov_quantized_model = ov.convert_model(
    quantized_model, example_input=(example,), input=[1, 3, image_size, image_size]
)
ov.save_model(ov_quantized_model, "ov_quantized_model.xml")

core = ov.Core()
compiled_ov_quantized_model = core.compile_model(ov_quantized_model, "CPU")

### Measure accuracy

In [None]:
class AverageMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self, name, fmt=":f"):
        self.name = name
        self.fmt = fmt
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        fmtstr = "{name} {val" + self.fmt + "} ({avg" + self.fmt + "})"
        return fmtstr.format(**self.__dict__)


class ProgressMeter(object):
    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        entries = [self.prefix + self.batch_fmtstr.format(batch)]
        entries += [str(meter) for meter in self.meters]
        print("\t".join(entries))

    def _get_batch_fmtstr(self, num_batches):
        num_digits = len(str(num_batches // 1))
        fmt = "{:" + str(num_digits) + "d}"
        return "[" + fmt + "/" + fmt.format(num_batches) + "]"


def accuracy(output, target, topk=(1,)):
    """Computes the accuracy over the k top predictions for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res


def validate(val_loader, model, criterion):
    batch_time = AverageMeter("Time", ":3.3f")
    losses = AverageMeter("Loss", ":2.3f")
    top1 = AverageMeter("Acc@1", ":2.2f")
    top5 = AverageMeter("Acc@5", ":2.2f")
    progress = ProgressMeter(
        len(val_loader), [batch_time, losses, top1, top5], prefix="Test: "
    )

    # Switch to evaluate mode.
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (images, target) in enumerate(val_loader):
            images = images.to(device)
            target = target.to(device)

            # Compute output.
            output = model(images)
            loss = criterion(output, target)

            # Measure accuracy and record loss.
            acc1, acc5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), images.size(0))
            top1.update(acc1[0], images.size(0))
            top5.update(acc5[0], images.size(0))

            # Measure elapsed time.
            batch_time.update(time.time() - end)
            end = time.time()

            print_frequency = 10
            if i % print_frequency == 0:
                progress.display(i)

        # print(" * Acc@1 {top1.avg:.3f} Acc@5 {top5.avg:.3f}".format(top1=top1, top5=top5))
    return top1.avg

In [None]:
acc1 = validate(test_loader, model, criterion)
print(f"Accuracy of original PyTorch model: {acc1:.3f}")

In [None]:
acc1 = validate(test_loader, quantized_model, criterion)
print(f"Accuracy of initialized INT8 model: {acc1:.3f}")

### Evaluation

In [None]:
test(model, test_loader)

In [None]:
test(quantized_model, test_loader)

In [None]:
timeit(inference_pytorch, model=model)

In [None]:
timeit(inference_openvino, model=compiled_ov_model)

In [None]:
timeit(inference_openvino, model=compiled_ov_quantized_model)

# Benchmark Model Performance by Computing Inference Time

In [None]:
def parse_benchmark_output(benchmark_output):
    parsed_output = [line for line in benchmark_output if "FPS" in line]
    print(*parsed_output, sep="\n")

fp32_ir_path = 'ov_model.xml'
int8_ir_path = 'ov_quantized_model.xml'

openvino_device = 'CPU'

print("Benchmark FP32 model (IR)")
benchmark_output = !benchmark_app -m $fp32_ir_path -d $openvino_device -api async -t 15
parse_benchmark_output(benchmark_output)

print("Benchmark INT8 model (IR)")
benchmark_output = !benchmark_app -m $int8_ir_path -d $openvino_device -api async -t 15
parse_benchmark_output(benchmark_output)