# 1 - Setup

In [2]:
import cv2
import argparse
import mediapipe as mp
import numpy as np
import time

import csv
import os
from matplotlib import pyplot as plt

import pandas as pd
import pickle

In [3]:
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# (2) - Test the pose algorithm
Don't use if video already recorded

In [3]:
cap = cv2.VideoCapture(0)

In [3]:
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    # Loop through frames
    while cap.isOpened():
        ret, frame = cap.read()
            
        if ret:
            # Recolor image to RGB
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # mediapipe needs RGB
            frame.flags.writeable = False
            # Make detection
            results = pose.process(frame)   # Store detections in result
            # Recolor back to BGR
            frame.flags.writeable = True
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)


            mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                        mp_drawing.DrawingSpec(color=(245,177,66), thickness=2, circle_radius=2), # landmark drawing spec
                                        mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)  # connection drawing spec
                                        )
            # Extract landmarks
            try:
                landmarks = results.pose_landmarks.landmark
            except:
                pass
            
            # Get coordinates of joints
            r_shoulder_xy = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            r_elbow_xy = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            r_wrist_xy = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
            # Calculate angle between joints
            # angle = get_angle(r_shoulder_xy, r_elbow_xy, r_wrist_xy)

            cv2.imshow('Inference', frame)
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break

    cap.release()
    cv2.destroyAllWindows()

# (3) - Record and save video
Don't use if video already recorded

In [7]:
cap = cv2.VideoCapture(0)

height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
fps = cap.get(cv2.CAP_PROP_FPS)
data_folder = '../data/'
video_name = 'tester.mp4'
# create the full path to the video file
video_path = os.path.join(data_folder, video_name)
# make sure the folder exists - if not create it.
if not os.path.exists(data_folder):
    os.makedirs(data_folder)

# create the video writer
videoWriter = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc('P','I', 'M', '1'), fps, (int(width), int(height)))

# start the webcam
while cap.isOpened():
    ret, frame = cap.read()

    if ret:
        videoWriter.write(frame)
        cv2.imshow('frame', frame)

        # press q to quit (and save video)
        if cv2.waitKey(1) & 0xFF == ord('q'):   
            break
    else:
        break

# release the capture and video writer. Close windows.
cap.release()
videoWriter.release()
cv2.destroyAllWindows()

# [4] - Capture Landmarks & Export to CSV
Generating training data by annotating video

From this we capture landmarks from a video and write it to a csv file by annotating up, transition, down state of a deadlift.

In [64]:
csv_file_name = 'coords.csv'

In [65]:
landmarks = ['Class']
# Add coordinates (x, y, z) for all landmarks and visibility flag (v)
for val in range(1, 33+1):
    landmarks += [f'x{val}', f'y{val}', f'z{val}', f'v{val}']

In [66]:
# landmarks[1:]

In [67]:
with open(csv_file_name, mode='w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(landmarks)

In [68]:
def export_landmark(result, action):
    try:
        keypoints = np.array([[res.x, res.y, res.z, res.visibility] for res in result.pose_landmarks.landmark]).flatten().tolist()  # Extract pose landmarks
        keypoints.insert(0, action)  # Insert class number to list
        # Export to CSV
        with open(csv_file_name, mode='a', newline='') as f:
            csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
    except Exception as e:
        pass

In [69]:
cap = cv2.VideoCapture('../data/tester.mp4')
# Initiate holistic model
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read()

        # Recolor image to RGB
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        # Make detection
        results = pose.process(image)

        # Recolor back to BGR
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Extract landmarks
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                    mp_drawing.DrawingSpec(color=(245, 177, 66), thickness=2, circle_radius=4),
                                    mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
                                    )
        k = cv2.waitKey(1)
        if k == 117:    # u - press 'u' when in upright state
            export_landmark(results, 'up')
        if k == 116:  # t - press 't' when in transition state
            export_landmark(results, 'transition')
        if k == 100:  # d - press 'd' when in down state
            export_landmark(results, 'down')

        # resize image
        scale_percent = 50 # percent of original size
        width = int(image.shape[1] * scale_percent / 100)
        height = int(image.shape[0] * scale_percent / 100)
        dim = (width, height)
        resized = cv2.resize(image, dim, interpolation = cv2.INTER_AREA)
        
        cv2.imshow('Raw webcam feed', resized)

        if cv2.waitKey(5) & 0xFF == ord('q'):
            break
cap.release()
cv2.destroyAllWindows()

# [5] - Train custom model using Scikit Learn
Based on the landmarks and annotation we just did we train and evaluate multiple classification models using Scikit Learn.
Finally we use pickle to save the desired model for later use.

Read collected data and process

In [70]:
from sklearn.model_selection import train_test_split

In [71]:
df = pd.read_csv(csv_file_name)

In [72]:
# df.head()

In [73]:
# df.tail()

In [74]:
X = df.drop('Class', axis=1) # Features
y = df['Class'] # Target variable

In [75]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)

