In [6]:
import cv2
from itertools import combinations
import os
import sys
import time
from typing import Optional
from unicodedata import bidirectional

from imblearn.over_sampling import SMOTE
from keras import models
from keras._tf_keras.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, TensorBoard
from keras._tf_keras.keras.metrics import MeanAbsoluteError, Accuracy, Precision, Recall, MeanSquaredError
from keras._tf_keras.keras.models import Sequential
from keras._tf_keras.keras.optimizers import Adam , RMSprop, Nadam
from keras._tf_keras.keras.preprocessing.sequence import pad_sequences 
from keras._tf_keras.keras.layers import LSTM, Dense, Dropout, Bidirectional, BatchNormalization, Masking, InputLayer
from keras._tf_keras.keras.regularizers import L1L2, L1, L2
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import scipy
from scipy.sparse import csr_matrix
from scipy.stats import skew, kurtosis
from sklearn.calibration import LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.experimental import enable_iterative_imputer
from sklearn.feature_selection import SelectFromModel
from sklearn.impute import IterativeImputer, KNNImputer, SimpleImputer
from sklearn.linear_model import Lasso
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

import cv2
import mediapipe as mp
import io
import time
from collections import Counter
from itertools import combinations
import joblib

In [7]:
# Load the trained Keras gesture classifier from disk.
model = models.load_model("gesture_model_v1.keras")
# Gesture names, read from the "gesture" column of class_labels.csv.
# NOTE(review): presumably the row order matches the model's output units — confirm against the training code.
class_labels = pd.read_csv("class_labels.csv")["gesture"].tolist()
# NOTE(review): joblib.load unpickles the file, and unpickling can execute arbitrary code;
# only load a preprocess.pkl that comes from a trusted source.
preprocessor = joblib.load("preprocess.pkl")
# MediaPipe Hands handles: the solution module, a tracker configured for up to two hands,
# and the landmark drawing helpers.
mp_hands = mp.solutions.hands
hands = mp_hands.Hands(static_image_mode=False, max_num_hands=2)
mp_drawing = mp.solutions.drawing_utils

pd.set_option("display.max_columns", None) # show every column when a DataFrame is displayed
pd.set_option("expand_frame_repr", False) # do not wrap wide DataFrame reprs across several lines

In [8]:
def calculate_elapsed_time(df: pd.DataFrame):
    """Add an ``elapsed_time`` column to ``df`` (in place) and return it.

    For every row, ``elapsed_time = frame / mean(frame_rate of its gesture)``,
    where a gesture is the set of rows sharing the same ``gesture_index``.

    The per-gesture mean is broadcast back with ``groupby().transform`` so the
    result is aligned on the row index. The previous implementation collected
    values in group order and assigned them positionally, which silently
    mis-assigned times whenever rows were not already grouped and sorted by
    ``gesture_index``.

    Args:
        df: Frame with ``gesture_index``, ``frame`` and ``frame_rate`` columns.

    Returns:
        The same ``df`` object, with ``elapsed_time`` added or overwritten.
    """
    mean_frame_rate = df.groupby("gesture_index")["frame_rate"].transform("mean")
    df["elapsed_time"] = df["frame"] / mean_frame_rate

    return df

def calculate_temporal_features(df: pd.DataFrame, cols: list):
    """Add per-gesture velocity, acceleration and jerk columns for ``cols``.

    Each gesture (rows sharing a ``gesture_index``) is ordered by ``frame``.
    The time step between rows is the frame difference divided by the
    gesture's mean ``frame_rate``; the first row, which has no predecessor,
    uses a frame difference of 1. Every derivative is the row-to-row
    difference of the previous quantity divided by that time step, with
    missing values replaced by 0.

    Results are written into ``df`` in place as ``velocity_<col>``,
    ``acceleration_<col>`` and ``jerk_<col>``; ``df`` is returned.
    """
    derivative_order = ("velocity", "acceleration", "jerk")
    output_names = {
        kind: [f"{kind}_{col}" for col in cols] for kind in derivative_order
    }

    for _, group in df.groupby("gesture_index"):
        ordered = group.sort_values(by="frame")
        row_labels = ordered.index

        mean_fps = np.mean(ordered["frame_rate"])
        step_seconds = ordered["frame"].diff().fillna(1) / mean_fps

        # Differentiate repeatedly: positions -> velocity -> acceleration -> jerk.
        signal = ordered[cols]
        for kind in derivative_order:
            signal = signal.diff().div(step_seconds, axis=0).fillna(0)
            df.loc[row_labels, output_names[kind]] = signal.values

    return df
  
