In [None]:
import numpy as np
import pandas as pd

def find_base_normal_length(points_df, start_point, end_point):
    
    base_normal_length = 0
    
    for i in points_df.index:
        
        if ((points_df.loc[i, start_point] != None) and (points_df.loc[i, end_point] != None)):
            
            start_point_x, start_point_y = points_df.loc[i, start_point]
            end_point_x, end_point_y = points_df.loc[i, end_point]
            
            base_normal_length = int(round(np.sqrt((start_point_x - end_point_x)**2 + (start_point_y - end_point_y)**2)))
            
            break
            
    return base_normal_length

def normalizing_body_points_in_df(points_df, base_normal_start, base_normal_end, base_normal_mult, origin_point):
    
    base_normal_length = find_base_normal_length(points_df, base_normal_start, base_normal_end)
    
    points_norm_df = points_df.copy()
    points_norm_df["normalized?"] = "no"
    points_norm_df["origin_point"] = origin_point

    for i in list(points_norm_df.index):
        
        if points_norm_df.loc[i, origin_point] != None:
            points_norm_df.loc[i, "normalized?"] = "yes"
            origin_x, origin_y = points_norm_df.loc[i, origin_point]
    
            for p in [point for point in points_norm_df.columns if point not in ["normalized?", "origin_point"]]:
                if p == origin_point:
                    norm_x = 0.0
                    norm_y = 0.0
                    points_norm_df.at[i, p] = (norm_x, norm_y)
                    
                elif points_norm_df.loc[i, p] == None:
                    continue
                
                else:
                    p_x, p_y = points_norm_df.loc[i, p]
                    norm_x = (p_x - origin_x) / (base_normal_length * base_normal_mult)
                    norm_y = (p_y - origin_y) / (base_normal_length * base_normal_mult)
                    # norm_y = -1 * norm_y
                    points_norm_df.at[i, p] = (norm_x, norm_y)
            
    return points_norm_df

In [None]:
import numpy as np
import pandas as pd
import cv2
import os
import logging

import matplotlib.pyplot as plt
import seaborn as sns

class VJump_data():

    def __init__(self, video_path):

        self.video_path = video_path

    def create_frame_path(self, frame_counter):
        vid_path, vid_name_ext = os.path.split(self.video_path)
        vid_name, vid_ext = os.path.splitext(vid_name_ext)

        frame_path = os.path.join(vid_path, vid_name)
        
        if not os.path.exists(frame_path):
            os.makedirs(frame_path)
            
        frame_path = os.path.join(frame_path, f"{vid_name}_frame_{frame_counter}.png")

        return frame_path

    def count_and_save_frames(self):

        # Source 1: https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html
        # Source 2: https://www.learnopencv.com/read-write-and-display-a-video-using-opencv-cpp-python/

        # Creating a VideoCapture object.
        cap = cv2.VideoCapture(self.video_path)

        # Getting the video frame width and height.
        self.frame_width = int(cap.get(3))
        self.frame_height = int(cap.get(4))
        
        total_frames = 0

        while(cap.isOpened()):

            # Grabbing each individual frame, frame-by-frame.
            ret, frame = cap.read()

            if ret==True:

                total_frames += 1

                cv2.imwrite(self.create_frame_path(total_frames), frame)

                # Exiting if "Q" key is pressed on the keyboard.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            else:
                break

        # Releasing the VideoCapture object.
        cap.release()
        
        self.total_frames = total_frames

    def import_body_net(self):

        # Source for code: https://www.learnopencv.com/deep-learning-based-human-pose-estimation-using-opencv-cpp-python/

        # Specify the paths for the 2 files
        protoFile = "../../inputs_outputs/models/pose_deploy.prototxt"
        weightsFile = "../../inputs_outputs/models/pose_iter_584000.caffemodel"

        # Read the network into Memory
        self.net = cv2.dnn.readNetFromCaffe(protoFile, weightsFile)

    def find_skeleton(self, frame):

        frame_copy = np.copy(frame)

        # Specify the input image dimensions
        inWidth = 368
        inHeight = 368

        # Prepare the frame to be fed to the network
        inpBlob = cv2.dnn.blobFromImage(frame_copy, 1.0 / 255, (inWidth, inHeight), (0, 0, 0), swapRB=False, crop=False)

        # Set the prepared object as the input blob of the network
        self.net.setInput(inpBlob)

        output = self.net.forward()

        H = output.shape[2]
        W = output.shape[3]

        # Empty list to store the detected keypoints
        points = []
        
        for i in range(15):
            # Confidence map of corresponding body's part.
            probMap = output[0, i, :, :]

            # Find global maxima of the probMap.
            minVal, prob, minLoc, point = cv2.minMaxLoc(probMap)

            # Scale the point to fit on the original image
            x = (frame_copy.shape[1] * point[0]) / W
            y = (frame_copy.shape[0] * point[1]) / H

            if prob > 0.5:
                # Add the point to the list if the probability is greater than the threshold
                points.append((int(x), int(y)))

            else:
                points.append(None)
        
        return points
    
    def locate_body_points(self):
    
        # Creating a VideoCapture object.
        cap = cv2.VideoCapture(self.video_path)

        # Defining a new dataframe to store the skeleton point coordinates from each frame of the video.
        df_vid_points = pd.DataFrame()
        frame_counter = 1

        while(cap.isOpened()):

            # Grabbing each individual frame, frame-by-frame.
            ret, frame = cap.read()

            if ret==True:

                print(f"Analysing video - frame {frame_counter} of {self.total_frames}...")
                logging.debug(f"Locating body points - frame {frame_counter} of {self.total_frames}.")

                # Running human skeleton points detection on the grabbed frame.
                # if frame_counter < 4:
                skeleton_frame_points = self.find_skeleton(frame)

                # Saving frame output skeleton point coordinates as a new row in above defined dataframe.
                df_vid_points[frame_counter] = skeleton_frame_points
                frame_counter += 1

                # Exiting if "Q" key is pressed on the keyboard.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            else:
                break

        # Releasing the VideoCapture object.
        cap.release()
        
        df_vid_points = df_vid_points.T.copy()
        
        self.df_vid_points = df_vid_points
        
        # self.df_vid_points_norm = self.df_vid_points.copy()

    def normalize_body_points(self):

        # from VJump import Body_Points_Normalization

        self.df_vid_points_norm = normalizing_body_points_in_df(self.df_vid_points, 1, 8, 3, 0)

    def separate_x_y_points(self, df):

        return_df = pd.DataFrame(index=df.index)

        for p in [col for col in df.columns if type(col)==int]:

            p_x = []
            p_y = []
            
            for i in df[p]:

                if i==None:
                    p_x.append(None)
                    p_y.append(None)
                
                else:
                    p_x.append(float(i[0]))
                    p_y.append(float(i[1]))
            
            return_df[f"{p}_x"] = p_x
            return_df[f"{p}_y"] = p_y

        for p in [col for col in df.columns if type(col)!=int]:
            
            return_df[p] = df[p]

        return return_df

