## 1. Set Up
Run all the cells in this section, up to the "Execute Model" section.

In [1]:
import collections
import os
import time
from typing import Tuple, List
import pandas as pd

from pathlib import Path

import cv2
import numpy as np
from IPython import display
import openvino as ov
from openvino.runtime.ie_api import CompiledModel

import mediapipe as mp
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Fetch `notebook_utils` module
import urllib.request
urllib.request.urlretrieve(
    url='https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/main/notebooks/utils/notebook_utils.py',
    filename='notebook_utils.py'
)
import notebook_utils as utils




In [2]:
# A directory where the model will be downloaded.
base_model_dir = "model"
# The name of the model from Open Model Zoo.
model_name = "action-recognition-0001"
# Selected precision (FP32, FP16, FP16-INT8).
precision = "FP16"

# Local IR paths are derived from `base_model_dir`, so changing the directory
# above relocates both the encoder and the decoder.
model_path_decoder = (
    f"{base_model_dir}/intel/{model_name}/{model_name}-decoder/{precision}/{model_name}-decoder.xml"
)
model_path_encoder = (
    f"{base_model_dir}/intel/{model_name}/{model_name}-encoder/{precision}/{model_name}-encoder.xml"
)
encoder_url = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/temp/{model_name}/{model_name}-encoder/{precision}/{model_name}-encoder.xml"
decoder_url = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/temp/{model_name}/{model_name}-decoder/{precision}/{model_name}-decoder.xml"

# Download each model only when its .xml file is not already on disk.
if not os.path.exists(model_path_decoder):
    utils.download_ir_model(decoder_url, Path(model_path_decoder).parent)
if not os.path.exists(model_path_encoder):
    utils.download_ir_model(encoder_url, Path(model_path_encoder).parent)

In [3]:
# Fetch the Kinetics class-name list from the openvino_notebooks storage
# into the local "data" directory.
vocab_file_path = utils.download_file(
    "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/data/data/text/kinetics.txt",
    directory="data",
)

# One class name per line; surrounding whitespace is stripped.
with vocab_file_path.open(mode='r') as vocab_file:
    labels = list(map(str.strip, vocab_file))

# Show the first nine class names and the overall shape of the label list.
print(labels[:9], np.shape(labels))

'data\kinetics.txt' already exists.
['abseiling', 'air drumming', 'answering questions', 'applauding', 'applying cream', 'archery', 'arm wrestling', 'arranging flowers', 'assembling computer'] (400,)


In [4]:
# Exercise-related class names to keep. A label is kept when any of these
# strings occurs inside its lower-cased text; every other label is replaced
# by 'no exercise'.
specified_list = [
    'clean and jerk',
    'throwing ball',
    'swinging legs',
    'stretching leg',
    'squat',
    'situp',
    'side kick',
    'push up',
    'pull ups',
    'snatch weight lifting',
    'lunge',
    'exercising with an exercise ball',
    'exercising arm',
    'deadlifting',
    'yoga',
    'stretching arm',
]
labels = [
    label if any(spec in label.lower() for spec in specified_list) else 'no exercise'
    for label in labels
]
# Spot-check one entry of the remapped list.
labels[255]

'pull ups'

In [5]:
import ipywidgets as widgets

# Offer every device reported by the OpenVINO runtime, plus "AUTO",
# and preselect "AUTO".
core = ov.Core()
device_choices = [*core.available_devices, "AUTO"]
device = widgets.Dropdown(
    options=device_choices,
    value='AUTO',
    description='Device:',
    disabled=False,
)

# Last expression of the cell: renders the dropdown.
device

Dropdown(description='Device:', index=2, options=('CPU', 'GPU', 'AUTO'), value='AUTO')

In [6]:
# Initialize OpenVINO Runtime.
core = ov.Core()


