In [1]:
import copy
import os

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import shap
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms.functional as F_tv
from plotly.subplots import make_subplots

  from .autonotebook import tqdm as notebook_tqdm


Installing pytorch can sometimes be a pain, depending on what hardware you're using.
By default, this project only installs the CPU version of pytorch. To install an appropriate CUDA version (for use on a GPU), see https://pytorch.org/get-started/locally/ for more details.

e.g. for windows: `pip3 install --upgrade torch torchvision --index-url https://download.pytorch.org/whl/cu126`

In [2]:
# Parameters
# Run on the GPU when a CUDA-enabled PyTorch build and device are available; otherwise fall
# back to the CPU so the notebook still runs on a CPU-only install.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # cuda | cpu  <-- replace with "cpu" to force CPU

In [3]:
# Load the MNIST handwritten digits dataset (train and test splits); ~64 MB on disk.
# download=True only fetches the files when they are not already present under `root`,
# so this cell works on a fresh checkout as well as on re-runs.
train_dataset = torchvision.datasets.MNIST(root=os.getcwd(), download=True, train=True)
test_dataset = torchvision.datasets.MNIST(root=os.getcwd(), download=True, train=False)
train_dataset, test_dataset

(Dataset MNIST
     Number of datapoints: 60000
     Root location: c:\Users\firtd\Documents\Projects\DSC UWA\computer-vision-dscuwa
     Split: Train,
 Dataset MNIST
     Number of datapoints: 10000
     Root location: c:\Users\firtd\Documents\Projects\DSC UWA\computer-vision-dscuwa
     Split: Test)

#### **Data science is applying scientific thinking to problems which require use of large amounts of data.**
The following steps form the basis of most data science projects, leaving out other "soft" skills like determining the correct problem, stakeholder engagement/communication, integrating the model(s) into the organisation / putting the model(s) into production.

### 1. Data Exploration. Look at your data.
Understand what trends, relationships, issues, etc. there may be within your dataset.

In [4]:
# View the images: each dataset item is an (image, label) pair, so [0] selects the image
sample_index = 91  # try 100 for a different example
px.imshow(train_dataset[sample_index][0])

In [5]:
# View the target variables
# Read the labels straight from each dataset's `targets` tensor. Iterating the datasets
# themselves would decode every image only to throw it away.
target_df = pd.concat(
    [
        pd.DataFrame({"target": train_dataset.targets.tolist(), "dataset": "train"}),
        pd.DataFrame({"target": test_dataset.targets.tolist(), "dataset": "test"}),
    ],
    ignore_index=True,
)

# Overlaid histograms show whether the train and test label distributions match
fig = px.histogram(target_df, x="target", color="dataset", barmode="overlay")
fig.update_layout(bargap=0.1, xaxis_dtick=1)

### 2. Data cleaning + feature engineering
Skipped for the sake of time.
The MNIST dataset is already pretty clean (no noise in images, mislabelled images etc.).
Feature engineering will be mainly done by our CNN. For non-computer vision / image problems, you should think carefully about what features/inputs you will use for your model:
* Are they reliable? Can you get the data consistently in the same format, at the same quality?
* Are they useful? Do they correlate with your target variable?
* Are they independent from each other? Remove features which are double-ups!
* ...

In [6]:
# Convert data into the correct format and normalise (subtract mean, divide by std)
## Compute the mean and std of the training pixels on the [0, 1] scale.
## `ToTensor()` below rescales the 8-bit pixel values to [0, 1] before `Normalize` runs,
## so the statistics must be computed on that same scale.
train_pixels = train_dataset.data.to(torch.float64).div_(255.0)
mean_train = train_pixels.mean().item()
## Population std, i.e. the square root of the mean squared deviation from the mean
std_train = train_pixels.std(unbiased=False).item()
del train_pixels  # release the float64 copy of the training images

## Set up a cleaning pipeline
### Note: Pytorch's transforms pipelines are okay for simple cases like this, but in general my preference is to
### use a more generic pipeline framework, like Kedro.
transform = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    # Use the statistics computed above (approximately 0.1307 and 0.3081 for MNIST)
    torchvision.transforms.Normalize((mean_train,), (std_train,)),
])

## Pass data through transformations cleaned train/test data
train_clean = torchvision.datasets.MNIST(root=os.getcwd(), download=False, train=True, transform=transform)
test_clean = torchvision.datasets.MNIST(root=os.getcwd(), download=False, train=False, transform=transform)
px.imshow(train_clean[100][0][0].cpu())

### 3. Model training

