In [1]:
import tensorflow as tf
import keras
import os
import glob
import json
import pandas as pd
import numpy as np
import csv
import random
from matplotlib import pyplot as plt
from keras.models import Sequential
from keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout, GlobalAveragePooling1D, Input
from keras.optimizers import Adam
from keras.utils import to_categorical
from sklearn.preprocessing import MinMaxScaler

In [2]:
# Number of frames each sample is padded or cropped to; this is the default
# target length used by pad_samples.
# NOTE(review): presumably derived from get_max_frames() — confirm the value
# still matches the current dataset.
MAX_FRAMES = 315
# Number of sign classes. Used as the width of the one-hot labels and of the
# model's softmax layer; matches the five entries of sign_to_prediction_map.
NUM_CLASSES = 5

# Import Sign Data

In [3]:
def _collect_landmarks(rows, label):
    """Turn the long-format rows of one landmark type into a wide table.

    Args:
        rows: DataFrame slice holding a single landmark type, with the columns
            'frame', 'row_id', 'type', 'landmark_index', 'x', 'y' and 'z'.
        label: Name embedded in the generated column names, e.g. 'pose'
            produces 'x_pose_<i>', 'y_pose_<i>', 'z_pose_<i>'.

    Returns:
        DataFrame indexed by 'frame' with three columns per landmark index,
        in the order the landmark indices first appear in ``rows``.

    Raises:
        ValueError: if ``rows`` is empty (pd.concat has nothing to join).
    """
    per_landmark = []
    for i in rows['landmark_index'].unique():
        current = rows[rows['landmark_index'] == i]
        current = current.rename(
            columns={
                "x": "x_" + label + "_" + str(i),
                "y": "y_" + label + "_" + str(i),
                "z": "z_" + label + "_" + str(i),
            }
        )
        current = current.drop(["row_id", "type", "landmark_index"], axis=1)
        # Index by frame so that concatenating along axis=1 aligns landmarks
        # belonging to the same frame on the same row.
        per_landmark.append(current.set_index("frame"))
    return pd.concat(per_landmark, axis=1)


def get_features(parquet_path, feature_types = ['face','pose','hand']):
    """Load one landmark parquet file and return it as a wide feature table.

    Args:
        parquet_path: Path of a parquet file with one row per
            (frame, type, landmark_index) and the coordinates 'x', 'y', 'z'.
        feature_types: Any combination of 'face', 'pose' and 'hand'. The
            argument is only read, never modified.

    Returns:
        DataFrame indexed by frame. Column groups are concatenated in the
        fixed order hand, pose, face (whichever were requested). Hand columns
        are named 'x_hand_<i>' / 'y_hand_<i>' / 'z_hand_<i>' and hold a single
        hand: the one with fewer missing values (ties go to the right hand).
        A left hand is mirrored horizontally so it looks like a right hand.

    Raises:
        ValueError: if no feature type is requested, or a requested type has
            no rows in the file.
    """
    data = pd.read_parquet(parquet_path)
    USE_FACE = 'face' in feature_types
    USE_POSE = 'pose' in feature_types
    USE_HAND = 'hand' in feature_types

    # merged collects the wide tables of every requested landmark type
    merged = []

    if USE_HAND:
        right_hand = data[data['type'].str.contains('right_hand')]
        left_hand = data[data['type'].str.contains('left_hand')]

        # The hand rows themselves must be sliced here. The previous version
        # sliced the pose rows, so "hand" features were actually pose landmarks.
        merged_right_hand = _collect_landmarks(right_hand, "right_hand")
        merged_left_hand = _collect_landmarks(left_hand, "left_hand")

        # Handle hand dominance: the hand that was tracked more often
        # (fewer NaN values) is treated as the signing hand.
        right_hand_nans = merged_right_hand.isna().sum().sum()
        left_hand_nans = merged_left_hand.isna().sum().sum()
        right_handed = left_hand_nans >= right_hand_nans

        if right_handed:
            merged_right_hand.columns = merged_right_hand.columns.str.replace(
                "_right", "", regex=False
            )
            merged.append(merged_right_hand)
        else:
            # Mirror the x coordinates around the vertical centre line.
            # NOTE(review): assumes x is normalised to [0, 1] — confirm.
            center = 0.5
            x_col_left = [
                col for col in merged_left_hand.columns if col.startswith("x_")
            ]
            merged_left_hand[x_col_left] = center - (
                merged_left_hand[x_col_left] - center
            )
            merged_left_hand.columns = merged_left_hand.columns.str.replace(
                "_left", "", regex=False
            )
            merged.append(merged_left_hand)

    if USE_POSE:
        pose = data[data['type'].str.contains('pose')]
        merged.append(_collect_landmarks(pose, "pose"))

    if USE_FACE:
        face = data[data['type'].str.contains('face')]
        merged.append(_collect_landmarks(face, "face"))

    return pd.concat(merged, axis = 1)

