In [1]:
!pip install mediapipe opencv-python pandas scikit-learn

In [25]:
import cv2
import mediapipe as mp
import numpy as np
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose #this is where we are importing the pre-built pose detection model from mediapipe


In [27]:
from IPython.display import HTML

# Define the HTML content
html_content = '<image src="https://i.imageur.com/3j8BPdc.png" style="height:300px">'

# Display the HTML content
display(HTML(html_content))


In [28]:
def calculate_angle(a,b,c):
    a = np.array(a) # First
    b = np.array(b) # Mid
    c = np.array(c) # End
    
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians*180.0/np.pi)
    
    if angle >180.0:
        angle = 360-angle
        
    return angle 

In [29]:
import csv
import os
import numpy as np
from matplotlib import pyplot as plt

landmarks = ['class']
for val in range(1, 34): #because 33 points
    landmarks += ['x{}'.format(val), 'y{}'.format(val), 'z{}'.format(val), 'v{}'.format(val)]


In [30]:
landmarks[1:]

In [31]:
with open('coords.csv', mode='w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',', quotechar= '"', quoting= csv.QUOTE_MINIMAL)
    csv_writer.writerow(landmarks)


In [32]:
def export_landmark(results, action):
    try:
        keypoints = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten()
        keypoints = np.insert(keypoints, 0, action)  # Insert action at the beginning

        with open('coords.csv', mode='a', newline='') as f:
            csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
    except Exception as e:
        print('Error occurred while exporting landmarks:', e)


In [9]:
#video feed
cap = cv2.VideoCapture('squats.mp4') #this number is the number of my web cam

#setup mediapipe instance
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read() 
        
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) 
        image.flags.writeable = False #save a bunch of memory

        #make detections from the pose
        results = pose.process(image) #by processing it, we get our detections back
    
        #back to BGR
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
       
        k = cv2.waitKey(1)
        if k == 117:
            export_landmark(results, '1')
        if k == 100:
            export_landmark(results, '2')
            
        cv2.imshow('Raw Webcam feed', image)
        
       # cv2.imshow('Mediapipe Feed', image)        
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

In [10]:
cap.release()
cv2.destroyAllWindows()


In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split

In [2]:
df = pd.read_csv('coords.csv')

In [3]:
df.head()

In [4]:
df.tail()


In [5]:
df[df['class'] == '1.0']

In [6]:
X = df.drop('class', axis=1)
y = df['class']


In [7]:
#train_test_split is a function from the sklearn.model_selection module that is used to
# split arrays or matrices into random train and test subsets.

#class is my target variable and I want to predict that

#X is the dataset with no class coulmn
#y extracts the targer variable class

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.3, random_state=1234)

In [8]:
y_test

In [9]:
X_test

In [10]:
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [11]:
pipelines = {
    #StandardScaler() before the Random Forest Classifier. 
    # The StandardScaler standardizes features by removing the mean and scaling to unit variance. 
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'lr':make_pipeline(StandardScaler(), LogisticRegression()),
    'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'gb':make_pipeline(StandardScaler(), GradientBoostingClassifier())
}

In [12]:
RandomForestClassifier().get_params()

In [13]:
fit_models = {} #create empty dictionary
for algorithm, pipeline in pipelines.items():
    model = pipeline.fit(X_train, y_train)
    fit_models[algorithm] = model #put the results of trained (fitted) models in the dictionary

In [14]:
fit_models['rf'].predict(X_test)

In [15]:
fit_models['rf'].predict(X_test)

In [16]:
fit_models['lr'].predict(X_test)

We just used the pipeline to train different models on X_train and y_train and store the collected trainer models in fit_models

Next step:
Evaluate and serialize model

In [17]:
from sklearn.metrics import accuracy_score, precision_score, recall_score #accuracy metrics
import pickle

In [18]:
yhat = model.predict(X_test)
yhat

In [19]:
for algorithm, model in fit_models.items():
    yhat = model.predict(X_test)
    print(algorithm, accuracy_score(y_test.values, yhat),
          precision_score(y_test.values, yhat, average="binary", pos_label=1),
          recall_score(y_test.values, yhat, average="binary", pos_label=2))
    

In [20]:
yhat = fit_models['rf'].predict(X_test)

In [21]:
yhat[:10]

#slicing yhat to display the first 10 elements


In [22]:
with open('squats.pkl', 'wb') as f:
    pickle.dump(fit_models['rf'], f)

#'squats.pkl' contains the serialized (pickled) 
# version of the trained Random Forest classifier (fit_models['rf'])

In [23]:
def calculate_angle(a,b,c):
    a = np.array(a) # First
    b = np.array(b) # Mid
    c = np.array(c) # End
    
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians*180.0/np.pi)
    
    if angle >180.0:
        angle = 360-angle
        
    return angle 

In [33]:
#video feed
cap = cv2.VideoCapture('squats.mp4') #this number is the number of my web cam

height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
fps = cap.get(cv2.CAP_PROP_FPS)
#videoWriter = cv2.VideoWriter('press2.avi', cv2.VideoWriter_fourcc(*'XVID'), fps, (int(width), int(height)))
current_stage = ''
counter = 0
stage = ''
feedback = 'he'

#setup mediapipe instance
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read() 
        # detect stuff and render
        # print(landmarks)
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) 
        image.flags.writeable = False #save a bunch of memory

        #make detections from the pose
        results = pose.process(image) #by processing it, we get our detections back
    
        #back to BGR
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

        try: 
            row = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]). flatten()
            X = pd.DataFrame([row], columns=landmarks[1:])
            body_language_class = model.predict(X)[0]
            body_language_prob = model.predict_proba(X)[0]
            print(body_language_class, body_language_prob)

            #up is class 1; down is class 2

            if body_language_class == 1 and body_language_prob[body_language_prob.argmax()] >= .7:
                current_stage = 'down'
            elif current_stage == 'down' and body_language_class == 2 and body_language_prob[body_language_prob.argmax()] >= .7:
                current_stage = 'up'
                counter += 1

            if body_language_class == 1:
                stage = "up"
            elif body_language_class == 2:
                stage = "down"

            # Drawing rectangles and text on the image
            cv2.rectangle(image, (0,0), (400,60), (245,117,16), -1)
            
            # Drawing predicted class
            cv2.putText(image, "CLASS", (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
            cv2.putText(image, str(stage), (90,40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2, cv2.LINE_AA)
            
            # Drawing predicted probability
            cv2.putText(image, "PROB", (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
            cv2.putText(image, str(round(body_language_prob[np.argmax(body_language_prob)],2)), (10,40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2, cv2.LINE_AA)
            
            # Drawing counter
            cv2.putText(image, "COUNTER", (180,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 1, cv2.LINE_AA)
            cv2.putText(image, str(counter), (175,40), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255,255,255), 2, cv2.LINE_AA)

        except Exception as e:
            print(e)
            break

    
        cv2.imshow('Mediapipe Feed', image)        
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
videoWriter.release()
cv2.destroyAllWindows()