# Evaluating Taekwondo Moves
A system that recognizes and evaluates videos of performed taekwondo moves.

## Setup

In [None]:
import sys
import os
from os.path import exists, join, basename, splitext
import time
import math
import pickle
import numpy as np
import cv2
import json
import random
from google.colab.patches import cv2_imshow
from datetime import datetime

In [None]:
!pip install -U scikit-learn

In [None]:
def show_local_mp4_video(file_name, width=640, height=480):
    """
    Builds an inline HTML5 video player for a local .mp4 file
    :param file_name: the path to the .mp4 file to embed
    :param width: the width of the player, in pixels
    :param height: the height of the player, in pixels
    :return: an IPython `HTML` object whose <video> tag embeds the file contents as a base64 data URI
    """
    import base64
    from IPython.display import HTML
    # read through a context manager so the file handle is closed as soon as the bytes are in memory
    with open(file_name, 'rb') as video_file:
        video_encoded = base64.b64encode(video_file.read())
    return HTML(data='''<video width="{0}" height="{1}" alt="test" controls>
                        <source src="data:video/mp4;base64,{2}" type="video/mp4" />
                      </video>'''.format(width, height, video_encoded.decode('ascii')))

In [None]:
def zip_folder(zip_folder_name, direc):
    """
    Zips up the given folder by shelling out to `zip -q -r` (quiet, recursive)
    :param zip_folder_name: the name for the output zip archive, ex. `smedaram.zip`
    :param direc: the directory whose contents are recursively added to the archive
    """
    !zip -q -r "$zip_folder_name" "$direc"

In [None]:
def unzip_folder(zip_folder_name, direc='/content'):
    """
    Unzips the given archive by shelling out to `unzip`
    :param zip_folder_name: the (path to the) .zip file to extract
    :param direc: the directory the archive is extracted into (passed to `unzip -d`); defaults to /content, which is the base directory in Google Colab
    """
    !unzip "$zip_folder_name" -d "$direc"

In [None]:
def remove_folder(folder_name):
    """
    Recursively deletes the given folder by shelling out to `rm -r`
    :param folder_name: the (path to the) folder to delete
    """
    !rm -r "$folder_name"

In [None]:
def get_time():
    """
    Builds a filename-friendly timestamp for the current local time
    :return: the string form of `datetime.now()` with the space replaced by `_` and every `:` replaced by `-`
    """
    stamp = str(datetime.now())
    for old, new in ((' ', '_'), (':', '-')):
        stamp = stamp.replace(old, new)
    return stamp

In [None]:
# Base directory: the project folder on the mounted Google Drive.
# The path is relative, so it only resolves against the mount below when the working directory is /content.
lu = 'drive/MyDrive/Senior_Research'

from google.colab import drive
# mount Google Drive so that paths built from `lu` are reachable
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Abbreviations for each of the 10 moves.
# Keys are the move names as they appear in the 'Move' spreadsheet column; values are combined
# with a side letter elsewhere in this notebook to form prefixes such as `hb_l`.
abbreviations = {
    "High block": "hb",
    "Middle block (in-to-out)": "mbio",
    "Middle block (out-to-in)": "mboi",
    "Knife hand block": "khb",
    "Low block": "lb",
    "Punch (middle-level)": "p",
    "High punch (face-level)": "hp",
    "Front kick": "fk",
    "Round kick": "rk",
    "Side kick": "sk"
}

## Skeleton Representation

Get Existing Custom Datasets as of February 23, 2021

In [None]:
# extract the archived VideoPose3D folder from Drive into unzip_folder's default target directory (/content)
unzip_folder(os.path.join(lu,'VideoPose3D-2021-02-23.zip'))

### Create Custom Datasets

In [None]:
# git_repo_url = 'https://github.com/facebookresearch/VideoPose3D.git'
# project_name = splitext(basename(git_repo_url))[0]
# if not exists(project_name):
#   # clone and install dependencies
#    !git clone -q --depth 1 $git_repo_url

In [None]:
!python3 -m pip install --upgrade pip

In [None]:
# install dependencies: 
!pip install pyyaml==5.1
import torch, torchvision
print(torch.__version__, torch.cuda.is_available())
!gcc --version
# opencv is pre-installed on colab

In [None]:
# install detectron2: (Colab has CUDA 10.1 + torch 1.8)
# See https://detectron2.readthedocs.io/tutorials/install.html for instructions
import torch
assert torch.__version__.startswith("1.8")   # need to manually install torch 1.8 if Colab changes its default version
!pip install detectron2 -f https://dl.fbaipublicfiles.com/detectron2/wheels/cu101/torch1.8/index.html
# exit(0)  # After installation, you need to "restart runtime" in Colab. This line can also restart runtime

In [None]:
# Some basic setup:
# Setup detectron2 logger
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

# import some common detectron2 utilities
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog

**Note:** Make sure to upload the pretrained model to the `checkpoint` folder if you took VideoPose3D straight from GitHub.

In [None]:
def create_train_custom_dataset(videos, side, move, slow):
    """
    Creates a custom dataset within VideoPose3D for the given move on the given side given training dataset
    :param videos: the dict of dict of dict objects that contains all the fileids of the videos organized progressively by view, side, and move
    :param side: the side for which to create a custom dataset, ex. `Left`
    :param move: the move for which to create a custom dataset, ex. `High block`
    :param slow: boolean for whether or not the video should be slowed down before its processed frame by frame; currently obsolete, since discovered the slowing down the video does little to change a blurry frame in and of itself
    :return: if downloads are successful, returns the prefix of the move (ex. `mboi_l`) and the custom dataset name (ex. `mboi_l_dataset`); if not, returns None
    """
    prefix = download_all_videos(videos, side, move, slow)
    if prefix != None:
        # create an output folder for the 2D estimates
        twod_outputs_folder = f'{prefix}_2d_outputs'
        if not os.path.exists(twod_outputs_folder):
            os.makedirs(twod_outputs_folder)
        !cd VideoPose3D/inference && python3 infer_video_d2.py --cfg COCO-Keypoints/keypoint_rcnn_R_101_FPN_3x.yaml --output-dir ../../"$twod_outputs_folder"  --image-ext mp4 ../../"$prefix"/
        !cd VideoPose3D/data/ && python3 prepare_data_2d_custom.py -i ../../"$twod_outputs_folder" -o "$prefix"_dataset
        return prefix, f'{prefix}_dataset'
    return None, None

