In [None]:
%pip install tqdm

In [None]:
import cv2
import mediapipe as mp

In [None]:
mp_drawing = mp.solutions.drawing_utils  # Drawing helpers
mp_pose = mp.solutions.pose  # Mediapipe Solutions


In [None]:
cap = cv2.VideoCapture(0)

#pose model
with mp_pose.Pose() as pose_tracker:

    while cap.isOpened():
        ret, frame = cap.read()

        # Recolor Feed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        # Make Detections
        result= pose_tracker.process(image)
        pose_landmarks = result.pose_landmarks

        # Recolor image back to BGR for rendering
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # 4. Pose Detections
        mp_drawing.draw_landmarks(image, result.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                  mp_drawing.DrawingSpec(
                                      color=(245, 117, 66), thickness=2, circle_radius=4),
                                  mp_drawing.DrawingSpec(
                                      color=(245, 66, 230), thickness=2, circle_radius=2)
                                  )

        cv2.imshow('Webcam Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

In [None]:
(pose_landmarks.landmark)[0].visibility

# Capture Landmarks and Export CSV

In [None]:
import csv
import os
import numpy as np
import tqdm

In [None]:
num_of_landmarks = len(pose_landmarks.landmark)
num_of_columns = 1 + num_of_landmarks

In [None]:
landmark_names = [
        'nose',
        'left_eye_inner', 'left_eye', 'left_eye_outer',
        'right_eye_inner', 'right_eye', 'right_eye_outer',
        'left_ear', 'right_ear',
        'mouth_left', 'mouth_right',
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
        'left_pinky_1', 'right_pinky_1',
        'left_index_1', 'right_index_1',
        'left_thumb_2', 'right_thumb_2',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
        'left_heel', 'right_heel',
        'left_foot_index', 'right_foot_index',
    ]

In [None]:
landmarks = ['class']
for i, name in enumerate(landmark_names):
    landmarks += ['{}_x'.format(name), '{}_y'.format(name), '{}_z'.format(name), '{}_v'.format(name)]
len(landmarks)

In [None]:
with open('video_coordinate.csv', mode = 'w', newline ='') as f:
    csv_writer = csv.writer(f, delimiter = ',', quotechar = '"', quoting = csv.QUOTE_MINIMAL)
    csv_writer.writerow(landmarks)

# Read data from folder

In [None]:
images_in_folder = 'yoga_poses_images_in'
images_out_folder = 'yoga_poses_images_out'

pose_class_names = [n for n in os.listdir(images_in_folder) if not n.startswith('.')]
for pose_class_name in pose_class_names:
    image_names = sorted([n for n in os.listdir(os.path.join(images_in_folder, pose_class_name)) 
                      if not n.startswith('.')])
    
    try:
        os.mkdir(os.path.join(images_out_folder, pose_class_name))
    except FileExistsError:
        pass
        
    for image_name in tqdm.tqdm(image_names, position=0):
        input_frame = cv2.imread(os.path.join(images_in_folder, pose_class_name, image_name))
        input_frame = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)

      # Initialize fresh pose tracker and run it.
        with mp_pose.Pose() as pose_tracker:
            result = pose_tracker.process(image=input_frame)
            pose_landmarks = result.pose_landmarks
        
    #write the data to a csv
        if pose_landmarks is not None:
            pose = pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            pose_row.insert(0, pose_class_name)
        
            try:
                with open('coordinate.csv', mode = 'a', newline ='') as f:
                    csv_writer = csv.writer(f, delimiter = ',', quotechar = '"', quoting = csv.QUOTE_MINIMAL)
                    csv_writer.writerow(pose_row)
            except Exception as e:
                print("An error occured", e)
        
    # Save image with pose prediction (if pose was detected).
        output_frame = input_frame.copy()
        if pose_landmarks is not None:
            mp_drawing.draw_landmarks(
            image=output_frame,
            landmark_list=pose_landmarks,
            connections=mp_pose.POSE_CONNECTIONS)
        output_frame = cv2.cvtColor(output_frame, cv2.COLOR_RGB2BGR)
        cv2.imwrite(os.path.join(images_out_folder, pose_class_name ,image_name), output_frame)

# Read data from user camera and populate the csv

In [None]:
csv_file_name = "video.csv"

In [None]:
class_name = "t"

In [None]:
cap = cv2.VideoCapture(0)