def calculate_temporal_stats(df: pd.DataFrame, cols: list):
    """Add per-gesture rolling/expanding statistics for ``cols`` to ``df``.

    For each gesture (rows sharing a ``gesture_index``), ordered by ``frame``:

    * ``deviation_<col>`` / ``variance_<col>``: rolling window of 2 rows
    * ``skew_<col>`` / ``kurt_<col>``: rolling window of 6 rows
    * ``mean_<col>``: expanding (cumulative) mean

    Rows that do not yet fill a rolling window are left as NaN; no filling is
    done here. Columns are written into ``df`` in place and ``df`` is returned.
    """
    def prefixed(prefix):
        return [f"{prefix}_{col}" for col in cols]

    mean_cols = prefixed("mean")
    var_cols = prefixed("variance")
    dev_cols = prefixed("deviation")
    skew_cols = prefixed("skew")
    kurt_cols = prefixed("kurt")

    for _, group in df.groupby("gesture_index"):
        ordered = group.sort_values(by="frame")
        row_labels = ordered.index
        values = ordered[cols]

        short_window = values.rolling(2)
        long_window = values.rolling(6)

        # Assignment order fixes the order in which new columns appear in df.
        df.loc[row_labels, dev_cols] = short_window.std(engine="cython").values
        df.loc[row_labels, var_cols] = short_window.var(engine="cython").values
        df.loc[row_labels, skew_cols] = long_window.skew().values
        df.loc[row_labels, kurt_cols] = long_window.kurt().values
        df.loc[row_labels, mean_cols] = values.expanding().mean(engine="cython").values

    return df

def calculate_landmark_distances(df: pd.DataFrame, cols: list):
    """Add the full pairwise Euclidean distance matrix between landmarks.

    ``cols`` is reshaped row-wise into ``(rows, len(cols) // 3, 3)``, so it is
    read as consecutive coordinate triples, one triple per landmark. For every
    row, the distance between each ordered pair of landmarks ``(i, j)`` is
    stored in ``lm_distance_<i>_<j>``. The matrix is kept in full, so it
    includes the zero diagonal and both ``(i, j)`` and ``(j, i)``.

    Columns are written into ``df`` in place and ``df`` is returned.
    """
    landmark_count = len(cols) // 3
    distance_columns = [
        f"lm_distance_{first}_{second}"
        for first in range(landmark_count)
        for second in range(landmark_count)
    ]

    for _, group in df.groupby("gesture_index"):
        ordered = group.sort_values(by="frame")

        points = ordered[cols].values.reshape(-1, landmark_count, 3)
        # Broadcast (rows, L, 1, 3) against (rows, 1, L, 3) to get every pair's offset.
        offsets = points[:, :, np.newaxis, :] - points[:, np.newaxis, :, :]
        pairwise = np.sqrt((offsets ** 2).sum(axis=-1))

        # The redundant half of the symmetric matrix is deliberately kept (see docstring).
        df.loc[ordered.index, distance_columns] = pairwise.reshape(-1, len(distance_columns))

    return df

