## Chinese Character Detection

In [None]:
import os
import math
import json
import time
import pickle
import random
from PIL import Image
from datetime import datetime
from joblib import Parallel, delayed

from tqdm import tqdm
import numpy as np
import pandas as pd
import matplotlib.path as mplpath
import matplotlib.pyplot as plt
import matplotlib
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.autograd import Variable
from torch.optim import Adam

%matplotlib inline

random.seed(42)

## Part 1: data preparation

First of all, we need to specify a couple of paths which point to the following files:

  - An image directory
  - A directory containing an `info.json` file which documents metadata about the images, such as height and width in pixels. This directory should also contain a `jsonl` file which specifies the locations of the bounding boxes in each image.
  
These variables are defined below, and need to be populated before the notebook can be executed:

In [None]:
# Directory holding the image files; each annotation's `file_name` is resolved
# relative to it.
IMAGE_DIR = "ENTER PATH HERE"        # For example: "/scratch/lt2326-h21/a1/images"
# Directory holding `info.json` and `train.jsonl`.
JSON_DIR = "ENTER JSON PATH HERE"    # For example: "/scratch/lt2326-h21/a1/"

In [None]:
# Image metadata. The file is opened in a context manager so the handle is
# closed as soon as parsing finishes instead of being left to the garbage
# collector; JSON is read as UTF-8 regardless of the platform default.
with open(os.path.join(JSON_DIR, "info.json"), encoding="utf-8") as info_file:
    info = json.load(info_file)

Check that all images have the same height and width in pixels:

In [None]:
# Print the distinct heights, then the distinct widths, recorded for the
# training images; one value per set means all images share that dimension.
for dimension in ("height", "width"):
    print({entry[dimension] for entry in info["train"]})

Open the train JSONL file and load its first 10 entries:

In [None]:
# Only the first 10 entries are kept, so stop reading after 10 lines instead
# of parsing the whole file and discarding the rest. `zip` asks `range` for a
# value first, so no 11th line is consumed. The encoding is set explicitly
# because the annotations may contain non-ASCII text.
with open(os.path.join(JSON_DIR, "train.jsonl"), encoding="utf-8") as file:
    data = [json.loads(line) for _, line in zip(range(10), file)]

Filter down the entries in `data` to just those whose image file is actually present in `IMAGE_DIR`:

In [None]:
image_list = os.listdir(IMAGE_DIR)
# Set membership is O(1); testing against the list would rescan every
# directory entry for each annotation.
available_images = set(image_list)
test_data = [item for item in data if item["file_name"] in available_images]
print(f"There are {len(test_data)} items in the data")

## Extract and Process Polygons for Test Images

In [None]:
# Every integer coordinate pair [a, b] with 0 <= a, b < 2048, ordered with `a`
# varying slowest: shape (2048 * 2048, 2). Built directly in NumPy; a nested
# Python comprehension would allocate ~4 million two-element lists first.
GRID = np.ascontiguousarray(np.indices((2048, 2048)).reshape(2, -1).T)

def truncate(number, digits = 0) -> int:
    """Truncate `number` towards zero and return it as an int.

    The value is scaled by ``10.0 ** digits``, truncated, scaled back and
    finally converted with ``int``. Because of that final conversion the
    result never carries a fractional part, whatever `digits` is.
    """
    scale = pow(10.0, digits)
    scaled_whole = math.trunc(scale * number)
    return int(scaled_whole / scale)


def get_truncated_polygons(annotations):
    """Collect the integer-truncated polygons of all Chinese characters.

    `annotations` is iterated as sentences, each of which is iterated as
    characters. A character contributes its ``"polygon"`` only when its
    ``"is_chinese"`` entry is truthy. Each polygon vertex ``(x, y)`` becomes
    ``[truncate(x), truncate(y)]``.

    Returns a list with one list of ``[x, y]`` pairs per kept character.
    """
    return [
        [[truncate(x), truncate(y)] for (x, y) in character["polygon"]]
        for sentence in annotations
        for character in sentence
        if character["is_chinese"]
    ]