#pose model
with mp_pose.Pose() as pose_tracker:

    while cap.isOpened():
        ret, frame = cap.read()

        # Recolor Feed
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        # Make Detections
        result= pose_tracker.process(image)
        pose_landmarks = result.pose_landmarks

        # Recolor image back to BGR for rendering
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # 4. Pose Detections
        mp_drawing.draw_landmarks(image, result.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                  mp_drawing.DrawingSpec(
                                      color=(245, 117, 66), thickness=2, circle_radius=4),
                                  mp_drawing.DrawingSpec(
                                      color=(245, 66, 230), thickness=2, circle_radius=2)
                                  )
        
        try:
            pose = pose_landmarks.landmark
            pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
            pose_row.insert(0, class_name)  
            
            with open(csv_file_name, mode='a', newline='') as f:
                csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                csv_writer.writerow(pose_row) 
        except:
            pass           
        
        cv2.imshow('Webcam Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

# Add video data to CSV

In [None]:
videos_in_folder = 'yoga_poses_videos_in'
videos_out_folder = 'yoga_poses_videos_out'

In [None]:
pose_class_names = [n for n in os.listdir(videos_in_folder) if not n.startswith('.')]

print(pose_class_names)

for pose_class_name in pose_class_names:
    
    video_names = sorted([n for n in os.listdir(os.path.join(videos_in_folder, pose_class_name)) 
                      if not n.startswith('.')])
    

    
    try:
        os.mkdir(os.path.join(videos_out_folder, pose_class_name))
    except FileExistsError:
        pass
   
    for video_name in tqdm.tqdm(video_names, position=0):
        
        print(f"Processing {os.path.join(videos_in_folder, pose_class_name, video_name)}")
        
        video_cap = cv2.VideoCapture(os.path.join(videos_in_folder, pose_class_name, video_name))

        # Get some video parameters to generate output video with classificaiton.
        video_n_frames = video_cap.get(cv2.CAP_PROP_FRAME_COUNT)
        video_fps = video_cap.get(cv2.CAP_PROP_FPS)
        video_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        video_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        #out_video = cv2.VideoWriter(os.path.join(videos_out_folder, pose_class_name, video_name[:-4]+'.avi'), cv2.VideoWriter_fourcc(*'MJPG'), video_fps, (video_width, video_height))
        output_frame = None
        
        with mp_pose.Pose() as pose_tracker:
            while True:
    # Get next frame of the video.
                success, frame = video_cap.read()
                if not success:
                    break

    # Run pose tracker.
                input_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                input_frame.flags.writeable = False
        
                result = pose_tracker.process(image=input_frame)
        
                input_frame.flags.writeable = True
                pose_landmarks = result.pose_landmarks
        
                input_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            
                
                try:
                    pose = pose_landmarks.landmark
                    pose_row = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in pose]).flatten())
                    pose_row.insert(0, pose_class_name)  
            
                    with open("video_coordinate.csv", mode='a', newline='') as f:
                        csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                        csv_writer.writerow(pose_row) 
                except:
                    pass

    # Draw pose prediction.
                #output_frame = input_frame.copy()
                #if pose_landmarks is not None:
                    #mp_drawing.draw_landmarks(
                    #image=output_frame,
                    #landmark_list=pose_landmarks,
                    #connections=mp_pose.POSE_CONNECTIONS)
            
                #out_video.write(np.array(output_frame))

In [None]:
video_path = "C:\Dev\Python\ML\MinorModel\yoga_poses_videos_in\mountain\IMG_1111.mov"

In [None]:
video_cap = cv2.VideoCapture(video_path)

video_n_frames = video_cap.get(cv2.CAP_PROP_FRAME_COUNT)
video_fps = video_cap.get(cv2.CAP_PROP_FPS)
video_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
video_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

ret, frame = video_cap.read()

out_video = cv2.VideoWriter("./yoga_poses_videos_out/test1.avi", cv2.VideoWriter_fourcc(*'MJPG'), video_fps, (video_width, video_height))
with mp_pose.Pose() as pose_tracker:
    
    while True:
        # Get next frame of the video.
        success, frame = video_cap.read()
        if not success:
            break
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        # Make Detections
        result= pose_tracker.process(image)
        pose_landmarks = result.pose_landmarks

        # Recolor image back to BGR for rendering
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        output_frame = image.copy()
        if pose_landmarks is not None:
            mp_drawing.draw_landmarks(
            image=output_frame,
            landmark_list=pose_landmarks,
            connections=mp_pose.POSE_CONNECTIONS)
            
        out_video.write(np.array(output_frame))




# Starting on the Model

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split

ModuleNotFoundError: No module named 'sklearn'

In [None]:
df = pd.read_csv('video_coordinate.csv')

In [None]:
#features
X = df.drop('class', axis = 1) 
#target
y = df['class'] 

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state = 1234)

In [None]:
y_test

# ML pipeline

In [None]:
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

from sklearn.pipeline import make_pipeline 
from sklearn.preprocessing import StandardScaler 

In [None]:
pipelines = {
    'lr':make_pipeline(StandardScaler(), LogisticRegression()),
    'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'rfc':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'gbc':make_pipeline(StandardScaler(), GradientBoostingClassifier()),
}

In [None]:
fit_models = {}
for algo, pipeline in pipelines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algo] = model

In [None]:
fit_models

# Evaluate Model

In [None]:
fit_models['rfc'].predict(X_test)

In [None]:
from sklearn.metrics import accuracy_score
import pickle

In [None]:
for algo, model in fit_models.items():
    yhat = model.predict(X_test)
    print(algo, accuracy_score(y_test, yhat), sep = ": ")

In [None]:
fit_models['rfc'].predict(X_test)

In [None]:
y_test

# Save Model

In [None]:
with open('mid_poses.pkl', 'wb') as f:
    pickle.dump(fit_models['rfc'], f)