In [1]:
# install mediapipe if it hasn't been installed already (shell command run from the notebook)
!pip install -q mediapipe
# download the MediaPipe hand-landmarker model bundle into the current working directory
# NOTE(review): hand_landmarker.task is not referenced by any code visible in this notebook
# (the cells below use mp.solutions.hands) — confirm whether this download is still needed
!wget -q https://storage.googleapis.com/mediapipe-models/hand_landmarker/hand_landmarker/float16/1/hand_landmarker.task

# import the necessary libraries
import cv2
import mediapipe as mp
import os
import shutil
import pandas as pd

import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from sklearn.preprocessing import LabelEncoder
import numpy as np

from sklearn.metrics import f1_score
from IPython import display

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m41.9 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
# if using colab, set to true [no other implementation of code currently]
colab = True

if colab:
    # mount to google drive from colab
    from google.colab import drive
    drive.mount('/content/drive')

    # import synthetic data: unpack the zipped image dataset from Drive into /content/data
    import zipfile
    # the `with` block closes the archive on exit, so no explicit close() is needed
    with zipfile.ZipFile('/content/drive/MyDrive/synthetic_asl_letters.zip', 'r') as zip_ref:
        zip_ref.extractall('/content/data')

Mounted at /content/drive


In [3]:
# process images with mediapipe hands overlay
def process_images_with_mediapipe(base_dir, new_output_base_dir, splits=['train', 'test', 'valid']):
    """
    Draw MediaPipe hand landmarks on every readable image of each dataset split.

    For each split, images are read from ``<base_dir>/<split>/images`` and the
    (possibly annotated) image is written under the same file name to
    ``<new_output_base_dir>/<split>``. Images in which no hand is detected are
    written back unchanged; files OpenCV cannot decode are skipped.

    :param base_dir: Root directory of the source dataset.
    :param new_output_base_dir: Root directory that receives the processed images.
    :param splits: Names of the split sub-directories to process (only iterated, never mutated).
    """
    mp_hands = mp.solutions.hands
    mp_drawing = mp.solutions.drawing_utils

    # the context manager releases the mediapipe resources even if an image
    # raises part-way through the run
    with mp_hands.Hands(static_image_mode=True, max_num_hands=2,
                        min_detection_confidence=0.5) as hands:
        # process images for each split in the dataset ('train', 'test', 'valid')
        for split in splits:
            # define input and output directories for images
            input_dir = os.path.join(base_dir, split, 'images')
            output_dir = os.path.join(new_output_base_dir, split)
            # create output directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)

            # process each image in the input directory
            for image_name in os.listdir(input_dir):
                image = cv2.imread(os.path.join(input_dir, image_name))
                # cv2.imread returns None for unreadable / non-image files
                if image is None:
                    continue
                # convert image to rgb color space (required by mediapipe)
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                # process the image using mediapipe hands
                results = hands.process(image_rgb)
                # if hand landmarks are detected, draw them on the (BGR) image
                if results.multi_hand_landmarks:
                    for hand_landmarks in results.multi_hand_landmarks:
                        mp_drawing.draw_landmarks(image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
                # save the processed image to the output directory
                cv2.imwrite(os.path.join(output_dir, image_name), image)

# define base directory for input data and output directory for processed images
# (base_dir sits under '/content/data', the folder the dataset zip is extracted into)
base_dir = '/content/data/synthetic_asl_letters'
new_output_base_dir = '/content/data/synthetic_asl_letters_mp'

# call the function to start processing images; uses the default splits
# ('train', 'test', 'valid') and writes the overlaid copies under new_output_base_dir
process_images_with_mediapipe(base_dir, new_output_base_dir)

In [None]:
# archive the landmark-overlaid images so the dataset can be downloaded as a single file
!zip -r /content/data/synthetic_asl_letters_mp.zip /content/data/synthetic_asl_letters_mp
# optional: uncomment the two lines below to download the archive from the Colab VM
# from google.colab import files
# files.download("/content/data/synthetic_asl_letters_mp.zip")

In [5]:
# build the folder skeleton for a per-letter ("sorted") copy of the dataset

# location of the extracted source dataset
dataset_path = "/content/data/synthetic_asl_letters"

# numeric class id -> letter: 0 -> 'A', 1 -> 'B', ..., 25 -> 'Z'
label_to_letter = {idx: chr(ord("A") + idx) for idx in range(26)}

# one sub-folder per letter under every split; existing folders are left alone
sorted_dataset_path = "/content/sorted_synthetic_asl"
for split in ("train", "test", "valid"):
    split_root = os.path.join(sorted_dataset_path, split)
    for letter in label_to_letter.values():
        os.makedirs(os.path.join(split_root, letter), exist_ok=True)

# function to sort and copy images based on their labels
def sort_and_move_images(split):
    """
    Copy every labelled image of ``split`` into its per-letter folder.

    Reads each ``.txt`` file in ``<dataset_path>/<split>/labels``, takes the
    first whitespace-separated token of its first line as the numeric class id,
    maps it through ``label_to_letter`` and copies the same-named ``.jpg`` from
    ``<dataset_path>/<split>/images`` into
    ``<sorted_dataset_path>/<split>/<letter>``. Despite the name, images are
    copied, not moved.

    Entries that are not ``.txt`` files and label files whose first line is
    empty are skipped.

    :param split: Name of the dataset split to process (e.g. 'train').
    :raises KeyError: if the class id is not a key of ``label_to_letter``.
    :raises FileNotFoundError: if the matching ``.jpg`` image does not exist.
    """
    # define the paths for images and labels within a specific split
    images_path = os.path.join(dataset_path, split, "images")
    labels_path = os.path.join(dataset_path, split, "labels")

    # process each label file in the labels directory
    for label_file in os.listdir(labels_path):
        stem, extension = os.path.splitext(label_file)
        # ignore anything that is not a label file (sub-folders, caches, ...)
        if extension.lower() != ".txt":
            continue

        with open(os.path.join(labels_path, label_file), "r") as f:
            # the primary label is the first number on the first line
            fields = f.readline().split()
        # an empty first line carries no label, so there is nothing to sort by
        if not fields:
            continue
        letter = label_to_letter[int(fields[0])]  # map the numeric label to its corresponding letter

        # swap only the extension; splitext leaves any ".txt" inside the stem untouched
        source_path = os.path.join(images_path, stem + ".jpg")  # source path of the image
        destination_folder = os.path.join(sorted_dataset_path, split, letter)  # destination folder based on label

        # copy the image from the source to the destination folder
        shutil.copy(source_path, destination_folder)

# sort the images of every split into the per-letter folders created above
for split in ["train", "test", "valid"]:
    sort_and_move_images(split)

In [None]:
# archive the per-letter sorted dataset so it can be downloaded as a single file
!zip -r /content/sorted_synthetic_asl.zip /content/sorted_synthetic_asl
# optional: uncomment the two lines below to download the archive from the Colab VM
# from google.colab import files
# files.download("/content/sorted_synthetic_asl.zip")

In [7]:
# extract csv data from images
def create_hand_landmarks_dataset(root_folder_path, dataset_split, csv_file_path):
    """
    Run MediaPipe Hands over one dataset split and write the landmarks to a CSV.

    Every sub-folder of ``<root_folder_path>/<dataset_split>`` is treated as a
    label; each ``.png``/``.jpg``/``.jpeg`` file inside it is processed. One row
    is written per detected hand (so an image can contribute up to two rows),
    holding the image file name, the folder label and the x/y/z value of each of
    the 21 landmarks (``hand_<i>_x``, ``hand_<i>_y``, ``hand_<i>_z``).
    Unreadable images and images without a detected hand contribute no rows.

    :param root_folder_path: Root of the per-label sorted dataset.
    :param dataset_split: Split sub-folder to process (e.g. 'train').
    :param csv_file_path: Destination path of the CSV file.
    """
    # define path for the dataset split
    split_folder_path = os.path.join(root_folder_path, dataset_split)
    data = []

    # fixed column layout so that even an empty result produces a CSV with a header row
    columns = ['image', 'label'] + [
        f'hand_{i}_{axis}' for i in range(21) for axis in ('x', 'y', 'z')
    ]

    # the context manager releases the mediapipe resources even if processing fails
    with mp.solutions.hands.Hands(
            static_image_mode=True, max_num_hands=2, min_detection_confidence=0.5) as mp_hands:
        # iterate over each label directory in the split
        for label in os.listdir(split_folder_path):
            letter_folder_path = os.path.join(split_folder_path, label)
            if not os.path.isdir(letter_folder_path):
                continue
            # process each image in the directory
            for image_name in os.listdir(letter_folder_path):
                if not image_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                    continue
                image = cv2.imread(os.path.join(letter_folder_path, image_name))
                # cv2.imread returns None for corrupt/unreadable files; skip them
                # instead of crashing in cvtColor
                if image is None:
                    continue
                image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                # apply mediapipe processing
                results = mp_hands.process(image_rgb)

                # extract hand landmarks if any are detected
                if not results.multi_hand_landmarks:
                    continue
                for hand_landmarks in results.multi_hand_landmarks:
                    # only keep complete 21-point hands
                    if len(hand_landmarks.landmark) != 21:
                        continue
                    row = {'image': image_name, 'label': label}
                    for i, landmark in enumerate(hand_landmarks.landmark):
                        row[f'hand_{i}_x'] = landmark.x
                        row[f'hand_{i}_y'] = landmark.y
                        row[f'hand_{i}_z'] = landmark.z
                    data.append(row)

    # convert data into a dataframe and save it to a csv file
    df = pd.DataFrame(data, columns=columns)
    df.to_csv(csv_file_path, index=False)

# set the root folder path for the dataset (the per-letter sorted copy built earlier)
root_folder_path = '/content/sorted_synthetic_asl'
# define dataset splits to process
dataset_splits = ['train', 'test', 'valid']
# process each split and save to corresponding csv files
for split in dataset_splits:
    # relative path: the CSV is written to the current working directory
    csv_file_path = f'hand_landmarks_{split}.csv'
    create_hand_landmarks_dataset(root_folder_path, split, csv_file_path)

In [1]:
# The image-processing cells above (directory walks and nested loops) are slow, so this
# section lets the XGBoost model be created/trained/updated from previously exported
# landmark CSVs without re-running them each time the notebook is opened.
# NOTE: the cells below still use names imported in the first cell (pd, np, xgb,
# LabelEncoder, f1_score, display), so that import cell must be run first.

colab = True

if colab:
    from google.colab import drive
    drive.mount('/content/drive')

    import zipfile

    # import mediapipe hand landmark csv data for each split
    for split_name in ('test', 'train', 'valid'):
        archive_path = f'/content/drive/MyDrive/hand_landmarks_{split_name}.csv.zip'
        # the `with` block closes the archive on exit, so no explicit close() is needed
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extractall('/content')

Mounted at /content/drive


In [2]:
# read the landmark CSV of every split (first row is the header)
train = pd.read_csv('/content/hand_landmarks_train.csv', header=0)
test = pd.read_csv('/content/hand_landmarks_test.csv', header=0)
val = pd.read_csv('/content/hand_landmarks_valid.csv', header=0)

# everything except the label and the image file name is a feature column
non_feature_columns = ['label', 'image']

X_train, y_train = train.drop(columns=non_feature_columns), train['label']
X_test, y_test = test.drop(columns=non_feature_columns), test['label']
X_val, y_val = val.drop(columns=non_feature_columns), val['label']

# map the label values to integer class ids; the encoder is fitted on the
# training labels only and then reused for the other two splits
le = LabelEncoder()
y_train = le.fit_transform(y_train)
y_test, y_val = le.transform(y_test), le.transform(y_val)

# function for applying min-max scaling to a pandas dataframe row
def min_max_scaling(row):
    """
    Rescale the values of one row to the [0, 1] interval.

    :param row: pandas Series of numeric values (one DataFrame row when used
        with ``DataFrame.apply(..., axis=1)``).
    :return: Series of the same index with ``(value - min) / (max - min)``.
        A row whose values are all equal is mapped to all zeros instead of
        the NaN that 0/0 would produce.
    """
    min_value = row.min()
    value_range = row.max() - min_value
    if value_range == 0:
        # constant row: every value equals the minimum, so the scaled value is 0
        return (row - min_value).astype(float)
    return (row - min_value) / value_range

# apply normalization to feature matrices
# axis=1 scales each row (one detected hand) independently, using the min/max
# taken across all of that row's landmark columns
X_train_norm = X_train.apply(min_max_scaling, axis=1)
X_test_norm = X_test.apply(min_max_scaling, axis=1)
X_val_norm = X_val.apply(min_max_scaling, axis=1)

# train a gradient boosting model using xgboost (fitted on the normalized training features)
gbm = xgb.XGBClassifier(max_depth=3, n_estimators=300, learning_rate=0.05).fit(X_train_norm, y_train)
# predict the training data (usually you'd predict on test data to evaluate the model)
# NOTE(review): `predictions` is not used by any later cell visible in this notebook
predictions = gbm.predict(X_train_norm)

In [3]:
def f1_eval(y_pred, dtrain):
    """
    Custom F1 evaluation metric for multi-class classification in XGBoost.

    The per-class scores in ``y_pred`` are laid out sample-major (all class
    scores of sample 0, then sample 1, ...), whether they arrive flattened or
    as a 2-D array, so they are viewed as ``(n_samples, n_classes)`` and the
    predicted class is the arg-max along the class axis. The number of classes
    is derived from the size of ``y_pred`` rather than from the labels present
    in ``dtrain``, so a class missing from the evaluation set does not break
    the reshape.

    :param y_pred: Per-class scores produced by the model for every sample.
    :param dtrain: XGBoost DMatrix with the true labels.
    :return: Tuple (metric name, negative F1 score).
    """
    y_true = dtrain.get_label()
    # one row per sample, one column per class
    class_scores = np.asarray(y_pred).reshape(len(y_true), -1)
    # pick the class with the highest score for each sample
    preds = np.argmax(class_scores, axis=1)
    # calculate F1 score
    f1 = f1_score(y_true, preds, average='macro')  # Use 'macro' to treat all classes equally
    # return the negated score so that lower means better, like a loss
    return 'negF1', -f1

In [4]:
# create DMatrices for the train and test datasets since XGBoost is optimized for DMatrices
# NOTE: these are built from the un-normalized X_train / X_test; swap in X_train_norm /
# X_test_norm to run the same experiment on the normalized features
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

In [5]:
# define XGBoost parameters
params = {
    'objective': 'multi:softprob',  # use 'multi:softprob' for multi-class classification to get probabilities
    'num_class': 26,  # number of unique classes
    'eval_metric': 'mlogloss',  # multi-class logloss
}

# filled in by xgb.train with the per-round metric history of every eval set
evals_result = {}

# train the model with custom evaluation metric
# both 'mlogloss' (from params) and 'negF1' (from f1_eval) are read back from
# evals_result for the 'test' and 'train' sets in the next cell
# NOTE(review): `feval` is deprecated in newer XGBoost releases in favour of
# `custom_metric` — confirm against the installed version
bst = xgb.train(params, dtrain, num_boost_round=300, evals=[(dtest, 'test'),(dtrain, 'train')],
                feval=f1_eval, evals_result=evals_result)

# wipe the cell output (the training log) once training has finished
display.clear_output()

In [6]:
# gather the per-round metric history into one table: one row per boosting round,
# one column per (eval set, metric) pair, named '<set>_<metric>'
df_evals_result = pd.DataFrame({
    f'{eval_set}_{metric}': evals_result[eval_set][metric]
    for eval_set in ('test', 'train')
    for metric in ('mlogloss', 'negF1')
})

In [7]:
# convert from negF1 with minimizing to normal F1 with maximizing
# f1_eval logs -F1, so the F1 score is recovered by negating the column
# (adding 1 would give 1 - F1 instead)
f1_columns = {'test_negF1': 'test_F1', 'train_negF1': 'train_F1'}

# only touch columns that still carry the negated score, so re-running this cell
# neither fails nor flips the sign a second time
pending_columns = [col for col in f1_columns if col in df_evals_result.columns]

df_evals_result = (
    df_evals_result
    .assign(**{col: -df_evals_result[col] for col in pending_columns})
    # rename column to reflect change (already-renamed columns are ignored)
    .rename(columns=f1_columns)
)

In [8]:
# save to csv for analysis with other models; this was performed for both the normalized and unnormalized data
# index=True keeps the row index (one row per boosting round) as the first CSV column
df_evals_result.to_csv('xgb_mp_f1_loss.csv', index=True)