In [None]:
def create_train_custom_dataset_for_given_moves(moves):
    """
    Creates a custom dataset for each side of all moves given, then zips the VideoPose3D folder to the base directory
    :param moves: list of strings for all the moves for which to generate custom datasets, ex. `['High block', 'Low block']`
    """
    # NOTE(review): `videos` is read from the notebook's global scope and is not bound inside this
    # function; confirm it is defined (e.g. as the training-set file ids) before calling.
    for m in moves:
        for side in ('Left', 'Right'):
            create_train_custom_dataset(videos, side, m, slow=False)
    # the timestamp helper is `get_time`; the previous call to `get_t` referenced an undefined name
    folder_name = f"{lu}/VideoPose3D-{get_time()}.zip"
    zip_folder(folder_name, "VideoPose3D/")

In [None]:
def create_custom_dataset(leadup, ip_folder, slow):
    """
    Creates custom dataset for all the files inside ip_folder
    :param leadup: the path to ip_folder
    :param ip_folder: the name of the folder which contains all video files for which the custom dataset should be made
    :param slow: boolean for whether or not the video should be slowed down before its processed frame by frame; currently obsolete, since discovered the slowing down the video does little to change a blurry frame in and of itself
    :return: the name of the output folder (where the .npy and .mp4 files will be stored) and the name of the custom dataset
    """
    p = os.path.join(leadup, ip_folder)
    if slow == True:
        oup_folder = f'{ip_folder}_slow'
        new_p = os.path.join(leadup, oup_folder)
        if not os.path.exists(new_p):
          os.makedirs(new_p)

        for f in os.listdir(p):
            # print(f)
            vidfn, ext = os.path.splitext(f)
            slow_down_video(os.path.join(p, f), os.path.join(new_p, f), speed=2.0)
    else:
        new_p = p
        oup_folder = ip_folder

    twod_outputs_folder = f'{oup_folder}_2d_outputs'
    if not os.path.exists(twod_outputs_folder):
        os.makedirs(twod_outputs_folder)
    !cd VideoPose3D/inference && python3 infer_video_d2.py --cfg COCO-Keypoints/keypoint_rcnn_R_101_FPN_3x.yaml --output-dir ../../"$twod_outputs_folder"  --image-ext mp4 ../../"$new_p"/
    !cd VideoPose3D/data/ && python3 prepare_data_2d_custom.py -i ../../"$twod_outputs_folder" -o "$oup_folder"_dataset
    return oup_folder, f'{oup_folder}_dataset'

## Get Data

In [None]:
import pandas as pd
import io

In [None]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
from googleapiclient.http import MediaIoBaseDownload

In [None]:
# authenticate this Colab session, then hand the default application credentials to PyDrive
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
# NOTE(review): this rebinds `drive`, which earlier referred to the `google.colab.drive` module
drive = GoogleDrive(gauth)

In [None]:
from google.colab import auth
auth.authenticate_user()
from googleapiclient.discovery import build
# Drive v3 API client; download_video_helper uses it to fetch file contents by file id
drive_service = build('drive', 'v3')

In [None]:
# submission responses used as the training set (v2 spreadsheet); the "Upload video here" column holds the video URLs that get_file_ids parses
df_train = pd.read_excel(f'{lu}/Evaluating_TKD_Video_Submission_Responses_v2.xlsx', index_col=None, na_values=['NA'], usecols=['Timestamp', 'Athlete Name', 'Level Ability', 'Move', 'Side', 'Iteration #', "View", "Upload video here"])
df_train.head()

In [None]:
# v4 spreadsheet of submission responses; additionally loads the "Ideal" column that get_ideal_file_ids reads
df_full = pd.read_excel(f'{lu}/Evaluating_TKD_Video_Submission_Responses_v4.xlsx', index_col=None, na_values=['NA'], usecols=['Timestamp', 'Athlete Name', 'Level Ability', 'Move', 'Side', 'Iteration #', "View", "Upload video here", "Ideal"])
df_full.head()

In [None]:
# submission responses used as the test set (v4 "Test" spreadsheet)
df_test = pd.read_excel(f'{lu}/Evaluating_TKD_Video_Submission_Responses_v4_Test.xlsx', index_col=None, na_values=['NA'], usecols=['Timestamp', 'Athlete Name', 'Level Ability', 'Move', 'Side', 'Iteration #', "View", "Upload video here"])
df_test.head()

In [None]:
def get_file_ids(df):
    """
    Collects the video file ids listed in the dataframe
    :param df: the pandas dataframe object; reads the 'View', 'Side', 'Move' and 'Upload video here' columns
    :return: nested dicts view --> side --> move --> set of file ids, where view is 'Front' or 'Diagonal' and side is 'Left' or 'Right'
    """
    videos = {view: {side: {} for side in ('Left', 'Right')}
              for view in ('Front', 'Diagonal')}  # view --> side --> moves --> ids

    for _, row in df.iterrows():
        moves_for_side = videos[row['View']][row['Side']]
        ids_for_move = moves_for_side.setdefault(row['Move'], set())
        url = row['Upload video here']
        # the file id is everything that follows the first 'id' in the URL plus one separator character
        ids_for_move.add(url[url.find('id') + 3:])
    return videos

In [None]:
def get_ideal_file_ids(df):
    """
    Collects the file id of the "ideal" execution video for each move/side combination
    :param df: the pandas dataframe object; only rows whose 'Ideal' column equals "Yes" are used
    :return: a dict that maps prefix (ex. `mbio_l`) to a file id; when several rows share a prefix, the last one wins
    """
    move2ideal = {}
    for _, row in df.iterrows():
        if row['Ideal'] != "Yes":
            continue
        prefix = f"{abbreviations[row['Move']]}_{row['Side'][0].lower()}"
        url = row['Upload video here']
        # the file id is everything that follows the first 'id' in the URL plus one separator character
        move2ideal[prefix] = url[url.find('id') + 3:]
    return move2ideal

