All members contributed equally.

# Random Forest

In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
# loading in data
import pandas as pd
import numpy as np
import os

# Establishing folder path and empty data frame
folder = "asl-csv-data/book-drink-csv"
combined = pd.DataFrame()

# iterating through files and combining data into one pandas frame
for filename in os.listdir(folder):
    file_path = os.path.join(folder, filename)
    df = pd.read_csv(file_path)
    # adding sign column as ID
    df['ID'] = filename 
    # combining data frames together
    combined = pd.concat([combined, df]) 
    
# adding ID
cols = ['ID'] + [col for col in combined.columns if col != 'ID']
combined = combined[cols]
combined = combined.iloc[:,:77] # not including below the waist ifnormation
combined.head() 

Unnamed: 0,ID,frame_number,nose_x,nose_y,nose_z,left_eye_inner_x,left_eye_inner_y,left_eye_inner_z,left_eye_x,left_eye_y,...,left_thumb_z,right_thumb_x,right_thumb_y,right_thumb_z,left_hip_x,left_hip_y,left_hip_z,right_hip_x,right_hip_y,right_hip_z
0,0018695357180107397-DRINK 2.csv,0,0.525554,0.289397,-0.582463,0.548919,0.232308,-0.532373,0.562274,0.232486,...,-0.206225,0.247369,1.558319,-0.511895,0.632642,1.468299,0.010406,0.398762,1.476173,-0.00709
1,0018695357180107397-DRINK 2.csv,1,0.535128,0.26047,-0.581579,0.556137,0.200653,-0.53723,0.570259,0.203041,...,-0.347507,0.288129,1.508925,-0.537757,0.643821,1.453811,-0.005935,0.422051,1.465328,0.009119
2,0018695357180107397-DRINK 2.csv,2,0.539804,0.258574,-0.632365,0.562304,0.199046,-0.590813,0.574522,0.200578,...,-0.406069,0.274099,1.477691,-0.609302,0.641027,1.472204,-0.005464,0.416517,1.47713,0.00894
3,0018695357180107397-DRINK 2.csv,3,0.537268,0.25715,-0.608182,0.558576,0.200909,-0.566078,0.571905,0.201947,...,-0.410491,0.281217,1.54346,-0.559211,0.632977,1.483827,-0.008083,0.407901,1.485082,0.011341
4,0018695357180107397-DRINK 2.csv,4,0.537328,0.259728,-0.714443,0.55818,0.204066,-0.661015,0.571754,0.204858,...,-0.420472,0.274881,1.513437,-0.781673,0.636226,1.497416,-0.009797,0.411186,1.496125,0.013032


In [3]:
# Normalizing data
id_series = combined['ID']
frame_number_series = combined['frame_number']
combined_dropped = combined.drop(['ID', 'frame_number'], axis=1)
normalized = (combined_dropped - combined_dropped.mean()) / combined_dropped.std()
normalized['ID'] = id_series
normalized['frame_number'] = frame_number_series
cols = ['ID', 'frame_number'] + [col for col in normalized.columns if col not in ['ID', 'frame_number']]
normalized = normalized[cols]
normalized.head()