In [None]:
# from VJump import VJump_dataset_creation
import cv2
import time
import logging

timestr = time.strftime("%Y%m%d_%H%M%S")
# logging.basicConfig(filename=f"../inputs_outputs/logs/vjump_dataset_creation_log_{timestr}.txt", filemode='a', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%d-%b-%y %H:%M:%S')
# logging.disable(logging.CRITICAL)

def run_vjump_dataset_creation():

    # logging.debug(f"----- MAIN PROGRAM STARTED -----\n")

    video_read_successfully = False
    attempt = 1

    while video_read_successfully == False and attempt <= 5:
        video_path = input("Provide the video filepath.\n\nINPUT-> ")
        logging.debug(f"Video path given by user: '{video_path}'.")
        print("\n")
        
        try:
            cap = cv2.VideoCapture(video_path)

            # Checking if video can be accessed successfully.
            if (cap.isOpened() == False):
                print("Unable to read video. Re-enter video path.")
                print(f"Attempts remaining: {5-attempt}")
                logging.debug(f"Attempt {attempt} - unable to access video.")
                attempt += 1
            else:
                video_read_successfully = True
                logging.debug(f"Attempt {attempt} - video accessed successfully.")

            # Releasing the VideoCapture object.
            cap.release()

        except:
            print("Unable to read video. Re-enter video path.")
            print(f"Attempts remaining: {5-attempt}")
            logging.exception(f"EXCEPTION OCCURED - Attempt {attempt} - unable to access video.")
            attempt += 1

    if video_read_successfully == False:
        print("Unable to read video.")
        print("Program will exit in 10 seconds.")
        logging.debug(f"Unable to access video after 5 attempts. Program will be TERMINATED.")
        time.sleep(10)
        return
    
    print("\nPROCESSING... PLEASE WAIT!\n")

    my_video = VJump_data(video_path)
    logging.debug(f"VJump_data object successfully created.")

    logging.debug(f"Starting to count total frames and save them.")
    my_video.count_and_save_frames()
    print(f"Total frames in video = {my_video.total_frames}")
    logging.debug(f"Total frames in video = {my_video.total_frames}")

    try:
        logging.debug(f"Starting to import body net model files.")
        my_video.import_body_net()
        logging.debug(f"Successfully imported body net model files.")
    except:
        print("Could not find body net model files to find body points in video. Ensure that files pose_deploy.prototxt & pose_iter_584000.caffemodel are stored in ../inputs_outputs/models folder.\n")
        print("Program will exit in 10 seconds.")
        logging.exception(f"EXCEPTION OCCURED - Unable to access body net model files. Program will be TERMINATED.")
        time.sleep(10)
        return

    try:
        logging.debug(f"Starting to locate body points in video.")
        my_video.locate_body_points()
        logging.debug(f"Successfully located body points in video.")
    except:
        print("Error while locating body points on video. Please provide a new video.\n")
        print("Program will exit in 10 seconds.")
        logging.exception(f"EXCEPTION OCCURED - Unable to locate body points in video. Program will be TERMINATED.")
        time.sleep(10)
        return
    
    # return my_video.df_vid_points
    
    try:
        print("Normalizing body points...")
        logging.debug(f"Starting to normalize body points.")
        my_video.normalize_body_points()
        print("Successfully normalized body points.\n")
        logging.debug(f"Successfully normalized body points.")

    except:
        print("Error while normalizing body points. Please provide a new video.\n")
        print("Program will exit in 10 seconds.")
        logging.exception(f"EXCEPTION OCCURED - Unable to normalize body points. Program will be TERMINATED.")
        time.sleep(10)
        return
    
    # return my_video.df_vid_points, my_video.df_vid_points_norm
    
    my_video.df_vid_points = my_video.separate_x_y_points(my_video.df_vid_points)
    my_video.df_vid_points_norm = my_video.separate_x_y_points(my_video.df_vid_points_norm)
    
    return my_video.df_vid_points, my_video.df_vid_points_norm

