###### General Steps to Follow
1. Importing Packages
1. Data Preprocessing
1. Get Predictions (Test Bicep)

# -------------------------------------------------------------------------------------------------------------

## 1) Importing Packages

In [1]:
import os
import sys
# Parent of the current working directory (presumably the repository root);
# it is put on sys.path so the `src.*` imports in the next cell resolve.
REPO_DIR_PATH = os.path.normpath(os.path.join(os.getcwd(), ".."))
# Guard the append so re-running this cell does not pile up duplicate entries.
if REPO_DIR_PATH not in sys.path:
    sys.path.append(REPO_DIR_PATH)

In [2]:
from src.models.predict_model import ModelPredictor
import cv2
import mediapipe as mp
from src.data_preprocessing.data_transformation import DataTransformer
from scipy.signal import medfilt
from scipy.signal import find_peaks
import numpy as np
from datetime import datetime

2024-04-14 13:25:34.598255: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# -------------------------------------------------------------------------------------------------------------

# -------------------------------------------------------------------------------------------------------------

## 2) Data Preprocessing

In [4]:
# MediaPipe handles: the drawing helpers and the pose solution module
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Pose estimator instance; both confidence thresholds are set to 0.5
pose_model = mp_pose.Pose(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)

I0000 00:00:1713093941.464133   11031 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1713093941.493477   11107 gl_context.cc:357] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 535.171.04), renderer: NVIDIA GeForce GTX 1650/PCIe/SSE2
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


#### 1- Get frames and angles from the video

In [5]:
def calculate_angle(a, b, c):
    """
    Return the angle at vertex ``b`` formed by the points ``a`` and ``c``.

    Only the first two components (x, y) of each point are read.
    input:
        a: First point
        b: Mid point (vertex of the angle)
        c: End point
    output:
        Angle in degrees; values above 180 are folded back as 360 - angle
    """
    first, mid, end = np.array(a), np.array(b), np.array(c)

    # Direction of each arm of the angle, measured from the mid point
    heading_to_end = np.arctan2(end[1] - mid[1], end[0] - mid[0])
    heading_to_first = np.arctan2(first[1] - mid[1], first[0] - mid[0])

    degrees = np.abs((heading_to_end - heading_to_first) * 180.0 / np.pi)

    # Anything above 180 is a reflex angle; report the interior one instead
    return 360 - degrees if degrees > 180.0 else degrees

In [6]:
def _landmark_xy(landmarks, landmark_id):
    """Return the [x, y] coordinates of one pose landmark."""
    point = landmarks[landmark_id.value]
    return [point.x, point.y]


def get_video_frames_and_angles(video_source):
    """
    Read a video, run pose detection on each frame and compute the left elbow
    angle (shoulder - elbow - wrist) for every frame in which a pose is found.

    Each such frame is also displayed, annotated with the angle and the pose
    landmarks, in a 'Mediapipe Feed' window; pressing 'q' stops early.

    input:
        video_source: Source passed to cv2.VideoCapture
    output:
        frames(list): Unannotated frames in which a pose was detected
        angles(list): Elbow angle in degrees for each entry of frames,
            so that frames[i] corresponds to angles[i]
    """
    cap = cv2.VideoCapture(video_source)
    frames = []
    angles = []
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Recolor image to RGB before running the pose model
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make detection
            results = pose_model.process(image)

            # Recolor back to BGR; this copy is the one that gets drawn on
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # Frames without a detected pose are skipped entirely
            if results.pose_landmarks is None:
                continue
            landmarks = results.pose_landmarks.landmark

            # Get coordinates
            shoulder = _landmark_xy(landmarks, mp_pose.PoseLandmark.LEFT_SHOULDER)
            elbow = _landmark_xy(landmarks, mp_pose.PoseLandmark.LEFT_ELBOW)
            wrist = _landmark_xy(landmarks, mp_pose.PoseLandmark.LEFT_WRIST)

            # Calculate angle
            angle = calculate_angle(shoulder, elbow, wrist)

            # Store the frame together with its angle. Appending the frame only
            # here (not right after reading it) keeps both lists index-aligned,
            # which matters because peak indices found on the angles are later
            # used to slice the frames.
            frames.append(frame)
            angles.append(angle)

            # Visualize angle
            # NOTE(review): the elbow coordinates are multiplied by
            # (width+40, height+20) rather than scaled and then offset;
            # kept as in the original.
            height, width, _ = image.shape
            cv2.putText(image, str(round(angle, 2)),
                        tuple(np.multiply(elbow, [width+40, height+20]).astype(int)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2, cv2.LINE_AA)

            # Draw the pose landmarks and their connections
            mp_drawing.draw_landmarks(
                image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
                mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2))
            cv2.imshow('Mediapipe Feed', image)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
    finally:
        # Always free the capture and close the preview window, even on error
        cap.release()
        cv2.destroyAllWindows()

    return frames, angles