In [6]:
# Find the longest sample (in frames) so every sample can be padded to it later
def get_max_frames():
    """Return the largest frame count found among the training samples.

    Every path listed in the 'path' column of train.csv is loaded through
    get_features (pose landmarks only) and the number of rows of the result
    is taken as that sample's frame count. Returns 0 when no paths are listed.
    """
    data_root = 'data/filtered_data_5/'
    sample_paths = pd.read_csv('data/filtered_data_5/train.csv')['path']
    frame_counts = (
        len(get_features(data_root + rel_path, ['pose']))
        for rel_path in sample_paths
    )
    return max(frame_counts, default=0)
# get_max_frames()      



24

In [5]:
# Participant id (as a string) -> dense integer index, numbered in the order
# the ids are listed here.
participant_map = {
    participant: position
    for position, participant in enumerate([
        "55372", "28656", "53618",
        "62590", "27610", "37779",
        "4718", "25571", "2044",
        "36257", "29302", "32319",
        "61333", "16069", "30680",
        "49445", "18796", "37055",
        "26734", "22343", "34503",
    ])
}

# Sign name -> integer class label used as the prediction target.
sign_to_prediction_map = dict(
    zip(("eye", "gum", "scissors", "icecream", "story"), range(5))
)

In [170]:
def pad_samples(original_df, desired_frames = None):
    """Centre-pad with zeros, or centre-crop, a sample to a fixed frame count.

    Args:
        original_df: DataFrame with one row per frame.
        desired_frames: Number of rows the result must have. When omitted,
            the module-level MAX_FRAMES is used. It is looked up at call
            time, so editing MAX_FRAMES takes effect without re-running this
            definition.

    Returns:
        DataFrame with exactly ``desired_frames`` rows, the same columns as
        ``original_df`` and a fresh 0..n-1 index.
        * Shorter input: rows of zeros are added before and after; when the
          padding is odd, the extra row goes after the sample.
        * Longer or equal input: the centred window of ``desired_frames``
          rows is returned.
    """
    if desired_frames is None:
        desired_frames = MAX_FRAMES

    original_frames = len(original_df)

    if desired_frames <= original_frames:
        # Integer division is required here: iloc rejects float slice bounds,
        # which made this branch raise TypeError before.
        start = (original_frames - desired_frames) // 2
        end = start + desired_frames
        # Reset the index so cropped and padded samples share one convention.
        return original_df.iloc[start:end].reset_index(drop=True)

    missing = desired_frames - original_frames
    padding_before = missing // 2
    padding_after = missing - padding_before
    num_col = len(original_df.columns)

    pad_before_df = pd.DataFrame(
        np.zeros((padding_before, num_col)), columns = original_df.columns
    )
    pad_after_df = pd.DataFrame(
        np.zeros((padding_after, num_col)), columns = original_df.columns
    )
    return pd.concat([pad_before_df, original_df, pad_after_df], ignore_index=True)