In [7]:
def train_epoch(data, model, optimizer, epoch):
    """Train `model` for one pass over `data` and print a one-line summary.

    Args:
        data: Iterable of `(img, target)` batches that also supports `len()`
            (e.g. a DataLoader).
        model: Module whose output is scored with `F.nll_loss`, so it is expected
            to return log-probabilities per class.
        optimizer: Optimiser holding the model's parameters.
        epoch: Epoch number; only used in the printed summary.

    The printed loss is the loss of the last batch of the epoch.
    """
    model.train()
    n_batches = len(data)
    batches_done = 0
    loss = None

    for img, target in data:
        # Put data on GPU/CPU
        img, target = img.to(DEVICE), target.to(DEVICE)

        # Reset optimiser's gradients to 0
        optimizer.zero_grad()

        # Run the image through the model
        output = model(img)

        # Score the predicted output
        loss = F.nll_loss(output, target)

        # Calculate gradients
        loss.backward()

        # Update model parameters
        optimizer.step()

        batches_done += 1

    # Nothing was trained on, so there is no loss to report
    if loss is None:
        print(f"Train Epoch: {epoch} (no batches to train on)")
        return

    # Count completed batches (not the zero-based index) so a full epoch reports 100%
    print(f"Train Epoch: {epoch} ({100. * batches_done / n_batches:.0f}%)\tLoss: {loss.item():.6f}")


def test_epoch(data, model):
    pass


def train(train_data, model, optimizer, lr_scheduler, epochs):

    for epoch in range(1, epochs + 1):
        train_epoch(train_data, model, optimizer, epoch)
        # test_epoch(test_data, model)
        lr_scheduler.step()

### 4. Model testing
Final testing should be done at the end, on data the model has not seen during training. A "validation" score can also be reported at the end of each epoch (see `test_epoch`).

Sometimes this validation set is used to tune parameters/update the model. In this case, your test set should be completely independent from your training and validation sets.

### 5. Experiment, iterate, and test repeatedly
This is typically done rapidly in a proof-of-concept phase of the project. Sense check the outputs of the model:
* Where is it succeeding and failing? Are these results what you expected?
* Which features/aspects of your data contribute the most to the model's performance? Are these results what you expected?
* Does your model generalise?
* Is your model biased?

In [8]:
# Train a standard CNN and check its performance
class SimpleCNN(nn.Module):
    """Small convolutional classifier: two 3x3 convolutions, max-pooling, two linear layers.

    `forward` returns log-probabilities over 10 classes (log-softmax along dim 1).
    """

    def __init__(self) -> None:
        super().__init__()
        # Convolutions: (in_channels, out_channels, kernel_size, stride)
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        # Dropout: the argument is the probability of zeroing an element
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        # Fully connected layers: (in_features, out_features).
        # 9216 = 64 channels x 12 x 12, which is what the conv/pool stack yields for 28x28 inputs.
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        # Convolutional feature extractor
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))

        # Classifier head on the flattened features (batch dimension kept)
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        logits = self.fc2(self.dropout2(hidden))
        return F.log_softmax(logits, dim=1)


# Create the dataset iterator, model, and optimiser
## Specify model parameters
learning_rate = 1.0
lr_step_size = 1
lr_gamma = 0.7
batch_size = 64
n_epochs = 10

## Seed PyTorch's RNG so weight initialisation, dropout and batch order are as repeatable
## as the backend allows when this cell is re-run
torch.manual_seed(0)

## Load data, passing it through our cleaning pipeline
## Training batches are reshuffled every epoch; the test order stays fixed so the example
## images picked by index in later cells do not change between runs.
train_clean_loader = torch.utils.data.DataLoader(train_clean, batch_size=batch_size, shuffle=True)
test_clean_loader = torch.utils.data.DataLoader(test_clean, batch_size=batch_size)

simpleCNN_model = SimpleCNN().to(device=DEVICE)
optimizer = torch.optim.Adadelta(simpleCNN_model.parameters(), lr=learning_rate)
## Multiply the learning rate by lr_gamma every lr_step_size epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=lr_step_size, gamma=lr_gamma)

# Train the model
train(
    train_data=train_clean_loader,
    model=simpleCNN_model,
    optimizer=optimizer,
    lr_scheduler=lr_scheduler,
    epochs=n_epochs,
)

# Save the model
torch.save(simpleCNN_model.state_dict(), "simpleCNN_model.pt")

