### Environment Setup

In [None]:
import torch
from torchvision.datasets import ImageFolder
from torch.utils.data import random_split
from torchvision.transforms import v2
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from PIL import Image

import matplotlib.pyplot as plt
import numpy as np
import math
import time
import os
import glob
import seaborn as sn
import pandas as pd
from sklearn.preprocessing import label_binarize
from sklearn.metrics import average_precision_score, precision_recall_curve
from sklearn.metrics import average_precision_score
from sklearn.metrics import PrecisionRecallDisplay

import mediapipe as mp

In [None]:
# set up the device: prefer CUDA, then Apple's MPS backend, then the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
    print(f'Using GPU: {torch.cuda.get_device_name(0)}')
elif torch.backends.mps.is_available():
    # is_available() also checks that MPS can actually be used on this
    # machine; is_built() only reports that PyTorch was compiled with it
    device = torch.device('mps')
    print(f'using {device}')
else:
    device = torch.device('cpu')
    print(f'using {device}')

### Data Processing

In [None]:
# unzip the dataset; the archive has to be downloaded manually beforehand from
# https://www.kaggle.com/datasets/grassknoted/asl-alphabet/data
# NOTE(review): the archive is a .zip but is extracted with tar and its gzip
# flag (-z); this presumably relies on a tar that auto-detects zip archives -
# confirm on the target platform. Later cells read 'data/asl_alphabet_train',
# so verify the extracted files end up there.
!tar -xvzf data/asl_alphabet_train.zip -q

In [None]:
def normalize_image(tensorimage):
    """Min-max rescale an image tensor in place to roughly the range [0, 1].

    The input tensor itself is modified and returned, so the function can be
    used as a plain callable step inside a transform pipeline.
    """
    lowest = tensorimage.min()
    highest = tensorimage.max()
    # clamp to the tensor's own extremes (kept from the original pipeline)
    tensorimage.clamp_(min=lowest, max=highest)
    # the small epsilon keeps the division finite for constant images
    value_range = highest - lowest + 1e-5
    tensorimage.sub_(lowest)
    tensorimage.div_(value_range)
    return tensorimage

In [None]:
def get_class(dataloader, label):
    """Return the class name that belongs to a numeric label.

    Parameters
    ----------
    dataloader : object with a ``dataset`` attribute
        Loader whose dataset either exposes ``class_to_idx`` itself or wraps
        (possibly through several ``.dataset`` levels, as a split subset does)
        a dataset that exposes it.
    label : hashable
        Numeric class index to look up.

    Returns
    -------
    str
        The class name, or the string ``'label not found'`` when the label is
        not part of the mapping (no exception is raised for unknown labels).
    """
    # unwrap subset-style wrappers until the dataset owning the mapping is found
    dataset = dataloader.dataset
    while not hasattr(dataset, 'class_to_idx') and hasattr(dataset, 'dataset'):
        dataset = dataset.dataset

    # invert the class -> label dictionary
    label2class = {idx: name for name, idx in dataset.class_to_idx.items()}

    return label2class.get(label, 'label not found')