In [246]:
def _stage_features(staged_root, feature_types, raw_root='data/filtered_data_5/'):
    """Extract, pad and write the requested features for every training file.

    For each relative path listed in ``<raw_root>train.csv`` the raw parquet
    file is loaded with get_features, padded with pad_samples and written to
    the same relative path underneath ``staged_root``.

    Args:
        staged_root: Directory that receives the staged parquet files.
        feature_types: Landmark groups forwarded to get_features.
        raw_root: Directory (with trailing slash) containing train.csv and
            the raw landmark files.
    """
    file_paths = pd.read_csv(raw_root + 'train.csv')['path']
    for path in file_paths:
        features = pad_samples(get_features(raw_root + path, feature_types))
        full_path = os.path.join(staged_root, path)
        # Create exactly the directory the file is written to. exist_ok
        # avoids the check-then-create race and repeated existence checks.
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        features.to_parquet(full_path)


def get_pose_features():
    """Stage padded pose landmarks under data/staged/pose_data."""
    _stage_features('data/staged/pose_data', ['pose'])


def get_hand_features():
    """Stage padded dominant-hand landmarks under data/staged/hand_data."""
    _stage_features('data/staged/hand_data', ['hand'])


def get_hand_face_features():
    """Stage padded hand + face landmarks under data/staged/hand_face_data."""
    _stage_features('data/staged/hand_face_data', ['hand', 'face'])


# NOTE(review): only the hand+face variant is staged here, while the loading
# cell further down reads from 'data/staged/hand_data/'. Confirm that
# get_hand_features() has been run as well before loading.
get_hand_face_features()

In [10]:
def label_test_train(n_test=4, seed=None):
    """Label every training file with its class and a train/test flag.

    The split is made per participant, so all recordings of one signer end up
    on the same side. ``n_test`` participant indices are drawn at random from
    the values of participant_map and marked as test data.

    Args:
        n_test: Number of participants held out for testing.
        seed: Optional seed for a reproducible split. With the default None
            the global ``random`` state is used, exactly as before.

    Returns:
        DataFrame with the columns 'path', 'class' and 'train' (in that
        order). 'class' is the integer from sign_to_prediction_map, or -1 for
        an unknown sign. 'train' is 1 for training rows and 0 for test rows.
        Participants missing from participant_map are always put in the
        training set.
    """
    files = pd.read_csv('data/filtered_data_5/train.csv')
    files['class'] = files['sign'].apply(
        lambda sign: sign_to_prediction_map.get(sign, -1)
    )

    # Shuffle the participant indices and hold out the first n_test of them.
    participant_indices = sorted(participant_map.values())
    shuffle = random.shuffle if seed is None else random.Random(seed).shuffle
    shuffle(participant_indices)
    test_participants = set(participant_indices[:n_test])

    def is_train(participant_id):
        # Unknown participants map to -1, which is never a test index.
        held_out = participant_map.get(str(participant_id), -1) in test_participants
        return 0 if held_out else 1

    files["train"] = files['participant_id'].apply(is_train)
    return files[["path", "class", "train"]].copy()

new_map = label_test_train()
new_map

4


Unnamed: 0,path,class,train
0,train_landmark_files/55372/1304024428.parquet,0,1
1,train_landmark_files/55372/2261487237.parquet,0,1
2,train_landmark_files/55372/3544840292.parquet,0,1
3,train_landmark_files/55372/3566054186.parquet,0,1
4,train_landmark_files/55372/431106023.parquet,0,1
...,...,...,...
1045,train_landmark_files/55372/1820892894.parquet,4,1
1046,train_landmark_files/55372/3987453892.parquet,4,1
1047,train_landmark_files/55372/2910954459.parquet,4,1
1048,train_landmark_files/55372/4031658451.parquet,4,1


In [2]:
# Directory the staged hand-landmark parquet files are read from.
ROOT = 'data/staged/hand_data/'
# When true, every column whose name starts with "z" is dropped on load.
DROP_Z = True