Unnamed: 0,ID,frame_number,nose_x,nose_y,nose_z,left_eye_inner_x,left_eye_inner_y,left_eye_inner_z,left_eye_x,left_eye_y,...,left_thumb_z,right_thumb_x,right_thumb_y,right_thumb_z,left_hip_x,left_hip_y,left_hip_z,right_hip_x,right_hip_y,right_hip_z
0,0018695357180107397-DRINK 2.csv,0,-0.216466,-0.855473,1.351659,-0.264594,-0.843323,1.337476,-0.33669,-0.848154,...,1.243787,-0.320977,1.195948,1.408483,-0.894719,0.653317,1.016693,0.339605,0.732143,-1.029912
1,0018695357180107397-DRINK 2.csv,1,0.014752,-1.39762,1.354661,-0.092402,-1.427792,1.320892,-0.145114,-1.392753,...,0.928346,-0.045281,1.074682,1.351138,-0.599951,0.535466,0.640497,0.898149,0.644621,-0.660074
2,0018695357180107397-DRINK 2.csv,2,0.1277,-1.43315,1.182086,0.054744,-1.457452,1.137906,-0.042835,-1.438312,...,0.797595,-0.140177,0.998002,1.192498,-0.673631,0.685083,0.651345,0.765433,0.739864,-0.664163
3,0018695357180107397-DRINK 2.csv,3,0.066434,-1.459838,1.264262,-0.03421,-1.423068,1.222374,-0.105619,-1.412975,...,0.787722,-0.092028,1.159468,1.303566,-0.885885,0.779627,0.59106,0.55879,0.804034,-0.609383
4,0018695357180107397-DRINK 2.csv,4,0.067891,-1.411527,0.903173,-0.04365,-1.364773,0.898168,-0.109262,-1.359144,...,0.765437,-0.134885,1.085761,0.810294,-0.800232,0.890159,0.551605,0.637567,0.893148,-0.570813


In [4]:
import numpy as np
import pandas as pd

# Function to get the curvature of body parts
def get_curvature(df, x_part, y_part, z_part, dt):
    # find velocity components - 1st derivates
    dx = df[x_part].diff() / dt
    dy = df[y_part].diff() / dt
    dz = df[z_part].diff() / dt

    # find accelteration components - 2nd derivatives
    ddx = dx.diff() / dt
    ddy = dy.diff() / dt
    ddz = dz.diff() / dt

    # find magnitude with cross product of velocity and acceleration
    num = np.sqrt((dy * ddz - dz * ddy)**2 + (dz * ddx - dx * ddz)**2 + (dx * ddy - dy * ddx)**2)

    # find denominator -cube of the magnitude of the velocity
    den = (dx**2 + dy**2 + dz**2)**1.5

    # avoid division by zero
    with np.errstate(divide='ignore', invalid='ignore'):
        curvature = num / den
        curvature = curvature.replace([np.inf, -np.inf, np.nan], 0)

    return curvature

In [5]:
# Function to get the angle between the x,y,z body parts
def get_angle(pt1, pt2, pt3):
    vector1 = np.array(pt2) - np.array(pt1) # finding differences
    vector2 = np.array(pt2) - np.array(pt3)
    unit_vector1 = vector1 / np.linalg.norm(vector1)
    unit_vector2 = vector2 / np.linalg.norm(vector2)
    dot_product = np.dot(unit_vector1, unit_vector2) # calculating the dot product of the vectors
    angle = np.arccos(np.clip(dot_product, -1.0, 1.0))
    return np.degrees(angle)

