In [None]:
#Install the nessasary packages from pip
!pip install mediapipe opencv-python pandas scikit-learn --user

In [None]:
#Import all the nessacary libraries from the packages

#Import mediapipe to be use as the model
import mediapipe as mp
#Import opencv for rendaring and drawing capabilities
import cv2

import numpy as np #Handle numpy arrays
import pandas as pd #Handle tabular data
import csv #Handle csv files
import os #Handle folder structure
import glob
import pickle #Save and oad ML model

from sklearn.model_selection import train_test_split #Partition the data into training and testing

from sklearn.pipeline import make_pipeline #Creates a pipeline
from sklearn.preprocessing import StandardScaler #Standadize data 

#Classification algorithms
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

from sklearn.metrics import accuracy_score #Evaluate model through accuracy

In [None]:
#Helper to draw the landmarks and provide the landmark detection models
draw_helpers = mp.solutions.drawing_utils 
holistic_model = mp.solutions.holistic
# mp_pose = mp.solutions.pose

In [None]:
#Number of landmarks considered.
num_landmarks = 12

In [None]:
#Save the landmarks to a table to be exported as a csv file

#0th column of the table
table_columns = ['class']
#Add columns to the table according to the no.of landmarks
for num in range(1, num_landmarks + 1):
    table_columns += ['x{}'.format(num), 'y{}'.format(num), 'z{}'.format(num), 'v{}'.format(num)]
    
#Display columns of the table
table_columns

# Creating the csv file for store extracted data

In [None]:
csv_file_pth = 'data.csv'

#Write to the csv file
with open(csv_file_pth, mode='w', newline='') as f:
    #Define the csv writer
    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(table_columns)
    print("File Created")

# Function to extract data from video and store inside the CSV

In [None]:

def video_processor(class_name, video_pth):
    #Connect the sample video from the device
    sample_video = cv2.VideoCapture(video_pth)
    processed = False


    #Load the holistic model
    with holistic_model.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        #Loop through each frame of the video 
        while sample_video.isOpened():
            #Returns the status of the read and the frame as an image
            status, frame = sample_video.read()

            #If frame is read correctly, status is true
            if status == False:
                break

            #Recolor the captured frame from BGR to RGB (Medipipe requies frames to be in RGB format)
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            #Prevent writing and copying frame data to improve performance while making the detection
            rgb_frame.flags.writeable = False        

            #Use holistic model to make detections
            result_frame = holistic.process(rgb_frame)

            #Set frame back to writable format after detection
            rgb_frame.flags.writeable = True   

            #Recolor the captured frame from BGR for rendering with opencv
            bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)

            #Use pose model to detect only the landmarks of the the body and not the landmarks of the face and hand
            draw_helpers.draw_landmarks(bgr_frame, result_frame.pose_landmarks, holistic_model.POSE_CONNECTIONS, 
                                 draw_helpers.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                 draw_helpers.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2))
         

            #Export the coordinates of the landmarks to the csv file
            try:
                pose_landmarks_array = result_frame.pose_landmarks.landmark
                # Filter out only the upper body landmarks
                upper_body_landmarks = [pose_landmarks_array[i] for i in [11, 12, 13, 14, 15, 16, 23, 24, 25, 26, 27, 28]]
                # Format the upper body landmarks into a numpy array for better structuring and collapse the array to 1 dimension
                pose_landmarks_nparray = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in upper_body_landmarks]).flatten() 
                              if result_frame.pose_landmarks else np.zeros(12*4))


                #Append class name as the Oth element
                pose_landmarks_nparray.insert(0, class_name)

                #Append the data to table in the csv file
                with open(csv_file_pth, mode='a', newline='') as f:
                    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
                    csv_writer.writerow(pose_landmarks_nparray) 
                
                processed = True
            except:
                pass
            
            #Display the frames    
            cv2.imshow('Results Feed', bgr_frame)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
    
    
    sample_video.release()
    cv2.destroyAllWindows()
    
    #If the try block is processed without any error
    if processed:
        print("Processed")

In [None]:
#Specify the location to the datasets
valid_path = "./datasets/valid/*.mp4"
invalid_path = "./datasets/invalid/*.mp4"

#Displat the no of files in the datasets
print("Valid Video Count: ", len(glob.glob(valid_path)))
print("Invalid Video Count: ", len(glob.glob(invalid_path)))

In [None]:
#Adding landmarks of the invalid dataset to the csv
class_name = "correct"
dir_size = len(glob.glob(valid_path))
for i in range (1, dir_size + 1):
    video_pth = "./datasets/valid/" + str(i) + ".mp4"
    print("Video: ", str(i), "/", str(dir_size))
    video_processor(class_name, video_pth)

In [None]:
#Adding landmarks of the invalid dataset to the csv
class_name = "Incorrect"
dir_size = len(glob.glob(invalid_path))
for i in range (1, dir_size + 1):
    video_pth = "./datasets/invalid/" + str(i) + ".mp4"
    print("Video: ", str(i), "/", str(dir_size))
    video_processor(class_name, video_pth)