In [None]:
def process_images(input_folder, output_folder):
    """Draw MediaPipe hand landmarks onto every ``*.jpg`` of a class-folder tree.

    Parameters
    ----------
    input_folder : str
        Directory containing one sub-directory per class with ``.jpg`` images.
    output_folder : str
        Directory that receives the same sub-directory layout. Each image is
        saved under its original file name; images in which no hand is
        detected are saved without any overlay.
    """
    mp_hands = mp.solutions.hands
    draw_landmarks = mp.solutions.drawing_utils.draw_landmarks

    # the context manager closes the MediaPipe model even if an image fails
    with mp_hands.Hands(static_image_mode=True,
                        max_num_hands=1,
                        min_detection_confidence=0.5) as hands:
        # go through each subfolder
        for class_folder in os.listdir(input_folder):
            input_class_path = os.path.join(input_folder, class_folder)

            # skip stray files (e.g. .DS_Store) so they do not turn into
            # empty class folders in the output tree
            if not os.path.isdir(input_class_path):
                continue

            # create output subfolder
            output_class_path = os.path.join(output_folder, class_folder)
            os.makedirs(output_class_path, exist_ok=True)

            for image_path in glob.glob(os.path.join(input_class_path, '*.jpg')):
                # read and convert the image to rgb; close the file right away
                with Image.open(image_path) as image:
                    image_rgb = np.array(image.convert('RGB'))

                # process through mediapipe model
                results = hands.process(image_rgb)
                if results.multi_hand_landmarks:
                    # draw the landmarks and their connections onto the image
                    for hand_landmarks in results.multi_hand_landmarks:
                        draw_landmarks(image_rgb, hand_landmarks, mp_hands.HAND_CONNECTIONS)

                # save the output image under the same file name
                image_mp = Image.fromarray(image_rgb)
                base_name = os.path.basename(image_path)
                image_mp.save(os.path.join(output_class_path, base_name))

In [None]:
# draw hand landmarks on the raw images and write them to a parallel folder
input_folder = 'data/asl_alphabet_train'
output_folder = 'data/asl_alphabet_train_mp'
process_images(input_folder, output_folder)
# load the landmark-annotated dataset from the folder written above
# (not the raw image folder)
folder_mp = output_folder
dataset_mp = ImageFolder(root=folder_mp)

In [None]:
# preprocessing pipeline shared by the validation and test splits
transforms_valtest = v2.Compose(
    [
        v2.ToImage(),                                # convert the input to an image tensor
        v2.ToDtype(torch.float32, scale=True),       # cast to float32 with value scaling
        normalize_image,                             # per-image min-max normalization
        v2.Resize(size=(200, 200), antialias=True)   # resize (not crop) to 200x200
    ]
)

In [None]:
# load the dataset
folder = 'data/asl_alphabet_train'
dataset = ImageFolder(root=folder)

# number of images in the dataset
dataset_size = len(dataset)

# 70% / 15% / 15% split using integer arithmetic; the test split takes the
# remainder so the three sizes always add up to dataset_size, which
# random_split requires
size_train = dataset_size * 70 // 100
size_val = dataset_size * 15 // 100
size_test = dataset_size - size_train - size_val

# split the dataset
# NOTE(review): no generator/seed is passed, so the split is drawn from the
# global RNG; unless the same seed was set as during training, this test split
# may contain images the saved checkpoints were trained on - confirm.
dataset_train, dataset_val, dataset_test = random_split(
    dataset, [size_train, size_val, size_test])

In [None]:
# number of images in the landmark-annotated dataset
dataset_size_mp = len(dataset_mp)

# 70% / 15% / 15% split using integer arithmetic; the test split takes the
# remainder so the three sizes always add up to dataset_size_mp, which
# random_split requires
size_train_mp = dataset_size_mp * 70 // 100
size_val_mp = dataset_size_mp * 15 // 100
size_test_mp = dataset_size_mp - size_train_mp - size_val_mp

# split the dataset
# NOTE(review): no generator/seed is passed, so the split is drawn from the
# global RNG; unless the same seed was set as during training, this test split
# may contain images the saved checkpoints were trained on - confirm.
dataset_train_mp, dataset_val_mp, dataset_test_mp = random_split(
    dataset_mp, [size_train_mp, size_val_mp, size_test_mp])

In [None]:
batch_size = 32

# raw-image test split: point the subset at a fresh ImageFolder that applies
# the val/test preprocessing, then wrap it in a sequential (unshuffled) loader
dataset_test.dataset = ImageFolder(root=folder, transform=transforms_valtest)
loader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

# landmark-annotated test split: same treatment
dataset_test_mp.dataset = ImageFolder(root=folder_mp, transform=transforms_valtest)
loader_test_mp = DataLoader(dataset_test_mp, batch_size=batch_size, shuffle=False)

### Model Structure

