In [None]:
import os
import numpy as np
import pandas as pd
import math
from random import randint, seed
import pickle

# Feature Extraction functions

In [None]:
import math

def get_centroid(hand):
    """Average six landmarks of one hand into a single [x, y, z] point.

    `hand` is indexed as ``hand[landmark][axis]``. The landmarks used are
    0, 4, 8, 12, 16 and 20 (presumably the wrist and the five fingertips --
    confirm against the landmark layout of the capture tool).

    Returns a plain list of three values.
    """
    centroid = []
    for axis in range(3):
        # Accumulate left to right, starting from landmark 0.
        total = hand[0][axis]
        for landmark in (4, 8, 12, 16, 20):
            total = total + hand[landmark][axis]
        centroid.append(total / 6)
    return centroid

def get_active_fingers(hand):
    """Flag which of landmarks 4, 8, 12, 16, 20 have a y value above the centroid's.

    Returns a list of five booleans, one per landmark in that order; an entry
    is True when ``landmark_y - centroid_y > 0``.
    """
    centroid_y = get_centroid(hand)[1]
    return [(hand[4 * finger_no][1] - centroid_y) > 0 for finger_no in range(1, 6)]

def get_bent_fingers(hand):
    """Return five booleans saying whether each finger looks bent.

    * Thumb (entry 0): compares landmark 4 with landmark 3 along x. The
      comparison direction depends on which side of the centroid landmark 4
      lies, so the flag is True when landmark 4 sits on the centroid side of
      landmark 3.
    * Other fingers (entries 1-4): True when landmark ``4*i`` has a smaller
      y value than landmark ``4*i - 1``.

    The previous implementation derived the direction with ``math.floor``,
    which yields 0 for any positive offset smaller than 1 (always the case for
    coordinates normalised to [0, 1]); the thumb flag was then stuck at False
    whenever the thumb lay at a larger x than the centroid. The direction is
    now a true sign (-1, 0 or +1).
    """
    centroid = get_centroid(hand)

    thumb_offset = hand[4][0] - centroid[0]
    # sign = -sign(thumb_offset); 0 when the thumb sits exactly on the centroid
    # (e.g. an all-zero hand), which keeps the flag False as before.
    if thumb_offset > 0:
        sign = -1
    elif thumb_offset < 0:
        sign = 1
    else:
        sign = 0

    bent = [(sign * (hand[4][0] - hand[3][0])) > 0]
    for i in range(2, 6):
        bent.append(hand[4*i][1] < hand[4*i - 1][1])

    return bent

def bent_active(window):
    """Build the active/bent finger flags for every frame of a window.

    Each frame must support slicing and ``.reshape`` (i.e. be a numpy array);
    values 0-62 are treated as one hand and 63-125 as the other, each reshaped
    to (21, 3).

    Returns ``(instance, names)``: per frame the flags are laid out as
    hand-1 active, hand-2 active, hand-1 bent, hand-2 bent (five values each),
    and `names` holds the matching feature names.
    """
    name_templates = (
        "frame {} rhand_active {}",
        "frame {} lhand_active {}",
        "frame {} rhand_bent {}",
        "frame {} lhand_bent {}",
    )

    instance, names = [], []
    for frame_ind, frame in enumerate(window):
        first_hand = frame[:63].reshape(21, 3)
        second_hand = frame[63:126].reshape(21, 3)

        first_active = get_active_fingers(first_hand)
        first_bent = get_bent_fingers(first_hand)
        second_active = get_active_fingers(second_hand)
        second_bent = get_bent_fingers(second_hand)

        for flags in (first_active, second_active, first_bent, second_bent):
            instance.extend(flags)

        finger_count = len(first_active)
        names.extend(
            template.format(frame_ind, finger)
            for template in name_templates
            for finger in range(finger_count)
        )

    return instance, names

