In [18]:
import mediapipe as mp
import cv2
import pandas as pd
import pickle
import numpy as np
import csv

from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.metrics import precision_score, accuracy_score, f1_score, recall_score, confusion_matrix
from sklearn.preprocessing import StandardScaler
from sklearn.calibration import CalibratedClassifierCV

import warnings
warnings.filterwarnings('ignore')

# Drawing helpers
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

### 1. Set up important landmarks and functions

#### Generate Data Frame

According to my research *the correct form* for a squat is analyzed through the position of:
- Back
- Hip
- Legs

Therefore, there will be *9 keypoints* which will be extract from mediapipe in order to train or detect a correct form of a squat:
- "NOSE",
- "LEFT_SHOULDER",
- "RIGHT_SHOULDER",
- "LEFT_HIP",
- "RIGHT_HIP",
- "LEFT_KNEE",
- "RIGHT_KNEE",
- "LEFT_ANKLE",
- "RIGHT_ANKLE"

The data frame will be saved in a .csv file.

A data frame will contains a "Label" columns which represent the label of a data point.

There are another 9 x 4 columns represent 9 features of a human pose that are important for a squat.
In that each landmark's info will be flatten

According to the [Mediapipe documentation](https://google.github.io/mediapipe/solutions/pose#python-solution-api),
Each landmark consists of the following:
- x and y: Landmark coordinates normalized to [0.0, 1.0] by the image width and height respectively.
- z: Represents the landmark depth with the depth at the midpoint of hips being the origin, and the smaller the value the closer the landmark is to the camera. The magnitude of z uses roughly the same scale as x.
- visibility: A value in [0.0, 1.0] indicating the likelihood of the landmark being visible (present and not occluded) in the image.

In [2]:
# Determine important landmarks for squat
IMPORTANT_LMS = [
    "NOSE",
    "LEFT_SHOULDER",
    "RIGHT_SHOULDER",
    "LEFT_HIP",
    "RIGHT_HIP",
    "LEFT_KNEE",
    "RIGHT_KNEE",
    "LEFT_ANKLE",
    "RIGHT_ANKLE"
]

# Generate all columns of the data frame

landmarks = ["label"] # Label column

for lm in IMPORTANT_LMS:
    landmarks += [f"{lm.lower()}_x", f"{lm.lower()}_y", f"{lm.lower()}_z", f"{lm.lower()}_v"]

In [4]:
def rescale_frame(frame, percent=50):
    '''
    Rescale a frame to a certain percentage compare to its original frame
    '''
    width = int(frame.shape[1] * percent/ 100)
    height = int(frame.shape[0] * percent/ 100)
    dim = (width, height)
    return cv2.resize(frame, dim, interpolation =cv2.INTER_AREA)
    

def init_csv(dataset_path: str):
    '''
    Create a blank csv file with just columns
    '''

    # Write all the columns to a file
    with open(dataset_path, mode="w", newline="") as f:
        csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerow(landmarks)

        
def export_landmark_to_csv(dataset_path: str, results, action: str) -> None:
    '''
    Export Labeled Data from detected landmark to csv
    '''
    landmarks = results.pose_landmarks.landmark
    keypoints = []

    try:
        # Extract coordinate of important landmarks
        for lm in IMPORTANT_LMS:
            keypoint = landmarks[mp_pose.PoseLandmark[lm].value]
            keypoints.append([keypoint.x, keypoint.y, keypoint.z, keypoint.visibility])
        
        keypoints = list(np.array(keypoints).flatten())

        # Insert action as the label (first column)
        keypoints.insert(0, action)

        # Append new row to .csv file
        with open(dataset_path, mode="a", newline="") as f:
            csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
        

    except Exception as e:
        print(e)
        pass


def concat_csv_files_with_same_headers(file_paths: list, saved_path: str):
    '''
    Concat different csv files
    '''
    all_df = []
    for path in file_paths:
        df = pd.read_csv(path, index_col=None, header=0)
        all_df.append(df)
    
    results = pd.concat(all_df, axis=0, ignore_index=True)
    results.to_csv(saved_path, sep=',', encoding='utf-8', index=False)


def describe_dataset(dataset_path: str):
    '''
    Describe dataset
    '''

    data = pd.read_csv(dataset_path)
    print(f"Headers: {list(data.columns.values)}")
    print(f'Number of rows: {data.shape[0]} \nNumber of columns: {data.shape[1]}\n')
    print(f"Labels: \n{data['label'].value_counts()}\n")
    print(f"Missing values: {data.isnull().values.any()}\n")
    duplicate = data[data.duplicated()]
    print(f"Duplicate Rows : {duplicate.sum(axis=1)}")

    return data


def round_up_metric_results(results) -> list:
    '''Round up metrics results such as precision score, recall score, ...'''
    return list(map(lambda el: round(el, 3), results))

### 2. Extract data for train set

In [17]:
DATASET_PATH = "./train.csv"

cap = cv2.VideoCapture("../data/squat/squat_right_3.mp4")
up_save_count = 0
down_save_count = 0

# init_csv(DATASET_PATH)

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, image = cap.read()

        # Reduce size of a frame
        image = rescale_frame(image, 80)

        if not ret:
            break

        # Recolor image from BGR to RGB for mediapipe
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        results = pose.process(image)

        # Recolor image from BGR to RGB for mediapipe
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Draw landmarks and connections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS, mp_drawing.DrawingSpec(color=(244, 117, 66), thickness=2, circle_radius=4), mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2))

        # Display the saved count
        cv2.putText(image, f"UP saved: {up_save_count}", (50, 150), cv2.FONT_HERSHEY_COMPLEX, 2, (255, 255, 255), 1, cv2.LINE_AA)
        cv2.putText(image, f"DOWN saved: {down_save_count}", (50, 200), cv2.FONT_HERSHEY_COMPLEX, 2, (255, 255, 255), 1, cv2.LINE_AA)

        cv2.imshow("CV2", image)
        # Press to extract and save to .csv
        pressed_key = cv2.waitKey(1)

        # Pressed U to label frame as UP position
        if pressed_key == 117:
            export_landmark_to_csv(DATASET_PATH, results, "up")
            up_save_count += 1

        # Pressed D to label frame as DOWN position
        if pressed_key == 100:
            export_landmark_to_csv(DATASET_PATH, results, "down")
            down_save_count += 1
        
        
        # Press Q to close cv2 window
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

    # (Optional)Fix bugs cannot close windows in MacOS (https://stackoverflow.com/questions/6116564/destroywindow-does-not-close-window-on-mac-using-python-and-opencv)
    for i in range (1, 5):
        cv2.waitKey(1)
        

