# Import Necessary libraries

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import math
import collections
import os
import json
import pickle

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split

# Defining Global Variables

In [2]:
# Dataset location: <parent dir>/<root_folder>/<datatype_folder>/<sign>/...
root_folder, datatype_folder = "data", "csv"

# One sub-folder per sign; the folder name is also used as the sign's name.
data_folders = [
    "buy",
    "communicate",
    "fun",
    "hope",
    "mother",
    "really",
]

# os.pardir is the parent-directory token ("..").
path = os.path.join(os.pardir, root_folder, datatype_folder)

# Sample wise Normalizing
----------------
Several alternative per-sample normalization functions to try out and choose between. Each converts the data from absolute coordinates into relative distances.

In [None]:
# x values are relative to the nose
# left body y values are relative to the leftShoulder_y value
# right body y values are relative to the rightShoulder_y value

def normalize_sample(X):
    """Make one sample's coordinates relative to the nose and the shoulders.

    The columns of ``X`` are expected in the order nose, leftShoulder,
    rightShoulder, leftElbow, rightElbow, leftWrist, rightWrist, each as an
    (x, y) pair, so even columns hold x values and odd columns hold y values.

    * every x column (0, 2, 4, ...) has column 0 (nose x) subtracted
    * columns 3, 7, 11 (left-side y) have column 3 (left shoulder y) subtracted
    * columns 5, 9, 13 (right-side y) have column 5 (right shoulder y) subtracted
    * column 1 (nose y) is left untouched

    Parameters
    ----------
    X : numpy.ndarray
        2-D array with one row per frame and at least 14 columns.
        It is modified in place.

    Returns
    -------
    numpy.ndarray
        A new array holding columns 1, 2, 4 and 6-13 of the normalized data,
        i.e. without the three reference columns (0, 3, 5), which are all
        zeros after normalization.
    """
    # Plain column slices are views into X. Without these copies each
    # reference would be zeroed by the first loop iteration that normalizes
    # the reference column itself, turning every later subtraction into a
    # no-op.
    nose_x = X[:, 0].copy()
    left_shoulder_y = X[:, 3].copy()
    right_shoulder_y = X[:, 5].copy()

    # Take the column count from the data rather than from a module-level
    # list, so the function does not depend on state defined elsewhere.
    num_cols = X.shape[1]

    for col in range(0, num_cols, 2):
        X[:, col] = X[:, col] - nose_x

    for col in range(3, num_cols, 4):
        X[:, col] = X[:, col] - left_shoulder_y

    for col in range(5, num_cols, 4):
        X[:, col] = X[:, col] - right_shoulder_y

    return X[:, [1, 2, 4, 6, 7, 8, 9, 10, 11, 12, 13]]

In [3]:
# x values are relative to the nose_x
# y values are relative to the nose_y

def normalize_sample(X):
    """Translate every coordinate of a sample so that the nose is the origin.

    Even columns (x values) have column 0 subtracted and odd columns
    (y values) have column 1 subtracted, for the first ``len(feature_list)``
    columns. ``X`` is modified in place.

    Returns ``X`` without its first two columns (the nose itself, which is
    all zeros after the shift).
    """
    # Copy the references first: they are overwritten with zeros as soon as
    # their own column is processed.
    nose_x = X[:, 0].copy()
    nose_y = X[:, 1].copy()
    total_cols = len(feature_list)

    # First pass handles the x columns (start 0), second the y columns (start 1).
    for first_col, origin in ((0, nose_x), (1, nose_y)):
        for idx in range(first_col, total_cols, 2):
            X[:, idx] -= origin

    return X[:, 2:]

