### Pose Classifier Final

This notebook creates the final pose classification using the best model we have observed.

In [0]:
import numpy as np
import random
import cv2
from os import listdir
from os.path import isfile, join
import os
import json
import matplotlib.pyplot as plt
import tensorflow as tf


class PoseDataGenerator(tf.keras.utils.Sequence):
    """
    Custom Keras generator for raw pose keypoint data.

    Only body keypoints are extracted and normalized.
    """
    def __init__(self, keyframe_dir, batch_size=32, frames_to_use=-1, is_test=False, shuffle=True):
        self.frames_to_use = frames_to_use
        self.batch_size = batch_size
        self.keyframe_dir = keyframe_dir
        self.shuffle = shuffle
        self.is_test = is_test
        self.classes = self.find_classes()
        self.video_names, self.video_map, self.video_to_class, self.num_samples, self.min_frames = self.find_samples()
        self.on_epoch_end()
        if self.is_test:
            print(f"Found {self.num_samples} frames belonging to {len(self.video_names)} videos (test-mode).")
        else:
            print(f"Found {self.num_samples} frames belonging to {len(self.video_names)} videos belonging to {len(self.classes)} classes.")
        print(f"Min frames determined to be {self.min_frames}")

    def find_classes(self):
        if self.is_test:
            return []
        else:
            category_folders = [f for f in listdir(self.keyframe_dir) if not isfile(join(self.keyframe_dir, f))]
            return sorted(list(set(category_folders)))


    def find_samples(self):
        """
        """
        num_samples = 0
        min_frames = -1
        video_map = {}
        vid_to_cat = {}

        if self.is_test:
            category_folders = [self.keyframe_dir]
        else:
            category_folders = [f for f in listdir(self.keyframe_dir) if not isfile(join(self.keyframe_dir, f))]
        print(category_folders)
        for category_folder in category_folders:
            if self.is_test:
                cat_path = category_folder
            else:
                cat_path = join(self.keyframe_dir, category_folder)
            frames = [f for f in listdir(cat_path) if isfile(join(cat_path, f))]
            for frame in frames:
                frame_arr = frame.split(".mp4_")
                vid_name = frame_arr[0]
                if vid_name not in video_map:
                    video_map[vid_name] = []
                    vid_to_cat[vid_name] = category_folder
                video_map[vid_name].append(frame)
                
            for k in video_map.keys():
                # make sure the frames for each video are in sorted order
                video_map[vid_name] = sorted(video_map[vid_name])
                if min_frames == -1 or len(video_map[vid_name]) < min_frames:
                    min_frames = len(video_map[vid_name])

        return list(video_map.keys()), video_map, vid_to_cat, len(vid_to_cat), min_frames

    def get_body_joints(self, x):
        body_parts = [
            1, # neck       --
            2, # r shoulder
            3, # r elbow
            4, # r wrist
            5, # l shoulder
            6, # l elbow
            7, # l wrist
            9, # r hip
            10, # r knee
            11, # r ankle
            12, # l hip
            13, # l knee
            14, # l ankle   -- 
        ]
        body_parts_xy = []
        for b in body_parts:
            body_parts_xy.append(b * 3)
            body_parts_xy.append(b * 3 + 1)
        return x[body_parts_xy]

    def normalize(self, x_input):
        # Separate original data into x_list and y_list
        lx = []
        ly = []
        N = len(x_input)
        i = 0
        while i<N:
            lx.append(x_input[i])
            ly.append(x_input[i+1])
            i+=2
        lx = np.array(lx)
        ly = np.array(ly)

        # Get rid of undetected data (=0)
        non_zero_x = []
        non_zero_y = []
        for i in range(int(N/2)):
            if lx[i] != 0:
                non_zero_x.append(lx[i])
            if ly[i] != 0:
                non_zero_y.append(ly[i])
        if len(non_zero_x) == 0 or len(non_zero_y) == 0:
            return np.array([0] * N)

        # Normalization x/y data according to the bounding box
        origin_x = np.min(non_zero_x)
        origin_y = np.min(non_zero_y)
        len_x = np.max(non_zero_x) - np.min(non_zero_x)
        len_y = np.max(non_zero_y) - np.min(non_zero_y)
        x_new = []
        for i in range(int(N/2)):
            if (lx[i] + ly[i]) == 0:
                x_new.append(-1)
                x_new.append(-1)
            else:
                x_new.append((lx[i] - origin_x) / len_x)
                x_new.append((ly[i] - origin_y) / len_y)
        return x_new

    def __len__(self):
        """
        Denotes the number of batches per epoch
        """
        return int(np.floor(self.num_samples / self.batch_size))

    def __getitem__(self, index):
        """
        Generate one batch of data
        """
        video_names = self.video_names[index*self.batch_size:(index+1)*self.batch_size]
        num_frames = self.min_frames if self.frames_to_use == -1 else self.frames_to_use
        X = np.zeros((len(video_names), num_frames, 13 * 2 + 1), dtype=np.float64)
        y = []
        i = 0
        for vid in video_names:
            j = 0
            for frame in self.video_map[vid]:
                if self.is_test:
                    keypoint_file = join(self.keyframe_dir, frame)
                else:
                    keypoint_file = join(join(self.keyframe_dir, self.video_to_class[vid]), frame)
                with open(keypoint_file) as json_file:
                    keypoint_data = json.load(json_file)

                    # Extract some features from the keypoint data like averaging
                    arrs = []

                    for person in keypoint_data["people"]:
                        # Each person is assigned the label of the video
                        kp = np.array(person["pose_keypoints_2d"])
                        kp = self.get_body_joints(kp)
                        kp = self.normalize(kp)
                        arrs.append(kp)
                        # if i == 0 and j == 0:
                        #     print(kp)
                    
                    if len(arrs) > 0:
                        arrs = np.array(arrs)
                        features = []
                        features.extend(np.average(arrs, axis=0).tolist())
                        features.append(len(keypoint_data["people"]))
                        features = np.array(features)
                        features[np.isnan(features)] = -1
                        X[i, j, :] = np.array(features)
                    
                j += 1
                if j >= num_frames:
                    break

            if self.is_test:
                y.append(0)
            else:
                y.append(int(self.video_to_class[vid]) - 1)
            i += 1
        y = np.array(y)
        return X, tf.keras.utils.to_categorical(y, num_classes=len(self.classes))

    def on_epoch_end(self):
        if self.shuffle == True:
            np.random.shuffle(self.video_names)




