# ETL for Classification model

Necesitamos un dataset para entrenar el modelo de clasificación, así que en este notebook definiremos el procedimiento para generar datasets de poses a partir de vídeos.

In [2]:
import os
# Project root; the later cells build their video and CSV paths from this value.
workpath = 'C:/Users/Bruno/TFM/Tareas'
# Make the project root the current working directory.
os.chdir(workpath)
import warnings
# Silence every warning for the rest of the session.
# NOTE(review): this also hides NumPy/pandas RuntimeWarnings (e.g. divisions by zero).
warnings.filterwarnings("ignore")

In [3]:
import cv2
import time
import urllib
import re
import mediapipe as mp
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from PIL import Image
from IPython.display import Image as IPyImage
from ultralytics import YOLO
from matplotlib import pyplot as plt
# Short aliases for the MediaPipe modules used below: drawing helpers and the pose solution.
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

In [4]:

def create_angle(a, b, c):
    """Return the angle ABC, in degrees, measured at the vertex ``b``.

    The angle is the one between the vectors ``b -> a`` and ``b -> c``.

    Args:
        a: Coordinates of the first end point (sequence of numbers).
        b: Coordinates of the vertex; same length as ``a``.
        c: Coordinates of the second end point; same length as ``a``.

    Returns:
        The angle in degrees, in the range [0, 180]. NaN is returned when
        ``a`` or ``c`` coincides with ``b``, because the angle is undefined.
    """
    vertex = np.asarray(b, dtype=float)
    # Vectors BA and BC
    BA = np.asarray(a, dtype=float) - vertex
    BC = np.asarray(c, dtype=float) - vertex

    # Product of the norms of BA and BC
    norm_product = np.linalg.norm(BA) * np.linalg.norm(BC)
    if norm_product == 0:
        # Degenerate input: keep the NaN result without performing 0 / 0.
        return np.float64('nan')

    # Cosine of the angle between the vectors. Rounding can push it slightly
    # outside [-1, 1] for (anti)parallel vectors, which would make arccos
    # return NaN, so it is clamped first.
    cos_theta = np.clip(np.dot(BA, BC) / norm_product, -1.0, 1.0)

    # Angle in radians, converted to degrees
    return np.degrees(np.arccos(cos_theta))


def _landmark_xyz(landmarks, joint):
    """Return the ``[x, y, z]`` coordinates of one pose landmark."""
    point = landmarks[joint.value]
    return [point.x, point.y, point.z]


def retrieve_data_video(path):
    """Run MediaPipe Pose over a video and collect landmarks and joint angles.

    Every frame is shown in a window named 'Mediapipe Feed' with the detected
    skeleton drawn on it; pressing ``q`` stops the processing early.

    Args:
        path: Path of the video, as accepted by ``cv2.VideoCapture``.

    Returns:
        A tuple ``(landmarks_raw, angles)``:

        * ``landmarks_raw``: one entry per frame in which a pose was
          detected, holding that frame's ``pose_landmarks.landmark`` list.
        * ``angles``: dict with the keys ``elbow_l``, ``elbow_r``,
          ``shoulder_l``, ``shoulder_r``, ``hip_l``, ``hip_r``, ``knee_l``
          and ``knee_r``. Each value is a list of angles in degrees, aligned
          index by index with ``landmarks_raw``.
    """
    joints = mp_pose.PoseLandmark
    # Angle name -> (first end point, vertex, second end point)
    angle_joints = {
        'elbow_l': (joints.LEFT_WRIST, joints.LEFT_ELBOW, joints.LEFT_SHOULDER),
        'elbow_r': (joints.RIGHT_WRIST, joints.RIGHT_ELBOW, joints.RIGHT_SHOULDER),
        'shoulder_l': (joints.LEFT_ELBOW, joints.LEFT_SHOULDER, joints.LEFT_HIP),
        'shoulder_r': (joints.RIGHT_ELBOW, joints.RIGHT_SHOULDER, joints.RIGHT_HIP),
        'hip_l': (joints.LEFT_SHOULDER, joints.LEFT_HIP, joints.LEFT_KNEE),
        'hip_r': (joints.RIGHT_SHOULDER, joints.RIGHT_HIP, joints.RIGHT_KNEE),
        'knee_l': (joints.LEFT_HIP, joints.LEFT_KNEE, joints.LEFT_ANKLE),
        'knee_r': (joints.RIGHT_HIP, joints.RIGHT_KNEE, joints.RIGHT_ANKLE),
    }
    landmarks_raw = []
    angles = {name: [] for name in angle_joints}

    cap = cv2.VideoCapture(path)
    try:
        # Setup mediapipe instance
        with mp_pose.Pose(min_detection_confidence=0.7,
                          min_tracking_confidence=0.7) as pose:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    # No more frames could be read.
                    break

                # Recolor image to RGB and mark it read-only for detection
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Make detection
                results = pose.process(image)

                # Frames without a detected pose are skipped explicitly
                # instead of relying on a swallowed exception.
                if results.pose_landmarks is not None:
                    landmarks_frame = results.pose_landmarks.landmark
                    # Compute every angle before storing anything so the
                    # landmark list and the angle lists cannot get out of step.
                    frame_angles = {
                        name: create_angle(
                            *(_landmark_xyz(landmarks_frame, joint) for joint in triple))
                        for name, triple in angle_joints.items()
                    }
                    landmarks_raw.append(landmarks_frame)
                    for name, value in frame_angles.items():
                        angles[name].append(value)

                # Recolor back to BGR
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                # Render detections
                mp_drawing.draw_landmarks(
                    image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                    mp_drawing.DrawingSpec(color=(148, 7, 240), thickness=2, circle_radius=2),
                    mp_drawing.DrawingSpec(color=(149, 158, 0), thickness=2, circle_radius=2),
                )

                cv2.imshow('Mediapipe Feed', image)

                # Only the low byte is compared so extra bits in the key code
                # do not prevent 'q' from being recognised.
                if cv2.waitKey(25) & 0xFF == ord('q'):
                    break
    finally:
        # Always free the capture and close the preview window, even on error.
        cap.release()
        cv2.destroyAllWindows()
    return landmarks_raw, angles