In [None]:
# Assumes finite and small size, e.g. 5-10 frames
def time_vectors(window):
    """Describe how each of the 42 points moves between consecutive frames.

    For every pair of neighbouring frames and every point (three consecutive
    values per point, 42 points per frame) the difference ``earlier - later``
    is computed. Frames must be numpy arrays so that slices can be subtracted.

    Returns ``(vectors, names)`` where `vectors` is laid out as:
      1. all x/y/z differences (frame pair major, point minor),
      2. all 3-D lengths,
      3. all 2-D (x, y) lengths,
      4. all 2-D direction angles in radians,
    and `names` holds one label per entry in the same order.

    The angle is computed with ``np.arctan2`` so that it covers the full
    (-pi, pi] range: ``arctan(y / x)`` cannot tell opposite directions apart
    and the former ``x == 0`` fallback reported 0 for purely vertical motion.
    A zero-length vector still yields an angle of 0.
    """
    vectors = []
    lengths_3d = []
    lengths_2d = []
    angles = []
    for frame_ind in range(len(window) - 1):
        frame_1, frame_2 = window[frame_ind], window[frame_ind + 1]
        for point_ind in range(0, 42 * 3, 3):
            v = frame_1[point_ind:point_ind + 3] - frame_2[point_ind:point_ind + 3]
            vectors.extend(v)
            lengths_3d.append(np.linalg.norm(v))

            # Projection onto the x/y plane.
            lengths_2d.append(np.linalg.norm(v[:2]))
            angles.append(np.arctan2(v[1], v[0]))

    vectors.extend(lengths_3d)
    vectors.extend(lengths_2d)
    vectors.extend(angles)

    names = []
    for i in range(len(window) - 1):
        for pt_ind in range(42):
            names.append(f'frame {i}-{i+1} diff_x pt {pt_ind}')
            names.append(f'frame {i}-{i+1} diff_y pt {pt_ind}')
            names.append(f'frame {i}-{i+1} diff_z pt {pt_ind}')

    for string in ["frame {}-{} pt {} len_3d", "frame {}-{} pt {} len_2d", "frame {}-{} pt {} angle"]:
        for i in range(len(window) - 1):
            for pt_ind in range(42):
                names.append(string.format(i, i+1, pt_ind))

    return vectors, names

In [None]:
def fingertip_dists(window):
    """Compute pairwise distances between five points of each hand, per frame.

    The points are addressed by their starting offset inside the flat frame
    (three values per point): 12, 24, 36, 48, 60 for the first hand and
    75, 87, 99, 111, 123 for the second. For every frame the ten pairs of the
    first hand come first, followed by the ten pairs of the second hand.

    Returns ``(tip_dists, names)``; each name records the frame index and the
    two offsets the distance was measured between.
    """
    hand_offsets = ([12, 24, 36, 48, 60], [75, 87, 99, 111, 123])

    tip_dists, names = [], []
    for frame_ind, frame in enumerate(window):
        for offsets in hand_offsets:
            for pos, a_ind in enumerate(offsets):
                for b_ind in offsets[pos + 1:]:
                    first = frame[a_ind:a_ind + 3]
                    second = frame[b_ind:b_ind + 3]
                    tip_dists.append(np.linalg.norm(first - second))
                    names.append(f'frame {frame_ind} dist {a_ind}-{b_ind}')
    return tip_dists, names

# Data cleaning and filtering

In [None]:
# Directory holding one pickle file per label; the trailing slash matters
# because paths are built by plain string concatenation below.
dir_name = './collated/'
# NOTE(review): not referenced anywhere else in the visible notebook.
min_samples = 0

In [None]:
# File names found in the input directory, one per label. os.listdir returns
# entries in arbitrary order, so sort them to make logs and runs reproducible.
labels = sorted(os.listdir(dir_name))

## Filter scenes with too many bad frames (more than 1/5)

In [None]:
def window_filter(window):
    """Count the frames whose values at positions 0, 3 and 6 are all zero.

    Such frames are treated as "bad" by the filtering cells that follow.
    """
    return sum(
        1
        for frame in window
        if (frame[0] == 0) and (frame[3] == 0) and (frame[6] == 0)
    )

In [None]:
# Report, for every label file, how many bad frames each scene contains and
# remember the indices of scenes where more than a fifth of the frames are bad.
rej_dict = {}
for label in labels:
    scenes = pd.read_pickle(dir_name + label)
    print('-'*50)
    print("Label: " + label)

    rejected = []
    for scene_no, window in enumerate(scenes):
        bad_count = window_filter(window)
        if bad_count > 0:
            # No newline: the frame count printed next completes this line.
            print("REJ: {} bad frames out of ".format(bad_count), end='')
        print(len(window), scene_no)
        if (bad_count / len(window)) > 0.2:
            rejected.append(scene_no)

    print("REJ WINDOWS:", rejected)
    rej_dict[label] = rejected

