# Squat Validator

## 1) Imports and functions to utilize pose-estimator

In [1]:
import cv2
import numpy as np
import os
import tensorflow as tf
import mediapipe as mp



In [2]:
#Initializing Media pipe model and drawing tools
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities
mp_drawing_styles = mp.solutions.drawing_styles

In [3]:
# Function to detect key points using mediapipe
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [4]:
# Function to draw landmarks of detected keypoints for visualization
def draw_styled_landmarks(image, results):
    # Draw face connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
                             mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                             mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                             )
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style()
                             )
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             )
    # Draw right hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             )

In [5]:
#Fucntion to resize input video to fit screen dimensions
def scale_video(scale_percent):
    width = int(image.shape[1] * scale_percent / 100)
    height = int(image.shape[0] * scale_percent / 100)
    dimensions = (width, height)
    return dimensions

In [6]:
# Function to extract body key-points detected
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    return pose

## 2) Creating Features and Labels for the Model

In [7]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [8]:
# Actions we want to predict
Squat_result = np.array(['Valid', 'Invalid'])

In [9]:
# 240 videos
# 120 each for valid and invalid squats
no_of_vids = 120

In [10]:
# Path for exported data, numpy arrays
VALID_SAVE_PATH = os.path.join('Squat_Data')

In [11]:
label_map = {label:num for num, label in enumerate(Squat_result)}

In [12]:
# initializing two arrays to hold features(sequences) and labels(labels)
sequences, labels = [], []
max_frame_num = 0

for action in Squat_result:
    for sequence in range(no_of_vids):
        counter = True
        window = []
        frame_no = 0
        while counter:
            try:
                res = np.load(os.path.join(VALID_SAVE_PATH, action, str(sequence), "{}.npy".format(frame_no)))
                print(sequence, action, frame_no)
                frame_no+=1
                if frame_no>max_frame_num:
                    max_frame_num=frame_no
                window.append(res)
            except:
                break

        sequences.append(window)
        labels.append(label_map[action])

In [13]:
# Each feature in sequences holds a shape of (X,132)
# X being the number of frames of the video
# As we are using an LSTM model it requires all input to be the same size
# We are effectively doing that by utilizing the keras pad_sequences function
# Which pads all the features to the same size as the longest video
# Which is a 191 frames in our scenario

X = tf.keras.preprocessing.sequence.pad_sequences(sequences)

In [14]:
# We now curretly have 240 features(videos)
# Each of the shape of 191(frames) and 132(key-points)

X.shape

(240, 0)

In [15]:
y = to_categorical(labels).astype(int)
y.shape

(240, 2)

In [16]:
# Saving Features and Labels for later use
np.save("Features",X)
np.save("Labels",y)

## 3) Creating, Training and Testing the Model

In [17]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

In [18]:
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard

In [19]:
# Setting callbacks for tensorboard
log_dir = os.path.join('Logs_Retrain')
tb_callback = TensorBoard(log_dir=log_dir)

In [20]:
# Basic LSTM model for demonstartion purposes

model = Sequential()
model.add(LSTM(64, return_sequences=False, activation='relu', input_shape=(191,132)))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(Squat_result.shape[0], activation='sigmoid'))

In [22]:
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 64)                50432     
                                                                 
 dense (Dense)               (None, 64)                4160      
                                                                 
 dense_1 (Dense)             (None, 32)                2080      
                                                                 
 dense_2 (Dense)             (None, 2)                 66        
                                                                 
Total params: 56738 (221.63 KB)
Trainable params: 56738 (221.63 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [25]:
model.fit(X_train, y_train, epochs=100, validation_data=(X_test, y_test), callbacks=[tb_callback])

Epoch 1/100


ValueError: in user code:

    File "/Users/anerisheth/Library/Python/3.9/lib/python/site-packages/keras/src/engine/training.py", line 1401, in train_function  *
        return step_function(self, iterator)
    File "/Users/anerisheth/Library/Python/3.9/lib/python/site-packages/keras/src/engine/training.py", line 1384, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/Users/anerisheth/Library/Python/3.9/lib/python/site-packages/keras/src/engine/training.py", line 1373, in run_step  **
        outputs = model.train_step(data)
    File "/Users/anerisheth/Library/Python/3.9/lib/python/site-packages/keras/src/engine/training.py", line 1150, in train_step
        y_pred = self(x, training=True)
    File "/Users/anerisheth/Library/Python/3.9/lib/python/site-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/Users/anerisheth/Library/Python/3.9/lib/python/site-packages/keras/src/engine/input_spec.py", line 298, in assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer "sequential" is incompatible with the layer: expected shape=(None, 191, 132), found shape=(32, 0)


### 3.1) Saving model weights

In [25]:
model.save('model.h5')

In [28]:
saved_model = load_model('model.h5')

## 4) Evaluation of Model

In [29]:
from sklearn.metrics import accuracy_score, confusion_matrix

In [30]:
yhat = saved_model.predict(X_test)

In [31]:
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()

In [32]:
confusion_matrix(ytrue, yhat)

array([[17,  6],
       [21,  4]], dtype=int64)

In [33]:
accuracy_score(ytrue, yhat)

0.4375

## 5) Making predictions to new videos

In [34]:
# Array is used to pad the recieved sequence to the shape the Deep Learning Model Accepts
longest_sequence = np.load("Longest_Sequence.npy")

In [40]:
# New detection variables
sequence_for_prediction = [longest_sequence]

window = []

# Capturing Video Stream
cap = cv2.VideoCapture(r'your video path here')


# Set mediapipe model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()
        #Close window at end of video
        if not ret:
            break

        # Make detections
        image, results = mediapipe_detection(frame, holistic)

        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        dim = scale_video(30)
        # Show to screen

        image = cv2.resize(image, dim, interpolation = cv2.INTER_AREA)


        cv2.imshow('OpenCV Feed', image)

        # Prediction logic
        keypoints = extract_keypoints(results)
        window.append(keypoints)
        
        

        

    cap.release()
    cv2.destroyAllWindows()

    sequence_for_prediction.append(window)
    padded_sequence = tf.keras.preprocessing.sequence.pad_sequences(sequence_for_prediction, maxlen=191)
    seq_to_predict = padded_sequence[1]       


In [24]:
print (seq_to_predict)

NameError: name 'seq_to_predict' is not defined

In [41]:
# Predicting the squat validity
res = model.predict(np.expand_dims(seq_to_predict, axis=0))
print (Squat_result[np.argmax(res)])

Valid