name 'np' is not defined
name 'np' is not defined
name 'np' is not defined
name 'np' is not defined
name 'np' is not defined


In [16]:
df = describe_dataset(DATASET_PATH)

Headers: ['label', 'nose_x', 'nose_y', 'nose_z', 'nose_v', 'left_shoulder_x', 'left_shoulder_y', 'left_shoulder_z', 'left_shoulder_v', 'right_shoulder_x', 'right_shoulder_y', 'right_shoulder_z', 'right_shoulder_v', 'left_hip_x', 'left_hip_y', 'left_hip_z', 'left_hip_v', 'right_hip_x', 'right_hip_y', 'right_hip_z', 'right_hip_v', 'left_knee_x', 'left_knee_y', 'left_knee_z', 'left_knee_v', 'right_knee_x', 'right_knee_y', 'right_knee_z', 'right_knee_v', 'left_ankle_x', 'left_ankle_y', 'left_ankle_z', 'left_ankle_v', 'right_ankle_x', 'right_ankle_y', 'right_ankle_z', 'right_ankle_v']
Number of rows: 374 
Number of columns: 37

Labels: 
up      188
down    186
Name: label, dtype: int64

Missing values: False

Duplicate Rows : Series([], dtype: float64)


### 3. Extract data for test set 

In [14]:
TEST_DATASET_PATH = "./test.csv"

cap = cv2.VideoCapture("../data/squat/squat_test_3.mp4")
up_save_count = 0
down_save_count = 0

init_csv(TEST_DATASET_PATH)

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, image = cap.read()

        if not ret:
            break
            
        # Reduce size of a frame
        image = rescale_frame(image, 80)

        # Recolor image from BGR to RGB for mediapipe
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False

        results = pose.process(image)

        # Recolor image from BGR to RGB for mediapipe
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Draw landmarks and connections
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS, mp_drawing.DrawingSpec(color=(244, 117, 66), thickness=2, circle_radius=4), mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2))

        # Display the saved count
        cv2.putText(image, f"UP saved: {up_save_count}", (50, 150), cv2.FONT_HERSHEY_COMPLEX, 2, (255, 255, 255), 1, cv2.LINE_AA)
        cv2.putText(image, f"DOWN saved: {down_save_count}", (50, 200), cv2.FONT_HERSHEY_COMPLEX, 2, (255, 255, 255), 1, cv2.LINE_AA)

        cv2.imshow("CV2", image)
        # Press to extract and save to .csv
        pressed_key = cv2.waitKey(1)

        # Pressed U to label frame as UP position
        if pressed_key == 117:
            export_landmark_to_csv(TEST_DATASET_PATH, results, "up")
            up_save_count += 1

        # Pressed D to label frame as DOWN position
        if pressed_key == 100:
            export_landmark_to_csv(TEST_DATASET_PATH, results, "down")
            down_save_count += 1
        
        
        # Press Q to close cv2 window
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

    # (Optional)Fix bugs cannot close windows in MacOS (https://stackoverflow.com/questions/6116564/destroywindow-does-not-close-window-on-mac-using-python-and-opencv)
    for i in range (1, 5):
        cv2.waitKey(1)
        