In [None]:
out_dir = './filtered/'
# Make sure the destination exists so the first run does not fail on open().
os.makedirs(out_dir, exist_ok=True)

# Keep only scenes in which at most a fifth of the frames are bad and write
# them, per label, to the output directory.
for label in labels:
    file_path = dir_name + label
    # NOTE: pickle files are assumed to be trusted, locally produced data.
    scenes = pd.read_pickle(file_path)

    valid_scenes = []
    for i, window in enumerate(scenes):
        num_rej = window_filter(window)
        # An empty scene carries no information (and would divide by zero).
        if len(window) == 0 or (num_rej / len(window)) > 0.2:
            print("REJ")
        else:
            valid_scenes.append(window)

    print(f'Saving {len(valid_scenes)} scenes out of {len(scenes)} for {label}')
    out_path = out_dir + label
    # Context manager guarantees the file is flushed and closed.
    with open(out_path, "wb") as out_file:
        pickle.dump(valid_scenes, out_file)

## Filter bad frames from each scene

In [None]:
def clean_scene(window):
    """Return the frames of `window` that are not bad, in their original order.

    A frame is bad when its values at positions 0, 3 and 6 are all zero.
    """
    return [
        frame
        for frame in window
        if not ((frame[0] == 0) and (frame[3] == 0) and (frame[6] == 0))
    ]

In [None]:
# Dedicated names are used here instead of rebinding `dir_name` / `out_dir`:
# overwriting `dir_name` would make the earlier filtering cells read from
# './filtered/' if they were ever re-run in the same kernel.
filtered_dir = './filtered/'
cleaned_dir = './cleaned/'
os.makedirs(cleaned_dir, exist_ok=True)

# Drop the bad frames from every surviving scene and store the result per label.
len_dict = {}
for label in labels:
    file_path = filtered_dir + label
    # NOTE: pickle files are assumed to be trusted, locally produced data.
    scenes = pd.read_pickle(file_path)

    cleaned_scenes = [clean_scene(window) for window in scenes]

    print(f'Saving cleaned scenes of {label}')
    len_dict[label] = len(cleaned_scenes)
    out_path = cleaned_dir + label
    # Context manager guarantees the file is flushed and closed.
    with open(out_path, "wb") as out_file:
        pickle.dump(cleaned_scenes, out_file)

# Preprocessing and feature extraction

In [None]:
# Seed both sources of randomness used below: the stdlib `random` module
# (randint in random_sample) and a dedicated numpy Generator (np_random.choice).
seed(1234)
np_random = np.random.default_rng(1234)

In [None]:
# first, last and x-2 uniform frames in between