In [0]:
import zipfile
import cv2
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import os
from os.path import isfile, join
import tensorflow as tf
import tempfile
import shutil



class PoseClassifier:
    """
    Classifies sentiment based on poses extracted from video frames
    """

    def __init__(self, pose_folder, model_location=None, is_test=None, is_zip=True, frames_to_use=12, batch_size=32):
        """
        @param pose_folder    The folder where the list of poses are stored. If 
                              `is_zip` is set to True, this should be a single zip 
                              file containing the poses. Paths can either by a local 
                              folder or a GDrive mounted path.
        @param model_location The pre-trained model to perform predictions
        @param is_test        If set to True, we assume that `pose_folder` contains a flat
                              list of videos. If False, we assume that `pose_folder` first 
                              contains subdirectories corresponding to category labels. 
        @param is_zip         If set to True, the `pose_folder` will be unzipped prior to accessing
        @param frames_to_use  The number of frames to use per video
        @param batch_size     The batch size used to feed into the model evaluation
        """
        self.is_zip = is_zip
        self.pose_folder = pose_folder
        self.is_test = is_test
        self.model_location = model_location
        self.frames_to_use = frames_to_use
        self.batch_size = batch_size
        print(f"PoseClassifier created with is_zip = {is_zip}, pose_folder = {pose_folder} , is_test = {is_test} , model_location = {model_location}")

    def predict(self):
        folder = self.unzip_folder()
        generator = PoseDataGenerator(folder, is_test=self.is_test, frames_to_use=self.frames_to_use, batch_size=self.batch_size)
        model = tf.keras.models.load_model(self.model_location)
        return model.predict(generator)

    def evaluate(self):
        if self.is_test:
            print("Evaluation cannot be done in test-mode")
            return

        folder = self.unzip_folder()
        generator = PoseDataGenerator(folder, is_test=self.is_test, frames_to_use=self.frames_to_use, batch_size=self.batch_size)
        model = tf.keras.models.load_model(self.model_location)
        return model.evaluate(generator)

    def unzip_folder(self):
        if self.is_zip:
              # Unzips files to a temp directory
              tmp_output_folder = "pose_tmp"
              if os.path.exists(tmp_output_folder) and os.path.isdir(tmp_output_folder):
                  print("Removing existing dir...")
                  shutil.rmtree(tmp_output_folder)

              print(f"Unzipping files to temp dir {tmp_output_folder}...")
              Path(f"{tmp_output_folder}").mkdir(parents=True, exist_ok=True)
              with zipfile.ZipFile(self.pose_folder, 'r') as zip_ref:
                  zip_ref.extractall(tmp_output_folder)
              print("Finished unzipping files")
        else:
            tmp_output_folder = self.pose_folder
            print("Skipping unzipping files as input is a folder")
        return tmp_output_folder