Train Epoch: 1 (100%)	Loss: 0.016965
Train Epoch: 2 (100%)	Loss: 0.008882
Train Epoch: 3 (100%)	Loss: 0.019224
Train Epoch: 4 (100%)	Loss: 0.014866
Train Epoch: 5 (100%)	Loss: 0.001876
Train Epoch: 6 (100%)	Loss: 0.006772
Train Epoch: 7 (100%)	Loss: 0.000536
Train Epoch: 8 (100%)	Loss: 0.000745
Train Epoch: 9 (100%)	Loss: 0.005401
Train Epoch: 10 (100%)	Loss: 0.000143


In [9]:
# Test out the model on a batch of images from the test set
# Switch to evaluation mode first: `train_epoch` leaves the model in training mode, where
# the dropout layers would still randomly zero activations during prediction.
simpleCNN_model.eval()
with torch.no_grad():
    # Only the first batch is needed
    data, target = next(iter(test_clean_loader))
    data, target = data.to(DEVICE), target.to(DEVICE)
    pred = simpleCNN_model(data)

pred_idx = 6  # index of the image within the batch; try 8 as well
fig = make_subplots(rows=1, cols=2)

# Left: the input image. Right: class probabilities (the model outputs log-probabilities, hence exp).
fig1 = px.imshow(data[pred_idx][0].cpu())
fig2 = px.bar(torch.exp(pred[pred_idx]).cpu())

fig.add_trace(fig1["data"][0], row=1, col=1)
fig.add_trace(fig2["data"][0], row=1, col=2)

fig.update_coloraxes(colorscale="Plasma")
fig.update_xaxes(dtick=1, row=1, col=2)
fig.update_yaxes(autorange="reversed", row=1, col=1)
fig.update_layout(width=800, height=400)

---
### Example of overfitting - only train on 0-4, test on 0-9.

Overfitting is when your model cannot generalise to new data reliably, due to it fitting too closely to the training dataset.

This can be caused by limited training data which is not reflective of the test dataset, or by having too many parameters in your model, causing the model to "memorise" the dataset.

In [10]:
# Filter out training data with targets >= 5
filt_train = [(transform(img), target) for img, target in train_dataset if target < 5]

## Seed PyTorch's RNG so weight initialisation, dropout and batch order are as repeatable
## as the backend allows when this cell is re-run
torch.manual_seed(0)

## Load data, passing it through our cleaning pipeline
## Training batches are reshuffled every epoch; the test order stays fixed.
filt_train_loader = torch.utils.data.DataLoader(filt_train, batch_size=batch_size, shuffle=True)
test_clean_loader = torch.utils.data.DataLoader(test_clean, batch_size=batch_size)

overfittedCNN_model = SimpleCNN().to(device=DEVICE)
optimizer = torch.optim.Adadelta(overfittedCNN_model.parameters(), lr=learning_rate)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=lr_step_size, gamma=lr_gamma)

# Train the model
train(
    train_data=filt_train_loader,
    model=overfittedCNN_model,
    optimizer=optimizer,
    lr_scheduler=lr_scheduler,
    epochs=n_epochs,
)

# Save the model that was just trained (not the earlier simpleCNN_model)
torch.save(overfittedCNN_model.state_dict(), "overfittedCNN_model.pt")

Train Epoch: 1 (100%)	Loss: 0.002069
Train Epoch: 2 (100%)	Loss: 0.000353
Train Epoch: 3 (100%)	Loss: 0.000096
Train Epoch: 4 (100%)	Loss: 0.000959
Train Epoch: 5 (100%)	Loss: 0.000604
Train Epoch: 6 (100%)	Loss: 0.000079
Train Epoch: 7 (100%)	Loss: 0.000001
Train Epoch: 8 (100%)	Loss: 0.000076
Train Epoch: 9 (100%)	Loss: 0.000072
Train Epoch: 10 (100%)	Loss: 0.000015


In [11]:
# Test out the model on a batch of images from the test set (includes the previously unseen classes, 5-9)
# Switch to evaluation mode first: `train_epoch` leaves the model in training mode, where
# the dropout layers would still randomly zero activations during prediction.
overfittedCNN_model.eval()
with torch.no_grad():
    # Only the first batch is needed
    data, target = next(iter(test_clean_loader))
    data, target = data.to(DEVICE), target.to(DEVICE)
    pred = overfittedCNN_model(data)

pred_idx = 8  # index of the image within the batch; try 6 or 9 as well
fig = make_subplots(rows=1, cols=2)

# Left: the input image. Right: class probabilities (the model outputs log-probabilities, hence exp).
fig1 = px.imshow(data[pred_idx][0].cpu())
fig2 = px.bar(torch.exp(pred[pred_idx]).cpu())