# Accumulators for the train/test split: features (X) and class labels (Y).
X_train, Y_train = [], []
X_test, Y_test = [], []

# Load one staged sample, replace missing values with zero and, when DROP_Z
# is set, remove the z coordinate of every landmark.
def preprocess_data(full_path):
    """Return the parquet file at ``full_path`` as a NaN-free DataFrame.

    Missing values are filled with 0.0. If the module-level DROP_Z flag is
    truthy, all columns whose name starts with "z" are left out of the result.
    """
    frame_table = pd.read_parquet(full_path).fillna(0.0)
    if not DROP_Z:
        return frame_table
    is_z_column = frame_table.columns.str.startswith("z")
    return frame_table.loc[:, ~is_z_column]

# Iterate over the map and split into x/y test/train.
# Requires new_map from the label_test_train cell to exist in the kernel.
# Columns are read by label: positional access such as row[0] on a
# string-labelled Series is deprecated in recent pandas releases.
for path, class_item, train in zip(
    new_map["path"], new_map["class"], new_map["train"]
):
    full_path = ROOT + path
    if train == 1:
        Y_train.append(class_item)
        X_train.append(preprocess_data(full_path))
    else:
        Y_test.append(class_item)
        X_test.append(preprocess_data(full_path))

# Fail with a readable message instead of np.stack's "need at least one
# array to stack" when the participant split left one side empty.
if not X_train or not X_test:
    raise ValueError(
        "Empty split: %d training and %d test samples. "
        "Re-run label_test_train() to draw a different split."
        % (len(X_train), len(X_test))
    )

# Convert every sample DataFrame to a 2d array and stack them into one
# 3d array of shape (samples, frames, features).
X_train = np.stack([df.to_numpy() for df in X_train], axis = 0)
X_test = np.stack([df.to_numpy() for df in X_test], axis = 0)

# One Hot encode Y Train and Y test
Y_train = to_categorical(Y_train, num_classes=NUM_CLASSES)
Y_test = to_categorical(Y_test, num_classes=NUM_CLASSES)

# Flatten the frames into rows so the scaler works per feature column
samples, frames, features = X_train.shape
X_train_reshape = X_train.reshape((samples*frames, features))

# Fit the scaler on the training data only, then apply the same scaling to
# the test data so no test statistics leak into training.
scaler = MinMaxScaler()
scaler.fit(X_train_reshape)

X_train = scaler.transform(X_train_reshape).reshape(samples, frames, features)

X_test_reshaped = X_test.reshape(X_test.shape[0] * frames, features)
X_test = scaler.transform(X_test_reshaped).reshape(X_test.shape)

NameError: name 'new_map' is not defined

In [1]:
def create1dCNN(X_train, NUM_CLASSES):
    """Build and compile the 1D convolutional sign classifier.

    Args:
        X_train: Array whose shape, minus the leading sample axis, defines
            the model input shape. Only ``X_train.shape`` is read.
        NUM_CLASSES: Number of units in the final softmax layer.

    Returns:
        A compiled Sequential model: three Conv1D layers (32, 64 and 64
        filters, kernel size 3) with max pooling after the first two, global
        average pooling, a 64-unit dense layer with 50% dropout and a softmax
        output. It is compiled with Adam, categorical cross-entropy and
        accuracy.
    """
    sample_shape = X_train.shape[1:]

    layer_stack = [
        Input(sample_shape),
        Conv1D(filters=32, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=3, padding='same'),
        Conv1D(filters=64, kernel_size=3, activation='relu'),
        MaxPooling1D(pool_size=3, padding='same'),
        Conv1D(filters=64, kernel_size=3, activation='relu'),
        GlobalAveragePooling1D(),
        Dense(units=64, activation='relu'),
        Dropout(rate=0.5),
        Dense(units=NUM_CLASSES, activation='softmax'),
    ]
    network = Sequential(layer_stack)

    network.compile(
        optimizer=Adam(),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network