In [None]:
class BasicCNN(nn.Module):
    """Three-block convolutional classifier for 3-channel 200x200 images.

    Every block applies two 3x3 convolutions, each followed by batch
    normalization and ReLU, and ends with a max pool. The spatial size goes
    200 -> 100 -> 50 -> 10, after which the 128x10x10 feature map is
    flattened and passed through two linear layers.

    Args:
        num_classes: number of output logits. Defaults to 29, the value that
            was previously hard-coded, so existing checkpoints still load.

    The layer attribute names are part of the checkpoint format (state_dict
    keys) and must not be renamed.
    """

    def __init__(self, num_classes=29):
        super().__init__()

        # block1, input 200x200 -> 100x100 after pooling
        self.conv1a = nn.Conv2d(3, 8, kernel_size=3, padding=1)
        self.bn1a = nn.BatchNorm2d(8)
        self.conv1b = nn.Conv2d(8, 16, kernel_size=3, padding=1)
        self.bn1b = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d(kernel_size=2)

        # block2, input 100x100 -> 50x50 after pooling
        self.conv2a = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.bn2a = nn.BatchNorm2d(32)
        self.conv2b = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2b = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=2)

        # block3, input 50x50 -> 10x10 after the 5x5 pooling
        self.conv3a = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3a = nn.BatchNorm2d(128)
        self.conv3b = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.bn3b = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=5)

        self.dropout = nn.Dropout2d(0.3)

        # linear layers on the flattened 128-channel 10x10 feature map
        self.fc1 = nn.Linear(128 * 10 * 10, 512)
        self.fc2 = nn.Linear(512, num_classes)

        self.flatten = nn.Flatten()

    def forward(self, x):
        """Return raw class logits (no softmax applied) for a batch of images."""
        x = F.relu(self.bn1a(self.conv1a(x)))
        x = F.relu(self.bn1b(self.conv1b(x)))
        x = self.pool1(x)

        x = F.relu(self.bn2a(self.conv2a(x)))
        x = F.relu(self.bn2b(self.conv2b(x)))
        x = self.pool2(x)

        x = F.relu(self.bn3a(self.conv3a(x)))
        x = F.relu(self.bn3b(self.conv3b(x)))
        x = self.pool3(x)

        # channel-wise dropout on the feature map before flattening
        x = self.dropout(x)

        x = self.flatten(x)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

### Regular Trained Model

In [None]:
# loss function and a freshly constructed network placed on the selected device
criterion = nn.CrossEntropyLoss()
model = BasicCNN().to(device)

In [None]:
# evaluate the baseline model on the test split
loss_test = 0.0
acc_test = 0.0
# load the best model; the checkpoint is mapped to the CPU first and the model
# is then moved to the selected device
model.load_state_dict(torch.load('best_models/best_model_baseline.pth', map_location=torch.device('cpu')))
model = model.to(device)

model.eval()

# disable gradient computation for the whole evaluation pass
with torch.no_grad():
    for inputs, labels in loader_test:
        inputs, labels = inputs.to(device), labels.to(device)

        # forward
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        acc = torch.sum(torch.eq(outputs.argmax(dim=1), labels))

        # the criterion returns the batch mean; weight it by the batch size so
        # a smaller final batch does not skew the overall average
        loss_test += loss.item() * labels.size(0)
        acc_test += acc.item()

# get per-sample average loss and accuracy
loss_test_avg = loss_test / len(loader_test.dataset)
acc_test_avg = acc_test / len(loader_test.dataset)

print(f'test loss: {loss_test_avg:.2f}, test acc: {acc_test_avg:.2f}')

### Trained on Skeleton Model

In [None]:
# loss function and a freshly constructed network for the skeleton images,
# placed on the selected device
criterion_mp = nn.CrossEntropyLoss()
model_mp = BasicCNN().to(device)