def uniform_sample(arr, x):
    """Pick `x` elements of `arr` spread evenly from the first to the last.

    Element ``i`` of the result is ``arr[(i * (len(arr) - 1)) // (x - 1)]``,
    so the first and last elements are always included and the remaining
    ``x - 2`` picks are spaced as evenly as integer indices allow.

    Compared with stepping by ``len(arr) // (x - 1)``, this keeps the spacing
    even when the length is not a multiple of ``x - 1`` and no longer
    collapses onto ``arr[0]`` when `arr` has fewer than ``x - 1`` elements
    (elements are then repeated in order instead).

    Raises:
        ValueError: if `x` is smaller than 1.
        IndexError: if `arr` is empty.
    """
    if x < 1:
        raise ValueError("x must be at least 1")
    size = len(arr)
    if size == 0:
        raise IndexError("cannot sample from an empty sequence")
    if x == 1:
        # A single pick: keep the historical "always include the last" rule.
        return [arr[-1]]

    last = size - 1
    return [arr[(i * last) // (x - 1)] for i in range(x)]

In [None]:
# 1 from x uniform segments

def random_sample(arr, x):
    """Pick one random element from each of `x` consecutive segments of `arr`.

    Segment ``i`` covers indices ``(i * n) // x`` up to (but excluding)
    ``((i + 1) * n) // x`` where ``n = len(arr)``. The segments are disjoint
    and together cover the whole sequence, so picks keep their temporal order
    and never repeat when ``n >= x``.

    The earlier version drew from ``randint(i * gap, (i + 1) * gap)``; since
    ``randint`` includes its upper bound, neighbouring segments shared a
    boundary index, and with fewer than `x` elements every pick was
    ``arr[0]``. When a segment would be empty (``n < x``) the pick now falls
    back to the segment's start index, which repeats elements in order.

    Uses the module-level ``random.randint``; seed it for reproducibility.

    Raises:
        ValueError: if `x` is smaller than 1.
        IndexError: if `arr` is empty.
    """
    if x < 1:
        raise ValueError("x must be at least 1")
    size = len(arr)

    res = []
    for i in range(x):
        lo = (i * size) // x
        # Inclusive upper bound of the segment; clamp for empty segments.
        hi = max(lo, ((i + 1) * size) // x - 1)
        res.append(arr[randint(lo, hi)])
    return res

## Split into train and test data

In [None]:
# Classes used for the experiment and their integer ids (position in the list).
selected_labels = ['look', 'same', 'cancel', 'devil', 'dress', 'live']
label_map = dict(zip(selected_labels, range(len(selected_labels))))

In [None]:
train_scenes = []
test_scenes = []

# Per label, hold out 30% of the scenes (rounded up) for testing.
file_format = "./cleaned/{}_scenes.pkl"
for label in selected_labels:
    file_path = file_format.format(label)

    # NOTE: pickle files are assumed to be trusted, locally produced data.
    scenes = pd.read_pickle(file_path)
    if len(scenes) == 0:
        continue

    test_size = math.ceil(len(scenes) * 0.3)
    # replace=False: Generator.choice samples WITH replacement by default,
    # which produced duplicate indices and a test split smaller than 30%.
    test_inds = set(np_random.choice(len(scenes), test_size, replace=False).tolist())

    for i, scene in enumerate(scenes):
        if i in test_inds:
            test_scenes.append((scene, label))
        else:
            train_scenes.append((scene, label))

## Up/down sample and Extract features

In [None]:
def window_to_instance_w_names(window):
    """Turn a window of frames into one feature vector plus its feature names.

    The vector is the concatenation of the outputs of `bent_active`,
    `time_vectors` and `fingertip_dists`, in that order; the names follow the
    same layout.
    """
    flag_values, flag_names = bent_active(window)
    motion_values, motion_names = time_vectors(window)
    dist_values, dist_names = fingertip_dists(window)

    features = flag_values + motion_values + dist_values
    feature_names = flag_names + motion_names + dist_names
    return features, feature_names

In [None]:
def window_to_instance(window):
    """Turn a window of frames into one feature vector (names discarded).

    Concatenates the value lists of `bent_active`, `time_vectors` and
    `fingertip_dists`, in that order.
    """
    flag_values = bent_active(window)[0]
    motion_values = time_vectors(window)[0]
    dist_values = fingertip_dists(window)[0]
    return flag_values + motion_values + dist_values

In [None]:
# Number of frames sampled from every scene before feature extraction.
frames_choose = 5    # 5 seems fair

In [None]:
# split, down sample and merge

look_scenes = []
other_scenes = []
for tup in train_scenes:
    if tup[1] == 'look':
        look_scenes.append(tup)
    else:
        other_scenes.append(tup)

# Keep at most 20 distinct 'look' scenes (20 seems fair). replace=False is
# required: Generator.choice samples with replacement by default, which could
# select the same scene several times.
num_look_kept = min(20, len(look_scenes))
chosen_inds = np_random.choice(len(look_scenes), num_look_kept, replace=False)
# Build a separate list so `other_scenes` keeps meaning "everything but look".
down_scenes = other_scenes + [look_scenes[i] for i in chosen_inds]

X_down, y_down = [], []
X_names = None
for scene, label in down_scenes:
    label_id = label_map[label]

    y_down.append(label_id)

    window = uniform_sample(scene, frames_choose)
    # Feature names are identical for every instance; collect them only once.
    if X_names is None:
        instance, names = window_to_instance_w_names(window)
        X_names = names
    else:
        instance = window_to_instance(window)
    X_down.append(instance)

In [None]:
# upsample less freq classes to triple their instance

# 'look' scenes contribute one uniformly sampled window; every other class
# contributes three randomly sampled windows per scene.
X_up, y_up = [], []
for scene, label in train_scenes:
    label_id = label_map[label]

    if label == 'look':
        windows = [uniform_sample(scene, frames_choose)]
    else:
        windows = [random_sample(scene, frames_choose) for _ in range(3)]

    for window in windows:
        X_up.append(window_to_instance(window))
        y_up.append(label_id)

## Test data

In [None]:
# Held-out scenes: one uniformly sampled window per scene, no resampling.
y_test = [label_map[label] for _, label in test_scenes]
X_test = [
    window_to_instance(uniform_sample(scene, frames_choose))
    for scene, _ in test_scenes
]

## Save to files

In [None]:
# Convert the accumulated Python lists into numpy arrays.
X_up, y_up = np.array(X_up), np.array(y_up)
X_down, y_down = np.array(X_down), np.array(y_down)
X_test, y_test = np.array(X_test), np.array(y_test)

In [None]:
# Persist every dataset next to the notebook; np.save appends ".npy".
for stem, array in (
    ("X_up", X_up),
    ("y_up", y_up),
    ("X_down", X_down),
    ("y_down", y_down),
    ("X_test", X_test),
    ("y_test", y_test),
):
    np.save(stem, array)

In [None]:
# Store the feature names; the context manager flushes and closes the file,
# which the bare open() call did not guarantee.
with open("X_names.pkl", "wb") as names_file:
    pickle.dump(X_names, names_file)

# Model training and evaluation

In [None]:
# Reload the datasets written by the preprocessing section.
X_up, y_up = np.load("X_up.npy"), np.load("y_up.npy")
X_down, y_down = np.load("X_down.npy"), np.load("y_down.npy")
X_test, y_test = np.load("X_test.npy"), np.load("y_test.npy")

In [None]:
# Reload the feature names. Only unpickle files produced by this notebook:
# pickle.load can execute arbitrary code from untrusted input.
with open("X_names.pkl", "rb") as names_file:
    X_names = pickle.load(names_file)

In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import precision_recall_fscore_support

from matplotlib import pyplot

In [None]:
# Class id -> class name. Note this rebinds `label_map` to the inverse of the
# name -> id mapping used during preprocessing.
selected_labels = ['look', 'same', 'cancel', 'devil', 'dress', 'live']
label_map = dict(enumerate(selected_labels))

In [None]:
def get_metrics(y_actual, y_predicted):
    """Compute per-class precision, recall and F score plus overall accuracy.

    Relies on the module-level `label_map` mapping class id -> class name.

    The class ids are passed explicitly to scikit-learn: without ``labels=``
    only the classes that occur in the data are reported, so a class missing
    from both `y_actual` and `y_predicted` shifted every following row onto
    the wrong class name.

    Returns:
        (metrics_df, accuracy): `metrics_df` has one row per class in
        `label_map` order of ids and a final "Average" row holding the column
        means; `accuracy` is the fraction of matching predictions as a float.
    """
    class_ids = sorted(label_map)
    precision, recall, fscore, _ = precision_recall_fscore_support(
        y_actual, y_predicted, labels=class_ids
    )

    metrics_df = pd.DataFrame({
        "Class": [label_map[class_id] for class_id in class_ids],
        "Precision": precision,
        "Recall": recall,
        "F Score": fscore,
    })

    # Append the unweighted mean of every numeric column as a summary row.
    avg = ["Average"]
    avg.extend(metrics_df.mean(axis=0, numeric_only=True))
    metrics_df.loc[len(metrics_df)] = avg

    num_correct = int(np.sum(np.asarray(y_actual) == np.asarray(y_predicted)))
    return metrics_df, (num_correct / len(y_actual))

In [None]:
# Train both classifiers on the upsampled training set.
# random_state pins the tree's tie-breaking between equally good splits so the
# reported scores and feature importances are reproducible across runs
# (1234 matches the seed used during preprocessing).
dtc_up = DecisionTreeClassifier(random_state=1234)
knn_up = KNeighborsClassifier()

dtc_up.fit(X_up, y_up)
knn_up.fit(X_up, y_up)

dtc_up_pred = dtc_up.predict(X_test)
knn_up_pred = knn_up.predict(X_test)

In [None]:
# Score the decision tree trained on the upsampled set against the test split.
dtc_up_metrics, dtc_up_acc = get_metrics(y_test, dtc_up_pred)

In [None]:
# Score the k-NN model trained on the upsampled set against the test split.
knn_up_metrics, knn_up_acc = get_metrics(y_test, knn_up_pred)

In [None]:
# Train both classifiers on the downsampled training set.
# random_state pins the tree's tie-breaking between equally good splits so the
# reported scores and feature importances are reproducible across runs
# (1234 matches the seed used during preprocessing).
dtc_down = DecisionTreeClassifier(random_state=1234)
knn_down = KNeighborsClassifier()

dtc_down.fit(X_down, y_down)
knn_down.fit(X_down, y_down)

dtc_down_pred = dtc_down.predict(X_test)
knn_down_pred = knn_down.predict(X_test)

In [None]:
# Score the decision tree trained on the downsampled set against the test split.
dtc_down_metrics, dtc_down_acc = get_metrics(y_test, dtc_down_pred)

In [None]:
# Score the k-NN model trained on the downsampled set against the test split.
knn_down_metrics, knn_down_acc = get_metrics(y_test, knn_down_pred)

In [None]:
# Accuracy of every model / training-set combination on the test split.
accuracy_rows = (
    ("DTC", "Upsampled", dtc_up_acc),
    ("DTC", "Downsampled", dtc_down_acc),
    ("KNN", "Upsampled", knn_up_acc),
    ("KNN", "Downsampled", knn_down_acc),
)
for model_name, sampling, accuracy in accuracy_rows:
    print("{:<6} {:<10} : {:0.8f}".format(model_name, sampling, accuracy))

In [None]:
# Show the per-class metric tables in the same order as the accuracies above.
for metrics_table in (dtc_up_metrics, dtc_down_metrics, knn_up_metrics, knn_down_metrics):
    display(metrics_table)

In [None]:
# Per-class precision / recall / F score of the decision tree trained on the
# upsampled set. Tick labels come straight from the metrics table (class names
# plus the trailing "Average" row) and are stored under their own name so the
# `labels` list of input files from the data-cleaning section is not clobbered.
tick_labels = list(dtc_up_metrics['Class'])
xs = np.arange(len(tick_labels)) + 0.5

fig, axs = pyplot.subplots(2, 2, sharex=True, figsize=(15, 8))
# The two figures in this section are otherwise indistinguishable.
fig.suptitle("Decision tree - upsampled training set")

panels = (
    (axs[0, 0], 'Precision', "tab:blue"),
    (axs[0, 1], 'Recall', "tab:red"),
    (axs[1, 0], 'F Score', "tab:green"),
)
for ax, column, colour in panels:
    ax.bar(xs, dtc_up_metrics[column], tick_label=tick_labels, color=colour)
    ax.set_title(column)
    ax.set_ylim([0, 1.1])

In [None]:
# Per-class precision / recall / F score of the k-NN model trained on the
# upsampled set. Tick labels come straight from the metrics table (class names
# plus the trailing "Average" row) and are stored under their own name so the
# `labels` list of input files from the data-cleaning section is not clobbered.
tick_labels = list(knn_up_metrics['Class'])
xs = np.arange(len(tick_labels)) + 0.5

fig, axs = pyplot.subplots(2, 2, sharex=True, figsize=(15, 8))
# The two figures in this section are otherwise indistinguishable.
fig.suptitle("k-NN - upsampled training set")

panels = (
    (axs[0, 0], 'Precision', "tab:blue"),
    (axs[0, 1], 'Recall', "tab:red"),
    (axs[1, 0], 'F Score', "tab:green"),
)
for ax, column, colour in panels:
    ax.bar(xs, knn_up_metrics[column], tick_label=tick_labels, color=colour)
    ax.set_title(column)
    ax.set_ylim([0, 1.1])

In [None]:
# Features the upsampled decision tree actually used (non-zero importance),
# most important first.
important_feats = sorted(
    (
        (X_names[feat_ind], importance)
        for feat_ind, importance in enumerate(dtc_up.feature_importances_)
        if not importance == 0
    ),
    key=lambda pair: pair[1],
    reverse=True,
)

print("{:<25}: {:>10}".format("Feature", "Importance"))
print("-"*50)
for feat_name, importance in important_feats:
    print("{:<25}: {:0.8f}".format(feat_name, importance))

In [None]:
# Features the downsampled decision tree actually used (non-zero importance),
# most important first.
important_feats = sorted(
    (
        (X_names[feat_ind], importance)
        for feat_ind, importance in enumerate(dtc_down.feature_importances_)
        if not importance == 0
    ),
    key=lambda pair: pair[1],
    reverse=True,
)

print("{:<25}: {:>10}".format("Feature", "Importance"))
print("-"*50)
for feat_name, importance in important_feats:
    print("{:<25}: {:0.8f}".format(feat_name, importance))