In [None]:
# universal normalization in terms of shoulder coordinates
def normalize_sample(X):
    """Express elbow/wrist coordinates relative to the nose, in shoulder units.

    For columns 6 and up (within the first ``len(feature_list)`` columns):
    x values (even columns) become ``(x - column 0) / |column 2 - column 4|``
    and y values (odd columns) become ``(y - column 1) / |column 3 - column 5|``.
    ``X`` is modified in place and the columns from index 6 on are returned.

    NOTE(review): a row whose two shoulder x (or y) values are equal gives a
    zero denominator - confirm that cannot occur in the data.
    """
    shoulder_dx = abs(X[:, 2] - X[:, 4])
    shoulder_dy = abs(X[:, 3] - X[:, 5])
    nose_x = X[:, 0].copy()
    nose_y = X[:, 1].copy()
    total_cols = len(feature_list)

    # (first column, origin, scale): x columns first, then y columns.
    passes = ((6, nose_x, shoulder_dx), (7, nose_y, shoulder_dy))
    for first_col, origin, scale in passes:
        for idx in range(first_col, total_cols, 2):
            X[:, idx] = (X[:, idx] - origin) / scale

    return X[:, 6:]

# Extracting only useful data

In [4]:
# Columns read from every CSV, as (x, y) pairs per keypoint.
feature_list = [
    "nose_x", "nose_y",
    "leftShoulder_x", "leftShoulder_y",
    "rightShoulder_x", "rightShoulder_y",
    "leftElbow_x", "leftElbow_y",
    "rightElbow_x", "rightElbow_y",
    "leftWrist_x", "leftWrist_y",
    "rightWrist_x", "rightWrist_y",
]

# Integer class label for each sign folder.
label_dict = {'buy':0,'communicate':1,'fun':2,'hope':3,'mother':4,'really':5}

# X collects one normalized array per file, Y the matching integer label.
X, Y = [], []
for sign in data_folders:
    sign_dir = os.path.join(path, sign)
    for file_name in os.listdir(sign_dir):
        frame = pd.read_csv(os.path.join(sign_dir, file_name))

        # Keep only the selected keypoint columns, then normalize the sample.
        sample = normalize_sample(frame[feature_list].to_numpy())
        # sample = scaling_sample(sample)

        X.append(sample)
        Y.append(label_dict[sign])

# Creating dataframe of same size
---------------
Padding the shorter videos with extra frames so that all samples have the same length. Truncating the longer videos instead would remove important information from them.

In [5]:
def extend_data(X, kind="zeros"):
    """Pad every sample along its first axis so all samples share one length.

    The target length is the longest sample's length rounded up to the next
    multiple of 50. A length that is already a multiple of 50 is still
    increased by 50. The target length is printed.

    Parameters
    ----------
    X : list of numpy.ndarray
        Non-empty list of 2-D arrays (rows x features). The feature count is
        taken from the first sample. The list is modified in place.
    kind : {"zeros", "means", "copies"}
        What the appended rows contain: zeros, the sample's per-column mean,
        or copies of the sample's last row.

    Returns
    -------
    list of numpy.ndarray
        The same list object, every entry now having the target length.

    Raises
    ------
    KeyError
        If ``kind`` is not one of the supported strategies.
    """
    def zeros(sample, diff, num_features):
        return np.full(shape=(diff, num_features), fill_value=0)

    def means(sample, diff, num_features):
        mean_array = np.reshape(np.mean(sample, axis=0), (1, num_features))
        return np.repeat(mean_array, diff, axis=0)

    def copies(sample, diff, num_features):
        last_array = np.reshape(sample[-1], (1, num_features))
        return np.repeat(last_array, diff, axis=0)

    num_features = X[0].shape[-1]
    max_timeframes = max(len(sign) for sign in X)
    max_timeframes = max_timeframes + 50 - (max_timeframes % 50)
    print("All videos now of length: {}".format(max_timeframes))

    # Map names to the functions themselves and call only the selected one.
    # Calling all three per sample tripled the work and let an unused
    # strategy fail (e.g. "copies" on an empty sample) when another strategy
    # was requested.
    switcher = {"zeros": zeros, "means": means, "copies": copies}
    make_padding = switcher[kind]

    for i, sample in enumerate(X):
        diff = max_timeframes - len(sample)
        append_array = make_padding(sample, diff, num_features)
        X[i] = np.append(sample, append_array, axis=0)

    return X

