# DSI #39 Capstone Project: Productivity State Developer (PSD) - Train

This notebook does the following:
1. Extract the coordinates of the landmarks of the user video.
2. Feed the coordinates into multiple classification models for both the productivity and fatigue classes.
3. Evaluate the performance of models and output the best performing one.

### Import Libraries

In [1]:
import pandas as pd
import mediapipe as mp
import cv2
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline 
from sklearn.preprocessing import StandardScaler 

from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier

from sklearn.metrics import accuracy_score

import csv #
import os
import numpy as np

import pickle

### Defining Drawing and Holistic Solutions from Mediapipe

In [2]:
# Holistic solution: a single pipeline that combines the separate pose, face
# and hand models, each tuned for its own domain.
# Read more at https://google.github.io/mediapipe/solutions/holistic.html
mp_holistic = mp.solutions.holistic

# Drawing utilities: used to render the keypoints and their connecting lines
# onto the video frames.
# https://github.com/google/mediapipe/blob/master/mediapipe/python/solutions/drawing_utils.py
mp_drawing = mp.solutions.drawing_utils

### Build the headers for the coordinate file (i.e. the x, y, z and visibility columns for all the landmarks)

In [4]:
# Build the CSV header (`landmarks`) by counting how many pose + face landmarks
# the Holistic model reports for a frame of a sample video.
cap = cv2.VideoCapture("../data/productive_1.mp4")

# Stays None until a frame with both a pose and a face detection is found.
num_coords = None

try:
    # Initiate holistic model - https://google.github.io/mediapipe/solutions/holistic.html
    # min_detection_confidence: minimum confidence ([0.0, 1.0]) from the person-detection
    #   model for the detection to be considered successful. Defaults to 0.5.
    # min_tracking_confidence: minimum confidence ([0.0, 1.0]) from the landmark-tracking
    #   model for the pose landmarks to be considered tracked successfully. Defaults to 0.5.
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        while cap.isOpened():
            ret, frame = cap.read()  # Read the next frame

            if not ret:
                break  # No more frames to read

            # MediaPipe expects RGB input, OpenCV decodes frames as BGR
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False  # Mark read-only while the model processes it

            results = holistic.process(image)

            # A frame where the pose or the face was not detected cannot be counted;
            # previously such a frame at the end of the video crashed the cell.
            if results.pose_landmarks is None or results.face_landmarks is None:
                continue

            # NOTE(review): assumes each model reports the same number of landmarks on
            # every frame, so the first complete detection is enough - confirm against
            # the MediaPipe Holistic documentation.
            num_coords = len(results.pose_landmarks.landmark) + len(results.face_landmarks.landmark)
            break
finally:
    # Always free the video handle (and any OpenCV window), even if processing failed
    cap.release()
    cv2.destroyAllWindows()

if num_coords is None:
    raise RuntimeError(
        "Could not detect both pose and face landmarks in any frame of the sample video; "
        "check that the file exists and shows a person."
    )

# Column names: the prediction class first, then x/y/z/visibility for the
# 1st landmark, the 2nd landmark, and so on.
landmarks = ['class'] + [
    '{}{}'.format(axis, val)
    for val in range(1, num_coords + 1)
    for axis in ('x', 'y', 'z', 'v')
]

### Define function to get coordinates from a pre-recorded video

In [6]:
def _draw_holistic_landmarks(image, results):
    """Draw the face, hand and pose landmarks from *results* onto *image* in place."""
    overlays = (
        # 1. Face landmarks (green channel capped at 255, the maximum for an 8-bit colour)
        (results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
         mp_drawing.DrawingSpec(color=(80, 110, 10), thickness=1, circle_radius=1),
         mp_drawing.DrawingSpec(color=(80, 255, 121), thickness=1, circle_radius=1)),
        # 2. Right hand
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         mp_drawing.DrawingSpec(color=(80, 22, 10), thickness=2, circle_radius=4),
         mp_drawing.DrawingSpec(color=(80, 44, 121), thickness=2, circle_radius=2)),
        # 3. Left hand
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         mp_drawing.DrawingSpec(color=(121, 22, 76), thickness=2, circle_radius=4),
         mp_drawing.DrawingSpec(color=(121, 44, 250), thickness=2, circle_radius=2)),
        # 4. Pose
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=4),
         mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)),
    )
    for landmark_list, connections, landmark_spec, connection_spec in overlays:
        mp_drawing.draw_landmarks(image, landmark_list, connections, landmark_spec, connection_spec)