In [6]:
# Feature Engineering, for each frame we calculate certain features
def derive_features(combined):
    # establising lists
    x_body_parts = ['nose_x', 'left_eye_inner_x', 'left_eye_x', 'left_eye_outer_x', 'right_eye_inner_x', 'right_eye_x', 'right_eye_outer_x',
                   'left_ear_x', 'right_ear_x', 'mouth_left_x', 'mouth_right_x', 'left_shoulder_x', 'right_shoulder_x', 'left_elbow_x',
                   'right_elbow_x', 'left_wrist_x', 'right_wrist_x', 'left_pinky_x', 'right_pinky_x', 'left_index_x', 'right_index_x',
                    'left_thumb_x', 'right_thumb_x', 'left_hip_x', 'right_hip_x']
    y_body_parts = ['nose_y', 'left_eye_inner_y', 'left_eye_y', 'left_eye_outer_y', 'right_eye_inner_y', 'right_eye_y', 'right_eye_outer_y',
                   'left_ear_y', 'right_ear_y', 'mouth_left_y', 'mouth_right_y', 'left_shoulder_y', 'right_shoulder_y', 'left_elbow_y',
                   'right_elbow_y', 'left_wrist_y', 'right_wrist_y', 'left_pinky_y', 'right_pinky_y', 'left_index_y', 'right_index_y',
                    'left_thumb_y', 'right_thumb_y', 'left_hip_y', 'right_hip_y']
    z_body_parts = ['nose_z', 'left_eye_inner_z', 'left_eye_z', 'left_eye_outer_z', 'right_eye_inner_z', 'right_eye_z', 'right_eye_outer_z',
                   'left_ear_z', 'right_ear_z', 'mouth_left_z', 'mouth_right_z', 'left_shoulder_z', 'right_shoulder_z', 'left_elbow_z',
                   'right_elbow_z', 'left_wrist_z', 'right_wrist_z', 'left_pinky_z', 'right_pinky_z', 'left_index_z', 'right_index_z',
                    'left_thumb_z', 'right_thumb_z', 'left_hip_z', 'right_hip_z']

    body_part_stats = {}

    # calculating body stats for each body postion
    for part in x_body_parts + y_body_parts + z_body_parts:
        body_part_stats[part] = {
            'mean': combined[part].mean(), 
            'std': combined[part].std(),
            'max': combined[part].max(),
            'min': combined[part].min()
        }


    # calculating body_movements for x, y, z
    velocity = {axis: {} for axis in ['x', 'y', 'z']}
    acceleration = {axis: {} for axis in ['x', 'y', 'z']}
    jerk = {axis: {} for axis in ['x', 'y', 'z']}
    total_displacement = {axis: {} for axis in ['x', 'y', 'z']}
    average_speed = {axis: {} for axis in ['x', 'y', 'z']}
    average_acceleration = {axis: {} for axis in ['x', 'y', 'z']}

    dt = 1  # assuming the frame rate is 1

    # process for x, y, and z body parts
    for axis, parts in [('x', x_body_parts), ('y', y_body_parts), ('z', z_body_parts)]:
        for part in parts:
            # calculate velocity
            velocity[axis][part] = combined[part].diff() / dt

            # calculate aceleration
            acceleration[axis][part] = velocity[axis][part].diff() / dt

            # calculate jerk
            jerk[axis][part] = acceleration[axis][part].diff() / dt

            # calculate total displacement from first to last frame
            total_displacement[axis][part] = np.sqrt((combined[part].iloc[-1] - combined[part].iloc[0])**2)

            # calculate average speed
            average_speed[axis][part] = np.mean(np.abs(velocity[axis][part]))

            # calculate average acceleration
            average_acceleration[axis][part] = np.mean(np.abs(acceleration[axis][part]))

    # distance between body parts and critical point (nose)
    distance_from_nose = {}
    base_body_parts = set(part.rsplit('_', 1)[0] for part in x_body_parts) # removing coords to get base name of body part

    # calculate the Euclidean distance from nose to each body part
    for base_part in base_body_parts:
        if 'nose' not in base_part:  # don't calculate nose from itself
            distance_from_nose[base_part] = np.sqrt(
                (combined[f'{base_part}_x'] - combined['nose_x'])**2 +
                (combined[f'{base_part}_y'] - combined['nose_y'])**2 +
                (combined[f'{base_part}_z'] - combined['nose_z'])**2
            )

    # Curvature
    curvature = {axis: {} for axis in ['x', 'y', 'z']}  

    # Calculating curvature for each body part
    for axis, parts in [('x', x_body_parts), ('y', y_body_parts), ('z', z_body_parts)]:
        for part_x, part_y, part_z in zip(x_body_parts, y_body_parts, z_body_parts):
            base_part_name = part_x.rsplit('_', 1)[0] 
            # Calculate curvature using associated x, y, z coords
            curvature[base_part_name] = get_curvature(combined, part_x, part_y, part_z, dt)
            
    # Angle calculation
    angles = {}

    # Calculate angles for wrists
    for part in ['left_wrist', 'right_wrist']:
        elbow = part.replace('wrist', 'elbow')
        hand = part.replace('wrist', 'pinky')  
        
        # Calculate wrist angles using elbow, wrist, and pinky
        combined[f'{part}_angle'] = combined.apply(lambda row: get_angle(
            [row[f'{elbow}_x'], row[f'{elbow}_y'], row[f'{elbow}_z']],
            [row[f'{part}_x'], row[f'{part}_y'], row[f'{part}_z']],
            [row[f'{hand}_x'], row[f'{hand}_y'], row[f'{hand}_z']]
        ), axis=1)

    # Calculate neck angles
    shoulder_left = 'left_shoulder'
    shoulder_right = 'right_shoulder'
    nose = 'nose'

    combined['neck_angle'] = combined.apply(lambda row: get_angle(
        [row[f'{shoulder_left}_x'], row[f'{shoulder_left}_y'], row[f'{shoulder_left}_z']],
        [row[f'{nose}_x'], row[f'{nose}_y'], row[f'{nose}_z']],
        [row[f'{shoulder_right}_x'], row[f'{shoulder_right}_y'], row[f'{shoulder_right}_z']]
    ), axis=1)
    
    
    # Finger Distances
    fingers = ['thumb', 'index', 'pinky']

    # dictionary to hold the distances
    finger_distances = {}

    for hand_prefix in ['left', 'right']:
        for i in range(len(fingers)):
            for j in range(i + 1, len(fingers)):
                finger1 = f"{hand_prefix}_{fingers[i]}"
                finger2 = f"{hand_prefix}_{fingers[j]}"

                # Calculate distances and store in a DataFrame or dictionary
                distance_key = f"{finger1}_to_{finger2}_distance"
                combined[distance_key] = np.sqrt(
                    (combined[f"{finger1}_x"] - combined[f"{finger2}_x"])**2 +
                    (combined[f"{finger1}_y"] - combined[f"{finger2}_y"])**2 +
                    (combined[f"{finger1}_z"] - combined[f"{finger2}_z"])**2
                )

    # adding features to combined dataset
    for axis in ['x', 'y', 'z']:
        for part in combined.columns:
            if part.endswith(axis): 
                base_part_name = part.rsplit('_', 1)[0]

                # Add velocity, acceleration, jerk
                combined[f'velocity_{part}'] = velocity[axis][part]
                combined[f'acceleration_{part}'] = acceleration[axis][part]
                combined[f'jerk_{part}'] = jerk[axis][part]

                # Add total displacement, average speed, average acceleration
                combined[f'total_displacement_{part}'] = total_displacement[axis][part]
                combined[f'average_speed_{part}'] = average_speed[axis][part]
                combined[f'average_acceleration_{part}'] = average_acceleration[axis][part]

    # Add curvature for each body part
    for base_part_name, curv_values in curvature.items():
        combined[f'curvature_{base_part_name}'] = curv_values
    
    # Getting anlge for body parts
    for angle_name, angle_values in angles.items():
        combined[angle_name] = angle_values

    # Add distance from nose for each body part
    for base_part_name, distance_values in distance_from_nose.items():
        combined[f'distance_from_nose_to_{base_part_name}'] = distance_values

    features = combined
    return features