# Zero-pad all samples to a common length, then turn the lists into arrays.
X = np.array(extend_data(X, kind="zeros"))
Y = np.array(Y)

# The three dimensions of X are reused later to restore this shape.
(num_samples, num_timeframes, num_features) = X.shape

shape_report = "X shape : {} \nY Shape : {}"
print(shape_report.format(X.shape, Y.shape))

All videos now of length: 250
X shape : (415, 250, 12) 
Y Shape : (415,)


# Saving Normalized list data
-----------------
Saving the entire normalized data set and its labels so that we don't have to recompute them on every run. Only the features that we need are saved.

In [6]:
# Write the padded samples and their labels under ../IPD.
for file_name, array in (("normalized_x.npy", X), ("normalized_y.npy", Y)):
    np.save(os.path.join("..", "IPD", file_name), array)

# Train Test split

In [7]:
# Hold out 20% of the samples for testing. The seed is fixed so that the
# split, and every artifact saved from it below, is the same on each run.
RANDOM_SEED = 42
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.2, random_state=RANDOM_SEED
)
print("Training samples : {} \nTesting Samples : {}".format(X_train.shape, X_test.shape))
print("Training Labels : {} \nTesting Labels : {}".format(Y_train.shape, Y_test.shape))

Training samples : (332, 250, 12) 
Testing Samples : (83, 250, 12)
Training Labels : (332,) 
Testing Labels : (83,)


# Scaling data

In [8]:
def scaling(X, kind="minmax", feature_range=(-1,1)):
    """Create a scaler of the requested kind, fit it on ``X`` and return it.

    ``kind`` is "standard" (StandardScaler) or "minmax" (MinMaxScaler built
    with ``feature_range``); any other value raises KeyError.
    ``feature_range`` is only used by the "minmax" scaler.
    """
    factories = {
        "standard": StandardScaler,
        "minmax": lambda: MinMaxScaler(feature_range=feature_range),
    }
    fitted = factories[kind]()
    fitted.fit(X)
    return fitted

In [9]:
# Flatten each sample to a single row so the scaler receives a 2-D array.
num_train, num_test = X_train.shape[0], X_test.shape[0]
X_train = X_train.reshape((num_train, -1))
X_test = X_test.reshape((num_test, -1))

# Fit on the training rows only, then apply the same transform to both sets.
scaler = scaling(X_train, kind="minmax", feature_range=(-1,1))
X_train, X_test = scaler.transform(X_train), scaler.transform(X_test)

# Restore the (samples, timeframes, features) layout.
sequence_shape = (-1, num_timeframes, num_features)
X_train = X_train.reshape(sequence_shape)
X_test = X_test.reshape(sequence_shape)

sample_report = "Training samples : {} \nTesting Samples : {}"
label_report = "Training Labels : {} \nTesting Labels : {}"
print(sample_report.format(X_train.shape, X_test.shape))
print(label_report.format(Y_train.shape, Y_test.shape))

Training samples : (332, 250, 12) 
Testing Samples : (83, 250, 12)
Training Labels : (332,) 
Testing Labels : (83,)


# Save the fitted scaler

In [10]:
# Persist the fitted scaler. The with-block closes the file handle as soon
# as the dump finishes (or fails) instead of leaving it open.
with open(os.path.join("..", "IPD", "scaler.pkl"), "wb") as scaler_file:
    pickle.dump(scaler, scaler_file)

# Saving Train - Test split

In [11]:
# Write the scaled train/test arrays and their labels under ../IPD.
for file_name, array in (
    ("x_train.npy", X_train),
    ("x_test.npy", X_test),
    ("y_train.npy", Y_train),
    ("y_test.npy", Y_test),
):
    np.save(os.path.join("..", "IPD", file_name), array)