def get_polygon_matrix(annotations):
    """Rasterise the Chinese-character polygons of one image into a 0/1 mask.

    Args:
        annotations: The ``"annotations"`` entry of one image record, passed
            straight to `get_truncated_polygons`.

    Returns:
        A (2048, 2048) integer array holding 1 where the corresponding GRID
        point lies inside at least one polygon and 0 elsewhere.
    """
    polygons = get_truncated_polygons(annotations)

    # One boolean flag per GRID point: "inside any polygon so far".
    inside_any = np.zeros(GRID.shape[0], dtype=bool)
    for polygon in polygons:
        # Each Path is built from its own vertex list. Wrapping the whole list
        # of polygons in np.array() first breaks as soon as two polygons have
        # a different number of vertices (ragged input raises on NumPy >= 1.24).
        inside_any |= mplpath.Path(polygon).contains_points(GRID)

    # GRID enumerates [a, b] with `a` varying slowest, so the reshaped array is
    # indexed [a, b]; the transpose swaps that to [b, a].
    # NOTE(review): presumably this makes the mask [row, column] like the image
    # array, with polygon vertices given as (x, y) - confirm against the data.
    return inside_any.astype(int).reshape((2048, 2048)).T
    
def convert_image_to_numpy_array(file_name):
    """Load the image `file_name` from IMAGE_DIR as an integer NumPy array.

    Args:
        file_name: Name of the image file, relative to IMAGE_DIR.

    Returns:
        The pixel data as an array with NumPy's default integer dtype.
    """
    path = os.path.join(IMAGE_DIR, file_name)
    # `np.int` was only an alias of the builtin `int`; it was deprecated in
    # NumPy 1.20 and removed in 1.24, so the builtin is used directly.
    # The context manager closes the underlying file once the pixels are copied.
    with Image.open(path) as image:
        return np.array(image, dtype=int)

def process_image_in_parallel(image):
    """Turn one annotation record into an ``(input, target)`` pair.

    The record's ``"file_name"`` is loaded as a pixel array and its
    ``"annotations"`` are rasterised into a polygon mask. Returns the tuple
    ``(pixel_array, polygon_mask)``.
    """
    pixel_array = convert_image_to_numpy_array(image["file_name"])
    polygon_mask = get_polygon_matrix(image["annotations"])
    return pixel_array, polygon_mask

The functions above take an `image` entry from the `jsonl` file and process the JSON contents. The `file_name` is used to load the image itself into memory before converting to a numpy array. Additionally, the `annotations` data is used to create a matrix of 0s and 1s indicating whether or not a specific pixel is part of a bounding box.

