## Imports and defining helper functions

In [None]:
import torch
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import numpy as np
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import os
import seaborn as sn
import pandas as pd

In [None]:
def get_accuracy(model, loader, specific_class=None) -> float:
    """Return the fraction of samples in ``loader`` that ``model`` classifies correctly.

    Args:
        model: A ``torch.nn.Module`` mapping a batch of inputs to per-class scores.
        loader: Iterable yielding batches whose first two items are
            ``(inputs, labels)``.
        specific_class: If given, only samples whose true label equals this
            class index are counted (i.e. per-class recall). If ``None``,
            every sample is counted.

    Returns:
        ``correct / total`` as a float, or ``0.0`` when no sample was counted
        (empty loader, or ``specific_class`` never occurs).
    """
    # Evaluate on whatever device the model lives on instead of relying on a
    # module-level ``device`` global; parameter-free models fall back to CPU.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device('cpu')

    # Remember the current mode so it can be restored even if evaluation fails.
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for data in loader:
                inputs = data[0].to(target_device)
                labels = data[1].to(target_device)
                predicted = model(inputs).argmax(dim=1)
                if specific_class is None:
                    correct += (predicted == labels).sum().item()
                    total += labels.size(0)
                else:
                    # Restrict the count to samples of the requested class.
                    of_class = labels == specific_class
                    correct += (predicted[of_class] == specific_class).sum().item()
                    total += of_class.sum().item()
    finally:
        model.train(was_training)

    # Avoid ZeroDivisionError when nothing was counted.
    return correct / total if total else 0.0

## Defining the image preprocessing applied before images enter the model

In [None]:
# Preprocessing pipeline applied to every image, in order:
#   1. resize to 80 (Resize), 2. take the central 80x80 crop,
#   3. convert the PIL image to a tensor, 4. normalise each channel with the
#   mean/std values commonly used for ImageNet-trained torchvision models.
_preprocessing_steps = [
    transforms.Resize(80),
    transforms.CenterCrop(80),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
transformations = transforms.Compose(_preprocessing_steps)

## Importing the dataset

In [None]:
# Name of the dataset folder under ./datasets; also used to name result files.
dataset = 'demo_dataset'
batch_size = 8
# Number of worker processes each DataLoader uses to load batches.
# (Previously misspelt `num_worksers` and never passed to the loaders.)
num_workers = 2

# Expected layout: datasets/<dataset>/{training,test}/<class_name>/<image files>
dataset_root = os.path.join('datasets', dataset)
train_set = datasets.ImageFolder(os.path.join(dataset_root, 'training'), transform=transformations)
test_set = datasets.ImageFolder(os.path.join(dataset_root, 'test'), transform=transformations)

train_loader = torch.utils.data.DataLoader(
    train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers)
# Evaluation metrics do not depend on sample order, so the test set is not shuffled.
test_loader = torch.utils.data.DataLoader(
    test_set, batch_size=batch_size, shuffle=False, num_workers=num_workers)

## Define the model and optimizer

You can hot-swap the model for any of those listed here: https://pytorch.org/vision/stable/models.html

Or swap the optimizer for any of those listed here: https://pytorch.org/docs/stable/optim.html


In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Train from scratch (no pretrained weights). The classifier head is sized to
# the dataset's classes rather than the default 1000, so predicted indices
# always fall inside the dataset's label range.
net = models.mobilenet_v2(
    pretrained=False,
    progress=True,
    num_classes=len(train_set.classes),
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

## Initialize results tracking

In [None]:
# Make sure the output folder exists before anything is written to it.
results_dir = 'results'
if not os.path.exists(results_dir):
    os.mkdir(results_dir)

# One CSV per dataset. If a previous run left a file behind, pause so the user
# can abort; pressing return continues and the file is overwritten below.
results_fp = os.path.join(results_dir, f'{dataset}.csv')
results_already_exist = os.path.exists(results_fp)
if results_already_exist:
    input('Warning. Results File Exists [return to continue]')

# Start the file afresh with the column header row.
csv_header = 'Batches, Training Accuracy, Test Accuracy\n'
with open(results_fp, 'w') as results_f:
    results_f.write(csv_header)

## Set Training Parameters

In [None]:
# Number of full passes over the training set.
epochs = 5
# Every this many minibatches (counted within an epoch) the training loop
# evaluates on the test set, appends a row to the results CSV, and saves the
# model if the test accuracy improved.
minibatch_save_interval = 2000

##  Train the model

In [None]:
# Best test accuracy seen so far; a checkpoint is written whenever it improves.
best_test_acc = 0.0
batches_per_epoch = len(train_loader)

for epoch in range(epochs):  # loop over the dataset multiple times
    net.train()
    # Training accuracy is accumulated over the whole epoch, counted per
    # sample so that a smaller final batch does not skew the ratio.
    epoch_correct = 0
    epoch_seen = 0
    # Loss is accumulated only since the last report.
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        predicted = outputs.detach().argmax(dim=1)
        epoch_correct += (labels == predicted).sum().item()
        epoch_seen += labels.size(0)

        running_loss += loss.item()
        if i % minibatch_save_interval == (minibatch_save_interval - 1):
            test_accuracy = get_accuracy(net, test_loader)
            # Do not rely on the helper to put the network back in training mode.
            net.train()

            train_accuracy = epoch_correct / epoch_seen
            # Total minibatches processed since training started
            # (the original `i * (epoch + 1)` was not a batch count).
            batches_done = epoch * batches_per_epoch + i + 1

            with open(results_fp, 'a') as results_f:
                results_f.write(f'{batches_done},{train_accuracy},{test_accuracy}\n')

            # print statistics
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / minibatch_save_interval))
            running_loss = 0.0
            print(f'Train: {train_accuracy} Test: {test_accuracy}')

            # Keep only the weights that achieved the best test accuracy.
            if test_accuracy > best_test_acc:
                torch.save(net.state_dict(), os.path.join('results', f'{dataset}_m.p'))
                best_test_acc = test_accuracy

print('Finished Training')

## See Final Accuracy

In [None]:
# Fraction of test-set samples that the network currently held in `net`
# classifies correctly (all classes counted).
get_accuracy(net, test_loader)

## See Confusion Matrix on Test Set

In [None]:
# confusion_matrix[t, p] counts test samples whose true class is t and whose
# predicted class is p (rows = true class, columns = prediction).
num_test_classes = len(test_set.classes)
confusion_matrix = torch.zeros(num_test_classes, num_test_classes)

# The network is called `net`; the original referenced an undefined `model`.
net.eval()
with torch.no_grad():
    for inputs, classes in test_loader:
        outputs = net(inputs.to(device))
        preds = outputs.argmax(dim=1)
        # Convert to plain Python ints so the CPU matrix is never indexed
        # with tensors living on another device.
        for t, p in zip(classes.view(-1).tolist(), preds.view(-1).tolist()):
            confusion_matrix[t, p] += 1
net.train()
print(confusion_matrix)

## Visualize Confusion Matrix

In [None]:
# Label both axes of the confusion matrix with the class names so the heatmap
# is readable: rows and columns follow the order of `test_set.classes`.
class_names = test_set.classes
df_cm = pd.DataFrame(
    confusion_matrix.numpy(),
    index=class_names,
    columns=class_names,
)

# Draw the matrix as an annotated heatmap (each cell shows its count).
plt.figure(figsize=(10, 7))
sn.heatmap(df_cm, annot=True)