In [1]:
import os
import cv2 as cv
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

In [2]:
# read_video_to_movement returns the list of joint positions found using openCV
# Also returns a count of the number of joints not found, to be used to measure accuracy

def read_video_to_movement(vidpath):
    data = []
    net = cv.dnn.readNetFromTensorflow("graph_opt.pb")
    cap = cv.VideoCapture(vidpath)

    BODY_PARTS = { "Nose": 0, "Neck": 1, "RShoulder": 2, "RElbow": 3, "RWrist": 4,
                   "LShoulder": 5, "LElbow": 6, "LWrist": 7, "RHip": 8, "RKnee": 9,
                   "RAnkle": 10, "LHip": 11, "LKnee": 12, "LAnkle": 13, "Background": 14 }

    POSE_PAIRS = [ ["Neck", "RShoulder"], ["Neck", "LShoulder"], ["RShoulder", "RElbow"],
                   ["RElbow", "RWrist"], ["LShoulder", "LElbow"], ["LElbow", "LWrist"],
                   ["Neck", "RHip"], ["RHip", "RKnee"], ["RKnee", "RAnkle"], ["Neck", "LHip"],
                   ["LHip", "LKnee"], ["LKnee", "LAnkle"], ["Neck", "Nose"] ]

    error = 0
    while cv.waitKey(1) < 0:
        hasFrame, frame = cap.read()
        if not hasFrame:
            break

        #frame = cv.rotate(frame, cv.ROTATE_180) #ONLY needed if raw iPhone data
        frameWidth = frame.shape[1]
        frameHeight = frame.shape[0]
        net.setInput(cv.dnn.blobFromImage(frame, 1.0, (368, 368), (127.5, 127.5, 127.5), swapRB=True, crop=False))
        out = net.forward()
        out = out[:, :15, :, :]

        assert(len(BODY_PARTS) == out.shape[1])

        points = []
        for i in range(len(BODY_PARTS)):
            heatMap = out[0, i, :, :]
            _, conf, _, point = cv.minMaxLoc(heatMap)
            x = (frameWidth * point[0]) / out.shape[3]
            y = (frameHeight * point[1]) / out.shape[2]
            if conf > 0.1:
                points.append(np.array([x, y]))
            else:
                points.append(np.array([None,None]))
                error += 1

        data.append(points)
    return (data, error)

In [10]:
# Collect movements in a list, calculate number of missed joints
def generate_movement_list(vid_names, data_path):
    movement_list = []
    total_missed = 0
    NUM_FRAMES = float('inf')

    for i in range(len(vid_names)):
        print(i) # Cheap Status Bar
        (movement, missed) = read_video_to_movement(data_path + vid_names[i])
        movement_list.append(movement)
        vid_frames = len(movement)
        total_missed += missed
        if vid_frames < NUM_FRAMES:
            NUM_FRAMES = vid_frames
        print(NUM_FRAMES)

    # Compare number of missed joints to the number of expected joints
    expected_num_joints = NUM_FRAMES * 15 * NUM_VIDS
    error_factor = total_missed/expected_num_joints
    return(movement_list, error_factor, NUM_FRAMES)

In [4]:
# vectorize_movement transforms a movement into a characteristic vector
# A higher value for num_chunks increases the resolution of the characteristics

def vectorize_movement(movement, num_chunks, NUM_PARTS, NUM_FRAMES):
    move_vec = []
    chunk_size = int(NUM_FRAMES/num_chunks)
    for n in range(num_chunks):
        for joint in range(NUM_PARTS):
            x_pos_disp = 0
            x_neg_disp = 0
            y_pos_disp = 0
            y_neg_disp = 0

            for i in range(chunk_size - 1):
                prev_frame = movement[n*chunk_size + i]
                frame = movement[n*chunk_size + i + 1]

                if prev_frame[joint].all() and frame[joint].all():
                    disp = prev_frame[joint] - frame[joint]

                    if disp[0] > 0:
                        x_pos_disp += disp[0]
                    else:
                        x_neg_disp += disp[0]
                    if disp[1] > 0:
                        y_pos_disp += disp[1]
                    else:
                        y_neg_disp += disp[1]
            move_vec += [x_pos_disp, x_neg_disp, y_pos_disp, y_neg_disp]

    return move_vec