Processing all images individually takes far too long, so the [`joblib`](https://joblib.readthedocs.io/en/latest/parallel.html) library is used to run the processing in parallel workers for each subset of the data. For the training set, this reduced the overall runtime of the processing from around 34 minutes to less than 500 seconds.

Each processed image returns a tuple, so the [`joblib`](https://joblib.readthedocs.io/en/latest/parallel.html) library returns a list of tuples, where the first element is the input data (`X`) and the second element is the output data (`y`)

In [None]:
# Wall-clock timing of the whole parallel run.
start = time.time()
# One joblib task per entry of test_data, with n_jobs=100. tqdm wraps the input
# iterable, so the progress bar advances as tasks are consumed by joblib.
processed_data = Parallel(n_jobs=100)(delayed(process_image_in_parallel)(image) for image in tqdm(test_data))
end = time.time()

print("Processing data took", round(end - start, 0), "seconds")

Before splitting the data into training, testing, and validation sets, we must first convert the list of tuples into a list of lists. Here, the first list is all the input data points, while the second list is all the target data points.

In [None]:
# Transpose the list of (input, target) tuples into two parallel lists:
# X collects the first element of every tuple, y the second.
X, y = [list(column) for column in zip(*processed_data)]

Finally, we should sense check the processing by inspecting a random sample of data points and ensuring both that the images look as we would expect, and that the bounding boxes are where they should be.

In [None]:
def sample_and_display(X_subset, y_subset):
    """Plot one randomly chosen image with its mask overlaid.

    An index is drawn with ``random.sample``; ``X_subset[index]`` is shown
    first and ``y_subset[index]`` is drawn on top at 50% opacity using a
    two-colour map (fully transparent, then 'lawngreen') split at 0.5.
    """
    chosen = random.sample(range(0, len(X_subset)), 1)[0]

    transparent = np.array([0, 0, 0, 0])
    overlay_cmap = matplotlib.colors.ListedColormap([transparent, 'lawngreen'])
    overlay_norm = matplotlib.colors.BoundaryNorm([0., 0.5, 1.], overlay_cmap.N)

    plt.figure(figsize=(8,8))
    plt.imshow(X_subset[chosen])
    plt.imshow(
        y_subset[chosen],
        interpolation='none',
        cmap=overlay_cmap,
        norm=overlay_norm,
        alpha=0.5,
    );

In [None]:
# Show one randomly chosen processed image with its polygon mask overlaid.
sample_and_display(X, y)

## Part 2: the models

At this point we have processed our data, but need an easy way to interface with the PyTorch models which will be defined later in this section. To achieve this, we implement a simple Dataset class. It would be possible to actually call the processing of each subset in the `__init__` method, but this isn't an important design decision.

In [None]:
import torch
from torch.utils.data import Dataset

class CustomImageDataset(Dataset):
    """Dataset pairing each entry of `X` with the entry of `y` at the same index.

    Items are returned as ``(x, y)`` tuples of ``torch.Tensor`` objects; the
    target is cast with ``astype(int)`` before being converted.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # The dataset length is taken from the inputs alone.
        return len(self.X)

    def __getitem__(self, idx):
        image = self.X[idx]
        mask = self.y[idx].astype(int)
        return torch.Tensor(image), torch.Tensor(mask)

Now that the datasets are set up, we can pass them into a DataLoader to batch and shuffle them for input into a model.

In [None]:
BATCH_SIZE = 2

test_dataset = CustomImageDataset(X=X, y=y)
# NOTE(review): shuffle=True means the batch order, and so the image found at a
# given index after evaluation, changes between runs.
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True)

# len(DataLoader) is the number of batches actually yielded, including a final
# partial batch. Floor-dividing the dataset size under-counts in that case and
# is 0 when the dataset is smaller than one batch.
test_batch_count = len(test_dataloader)

Next, we specify the device:

# Always a torch.device (previously the CPU fallback was the plain string
# 'cpu'), so code that reads `device.type` works on either branch.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print('Using {} device'.format(device))

Now that we have created some dataloaders, we can define the first model:

In [None]:
class NoPoolsAllowed(nn.Module):
    """Small convolutional network without pooling layers.

    Two strided convolutions shrink the input, a fixed-size upsample brings
    the feature map to 512x512, a final convolution reduces it to a single
    channel, and a second upsample produces a 2048x2048 map that is passed
    through a sigmoid.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels=3, out_channels=6, kernel_size=(5, 5), stride=3, padding=(1, 1)
        )
        self.sigmoid1 = nn.Sigmoid()

        self.conv2 = nn.Conv2d(
            in_channels=6, out_channels=12, kernel_size=(3, 3), stride=2, padding=(1, 1)
        )
        self.sigmoid2 = nn.Sigmoid()

        self.up1 = nn.Upsample(size=(512, 512))

        self.conv3 = nn.Conv2d(
            in_channels=12, out_channels=1, kernel_size=(3, 3), stride=1, padding=(1, 1)
        )
        self.up2 = nn.Upsample(size=(2048, 2048))
        self.sigmoid3 = nn.Sigmoid()

    def forward(self, x):
        """Return a (batch, 1, 2048, 2048) map of sigmoid outputs.

        The input's last axis is moved to position 1 before the first
        convolution, i.e. the channel axis is expected to come last.
        """
        channels_first = x.permute((0, 3, 1, 2))
        hidden = self.sigmoid1(self.conv1(channels_first))
        hidden = self.sigmoid2(self.conv2(hidden))
        hidden = self.conv3(self.up1(hidden))
        return self.sigmoid3(self.up2(hidden))

We then define the second model, which takes inspiration from [UNet](https://arxiv.org/abs/1505.04597) (albeit simplified so as not to break MLTGPU, which it did anyway):

In [None]:
class UnspiredNet(nn.Module):
    def __init__(self):
        super(UnspiredNet, self).__init__()
        # Encoder Block 1
        self.enc1_conv1 = nn.Conv2d(3, 8, kernel_size=3, padding=1)
        self.enc1_bn1 = nn.BatchNorm2d(8)
        self.enc1_conv2 = nn.Conv2d(8, 8, kernel_size=3, padding=1)
        self.enc1_bn2 = nn.BatchNorm2d(8)
        self.enc1_relu = nn.ReLU()
        self.enc1_pool = nn.MaxPool2d((2, 2))
        
        # Encode Block 2
        self.enc2_conv1 = nn.Conv2d(8, 16, kernel_size=3, padding=1)
        self.enc2_bn1 = nn.BatchNorm2d(16)
        self.enc2_conv2 = nn.Conv2d(16, 16, kernel_size=3, padding=1)
        self.enc2_bn2 = nn.BatchNorm2d(16)
        self.enc2_relu = nn.ReLU()
        self.enc2_pool = nn.MaxPool2d((2, 2))
        
        # Bottleneck
        self.bot_conv1 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bot_bn1 = nn.BatchNorm2d(32)
        self.bot_conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.bot_bn2 = nn.BatchNorm2d(32)
        self.bot_relu = nn.ReLU()
        
        # Decoder Block 1
        self.dec1_up = nn.ConvTranspose2d(32, 16, kernel_size=2, stride=2, padding=0)
        self.dec1_conv1 = nn.Conv2d(32, 16, kernel_size=3, padding=1)
        self.dec1_bn1 = nn.BatchNorm2d(16)
        self.dec1_conv2 = nn.Conv2d(16, 16, kernel_size=3, padding=1)
        self.dec1_bn2 = nn.BatchNorm2d(16)
        self.dec1_relu = nn.ReLU()
        
        # Decoder Block 2
        self.dec2_up = nn.ConvTranspose2d(16, 8, kernel_size=2, stride=2, padding=0)
        self.dec2_conv1 = nn.Conv2d(16, 8, kernel_size=3, padding=1)
        self.dec2_bn1 = nn.BatchNorm2d(8)
        self.dec2_conv2 = nn.Conv2d(8, 8, kernel_size=3, padding=1)
        self.dec2_bn2 = nn.BatchNorm2d(8)
        self.dec2_relu = nn.ReLU()
        
        self.f_conv = nn.Conv2d(8, 1, kernel_size=1, padding=0)
        self.f_sig = nn.Sigmoid()
    
    def forward(self, inputs):
        inputs = inputs.permute((0,3,1,2))
        
        # Run through Encoder Block 1
        enc1_x = self.enc1_relu(self.enc1_bn2(self.enc1_conv2(self.enc1_bn1(self.enc1_conv1(inputs)))))
        enc1_p = self.enc1_pool(enc1_x)
        
        # Run through Encoder Block 2
        enc2_x = self.enc2_relu(self.enc2_bn2(self.enc2_conv2(self.enc2_bn1(self.enc2_conv1(enc1_p)))))
        enc2_p = self.enc2_pool(enc2_x)
        
        # Run through Bottleneck
        bot_x = self.bot_relu(self.bot_conv2(self.bot_bn1(self.bot_conv1(enc2_p))))
        
        # Run through Decoder Block 1
        dec1_x = self.dec1_relu(
            self.dec1_conv2(
                self.dec1_bn1(
                    self.dec1_conv1(
                        torch.cat( # The output of convTrans2D is concatenated with enc2_x -> output of last enc block
                            [
                                self.dec1_up(bot_x), # the first operation puts the bottleneck output through the convTrans2D
                                enc2_x
                            ],
                            axis=1
                        )
                    )
                )
            )
        )
        
        # Run through Decoder 2
        dec1_x = self.dec2_relu(
            self.dec2_conv2(
                self.dec2_bn1(
                    self.dec2_conv1(
                        torch.cat( # The output of convTrans2D is concatenated with enc2_x -> output of last enc block
                            [
                                self.dec2_up(dec1_x), # the first operation puts the bottleneck output through the convTrans2D
                                enc1_x
                            ],
                            axis=1
                        )
                    )
                )
            )
        )
        
        # Run through Output
        conv = self.f_conv(dec1_x)
        return self.f_sig(conv)


Before training the model, let's define a helper function that will run the training and validation for us. The function takes a name, a model class, a device, an optimizer, a learning rate and the number of epochs and does all the training loop for us, before running through the validation data. For both datasets, the function stores the average loss per epoch and, along with the model parameters, saves these to a unique name.

## Testing and Evaluation of the Two Models

In [None]:
def get_predictions(raw_preds):
    """Binarise each prediction in `raw_preds` against the global THRESHOLD.

    Every item is detached, moved to the CPU and converted to NumPy; values
    strictly greater than THRESHOLD become 1 and all others 0. Returns a list
    with one integer array per item.
    """
    return [
        np.asarray(prediction.detach().cpu().numpy() > THRESHOLD, int)
        for prediction in raw_preds
    ]


def get_evaluation_variables(parameter_file, device="cuda:2"):
    """Run a saved model over `test_dataloader` and collect its outputs.

    Args:
        parameter_file: Path to a file containing a whole pickled model (the
            loaded object is used directly as a module, not as a state dict).
        device: Device to evaluate on. If a CUDA device is requested that is
            not present on this machine, a warning is issued and the CPU is
            used instead of failing.

    Returns:
        A tuple ``(test_images, test_preds, test_truth, avg_test_loss,
        raw_preds)``:
          * test_images: list of per-batch input tensors (on the CPU);
          * test_preds: list of per-image 0/1 arrays from `get_predictions`;
          * test_truth: list of per-batch target tensors (on the CPU);
          * avg_test_loss: 0-dim tensor, mean BCE loss over the batches seen;
          * raw_preds: list of per-batch model outputs (on the CPU).
    """
    import warnings

    device = torch.device(device)
    if device.type == "cuda":
        requested_index = 0 if device.index is None else device.index
        if not torch.cuda.is_available() or requested_index >= torch.cuda.device_count():
            warnings.warn(f"Device {device} is not available; evaluating on the CPU instead.")
            device = torch.device("cpu")

    # map_location lets a model that was saved on one GPU be loaded on
    # whichever device is actually used here.
    # NOTE(security): torch.load unpickles arbitrary objects - only load
    # parameter files from a trusted source.
    # NOTE(review): newer PyTorch releases may default to `weights_only=True`,
    # which rejects fully pickled modules - confirm for the installed version.
    this_model = torch.load(parameter_file, map_location=device).to(device)
    this_model.eval()

    loss_fn = nn.BCELoss()
    test_images, test_preds, test_truth, raw_preds = [], [], [], []
    test_loss = torch.zeros((), device=device)
    batch_count = 0

    with torch.no_grad():
        for (x, y) in test_dataloader:
            (x, y) = (x.to(device), y.to(device))

            # Drop the singleton channel axis so predictions line up with y.
            batch_preds = this_model(x).squeeze(dim=1)
            test_loss += loss_fn(batch_preds, y)
            batch_count += 1

            # Everything that is kept is moved to the CPU so evaluating many
            # batches does not pin device memory.
            test_images.append(x.detach().cpu())
            test_truth.append(y.detach().cpu())
            raw_preds.append(batch_preds.detach().cpu())
            test_preds.extend(get_predictions(batch_preds))

    # Average over the batches actually iterated; this stays correct when the
    # last batch is partial and does not rely on a separately computed count.
    avg_test_loss = test_loss / max(batch_count, 1)

    return test_images, test_preds, test_truth, avg_test_loss, raw_preds


def get_evaluation_metrics(preds, ground_truth, avg_test_loss, device="cpu"):
    """Summarise binary predictions against the ground truth.

    Args:
        preds: Array of 0/1 predictions.
        ground_truth: Array of 0/1 targets with the same shape as `preds`.
        avg_test_loss: Average test loss, as a 0-dim tensor or a plain number.
        device: Device on which the element-wise comparisons are computed.

    Returns:
        A one-row DataFrame (index "Test Metrics") with the average test loss,
        accuracy, recall, precision, F1-score and mean squared error, rounded
        to 4 decimals. A ratio whose denominator is zero is reported as 0.
    """
    prediction_tensor = torch.as_tensor(np.asarray(preds), dtype=torch.float32, device=device)
    ground_truth_tensor = torch.as_tensor(np.asarray(ground_truth), dtype=torch.float32, device=device)

    # Confusion counts from boolean masks. For 0/1 inputs this gives the same
    # counts as dividing predictions by targets and matching 1/inf/nan/0, but
    # without producing nan/inf intermediates.
    predicted_positive = prediction_tensor != 0
    actual_positive = ground_truth_tensor != 0
    tp = torch.sum(predicted_positive & actual_positive).item()
    fp = torch.sum(predicted_positive & ~actual_positive).item()
    tn = torch.sum(~predicted_positive & ~actual_positive).item()
    fn = torch.sum(~predicted_positive & actual_positive).item()

    def ratio(numerator, denominator):
        # All ratios share one zero-denominator rule (previously only
        # precision and F1 were guarded; recall could raise).
        return numerator / denominator if denominator else 0

    accuracy = ratio(tp + tn, tp + fp + tn + fn)
    recall = ratio(tp, tp + fn)
    precision = ratio(tp, tp + fp)
    f1_score = 2 * ratio(precision * recall, precision + recall)

    # Computed on the tensors already built, so no image size has to be
    # hard-coded and no global array is consulted for the element count.
    squared_error = (ground_truth_tensor - prediction_tensor).double() ** 2
    mse = torch.mean(squared_error).item()

    return pd.DataFrame({
        # float() accepts both a 0-dim tensor and a plain Python number.
        "Average Test Loss": float(avg_test_loss),
        "Accuracy": accuracy,
        "Recall": recall,
        "Precision": precision,
        "F1-Score": f1_score,
        "Mean Squared Error": mse,
    }, index=["Test Metrics"]).round(4)


def display_training_validation_loss(history_file):
    """Plot training and validation loss per epoch from a pickled history.

    Args:
        history_file: Path to a pickle file whose content can be turned into a
            DataFrame with ``train_loss`` and ``val_loss`` columns, one row
            per epoch.
    """
    # NOTE(security): pickle.load can execute arbitrary code - only open
    # history files from a trusted source.
    # The context manager closes the file handle once loading is done.
    with open(history_file, "rb") as history_handle:
        history = pd.DataFrame(pickle.load(history_handle))

    # Epochs are numbered from 1 in row order.
    history["Epoch"] = list(range(1, len(history) + 1))
    history = history.rename(columns={"train_loss": "Train Loss", "val_loss": "Validation Loss"})

    plt.plot("Epoch", "Train Loss", data=history, marker='', color='skyblue', linewidth=2)
    plt.plot("Epoch", "Validation Loss", data=history, marker='', color='olive', linewidth=2)

    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()

    plt.show()
    
def visualise_probability_map(images, preds, ground_truth, idx=None, save_name=None):
    """Show one image twice: with its predicted mask and with its true mask.

    Args:
        images: Indexable collection of image arrays.
        preds: Indexable collection of 0/1 prediction masks.
        ground_truth: Indexable collection of 0/1 target masks.
        idx: Index of the image to show; a random one is drawn when None.
        save_name: If given, the figure is saved as ``{save_name}_{idx}.png``.
    """
    # `is None` rather than `not idx`, so that idx=0 selects the first image
    # instead of being replaced by a random index.
    if idx is None:
        idx = random.sample(range(0, len(images)), 1)[0]
    print(f"Showing image #{idx}")

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15,8))

    # Two-colour overlay: fully transparent for 0, 'lawngreen' for 1.
    cmap = matplotlib.colors.ListedColormap([[0, 0, 0, 0], 'lawngreen'])

    # vmin/vmax pin the colour scale to [0, 1]. With autoscaling, a mask that
    # is constant (e.g. all ones) has vmin == vmax and is drawn entirely in
    # the first, transparent colour.
    ax1.imshow(images[idx].astype(int))
    ax1.imshow(preds[idx], interpolation='none', cmap=cmap, vmin=0, vmax=1, alpha=1)
    ax1.set_title("Predicted")

    ax2.imshow(images[idx].astype(int))
    ax2.imshow(ground_truth[idx], interpolation='none', cmap=cmap, vmin=0, vmax=1, alpha=0.8)
    ax2.set_title("Ground Truth")

    if save_name:
        fig.savefig(f"{save_name}_{idx}.png")