In [None]:
def print_videos(videos):
    """
    Prints the nested file-id structure as an indented tree: view, then side, then one line per move with its id count and ids
    :param videos: nested dicts view --> side --> move --> collection of file ids
    """
    for view, sides in videos.items():
        print(f'{view}')
        for side, moves in sides.items():
            print(f'\t{side}')
            for move, file_ids in moves.items():
                print(f'\t\t{move} ({len(file_ids)}): {file_ids}')

In [None]:
# file ids of the training videos, organized by view --> side --> move
train_videos = get_file_ids(df_train)
print_videos(train_videos)

In [None]:
# file ids of the test videos, organized by view --> side --> move; read as a global by pose_classification_helper
test_videos = get_file_ids(df_test)
print_videos(test_videos)

In [None]:
def slow_down_video(input_name, output_name, speed=3.0):
    """
    Slows down the given video by shelling out to ffmpeg with the video filter `setpts=<speed>*PTS`
    :param input_name: the filename for the original video
    :param output_name: the filename for the output video
    :param speed: the factor each frame's presentation timestamp is multiplied by, ex. 3.0 to play three times slower
    """

    !ffmpeg -i "$input_name" -filter:v "setpts=$speed*PTS" "$output_name"

In [None]:
def naming_convention(c, prefix, view, count):
    """
    Decides the file name a downloaded video is stored under
    :param c: the naming format, 'regular' for the train/test dataset; any other value selects the 'ideal' format
    :param prefix: the prefix for the given video, ex. 'mbio_l'
    :param view: from which view the video was taken, 'Front' or 'Diagonal'; only its first letter is used, and only in the 'regular' format
    :param count: a unique identifier for this video, usually a count within the prefix and view categories; only used in the 'regular' format
    :return: `<prefix><first letter of view, lowercased>_<count>.mp4` for 'regular', otherwise `<prefix>_ideal.mp4`
    """
    if c != 'regular':
        return prefix + '_ideal.mp4'
    view_letter = view[0].lower()
    return prefix + f'{view_letter}_{count}.mp4'

In [None]:
def download_video_helper(file_id, dir, prefix, view, count, convention):
    """
    Downloads one video through the Drive API client and writes it to disk
    :param file_id: the file id for the video (stored in GDrive)
    :param dir: the directory the downloaded video is written into
    :param prefix: the category this video belongs to, ex. 'mbio_l', for the naming convention
    :param view: the view from which this video was taken, ex. 'Diagonal', for the naming convention
    :param count: a unique identifier for this video, usually a count within the prefix and view categories
    :param convention: the naming format, either 'regular' for the train/test dataset or 'ideal' for the ideal executions
    """
    # the whole file is pulled into memory chunk by chunk before anything is written to disk
    buffer = io.BytesIO()
    downloader = MediaIoBaseDownload(buffer, drive_service.files().get_media(fileId=file_id))
    finished = False
    while finished is False:
        _, finished = downloader.next_chunk()

    target_path = os.path.join(dir, naming_convention(convention, prefix, view, count))
    with open(target_path, 'wb') as out_file:
        out_file.write(buffer.getvalue())

def download_all_videos_helper(videos, dir, prefix, view, side, move):
    """
    Downloads every video recorded for one move, side, and view, numbering them from 0 in iteration order
    :param videos: nested dicts view --> side --> move --> file ids
    :param dir: the directory where the downloaded videos should be stored
    :param prefix: the prefix used to name the videos, ex. 'mbio_l'
    :param view: from which view the video was taken, 'Front' or 'Diagonal'
    :param side: the side for which to download videos, ex. `Left`
    :param move: the move for which to download videos, ex. `High block`
    """
    for count, file_id in enumerate(videos[view][side][move]):
        download_video_helper(file_id, dir, prefix, view, count, 'regular')
    print(f'Downloaded all {view}-view videos of {side}-side {move} moves')

def download_all_video_views(dir, prefix, videos, side, move):
    """
    Downloads the videos of one move and side for every view that has recordings of it
    :param dir: the directory where the downloaded videos should be stored
    :param prefix: the prefix used to name the videos, ex. 'mbio_l'
    :param videos: nested dicts view --> side --> move --> file ids
    :param side: the side for which to download videos, ex. `Left`
    :param move: the move for which to download videos, ex. `High block`
    """
    # 'Front' is handled before 'Diagonal'; a view without this move is skipped
    for view in ('Front', 'Diagonal'):
        if move in videos[view][side]:
            download_all_videos_helper(videos, dir, prefix, view, side, move)

def download_all_videos(leadup, videos, side, move, slow, dts_id=None):
    """
    Downloads all videos of the given move and side into `<leadup>/<prefix>` (or `<leadup>/<prefix>_<dts_id>`)
    :param leadup: the leadup to the directory where all the videos should be stored
    :param videos: the dict of dict of dict objects that contains all the file ids of the videos organized progressively by view, side, and move
    :param side: the side for which to download videos, ex. `Left`
    :param move: the move for which to download videos, ex. `High block`
    :param slow: whether to slow down the downloaded videos; when set, the originals go to a `<final folder>_init_` folder and the slowed copies to the final folder
    :param dts_id: a special identifier added to the end of the directory name, ex. `test`
    :return: the prefix (ex. `mbio_l`) if the final folder was created by this call; None if it already existed, in which case nothing is downloaded
    """
    prefix = f'{abbreviations[move]}_{side[0].lower()}'
    folder_name = prefix if dts_id is None else f'{prefix}_{dts_id}'
    fdir = os.path.join(leadup, folder_name)  # the final videos folder

    if slow:
        # originals are staged in an "_init_" folder next to the final folder. Previously this path was
        # only assigned when dts_id was None, leaving the download directory unbound otherwise.
        download_dir = os.path.join(leadup, f'{folder_name}_init_')
        print(download_dir)
        if not os.path.exists(download_dir):  # make the initial videos folder
            os.makedirs(download_dir)
    else:
        download_dir = fdir

    if os.path.exists(fdir):
        # an existing final folder is taken to mean the videos were already downloaded
        return None

    os.makedirs(fdir)  # make the final videos folder
    print(prefix)

    download_all_video_views(download_dir, prefix, videos, side, move)

    if slow:
        for f in os.listdir(download_dir):
            vidfn, _ = os.path.splitext(f)
            slow_down_video(os.path.join(download_dir, f), os.path.join(fdir, f'{vidfn}.mp4'))

    return prefix

## Pose Classification