In [None]:
# evaluate the skeleton model on the landmark-annotated test split
loss_test_mp = 0.0
acc_test_mp = 0.0
# load the best skeleton checkpoint into model_mp (not into the baseline
# `model`, whose weights must stay untouched for the later comparisons)
model_mp.load_state_dict(torch.load('best_models/best_model_skeleton.pth', map_location=torch.device('cpu')))
model_mp = model_mp.to(device)

model_mp.eval()

# disable gradient computation for the whole evaluation pass
with torch.no_grad():
    for inputs, labels in loader_test_mp:
        inputs, labels = inputs.to(device), labels.to(device)

        # forward
        outputs = model_mp(inputs)
        loss = criterion_mp(outputs, labels)
        acc = torch.sum(torch.eq(outputs.argmax(dim=1), labels))

        # the criterion returns the batch mean; weight it by the batch size so
        # a smaller final batch does not skew the overall average
        loss_test_mp += loss.item() * labels.size(0)
        acc_test_mp += acc.item()

# get per-sample average loss and accuracy
loss_test_avg_mp = loss_test_mp / len(loader_test_mp.dataset)
acc_test_avg_mp = acc_test_mp / len(loader_test_mp.dataset)

print(f'test loss: {loss_test_avg_mp:.2f}, test acc: {acc_test_avg_mp:.2f}')

### Confusion Matrix

In [None]:
# plot heatmap of confusion matrix (rows = true class, columns = prediction)
num_classes = 29
classes = [get_class(loader_test, i) for i in range(num_classes)]
# raw counts: confusion_matrix[t, p] = number of class-t images predicted as p
confusion_matrix = np.zeros((num_classes, num_classes))
# number of test images per true class
count = np.zeros((num_classes))
model.eval()
# disable gradient computation
with torch.no_grad():
    for inputs, labels in loader_test:
        inputs, labels = inputs.to(device), labels.to(device)
        # forward
        preds = model(inputs).argmax(dim=1)

        # accumulate the batch into the count tables; np.add.at handles
        # repeated (true, pred) pairs within one batch correctly
        true_idx = labels.cpu().numpy()
        pred_idx = preds.cpu().numpy()
        np.add.at(confusion_matrix, (true_idx, pred_idx), 1)
        np.add.at(count, true_idx, 1)

plt.figure(figsize=[12,10])
# convert every row to percentages of its true class; count is reshaped to a
# column so it divides row-wise, and empty classes stay at 0 instead of NaN
row_totals = np.where(count > 0, count, 1)
df = pd.DataFrame(100 * confusion_matrix / row_totals[:, None])
# create heatmap
ax = sn.heatmap(df, vmin=0, vmax=100, cmap='turbo', annot=True, fmt='.2f', annot_kws={'size':6}, linewidths=0.5, xticklabels=classes, yticklabels=classes)
# heatmap columns run along x (predictions), rows along y (true classes)
ax.set_xlabel('Prediction')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix for Baseline Model')
plt.show()

In [None]:
# plot heatmap of confusion matrix for skeleton model
# (rows = true class, columns = prediction)
num_classes = 29
classes_mp = [get_class(loader_test_mp, i) for i in range(num_classes)]
# raw counts: confusion_matrix_mp[t, p] = number of class-t images predicted as p
confusion_matrix_mp = np.zeros((num_classes, num_classes))
# number of test images per true class
count_mp = np.zeros((num_classes))
model_mp.eval()
# disable gradient computation
with torch.no_grad():
    for inputs, labels in loader_test_mp:
        inputs, labels = inputs.to(device), labels.to(device)
        # forward
        preds = model_mp(inputs).argmax(dim=1)

        # accumulate the batch into the count tables; np.add.at handles
        # repeated (true, pred) pairs within one batch correctly
        true_idx = labels.cpu().numpy()
        pred_idx = preds.cpu().numpy()
        np.add.at(confusion_matrix_mp, (true_idx, pred_idx), 1)
        np.add.at(count_mp, true_idx, 1)

