In [1]:
import cv2
import mediapipe as mp
import numpy as np

In [2]:
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

In [3]:
import csv
import os
import numpy as np
from matplotlib import pyplot as plt



In [4]:
landmarks = ['class']
for val in range(1, 33+1):
    landmarks += ['x{}'.format(val), 'y{}'.format(val), 'z{}'.format(val), 'v{}'.format(val)]

In [5]:
with open('coords.csv', mode='w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(landmarks)

In [6]:
def export_landmark(results, action):
    try:
        keypoints = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten().tolist()
        keypoints.insert(0,action)
        
        print("Upload Succesfull", action)
        
        with open('coords.csv', mode='a', newline='') as f:
            csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
    except Exception as e:
        print("Error:", e)
        pass


# Labelling

In [7]:
import os
import pickle

# Load or initialize the last processed video index
last_processed_index_file = 'last_processed_index.pkl'
if os.path.isfile(last_processed_index_file):
    with open(last_processed_index_file, 'rb') as f:
        last_processed_index = pickle.load(f)
else:
    last_processed_index = 0

for x in range(last_processed_index + 1, 36):
    video_path = r"C:\Users\eddcr\Downloads\Fit3D Dataset\squat\squat ({}).mp4".format(x)
    
    # Check if the video file exists before processing
    if not os.path.isfile(video_path):
        print(f"Video file not found: {video_path}")
        continue

    cap = cv2.VideoCapture(video_path)
    pause = False  # Flag to indicate whether the video is paused

    # initiate holistic model
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                break

            # Recolor image to RGB
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make Detection
            results = pose.process(image)

            # Recolor image to BGR
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # Render detections
            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

            # Add text overlay to show the currently playing video
            cv2.putText(image, f"Video: squat({x})", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            
            k = cv2.waitKey(1)
            if k == 117:  # press u to capture upwards position
                export_landmark(results, 'up')
            if k == 100:  # press d to capture downwards position
                export_landmark(results, 'down')
            if k == 112:  # press p to toggle pause/play
                pause = not pause
            if k == 27:  # press Esc key to exit
                break
            
            if not pause:
                cv2.imshow('Mediapipe Feed', image)
                
        cap.release()
        cv2.destroyAllWindows()
    # Save the last processed index
    with open(last_processed_index_file, 'wb') as f:
        pickle.dump(x, f)
        
    if cv2.waitKey(0) & 0xFF == ord('p'):
            # If 'p' is pressed after a video, pause before proceeding to the next video
            pause = True
            
    exit_choice = input("Press 'q' to quit or any other key to continue to the next video: ")
    if exit_choice.lower() == 'q':
        break

Upload Succesfull up
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up


Press 'q' to quit or any other key to continue to the next video:  


Upload Succesfull up
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up


Press 'q' to quit or any other key to continue to the next video:  


Upload Succesfull up
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up
Upload Succesfull down
Upload Succesfull up


Press 'q' to quit or any other key to continue to the next video:  q


# Training

In [6]:
import pandas as pd
from sklearn.model_selection import train_test_split

from sklearn.model_selection import cross_val_score, KFold

In [7]:
df = pd.read_csv('coords-cleaned.csv')
df.head()
df.tail()
df[df['class']== 'up']
x = df.drop('class', axis = 1)
y = df['class']

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size= 0.3, random_state = 2)

y_test

258    down
356      up
653    down
309    down
68       up
       ... 
716      up
221      up
12     down
171    down
503      up
Name: class, Length: 228, dtype: object

In [8]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [9]:
pipelines = {
    'lr':make_pipeline(StandardScaler(), LogisticRegression()),
    'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'gb':make_pipeline(StandardScaler(), GradientBoostingClassifier()),
    'svm': make_pipeline(StandardScaler(), SVC(kernel='linear', C=1.0))
}

In [10]:
fit_models = {}
for algo, pipeline in pipelines.items():
    model = pipeline.fit(x_train, y_train)
    fit_models[algo] = model

In [11]:
# Specify the number of folds (e.g., 5-fold cross-validation)
num_folds = 10
kf = KFold(n_splits=num_folds, shuffle=True, random_state=42)

# Loop through each model in the pipelines and perform cross-validation
for algo, pipeline in pipelines.items():
    scores = cross_val_score(pipeline, x_train, y_train, cv=kf, scoring='accuracy')
    print(f'{algo} Cross-Validation Scores: {scores}')
    print(f'{algo} Mean Accuracy: {scores.mean()}')
    print()

lr Cross-Validation Scores: [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
lr Mean Accuracy: 1.0

rc Cross-Validation Scores: [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
rc Mean Accuracy: 1.0

rf Cross-Validation Scores: [1.         1.         1.         1.         0.98113208 1.
 0.96226415 1.         1.         1.        ]
rf Mean Accuracy: 0.9943396226415094

gb Cross-Validation Scores: [1.         1.         1.         1.         1.         1.
 0.96226415 1.         0.98113208 0.98113208]
gb Mean Accuracy: 0.9924528301886794

svm Cross-Validation Scores: [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
svm Mean Accuracy: 1.0



In [12]:
fit_models

{'lr': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('logisticregression', LogisticRegression())]),
 'rc': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('ridgeclassifier', RidgeClassifier())]),
 'rf': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('randomforestclassifier', RandomForestClassifier())]),
 'gb': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('gradientboostingclassifier', GradientBoostingClassifier())]),
 'svm': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('svc', SVC(kernel='linear'))])}

In [13]:
fit_models['rc'].predict(x_test)