In [None]:
# moves = ["High block", "Middle block (in-to-out)", "Middle block (out-to-in)", "Knife hand block", "Low block", "Punch (middle-level)", "High punch (face-level)", "Front kick", "Round kick", "Side kick"]
# moves = ["High block", "Middle block (in-to-out)", "Middle block (out-to-in)", "Knife hand block", "Low block", "Punch (middle-level)", "High punch (face-level)"]
# moves = ["Front kick", "Round kick", "Side kick"]

In [None]:
def reconstruct_video_helper(path, threed_outputs_folder, custom_dataset, video_fn):
    """
    Runs VideoPose3D's `run.py` to reconstruct the given video into its 3D representation; the command is asked to
    write a rendered video to `<video_fn>_si.mp4` and an export to `<video_fn>_si` inside the output folder
    :param path: the path to the folder containing the video
    :param threed_outputs_folder: the (path to the) folder where the output is stored
    :param custom_dataset: the name of the custom dataset which stores the 2D results from Detectron
    :param video_fn: the name of the video to be reconstructed, without the .mp4 extension (the command appends it)
    """

    p = os.path.join(path, video_fn)
    print(p)
    # print(f'cd VideoPose3D/ && python run.py -d custom -k {custom_dataset} -arc 3,3,3,3,3 -c checkpoint --evaluate pretrained_h36m_detectron_coco.bin --render --viz-subject {video_fn}.mp4 --viz-action custom --viz-camera 0 --viz-video ../{p}.mp4 --viz-output ../{threed_outputs_folder}/{video_fn}_si.mp4 --viz-export ../{threed_outputs_folder}/{video_fn}_si --viz-size 6')
    # paths are prefixed with ../ because the command runs from inside VideoPose3D/
    !cd VideoPose3D/ && python run.py -d custom -k "$custom_dataset" -arc 3,3,3,3,3 -c checkpoint --evaluate pretrained_h36m_detectron_coco.bin --render --viz-subject "$video_fn".mp4 --viz-action custom --viz-camera 0 --viz-video ../"$p".mp4 --viz-output ../"$threed_outputs_folder"/"$video_fn"_si.mp4 --viz-export ../"$threed_outputs_folder"/"$video_fn"_si --viz-size 6

def reconstruct_videos(leadup, prefix, dts, folder=None):
    """
    Runs `reconstruct_video_helper` on every file of a video folder and collects the results in `<folder name>_3d_outputs`
    :param leadup: the path to the folder
    :param prefix: the name of the video folder, used when `folder` is not given (the training-set case)
    :param dts: the name of the custom dataset
    :param folder: the name of the video folder when the videos are not part of the training set; takes precedence over `prefix`
    :return: the path of the output folder
    """
    source_name = prefix if folder is None else folder
    videos_dir = os.path.join(leadup, source_name)
    threed_outputs_folder = os.path.join(leadup, f'{source_name}_3d_outputs')

    if not os.path.exists(threed_outputs_folder):
        os.makedirs(threed_outputs_folder)

    for entry in os.listdir(videos_dir):
        print(entry)
        # the helper expects the video name without its extension
        reconstruct_video_helper(videos_dir, threed_outputs_folder, dts, os.path.splitext(entry)[0])
    return threed_outputs_folder

### Generate Vocabulary Poses with K-means Clustering

In [None]:
# the 10 moves split by technique type; every name is a key of `abbreviations`
hand_techniques = ["High block", "Middle block (in-to-out)", "Middle block (out-to-in)", "Knife hand block", "Low block", "Punch (middle-level)", "High punch (face-level)"]
kicking_techniques = ["Front kick", "Round kick", "Side kick"]

In [None]:
from sklearn.cluster import KMeans

In [None]:
# presumably the number of joints in the Human3.6M skeleton that VideoPose3D outputs; not referenced by the code visible here (TODO confirm)
h36m_num_joints = 17
# angle index --> [start, center, end] joint indices; calc_angle measures the angle at the center joint
# between the vectors pointing to the start and end joints
h36m_joints = {
    0: [7,8,9],
    1: [14,8,11],
    2: [8,14,15],
    3: [14,15,16],
    4: [8,11,12],
    5: [11,12,13],
    6: [7,1,2],
    7: [1,2,3],
    8: [7,4,5],
    9: [4,5,6],
    10: [16,7,13],
    11: [3,7,6],
    12: [8,7,16],
    13: [8,7,13]
}
def calc_angle(angle_jgroup, coordinates):
    """
    Calculates all the angles as specified by angle_jgroup and coordinates
    :param angle_jgroup: a dictionary whose values are [start, center, end] joint indices; each entry defines the angle at the center joint
    :param coordinates: a numpy array indexed by joint, where each row holds that joint's coordinates
    :return: a 1-D numpy array of the angles in radians, in the dictionary's iteration order (NaN where a joint coincides with the center joint)
    """
    angles = []
    for s, c, e in angle_jgroup.values():
        s_hat = coordinates[s] - coordinates[c]
        e_hat = coordinates[e] - coordinates[c]

        cosine = (s_hat @ e_hat) / (np.linalg.norm(s_hat) * np.linalg.norm(e_hat))
        # floating-point rounding can push the cosine of (anti)parallel vectors marginally outside
        # [-1, 1], which would make math.acos raise ValueError; clamp before taking the arccosine
        angles.append(math.acos(np.clip(cosine, -1.0, 1.0)))
    return np.array(angles, dtype=float)

def get_angles(data):
    """
    Calculates angles between joints for each frame in a given video, represented as `data`
    :param data: a numpy array with one entry of joint coordinates per frame (the contents of a 3D-reconstruction .npy file)
    :return: a numpy array of shape (number of frames, number of angles in `h36m_joints`), one row of angles in radians per frame
    """
    ang = np.empty((len(data), len(h36m_joints)))
    for i, frame in enumerate(data):
        # store each frame's angles in its own row. The previous chained assignment
        # (`f_ang = ang = ...`) rebound `ang` to the per-frame vector, so the row write failed
        # and the per-video matrix was lost.
        ang[i] = calc_angle(h36m_joints, frame)
    return ang