def calculate_landmark_angles(df: pd.DataFrame, cols: list, n_landmarks: int = 21):
    """Return ``df`` with ``angle_<j>`` columns appended.

    For each gesture (rows sharing a ``gesture_index``), ordered by ``frame``,
    ``angle_<j>`` is the angle in degrees between landmark ``j``'s coordinate
    vector ``(x_j, y_j, z_j)`` in the previous frame and in the current frame.
    The first frame of every gesture has no predecessor and gets 0.0, as does
    any pair in which either vector has zero (or NaN) magnitude.

    Compared with the previous implementation:

    * one zero row is produced per gesture rather than one for the whole
      frame, so the output always has exactly one row per input row;
    * angles are written back by index label, so they stay attached to the
      right rows when ``df`` has a non-default index or is not sorted;
    * the per-landmark Python loops are replaced by array operations.

    Args:
        df: Frame with ``gesture_index``, ``frame`` and the coordinate columns.
            Index labels are assumed to be unique.
        cols: Coordinate columns; must contain ``x_j``, ``y_j`` and ``z_j``
            for every ``j`` in ``range(n_landmarks)``.
        n_landmarks: Number of landmarks per frame (default 21, the value the
            function used to hard-code).

    Returns:
        A new DataFrame: ``df`` followed by the ``angle_<j>`` columns.
    """
    angle_cols = [f"angle_{j}" for j in range(n_landmarks)]
    axis_cols = [
        [f"{axis}_{j}" for j in range(n_landmarks)] for axis in ("x", "y", "z")
    ]

    # Start from zeros so each gesture's first frame (and skipped rows) read 0.0.
    angles_df = pd.DataFrame(0.0, index=df.index, columns=angle_cols)

    for _, gesture_data in df.groupby("gesture_index"):
        gesture_data = gesture_data.sort_values(by="frame")
        if len(gesture_data) < 2:
            continue  # a single frame has no consecutive pair

        gesture_points = gesture_data[cols]
        # Shape (frames, landmarks, 3), built by column name rather than position.
        points = np.stack(
            [gesture_points[names].to_numpy(dtype=float) for names in axis_cols],
            axis=-1,
        )
        previous, current = points[:-1], points[1:]

        dot = np.sum(previous * current, axis=-1)
        magnitude_prev = np.linalg.norm(previous, axis=-1)
        magnitude_curr = np.linalg.norm(current, axis=-1)

        # NaN magnitudes compare False here, so they fall back to 0.0 as well.
        valid = (magnitude_prev > 0) & (magnitude_curr > 0)
        cosine = np.divide(
            dot,
            magnitude_prev * magnitude_curr,
            out=np.zeros_like(dot),
            where=valid,
        )
        # Clip guards arccos against rounding slightly outside [-1, 1].
        degrees = np.arccos(np.clip(cosine, -1.0, 1.0)) * (180 / np.pi)
        angles = np.where(valid, degrees, 0.0)

        angles_df.loc[gesture_data.index[1:], angle_cols] = angles

    return pd.concat([df, angles_df], axis=1)

def calculate_hand_motion_features(df: pd.DataFrame, landmark_cols: list):
    """Build the full motion feature table for the recorded gestures.

    Runs every feature step on a copy of ``df`` (the caller's frame is not
    modified) and returns the result with duplicate column names removed.

    Features added, in order:
        * elapsed time since frame 0 of the gesture
        * velocity, acceleration and jerk of every landmark coordinate
        * rolling/expanding statistics (std, variance, skew, kurtosis, mean)
        * pairwise landmark distances
        * frame-to-frame landmark angles

    Author's timing notes (``time.process_time`` seconds, dataset not recorded):
        elapsed time 0.16, temporal 6.67, stats 24.19,
        distances 85.42 -> 32.20 (about 60 when all steps run), angles 4.27.

    Known gap: the rolling statistics (variance, skew, kurtosis) contain NaN
    for rows that do not yet fill a window; nothing here fills them.

    Args:
        df: Raw per-frame landmark table.
        landmark_cols: Coordinate columns the features are derived from.

    Returns:
        A new DataFrame holding the original columns plus all features.
    """
    features = df.copy()

    # Each step returns the frame it was given with its columns added.
    features = calculate_elapsed_time(features)
    features = calculate_temporal_features(features, landmark_cols)
    features = calculate_temporal_stats(features, landmark_cols)
    features = calculate_landmark_distances(features, landmark_cols)

    # The angle step returns the input frame with the angle columns appended,
    # so it already is the combined table. Concatenating it onto ``features``
    # again, as the old code did, only copied every column twice before the
    # duplicate filter below discarded the second copy.
    features = calculate_landmark_angles(features, landmark_cols)

    # Ensure there are no duplicate columns (first occurrence wins).
    return features.loc[:, ~features.columns.duplicated()]

