In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from g_media_pipe import extract_data_from_video, list_files_in_directory, process_and_save_mp4_to_csv
import ast
import glob
import os

  _warn(("h5py is running against HDF5 {0} when it was built against {1}, "


In [13]:
# Every path below is built relative to the current working directory.
directory = os.curdir

# Input videos live under <directory>/datasets/Bicep_Curls/{Proper,Improper}.
mp4_directory = os.path.join(directory, "datasets")
BICEP_CURLS_MP4 = os.path.join(mp4_directory, "Bicep_Curls")
PROPER_BICEP_CURLS_MP4 = os.path.join(BICEP_CURLS_MP4, "Proper")
IMPROPER_BICEP_CURLS_MP4 = os.path.join(BICEP_CURLS_MP4, "Improper")

# One list of video files per class label.
proper_mp4_list = list_files_in_directory(PROPER_BICEP_CURLS_MP4)
improper_mp4_list = list_files_in_directory(IMPROPER_BICEP_CURLS_MP4)

# CSV output folders: <directory>/model_data/Bicep_curls/{Proper,Improper}.
# NOTE: the folder is spelled "Bicep_curls" here but "Bicep_Curls" under datasets.
proper_output_csv_directory, improper_output_csv_directory = (
    os.path.join(directory, "model_data", "Bicep_curls", label)
    for label in ("Proper", "Improper")
)

# Fail early (reporting which folder is missing) instead of part-way through processing.
assert all(
    map(os.path.exists, (proper_output_csv_directory, improper_output_csv_directory))
), "Proper check: {} Improper check: {}".format(
    os.path.exists(proper_output_csv_directory),
    os.path.exists(improper_output_csv_directory),
)

In [15]:
# Run the extraction helper over the "Proper" videos; per the log below it writes one CSV per video into the output folder.
process_and_save_mp4_to_csv(proper_mp4_list,proper_output_csv_directory)

Starting processing of 9 .mp4 files
Extracting data from video file: .\datasets\Bicep_Curls\Proper\dumbellFront.mp4




Saved csv file: 1/9 to .\model_data\Bicep_curls\Proper
Extracting data from video file: .\datasets\Bicep_Curls\Proper\dumbellSide.mp4
Saved csv file: 2/9 to .\model_data\Bicep_curls\Proper
Extracting data from video file: .\datasets\Bicep_Curls\Proper\g1_bicepcurl_1.mp4
Saved csv file: 3/9 to .\model_data\Bicep_curls\Proper
Extracting data from video file: .\datasets\Bicep_Curls\Proper\g1_bicepcurl_2.mp4
Saved csv file: 4/9 to .\model_data\Bicep_curls\Proper
Extracting data from video file: .\datasets\Bicep_Curls\Proper\g1_bicepcurl_3.mp4
Saved csv file: 5/9 to .\model_data\Bicep_curls\Proper
Extracting data from video file: .\datasets\Bicep_Curls\Proper\g1_bicepcurl_4.mp4
Saved csv file: 6/9 to .\model_data\Bicep_curls\Proper
Extracting data from video file: .\datasets\Bicep_Curls\Proper\g1_bicepcurl_5.mp4
Saved csv file: 7/9 to .\model_data\Bicep_curls\Proper
Extracting data from video file: .\datasets\Bicep_Curls\Proper\g1_bicepcurl_6.mp4
Saved csv file: 8/9 to .\model_data\Bicep_cu

In [16]:
# Same extraction step for the "Improper" videos; per the log below it writes one CSV per video into the output folder.
process_and_save_mp4_to_csv(improper_mp4_list,improper_output_csv_directory)

Starting processing of 15 .mp4 files
Extracting data from video file: .\datasets\Bicep_Curls\Improper\b1_bicepcurl_1.mp4
Saved csv file: 1/15 to .\model_data\Bicep_curls\Improper
Extracting data from video file: .\datasets\Bicep_Curls\Improper\b1_bicepcurl_2.mp4
Saved csv file: 2/15 to .\model_data\Bicep_curls\Improper
Extracting data from video file: .\datasets\Bicep_Curls\Improper\b1_bicepcurl_3.mp4
Saved csv file: 3/15 to .\model_data\Bicep_curls\Improper
Extracting data from video file: .\datasets\Bicep_Curls\Improper\b1_bicepcurl_4.mp4
Saved csv file: 4/15 to .\model_data\Bicep_curls\Improper
Extracting data from video file: .\datasets\Bicep_Curls\Improper\b2_bicepcurl_1.mp4
Saved csv file: 5/15 to .\model_data\Bicep_curls\Improper
Extracting data from video file: .\datasets\Bicep_Curls\Improper\b2_bicepcurl_2.mp4
Saved csv file: 6/15 to .\model_data\Bicep_curls\Improper
Extracting data from video file: .\datasets\Bicep_Curls\Improper\b3_bicepcurl_2.mp4
Saved csv file: 7/15 to .\m

In [6]:
class VideoProcessingPipeline:
    """
    Count squat repetitions in an .mp4 video from pose-landmark trajectories.

    On construction the video is handed to ``extract_data_from_video``, the
    landmark columns listed in ``_UNUSED_LANDMARKS`` are dropped, the remaining
    columns are split into ``<LANDMARK>_x`` / ``_y`` / ``_z`` series, and every
    series is min-max scaled and then Gaussian-smoothed.  The result is stored
    in ``self.data``; ``count_reps`` counts downward crossings of
    ``self.threshold`` on the two knee z-series.
    """

    # Landmark columns removed before processing.
    _UNUSED_LANDMARKS = (
        'NOSE', 'LEFT_EYE_INNER', 'LEFT_EYE', 'LEFT_EYE_OUTER',
        'RIGHT_EYE_INNER', 'RIGHT_EYE', 'RIGHT_EYE_OUTER', 'LEFT_EAR',
        'RIGHT_EAR', 'MOUTH_LEFT', 'MOUTH_RIGHT', 'LEFT_SHOULDER',
        'RIGHT_SHOULDER', 'LEFT_ELBOW', 'RIGHT_ELBOW', 'LEFT_WRIST',
        'RIGHT_WRIST', 'LEFT_PINKY', 'RIGHT_PINKY', 'LEFT_INDEX', 'RIGHT_INDEX',
        'LEFT_THUMB', 'RIGHT_THUMB', 'LEFT_ANKLE', 'RIGHT_ANKLE', 'LEFT_HEEL',
        'RIGHT_HEEL', 'LEFT_FOOT_INDEX', 'RIGHT_FOOT_INDEX',
    )

    def __init__(self,video_file_path:str,threshold:float = 0.25) -> None:
        """
        Extract, scale and smooth the landmark series of one video.

        Parameters:
        - video_file_path: path passed to ``extract_data_from_video``.
        - threshold: level (on the 0-1 scaled series) used by ``count_reps``.

        Raises:
        - KeyError: if any column in ``_UNUSED_LANDMARKS`` is missing from the
          extracted data (raised by ``DataFrame.drop``).
        """
        self.threshold = threshold
        # presumably a mapping of landmark name -> per-frame (x, y, z) values; confirm against g_media_pipe
        raw_data = extract_data_from_video(video_path=video_file_path)
        df = pd.DataFrame(raw_data).drop(list(self._UNUSED_LANDMARKS), axis=1)
        processed_df = self.process_df(df)
        for col in processed_df.columns:
            processed_df[col] = self.smooth_gaussian(self.min_max_scaler(processed_df[col]))
        self.data = processed_df

    def process_df(self,data:pd.DataFrame) -> pd.DataFrame:
        """
        Split a frame of (x, y, z) cells into one scalar column per axis.

        Parameters:
        - data: DataFrame whose cells are indexable coordinate triples, or
          their string form (detected from the first cell and parsed with
          ``ast.literal_eval``).

        Returns:
        - DataFrame with columns ``<col>_x`` for every input column, followed
          by all ``<col>_y`` and then all ``<col>_z`` columns.
        """
        def map_cells(frame, func):
            # Column-wise Series.map: element-wise like DataFrame.map, but
            # also available on pandas versions that predate DataFrame.map.
            return frame.apply(lambda column: column.map(func))

        # ``iloc[0, 0]`` is purely positional; ``iloc[0][0]`` relied on the
        # deprecated integer fallback of Series.__getitem__.
        if data.size and isinstance(data.iloc[0, 0], str):
            data = map_cells(data, ast.literal_eval)

        per_axis = []
        for position, suffix in enumerate(("x", "y", "z")):
            component = map_cells(data, lambda coords, position=position: coords[position])
            component.columns = [f'{col}_{suffix}' for col in component.columns]
            per_axis.append(component)

        # Combine the x, y, z data into a single DataFrame
        return pd.concat(per_axis, axis=1)

    def min_max_scaler(self,col:pd.Series)->pd.Series:
        """
        Rescale a series linearly so its minimum maps to 0 and its maximum to 1.

        A constant series (max == min) is returned as all zeros instead of
        dividing by zero and producing NaN for every element.
        """
        min_value = np.min(col)
        max_value = np.max(col)
        span = max_value - min_value
        shifted = col - min_value
        if span == 0:
            return shifted.astype(float)
        return shifted / span

    def smooth_gaussian(self,data:pd.Series, sigma=2)->np.ndarray:
        """
        Smoothen the curves using a Gaussian filter.

        Parameters:
        - data: 1-D pandas Series or array-like, the input signal to smooth.
        - sigma: Standard deviation of the Gaussian kernel.

        Returns:
        - smoothed_data: NumPy array of the smoothed data, same length as the input.
        """
        kernel_radius = int(3 * sigma)  # 3 standard deviations cover ~99% of data
        x = np.arange(-kernel_radius, kernel_radius + 1)
        gaussian_kernel = np.exp(-x**2 / (2 * sigma**2))
        gaussian_kernel /= gaussian_kernel.sum()  # Normalize the kernel

        # Padding to avoid edge effects
        padded_data = np.pad(np.asarray(data, dtype=float), pad_width=kernel_radius, mode='edge')

        # 'valid' convolution over the padded signal gives back the input length
        return np.convolve(padded_data, gaussian_kernel, mode='valid')

    @staticmethod
    def _count_downward_crossings(values, threshold) -> int:
        """Count consecutive pairs that go from above ``threshold`` to at-or-below it."""
        samples = np.asarray(values)
        drops = (samples[:-1] > threshold) & (samples[1:] <= threshold)
        return int(np.count_nonzero(drops))

    def count_reps(self):
        """
        Count repetitions as downward threshold crossings of the knee z-series.

        Returns:
        - (left_knee_count, right_knee_count): for each of ``LEFT_KNEE_z`` and
          ``RIGHT_KNEE_z`` in ``self.data``, the number of consecutive frame
          pairs where the value goes from above ``self.threshold`` to at or
          below it.
        """
        # Work on positional arrays so the count does not depend on the
        # DataFrame having a 0..n-1 index.
        left_knee_count = self._count_downward_crossings(self.data['LEFT_KNEE_z'], self.threshold)
        right_knee_count = self._count_downward_crossings(self.data['RIGHT_KNEE_z'], self.threshold)
        return (left_knee_count,right_knee_count)

In [7]:
# read a new mp4 file and count the number of reps
#pipeline = VideoProcessingPipeline(r"D:\NirwanaWarehouse\uniWork\Term 7\Capstone\backend\FITTR_WEBSOCKET\datasets\Test\Exercise Library_ Squats_8_reps.mp4")
#save_data(videoData=video_data,output_path=r"D:\NirwanaWarehouse\uniWork\Term 7\Capstone\backend\datasets\Test\Squats_8_reps.csv")
# pipeline = extract_data_from_video()
# pipeline = pd.DataFrame.from_dict(pipeline)
# pipeline.head()

In [8]:
# Landmark triples (first, vertex, last) used to compute joint angles.
# Left side:
left_knee_angle_joints = (
    "LEFT_HIP",
    "LEFT_KNEE",
    "LEFT_ANKLE",
)
left_hip_angle_joints = (
    "LEFT_SHOULDER",
    "LEFT_HIP",
    "LEFT_KNEE",
)
#left_ankle_angle_joints = ("LEFT_KNEE", "LEFT_ANKLE", "LEFT_FOOT_INDEX")

# Right side mirrors the left:
right_knee_angle_joints = (
    "RIGHT_HIP",
    "RIGHT_KNEE",
    "RIGHT_ANKLE",
)
right_hip_angle_joints = (
    "RIGHT_SHOULDER",
    "RIGHT_HIP",
    "RIGHT_KNEE",
)
#right_ankle_angle_joints = ("RIGHT_KNEE", "RIGHT_ANKLE", "RIGHT_FOOT_INDEX")

def euclidean(joint1,joint2):
    """
    Euclidean distance between two 3-D joints.

    Args:
    joint1, joint2: an (x, y, z) sequence, or its string form such as
        "(0.1, 0.2, 0.3)".  Strings are parsed with ``ast.literal_eval``, so
        only Python literals are accepted; arbitrary expressions are
        rejected rather than executed.

    Returns:
    The straight-line distance between the two points.

    Raises:
    ValueError / SyntaxError: if a string is not a valid literal, or if a
        joint does not unpack into exactly three values.
    """
    def _as_point(joint):
        # literal_eval instead of eval: coordinate text read from a file must never be executed.
        return ast.literal_eval(joint) if isinstance(joint, str) else joint

    xi,yi,zi = _as_point(joint1)
    xj,yj,zj = _as_point(joint2)
    return (np.square(xi-xj)+np.square(yi-yj)+np.square(zi-zj))**0.5

def calculate_joint_angle(a,b,c):
    """
    Angle in degrees at keypoint ``b`` between the directions b->a and b->c.

    Only the first two components (index 0 and 1) of each keypoint are used,
    so the angle is measured in that plane; any further component is ignored.
    The result is folded into the range [0, 180].
    """
    first, mid, end = (np.array(point) for point in (a, b, c))

    # Heading of each arm as seen from the middle keypoint.
    heading_to_end = np.arctan2(end[1]-mid[1], end[0]-mid[0])
    heading_to_first = np.arctan2(first[1]-mid[1], first[0]-mid[0])

    angle = np.abs((heading_to_end - heading_to_first)*180.0/np.pi)

    # Report the smaller of the two angles between the arms.
    return 360-angle if angle >180.0 else angle

def smooth_gaussian(data:pd.Series, sigma=2)->pd.Series:
    """
    Smoothen the curves using a Gaussian filter.
    Parameters:
    - data: 1-D pandas Series (its index is kept), or any 1-D array-like
      such as a list or NumPy array (the result then gets a default index).
    - sigma: Standard deviation of the Gaussian kernel; must be positive.
    Returns:
    - smoothed_data: pandas Series of the smoothed data, same length as the input.
    Raises:
    - ValueError: if sigma is not positive.
    """
    if sigma <= 0:
        raise ValueError("sigma must be positive")

    # The docstring always advertised array input, but the code needed `.index`.
    series = data if isinstance(data, pd.Series) else pd.Series(np.asarray(data))
    if series.empty:
        # Edge padding cannot extend an empty signal; nothing to smooth.
        return pd.Series(dtype=float, index=series.index)

    kernel_radius = int(3 * sigma)  # 3 standard deviations cover ~99% of data
    x = np.arange(-kernel_radius, kernel_radius + 1)
    gaussian_kernel = np.exp(-x**2 / (2 * sigma**2))
    gaussian_kernel /= gaussian_kernel.sum()  # Normalize the kernel

    # Padding to avoid edge effects
    padded_data = np.pad(series.to_numpy(), pad_width=kernel_radius, mode='edge')

    # 'valid' convolution over the padded signal gives back the input length
    smoothed_data = np.convolve(padded_data, gaussian_kernel, mode='valid')

    return pd.Series(smoothed_data,index=series.index)

def calculate_angle(joint_a, joint_b, joint_c):
    """
    Angle in degrees between the segment a->b and the segment b->c.

    The two direction vectors are ``joint_b - joint_a`` and
    ``joint_c - joint_b``, so three collinear joints in order (a straight
    limb) give 0 degrees and a limb folded back onto itself gives 180
    degrees, i.e. the value is 180 minus the interior angle at ``joint_b``.

    Args:
    joint_a (tuple): The (x, y, z) coordinates of the first joint.
    joint_b (tuple): The (x, y, z) coordinates of the middle joint.
    joint_c (tuple): The (x, y, z) coordinates of the last joint.

    Returns:
    float: The angle in degrees, in the range [0, 180].

    Raises:
    ValueError: if joint_a == joint_b or joint_b == joint_c (zero-length segment).
    """
    point_a, point_b, point_c = (
        np.array(joint, dtype=float) for joint in (joint_a, joint_b, joint_c)
    )

    # Direction of travel along each segment.
    first_segment = point_b - point_a
    second_segment = point_c - point_b

    first_length = np.linalg.norm(first_segment)
    second_length = np.linalg.norm(second_segment)
    if first_length == 0 or second_length == 0:
        raise ValueError("One of the vectors has zero length, can't calculate angle.")

    cosine = np.dot(first_segment, second_segment) / (first_length * second_length)

    # Rounding can push the cosine marginally outside [-1, 1]; clip before arccos.
    return np.degrees(np.arccos(np.clip(cosine, -1.0, 1.0)))

def _parse_joint_coords(value):
    """Return one joint's coordinates as a tuple, parsing the string form if needed."""
    if isinstance(value, str):
        value = ast.literal_eval(value)
    return tuple(value)


def joint_angles_per_record(record, joints):
    """
    Calculate the joint angle for a single record (pd.Series).

    Args:
    record (pd.Series): A single row from a DataFrame containing joint positions.
        Each position may be an (x, y, z) sequence or its string form; every
        joint is parsed according to its own type.
    joints (list): List of three joint names as strings (column names).

    Returns:
    float: The angle returned by ``calculate_angle`` for the three joints.
    """
    joint_a, joint_b, joint_c = joints

    # Normalise each joint independently: previously the type of the first
    # joint decided how all three were parsed.
    a_coords, b_coords, c_coords = (
        _parse_joint_coords(record[name]) for name in (joint_a, joint_b, joint_c)
    )

    return calculate_angle(a_coords, b_coords, c_coords)

def plot_joint_angles(df, joints, angle_name="Angle"):
    """
    Calculate and plot joint angles over time.

    Args:
    df (pd.DataFrame): DataFrame containing joint positions for each timeframe
        (one row per frame; cells are (x, y, z) sequences or their string form).
    joints (list): Three column names of ``df``: first joint, middle joint, last joint.
    angle_name (str): Name for the angle being calculated (legend and title).

    The per-frame angles are Gaussian-smoothed before plotting.
    """
    # Reuse the per-row helper so string / sequence parsing lives in one place.
    angles = [joint_angles_per_record(row, joints) for _, row in df.iterrows()]
    angles = smooth_gaussian(pd.Series(angles))

    # Plot the angles over time
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(df.index, angles, label=f'{angle_name}')
    ax.set_xlabel("Timeframe")
    ax.set_ylabel("Angle (degrees)")
    ax.set_title(f'{angle_name} Over Time')
    ax.legend()
    ax.grid(True)
    plt.show()