def landmarks_to_dataframe(rawdata, angles):
    """Flatten per-frame pose landmarks and joint angles into a DataFrame.

    Args:
        rawdata: Sequence with one landmark list per frame; each list is
            indexed with ``mp_pose.PoseLandmark.<NAME>.value`` and its items
            expose ``x``, ``y`` and ``z``.
        angles: Dict of per-frame angle lists keyed by ``elbow_l``,
            ``elbow_r``, ``shoulder_l``, ``shoulder_r``, ``hip_l``, ``hip_r``,
            ``knee_l`` and ``knee_r``; entry ``i`` belongs to ``rawdata[i]``.

    Returns:
        A DataFrame with one row per frame. Columns are ``<JOINT>_x``,
        ``<JOINT>_y`` and ``<JOINT>_z`` for each tracked joint, followed by
        the eight ``*_ANGLE`` columns. The ``_y`` columns store ``1 - y``.
    """
    # Joints exported, in output column order.
    tracked_joints = (
        'NOSE',
        'LEFT_SHOULDER', 'RIGHT_SHOULDER',
        'LEFT_ELBOW', 'RIGHT_ELBOW',
        'LEFT_WRIST', 'RIGHT_WRIST',
        'LEFT_PINKY', 'RIGHT_PINKY',
        'LEFT_INDEX', 'RIGHT_INDEX',
        'LEFT_THUMB', 'RIGHT_THUMB',
        'LEFT_HIP', 'RIGHT_HIP',
        'LEFT_KNEE', 'RIGHT_KNEE',
        'LEFT_ANKLE', 'RIGHT_ANKLE',
    )
    # (output column, key in ``angles``), in output column order.
    angle_columns = (
        ('LEFT_ELBOW_ANGLE', 'elbow_l'),
        ('RIGHT_ELBOW_ANGLE', 'elbow_r'),
        ('LEFT_SHOULDER_ANGLE', 'shoulder_l'),
        ('RIGHT_SHOULDER_ANGLE', 'shoulder_r'),
        ('LEFT_HIP_ANGLE', 'hip_l'),
        ('RIGHT_HIP_ANGLE', 'hip_r'),
        ('LEFT_KNEE_ANGLE', 'knee_l'),
        ('RIGHT_KNEE_ANGLE', 'knee_r'),
    )

    frame_ids = range(len(rawdata))
    columns = {}
    for joint in tracked_joints:
        slot = getattr(mp_pose.PoseLandmark, joint).value
        columns[joint + '_x'] = [rawdata[i][slot].x for i in frame_ids]
        # The vertical coordinate is stored flipped (1 - y).
        # presumably because MediaPipe's y grows downwards - TODO confirm
        columns[joint + '_y'] = [1 - rawdata[i][slot].y for i in frame_ids]
        columns[joint + '_z'] = [rawdata[i][slot].z for i in frame_ids]

    for column, key in angle_columns:
        columns[column] = [angles[key][i] for i in frame_ids]

    return pd.DataFrame(columns)
    
def minmaxing_rows(df):
    """Min-max scale every row of ``df`` independently.

    Each value becomes ``(value - row_min) / (row_max - row_min)``.

    Args:
        df: Numeric DataFrame.

    Returns:
        A DataFrame of the same shape. Cells whose scaled value is NaN (for
        example in rows where all values are equal) keep the value from ``df``.
    """
    row_min = df.min(axis=1)
    row_span = df.max(axis=1) - df.min(axis=1)
    scaled = df.sub(row_min, axis=0).div(row_span, axis=0)
    # Fall back to the original value wherever scaling produced NaN.
    return scaled.combine_first(df)