#### 2- Apply median filter to the angles

In [7]:
def MedianFilter(angles, window_size=41):
    """
    Denoise a series of angles with a median filter (scipy.signal.medfilt).
    input:
        angles(list or array): Angle values
        window_size(int): Size of the median filter window; medfilt
            requires it to be odd. Defaults to 41.
    output:
        numpy array holding the filtered angles (empty if the input is empty)
    """
    angles = np.asarray(angles)
    if angles.size == 0:
        # Nothing to filter; hand back the empty array instead of calling medfilt
        return angles
    return medfilt(angles, kernel_size=window_size)

#### 3- Get the peaks and troughs

In [8]:
def get_peaks_and_troughs(angles, prominence=0.1):
    """
    Locate the local maxima (peaks) and local minima (troughs) of the angles.
    input:
        angles(list or array): Angle values
        prominence(float): Minimum prominence passed to
            scipy.signal.find_peaks for both searches. Defaults to 0.1.
    output:
        peaks(numpy array): Indices of the peaks
        troughs(numpy array): Indices of the troughs
    """
    # Convert first so that plain Python lists can be negated below
    angles = np.asarray(angles, dtype=float)
    peaks, _ = find_peaks(angles, prominence=prominence)
    # Troughs are the peaks of the negated signal
    troughs, _ = find_peaks(-angles, prominence=prominence)
    return peaks, troughs

#### 4- Get the cycles by dividing the frames according to the increases/decreases in the angles (peaks/troughs)

In [9]:
def get_cycles(frames, peaks):
    """
    Split the frames into cycles, one per pair of consecutive peaks.
    input:
        frames: Indexable sequence of frames
        peaks: Sequence of frame indices
    output:
        List of cycles; each cycle is the list of frames from one peak up to
        and including the next peak (so neighbouring cycles share a frame)
    """
    cycles = []
    for first, last in zip(peaks[:-1], peaks[1:]):
        segment = [frames[first]]
        position = first
        # Walk forward one frame at a time until the next peak is reached
        while position != last:
            position += 1
            segment.append(frames[position])
        cycles.append(segment)
    return cycles

In [34]:
def save_cycle_frames_as_video(cycle_frames, output_path, fps=30):
    """
    Save a list of frames as a video file.
    input:
        cycle_frames(list): List of frames; the first frame's shape sets the
            video dimensions
        output_path(str): Output video file path
        fps(int): Frames per second
    output:
        None
    raises:
        ValueError: if cycle_frames is empty
        Any other exception raised while writing is logged and re-raised.

    NOTE(review): the previous version wrapped errors in CustomException and
    logged through `logging`, but neither name is imported in this notebook,
    so every failure surfaced as a NameError. The original exception is now
    re-raised unchanged.
    """
    # Imported here because the notebook's import cell does not provide it
    import logging

    try:
        if len(cycle_frames) == 0:
            raise ValueError(
                "cycle_frames is empty; nothing to write to " + str(output_path))

        # Get the shape of the first frame to determine video dimensions
        height, width, _ = cycle_frames[0].shape

        # Define the codec (here, MP4V) and create the VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        try:
            # Write each frame to the video file
            for frame in cycle_frames:
                out.write(frame)
        finally:
            # Release the VideoWriter object even if a write fails
            out.release()
    except Exception as e:
        logging.error("Error: "+str(e))
        raise

def save_cycles_as_videos(cycles: list):
    """
    Save every cycle as its own .mp4 file inside a timestamp-named directory
    under ../results/cycles_divider (relative to the current working directory).
    Files are named <timestamp>_cycle<N>.mp4 with N starting at 1.
    input:
        cycles(list): List of cycles, each one a list of frames
    output:
        None
    """
    # 24-hour clock (%H): with the 12-hour %I, a morning run and an afternoon
    # run on the same day could produce the same directory name.
    video_name = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    output_dir = os.path.normpath(os.path.join(
        os.getcwd(), "..", "results", "cycles_divider", video_name))
    # Create the directory (and any missing parents) if it does not exist yet
    os.makedirs(output_dir, exist_ok=True)
    for i, cycle in enumerate(cycles, start=1):
        output_path = os.path.join(
            output_dir, video_name+"_cycle"+str(i)+".mp4")
        save_cycle_frames_as_video(cycle, output_path)