plt.figure(figsize=[12,10])
# convert every row to percentages of its true class; count_mp is reshaped to
# a column so it divides row-wise, and empty classes stay at 0 instead of NaN
row_totals_mp = np.where(count_mp > 0, count_mp, 1)
df_mp = pd.DataFrame(100 * confusion_matrix_mp / row_totals_mp[:, None])
# create heatmap, labelled with this dataset's own class names
ax = sn.heatmap(df_mp, vmin=0, vmax=100, cmap='turbo', annot=True, fmt='.2f', annot_kws={'size':6}, linewidths=0.5, xticklabels=classes_mp, yticklabels=classes_mp)
# heatmap columns run along x (predictions), rows along y (true classes)
ax.set_xlabel('Prediction')
ax.set_ylabel('True')
ax.set_title('Confusion Matrix for Skeleton Model')
plt.show()

### Precision and Recall

In [None]:
# calculate precision and recall for baseline model
# use the raw counts (rows = true class, columns = prediction) rather than the
# percentage table: its per-class rescaling distorts the row/column sums
matrix = np.asarray(confusion_matrix, dtype=float)
# calculate TP, FP, FN and store in arrays
TP = np.diag(matrix)
FP = np.sum(matrix, axis=0) - TP
FN = np.sum(matrix, axis=1) - TP
# calculate precision and recall; classes that were never predicted (or never
# present) get 0 instead of NaN, which would otherwise be picked by argmin
predicted_total = TP + FP
actual_total = TP + FN
precision = np.divide(TP, predicted_total, out=np.zeros_like(TP), where=predicted_total > 0)
recall = np.divide(TP, actual_total, out=np.zeros_like(TP), where=actual_total > 0)
print('Precision and Recall for each class of Baseline Model')
for i in range(num_classes):
    print(f'class {i+1}: precision = {precision[i]:.2f}, recall = {recall[i]:.2f}')

# report the weakest class for each metric together with its label name
lowest_precision_ind = np.argmin(precision)
lowest_precision_label = get_class(loader_test, lowest_precision_ind)
lowest_recall_ind = np.argmin(recall)
lowest_recall_label = get_class(loader_test, lowest_recall_ind)

print(f'Lowest precision is {precision[lowest_precision_ind]} on class {lowest_precision_ind+1}, label {lowest_precision_label}')
print(f'Lowest recall is {recall[lowest_recall_ind]} on class {lowest_recall_ind+1}, label {lowest_recall_label}')



In [None]:
# calculate precision and recall for skeleton model
# use the raw counts (rows = true class, columns = prediction) rather than the
# percentage table: its per-class rescaling distorts the row/column sums
matrix_mp = np.asarray(confusion_matrix_mp, dtype=float)
# calculate TP, FP, FN and store in arrays
TP_mp = np.diag(matrix_mp)
FP_mp = np.sum(matrix_mp, axis=0) - TP_mp
FN_mp = np.sum(matrix_mp, axis=1) - TP_mp
# calculate precision and recall; classes that were never predicted (or never
# present) get 0 instead of NaN, which would otherwise be picked by argmin
predicted_total_mp = TP_mp + FP_mp
actual_total_mp = TP_mp + FN_mp
precision_mp = np.divide(TP_mp, predicted_total_mp, out=np.zeros_like(TP_mp), where=predicted_total_mp > 0)
recall_mp = np.divide(TP_mp, actual_total_mp, out=np.zeros_like(TP_mp), where=actual_total_mp > 0)
print('Precision and Recall for each class of Skeleton Model')
for i in range(num_classes):
    print(f'class {i+1}: precision = {precision_mp[i]:.2f}, recall = {recall_mp[i]:.2f}')

# report the weakest class for each metric together with its label name
lowest_precision_ind_mp = np.argmin(precision_mp)
lowest_precision_label_mp = get_class(loader_test_mp, lowest_precision_ind_mp)
lowest_recall_ind_mp = np.argmin(recall_mp)
lowest_recall_label_mp = get_class(loader_test_mp, lowest_recall_ind_mp)