In [7]:
# Getting features from set
features = derive_features(normalized)
features.head()

Unnamed: 0,ID,frame_number,nose_x,nose_y,nose_z,left_eye_inner_x,left_eye_inner_y,left_eye_inner_z,left_eye_x,left_eye_y,...,distance_from_nose_to_left_elbow,distance_from_nose_to_right_eye_outer,distance_from_nose_to_left_thumb,distance_from_nose_to_left_eye_outer,distance_from_nose_to_right_pinky,distance_from_nose_to_right_eye_inner,distance_from_nose_to_right_thumb,distance_from_nose_to_left_shoulder,distance_from_nose_to_right_wrist,distance_from_nose_to_right_ear
0,0018695357180107397-DRINK 2.csv,0,-0.216466,-0.855473,1.351659,-0.264594,-0.843323,1.337476,-0.33669,-0.848154,...,0.899128,0.298409,1.770994,0.143679,1.979881,0.156072,2.054867,0.894902,1.990414,0.560535
1,0018695357180107397-DRINK 2.csv,1,0.014752,-1.39762,1.354661,-0.092402,-1.427792,1.320892,-0.145114,-1.392753,...,1.876648,0.26446,2.181228,0.225592,2.434457,0.18139,2.473033,1.812124,2.468115,0.609788
2,0018695357180107397-DRINK 2.csv,2,0.1277,-1.43315,1.182086,0.054744,-1.457452,1.137906,-0.042835,-1.438312,...,2.119602,0.188437,2.404964,0.212467,2.417043,0.157489,2.445888,1.970208,2.440482,0.556988
3,0018695357180107397-DRINK 2.csv,3,0.066434,-1.459838,1.264262,-0.03421,-1.423068,1.222374,-0.105619,-1.412975,...,2.083462,0.165238,2.424921,0.259245,2.5799,0.131587,2.624389,2.043944,2.615873,0.585023
4,0018695357180107397-DRINK 2.csv,4,0.067891,-1.411527,0.903173,-0.04365,-1.364773,0.898168,-0.109262,-1.359144,...,2.053385,0.166861,2.450673,0.251082,2.457784,0.122793,2.507228,1.853849,2.492666,0.416265