array(['down', 'up', 'down', 'down', 'up', 'up', 'down', 'down', 'down',
       'down', 'up', 'up', 'down', 'down', 'down', 'down', 'down', 'down',
       'up', 'up', 'up', 'up', 'down', 'up', 'down', 'up', 'up', 'down',
       'up', 'up', 'up', 'up', 'down', 'up', 'down', 'down', 'up', 'down',
       'up', 'up', 'up', 'down', 'up', 'up', 'up', 'up', 'down', 'down',
       'up', 'down', 'up', 'down', 'down', 'down', 'down', 'up', 'up',
       'down', 'down', 'up', 'down', 'down', 'up', 'down', 'down', 'up',
       'down', 'up', 'down', 'up', 'up', 'down', 'up', 'up', 'up', 'down',
       'up', 'down', 'down', 'up', 'up', 'down', 'down', 'down', 'up',
       'up', 'down', 'down', 'up', 'down', 'up', 'up', 'down', 'up',
       'down', 'down', 'down', 'up', 'down', 'down', 'up', 'down', 'down',
       'up', 'up', 'up', 'down', 'down', 'up', 'up', 'down', 'down', 'up',
       'down', 'up', 'up', 'down', 'down', 'up', 'up', 'down', 'down',
       'up', 'down', 'down', 'down', 'up', 'down', 

# Evaluate

In [14]:
from sklearn.metrics import accuracy_score, precision_score, recall_score
import pickle

In [15]:
for algo, model in fit_models.items():
    yhat = model.predict(x_test)
    print(algo, accuracy_score(y_test.values, yhat), 
        precision_score(y_test.values, yhat, average = "binary", pos_label="down"),
        recall_score(y_test.values, yhat, average="binary", pos_label="down"))

lr 1.0 1.0 1.0
rc 1.0 1.0 1.0
rf 0.9912280701754386 1.0 0.9838709677419355
gb 1.0 1.0 1.0
svm 1.0 1.0 1.0


In [16]:
yhat = fit_models['rf'].predict(x_test)

In [17]:
with open('squat.pkl', 'wb') as f:
    pickle.dump(fit_models['rf'], f)

# Detect Using Model

In [18]:
with open('squat.pkl', 'rb') as f:
    model = pickle.load(f)

In [35]:
cap = cv2.VideoCapture(2)
counter = 0
current_stage = ''

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read()

        #Recolor image to RGB
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        #Recolor image to BGR
        image.flags.writeable = True
        image = cv2.cvtColor(image,cv2.COLOR_RGB2BGR)
        
        image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
        
        #Make Detection
        results = pose.process(image)
       
        #Render detections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        
        try:
            row = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten().tolist()
            X = pd.DataFrame([row], columns=landmarks[1:])
            body_language_class = model.predict(X)[0]
            body_language_prob = model.predict_proba(X)[0]
            print(body_language_class, body_language_prob)
            
            # counter
            if body_language_class == 'up' and body_language_prob[body_language_prob.argmax()] >= .7:
                current_stage = 'up'
            elif current_stage == 'up' and body_language_class == 'down' and body_language_prob[body_language_prob.argmax()] >= .7:
                current_stage="down"
                counter +=1
                print(current_stage)
            
            #Get status box
            cv2.rectangle (image, (0,0), (250, 60), (245, 117, 16), -1)
            
            #Display Class
            cv2.putText(image, 'CLASS'
                        , (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
            cv2.putText(image, body_language_class.split(' ')[0]
                        , (90,40), cv2.FONT_HERSHEY_SIMPLEX, 1,(255,255,255), 2, cv2.LINE_AA)
            
            #display probability
            cv2.putText(image, 'PROB'
                        , (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
            cv2.putText(image, str(round(body_language_prob[np.argmax(body_language_prob)],2))
                        , (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1,(255,255,255), 2, cv2.LINE_AA)
            
            #display counter
            cv2.putText(image, 'COUNT'
                        , (180,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
            cv2.putText(image, str(counter)
                        , (175,40), cv2.FONT_HERSHEY_SIMPLEX, 1,(255,255,255), 2, cv2.LINE_AA)
            
        except Exception as e:
            print("Error")
            pass
        
        cv2.imshow('Mediapipe Feed', image)

        if cv2.waitKey(10) & 0xFF == ord ('q'):
            break
        
cap.release()
cv2.destroyAllWindows()

Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
Error
up [0.14 0.86]
up [0.11 0.89]
up [0.1 0.9]
up [0.03 0.97]
up [0.02 0.98]
up [0.02 0.98]
up [0.03 0.97]
up [0.06 0.94]
up [0.07 0.93]
up [0.07 0.93]
up [0.08 0.92]
up [0.08 0.92]
up [0.08 0.92]
up [0.14 0.86]
up [0.17 0.83]
up [0.07 0.93]
up [0.08 0.92]
up [0.07 0.93]
up [0.07 0.93]
up [0.07 0.93]
up [0.15 0.85]
up [0.16 0.84]
up [0.09 0.91]
up [0.07 0.93]
up [0.16 0.84]
up [0.15 0.85]
up [0.12 0.88]
up [0.16 0.84]
down [0.71 0.29]
down
down [0.75 0.25]
down [0.99 0.01]
down [1. 0.]
down [1. 0.]
down [1. 0.]
down [1. 0.]
down [1. 0.]
down [1. 0.]
down [1. 0.]
down [1. 0.]
down [1. 0.]
down [0.93 0.07]
down [0.6 0.4]
up [0.19 0.81]
up [0.18 0.82]
up [0