In [None]:
pip install mediapipe opencv-python

## 1. Import all necessary libraries.

In [1]:
import mediapipe as mp
import numpy as np
import cv2
import csv
import os
import matplotlib.pyplot as plt

## 2. Initialize mediapipe.

In [2]:
# Short aliases for the two MediaPipe solution modules used by the cells below.
mp_pose = mp.solutions.pose  # pose solution: provides Pose and POSE_CONNECTIONS
mp_drawing = mp.solutions.drawing_utils  # drawing helpers for overlaying landmarks on frames

## 3. Check that MediaPipe is working properly.

In [3]:
# Smoke test: run the default webcam through MediaPipe Pose and show the annotated
# frames until 'q' is pressed (or the camera stops delivering frames).
cap = cv2.VideoCapture(0)

try:
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # No frame was delivered; cvtColor would raise on a None frame.
                break

            # Recolour feed: OpenCV delivers BGR, the pose model is fed RGB.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame.flags.writeable = False

            # Make detections
            results = pose.process(frame)

            # Recolour back to BGR for rendering with OpenCV.
            frame.flags.writeable = True
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

            mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4),
                                      mp_drawing.DrawingSpec(color=(117, 66, 117), thickness=2, circle_radius=2))

            cv2.imshow('Initial Raw webcam feed', frame)

            if cv2.waitKey(10) & 0xFF == ord("q"):
                break
finally:
    # Always free the camera and close the window, even if a frame failed mid-loop.
    cap.release()
    cv2.destroyAllWindows()



## 4. Capture Video for ML model training.

In [None]:
# Record the default webcam to 'video.avi' until 'q' is pressed.
cap = cv2.VideoCapture(0)

height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    # Some camera backends report 0 (or NaN) FPS; fall back to a sane default
    # so the writer does not produce an unplayable file.
    fps = 30.0
videoWriter = cv2.VideoWriter('video.avi', cv2.VideoWriter_fourcc('P', 'I', 'M', '1'), fps, (int(width), int(height)))

frames_written = 0
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # Camera stopped delivering frames.
            break

        try:
            cv2.imshow('Saving Video', frame)
            videoWriter.write(frame)
            frames_written += 1
        except Exception as e:
            # Keep the original best-effort behaviour (stop recording), but say why.
            print("Stopping capture:", repr(e))
            break

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Releasing the writer is what finalises the AVI container on disk.
    videoWriter.release()
    cap.release()
    cv2.destroyAllWindows()

if frames_written:
    print("Video saved")
else:
    print("No frames were captured; nothing was saved")

## 5. Capture landmarks from video and export to csv.

In [4]:
# CSV header: the label column followed by x/y/z/visibility for landmarks 1..33.
landmarks = ['class'] + [
    f'{axis}{idx}'
    for idx in range(1, 34)
    for axis in ('x', 'y', 'z', 'v')
]

In [5]:
# Preview the feature column names (every header except the leading 'class' label).
landmarks[1:]