In [76]:
# y_test

Train ML classification model

In [77]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [78]:
pipielines = {
    'lr':make_pipeline(StandardScaler(), LogisticRegression()),
    'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'gb':make_pipeline(StandardScaler(), GradientBoostingClassifier()),
}

In [79]:
fit_models = {}
for algo, pipeline in pipielines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algo] = model

STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(


In [80]:
fit_models

{'lr': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('logisticregression', LogisticRegression())]),
 'rc': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('ridgeclassifier', RidgeClassifier())]),
 'rf': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('randomforestclassifier', RandomForestClassifier())]),
 'gb': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('gradientboostingclassifier', GradientBoostingClassifier())])}

In [81]:
#fit_models['rf'].predict(X_test)

Evaluate and serialize model

In [82]:
from sklearn.metrics import accuracy_score, precision_score, recall_score

In [83]:
for algo, model in fit_models.items():
    yhat = model.predict(X_test)
    print(algo)
    print('---')
    print(f'Accuracy: {accuracy_score(y_test, yhat):.2f}')
    print(f'Precision: {precision_score(y_test, yhat, average="micro"):.2f}')
    print(f'Recall: {recall_score(y_test, yhat, average="micro"):.2f}')
    print()

lr
---
Accuracy: 0.98
Precision: 0.98
Recall: 0.98

rc
---
Accuracy: 0.99
Precision: 0.99
Recall: 0.99

rf
---
Accuracy: 0.99
Precision: 0.99
Recall: 0.99

gb
---
Accuracy: 0.99
Precision: 0.99
Recall: 0.99



In [85]:
test_model = 'rf'
yhat = fit_models[test_model].predict(X_test)

print("Model being tested: ", test_model)
print("--- Prediction ---")
print(yhat[:10])
print("--- True values ---")
print(y_test[:10])

Model being tested:  rf
--- Prediction ---
['transition' 'transition' 'transition' 'down' 'up' 'down' 'up' 'down'
 'up' 'up']
--- True values ---
172    transition
466    transition
196    transition
416          down
533            up
210          down
489            up
236          down
318            up
162            up
Name: Class, dtype: object


In [86]:
model_dir_name = '../models/deadlift_trainer_model.pkl'

In [87]:
with open(model_dir_name, 'wb') as f:   # save / write model
    pickle.dump(fit_models['rf'], f)

# 6 - Make detections with model

We load the desired model and use it to detect the classified stages in a deadlift.

In [88]:
with open(model_dir_name, 'rb') as f:   # Load / read model (model_dir_name is the path to the model... the variable and used before in training the model but another can be used)
    pickle.load(f)

In [89]:
# X = pd.DataFrame([row], columns=landmarks[1:])

In [94]:
cap = cv2.VideoCapture('../data/tester.mp4') # can also test with webcam: cap = cv2.VideoCapture(0)
counter = 0
current_state = ''
# Initiate holistic model
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:

    while cap.isOpened():
        ret, frame = cap.read()

        # Recolor image to RGB
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        # Make detection
        results = pose.process(image)

        # Recolor back to BGR
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Extract landmarks
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                    mp_drawing.DrawingSpec(color=(245, 177, 66), thickness=2, circle_radius=4),
                                    mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
                                    )
        try:
            row = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten().tolist()  # Extract pose landmarks
            X = pd.DataFrame([row], columns=landmarks[1:])
            body_language_class = model.predict(X)[0]
            body_language_prob = model.predict_proba(X)[0]
            
            if body_language_class == 'down' and body_language_prob[body_language_prob.argmax()] >= 0.7:
                current_state = 'down'
            # elif body_language_class == 'transition' and body_language_prob[body_language_prob.argmax()] >= 0.7:
            #     current_state = 'transition'
            elif current_state == 'down' and body_language_class == 'up' and body_language_prob[body_language_prob.argmax()] >= 0.7:
                current_state = 'up'
                counter += 1
           # print(current_state)

            # Get status box
            cv2.rectangle(image, (0,0), (350, 60), (245, 117, 16), -1)

            # Display Class
            cv2.putText(image, 'CLASS'
                        , (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
            cv2.putText(image, body_language_class.split(' ')[0]
                        , (90,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
            # Display Probability
            cv2.putText(image, 'PROB'
                        , (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
            cv2.putText(image, str(round(body_language_prob[body_language_prob.argmax()],2))
                        , (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
            # Display conter
            cv2.putText(image, 'COUNTER'
                        , (260,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,0), 1, cv2.LINE_AA)
            cv2.putText(image, str(counter)
                        , (255,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
        except Exception as e:
            pass
        
        # resize image
        scale_percent = 50 # percent of original size
        width = int(image.shape[1] * scale_percent / 100)
        height = int(image.shape[0] * scale_percent / 100)
        dim = (width, height)
        resized = cv2.resize(image, dim, interpolation = cv2.INTER_AREA)

        cv2.imshow('Camera / video feed', resized)

        if cv2.waitKey(5) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()