In [9]:
def predict_gesture(landmarks_seq, landmark_world_seq, hand_seq, frame_rate, frame_width, frame_height, gesture_action="", debug_csv_path: Optional[str] = "eat2.csv"):
    """Classify one recorded gesture and return its label.

    The three sequences are zipped frame by frame into a table, motion
    features are computed, the module-level ``preprocessor`` transforms them
    and the module-level ``model`` predicts on the whole sequence as a single
    batch item.

    Args:
        landmarks_seq: Per-frame flat lists filling the 63 ``x_i, y_i, z_i``
            columns (21 landmarks).
        landmark_world_seq: Per-frame flat lists filling the 63
            ``wx_i, wy_i, wz_i`` columns.
        hand_seq: Per-frame two-item lists filling the ``hand`` and ``score``
            columns.
        frame_rate, frame_width, frame_height: Capture properties, repeated on
            every row.
        gesture_action: Value for the ``gesture`` column (empty by default).
        debug_csv_path: Where to dump the transformed features for inspection.
            Defaults to the previously hard-coded ``"eat2.csv"``; pass ``None``
            to skip writing.

    Returns:
        The entry of ``class_labels`` with the highest predicted score.

    Raises:
        ValueError: If the sequences contain no frames.
    """
    gesture_index = int(time.time())

    landmark_cols = [f"{coord}_{i}" for i in range(21) for coord in ("x", "y", "z")]
    landmark_world_cols = [f"{coord}_{i}" for i in range(21) for coord in ("wx", "wy", "wz")]
    header = (
        ["frame"]
        + landmark_cols
        + landmark_world_cols
        + ["hand", "score"]
        + ["frame_rate", "frame_width", "frame_height", "gesture", "gesture_index"]
    )

    data = [
        [i] + frame_data + wrld_frame_data + hand_data
        + [frame_rate, frame_width, frame_height, gesture_action, gesture_index]
        for i, (frame_data, wrld_frame_data, hand_data)
        in enumerate(zip(landmarks_seq, landmark_world_seq, hand_seq))
    ]
    if not data:
        raise ValueError("predict_gesture needs at least one recorded frame")

    df = pd.DataFrame(data, columns=header)
    dataframe = calculate_hand_motion_features(df, landmark_cols)

    # Round-trip through CSV text before preprocessing.
    # NOTE(review): presumably this mirrors how the training data was loaded
    # (dtype inference, "" -> NaN) — confirm before removing.
    csv_buffer = io.StringIO()
    dataframe.to_csv(csv_buffer, index=False)
    csv_buffer.seek(0)
    input_df = pd.read_csv(csv_buffer)

    transformed = preprocessor.transform(input_df)
    # The transformer may return a DataFrame, an ndarray or a sparse matrix;
    # normalise so the dump and the reshape below work for all three.
    if hasattr(transformed, "toarray"):
        transformed = transformed.toarray()

    if debug_csv_path is not None:
        pd.DataFrame(transformed).to_csv(debug_csv_path, index=False)

    # Shape (1, frames, features): the whole recording is one batch item.
    X_new = np.asarray(transformed)[np.newaxis, ...]

    prediction = model.predict(X_new)
    print(prediction)

    # Exactly one sequence was submitted, so only the first row is relevant.
    return class_labels[int(np.argmax(prediction[0]))]
def get_landmarks(lm_seq, hand_lm):
    """Append one frame's landmarks to ``lm_seq`` as a flat ``[x, y, z, ...]`` list.

    ``hand_lm.landmark`` is iterated in order and each item contributes its
    ``x``, ``y`` and ``z`` attributes. ``lm_seq`` is modified in place and
    nothing is returned.
    """
    flattened = []
    for point in hand_lm.landmark:
        flattened.extend((point.x, point.y, point.z))
    lm_seq.append(flattened)