def _landmark_values(landmark_list):
    """Flatten a landmark list into [x, y, z, visibility, x, y, z, visibility, ...]."""
    return list(np.array(
        [[landmark.x, landmark.y, landmark.z, landmark.visibility] for landmark in landmark_list.landmark]
    ).flatten())


def video_to_coords(cap, class_name, coords_target):
    """Append one CSV row of pose + face landmark coordinates per video frame.

    Each frame read from *cap* is run through the MediaPipe Holistic model, shown
    in a preview window with the landmarks drawn on it, and - when both a pose and
    a face were detected - written to *coords_target* as
    ``[class_name, pose values..., face values...]``.

    Parameters:
        cap: an opened ``cv2.VideoCapture``; it is released before returning.
        class_name: label stored in the first column of every row.
        coords_target: path of the CSV file the rows are appended to.

    Press ``q`` in the preview window to stop early.
    """
    try:
        # Open the model and the CSV once per video rather than once per frame.
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic, \
                open(coords_target, mode='a', newline='') as f:
            csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

            while cap.isOpened():
                ret, frame = cap.read()

                if not ret:
                    break  # No more frames to read

                # MediaPipe expects RGB input, OpenCV decodes frames as BGR
                image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                results = holistic.process(image)

                # Back to BGR so OpenCV can render the frame
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                _draw_holistic_landmarks(image, results)

                # Frames without a pose or face detection are skipped explicitly, so that
                # genuine failures (e.g. the CSV cannot be written) are no longer hidden.
                if results.pose_landmarks is not None and results.face_landmarks is not None:
                    row = (
                        [class_name]
                        + _landmark_values(results.pose_landmarks)
                        + _landmark_values(results.face_landmarks)
                    )
                    csv_writer.writerow(row)

                cv2.imshow('Raw Webcam Feed', image)

                # Press q key to terminate the capture early
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
    finally:
        # Release the video and close the preview window even if an error occurred
        cap.release()
        cv2.destroyAllWindows()

### Get coordinates for the productivity (prod) and fatigue (fat) models

In [7]:
# Training Productive
coords_target = "coords_prod.csv"