In [8]:
# Function to normlaize features
def normalize_features(features):
    # droping empty columns
    unlabeled_features = features.drop(['curvature_x', 'curvature_y', 'curvature_z'], axis=1)

    # drop first three frames of each video (to get rid of NaNs)
    unlabeled_features = unlabeled_features[~unlabeled_features['frame_number'].isin([0, 1, 2])]

    # storing and removing id from frame
    ids = unlabeled_features['ID']
    unlabeled_features.drop('ID', axis=1, inplace=True)

    # making frame the index
    unlabeled_features.set_index('frame_number', inplace=True)

    # convert data to numerical vales
    for col in unlabeled_features.columns:
        unlabeled_features[col] = pd.to_numeric(unlabeled_features[col], errors='coerce')

    # Drop any columns where conversion to numeric introduced NaNs
    #unlabeled_features.dropna(axis=1, inplace=True)
    unlabeled_features.fillna(method='bfill', inplace=True) 

    # Perform normalization
    #unlabeled_features = (unlabeled_features - unlabeled_features.mean()) / unlabeled_features.std()

    # add ID back
    unlabeled_features['ID'] = ids.values  
    return unlabeled_features

In [9]:
# Normalizing features
unlabeled_features = normalize_features(features)

In [10]:
# Assigning labels to features --> only book and drink since we have the most videos
# # Update 'ID' to 'teacher' for all entries containing the substring 'teacher', case-insensitively
# unlabeled_features.loc[unlabeled_features['ID'].str.contains('teacher', case=False, na=False), 'ID'] = 'teacher'

# # Update 'ID' to 'orange' for all entries containing the substring 'orange', case-insensitively
# unlabeled_features.loc[unlabeled_features['ID'].str.contains('orange', case=False, na=False), 'ID'] = 'orange'

# Update 'ID' to 'orange' for all entries containing the substring 'orange', case-insensitively
unlabeled_features.loc[unlabeled_features['ID'].str.contains('drink', case=False, na=False), 'ID'] = 'drink'

# Update 'ID' to 'orange' for all entries containing the substring 'orange', case-insensitively
unlabeled_features.loc[unlabeled_features['ID'].str.contains('book', case=False, na=False), 'ID'] = 'book'

# # Update 'ID' to 'orange' for all entries containing the substring 'orange', case-insensitively
# unlabeled_features.loc[unlabeled_features['ID'].str.contains('repeat', case=False, na=False), 'ID'] = 'repeat'