def get_handedness(hand_seq, hand_lm):
    """Append one frame's handedness to ``hand_seq`` as ``[label, score, ...]``.

    Every entry of ``hand_lm.classification`` contributes its ``label``
    followed by its ``score``. ``hand_seq`` is modified in place and nothing
    is returned.
    """
    hand_seq.append(
        [
            field
            for entry in hand_lm.classification
            for field in (entry.label, entry.score)
        ]
    )



In [10]:
def record():
    """Run the interactive webcam loop for recording and classifying gestures.

    Keys (read from the OpenCV window):
        * ``r`` — start recording landmarks
        * ``s`` — stop recording, classify what was recorded, print the result
        * ``q`` — quit

    Changes from the previous version:
        * the camera and window are released in a ``finally`` block, so an
          exception (for example inside ``predict_gesture``) no longer leaves
          the device locked;
        * the recorded buffers are cleared after every ``s``, so a new
          recording no longer contains the frames of earlier ones;
        * a non-positive FPS reported by the camera is replaced by 30.0 so
          the time-based features do not divide by zero;
        * the redundant RGB -> BGR conversion of the display frame is gone
          (``frame`` is still the BGR original).

    Uses the module-level ``hands``, ``mp_hands`` and ``mp_drawing`` objects.
    """
    capture = cv2.VideoCapture(0)

    is_recording = False
    landmark_seq = []
    landmark_world_seq = []
    hand_seq = []

    frame_rate = capture.get(cv2.CAP_PROP_FPS)
    if not frame_rate > 0:  # also true for NaN
        print(f"Camera reported FPS={frame_rate}; falling back to 30.0")
        frame_rate = 30.0
    frame_width = capture.get(cv2.CAP_PROP_FRAME_WIDTH)
    frame_height = capture.get(cv2.CAP_PROP_FRAME_HEIGHT)

    try:
        while capture.isOpened():
            ret, frame = capture.read()
            if not ret:
                break

            # Mirror the image so on-screen movement matches the user's.
            frame = cv2.flip(frame, 1)
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = hands.process(rgb_frame)

            if results.multi_hand_landmarks and results.multi_hand_world_landmarks:
                detections = zip(
                    results.multi_hand_landmarks,
                    results.multi_hand_world_landmarks,
                    results.multi_handedness,
                )
                for hand_landmarks, hand_world_landmarks, handedness in detections:
                    mp_drawing.draw_landmarks(frame, hand_landmarks, mp_hands.HAND_CONNECTIONS)

                    lbl = handedness.classification[0].label
                    if lbl == "Left":
                        position, colour = (50, 50), (255, 0, 0)
                    else:
                        position, colour = (200, 50), (0, 0, 255)
                    cv2.putText(frame, lbl, position, cv2.FONT_HERSHEY_SIMPLEX, 1, colour, 2, cv2.LINE_AA)

                    # NOTE(review): with two hands visible, both hands are appended
                    # to the same sequences as consecutive "frames" — confirm this
                    # matches how the training data was captured.
                    if is_recording:
                        get_landmarks(landmark_seq, hand_landmarks)
                        get_landmarks(landmark_world_seq, hand_world_landmarks)
                        get_handedness(hand_seq, handedness)

            cv2.imshow("Hand Gesture Recording", frame)

            key = cv2.waitKey(5) & 0xFF
            if key == ord("r"):
                is_recording = True
                print("Recording gesture...")
            elif key == ord("s"):
                is_recording = False

                if landmark_seq and landmark_world_seq and hand_seq:
                    pred_gesture = predict_gesture(landmark_seq, landmark_world_seq, hand_seq, frame_rate, frame_width, frame_height)
                    print(f"Predicted Gesture: {pred_gesture}")
                else:
                    print("No gestures recorded.")

                # Start the next recording from an empty buffer.
                landmark_seq.clear()
                landmark_world_seq.clear()
                hand_seq.clear()

            # Exit on 'q' key press
            elif key == ord("q"):
                break
    finally:
        capture.release()
        cv2.destroyAllWindows()

# Start the interactive capture loop; it returns when 'q' is pressed or a frame cannot be read.
record()

Recording gesture...
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 173ms/step
[[0.01542875 0.0111964  0.08706059 0.81139416 0.04944039 0.02547975]]
Predicted Gesture: ALL-DONE