In [0]:
from google.colab import drive
drive.mount('/content/drive')



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
HOME_DIR = "drive/My Drive/"
pose_classifier = PoseClassifier(
    pose_folder = HOME_DIR + "cs231n-project/datasets/emotiw/train-tiny-pose.zip", 
    is_test = False,
    model_location = HOME_DIR + "cs231n-project/models/pose-classifier-v5.h5"
)
pose_classifier.evaluate()

PoseClassifier created with is_zip = True, pose_folder = drive/My Drive/cs231n-project/datasets/emotiw/train-tiny-pose.zip , is_test = False , model_location = drive/My Drive/cs231n-project/models/pose-classifier-v5.h5
Removing existing dir...
Unzipping files to temp dir pose_tmp...
Finished unzipping files
['1', '3', '2']
Found 50 frames belonging to 50 videos belonging to 3 classes.
Min frames determined to be 12






[0.8743239045143127, 0.59375]

In [0]:
HOME_DIR = "drive/My Drive/"
pose_classifier = PoseClassifier(
    pose_folder = HOME_DIR + "cs231n-project/datasets/emotiw/test-tiny-pose.zip", 
    is_test = True,
    model_location = HOME_DIR + "cs231n-project/models/pose-classifier-v5.h5"
)
pose_classifier.predict()

PoseClassifier created with is_zip = True, pose_folder = drive/My Drive/cs231n-project/datasets/emotiw/test-tiny-pose.zip , is_test = True , model_location = drive/My Drive/cs231n-project/models/pose-classifier-v5.h5
Unzipping files to temp dir pose_tmp...
Finished unzipping files
['pose_tmp']
Found 50 frames belonging to 50 videos (test-mode).
Min frames determined to be 15




array([[0.6961418 , 0.26301387, 0.04084435],
       [0.32225487, 0.3230175 , 0.35472766],
       [0.5572255 , 0.3802794 , 0.06249508],
       [0.4602124 , 0.5062294 , 0.03355821],
       [0.41146445, 0.5709377 , 0.01759779],
       [0.15125038, 0.2793723 , 0.56937736],
       [0.357679  , 0.6239649 , 0.01835612],
       [0.15157497, 0.13344376, 0.71498126],
       [0.35371315, 0.26059487, 0.385692  ],
       [0.308326  , 0.39899087, 0.29268312],
       [0.44515494, 0.5332356 , 0.02160949],
       [0.3417638 , 0.44532776, 0.21290845],
       [0.46070442, 0.5080671 , 0.03122848],
       [0.20537141, 0.20358285, 0.59104574],
       [0.461516  , 0.507118  , 0.03136603],
       [0.3698711 , 0.5790788 , 0.05105008],
       [0.4006774 , 0.5714115 , 0.02791112],
       [0.32452822, 0.6155299 , 0.0599419 ],
       [0.38561583, 0.59843326, 0.01595092],
       [0.37565228, 0.36649704, 0.25785065],
       [0.4620367 , 0.4182856 , 0.11967767],
       [0.16377105, 0.75288755, 0.08334146],
       [0.