def model_init(model_path: str, device: str) -> Tuple:
    """
    Read a model from a file and compile it for the requested device.

    :param model_path: path to the model architecture file (*.xml)
    :param device: name of the inference device passed to ``compile_model``
    :returns: a tuple ``(input_key, output_key, compiled_model)`` holding, in
              this order, input 0 of the compiled model, output 0 of the
              compiled model and the compiled model itself
    """
    # Read the network from disk, then compile it for the chosen device.
    network = core.read_model(model=model_path)
    compiled = core.compile_model(model=network, device_name=device)
    # Only the first input and the first output are exposed to the caller.
    return compiled.input(0), compiled.output(0), compiled

In [7]:
# Angle at a joint, from three 2D points
def calculate_angle(a,b,c):
    """
    Return the angle in degrees, within [0, 180], formed at point ``b``
    by the segments b->a and b->c.

    :param a: first point, indexable as (x, y)
    :param b: middle point (the vertex), indexable as (x, y)
    :param c: end point, indexable as (x, y)
    :returns: the angle at ``b`` in degrees
    """
    first, mid, end = np.array(a), np.array(b), np.array(c)

    # Direction of each segment as seen from the vertex.
    heading_end = np.arctan2(end[1] - mid[1], end[0] - mid[0])
    heading_first = np.arctan2(first[1] - mid[1], first[0] - mid[0])

    radians = heading_end - heading_first
    angle = np.abs(radians * 180.0 / np.pi)

    # Fold reflex angles back so the result never exceeds 180 degrees.
    return 360 - angle if angle > 180.0 else angle

In [8]:
# Per-exercise configuration used for repetition counting.
# Each value is a three-item list:
#   [0] landmark names for the left side, ordered first / middle / end
#   [1] landmark names for the right side, in the same order
#   [2] [extension threshold, contraction threshold], compared against the
#       joint angle in degrees: above the first value counts as "extension",
#       below the second as "contraction"
joints_dictionary = {
    'push up': [
        ['LEFT_SHOULDER', 'LEFT_ELBOW', 'LEFT_WRIST'],
        ['RIGHT_SHOULDER', 'RIGHT_ELBOW', 'RIGHT_WRIST'],
        [155, 90],
    ],
    'pull ups': [
        ['LEFT_SHOULDER', 'LEFT_ELBOW', 'LEFT_WRIST'],
        ['RIGHT_SHOULDER', 'RIGHT_ELBOW', 'RIGHT_WRIST'],
        [155, 80],
    ],
    'situp': [
        ['LEFT_SHOULDER', 'LEFT_HIP', 'LEFT_ANKLE'],
        ['RIGHT_SHOULDER', 'RIGHT_HIP', 'RIGHT_ANKLE'],
        [160, 130],
    ],
    'squat': [
        ['LEFT_SHOULDER', 'LEFT_HIP', 'LEFT_KNEE'],
        ['RIGHT_SHOULDER', 'RIGHT_HIP', 'RIGHT_KNEE'],
        [155, 110],
    ],
    'snatch weight lifting': [
        ['LEFT_WRIST', 'LEFT_SHOULDER', 'LEFT_ANKLE'],
        ['RIGHT_WRIST', 'RIGHT_SHOULDER', 'RIGHT_ANKLE'],
        [155, 60],
    ],
    'burpee': [
        ['LEFT_HIP', 'LEFT_KNEE', 'LEFT_ANKLE'],
        ['RIGHT_HIP', 'RIGHT_KNEE', 'RIGHT_ANKLE'],
        [155, 90],
    ],
}
# Returns the [x, y] coordinates of the three joints that make up one side of the body
def initialize_joints(side_list, landmarks):
    """
    Look up three named pose landmarks and return their 2D coordinates.

    :param side_list: landmark names (attribute names of
                      ``mp_pose.PoseLandmark``); the first three are returned
    :param landmarks: sequence of landmarks indexable by landmark index,
                      each exposing ``.x`` and ``.y``
    :returns: three ``[x, y]`` lists, in the order given by ``side_list``
    """
    # Resolve every joint name to its index in the PoseLandmark enum.
    joint_indices = (
        getattr(mp_pose.PoseLandmark, joint_name).value for joint_name in side_list
    )
    joint_coordinates = [
        [landmarks[joint_index].x, landmarks[joint_index].y]
        for joint_index in joint_indices
    ]

    return joint_coordinates[0], joint_coordinates[1], joint_coordinates[2]