['x1',
 'y1',
 'z1',
 'v1',
 'x2',
 'y2',
 'z2',
 'v2',
 'x3',
 'y3',
 'z3',
 'v3',
 'x4',
 'y4',
 'z4',
 'v4',
 'x5',
 'y5',
 'z5',
 'v5',
 'x6',
 'y6',
 'z6',
 'v6',
 'x7',
 'y7',
 'z7',
 'v7',
 'x8',
 'y8',
 'z8',
 'v8',
 'x9',
 'y9',
 'z9',
 'v9',
 'x10',
 'y10',
 'z10',
 'v10',
 'x11',
 'y11',
 'z11',
 'v11',
 'x12',
 'y12',
 'z12',
 'v12',
 'x13',
 'y13',
 'z13',
 'v13',
 'x14',
 'y14',
 'z14',
 'v14',
 'x15',
 'y15',
 'z15',
 'v15',
 'x16',
 'y16',
 'z16',
 'v16',
 'x17',
 'y17',
 'z17',
 'v17',
 'x18',
 'y18',
 'z18',
 'v18',
 'x19',
 'y19',
 'z19',
 'v19',
 'x20',
 'y20',
 'z20',
 'v20',
 'x21',
 'y21',
 'z21',
 'v21',
 'x22',
 'y22',
 'z22',
 'v22',
 'x23',
 'y23',
 'z23',
 'v23',
 'x24',
 'y24',
 'z24',
 'v24',
 'x25',
 'y25',
 'z25',
 'v25',
 'x26',
 'y26',
 'z26',
 'v26',
 'x27',
 'y27',
 'z27',
 'v27',
 'x28',
 'y28',
 'z28',
 'v28',
 'x29',
 'y29',
 'z29',
 'v29',
 'x30',
 'y30',
 'z30',
 'v30',
 'x31',
 'y31',
 'z31',
 'v31',
 'x32',
 'y32',
 'z32',
 'v32',
 'x33',
 'y3

In [None]:
# Start a fresh landmarks.csv that contains only the header row.
# mode='w' truncates any existing file of the same name.
with open('landmarks.csv', mode='w', newline='') as header_file:
    # csv.writer defaults already are ',' delimiter, '"' quotechar and QUOTE_MINIMAL.
    csv.writer(header_file).writerow(landmarks)

In [6]:
def export_landmarks(results, action):
    """Append one labelled pose sample to ``landmarks.csv``.

    Parameters
    ----------
    results
        Object exposing ``pose_landmarks.landmark``, an iterable whose items
        have ``x``, ``y``, ``z`` and ``visibility`` attributes (as produced by
        ``pose.process(...)`` in this notebook).
    action
        Class label written as the first column of the row.

    Returns
    -------
    bool
        ``True`` if a row was appended; ``False`` if there was no pose to
        export or the file could not be written.
    """
    pose_landmarks = getattr(results, 'pose_landmarks', None)
    if pose_landmarks is None:
        # No pose available for this frame: nothing to record.
        return False

    keypoints = np.array(
        [[res.x, res.y, res.z, res.visibility] for res in pose_landmarks.landmark]
    ).flatten().tolist()
    keypoints.insert(0, action)

    try:
        with open('landmarks.csv', mode='a', newline='') as f:
            csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
    except OSError as err:
        # Stay best-effort (do not abort the capture loop), but do not hide the failure.
        print("Could not write to landmarks.csv:", repr(err))
        return False

    return True

In [7]:
# Inspect the landmarks from the most recent `pose.process(...)` call.
# `results` must already exist from an earlier detection cell, so run one of those first.
results.pose_landmarks

landmark {
  x: 0.582444966
  y: 0.566214144
  z: -0.84398973
  visibility: 0.999964833
}
landmark {
  x: 0.603772283
  y: 0.496982753
  z: -0.794165671
  visibility: 0.99993
}
landmark {
  x: 0.621761
  y: 0.495295525
  z: -0.794160843
  visibility: 0.999943733
}
landmark {
  x: 0.638644457
  y: 0.494484633
  z: -0.794403195
  visibility: 0.999923
}
landmark {
  x: 0.546986818
  y: 0.504129469
  z: -0.79225111
  visibility: 0.999930859
}
landmark {
  x: 0.528844
  y: 0.507408917
  z: -0.791513383
  visibility: 0.999942601
}
landmark {
  x: 0.513014913
  y: 0.511374593
  z: -0.791813
  visibility: 0.999931037
}
landmark {
  x: 0.661555886
  y: 0.527239919
  z: -0.453322
  visibility: 0.999927521
}
landmark {
  x: 0.494069397
  y: 0.550649762
  z: -0.421676815
  visibility: 0.999957
}
landmark {
  x: 0.622580886
  y: 0.630941272
  z: -0.718244672
  visibility: 0.999945223
}
landmark {
  x: 0.548650861
  y: 0.634876966
  z: -0.708034098
  visibility: 0.999958336
}
landmark {
  x: 0.84039

## 6. Load the saved video and build the CSV file from it.

In [None]:
# Play back the recorded video, overlay the detected pose, and let the user label
# frames from the keyboard: r = "right", l = "left", m = "mid", q = quit.
# NOTE(review): the capture cell in section 4 writes 'video.avi' while this cell
# reads 'ml.avi' — rename the recording or change the path so the two agree.
cap = cv2.VideoCapture('ml.avi')

# Key code -> class label appended to landmarks.csv for the frame being shown.
label_keys = {ord('r'): "right", ord('l'): "left", ord('m'): "mid"}

try:
    if not cap.isOpened():
        print("Could not open 'ml.avi'; no landmarks were exported.")

    # Initiate the pose model
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.2) as pose:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of the video (or a read error): stop instead of passing None to cvtColor.
                break

            # Recolour feed: OpenCV delivers BGR, the pose model is fed RGB.
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame.flags.writeable = False

            # Make detections
            results = pose.process(frame)

            # Recolour frame back to BGR for rendering
            frame.flags.writeable = True
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

            mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4),
                                      mp_drawing.DrawingSpec(color=(117, 66, 117), thickness=2, circle_radius=2))

            cv2.imshow('Raw webcam feed', frame)

            # Poll the keyboard once per frame so a single key press cannot be
            # consumed by one check and missed by the other.
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            if key in label_keys:
                export_landmarks(results, label_keys[key])