# Customize data read from the csv file

In [None]:
#Import dataframe
df = pd.read_csv(csv_file_pth)

In [None]:
#Display the first 5 rows in the dataframe
df.head()

In [None]:
#Display the last 5 rows in the dataframe
df.tail()

In [None]:
#Remove the class column so the dataframe only contains features
X = df.drop('class', axis=1)
#Use the class as the target value
Y = df['class'] 

In [None]:
#Split the data with 30% for testing
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.3, random_state=1234)

# Make predictions and select the best classifier

In [None]:
#Setup the machine learning model pipelines
pipelines = {
    'lr':make_pipeline(StandardScaler(), LogisticRegression(max_iter=20000)),
    'rc':make_pipeline(StandardScaler(), RidgeClassifier()),
    'rf':make_pipeline(StandardScaler(), RandomForestClassifier()),
    'gb':make_pipeline(StandardScaler(), GradientBoostingClassifier()),
}

In [None]:
#Dictionary to store the label of the model and model after training
train_models = {}
for label, pipeline in pipelines.items():
    model = pipeline.fit(X_train.values, Y_train.values)
    train_models[label] = model

In [None]:
train_models

In [None]:
#Test the accuracies of the model to choose the best classifier
for label, model in train_models.items():
    output_class = model.predict(X_test.values)
    print(label, accuracy_score(Y_test.values, output_class))

In [None]:
#Select and dump the classifier into a pickle file
model = train_models['rf']
model

In [None]:
#Save the model as a binary file
with open('shoulder_press.pkl', 'wb') as f:
    pickle.dump(model, f)

In [None]:
#Import the model from the binary file
with open('shoulder_press.pkl', 'rb') as f:
    model = pickle.load(f)

In [None]:
model

In [None]:
#PREDICT AND DISPLAY THE RESULTS OF THE MODEL BY PASSING THE TEST VIDEO

#Connect the test video from the device
sample_video = cv2.VideoCapture('datasets/IMG_0126.MOV')

#Load the holistic model
with holistic_model.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    #Loop through each frame of the video 
    while sample_video.isOpened():
        #Returns the status of the read and the frame as an image
        status, frame = sample_video.read()
        
        #If frame is read correctly, status is true
        if status == False:
            print("Done")
            break
          
        #Recolor the captured frame from BGR to RGB (Medipipe requies frames to be in RGB format)
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        #Prevent writing and copying frame data to improve performance while making the detection
        rgb_frame.flags.writeable = False        
        
        #Use holistic model to make detections
        result_frame = holistic.process(rgb_frame)
        
        #Set frame back to writable format after detection
        rgb_frame.flags.writeable = True   
        
        #Recolor the captured frame from BGR for rendering with opencv
        bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)

        #Use pose model to detect only the landmarks of the the body and not the landmarks of the face and hand
        draw_helpers.draw_landmarks(bgr_frame, result_frame.pose_landmarks, holistic_model.POSE_CONNECTIONS, 
                                 draw_helpers.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                                 draw_helpers.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                                 )
        
        #Predict the coordinates of the landmarks (resulrs screen)
        try:
            pose_landmarks_array = result_frame.pose_landmarks.landmark
            # Filter out only the upper body landmarks
            upper_body_landmarks = [pose_landmarks_array[i] for i in [11, 12, 13, 14, 15, 16, 23, 24, 25, 26, 27, 28]]
            # Format the upper body landmarks into a numpy array for better structuring and collapse the array to 1 dimension
            pose_landmarks_nparray = list(np.array([[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in upper_body_landmarks]).flatten() 
                              if result_frame.pose_landmarks else np.zeros(12*4))
            #Pass the numpy array into a data frame
            features = pd.DataFrame([pose_landmarks_nparray])
            
            #Store the top class of the prediction
            pose_class_status = model.predict(features.values)[0]
            #Store the probability of the prediction
            pose_class_status_prob = model.predict_proba(features.values)[0]
            
            print("Class:", pose_class_status)
            print(pose_class_status_prob)
            
            #Set a rectangle box to display the results of the prediction in the video frame
            #rectangle(container, top_coord, bottom_coord, color, line_thickness)
            cv2.rectangle(bgr_frame, (0,0), (250, 60), (245, 117, 16), -1)
            
            #Display the class label inside the rectangle box
            cv2.putText(bgr_frame, 'Class'
                        , (95,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            
            #Extract =and display the top class of the prediction
            cv2.putText(bgr_frame, pose_class_status.split(' ')[0]
                        , (90,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
            
            #Display the class probability inside the rectangle box
            cv2.putText(bgr_frame, 'Probability'
                        , (15,12), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
            
            #Extract and dispthe maximum probability
            cv2.putText(bgr_frame, str(round(pose_class_status_prob[np.argmax(pose_class_status_prob)],2))
                        , (10,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
 
        except:
            pass
                        
        #Display the frames    
        cv2.imshow('Results Feed', bgr_frame)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

sample_video.release()
cv2.destroyAllWindows()