# Customized exercise finder
def print_actual_exercise(exercie_results_df):
    """
    Decide which supported exercise the grouped class probabilities point to.

    The decision combines the probability of a target class with the summed
    probability of related classes and compares both against hand-tuned
    thresholds. The branches are checked in a fixed order (pull ups, situp,
    squat, snatch weight lifting, push up, burpee); once a branch's first
    condition matches, later branches are not evaluated even if its second
    condition fails.

    :param exercie_results_df: object indexable by ``'label'`` and
        ``'probability'`` (e.g. a DataFrame), yielding parallel sequences of
        class names and probabilities
    :returns: one of 'pull ups', 'situp', 'squat', 'snatch weight lifting',
        'push up', 'burpee', or 'no exercise' when nothing matches
    """
    label_probabilities = dict(zip(exercie_results_df['label'], exercie_results_df['probability']))

    def prob(label):
        # A class missing from the results contributes 0 instead of None,
        # so the sums below cannot fail with a TypeError.
        return label_probabilities.get(label, 0.0)

    situp = prob('situp')
    exercising_with_an_exercise_ball = prob('exercising with an exercise ball')
    throwing_ball = prob('throwing ball')
    stretching_leg = prob('stretching leg')

    squat = prob('squat')
    lunge = prob('lunge')
    snatch_weight_lifting = prob('snatch weight lifting')
    clean_and_jerk = prob('clean and jerk')

    deadlifting = prob('deadlifting')
    push_up = prob('push up')
    exercising_arm = prob('exercising arm')
    swinging_legs = prob('swinging legs')

    stretching_arm = prob('stretching arm')
    side_kick = prob('side kick')
    pull_ups = prob('pull ups')
    yoga = prob('yoga')

    # Summed probability of each target class together with its related classes.
    base_situp = situp + exercising_with_an_exercise_ball + throwing_ball + stretching_leg + yoga
    base_squad = squat + lunge + snatch_weight_lifting + situp
    base_snatch = snatch_weight_lifting + lunge + clean_and_jerk + deadlifting + squat
    base_pushup = push_up + exercising_arm + stretching_leg + swinging_legs + stretching_arm + side_kick + exercising_with_an_exercise_ball
    base_burpee = push_up + squat + exercising_arm + situp + lunge + throwing_ball

    current_exercise = 'no exercise'
    if pull_ups >= 0.8:
        current_exercise = 'pull ups'
    elif situp >= 0.22:
        if base_situp >= 0.6:
            current_exercise = 'situp'
    elif squat >= 0.3:
        if base_squad >= 0.6 and snatch_weight_lifting < 0.1:
            current_exercise = 'squat'
    elif snatch_weight_lifting + clean_and_jerk + deadlifting > 0.15:
        if base_snatch > 0.5:
            current_exercise = 'snatch weight lifting'
    elif push_up > 0.35 or stretching_leg > 0.6:
        if base_pushup > 0.7:
            current_exercise = 'push up'
    elif push_up < 0.25 and squat < 0.25 and situp < 0.25 and base_burpee > 0.6:
        current_exercise = 'burpee'
        # Report the score that triggered this branch (the burpee sum).
        print(f'Recognized exercie: {current_exercise} with {base_burpee}')
    return current_exercise

In [9]:
# Compile the encoder and the decoder on the device selected in the dropdown.
# model_init returns (input, output, compiled model), in that order.
input_key_en, output_keys_en, compiled_model_en = model_init(model_path_encoder, device.value)
input_key_de, output_keys_de, compiled_model_de = model_init(model_path_decoder, device.value)

# Encoder input size: the dimensions after the first two are taken as (height, width).
encoder_input_dims = list(input_key_en.shape)
height_en, width_en = encoder_input_dims[2:]
# Decoder input: the second dimension is the number of frames per decoding step.
frames2decode = list(input_key_de.shape)[1]