In [None]:
def get_cluster_centroids(k, ips):
    """
    Returns `k` cluster centroids for videos represented by `ips`, a bunch of NumPy archives
    :param k: number of clusters
    :param ips: all the input files, a list of .npy files
    :return: scikit-learn's fitted kmeans object, which contains .cluster_centers_
    :raises ValueError: if `ips` is empty
    """
    if len(ips) == 0:
        raise ValueError('get_cluster_centroids needs at least one .npy input file')

    # stack the per-frame angle rows of all videos in one go instead of re-copying the growing array for every file
    X = np.concatenate([get_angles(np.load(ip)) for ip in ips])

    # fixed random_state keeps the vocabulary poses reproducible between runs
    return KMeans(n_clusters=k, random_state=0).fit(X)

In [None]:
def get_ips(leadup, prefix):
    """
    Lists the 3D reconstruction .npy files found in `<leadup>/<prefix>_3d_outputs`, and prints that list
    :param leadup: the path to the folder
    :param prefix: the name of the video folder (most often the prefix, if it is the training set)
    :return: the list of all input filenames (including paths), in directory-listing order
    """
    threed_outputs_folder = os.path.join(leadup, f'{prefix}_3d_outputs')
    ips = [
        os.path.join(threed_outputs_folder, name)
        for name in os.listdir(threed_outputs_folder)
        if os.path.splitext(name)[1] == '.npy'
    ]
    print(ips)
    return ips

In [None]:
def get_all_cluster_centroids_helper(leadup, videos, k, side, move, ks, dl=False):
    """
    For a given move and side, gets all the vocabulary poses (for training)
    :param leadup: the path to the videos
    :param videos: the dict of dict of dict objects that contains all the file ids of the videos organized progressively by view, side, and move
    :param k: the number of vocabulary poses for each move
    :param side: the side for which to get vocabulary poses, ex. `Left`
    :param move: the move for which to get vocabulary poses, ex. `High block`
    :param ks: the dictionary which maps prefix to scikit-learn's kmeans object that contains the vocabulary poses as cluster centroids; updated in place
    :param dl: if a download is necessary, defaults to false since oftentimes the videos have already been downloaded in the current runtime
    :return: the kmeans object containing the vocabulary poses as cluster centroids
    """
    prefix = f'{abbreviations[move]}_{side[0].lower()}'

    if dl:
        # download_all_videos returns None when the video folder already exists, so its return value
        # must not be used as the prefix for the dataset name and folders below
        download_all_videos(leadup, videos, side, move, slow=False)
        reconstruct_videos(leadup, prefix, f'{prefix}_dataset')

    ips = get_ips(leadup, prefix)
    kmeans = get_cluster_centroids(k, ips)
    ks[prefix] = kmeans
    return kmeans


def get_all_cluster_centroids(leadup, videos, k, moves, ks, ks_path, dl=False):
    """
    For all the moves given, gets vocabulary poses for the right and then the left side
    :param leadup: the path to the videos
    :param videos: the dict of dict of dict objects that contains all the file ids of the videos organized progressively by view, side, and move
    :param k: the number of vocabulary poses for each move
    :param moves: list of all the moves for which to get vocabulary poses
    :param ks: the dictionary which maps prefix to scikit-learn's kmeans object that contains the vocabulary poses as cluster centroids; updated in place
    :param ks_path: the path for where to dump the `ks` object
    :param dl: if a download is necessary, defaults to false since oftentimes the videos have already been downloaded in the current runtime
    """

    for m in moves:
        for side, side_letter in (("Right", "r"), ("Left", "l")):
            print(f'{abbreviations[m]}_{side_letter}')
            get_all_cluster_centroids_helper(leadup, videos, k, side, m, ks, dl=dl)
            # checkpoint after every move/side so finished clusterings survive a later failure;
            # the context manager makes sure the pickle is flushed and the handle closed
            with open(ks_path, "wb") as ks_file:
                pickle.dump(ks, ks_file)

In [None]:
# unzip_folder(os.path.join(lu, 'TrainSet_NPYs&MP4s.zip'))

In [None]:
# ks = {}
# number of vocabulary poses (k-means clusters) per move/side for hand and kicking techniques
hand_vp = 10
kick_vp = 12
# ks_path = os.path.join(lu, f'ks-h{hand_vp}k{kick_vp}.p')

In [None]:
# get_all_cluster_centroids('', train_videos, hand_vp, hand_techniques, ks, ks_path)
# get_all_cluster_centroids('', train_videos, kick_vp, kicking_techniques, ks, ks_path)

### KNN

In [None]:
from sklearn.neighbors import KNeighborsClassifier

In [None]:
def get_data(path_to_ks, suffix, new_identifiers=False):
    """
    Takes all vocabulary poses for each move/side from the `path_to_ks` pickle file, which contains a dictionary that maps prefix to a kmeans object,
    pairs every prefix with an integer identifier (randomly assigned when new ones are requested, to avoid similar moves being assigned close numbers),
    and generates and saves the X,y arrays for KNN training to `<lu>/<suffix>.npz`
    :param path_to_ks: the path to the ks pickle file
    :param suffix: the identifier for a specific number of vocabulary poses, ex. h10k12 means 10 vocab poses for hand techniques and 12 vocab poses for kicking techniques
    :param new_identifiers: whether or not to create new identifiers (and save them to `moveids.p`), or use the existing pickle file (defaults to false)
    :return: X (the angles representations of the vocabulary poses), y (the identifier of the prefix the corresponding vocab pose belongs to), the dictionary that maps the integer identifier to the prefix, and the dictionary that maps the prefix to the identifier
    """
    # NOTE: unpickling executes arbitrary code; only load pickle files this project wrote itself
    with open(path_to_ks, 'rb') as ks_file:
        ks = pickle.load(ks_file)

    num_angles = 14  # width of X when there are no poses at all
    moveids_path = os.path.join(lu, 'moveids.p')

    if new_identifiers:
        identifier2move = {}
        move2identifier = {}
        identifiers = list(range(len(ks)))
        random.shuffle(identifiers)
    else:
        with open(moveids_path, 'rb') as ids_file:
            identifier2move, move2identifier = pickle.load(ids_file)

    pose_blocks = []
    labels = []
    for move in ks:
        if new_identifiers:
            moveid = identifiers.pop()
            identifier2move[moveid] = move
            move2identifier[move] = moveid
        else:
            moveid = move2identifier[move]
        vocab_poses = ks[move].cluster_centers_
        pose_blocks.append(vocab_poses)
        labels.extend([moveid] * len(vocab_poses))

    # size X from the poses actually stored in the pickle. Pre-allocating from the notebook-level
    # pose counts left uninitialised rows (or overflowed) whenever those counts and the pickle disagreed.
    X = np.vstack(pose_blocks) if pose_blocks else np.empty((0, num_angles))
    y = np.array(labels, dtype=float)

    np.savez(os.path.join(lu, suffix), X, y)
    if new_identifiers:
        with open(moveids_path, 'wb') as ids_file:
            pickle.dump((identifier2move, move2identifier), ids_file)
    return X, y, identifier2move, move2identifier