def standarize_coordinates(dataframe):
    """Express coordinates relative to the nose and scale them row by row.

    For each axis, the ``NOSE_<axis>`` column is subtracted from every column
    of that axis and the result is passed through ``minmaxing_rows``. Columns
    containing ``_ANGLE`` are carried over unchanged.

    Args:
        dataframe: DataFrame whose column names contain ``_x``, ``_y``,
            ``_z`` or ``_ANGLE``, including ``NOSE_x``, ``NOSE_y``, ``NOSE_z``.

    Returns:
        A DataFrame with the same columns, in the same order, as ``dataframe``.
    """
    original_order = list(dataframe.columns)

    scaled_axes = []
    for axis in ('x', 'y', 'z'):
        suffix = '_' + axis
        axis_frame = dataframe[[name for name in dataframe.columns if suffix in name]]
        # Move the origin to the nose before scaling.
        centred = axis_frame.sub(axis_frame['NOSE' + suffix], axis=0)
        scaled_axes.append(minmaxing_rows(centred))

    angle_frame = dataframe[[name for name in dataframe.columns if '_ANGLE' in name]]

    # Stitch x, y, z and the angles back together on the shared row index.
    combined = scaled_axes[0]
    for piece in (scaled_axes[1], scaled_axes[2], angle_frame):
        combined = combined.join(piece, how='inner')

    return combined[original_order]

def excercise_type(path):
    """Return the numeric class label encoded in a video file name.

    The file name without directories, without everything after its first
    ``.`` and without trailing digits/underscores must be one of the known
    exercise names, e.g. ``.../Flexiones10_3.mp4`` -> ``Flexiones`` -> 2.

    Args:
        path: Path of the video. Both ``/`` and ``\\`` are accepted as
            directory separators.

    Returns:
        The integer label: Dominadas 0, Fondos 1, Flexiones 2,
        Sentadillas 3, Standup 4.

    Raises:
        KeyError: If the file name does not start with a known exercise name.
    """
    label_dict = {'Dominadas': 0, 'Fondos': 1,
                  'Flexiones': 2, 'Sentadillas': 3,
                  'Standup': 4}
    # Split on either separator so Windows-style paths work as well.
    video = re.split(r'[\\/]', path)[-1].split('.')[0]
    # Drop the trailing numbering / augmentation suffix ("10", "10_3", ...).
    excercise = re.sub(r'[\d_]+$', '', video)
    if excercise not in label_dict:
        raise KeyError(
            'Unknown exercise {0!r} in path {1!r}'.format(excercise, path))
    return label_dict[excercise]

def pipeline_video_csv(input_path, output_path):
    """Turn one exercise video into a labelled CSV of pose features.

    The label comes from the file name, the landmarks and angles from the
    video; the coordinates are then standardised, a ``label`` column is added
    and the table is written with ``;`` as separator.

    Any exception is caught and reported on standard output, so a failing
    video does not interrupt a batch run.

    Args:
        input_path: Path of the source video.
        output_path: Path of the CSV file to write.
    """
    try:
        label = excercise_type(input_path)
        raw_landmarks, joint_angles = retrieve_data_video(input_path)
        features = standarize_coordinates(
            landmarks_to_dataframe(raw_landmarks, joint_angles))
        # Same label on every row of this video.
        features['label'] = [label] * len(features)
        features.to_csv(output_path, sep=';')
    except Exception as error:
        print('There was an error with video in path {0}'.format(input_path))
        print(error)



In [8]:
# Folder with the (augmented) training videos.
videos_dir = workpath + '/VideosTR/VideosAug'
vid_list = os.listdir(videos_dir)
# Keep only entries whose name contains a dot and build their full paths.
abs_paths_vids = [videos_dir + '/' + x for x in vid_list if '.' in x]

In [9]:
# Output folder for the generated CSV files.
csv_dir = workpath + '/VideosTR/Data2model/'
abs_paths_csv_vids = [csv_dir + x for x in vid_list if '.' in x]
# One CSV path per video: everything from the first '.' on is replaced by '.csv'.
abs_paths_csv = [path.split('.')[0] + '.csv' for path in abs_paths_csv_vids]
abs_paths_csv

['C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones10.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones10_0.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones10_1.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones10_2.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones10_3.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones7.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones7_0.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones7_1.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones7_2.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones7_3.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones8.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones8_0.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones8_1.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones8_2.csv',
 'C:/Users/Bruno/TFM/Tareas/VideosTR/Data2model/Flexiones8_3.csv',
 ...]

In [10]:
# Convert every video into its CSV; both lists are index-aligned.
for index_path, video_path in enumerate(abs_paths_vids):
    pipeline_video_csv(video_path, abs_paths_csv[index_path])