# Initialise CSV with the header row.
# mode='w' truncates any file left over from a previous run, so no separate
# delete step is needed (the earlier one checked "../data/<file>" but removed
# "<file>", which could raise FileNotFoundError).
with open(coords_target, mode='w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(landmarks)

# (video file, class label written to the CSV) for every training video
for video_path, class_name in (
    ("../data/productive_1.mp4", "productive_1"),
    ("../data/productive_not_1.mp4", "productive_not"),
):
    cap = cv2.VideoCapture(video_path)
    video_to_coords(cap, class_name, coords_target)

In [8]:
# Training Fatigue
coords_target = "coords_fatigue.csv"

# Initialise CSV with the header row.
# mode='w' truncates any file left over from a previous run, so no separate
# delete step is needed (the earlier one checked "../data/<file>" but removed
# "<file>", which could raise FileNotFoundError).
with open(coords_target, mode='w', newline='') as f:
    csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
    csv_writer.writerow(landmarks)

# (video file, class label written to the CSV) for every training video.
# All three fatigue videos share the single "fatigue_1" label.
for video_path, class_name in (
    ("../data/fatigue_1.mp4", "fatigue_1"),
    ("../data/fatigue_2.mp4", "fatigue_1"),
    ("../data/fatigue_3.mp4", "fatigue_1"),
    ("../data/fatigue_not_1.mp4", "fatigue_not_1"),
):
    cap = cv2.VideoCapture(video_path)
    video_to_coords(cap, class_name, coords_target)

### Model Training - Read coordinates from csv file

In [9]:
# Read the productivity coordinates csv file
df_prod = pd.read_csv('coords_prod.csv')

# Map the raw labels to display names.
# The result is assigned back to the column: calling replace(..., inplace=True)
# on df["class"] is chained in-place mutation, which pandas deprecates and which
# leaves the DataFrame unchanged when Copy-on-Write is enabled.
df_prod["class"] = df_prod["class"].replace({
    "productive_1": "Productive",
    "productive_not": "Not Productive",
})

In [10]:
# Read the fatigue coordinates csv file
df_fat = pd.read_csv('coords_fatigue.csv')

# Map the raw labels to display names.
# The result is assigned back to the column: calling replace(..., inplace=True)
# on df["class"] is chained in-place mutation, which pandas deprecates and which
# leaves the DataFrame unchanged when Copy-on-Write is enabled.
df_fat["class"] = df_fat["class"].replace({
    "fatigue_1": "Fatigue",
    "fatigue_not_1": "Not Fatigue",
})

### Model Training - Define pipeline using 4 standard classification models

In [11]:
def train_xy(x_train, y_train):
    """Fit four standard classifiers on the training data.

    Every classifier is wrapped in a pipeline that standardises the features
    first. Returns a dict mapping a short code ('lr', 'rc', 'rf', 'gb') to the
    corresponding fitted pipeline.
    """
    candidates = (
        ('lr', LogisticRegression),
        ('rc', RidgeClassifier),
        ('rf', RandomForestClassifier),
        ('gb', GradientBoostingClassifier),
    )

    # [Note] Fit on the bare arrays (.values) so that feature names are not
    # recorded by the scaler, which would otherwise trigger warnings later on.
    ## Read more at https://stackoverflow.com/questions/69326639/sklearn-warning-valid-feature-names-in-version-1-0
    features = x_train.values
    targets = y_train.values

    return {
        code: make_pipeline(StandardScaler(), estimator()).fit(features, targets)
        for code, estimator in candidates
    }

### Model Training

In [12]:
# Target (the class name) and features (every remaining column)
y = df_prod['class']
x = df_prod.drop(columns='class')

# Hold out 20% of the rows for testing; the seed is fixed for reproducibility.
# NOTE(review): rows are individual frames, so frames of the same video can end up
# in both the train and the test split - the test scores may be optimistic.
xprod_train, xprod_test, yprod_train, yprod_test = train_test_split(
    x, y, test_size=0.2, random_state=42
)

model_prod = train_xy(xprod_train, yprod_train)

In [13]:
# Target (the class name) and features (every remaining column)
y = df_fat['class']
x = df_fat.drop(columns='class')

# Hold out 20% of the rows for testing; the seed is fixed for reproducibility.
# NOTE(review): rows are individual frames, so frames of the same video can end up
# in both the train and the test split - the test scores may be optimistic.
xfat_train, xfat_test, yfat_train, yfat_test = train_test_split(
    x, y, test_size=0.2, random_state=42
)

model_fat = train_xy(xfat_train, yfat_train)

### Model Evaluation

In [15]:
# Print the held-out test accuracy of every fitted productivity pipeline
for algo, model in model_prod.items():
    yhat = model.predict(xprod_test.values)
    test_accuracy = accuracy_score(yprod_test, yhat)
    print(algo, test_accuracy)

lr 1.0
rc 1.0
rf 1.0
gb 1.0


In [16]:
# Print the held-out test accuracy of every fitted fatigue pipeline
for algo, model in model_fat.items():
    yhat = model.predict(xfat_test.values)
    test_accuracy = accuracy_score(yfat_test, yhat)
    print(algo, test_accuracy)

lr 1.0
rc 1.0
rf 1.0
gb 1.0


### Model Export

In [17]:
# Persist the chosen (random forest) pipelines as .pkl files so that the
# real-time prediction step can load a trained model instead of refitting it.
for pkl_name, fitted_models in (
    ('productive.pkl', model_prod),
    ('fatigue.pkl', model_fat),
):
    with open(pkl_name, 'wb') as f:
        pickle.dump(fitted_models['rf'], f)