In [None]:
# For tips on running notebooks in Google Colab, see
# https://pytorch.org/tutorials/beginner/colab
%matplotlib inline

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np

plt.ion()   # interactive mode

Loading the data
================



In [None]:
from torch.utils.data import DataLoader

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

base_dir = 'drive/MyDrive/datasetL'
train_dir = f'{base_dir}/train'
test_dir = f'{base_dir}/val'

image_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Grayscale(num_output_channels=1), # Convert images to grayscale
        transforms.RandomAffine(degrees=45, translate=(0.8,0.8), scale=(0.5,1.5)),
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.Grayscale(num_output_channels=1), # Convert images to grayscale
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
}

train_dataset = datasets.ImageFolder(root=train_dir, transform=image_transforms['train'])
test_dataset = datasets.ImageFolder(root=test_dir, transform=image_transforms['val'])

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True, num_workers=4)



Depicting spatial transformer networks
======================================

Spatial transformer networks boils down to three main components :

-   The localization network is a regular CNN which regresses the
    transformation parameters. The transformation is never learned
    explicitly from this dataset, instead the network learns
    automatically the spatial transformations that enhances the global
    accuracy.
-   The grid generator generates a grid of coordinates in the input
    image corresponding to each pixel from the output image.
-   The sampler uses the parameters of the transformation and applies it
    to the input image.

![](https://pytorch.org/tutorials/_static/img/stn/stn-arch.png)

<div style="background-color: #54c7ec; color: #fff; font-weight: 700; padding-left: 10px; padding-top: 5px; padding-bottom: 5px"><strong>NOTE:</strong></div>
<div style="background-color: #f3f4f7; padding-left: 10px; padding-top: 10px; padding-bottom: 10px; padding-right: 10px">
<p>We need the latest version of PyTorch that containsaffine_grid and grid_sample modules.</p>
</div>


In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Adjusting the in_channels for the first convolutional layers to 1 for greyscale images
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5, padding=2)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5, padding=2)
        self.conv2_drop = nn.Dropout2d(p=0.5)
        self.conv3 = nn.Conv2d(20, 40, kernel_size=5, padding=2)  # Additional Conv layer
        self.conv3_drop = nn.Dropout2d(p=0.5)  # Additional dropout layer
        self.pool = nn.MaxPool2d(2, stride=2)

        # Spatial transformer localisation-network adjustments for greyscale input
        self.localization = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=7, padding=3),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True),
            nn.Conv2d(8, 10, kernel_size=5, padding=2),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(True)
        )

        # Placeholder for dynamic adjustment of fc_loc and fc1 input sizes
        self.fc_loc = nn.Sequential(
            nn.Linear(1, 32),  # Placeholder values, will be dynamically adjusted
            nn.ReLU(True),
            nn.Linear(32, 3 * 2)
        )

        self.fc1 = nn.Linear(1, 100)  # Placeholder value, will be dynamically adjusted
        self.fc2 = nn.Linear(100, 10)

        # Initialise the weights/bias with identity transformation
        self.fc_loc[2].weight.data.zero_()
        self.fc_loc[2].bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

        # Placeholder for the dynamically calculated size
        self.fc_loc_input_size = None
        self.fc1_input_size = None


    def stn(self, x):
        xs = self.localization(x)
        if self.fc_loc_input_size is None:
            # Calculates the output size of the localization network dynamically
            self.fc_loc_input_size = xs.nelement() // xs.shape[0]
            # Assuming we need 32 features before the fully connected layers
            self.fc_loc[0] = nn.Linear(self.fc_loc_input_size, 32).to(x.device)
        xs = xs.view(-1, self.fc_loc_input_size)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)

        # Batch size is N
        N = theta.shape[0]

        grid = F.affine_grid(theta, x.size(), align_corners=False)
        x = F.grid_sample(x, grid, align_corners=False)
        return x


    def forward(self, x):
        # STN transformation
        xs = self.localization(x)
        if self.fc_loc_input_size is None:
            # Dynamically calculates input size for fc_loc
            self.fc_loc_input_size = xs.nelement() // xs.shape[0]  # Total elements divided by batch size
            self.fc_loc[0] = nn.Linear(self.fc_loc_input_size, 32).to(x.device)

        xs = xs.view(-1, self.fc_loc_input_size)
        theta = self.fc_loc(xs)
        theta = theta.view(-1, 2, 3)
        grid = F.affine_grid(theta, x.size(), align_corners=False)
        x = F.grid_sample(x, grid, align_corners=False)

        # Normal forward pass
        x = F.relu(self.pool(self.conv1(x)))
        x = F.relu(self.pool(self.conv2_drop(self.conv2(x))))
        x = F.relu(self.pool(self.conv3_drop(self.conv3(x))))

        if self.fc1_input_size is None:
            # Dynamically calculates input size for fc1
            self.fc1_input_size = x.nelement() // x.shape[0]  # Total elements divided by batch size
            self.fc1 = nn.Linear(self.fc1_input_size, 100).to(x.device)

        x = x.view(-1, self.fc1_input_size)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Net().to(device)