fig.add_trace(fig1["data"][0], row=1, col=1)
fig.add_trace(fig2["data"][0], row=1, col=2)

fig.update_coloraxes(colorscale="Plasma")
fig.update_xaxes(dtick=1, row=1, col=2)
fig.update_yaxes(autorange="reversed", row=1, col=1)
fig.update_layout(width=800, height=400)

### Example of transfer learning

#### Freeze the model and add a head to classify odd vs even numbers

In [12]:
# Set up a modified CNN to be a binary classifier, using the conv2d layers from a pretrained 0-9 classifier
class BinaryCNN(nn.Module):
    """Two-class classifier built on frozen copies of a pretrained model's conv layers.

    Args:
        pretrained_model: Module exposing `conv1` and `conv2` attributes. The layers are
            deep-copied, so freezing them here does not modify `pretrained_model`.

    `forward` returns log-probabilities over 2 classes (log-softmax along dim 1).
    """

    def __init__(self, pretrained_model: nn.Module) -> None:
        super().__init__()
        # Copy rather than alias the pretrained layers: sharing the modules would switch off
        # gradients on the pretrained model's own parameters as a side effect.
        self.conv1 = copy.deepcopy(pretrained_model.conv1)
        self.conv2 = copy.deepcopy(pretrained_model.conv2)
        self.dropout = nn.Dropout(0.5)  # dropout probability for fc layers
        self.fc1 = nn.Linear(9216, 128)  # input size, output size
        self.fc2 = nn.Linear(128, 2)

        # Freeze the parameters copied from pretrained_model; only fc1/fc2 are trained
        for frozen_layer in (self.conv1, self.conv2):
            for param in frozen_layer.parameters():
                param.requires_grad = False

    def forward(self, x):
        # Frozen feature extractor
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)

        # Trainable classification head
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


# Set training data targets to be 0 if even, 1 if odd
binary_train = [(transform(img), target % 2) for img, target in train_dataset]

## Seed PyTorch's RNG so weight initialisation, dropout and batch order are as repeatable
## as the backend allows when this cell is re-run
torch.manual_seed(0)

## Load data, passing it through our cleaning pipeline
## Training batches are reshuffled every epoch.
binary_train_loader = torch.utils.data.DataLoader(binary_train, batch_size=batch_size, shuffle=True)

binaryCNN_model = BinaryCNN(pretrained_model=simpleCNN_model).to(device=DEVICE)
optimizer = torch.optim.Adadelta(binaryCNN_model.parameters(), lr=learning_rate)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=lr_step_size, gamma=lr_gamma)

# Train the model
train(
    train_data=binary_train_loader,
    model=binaryCNN_model,
    optimizer=optimizer,
    lr_scheduler=lr_scheduler,
    epochs=n_epochs,
)

# Save the model
torch.save(binaryCNN_model.state_dict(), "binaryCNN_model.pt")

Train Epoch: 1 (100%)	Loss: 0.002440
Train Epoch: 2 (100%)	Loss: 0.001211
Train Epoch: 3 (100%)	Loss: 0.003874
Train Epoch: 4 (100%)	Loss: 0.000483
Train Epoch: 5 (100%)	Loss: 0.001338
Train Epoch: 6 (100%)	Loss: 0.000098
Train Epoch: 7 (100%)	Loss: 0.000033
Train Epoch: 8 (100%)	Loss: 0.000075
Train Epoch: 9 (100%)	Loss: 0.001063
Train Epoch: 10 (100%)	Loss: 0.000168


In [13]:
# Test out the model on a batch of images from the test set
# Switch to evaluation mode first: `train_epoch` leaves the model in training mode, where
# the dropout layer would still randomly zero activations during prediction.
binaryCNN_model.eval()
with torch.no_grad():
    # Only the first batch is needed
    data, target = next(iter(test_clean_loader))
    data, target = data.to(DEVICE), target.to(DEVICE)
    pred = binaryCNN_model(data)

pred_idx = 9  # index of the image within the batch; try 6 or 8 as well
fig = make_subplots(rows=1, cols=2)

# Left: the input image. Right: probabilities for class 0 (even) and class 1 (odd);
# the model outputs log-probabilities, hence exp.
fig1 = px.imshow(data[pred_idx][0].cpu())
fig2 = px.bar(torch.exp(pred[pred_idx]).cpu())

fig.add_trace(fig1["data"][0], row=1, col=1)
fig.add_trace(fig2["data"][0], row=1, col=2)