## Evaluation for `NoPoolsAllowed`:

For the `NoPoolsAllowed` model, we use the `parameters_NoPoolsAllowed_Adam_0.01_20.pth` parameter file, as well as a `THRESHOLD` which sets the threshold for categorisation as a bounding box:

In [None]:
# Cut-off read by get_predictions: outputs strictly above it are labelled 1.
THRESHOLD = 0.005

# Evaluate the saved model on test_dataloader (default device of the helper).
images, preds, ground_truth, avg_test_loss, debug_preds = get_evaluation_variables(
    parameter_file="./parameters_NoPoolsAllowed_Adam_0.01_20.pth", 
)

In [None]:
# Join the per-batch tensors along the first axis into single NumPy arrays.
images_np = np.concatenate([img.detach().cpu().numpy() for img in images])
ground_truth_np = np.concatenate([gt.detach().cpu().numpy() for gt in ground_truth])
# preds is a list with one array per image; stack it into one array.
preds = np.stack(preds)

In [None]:
# Tabulate the test metrics; as the last expression of the cell the DataFrame is displayed.
get_evaluation_metrics(preds, ground_truth_np, avg_test_loss)

In [None]:
# Plot the per-epoch train/validation loss stored in the pickled history file.
display_training_validation_loss(history_file="history_NoPoolsAllowed_Adam_0.01_20.pic")