In [None]:
def classify_videos(ips, tech, ks_name, new_ids=False):
    """
    Classifies all videos in `ips` into one of the move/side prefixes: each video is clustered into its own vocabulary poses, every pose is
    labelled by a KNN classifier trained on the training vocabulary poses, and the prefix with the most votes wins
    :param ips: list of strings for the names of all .npy files that should be classified into a technique
    :param tech: list of strings, either 'h' for hand technique or 'k' for kicking technique, that match up with the list of filenames in `ips`; None treats every video as a kicking technique (the one with the largest number of vocabulary poses)
    :param ks_name: the name of the pickle file (inside the base directory) where the kmeans objects from training are stored, ex. ks-h10k12.p
    :param new_ids: boolean for whether or not new identifiers should be generated; only takes effect when the `<suffix>.npz` training arrays do not exist yet
    :return: a list of tuples, one per video: the string abbreviation the video has been classified into and a string `votes/total` for how many of its vocabulary poses led to that conclusion
    :raises ValueError: if an entry of `tech` is neither 'h' nor 'k'
    """
    path_to_ks = os.path.join(lu, ks_name)
    # parse the suffix (ex. h10k12) from the file name only, so that a '-' or '.p' in a directory name cannot corrupt it
    ks_file_name = os.path.basename(ks_name)
    suffix = ks_file_name[ks_file_name.find('-') + 1:ks_file_name.find('.p')]
    npz_path = os.path.join(lu, f'{suffix}.npz')

    if not os.path.exists(npz_path):
        X, y, identifier2move, move2identifier = get_data(path_to_ks, suffix, new_identifiers=new_ids)
    else:
        with np.load(npz_path) as npzfile:
            X, y = npzfile['arr_0'], npzfile['arr_1']
        with open(os.path.join(lu, 'moveids.p'), 'rb') as ids_file:
            identifier2move, move2identifier = pickle.load(ids_file)

    neigh = KNeighborsClassifier(n_neighbors=10)
    neigh.fit(X, y)

    final_res = []
    for i, ip in enumerate(ips):
        technique = 'k' if tech is None else tech[i]
        if technique == 'k':
            num_vp = int(suffix[suffix.find('k') + 1:])
        elif technique == 'h':
            num_vp = int(suffix[suffix.find('h') + 1:suffix.find('k')])
        else:
            # previously an unknown code silently reused the previous video's pose count (or failed on the first video)
            raise ValueError(f"Unknown technique type {technique!r} for {ip}; expected 'h' or 'k'")

        sample_kmeans = get_cluster_centroids(num_vp, [ip])

        # label all of this video's vocabulary poses in one call, then count the votes per prefix
        tally = {}
        for label in neigh.predict(sample_kmeans.cluster_centers_):
            m = identifier2move[label]
            tally[m] = tally.get(m, 0) + 1

        maxvote_move = max(tally, key=tally.get)
        final_res.append((maxvote_move, f'{tally[maxvote_move]}/{sum(tally.values())}'))
    return final_res

In [None]:
def pose_classification_helper(leadup, m, moves_type, dts_id, side, ks_name, pr, acc_writer, dl=False):
    """
    Classifies the test videos of one move/side combination and records how many were labelled correctly
    :param leadup: the path to the test videos
    :param m: the move to classify, ex. `High block`
    :param moves_type: whether the given move is a hand or kicking technique, 'h' or 'k'
    :param dts_id: a special identifier added to the end of directory name, ex. `test`
    :param side: the side of the move, ex. `Left`
    :param ks_name: the name of the pickle file storing the kmeans objects containing the vocabulary poses
    :param pr: the list to which one (observed, expected) tuple per classified video is appended
    :param acc_writer: the file writer that receives the per-video results and the accuracy
    :param dl: if True, the videos are downloaded, turned into a custom dataset and reconstructed first
    :return: the prefix (expected label) and the accuracy, 1 - misclassified/total
    """
    if dl == True:
        # Downloading also yields the prefix; the dataset is then built and reconstructed from scratch
        prefix = download_all_videos(leadup, test_videos, side, m, False, dts_id)
        _, dataset = create_custom_dataset(leadup, f'{prefix}_{dts_id}', slow=False)
        reconstruct_videos(leadup, f'{prefix}_{dts_id}', dataset)
    else:
        # Videos already present: derive the prefix from the move abbreviation and the side's first letter
        prefix = f'{abbreviations[m]}_{side[0].lower()}'

    ips = get_ips(leadup, f'{prefix}_{dts_id}')
    # Every video in this folder shares the same technique type
    tech = [moves_type] * len(ips)
    res = classify_videos(ips, tech, ks_name)

    # Report each video's predicted move and its vote count, on screen and in the results file
    for ip, fr in zip(ips, res):
        line = f'{os.path.basename(ip)}\t|\t{fr[0]}\t{fr[1]}'
        print(line)
        acc_writer.write(f'{line}\n')

    # Count the wrong labels, then record the (observed, expected) pairs for the later F1 computation
    misclass = sum(1 for (elem, vote) in res if elem != prefix)
    pr.extend((elem, prefix) for (elem, vote) in res)

    acc = 1 - (misclass / len(ips))
    print(f'Accuracy: {acc}')
    acc_writer.write(f'Accuracy: {acc}\n')
    return prefix, acc