### 4. Train custom model using Scikit Learn

#### 4.1 Read and describe data

In [5]:
# Brief describe of the dataset
df = describe_dataset("./train.csv")
df.head(5)

Headers: ['label', 'nose_x', 'nose_y', 'nose_z', 'nose_v', 'left_shoulder_x', 'left_shoulder_y', 'left_shoulder_z', 'left_shoulder_v', 'right_shoulder_x', 'right_shoulder_y', 'right_shoulder_z', 'right_shoulder_v', 'left_hip_x', 'left_hip_y', 'left_hip_z', 'left_hip_v', 'right_hip_x', 'right_hip_y', 'right_hip_z', 'right_hip_v', 'left_knee_x', 'left_knee_y', 'left_knee_z', 'left_knee_v', 'right_knee_x', 'right_knee_y', 'right_knee_z', 'right_knee_v', 'left_ankle_x', 'left_ankle_y', 'left_ankle_z', 'left_ankle_v', 'right_ankle_x', 'right_ankle_y', 'right_ankle_z', 'right_ankle_v']
Number of rows: 374 
Number of columns: 37

Labels: 
up      188
down    186
Name: label, dtype: int64

Missing values: False

Duplicate Rows : Series([], dtype: float64)


Unnamed: 0,label,nose_x,nose_y,nose_z,nose_v,left_shoulder_x,left_shoulder_y,left_shoulder_z,left_shoulder_v,right_shoulder_x,...,right_knee_z,right_knee_v,left_ankle_x,left_ankle_y,left_ankle_z,left_ankle_v,right_ankle_x,right_ankle_y,right_ankle_z,right_ankle_v
0,down,0.600248,0.433268,-0.006637,0.999949,0.650759,0.517787,0.041632,0.994121,0.571711,...,-0.133012,0.976462,0.659328,0.836607,-0.294549,0.994219,0.575143,0.81438,-0.157936,0.96627
1,down,0.60016,0.449894,-0.008615,0.999966,0.651127,0.535333,0.03624,0.994432,0.571456,...,-0.115595,0.974204,0.659568,0.831231,-0.279601,0.993431,0.57656,0.812989,-0.135235,0.959162
2,down,0.599425,0.466234,-0.047636,0.999957,0.651464,0.550511,-0.021722,0.994265,0.571824,...,-0.084128,0.949108,0.659044,0.82841,-0.202085,0.987354,0.575949,0.813362,-0.053549,0.928765
3,down,0.597382,0.485779,-0.057994,0.999926,0.651386,0.571049,-0.023268,0.994848,0.572379,...,-0.069531,0.921725,0.658455,0.832319,-0.168358,0.982599,0.576511,0.813286,-0.058735,0.904242
4,down,0.594474,0.438586,-0.041261,0.999931,0.650657,0.518908,-0.003251,0.993209,0.571241,...,-0.119639,0.969358,0.658565,0.836644,-0.274596,0.994046,0.576144,0.815093,-0.11795,0.96577


In [7]:
# Extract features and class
X = df.drop("label", axis=1) # features
y = df["label"]

In [8]:
# Split train set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1234)
y_test.head(3)

286    down
71       up
66       up
Name: label, dtype: object

#### 4.2 Train Machine learning model

In [9]:
algorithms =[("LR", LogisticRegression()),
         ("SVC", SVC(probability=True)),
         ('KNN',KNeighborsClassifier()),
         ("DTC", DecisionTreeClassifier()),
         ("SGDC", CalibratedClassifierCV(SGDClassifier())),
         ("NB", GaussianNB()),
         ('RF', RandomForestClassifier()),]

models = {}
final_results = []

for name, model in algorithms:
    trained_model = model.fit(X_train, y_train)
    models[name] = trained_model

    # Evaluate model
    model_results = model.predict(X_test)

    p_score = precision_score(y_test, model_results, average=None, labels=["down", "up"])
    a_score = accuracy_score(y_test, model_results)
    r_score = recall_score(y_test, model_results, average=None, labels=["down", "up"])
    f1_score_result = f1_score(y_test, model_results, average=None, labels=["down", "up"])
    cm = confusion_matrix(y_test, model_results, labels=["down", "up"])
    final_results.append(( name,  round_up_metric_results(p_score), a_score, round_up_metric_results(r_score), round_up_metric_results(f1_score_result), cm))