In [None]:
# Show image #2 with predicted and true masks; the figure is saved as "<save_name>_2.png".
visualise_probability_map(images_np, preds, ground_truth_np, idx=2, save_name="NoPoolsAllowed_Adam_0.01_20")

## Evaluation for `UnspiredNet`:

Since the `UnspiredNet` seemed to actually learn something, it was possible to use a much higher `THRESHOLD` for pixel categorization:

In [None]:
# Cut-off read by get_predictions: outputs strictly above it are labelled 1.
THRESHOLD = 0.05

# Evaluate the saved model on test_dataloader (default device of the helper).
images, preds, ground_truth, avg_test_loss, debug_preds = get_evaluation_variables(
    parameter_file="./parameters_UnspiredNet_Adam_0.01_5.pth", 
)

In [None]:
# Join the per-batch tensors along the first axis into single NumPy arrays.
images_np = np.concatenate([img.detach().cpu().numpy() for img in images])
ground_truth_np = np.concatenate([gt.detach().cpu().numpy() for gt in ground_truth])
# preds is a list with one array per image; stack it into one array.
preds = np.stack(preds)

In [None]:
# Tabulate the test metrics; as the last expression of the cell the DataFrame is displayed.
get_evaluation_metrics(preds, ground_truth_np, avg_test_loss)

In [None]:
# Plot the per-epoch train/validation loss stored in the pickled history file.
display_training_validation_loss(history_file="history_UnspiredNet_Adam_0.01_5.pic")

In [None]:
# Show image #2 with predicted and true masks; the figure is saved as "<save_name>_2.png".
visualise_probability_map(images_np, preds, ground_truth_np, idx=2, save_name="UnspiredNet_Adam_0.01_5")