In [5]:
# Calculate characteristic vector of each movement
def calculate_X(movement_list, NUM_FRAMES, NUM_PARTS):
    move_vec_list = []
    for i in range(len(movement_list)):     #need to do AFTER determining min num_frames
        move_vec = vectorize_movement(movement_list[i][:NUM_FRAMES], 1, NUM_PARTS, NUM_FRAMES)
        move_vec_list.append(np.array(move_vec))

    # X is the data set we will use for PCA
    X = np.array(move_vec_list)
    return X

In [6]:
# Collecting dance files and setting test constants
data_path = "/Users/emmawaters/Desktop/Dance/Prod/"
song_list = ['Aurora', 'Aphex', 'Armatrading', 'Tnertle', 'Willow', 'Jiggle']
vid_names = []
dir_list = os.listdir(data_path)
for i in dir_list:
    if i.endswith(".mp4"):
        vid_names.append(i)
        
raw_name_dict = {'Aurora':[], 'Aphex':[], 'Armatrading':[], 'Tnertle':[], 'Willow':[], 'Jiggle':[]}
for vid in vid_names:
    name_split = vid.split('.')[0].split('_')
    raw_name_dict[name_split[1]].append(vid)

print(raw_name_dict)

{'Aurora': ['Falcon_Aurora.mp4', 'Sunny_Aurora.mp4', 'Dorissa_Aurora.mp4', 'Sophia_Aurora.mp4', 'Imogen_Aurora.mp4', 'Dot_Aurora.mp4', 'Lia_Aurora.mp4'], 'Aphex': ['Dorissa_Aphex.mp4', 'Falcon_Aphex.mp4', 'Dot_Aphex.mp4', 'Sophia_Aphex.mp4', 'Sunny_Aphex.mp4', 'Lia_Aphex.mp4', 'Imogen_Aphex.mp4'], 'Armatrading': ['Sunny_Armatrading.mp4', 'Dorissa_Armatrading.mp4', 'Lia_Armatrading.mp4', 'Imogen_Armatrading.mp4', 'Falcon_Armatrading.mp4', 'Sophia_Armatrading.mp4', 'Dot_Armatrading.mp4'], 'Tnertle': ['Imogen_Tnertle.mp4', 'Lia_Tnertle.mp4', 'Dorissa_Tnertle.mp4', 'Sophia_Tnertle.mp4', 'Sunny_Tnertle.mp4', 'Falcon_Tnertle.mp4', 'Dot_Tnertle.mp4'], 'Willow': ['Sunny_Willow.mp4', 'Falcon_Willow.mp4', 'Sophia_Willow.mp4', 'Dorissa_Willow.mp4', 'Imogen_Willow.mp4', 'Lia_Willow.mp4', 'Dot_Willow.mp4'], 'Jiggle': ['Sunny_Jiggle.mp4', 'Falcon_Jiggle.mp4', 'Sophia_Jiggle.mp4', 'Dorissa_Jiggle.mp4', 'Imogen_Jiggle.mp4', 'Lia_Jiggle.mp4', 'Dot_Jiggle.mp4']}


In [7]:
#started @12:42pm

In [11]:
movement_lists = {'Aurora':[], 'Aphex':[], 'Armatrading':[], 'Tnertle':[], 'Willow':[], 'Jiggle':[]}
data_dict = {'Aurora':[], 'Aphex':[], 'Armatrading':[], 'Tnertle':[], 'Willow':[], 'Jiggle':[]}
errors = []
for song in song_list:
    NUM_PARTS = 15
    NUM_TIME_CHUNKS = 5
    NUM_VIDS = len(vid_names)
    
    (movement_list, error_factor, NUM_FRAMES) = generate_movement_list(raw_name_dict[song], data_path)
    movement_lists[song]=movement_list
    errors.append(error_factor)
    print(NUM_FRAMES)
    data_dict[song] = calculate_X(movement_list, NUM_FRAMES, NUM_PARTS)
print(errors)

0
820
1
820
2
820
3
812
4
812
5
812
6
812
812


IndexError: list index out of range