# Sort results by F1 score
final_results.sort(key=lambda k: sum(k[4]), reverse=True)
pd.DataFrame(final_results, columns=["Model", "Precision Score", "Accuracy score", "Recall Score", "F1 score", "Confusion Matrix"])

Unnamed: 0,Model,Precision Score,Accuracy score,Recall Score,F1 score,Confusion Matrix
0,LR,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[59, 0], [0, 54]]"
1,SVC,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[59, 0], [0, 54]]"
2,DTC,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[59, 0], [0, 54]]"
3,SGDC,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[59, 0], [0, 54]]"
4,NB,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[59, 0], [0, 54]]"
5,RF,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[59, 0], [0, 54]]"
6,KNN,"[0.983, 1.0]",0.99115,"[1.0, 0.981]","[0.992, 0.991]","[[59, 0], [1, 53]]"


#### 4.3 Test set evaluation

In [11]:
test_df = describe_dataset("./test.csv")

test_x = test_df.drop("label", axis=1)
test_y = test_df["label"]

Headers: ['label', 'nose_x', 'nose_y', 'nose_z', 'nose_v', 'left_shoulder_x', 'left_shoulder_y', 'left_shoulder_z', 'left_shoulder_v', 'right_shoulder_x', 'right_shoulder_y', 'right_shoulder_z', 'right_shoulder_v', 'left_hip_x', 'left_hip_y', 'left_hip_z', 'left_hip_v', 'right_hip_x', 'right_hip_y', 'right_hip_z', 'right_hip_v', 'left_knee_x', 'left_knee_y', 'left_knee_z', 'left_knee_v', 'right_knee_x', 'right_knee_y', 'right_knee_z', 'right_knee_v', 'left_ankle_x', 'left_ankle_y', 'left_ankle_z', 'left_ankle_v', 'right_ankle_x', 'right_ankle_y', 'right_ankle_z', 'right_ankle_v']
Number of rows: 86 
Number of columns: 37

Labels: 
down    49
up      37
Name: label, dtype: int64

Missing values: False

Duplicate Rows : Series([], dtype: float64)


In [13]:
testset_final_results = []

for name, model in models.items():
    # Evaluate model
    model_results = model.predict(test_x)

    p_score = precision_score(test_y, model_results, average=None, labels=["down", "up"])
    a_score = accuracy_score(test_y, model_results)
    r_score = recall_score(test_y, model_results, average=None, labels=["down", "up"])
    f1_score_result = f1_score(test_y, model_results, average=None, labels=["down", "up"])
    cm = confusion_matrix(test_y, model_results, labels=["down", "up"])
    testset_final_results.append(( name,  round_up_metric_results(p_score), a_score, round_up_metric_results(r_score), round_up_metric_results(f1_score_result), cm ))


testset_final_results.sort(key=lambda k: sum(k[4]), reverse=True)
pd.DataFrame(testset_final_results, columns=["Model", "Precision Score", "Accuracy score", "Recall Score", "F1 score", "Confusion Matrix"])

Unnamed: 0,Model,Precision Score,Accuracy score,Recall Score,F1 score,Confusion Matrix
0,LR,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[49, 0], [0, 37]]"
1,SVC,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[49, 0], [0, 37]]"
2,KNN,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[49, 0], [0, 37]]"
3,SGDC,"[1.0, 1.0]",1.0,"[1.0, 1.0]","[1.0, 1.0]","[[49, 0], [0, 37]]"
4,RF,"[1.0, 0.86]",0.930233,"[0.878, 1.0]","[0.935, 0.925]","[[43, 6], [0, 37]]"
5,DTC,"[1.0, 0.841]",0.918605,"[0.857, 1.0]","[0.923, 0.914]","[[42, 7], [0, 37]]"
6,NB,"[0.0, 0.43]",0.430233,"[0.0, 1.0]","[0.0, 0.602]","[[0, 49], [0, 37]]"


#### 4.3. Dumped model using pickle

According to the evaluations, there are multiple good models at the moment, therefore, I will pick the Random Forrest model to use.

In [20]:
# Dump the best model to a pickle file
import pickle

with open("./model/LR_model.pkl", "wb") as f:
    pickle.dump(models["LR"], f)

In [21]:
with open("./model/KNN_model.pkl", "wb") as f:
    pickle.dump(models["KNN"], f)