#### 5- Convert each cycle to sequential data

In [18]:
def get_seq_data(cycles, exercise_name="bicep", evaluation_type="angles"):
    """
    Convert the cycles to sequential data with a DataTransformer.
    input:
        cycles(list): List of cycles
        exercise_name(str): First argument given to DataTransformer.
            Defaults to "bicep".
        evaluation_type(str): Second argument given to DataTransformer.
            Defaults to "angles".
    output:
        Whatever DataTransformer.get_sequential_data returns for the cycles
        (its exact structure is defined in src.data_preprocessing)
    """
    transformer = DataTransformer(exercise_name, evaluation_type)
    return transformer.get_sequential_data(cycles)

In [19]:
def data_preprocessing(video_source):
    """
    Run the whole preprocessing pipeline on one video.

    Steps: extract frames and angles, median-filter the angles, find the
    peaks/troughs, cut the frames into cycles at the peaks, and convert the
    cycles to sequential data.
    input:
        video_source: Source handed to get_video_frames_and_angles
    output:
        Tuple of (frames, angles, denoised_angles, peaks, troughs, cycles,
        seq_data), i.e. every intermediate result plus the final one
    """
    raw_frames, raw_angles = get_video_frames_and_angles(video_source)
    smoothed_angles = MedianFilter(raw_angles)
    peak_indices, trough_indices = get_peaks_and_troughs(smoothed_angles)
    frame_cycles = get_cycles(raw_frames, peak_indices)
    return (
        raw_frames,
        raw_angles,
        smoothed_angles,
        peak_indices,
        trough_indices,
        frame_cycles,
        get_seq_data(frame_cycles),
    )

# -------------------------------------------------------------------------------------------------------------

## 3) Get Predictions

In [3]:
def get_predictions(predictor, seq):
    """
    Run the three criteria predictions on one sequence and print them.

    All three predictions are computed first and printed afterwards.
    input:
        predictor: Object exposing predict_criteria1/2/3
        seq: Sequence passed unchanged to each predict call
    output:
        None (the results are only printed)
    """
    scores = (
        predictor.predict_criteria1(seq),
        predictor.predict_criteria2(seq),
        predictor.predict_criteria3(seq),
    )
    labels = ("criteria1:", "criteria2:", "criteria3:")
    for label, score in zip(labels, scores):
        print(label, score)

In [38]:
# Sample clip from the self-collected data, located relative to REPO_DIR_PATH
# instead of a machine-specific absolute path so the cell runs elsewhere too.
# NOTE(review): assumes REPO_DIR_PATH is the directory that contains `data/`.
video_source = os.path.join(
    REPO_DIR_PATH, "data", "external", "self_collected_data",
    "bicep", "criteria_1", "1", "12B.mp4")
# Alternative: set video_source = 0 to capture from camera device 0.
frames, angles, denoised_angles, peaks, troughs, cycles, seq_data = data_preprocessing(video_source)

I0000 00:00:1713095514.334092   11031 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1713095514.369645   15181 gl_context.cc:357] GL version: 3.2 (OpenGL ES 3.2 NVIDIA 535.171.04), renderer: NVIDIA GeForce GTX 1650/PCIe/SSE2


In [39]:
# Write each cycle to its own .mp4 in a timestamp-named folder under
# ../results/cycles_divider (relative to the working directory).
save_cycles_as_videos(cycles) #saved at results/cycles_divider

In [25]:
# Predictor configured with the exercise name and evaluation type below;
# it is the object whose predict_criteria1/2/3 methods score each cycle.
exercise_name = "bicep"
evaluation_type = "angles"
predictor = ModelPredictor(exercise_name, evaluation_type)

In [33]:
# Score every sequence in seq_data; cycle numbers are printed starting at 1.
for i, seq in enumerate(seq_data, start=1):
    print("Cycle:",i)
    get_predictions(predictor, seq)
    print("\n")

Cycle: 1
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 57ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 46ms/step
criteria1: 0.9850567
criteria2: 0.979162
criteria3: 0.79863536


Cycle: 2
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 48ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
criteria1: 0.9837821
criteria2: 0.9877338
criteria3: 0.93033993


Cycle: 3
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 47ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 49ms/step
criteria1: 0.9826963
criteria2: 0.9895569
criteria3: 0.06478944


Cycle: 4
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 53ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m