fig.update_coloraxes(colorscale="Plasma")
fig.update_xaxes(dtick=1, row=1, col=2)
fig.update_yaxes(autorange="reversed", row=1, col=1)
fig.update_layout(width=800, height=400)

### Example of model interpretability

Display the first conv2d layer from our initial model (simpleCNN)

In [14]:
# One 2-D kernel per conv1 output channel (input channel 0), copied to a NumPy array on the CPU
conv1_kernels = simpleCNN_model.conv1.weight.detach().clone()[:, 0, ...].cpu().numpy()

# One facet per kernel, four per row, on a diverging colour scale centred on zero
fig = px.imshow(
    conv1_kernels,
    facet_col=0,
    facet_col_wrap=4,
    height=1500,
    facet_col_spacing=0,
    facet_row_spacing=0,
    color_continuous_scale="RdBu",
    color_continuous_midpoint=0,
)
fig.update_xaxes(showticklabels=False).update_yaxes(showticklabels=False)

SHAP interpretation - which regions of the image contributed towards/against each class's output prediction probability

Example taken from SHAP documentation

<img src="assets/shap.png">

In [15]:
def dummy_model_output(x):
    """Run `simpleCNN_model` on a NumPy batch of un-normalised images, for use by SHAP.

    Args:
        x: NumPy array of images. It is copied before use, so the caller's array is not
            modified. Assumed to be on the [0, 1] scale produced by `ToTensor()` -- TODO confirm.

    Returns:
        NumPy array with the model's outputs (log-probabilities per class) for each image.
    """
    # Evaluation mode disables dropout so repeated calls on the same input agree
    simpleCNN_model.eval()

    batch = torch.from_numpy(x.copy()).to(DEVICE)
    # Apply the same normalisation constants the model was trained with
    batch = torchvision.transforms.Normalize((0.1307,), (0.3081,))(batch)

    # No gradients are needed for explanation queries; hand back a plain CPU array
    with torch.no_grad():
        return simpleCNN_model(batch).cpu().numpy()

# Test images converted to tensors but NOT normalised: `dummy_model_output` normalises them itself
raw_test_loader = torch.utils.data.DataLoader(
    torchvision.datasets.MNIST(
        root=os.getcwd(),
        download=False,
        train=False,
        transform=torchvision.transforms.ToTensor(),
    ),
    batch_size=batch_size,
)

# Take the first batch and keep only the images
batch = next(iter(raw_test_loader))
imgs, _ = batch

# Masked-out regions are replaced by a blurred version of the image
masker = shap.maskers.Image("blur(28,28)", imgs[0].shape)

explainer = shap.Explainer(dummy_model_output, masker, output_names=list(range(10)))

# Explain the first three images of the batch
shap_values = explainer(imgs[0:3], max_evals=10000, batch_size=64)

In [21]:
# Grid of SHAP attributions: one row per explained image. Column 1 shows the input image and
# columns 2-11 show the attribution map for each of the 10 model outputs.
fig = make_subplots(rows=3, cols=11, subplot_titles=["input", *[str(x) for x in range(10)]])

for i in range(3):
    # Input image
    fig_i = px.imshow(shap_values.data[i, 0].numpy())
    fig.add_trace(fig_i["data"][0], row=i+1, col=1)
    fig.update_yaxes(autorange="reversed", row=i+1, col=1)

    # Attribution map for each output j
    for j in range(0, 10):
        fig_ij = px.imshow(shap_values.values[i, 0, :, :, j])
        fig.add_trace(fig_ij["data"][0], row=i+1, col=j+2)
        fig.update_yaxes(autorange="reversed", row=i+1, col=j+2)

# A single diverging colour scale, symmetric around zero, is applied to the figure.
# The per-subplot "Plasma" setting that used to sit in the loop was always overwritten
# by this call, so it has been removed.
fig.update_coloraxes(colorscale="RdBu", cmin=-0.1, cmax=0.1)
fig.update_layout(width=1500, height=600)
fig

### Spend your time wisely

Recommend starting to solve a project by trying classical CV approaches. If they prove to be unreliable, or fail to generalise well, then explore deep learning solutions.

Explore existing implementations before building a custom implementation/modification of a deep learning model. Large repositories exist of models implemented from research papers, with pre-trained backbones; e.g.: https://github.com/qubvel-org/segmentation_models.pytorch

To improve model performance, typically your time is better spent improving your dataset (more data, cleaner data, simplifying the problem, synthetic augmentation) than developing a custom model.