In [10]:
def center_crop(frame: np.ndarray) -> Tuple[np.ndarray, List[int]]:
    """
    Cut the largest centred square out of the frame.

    Only the first two dimensions (height, width) are cropped, so both
    colour (H, W, C) and single-channel (H, W) frames are accepted.

    :param frame: input frame
    :returns: ``(cropped, roi)`` where ``cropped`` is the centred square view
              of ``frame`` and ``roi`` is ``[start_y, end_y, start_x, end_x]``
              in pixels of the input frame
    """
    img_h, img_w = frame.shape[:2]
    min_dim = min(img_h, img_w)
    # Offsets that centre a min_dim x min_dim square inside the frame.
    start_x = (img_w - min_dim) // 2
    start_y = (img_h - min_dim) // 2
    end_x = start_x + min_dim
    end_y = start_y + min_dim
    roi = [start_y, end_y, start_x, end_x]
    return frame[start_y:end_y, start_x:end_x, ...], roi


#Will crop and center the image on the identified body.
def center_body (frame, thres = 0.05):
    """
    Crop the frame to a box centred on the detected body.

    Landmarks come from the module-level ``pose`` detector. Their x/y values
    are treated as fractions of the frame width/height: they are padded by
    ``thres``, clamped to [0, 1] and scaled to pixels. The crop is a square
    whose side is the larger side of that bounding box, clipped to the frame
    borders, so it can be non-square when the body is close to an edge.

    :param frame: input frame; converted with ``cv2.COLOR_BGR2RGB`` before detection
    :param thres: margin added on every side of the landmark bounding box,
                  as a fraction of the frame size
    :returns: ``(cropped_image, roi)`` with ``roi = [start_y, end_y, start_x, end_x]``
              in pixels of the input frame
    :raises ValueError: if no pose landmarks are found in the frame
    """
    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(image_rgb)
    if results.pose_landmarks is None:
        # Fail explicitly rather than through an attribute access on None.
        raise ValueError("No pose landmarks detected in the frame")
    landmarks = np.array([[lm.x, lm.y] for lm in results.pose_landmarks.landmark])

    frame_h, frame_w = frame.shape[:2]

    #Get Landmark X and Y
    x_coords = landmarks[:, 0]
    y_coords = landmarks[:, 1]

    # Padded bounding box of all landmarks, in pixels.
    min_x = round(max(np.amin(x_coords) - thres, 0) * frame_w)
    min_y = round(max(np.amin(y_coords) - thres, 0) * frame_h)
    max_x = round(min(np.amax(x_coords) + thres, 1) * frame_w)
    max_y = round(min(np.amax(y_coords) + thres, 1) * frame_h)

    # Create width and height.
    w = max_x - min_x
    h = max_y - min_y

    # Determine size of the square frame
    size = max(w, h)

    # Calculate center of the bounding box
    center_x = min_x + w // 2
    center_y = min_y + h // 2

    # Calculate coordinates for cropping, clipped to the frame borders
    start_x = max(0, center_x - size // 2)
    start_y = max(0, center_y - size // 2)
    end_x = min(frame_w, start_x + size)
    end_y = min(frame_h, start_y + size)

    # end_x / end_y are already absolute coordinates, so the roi uses them
    # directly (same [start_y, end_y, start_x, end_x] layout as center_crop).
    roi = [start_y, end_y, start_x, end_x]
    cropped_image = frame[start_y:end_y, start_x:end_x]
    return cropped_image, roi


def adaptive_resize(frame: np.ndarray, size: int) -> np.ndarray:
    """
    Resize the frame to a square of ``size`` x ``size`` pixels.

    The aspect ratio is not preserved: a non-square input is stretched to
    fit the square. A frame that already has the target size is returned
    unchanged (same object).

    :param frame: input frame with height and width as its first two dimensions
    :param size: side length of the output; input size of the encoder model
    :returns: frame of ``size`` x ``size`` pixels, np.array type
    """
    h, w = frame.shape[:2]
    # Skip the resize only when BOTH sides already match; a non-square frame
    # whose shorter side equals `size` must still be resized.
    if h == size and w == size:
        return frame
    return cv2.resize(frame, (size, size))


def decode_output(probs: np.ndarray, labels: np.ndarray, top_k: int = 3) -> Tuple[List[str], List[float], str]:
    """
    Decode class probabilities into label names and the recognized exercise.

    Probabilities of entries that share a label are summed first (several
    classes may have been collapsed into one name such as 'no exercise').
    The grouped table is passed to ``print_actual_exercise`` and its
    ``top_k`` most probable rows are returned.

    :param probs: confidence array; only its first row ``probs[0]`` is used
                  and it must have one value per entry of ``labels``
    :param labels: list of action names, aligned with ``probs[0]``
    :param top_k: number of most probable grouped labels to return
    :returns: decoded_labels: the ``top_k`` most probable grouped labels
              decoded_top_probs: summed confidence of each of those labels
              current_exercise: result of ``print_actual_exercise`` on the grouped table
    """
    # One row per class, then one row per distinct label with summed probability.
    df = pd.DataFrame({'label': labels, 'probability': probs[0]})
    grouped_df = df.groupby('label')['probability'].sum().reset_index()
    current_exercise = print_actual_exercise(grouped_df)

    # Keep the top_k labels by summed probability, highest first.
    sorted_df = grouped_df.sort_values(by='probability', ascending=False).head(top_k)
    decoded_labels = sorted_df['label'].tolist()
    decoded_top_probs = sorted_df['probability'].tolist()

    return decoded_labels, decoded_top_probs, current_exercise


def rec_frame_display(frame: np.ndarray, roi) -> np.ndarray:
    """
    Draw corner marks and a "ROI" caption for a region of interest.

    :param frame: input frame; it is drawn on in place
    :param roi: region of interest, indexed as [y_start, y_end, x_start, x_end]
    :returns: the same frame with the marks drawn on it
    """
    green = (0, 200, 0)
    black = (0, 0, 0)
    inset, arm = 3, 100

    # (x index into roi, y index into roi, x direction, y direction) per corner.
    corners = (
        (2, 0, 1, 1),    # x_start / y_start
        (3, 1, -1, -1),  # x_end / y_end
        (3, 0, -1, 1),   # x_end / y_start
        (2, 1, 1, -1),   # x_start / y_end
    )
    for x_idx, y_idx, dir_x, dir_y in corners:
        # Each corner gets one vertical and one horizontal arm pointing inwards.
        corner = (roi[x_idx] + dir_x * inset, roi[y_idx] + dir_y * inset)
        cv2.line(frame, corner, (corner[0], roi[y_idx] + dir_y * arm), green, 2)
        cv2.line(frame, corner, (roi[x_idx] + dir_x * arm, corner[1]), green, 2)

    # Caption: black text first, then green text offset by one pixel.
    caption_font = cv2.FONT_HERSHEY_SIMPLEX
    caption_scale = 0.5
    cv2.putText(frame, "ROI", (roi[2] + 2, roi[1] - 2), caption_font, caption_scale, black)
    cv2.putText(frame, "ROI", (roi[2] + 3, roi[1] - 3), caption_font, caption_scale, green)
    return frame


def display_text_fnc(frame: np.ndarray, display_text: str, index: int):
    """
    Write one line of text on the frame, in place.

    The text is drawn twice: first in black, shifted by one pixel, then in
    green on top.

    :param frame: input frame
    :param display_text: text to add on the frame
    :param index: zero-based line number; each line sits 25 pixels below the previous one
    """
    font = cv2.FONT_HERSHEY_DUPLEX
    font_scale = 1
    thickness = 2
    left_margin = 15
    line_y = 25 * (index + 1)

    cv2.putText(frame, display_text, (left_margin + 1, line_y + 1), font, font_scale, (0, 0, 0), thickness)
    cv2.putText(frame, display_text, (left_margin, line_y), font, font_scale, (0, 255, 0), thickness)

In [11]:
def preprocessing(frame: np.ndarray, size: int) -> Tuple[np.ndarray, List[int]]:
    """
    Prepare a frame for the Encoder.

    The frame is first cropped around the detected body (``center_body``)
    and resized with ``adaptive_resize``. If that fails for any reason, the
    whole frame is resized and centre-cropped instead. The result is
    transposed from Height-Width-Channels (HWC) to Channels-Height-Width
    (CHW) and gets a leading axis of length 1.

    :param frame: input frame
    :param size: input size to encoder model
    :returns: ``(preprocessed, roi)``: the prepared array and the region of
              interest reported by the cropping function that was used
    """
    try:
        (preprocessed, roi) = center_body(frame)
        preprocessed = adaptive_resize(preprocessed, size)
    except Exception:
        # Best-effort fallback when body-centred cropping fails. Catching
        # Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit still propagate to the caller.
        preprocessed = adaptive_resize(frame, size)
        (preprocessed, roi) = center_crop(preprocessed)

    # Transpose frame HWC -> CHW and add the leading axis.
    preprocessed = preprocessed.transpose((2, 0, 1))[None,]
    return preprocessed, roi


def encoder(
    preprocessed: np.ndarray,
    compiled_model: CompiledModel
) -> List:
    """
    Run the Encoder on a single preprocessed frame.

    The compiled encoder model is called with the frame and the value of
    its first output is returned; the caller collects these results, one
    per frame, as input for the decoder.

    :param: preprocessed: preprocessed frame
    :param: compiled_model: Encoder model network
    :returns: the encoder result for this frame (output 0 of the model)
    """
    first_output = compiled_model.output(0)

    # Inference on the action-recognition-0001-encoder model.
    inference_results = compiled_model([preprocessed])
    return inference_results[first_output]


def decoder(encoder_output: List, compiled_model_de: CompiledModel) -> Tuple[List[str], List[float], str]:
    """
    Decoder inference for one set of frames.

    Concatenates the per-frame encoder results, rearranges them into the
    decoder input layout, runs the decoder, converts the logits into
    confidence values with ``softmax`` and decodes them with
    ``decode_output`` using the module-level ``labels`` list.

    :param: encoder_output: list of encoder results, one per frame
    :param: compiled_model_de: Decoder model network
    :returns: decoded_labels: the 3 most probable grouped labels
              decoded_top_probs: confidence for those labels
              current_exercise: exercise recognized by ``decode_output``
    """
    # Concatenate the per-frame embeddings into one array.
    decoder_input = np.concatenate(encoder_output, axis=0)
    # Reorder the axes and drop the trailing singleton axis to match the
    # decoder input (documented by the original author as [1x16x512];
    # NOTE(review): confirm against the model's input shape).
    decoder_input = decoder_input.transpose((2, 0, 1, 3))
    decoder_input = np.squeeze(decoder_input, axis=3)
    output_key_de = compiled_model_de.output(0)
    # Get results on action-recognition-0001-decoder model
    result_de = compiled_model_de([decoder_input])[output_key_de]
    # Normalize logits to get confidence values; the maximum is subtracted
    # first so the exponentials cannot overflow.
    probs = softmax(result_de - np.max(result_de))

    # Grouping, sorting and exercise recognition all happen in decode_output.
    decoded_labels, decoded_top_probs, current_exercise = decode_output(probs, labels, top_k=3)
    return decoded_labels, decoded_top_probs, current_exercise


def softmax(x: np.ndarray) -> np.ndarray:
    """
    Normalize logits into confidence values that sum to 1 over the whole
    array (axis=None).

    The maximum is subtracted before exponentiation, which leaves the result
    unchanged mathematically but prevents overflow for large logits.

    :param x: array of logits
    :returns: array of the same shape as ``x`` whose entries sum to 1
    """
    x = np.asarray(x)
    if x.size == 0:
        # Nothing to normalize; np.max would raise on an empty array.
        return np.exp(x)
    exp = np.exp(x - np.max(x))
    return exp / np.sum(exp, axis=None)
    

In [12]:
def _rep_phase_flags(angles, extension_deg, contraction_deg):
    """Return (extended, contracted): whether every angle is above the extension / below the contraction threshold."""
    extended = all(angle > extension_deg for angle in angles)
    contracted = all(angle < contraction_deg for angle in angles)
    return extended, contracted


def run_action_recognition(
    source: str = "0",
    flip: bool = True,
    use_popup: bool = False,
    compiled_model_en: CompiledModel = compiled_model_en,
    compiled_model_de: CompiledModel = compiled_model_de,
    skip_first_frames: int = 0,
):
    """
    Recognize the exercise shown in a video stream and count repetitions.

    Every second frame is preprocessed and passed through the encoder; once
    ``frames2decode`` encoder results are collected, the decoder updates the
    current exercise. While an exercise is recognized, pose landmarks of
    every frame are used to measure the joint angles configured in
    ``joints_dictionary``; one "extension" followed by one "contraction"
    (in either order) counts as a repetition. Each frame is annotated with
    the current exercise and the repetition counts, stored, and shown either
    in a popup window or inline.

    :param source: video source handed to ``utils.VideoPlayer``
    :param flip: passed to ``utils.VideoPlayer``
    :param use_popup: show frames in an OpenCV window (ESC exits) instead of inline
    :param compiled_model_en: compiled encoder model
    :param compiled_model_de: compiled decoder model
    :param skip_first_frames: passed to ``utils.VideoPlayer``
    :returns: ``(exercise_dict, record_video)``: repetitions per exercise and
              a dict whose 'no exercise' entry lists every annotated frame
    """
    size = height_en  # Encoder required size
    sample_duration = frames2decode  # Number of frames that decoder needs
    fps = 16  # Playback rate requested from the video player
    player = None
    exercise_dict = {}  # Repetitions per exercise on the video
    record_video = {'no exercise': []}
    # Exercises tracked on the angle between the left/right joint midpoints
    # instead of on both side angles.
    midpoint_exercises = ('snatch weight lifting', 'situp')

    # Mediapipe Pose Detection
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:

        try:
            # Create a video player and start capturing.
            player = utils.VideoPlayer(source, flip=flip, fps=fps, skip_first_frames=skip_first_frames)
            player.start()
            if use_popup:
                title = "Press ESC to Exit"
                cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

            encoder_output = []
            actual_exercise = 'no exercise'
            counter = 0
            stage = None    # Stage inside a cycle: "extension" or "contraction"
            # Flags set to 1 once the matching stage was seen in the current cycle.
            count_ext = 0
            count_con = 0

            while True:
                counter += 1

                # Read a frame from the video stream.
                frame = player.next()
                if frame is None:
                    print("Source ended")
                    break

                scale = 1280 / max(frame.shape)

                ####### Define Current Exercise Probabilities
                # Only every second frame is fed to the encoder.
                if counter % 2 == 0:
                    preprocessed, _ = preprocessing(frame, size)

                    # Encoder Inference per frame
                    encoder_output.append(encoder(preprocessed, compiled_model_en))

                    # Decoder inference once enough frames have been collected.
                    if len(encoder_output) == sample_duration:
                        _, _, actual_exercise = decoder(encoder_output, compiled_model_de)
                        encoder_output = []

                        if actual_exercise != 'no exercise':
                            print(actual_exercise)

                ####### Define exercise repetitions
                if actual_exercise == 'no exercise':
                    # Any half-finished cycle is discarded.
                    count_ext = 0
                    count_con = 0
                else:
                    # Recolor image to RGB and mark it read-only for detection.
                    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    image.flags.writeable = False
                    results = pose.process(image)

                    exercise_cfg = joints_dictionary.get(actual_exercise)
                    # Frames without a detected pose (or an exercise without
                    # joint configuration) are skipped for counting but are
                    # still annotated and displayed below.
                    if results.pose_landmarks is not None and exercise_cfg is not None:
                        landmarks = results.pose_landmarks.landmark
                        left_joints, right_joints, (extension_deg, contraction_deg) = exercise_cfg

                        # Get coordinates
                        A, B, C = initialize_joints(left_joints, landmarks)
                        D, E, F = initialize_joints(right_joints, landmarks)

                        if actual_exercise in midpoint_exercises:
                            # One angle measured on the midpoints of the left/right joints.
                            AD = [(A[0] + D[0]) / 2, (A[1] + D[1]) / 2]
                            BE = [(B[0] + E[0]) / 2, (B[1] + E[1]) / 2]
                            CF = [(C[0] + F[0]) / 2, (C[1] + F[1]) / 2]
                            angles = [calculate_angle(AD, BE, CF)]
                        else:
                            # Both sides must pass the threshold.
                            angles = [calculate_angle(A, B, C), calculate_angle(D, E, F)]

                        extended, contracted = _rep_phase_flags(angles, extension_deg, contraction_deg)

                        # Extension
                        if extended and stage != "extension":
                            stage = "extension"
                            count_ext = 1

                        # Contraction
                        if contracted and stage != 'contraction':
                            stage = "contraction"
                            count_con = 1

                        # Complete cycle: add 1 to the counter of this exercise.
                        if count_ext + count_con == 2:
                            count_ext = 0
                            count_con = 0
                            exercise_dict[actual_exercise] = exercise_dict.get(actual_exercise, 0) + 1
                            print(exercise_dict)

                # Adaptive resize for visualization.
                if scale < 1:
                    frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)

                # Overlay the current exercise and the repetition counts.
                display_text_fnc(frame, f'Current Exercise: {actual_exercise}', 0)
                display_text_fnc(frame, 'REPETITIONS:', 1)
                for line_offset, (exer, reps) in enumerate(exercise_dict.items()):
                    display_text_fnc(frame, f'{exer} : {reps}', line_offset + 2)

                # Every annotated frame is kept in memory for later recording.
                record_video['no exercise'].append(frame)

                if use_popup:
                    cv2.imshow(title, frame)
                    key = cv2.waitKey(1)
                    # escape = 27
                    if key == 27:
                        break
                else:
                    _, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                    i = display.Image(data=encoded_img)
                    display.clear_output(wait=True)
                    display.display(i)

        except KeyboardInterrupt:
            print("Interrupted")
        except RuntimeError as e:
            print(e)
        finally:
            if player is not None:
                # Stop capturing.
                player.stop()
            if use_popup:
                cv2.destroyAllWindows()
    return exercise_dict, record_video

## 2. Execute Model
- `USE_WEBCAM = False`: analyze the video file given in `video_file`.
- `USE_WEBCAM = True`: analyze the webcam stream.


In [13]:
# True: read from the webcam `cam_id`; False: read the file `video_file`.
USE_WEBCAM = True

cam_id = 0
video_file = "video_path.mp4"

if USE_WEBCAM:
    # The webcam image is flipped.
    source = cam_id
    additional_options = {"flip": True}
else:
    source = video_file
    additional_options = {"skip_first_frames": 0, "flip": False}

exercise_dict, record = run_action_recognition(source=source, use_popup=True, **additional_options)

pull ups
{'pull ups': 1}
{'pull ups': 2}
pull ups
{'pull ups': 3}
{'pull ups': 4}
pull ups


## 3. Record video
Optional: write the analyzed frames to a video file at `output_video_path`.

In [15]:
fps = 16  # Adjust the frame rate as needed
output_video_path = "recorded.mp4"

# Annotated frames collected by run_action_recognition.
recorded_frames = record['no exercise']

if not recorded_frames:
    # Nothing was captured (e.g. the source could not be read).
    print("No frames were recorded; no video was written.")
else:
    # The video size is taken from the first frame.
    height, width, channels = recorded_frames[0].shape
    video_writer = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
    try:
        if not video_writer.isOpened():
            raise RuntimeError(f"Could not open video writer for {output_video_path}")
        for frame in recorded_frames:
            # Write the frame to the video
            video_writer.write(frame)
    finally:
        # Always release the writer so the file is finalized and closed.
        video_writer.release()

    print("Video created successfully:", output_video_path)

Video created successfully: recorded.mp4