def pose_classification(leadup, moves, moves_type, dts_id, ks_name, pr, prefixes, version, dl=False):
    """
    Classifies test videos of every move in `moves` (both Left and Right side) into one of the 20 possibilities
    :param leadup: the path to the test videos
    :param moves: the list of all the moves (ex. ['High block', 'Low block']) for which classification is necessary
    :param moves_type: whether the given move is a hand or kicking technique, 'h' or 'k'
    :param dts_id: a special identifier added to the end of directory name, ex. `test`
    :param ks_name: the name of where the pickle file storing kmeans objects containing the vocabulary poses is stored
    :param pr: the list of (observed, expected) tuples to which the results of each move's classification will be appended
    :param prefixes: the list to which every prefix handled by this method is appended, in order to later calculate the f1 scores
    :param version: a number to identify this specific combination of modifications (ex. different number of vocab poses)
    :param dl: if a download is necessary, defaults to false since oftentimes the videos have already been downloaded in the current runtime
    :return: the list of bare-boned accuracies, two per move (Left then Right)
    """
    accuracies = []
    # The results file is named after the part of ks_name between the first '-' and the first '.'
    suffix = ks_name[ks_name.find('-') + 1:ks_name.find('.')]
    results_path = os.path.join(lu, f"pose_class_res_{suffix}_v{version}.txt")

    # `with` guarantees the (append-mode) results file is closed even if a helper call raises
    with open(results_path, "a") as acc_writer:
        for m in moves:
            for side in ('Left', 'Right'):
                prefix, acc = pose_classification_helper(
                    leadup, m, moves_type, dts_id, side, ks_name, pr, acc_writer, dl=dl)
                prefixes.append(prefix)
                accuracies.append(acc)

        # An empty `moves` list has no defined average; report nan instead of dividing by zero
        avg_accuracy = sum(accuracies) / len(accuracies) if accuracies else float("nan")
        print(f'Average Accuracy: {avg_accuracy}')
        acc_writer.write(f'Average Accuracy: {avg_accuracy}\n\n')
    return accuracies

In [None]:
def get_acc_metrics(prefix, pr, metrics_writer):  # searching in pr for this prefix
    """
    Calculates precision, recall and the F1 score for the move `prefix`, prints them and writes them to `metrics_writer`
    :param prefix: the move for which to calculate the f1 score
    :param pr: an iterable of (predicted, expected) label pairs for all videos in the test set
    :param metrics_writer: the file writer (any object with a `write` method) to print the metrics results
    :return: the f1 score, or nan when it is undefined (precision or recall undefined, or both equal to 0)
    """

    print(f'{prefix}')
    metrics_writer.write(f'{prefix}\n')

    # True negatives are not needed for precision, recall or F1, so they are not counted
    tp = 0
    fp = 0
    fn = 0
    for pred, res in pr:
        if pred == prefix and res == prefix:
            tp += 1
        elif pred == prefix:
            fp += 1
        elif res == prefix:
            fn += 1

    # A metric whose denominator is zero is undefined and reported as nan
    nan = float("nan")
    precision = tp / (tp + fp) if tp + fp else nan
    recall = tp / (tp + fn) if tp + fn else nan

    print(f'Precision: {precision}')
    metrics_writer.write(f'\tPrecision: {precision}\n')
    print(f'Recall: {recall}')
    metrics_writer.write(f'\tRecall: {recall}\n')

    # nan inputs propagate through the arithmetic; only a zero sum needs the explicit guard
    denominator = precision + recall
    f1 = 2 * (precision * recall) / denominator if denominator else nan
    print(f'F1: {f1}')
    metrics_writer.write(f'\tF1: {f1}\n\n')
    return f1

def get_all_f1s(prefixes, pred_res, ks_name, version=0):
    """
    Gets all the f1 scores for every prefix in `prefixes` and writes the metrics to a results file under `lu`
    :param prefixes: the list of every prefix for which to calculate an f1 score
    :param pred_res: the pr object that stores (observed, expected) tuples
    :param ks_name: the name of the ks file, from which to get the naming convention to print the f1 results in a file
    :param version: a number to identify this specific combination of modifications (ex. different number of vocab poses)
    :return: a dictionary that maps prefix to f1 score
    """
    # The results file is named after the part of ks_name between the first '-' and the first '.'
    suffix = ks_name[ks_name.find('-') + 1:ks_name.find('.')]
    results_path = os.path.join(lu, f"res_{suffix}_v{version}.txt")

    f1_score = {}
    # `with` closes the file even if a metric computation raises; "w" overwrites any earlier results
    with open(results_path, "w") as f1_writer:
        for p in prefixes:
            f1_score[p] = get_acc_metrics(p, pred_res, f1_writer)
    return f1_score

In [None]:
# !cd "$lu/TestSet" && zip -q -r "TestSet_NPYs&MP4s.zip" ''

In [None]:
# Extract the zipped test set stored under `lu`; unzip_folder extracts into /content unless told otherwise
unzip_folder(os.path.join(lu, 'TestSet_NPYs&MP4s.zip'))

In [None]:
# Pickle file name of the kmeans objects, built from the hand/kick vocabulary-pose counts
ks_name = f'ks-h{hand_vp}k{kick_vp}.p'
# Shared accumulators filled by both calls below: `pr` collects (observed, expected) tuples,
# `prefixes` collects every prefix that was classified (both are later passed to get_all_f1s)
pr = []
prefixes = []
# Classify the already-downloaded (dl=False) test videos, hand techniques first, then kicking techniques
pose_classification('TestSet', hand_techniques, 'h', 'test', ks_name, pr, prefixes, version=5, dl=False)
pose_classification('TestSet', kicking_techniques, 'k', 'test', ks_name, pr, prefixes, version=5, dl=False)

In [None]:
# pickle.dump(pr, open('pr.p', 'wb'))

In [None]:
# Compute precision/recall/F1 for every classified prefix from the accumulated `pr` tuples.
# NOTE(review): the classification calls use version=5 while this call uses version=0, so the
# two result files carry different version tags -- confirm this is intended.
get_all_f1s(prefixes, pr, ks_name, version=0)

## Temporal Alignment (DTW)

In [None]:
from scipy.spatial.distance import euclidean
from fastdtw import fastdtw

