## **0 - Introduction and Install Dependencies** (_if needed_)

In [42]:
#%pip install -U argparse
#%pip install -U tqdm
#%pip install -U scikit-learn
#%pip install -U opencv-python
#%pip install -U matplotlib
#%pip install -U tensorflow
#%pip install -U pandas
#%pip install -U numpy

## **1 - Load Libraries**

In [43]:
# Load dependencies
from tensorflow.keras.applications import ResNet50
from tensorflow.keras import Input, Model # type: ignore
from tensorflow.keras.layers import AveragePooling2D, Flatten, Dense, Dropout, Add # type: ignore
from tensorflow.keras import layers # type: ignore
import tensorflow as tf
from tensorflow.keras import layers # type: ignore
import os
import argparse
from tensorflow.keras.models import Sequential, Model # type: ignore
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import pandas as pd
import math
import random

## **2 - Data Loading Utilities**

In [44]:
class ASLDataPaths():
    '''
    fetchASLDataPaths is a class that fetches the paths of the ASL dataset from a directory. The rationale behind such a class
    is the fact that our dataset is huge (relatively speaking), and we can not afford to load the entire dataset of images into memory.
    Rather, it might be a better idea to load the paths of the images, and then load the images in batches as we train our model. 
    '''

    def __init__(self, data_dir: str):

        # Check if the data directory exists
        if type(data_dir) != str or not os.path.exists(data_dir):
            raise FileNotFoundError(f"The directory {data_dir} does not exist.")
        self.data_dir = data_dir

    def fetch_paths(self):
        X_paths = []
        y = []

        # Walk over the data directory and fetch the paths of all images, label in the dataset
        for root, _, files in os.walk(self.data_dir):
            for file in files:
                X_paths.append(os.path.join(root, file))
                y.append(os.path.basename(root))
        
        X_paths = np.array(X_paths)
        y = np.array(y)
        return X_paths, y

In [45]:
# The ASLBatchLoader class is a custom data loader following the concept of this documentation code: 
# https://www.tensorflow.org/api_docs/python/tf/keras/utils/PyDataset. Refer to this documentation for more information
# and context on how to implement a custom data loader in TensorFlow.
class ASLBatchLoader(tf.keras.utils.PyDataset):

    def __init__(self, 
                 X_set: np.array, 
                 y_set: np.array,
                 batch_size: int = 32, 
                 transform = None):
        '''
        The ASLBatchLoader class is a custom data loader that loads the ASL dataset in batches.
        
        Parameters:
            X_set: np.array - A numpy array containing the paths of the images and 
            y_set: np.array - their corresponding labels.
            batch_size: int - The size of the batch that we want to load the data in.
        '''
        self.X_set = X_set
        self.y_set = y_set
        self.batch_size = batch_size
        self.transform = transform
        self.mapping = {'A': 0, 'B': 1, 'C': 2, 'D': 3, 'E': 4, 'F': 5, 'G': 6, 'H': 7, 'I': 8, 'J': 9, 'K': 10, 'L': 11, 'M': 12, 'N': 13, 'O': 14, 'P': 15, 'Q': 16, 'R': 17, 'S': 18, 'T': 19, 'U': 20, 'V': 21, 'W': 22, 'X': 23, 'Y': 24, 'Z': 25, 'del': 26, 'nothing': 27, 'space': 28}

    def __len__(self):
        '''
        This function returns the number of batches that we can load from the dataset.
        
        Returns: int - The number of batches that we can load from the dataset.
        '''
        return math.ceil(len(self.X_set) / self.batch_size)

    def __getitem__(self, index):
        '''
        This function loads a batch of data from the dataset.

        Parameters:
            index: int - The index of batch that we want to load from the dataset.

        Returns:
            X_batch: np.array - A numpy array containing the images of the batch.
            y_batch: np.array - A numpy array containing labels of the batch.
        '''
        # We specify the start of our batch
        batch_start = index * self.batch_size

        # If the batch end is greater than the length of the data directory, we set the batch end to the length of the data directory
        batch_end = min(batch_start + self.batch_size, len(self.X_set))

        # These are the paths that we immediately work with in this iteration of the batching process
        X_path_batch = self.X_set[batch_start:batch_end]
        y_batch = self.y_set[batch_start:batch_end]

        # We convert the labels to their corresponding indices
        y_batch = np.array([self.mapping[label] for label in y_batch])
        y_batch_encoded = tf.one_hot(y_batch, 29)

        # Load the images and labels from the paths
        # If a transformation is specified, we apply it to the images
        # If no transformation is specified, we simply load the images
        # A transformation is typically something like normalization, resizing, etc.
        X_batch = np.array([cv2.imread(file) for file in X_path_batch])
        if self.transform is not None:
            X_batch = self.transform(X_batch)

        return X_batch, y_batch_encoded
    
    def __iter__(self):
        '''
        This method returns an iterator for the batches.
        
        Yields: batch - A batch of data from the dataset.
        '''
        for i in range(len(self)):
            yield self[i]

In [46]:
# ChatGPT was used to generate these docstring. No need to do redundant work.
def split_data(data, test_size=0.2, val_size=0.2, random_state=42):
    '''
    Split the data into training, validation, and test sets.
    
    Parameters:
        data: contains X, y as np.array
        test_size: float - The size of the test set.
        val_size: float - The size of the validation set.
        random_state: int - The random state for reproducibility.
        
    Returns:
        tuple: a tuple of np.arrays - train_data, val_data, test_data
    '''
    train_data, test_data = train_test_split(data, test_size=test_size, random_state=random_state)
    train_data, val_data = train_test_split(train_data, test_size=val_size, random_state=random_state)
    return (train_data, val_data, test_data)