# # Update 'ID' to 'orange' for all entries containing the substring 'orange', case-insensitively
# unlabeled_features.loc[unlabeled_features['ID'].str.contains('eat', case=False, na=False), 'ID'] = 'eat'
# unlabeled_features.head()

In [11]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from imblearn.pipeline import Pipeline as ImPipeline
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.metrics import classification_report, confusion_matrix
from imblearn.over_sampling import SMOTE
import numpy as np
import pandas as pd
import joblib

# Assuming 'unlabeled_features' and 'ID' are defined
features_set = unlabeled_features.drop('ID', axis=1)
#features_set = features_set.iloc[:, 75:]  # adjust this for which features to include
labels = unlabeled_features['ID']

# Split data
X_train, X_test, y_train, y_test = train_test_split(features_set, labels, test_size=0.3, random_state=42)

# Define cross-validator
cv = StratifiedKFold(n_splits=7, shuffle=True, random_state=42) # 7 splits

# Updated pipeline with Random Forest
pipeline = ImPipeline([
    ('scaler', StandardScaler()),
    ('smote', SMOTE(random_state=42)), #SMOTE
    ('lda', LDA()), #LDA
    ('random_forest', RandomForestClassifier(random_state=42))
])

# Parameters grid for Random Forest
param_grid = {
    'random_forest__n_estimators': [100, 200, 300],  # Number of trees
    'random_forest__max_depth': [None, 10, 20, 30],  # Maximum depth of trees
    'random_forest__min_samples_split': [2, 5, 10]  # Minimum number of samples required to split an internal node
}

# Grid search with cross-validation
grid_search = GridSearchCV(pipeline, param_grid, cv=cv, scoring='accuracy')
grid_search.fit(X_train, y_train)

# Best model and its score
print("Best parameters:", grid_search.best_params_)
print("Best cross-validation score: {:.2f}".format(grid_search.best_score_))

# Save the best model
joblib.dump(grid_search.best_estimator_, 'model.pkl')
print("Best model trained and saved to 'model.pkl'")

Best parameters: {'random_forest__max_depth': 10, 'random_forest__min_samples_split': 10, 'random_forest__n_estimators': 100}
Best cross-validation score: 0.86
Best model trained and saved to 'model.pkl'


In [12]:
import pandas as pd
import joblib

def predict_sign(filepath, model_path='model.pkl'):
    # Load the data
    df = pd.read_csv(filepath)
    df = df.iloc[:, :76]  # Only including coordinate information from above the hips
    df = derive_features(df) 
    df = df[~df['frame_number'].isin([0, 1, 2])]
    df = df.drop('frame_number', axis=1) # remove frame number 
    #df = df.iloc[:, 76:]  # Adjust for which features to include, must match training data input
    
    # drop first three frames of each video (to get rid of NaNs)
    df = df.drop(['curvature_x', 'curvature_y', 'curvature_z'], axis=1, errors='ignore')

    # Convert all data to numeric to prevent TypeError during normalization
    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    df.fillna(method='bfill', inplace=True) # fill nas

    # Load the saved model pipeline
    pipe = joblib.load(model_path)

    # Making a prediction using the loaded pipeline
    predicted_label = pipe.predict(df)
    
    # Returning a list of all predictions for each frame
    return predicted_label

In [13]:
# Taking into account the frames, adding them into one preidction
from collections import Counter

def most_common_label(predicted_labels):
    # count each label
    label_counts = Counter(predicted_labels)
    
    # find the most common label and its count
    most_common, count = label_counts.most_common(1)[0]
    
    # calculate the percentage of the most common label
    total = len(predicted_labels)
    percentage = (count / total) * 100
    
    return most_common, percentage

In [16]:
# Input for the model
predicted_labels = predict_sign('asl-csv-data/book_drink_test/book-4.csv', 'model.pkl') # edit to reflect path of training videos
sign, perc = most_common_label(predicted_labels)
print("The predicted sign is:", sign)
print("The certainty is:", perc, "%")

The predicted sign is: drink
The certainty is: 97.0059880239521 %