Training the model
==================

Now, let\'s use the SGD algorithm to train the model. The network is
learning the classification task in a supervised way. In the same time
the model is learning STN automatically in an end-to-end fashion.


In [None]:
optimizer = optim.SGD(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2)

def train(epoch):
    model.train()
    train_loss = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        train_loss += loss.item() * data.size(0)
        loss.backward()
        optimizer.step()
        if batch_idx % 500 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
    train_loss /= len(train_loader.dataset)
    return train_loss
#
# A simple test procedure to measure the STN performances on MNIST.
#


def test():
    with torch.no_grad():
        model.eval()
        test_loss = 0
        correct = 0
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)

            # sums up batch loss
            test_loss += F.nll_loss(output, target, size_average=False).item()
            # gets the index of the max log-probability
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

        test_loss /= len(test_loader.dataset)
        accuracy = 100. * correct / len(test_loader.dataset)
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'
              .format(test_loss, correct, len(test_loader.dataset),
                      accuracy))
    return test_loss

Visualizing the STN results
===========================

Now, we will inspect the results of our learned visual attention
mechanism.

We define a small helper function in order to visualize the
transformations while training.


In [None]:
def convert_image_np(inp):
    """Convert a Tensor to numpy image."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    return inp

# We want to visualize the output of the spatial transformers layer
# after the training, we visualise a batch of input images and
# the corresponding transformed batch using STN.


def visualize_stn():
    with torch.no_grad():
        # Gets a batch of training data
        data = next(iter(test_loader))[0].to(device)

        input_tensor = data.cpu()
        transformed_input_tensor = model.stn(data).cpu()

        in_grid = convert_image_np(
            torchvision.utils.make_grid(input_tensor))

        out_grid = convert_image_np(
            torchvision.utils.make_grid(transformed_input_tensor))

        # Plots the results side-by-side
        f, axarr = plt.subplots(1, 2, figsize=(20, 10))
        axarr[0].imshow(in_grid)
        axarr[0].set_title('Dataset Images')

        axarr[1].imshow(out_grid)
        axarr[1].set_title('Transformed Images')

early_stopping_patience = 5
best_loss = float('inf')
epochs_no_improve = 0
num_epochs = 5
train_losses = []
test_losses = []

for epoch in range(1, num_epochs + 1):
    train_loss = train(epoch)
    train_losses.append(train_loss)
    test_loss = test()
    test_losses.append(test_loss)
    # Steps the scheduler with the test loss
    scheduler.step(test_loss)

    # Checks early stopping conditions
    if test_loss < best_loss:
      best_loss = test_loss
      epochs_no_improve = 0
    else:
      epochs_no_improve += 1
      if epochs_no_improve == early_stopping_patience:
        print("Early stopping!")
        break

In [None]:
# Visualises the STN transformation on some input batch
visualize_stn()

plt.ioff()
plt.show()

In [None]:
def visualize_comparison(model, test_loader, class_names, num_images=2):
    # Gets a batch of test data
    data, true_labels = next(iter(test_loader))
    data = data.to(device)

    with torch.no_grad():
        # Passes the batch through the network to get the transformed output and predictions
        transformed_data = model.stn(data)
        output = model(data)
        _, predicted_labels = torch.max(output, 1)

    # Move data back to CPU for visualization
    data = data.cpu().numpy()
    transformed_data = transformed_data.cpu().numpy()
    true_labels = true_labels.cpu().numpy()
    predicted_labels = predicted_labels.cpu().numpy()

    # Initialises a plot
    fig, axes = plt.subplots(num_images, 2, figsize=(10, 4 * num_images))

    for i in range(num_images):
        # Gets the original image, true label, transformed image, and predicted label
        original_img = data[i].squeeze()
        true_label = class_names[true_labels[i]]
        transformed_img = transformed_data[i].squeeze()
        predicted_label = class_names[predicted_labels[i]]

        # Displays original image with true label
        ax = axes[i, 0]
        ax.imshow(original_img, cmap='gray')
        ax.axis('off')
        ax.set_title(f'Original Image\nTrue Label: {true_label}')

        # Displays transformed image with predicted label
        ax = axes[i, 1]
        ax.imshow(transformed_img, cmap='gray')
        ax.axis('off')
        ax.set_title(f'Transformed Image\nPredicted Label: {predicted_label}')

    plt.tight_layout()
    plt.show()

# Defines the class names (assuming you have 5 classes)
class_names = ['Grade 0', 'Grade 1', 'Grade 2', 'Grade 3', 'Grade 4']

# Calls the visualization function
visualize_comparison(model, test_loader, class_names)

In [None]:
# Training and validation losses plotted
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Training loss')
plt.plot(test_losses, label='Validation loss')
plt.title('Loss over epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()