## **3 - Tansforms**

In [47]:
# Function to convert an image to grayscale. Used by the transform pipeline in Lambda layer.
def grayscale(img):
    return tf.image.rgb_to_grayscale(img)

## **4 - Hyperparameters**

In [48]:
# data directory
data_dir = './data/asl_alphabet_train/asl_alphabet_train/'

# Hyperparameters
batchSize = 32
epochs = 10
learning_rate = 0.001
weight_decay = 0
momentum = 0.8
optimizer = 'SGD'
loss = 'categorical_crossentropy'
metrics = ['accuracy', 'precision', 'recall', 'f1_score']
from_logits = False
early_stopping = None
val_split = 0.2
test_split = 0.2
model = 'resnet' # vgg

#TODO: implement
pretrain = False

## **5 - Our Models**

In [49]:
def ResNet():
    # ResNet50 base model with pre-trained weights
    rsntBase = ResNet50(weights='imagenet', include_top=False, input_tensor=Input(shape=(224, 224, 3)))

    num_classes=29

    # Additional layers
    model = rsntBase.output
    model = AveragePooling2D(pool_size=(7,7))(model)
    model = Flatten(name="flatten")(model)
    model = Dense(1024,activation='relu')(model)
    model = Dropout(0.5)(model)
    model = Dense(1024,activation='relu')(model)
    model = Dropout(0.5)(model)
    model = Dense(num_classes, activation='softmax')(model)

    # Create the final model
    finalModel = Model(inputs=rsntBase.input, outputs=model)

    # Freeze layers except for the last block
    for layer in finalModel.layers:
        if not layer.name.startswith('conv5_'):
            layer.trainable = False

    return finalModel

resnet = ResNet()

In [50]:
def OptimizerFactory(optimizer: str, learning_rate: float, momentum: float, weight_decay: float):
    if optimizer == 'adam':
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
    elif optimizer == 'SGD':
        optimizer = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=momentum, weight_decay=weight_decay)
    else:
        raise ValueError(f"Invalid optimizer: {optimizer}")
    return optimizer

In [51]:
def LossFactory(loss: str, from_logits: bool):
    if loss == 'categorical_crossentropy':
        loss = tf.keras.losses.CategoricalCrossentropy(from_logits=from_logits)
    elif loss == 'sparse_categorical_crossentropy':
        loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=from_logits)
    else:
        raise ValueError(f"Invalid loss function: {loss}")
    return loss

In [52]:
def ModelFactory(model: str):
    if model == 'resnet':
        model = resnet
    elif model == 'vgg':
        NotImplementedError("VGG not implemented yet...") # TODO: NOT IMPLEMENTED YET
    else:
        raise ValueError(f"Invalid model: {model}")
    return model

## **6 - Preprocess Data**

Start up our transform pipeline. Feel free to modify this.

In [53]:
# NOTE: FEEL FREE TO MODIFY TRANSFORMATIONS AS NEEDED
transform = tf.keras.Sequential([
                                layers.Resizing(224, 224),
                                layers.Rescaling(1./255),
                                layers.RandomFlip("horizontal"),
                                layers.RandomRotation(0.2),
                                ])

Start up our data loader.

In [54]:
# Load the data paths
X_path, y = ASLDataPaths(data_dir = data_dir).fetch_paths()
X_train_path, X_test_path, y_train, y_test = train_test_split(X_path, y, test_size=0.1, random_state=42, shuffle=True)
X_train_path, X_val_path, y_train, y_val = train_test_split(X_train_path, y_train, test_size=0.2, random_state=42, shuffle=True)

# Load the data into our batchloader
train_batch_loader = ASLBatchLoader(X_set = X_train_path, y_set = y_train, batch_size=batchSize, transform = transform)
val_batch_loader = ASLBatchLoader(X_set = X_val_path, y_set = y_val, batch_size=batchSize, transform = transform)
test_batch_loader = ASLBatchLoader(X_set = X_test_path, y_set = y_test, batch_size=batchSize, transform = transform)

Fetch our optimizer, loss and model.

In [55]:
optimizer = OptimizerFactory(optimizer, learning_rate, momentum, weight_decay)
loss = LossFactory(loss, from_logits)
model = ModelFactory(model)

In [56]:
model.compile(optimizer = optimizer, loss = loss, metrics = metrics)
#checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath = os.path.join(os.getcwd(), 'resnet.model.h5'), save_best_only = True, verbose = 1)
#if early_stopping is not None:
#    early_stopping = tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = early_stopping, verbose = 1)

In [57]:
model.fit(train_batch_loader, validation_data = val_batch_loader, epochs = epochs, verbose = 1)

Epoch 1/10


  self._warn_if_super_not_called()


[1m 400/1958[0m [32m━━━━[0m[37m━━━━━━━━━━━━━━━━[0m [1m34:30[0m 1s/step - accuracy: 0.0420 - f1_score: 0.0359 - loss: 3.4313 - precision: 0.0000e+00 - recall: 0.0000e+00

KeyboardInterrupt: 