finally:
    cap.release()
    cv2.destroyAllWindows()

## 7. Train Custom Model (wide narrow neutral) using scikit-learn

In [7]:
import pandas as pd
import cv2
import mediapipe as mp
from sklearn.model_selection import train_test_split

from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

In [8]:
# Load the exported landmark samples.
df = pd.read_csv("landmarks.csv") # default file name; change it here if you changed it in video_capture.ipynb

*7.1 Split data into x and y.*

In [9]:
# Separate the label column from the landmark coordinates.
y = df['class']  # target variable
x = df.drop(columns='class')  # features: every remaining column

In [10]:
# Hold out 20% of the samples for evaluation; the fixed seed keeps the split repeatable.
x_train, x_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.2,
    random_state=42,
)

In [11]:
# Candidate models, keyed by a short algorithm name. Each pipeline standardises
# the features before the classifier. The classifiers are seeded (same seed as
# the train/test split) so a Restart & Run All reproduces the same fitted models.
pipelines = {
    'rf': make_pipeline(StandardScaler(), RandomForestClassifier(random_state=42)),
    'gb': make_pipeline(StandardScaler(), GradientBoostingClassifier(random_state=42)),
}

In [12]:
# Fit every candidate pipeline on the training split, keeping the same keys.
fit_models = {
    name: candidate.fit(x_train, y_train)
    for name, candidate in pipelines.items()
}

In [13]:
# Show the fitted pipelines, keyed by algorithm name.
fit_models

{'rf': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('randomforestclassifier', RandomForestClassifier())]),
 'gb': Pipeline(steps=[('standardscaler', StandardScaler()),
                 ('gradientboostingclassifier', GradientBoostingClassifier())])}

In [14]:
# Sanity check: predict the held-out test rows with the gradient-boosting pipeline.
fit_models['gb'].predict(x_test)

array(['right', 'right', 'right', 'left', 'right', 'left', 'left',
       'right', 'left', 'left', 'right', 'right', 'left', 'right',
       'right', 'left', 'right', 'left', 'right', 'right', 'right',
       'left', 'right', 'left', 'left', 'left', 'left', 'left', 'left',
       'right', 'left', 'right', 'left', 'left', 'left', 'left', 'left',
       'left', 'left', 'right', 'right', 'left', 'right', 'left', 'right',
       'left', 'right', 'left', 'left', 'right', 'right', 'left', 'right',
       'right', 'right'], dtype=object)