In [None]:
def get_ideal_videos(leadup, dts_id='ideal'):
    """
    Downloads every ideal video, then builds the custom dataset and reconstructs the videos
    :param leadup: the path under which the ideal videos are written
    :param dts_id: the identifier used both as the download folder name and as the suffix for files pertaining to the ideal videos, ex. `ideal`
    """
    move2ideal = get_ideal_file_ids(df_full)
    print(move2ideal)

    # Folder receiving the downloads; created on first use
    ideal_dir = os.path.join(leadup, dts_id)
    if not os.path.exists(ideal_dir):
        os.makedirs(ideal_dir)

    # One download per prefix found in the mapping
    for prefix in move2ideal:
        ideal_entry = move2ideal[prefix]
        download_video_helper(ideal_entry, ideal_dir, prefix, None, None, dts_id)
    print("Downloaded all ideal videos")

    # Only the dataset returned by create_custom_dataset is needed for reconstruction
    _, dataset = create_custom_dataset(leadup, dts_id, slow=False)
    reconstruct_videos(leadup, None, dataset, dts_id)

In [None]:
# The ideal set is loaded from its zip archive under `lu` rather than regenerated;
# the regenerating call is left commented out for reference.
# get_ideal_videos()
unzip_folder(os.path.join(lu, 'IdealSet_NPYs&MP4s.zip'))

In [None]:
def dtw_compare_samples(ips, oup):
    """
    Measures how far each input video is from the ideal video using fastdtw with a euclidean point distance
    :param ips: the list of paths of the input .npy files
    :param oup: the path to the ideal .npy file for this given move
    :return: a list with one fastdtw distance per entry of `ips`, in the same order
    """
    # The ideal sequence is loaded and converted once, then reused for every input
    ideal_angles = get_angles(np.load(oup))
    # fastdtw returns (distance, path); only the distance is kept
    return [
        fastdtw(get_angles(np.load(sample)), ideal_angles, dist=euclidean)[0]
        for sample in ips
    ]


In [None]:
# prefix = 'hb_l'
# ips = get_ips('TestSet', f'{prefix}_test')
# oup = os.path.join('ideal_3d_outputs', f'{prefix}_ideal_si.npy')
# dtw_compare_samples(ips, oup)

In [None]:
# Experiment: skip the pose classification module and compare each video directly against every ideal video with DTW (the ideal with the best, i.e. lowest, score is taken as the predicted move). This did not perform better than pose classification.

def dtw_testing_helper(leadup, prefixes, ip):
    """
    Picks the move whose ideal video is closest (by DTW) to the given input
    :param leadup: the folder holding the ideal `<prefix>_ideal_si.npy` files
    :param prefixes: the candidate prefixes to compare against
    :param ip: the list of input .npy paths handed to dtw_compare_samples (callers here pass a single-element list)
    :return: the prefix with the smallest DTW distance list
    """
    # dtw_compare_samples returns a list of distances, so the minimum below compares lists
    distances = {
        prefix: dtw_compare_samples(ip, os.path.join(leadup, f'{prefix}_ideal_si.npy'))
        for prefix in prefixes
    }
    closest_move = min(distances, key=distances.get)
    print(closest_move)
    return closest_move


def dtw_testing(leadup, prefixes, pr_dtw, test_leadup='TestSet'):
    """
    Labels every test video purely by DTW distance to the ideal videos and records the outcome
    :param leadup: the folder holding the ideal .npy files
    :param prefixes: the prefixes whose `<prefix>_test` folders are evaluated; also the candidate labels
    :param pr_dtw: the list to which one (predicted, expected) tuple per test video is appended
    :param test_leadup: the path to the test videos, defaults to `TestSet`
    """
    for prefix in prefixes:
        for ip in get_ips(test_leadup, f'{prefix}_test'):
            predicted = dtw_testing_helper(leadup, prefixes, [ip])
            # Predicted label first, expected label second: the order get_acc_metrics unpacks.
            # The reverse order swaps the reported precision and recall.
            pr_dtw.append((predicted, prefix))

# Run the DTW-only experiment over every prefix classified earlier and collect its label pairs
pr_dtw = []
dtw_testing('ideal_3d_outputs', prefixes, pr_dtw)
# version=1 makes get_all_f1s write to a different file than a version=0 run
get_all_f1s(prefixes, pr_dtw, ks_name, version=1)

## GUI

In [None]:
# Give the owner execute permission on the conversion script stored under `lu`
!chmod u+x "$lu"/convert2mp4.sh

In [None]:
def convert2mp4(base):
  """
  Runs the convert2mp4.sh script from inside the directory `base`
  :param base: the directory to change into before running the script
  """
  # The script is addressed as ../<lu>/convert2mp4.sh relative to `base`, so `base` must sit
  # exactly one level below the directory that contains `lu`.
  # NOTE(review): presumably the script converts the videos in `base` to .mp4 -- confirm against the script
  !cd "$base" && ./../"$lu"/convert2mp4.sh

In [None]:
from IPython.display import clear_output
def evaluate_video():
    """
    Given the name of a video,
    1) ensures its in the necesssary directories and that it's a .mp4 file
    2) Creates a custom dataset
    3) Reconstructs the video
    4) Classifies the video
    5) Temporally aligns with DTW
    6) Prints the results
    """
    file = input('Enter the name of your video (ex. i/am/a/boss/test.mp4): ')  # assume in directory
    lu_pipe, tail = os.path.split(file)
    vidfn, ext = os.path.splitext(tail)

    if ext != ".mp4":
        convert2mp4(lu_pipe)

    ip_f_path = os.path.join(lu_pipe, vidfn)
    if not os.path.exists(ip_f_path):
        os.makedirs(ip_f_path)
    !mv
    "$file" "$ip_f_path"

    oup_f, dts = create_custom_dataset(lu_pipe, vidfn, slow=False)
    threed_outputs_folder_path = reconstruct_videos(lu_pipe, None, dts, oup_f)
    show_local_mp4_video(f'{threed_outputs_folder_path}/{vidfn}_si.mp4')
    ips = get_ips(lu_pipe, oup_f)
    res = classify_videos(ips, None, ks_name)
    move = res[0][0]
    confidence = res[0][1]
    confidence = int(confidence[:confidence.find('/')]) / int(confidence[confidence.find('/') + 1:])

    clear_output()

    print(f"Your video was classified as a {move} with {confidence * 100}% confidence")
    oup = os.path.join('ideal_3d_outputs', f'{move}_ideal_si.npy')
    d = dtw_compare_samples(ips, oup)
    print(f"Relative to the exemplar, your score is {d}. The closer to 0, the better your execution. Try again to get closer to 0.")

In [None]:
evaluate_video()