In [None]:
df, df_norm = run_vjump_dataset_creation()

In [None]:
df

In [None]:
df_norm

In [None]:
df_frame_labels = pd.DataFrame(columns=["squat", "jump_peak", "landing"], index=range(1, 67+1), data=0)
df_frame_labels

In [None]:
import re

txt = "1, 2, 4000, 6, 777 , 10. "
x = re.findall("[0-9]+", txt)

for i, frame in enumerate(x):
    x[i] = int(frame)

x = sorted(x)
x

In [None]:
import re

txt = "The rain in Spain"
x = re.findall("^The.*Spain$", txt)
x

In [None]:
df_frame_labels

In [21]:
import os
import zipfile

my_video = "C:\\Users\\USER\\Documents\\Python Scripts\\wonder_years_fitness_ai\\w_yrs_poc1\\inputs_outputs\\videos\\vertical_jump_side_trial_1"

In [22]:
print(my_video)

C:\Users\USER\Documents\Python Scripts\wonder_years_fitness_ai\w_yrs_poc1\inputs_outputs\videos\vertical_jump_side_trial_1


In [23]:
counter=0
for root, dirs, files in os.walk(my_video):
    print(len(root))
    print(len(dirs))
    print(len(files))

122
0
70


In [35]:
with zipfile.ZipFile(my_video+".zip", "w") as myzip:
    for root, dirs, files in os.walk(my_video):
        for file in files:
            myzip.write(filename=os.path.join(my_video, file), arcname=file)

In [29]:
for files in os.walk(my_video):
    print(files)

('C:\\Users\\USER\\Documents\\Python Scripts\\wonder_years_fitness_ai\\w_yrs_poc1\\inputs_outputs\\videos\\vertical_jump_side_trial_1', [], ['vertical_jump_side_trial_1_FEATURES_BODY_POINTS.csv', 'vertical_jump_side_trial_1_FEATURES_BODY_POINTS_NORMALIZED.csv', 'vertical_jump_side_trial_1_FRAME_1.png', 'vertical_jump_side_trial_1_FRAME_10.png', 'vertical_jump_side_trial_1_FRAME_11.png', 'vertical_jump_side_trial_1_FRAME_12.png', 'vertical_jump_side_trial_1_FRAME_13.png', 'vertical_jump_side_trial_1_FRAME_14.png', 'vertical_jump_side_trial_1_FRAME_15.png', 'vertical_jump_side_trial_1_FRAME_16.png', 'vertical_jump_side_trial_1_FRAME_17.png', 'vertical_jump_side_trial_1_FRAME_18.png', 'vertical_jump_side_trial_1_FRAME_19.png', 'vertical_jump_side_trial_1_FRAME_2.png', 'vertical_jump_side_trial_1_FRAME_20.png', 'vertical_jump_side_trial_1_FRAME_21.png', 'vertical_jump_side_trial_1_FRAME_22.png', 'vertical_jump_side_trial_1_FRAME_23.png', 'vertical_jump_side_trial_1_FRAME_24.png', 'vertical

In [10]:
for root, dirs, files in os.walk(my_video):
    for file in files:
        ziph.write(os.path.join(root, file))

if __name__ == '__main__':
    zipf = zipfile.ZipFile('Python.zip', 'w', zipfile.ZIP_DEFLATED)
    zipdir('tmp/', zipf)
    zipf.close()

FileNotFoundError: [WinError 2] The system cannot find the file specified: 'vertical_jump_side_trial_1'