## 8. Evaluate and serialize the model

In [15]:
from sklearn.metrics import accuracy_score, precision_score, recall_score
import pickle

In [16]:
# Report accuracy, weighted precision and weighted recall for each fitted pipeline.
for algo, model in fit_models.items():
    y_true = y_test.values
    yhat = model.predict(x_test)

    accuracy = accuracy_score(y_true, yhat)
    precision = precision_score(y_true, yhat, average='weighted')
    recall = recall_score(y_true, yhat, average="weighted")

    print(algo, accuracy, precision, recall)

rf 1.0 1.0 1.0
gb 1.0 1.0 1.0


In [None]:
# Persist the random-forest pipeline so it can be reloaded without retraining.
selected_pipeline = fit_models['rf']
with open("wide_narrow_neutral.pkl", "wb") as model_file:
    pickle.dump(selected_pipeline, model_file)

*8.1 Make detections with model*

In [17]:
# Load the model.
with open("wide_narrow_neutral.pkl", "rb") as f:
    model = pickle.load(f)

In [None]:
# Live demo: classify the pose in each webcam frame with the loaded model and
# overlay the predicted class and its probability. Press 'q' to quit.
cap = cv2.VideoCapture(0)
counter = 0
current_stage = ''

# Latest prediction. Both stay None until a pose has been classified, so the
# overlay code never touches undefined names when the first frames have no pose.
body_language_class = None
body_language_prob = None

try:
    # Initiate the pose model
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # Camera stopped delivering frames; cvtColor would raise on None.
                break

            # Recolour feed: OpenCV delivers BGR, the pose model is fed RGB.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Make detections
            results = pose.process(image)

            # Recolour image back to BGR for rendering
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                      mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4),
                                      mp_drawing.DrawingSpec(color=(117, 66, 117), thickness=2, circle_radius=2))

            if results.pose_landmarks is None:
                # No pose in this frame: clear the prediction instead of showing a stale one.
                body_language_class = None
                body_language_prob = None
            else:
                try:
                    row = np.array([[res.x, res.y, res.z, res.visibility]
                                    for res in results.pose_landmarks.landmark]).flatten().tolist()
                    # Same column names as the training CSV (header minus 'class').
                    X = pd.DataFrame([row], columns=landmarks[1:])
                    body_language_class = model.predict(X)[0]
                    body_language_prob = model.predict_proba(X)[0]
                    print(body_language_class, body_language_prob)

                    confident = body_language_prob.max() >= 0.9

                    # NOTE(review): these branches test for 'down'/'up', but the labelling
                    # cell in section 6 exports 'right'/'left'/'mid'. With a model trained
                    # on that CSV the branches never fire and `counter` stays 0 — confirm
                    # the intended labels.
                    if body_language_class == 'down' and confident:
                        current_stage = 'down'
                    elif current_stage == 'down' and body_language_class == 'up' and confident:
                        current_stage = 'up'
                        counter += 1
                        print(current_stage)

                except Exception as e:
                    # Keep the feed alive on a prediction failure, but report what went wrong.
                    print("Executing except condition:", repr(e))
                    body_language_class = None
                    body_language_prob = None

            # Get status box
            cv2.rectangle(image, (0, 0), (250, 60), (245, 117, 16), -1)

            # Static captions
            cv2.putText(image, 'CLASS',
                        (95, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            cv2.putText(image, 'PROB',
                        (15, 12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 2, cv2.LINE_AA)

            # Values are drawn only when there is a prediction for this frame.
            if body_language_class is not None and body_language_prob is not None:
                cv2.putText(image, body_language_class.split(' ')[0],
                            (90, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
                cv2.putText(image, str(round(body_language_prob[np.argmax(body_language_prob)], 2)),
                            (10, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

            cv2.imshow("detection frame", image)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, even after an unexpected error.
    cap.release()
    cv2.destroyAllWindows()