print(f'Lowest precision is {precision_mp[lowest_precision_ind_mp]} on class {lowest_precision_ind_mp+1}, label {lowest_precision_label_mp}')
print(f'Lowest recall is {recall_mp[lowest_recall_ind_mp]} on class {lowest_recall_ind_mp+1}, label {lowest_recall_label_mp}')

In [None]:
# For Baseline Model
# collect the true labels and the softmax class scores over the test split
y_test = []
y_score = []
model.eval()
with torch.no_grad():
    for inputs, labels in loader_test:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        preds = torch.softmax(outputs, dim=1)
        y_score.append(preds)
        y_test.append(labels)
# Concatenate all the collected data
y_score = torch.cat(y_score).cpu().numpy()
y_test = torch.cat(y_test).cpu().numpy()
# one-hot encode the integer labels directly (column i = class index i);
# this avoids a per-sample name lookup that would only be undone again here
y_test = label_binarize(y_test, classes=np.arange(num_classes))

In [None]:
# per-class precision-recall curve points and average precision, each class
# scored against its own one-hot column
precision, recall, average_precision = {}, {}, {}
for i in range(num_classes):
    curve = precision_recall_curve(y_test[:, i], y_score[:, i])
    precision[i], recall[i], _ = curve
    average_precision[i] = average_precision_score(y_test[:, i], y_score[:, i])

In [None]:
# draw all per-class precision-recall curves of the baseline model on one axes
_, ax = plt.subplots(figsize=(10, 10))
for i in range(num_classes):
    curve_kwargs = {
        'recall': recall[i],
        'precision': precision[i],
        'average_precision': average_precision[i],
    }
    display = PrecisionRecallDisplay(**curve_kwargs)
    display.plot(ax=ax, name=f'class {i+1}')
# fixed axis ranges; a little headroom above precision 1.0
ax.set(xlim=[0.0, 1], ylim=[0.0, 1.05])
ax.legend()
ax.set_title('Precision-Recall Curve for Baseline Model')
plt.show()

In [None]:
# for Skeleton Model
# collect the true labels and the softmax class scores over the test split
y_test_mp = []
y_score_mp = []
model_mp.eval()
with torch.no_grad():
    for inputs, labels in loader_test_mp:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model_mp(inputs)
        preds = torch.softmax(outputs, dim=1)
        y_score_mp.append(preds)
        y_test_mp.append(labels)
# Concatenate all the collected data
y_score_mp = torch.cat(y_score_mp).cpu().numpy()
y_test_mp = torch.cat(y_test_mp).cpu().numpy()
# one-hot encode the integer labels directly (column i = class index i);
# this avoids a per-sample name lookup and no longer depends on the class
# list built for the baseline model
y_test_mp = label_binarize(y_test_mp, classes=np.arange(num_classes))

In [None]:
# per-class precision-recall curve points and average precision for the
# skeleton model, each class scored against its own one-hot column
precision_mp, recall_mp, average_precision_mp = {}, {}, {}
for i in range(num_classes):
    curve_mp = precision_recall_curve(y_test_mp[:, i], y_score_mp[:, i])
    precision_mp[i], recall_mp[i], _ = curve_mp
    average_precision_mp[i] = average_precision_score(y_test_mp[:, i], y_score_mp[:, i])

In [None]:
# draw all per-class precision-recall curves of the skeleton model on one axes
_, ax = plt.subplots(figsize=(10, 10))
for i in range(num_classes):
    curve_kwargs_mp = {
        'recall': recall_mp[i],
        'precision': precision_mp[i],
        'average_precision': average_precision_mp[i],
    }
    display = PrecisionRecallDisplay(**curve_kwargs_mp)
    display.plot(ax=ax, name=f'class {i+1}')
# fixed axis ranges; a little headroom above precision 1.0
ax.set(xlim=[0.0, 1], ylim=[0.0, 1.05])
ax.legend()
ax.set_title('Precision-Recall Curve for Skeleton Model')
plt.show()