In [None]:
# continuous consecutive in a row unbroken
# 6x speedup

# 1. Import and Install Dependencies

In [None]:
# !pip install tensorflow==2.4.1 tensorflow-gpu==2.4.1 opencv-python mediapipe sklearn matplotlib

In [1]:
import cv2, os, time, math, csv
import numpy as np; import matplotlib.pyplot as plt
import mediapipe as mp
from scipy import stats
from csv import writer

from sklearn.model_selection import train_test_split
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score

import tensorflow as tf; import tensorflow.keras as tfk
from tensorflow import nn
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import TensorBoard

# CONST, VARIABLES

In [2]:
LANDMARK_TYPES = ['pose'] # ~
# LANDMARK_TYPES = ['face', 'pose', 'left_hand', 'right_hand']

num_pose_landmarks = 33
num_face_landmarks = 468
num_hand_landmarks = 21
pose_landmark_num_vals = 4
face_landmark_num_vals = 3
hand_landmark_num_vals = 3

num_features = 0
if ('pose' in LANDMARK_TYPES):
    num_features += num_pose_landmarks*pose_landmark_num_vals
if ('face' in LANDMARK_TYPES):
    num_features += num_face_landmarks*face_landmark_num_vals
if ('left_hand' in LANDMARK_TYPES):
    num_features += num_hand_landmarks*hand_landmark_num_vals
if ('right_hand' in LANDMARK_TYPES):
    num_features += num_hand_landmarks*hand_landmark_num_vals

# Path for exported data, numpy arrays
DATA_PATH = os.path.join('DATA') 

# ACTIONS that we try to detect. ~
ACTIONS = {
    'PushUp': {'ANGLE': [70, 130], 
               'JOINTS': [['right_shoulder', 'right_elbow', 'right_wrist'],
                          ['left_shoulder', 'left_elbow', 'left_wrist']]},
    'Squat': {'ANGLE': [90, 160],
              'JOINTS': [['right_hip', 'right_knee', 'right_ankle'],
                         ['left_hip', 'left_knee', 'left_ankle']]},
    'ReverseCrunch': {'ANGLE': [80, 110],
                      'JOINTS': [['right_shoulder', 'right_hip', 'right_knee'],
                                 ['left_shoulder', 'left_hip', 'left_knee']]},
    'Lunge': {'ANGLE': [90, 160],
              'JOINTS': [['right_hip', 'right_knee', 'right_ankle'],
                         ['left_hip', 'left_knee', 'left_ankle']]},
    'JumpingJacks': {'ANGLE': [30, 90],
                     'JOINTS': [['right_elbow', 'right_shoulder', 'right_hip'],
                                ['left_elbow', 'left_shoulder', 'left_hip']]},
    'Standing': {'ANGLE': [None, None],
                 'JOINTS': [[None, None, None],
                            [None, None, None]]},
}

# 2. Keypoints using MP Holistic

In [3]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [4]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # COLOR CONVERSION BGR 2 RGB
    image = cv2.flip(image, 1)
    image.flags.writeable = False                  # Image is no longer writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable 
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) # COLOR COVERSION RGB 2 BGR
    return image, results

In [5]:
def draw_landmarks(image, results, LANDMARK_TYPES:list):
    # _CONNECTIONS: Relationship between all of the different component
    # within the detection set. Which landmarks between each set of models 
    # actually connect to each other
    
    if ('face' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.face_landmarks,
                                mp_holistic.FACEMESH_CONTOURS) # Draw face connections
    if ('pose' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.pose_landmarks,
                                mp_holistic.POSE_CONNECTIONS) # Draw pose connections
    if ('left_hand' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks,
                                mp_holistic.HAND_CONNECTIONS) # Draw left hand connections
    if ('right_hand' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks,
                                mp_holistic.HAND_CONNECTIONS) # Draw right hand connections

In [6]:
def create_color():
    color = []
    for i in range(3):
        channel = np.random.randint(0, 256)
        color.append(channel)
    return color

In [7]:
color_landmarks = {}
if ('face' in LANDMARK_TYPES):
    color_landmarks['face'] = {}
if ('right_hand' in LANDMARK_TYPES):
    color_landmarks['right_hand'] = {}
if ('left_hand' in LANDMARK_TYPES):
    color_landmarks['left_hand'] = {}
if ('pose' in LANDMARK_TYPES):
    color_landmarks['pose'] = {}
    
if ('face' in LANDMARK_TYPES):
    color_landmarks['face']['keypoints'] = create_color()
    color_landmarks['face']['connections'] = create_color()
if ('right_hand' in LANDMARK_TYPES):
    color_landmarks['right_hand']['keypoints'] = create_color()
    color_landmarks['right_hand']['connections'] = create_color()
if ('left_hand' in LANDMARK_TYPES):
    color_landmarks['left_hand']['keypoints'] = create_color()
    color_landmarks['left_hand']['connections'] = create_color()
if ('pose' in LANDMARK_TYPES):
    color_landmarks['pose']['keypoints'] = create_color()
    color_landmarks['pose']['connections'] = create_color()

def draw_styled_landmarks(image, results):
    if ('face' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.face_landmarks,
            mp_holistic.FACEMESH_CONTOURS, 
            mp_drawing.DrawingSpec(color=color_landmarks['face']['keypoints'], thickness=1, circle_radius=1),
            mp_drawing.DrawingSpec(color=color_landmarks['face']['connections'], thickness=1, circle_radius=1))
    if ('right_hand' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.right_hand_landmarks,
            mp_holistic.HAND_CONNECTIONS, 
            mp_drawing.DrawingSpec(color=color_landmarks['right_hand']['keypoints'], thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=color_landmarks['right_hand']['connections'], thickness=2, circle_radius=2))
    if ('left_hand' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.left_hand_landmarks,
            mp_holistic.HAND_CONNECTIONS, 
            mp_drawing.DrawingSpec(color=color_landmarks['left_hand']['keypoints'], thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=color_landmarks['left_hand']['connections'], thickness=2, circle_radius=2))
    if ('pose' in LANDMARK_TYPES):
        mp_drawing.draw_landmarks(image, results.pose_landmarks,
            mp_holistic.POSE_CONNECTIONS,   
            mp_drawing.DrawingSpec(color=color_landmarks['pose']['keypoints'], thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=color_landmarks['pose']['connections'], thickness=2, circle_radius=2))

In [8]:
# cap = cv2.VideoCapture(0)
# # Set mediapipe model 
# with mp_holistic.Holistic(min_detection_confidence=0.5, 
#                           min_tracking_confidence=0.5) as holistic:
#     while cap.isOpened():

#         ret, frame = cap.read()
#         image, results = mediapipe_detection(frame, holistic) # Make detections
#         draw_styled_landmarks(image, results) # Draw landmarks

#         cv2.imshow('OpenCV Feed', image) # Show to screen
#         if cv2.waitKey(10) & 0xFF == ord('q'):
#             break
        
# cap.release()
# cv2.destroyAllWindows()

# 3. Extract Keypoint Values

In [9]:
def extract_keypoints(results):    
    keypoints = np.array([])
    if ('pose' in LANDMARK_TYPES):
      pose = np.array([[res.x, res.y, res.z, res.visibility] for res in 
                      results.pose_landmarks.landmark]).flatten() \
                        if results.pose_landmarks \
                        else np.zeros(num_pose_landmarks*pose_landmark_num_vals)
      keypoints = np.concatenate([keypoints, pose])
    if ('face' in LANDMARK_TYPES):
      face = np.array([[res.x, res.y, res.z] for res in 
                      results.face_landmarks.landmark]).flatten() \
                        if results.face_landmarks \
                        else np.zeros(num_face_landmarks*face_landmark_num_vals)
      keypoints = np.concatenate([keypoints, face])
    if ('left_hand' in LANDMARK_TYPES):
      left_hand = np.array([[res.x, res.y, res.z] for res in 
                            results.left_hand_landmarks.landmark]).flatten() \
                              if results.left_hand_landmarks \
                              else np.zeros(num_hand_landmarks*hand_landmark_num_vals)
      keypoints = np.concatenate([keypoints, left_hand])
    if ('right_hand' in LANDMARK_TYPES):
      right_hand = np.array([[res.x, res.y, res.z] for res in
                            results.right_hand_landmarks.landmark]).flatten() \
                              if results.right_hand_landmarks \
                              else np.zeros(num_hand_landmarks*hand_landmark_num_vals)
      keypoints = np.concatenate([keypoints, right_hand])
    return keypoints

# 4. Setup for Collection

In [10]:
file_names = {}
for action in ACTIONS.keys():
    file_names[action] = os.path.join(DATA_PATH, f'{action}.csv')

DATA_PATH = os.path.join('DATA')
os.makedirs(DATA_PATH, exist_ok=True)
    
label_map = {label:num for num, label in enumerate(list(ACTIONS.keys()))}

In [11]:
# # Create csv file based file_names
# for file_name in file_names.values():
#     with open(file_name, 'w', newline='') as f:
#         f.write('')

In [12]:

def append_row_to_file(file_name, row):
    with open(file_name, 'a+', newline='') as f:
        # Write new line data to csv file
        writer_object = writer(f)
        writer_object.writerow(row)
        f.close()

# 5. Collect Keypoint Values for Training and Testing

In [13]:
def collect_data_one_action(cap, file_name):

    # Set mediapipe model 
    with mp_holistic.Holistic(min_detection_confidence=0.5, 
                            min_tracking_confidence=0.5) as holistic:

        while cap.isOpened():            
            try:
                ret, frame = cap.read() # Read feed
                frame = cv2.flip(frame, 1)
                image, results = mediapipe_detection(frame, holistic) # Make detections
                draw_styled_landmarks(image, results) # Draw landmarks
                append_row_to_file(file_name, extract_keypoints(results))
                
                cv2.imshow('OpenCV Feed', image)
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
            except:
                pass
    cap.release()
    cv2.destroyAllWindows()

In [14]:
list(ACTIONS.keys())

['PushUp', 'Squat', 'ReverseCrunch', 'Lunge', 'JumpingJacks', 'Standing']

In [15]:
# cap = cv2.VideoCapture(r'Videos\Standing.mp4')
# collect_data_one_action(cap=cap, file_name=file_names['Standing'])

In [16]:
# print('REMOVE ZERO ROWS', '-'*50)
# for action in list(ACTIONS.keys()):
#     with open(file_names[action], 'r') as f:
#         reader = csv.reader(f, delimiter=',')
#         data = list(reader)
#         # Remove '' in data
#         data = [[x for x in data_i if (x != '')] for data_i in data if (data_i != [])]
#         data = np.array([x for x in data if (x != [])], dtype=np.float32)
#         print(f'{action}: {data.shape} -> ', end='')
        
#         i = 0
#         for data_i in data:
#             if (sum(data_i) == 0):
#                 # Remove rows with all zeros in the csv file
#                 # This is to remove the rows where no landmarks were detected
#                 # and the array was filled with zeros
#                 data = np.delete(data, i, 0)
#                 # Decrement i to account for the deleted row so that the next row is not skipped
#                 # when the for loop increments i
#                 # This is because the array is now one row shorter and the next row is now the current row
#                 # (i.e. the row that was skipped)
#                 i -= 1
#             i += 1
#         print(data.shape)
        
#         # Delete and create new csv file
#         # This is to remove the rows where no landmarks were detected
#         # and the array was filled with zeros
#         # This is done so that the data is not used in training
#         # and the model does not learn to predict zeros
#         # (i.e. the model does not learn to predict no landmarks)
#         # This is done because the model will be used in real-time
#         # and it is better to have the model predict no landmarks
#         # than to have the model predict landmarks that are not there
#     os.remove(file_names[action])
        
#     with open(file_names[action], 'w', newline='') as f:
#         writer_object = writer(f)
#         for row in data:
#             writer_object.writerow(row)

# print('BALANCING CLASSES', '-'*50)
# num_rows = {}
# for action in list(ACTIONS.keys()):
#     with open(file_names[action], 'r') as f:
#         reader = csv.reader(f, delimiter=',')
#         data = list(reader)
#         data = np.array(data, dtype=np.float32)
        
#         row = data.shape[0]
#         num_rows[action] = row
# num_row_min = min(num_rows.values())

# for action in list(ACTIONS.keys()):
#     with open(file_names[action], 'r') as f:
#         reader = csv.reader(f, delimiter=',')
#         data = list(reader)
#         data = np.array(data, dtype=np.float32)
#         print(f'{action}: {data.shape} -> ', end='')
#         data = data[:num_row_min]
    
#     os.remove(file_names[action])
        
#     with open(file_names[action], 'w', newline='') as f:
#         writer_object = writer(f)
#         for row in data:
#             writer_object.writerow(row)
#         print(data.shape)

# 6. Preprocess Data and Create Labels and Features

In [17]:
# 1 Video are going to be SEQUENCE_LENGTH frames (each frame contains a no. extracted 
# keypoint) in length. ~
SEQUENCE_LENGTH = 60

In [18]:

sequences, labels = [], []
# Extract content of csv file into sequences
for action in list(ACTIONS.keys()):
    with open(file_names[action], 'r') as f:
        reader = csv.reader(f, delimiter=',')
        data = list(reader)
        
        for i in range(0, len(data) - SEQUENCE_LENGTH, SEQUENCE_LENGTH):
            # Extract SEQUENCE_LENGTH frames from csv file
            sequence = data[i: i + SEQUENCE_LENGTH]
            sequences.append(sequence)
            labels.append(label_map[action])

X = np.array(sequences, dtype=np.float32)
y = to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)

print(f'label_map: {label_map}')
print(f'Sequences shape: {np.array(sequences).shape}')
print(f'Labels shape: {np.array(labels).shape}')
print(f'X_train shape: {X_train.shape}')
print(f'y_train shape: {y_train.shape}')
print(f'X_test shape: {X_test.shape}')
print(f'y_test shape: {y_test.shape}')

label_map: {'PushUp': 0, 'Squat': 1, 'ReverseCrunch': 2, 'Lunge': 3, 'JumpingJacks': 4, 'Standing': 5}
Sequences shape: (2310, 60, 132)
Labels shape: (2310,)
X_train shape: (2194, 60, 132)
y_train shape: (2194, 6)
X_test shape: (116, 60, 132)
y_test shape: (116, 6)


# 7. Build and Train LSTM Neural Network

In [19]:
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)

checkpoint_filepath = os.path.join('checkpoint')
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath, save_weights_only=True,
    monitor='val_accuracy', mode='max', save_best_only=True)

def step_decay(epoch):
    initial_lr = 1e-4
    drop = 0.5
    epochs_drop = 10
    lr = initial_lr*math.pow(drop, math.floor((1 + epoch)/epochs_drop))
    return lr
LRScheduler_callback = tfk.callbacks.LearningRateScheduler(step_decay, verbose=1)

reduceLROnPlateau_callback = tfk.callbacks.ReduceLROnPlateau(monitor='val_loss', 
                                factor=0.2, verbose=1, patience=1, min_lr=0.001)

callback_earlyStopping = tfk.callbacks.EarlyStopping(patience=100, min_delta=0.05,
                            baseline=0.8, mode='min', monitor='val_loss',
                            restore_best_weights=True, verbose=1)

model = tfk.models.Sequential([
    layers.LSTM(64, return_sequences=True, activation=nn.leaky_relu, 
                input_shape=(SEQUENCE_LENGTH, num_features)),
    layers.LSTM(128, return_sequences=True, activation=nn.leaky_relu),
    layers.LSTM(64, return_sequences=False, activation=nn.leaky_relu),
    layers.Dense(64, activation=nn.leaky_relu),
    layers.Dense(32, activation=nn.leaky_relu),
    layers.Dense(len(list(ACTIONS.keys())), activation=nn.softmax)
])



model.compile(optimizer=tfk.optimizers.Adam(learning_rate=1e-5),
              loss=tfk.losses.CategoricalCrossentropy(),
              metrics=['accuracy'])

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 lstm (LSTM)                 (None, 60, 64)            50432     
                                                                 
 lstm_1 (LSTM)               (None, 60, 128)           98816     
                                                                 
 lstm_2 (LSTM)               (None, 64)                49408     
                                                                 
 dense (Dense)               (None, 64)                4160      
                                                                 
 dense_1 (Dense)             (None, 32)                2080      
                                                                 
 dense_2 (Dense)             (None, 6)                 198       
                                                                 
Total params: 205,094
Trainable params: 205,094
Non-trai

In [102]:
model.fit(X_train, y_train, epochs=40, validation_data=(X_test, y_test), # 40
          callbacks=[callback_earlyStopping, LRScheduler_callback])


Epoch 1: LearningRateScheduler setting learning rate to 0.0001.
Epoch 1/40

Epoch 2: LearningRateScheduler setting learning rate to 0.0001.
Epoch 2/40

Epoch 3: LearningRateScheduler setting learning rate to 0.0001.
Epoch 3/40

Epoch 4: LearningRateScheduler setting learning rate to 0.0001.
Epoch 4/40

Epoch 5: LearningRateScheduler setting learning rate to 0.0001.
Epoch 5/40

Epoch 6: LearningRateScheduler setting learning rate to 0.0001.
Epoch 6/40

Epoch 7: LearningRateScheduler setting learning rate to 0.0001.
Epoch 7/40

Epoch 8: LearningRateScheduler setting learning rate to 0.0001.
Epoch 8/40

Epoch 9: LearningRateScheduler setting learning rate to 0.0001.
Epoch 9/40

Epoch 10: LearningRateScheduler setting learning rate to 5e-05.
Epoch 10/40

Epoch 11: LearningRateScheduler setting learning rate to 5e-05.
Epoch 11/40

Epoch 12: LearningRateScheduler setting learning rate to 5e-05.
Epoch 12/40

Epoch 13: LearningRateScheduler setting learning rate to 5e-05.
Epoch 13/40

Epoch 1

<keras.callbacks.History at 0x1b20d9f3b20>

# 8. Make Predictions

In [22]:
# res = model.predict(X_test)
# print(f'y_pred: {list(ACTIONS.keys())[np.argmax(res[4])]}')
# print(f'y_true: {list(ACTIONS.keys())[np.argmax(y_test[4])]}')

# 9. Save Weights

In [103]:
# model.save('action.h5')
# model.save('action1.h5')

In [20]:
model.load_weights('action.h5')

# 10. Evaluation using Confusion Matrix and Accuracy

In [21]:
# yhat = model.predict(X_test)
# ytrue = np.argmax(y_test, axis=1).tolist()
# yhat = np.argmax(yhat, axis=1).tolist()

In [22]:
# multilabel_confusion_matrix(ytrue, yhat)

In [23]:
# accuracy_score(ytrue, yhat)

# 11. Test in Real Time

In [24]:
colors = []
for i in range(len(ACTIONS)):
    colors.append(create_color())
def prob_viz(res_prob, ACTIONS, input_frame):
    output_frame = input_frame.copy()
    for i, prob in enumerate(res_prob):
        cv2.rectangle(output_frame, (0, 60 + i*40), (int(prob*100), 90 + i*40),
                      create_color(), -1)
        cv2.putText(output_frame, ACTIONS[i], (0, 85 + i*40), 
                    cv2.FONT_HERSHEY_SIMPLEX, 1, colors[i], 2, cv2.LINE_AA)
        
    return output_frame

# Calculate Angle

<img src="https://i.imgur.com/3j8BPdc.png" style="height:300px" >

https://manivannan-ai.medium.com/find-the-angle-between-three-points-from-2d-using-python-348c513e2cd

In [25]:
POSE_LANDMARKS = {'nose': 0,
                  'left_eye_inner': 1, 'left_eye': 2, 'left_eye_outer': 3,
                  'right_eye_inner': 4, 'right_eye': 5, 'right_eye_outer': 6,
                  'left_ear': 7, 'right_ear': 8,
                  'mouth_left': 9, 'mouth_right': 10,
                  'left_shoulder': 11, 'right_shoulder': 12,
                  'left_elbow': 13, 'right_elbow': 14,
                  'left_wrist': 15, 'right_wrist': 16,
                  'left_pinky': 17, 'right_pinky': 18,
                  'left_index': 19, 'right_index': 20,
                  'left_thumb': 21, 'right_thumb': 22,
                  'left_hip': 23, 'right_hip': 24,
                  'left_knee': 25, 'right_knee': 26,
                  'left_ankle': 27, 'right_ankle': 28,
                  'left_heel': 29, 'right_heel': 30,
                  'left_foot_index': 31, 'right_foot_index': 32}

def get_joint_coordinate(results, joint_name, get_z=False):
    joint = results.pose_landmarks.landmark[POSE_LANDMARKS[joint_name]]
    if get_z:
        return [joint.x, joint.y, joint.z]
    else:
        return [joint.x, joint.y]
    
def get_three_joint_coordinates(results, joint_names, get_z=False):
    joint_coordinates = []
    for joint_name in joint_names:
        joint_coordinates.append(get_joint_coordinate(results, joint_name, get_z))
    return joint_coordinates

def get_three_joint_coordinates_two_sides(results, joint_names_left,
                            joint_names_right, get_z=False):
    joint_coordinates_left = get_three_joint_coordinates(results, joint_names_left, get_z)
    joint_coordinates_right = get_three_joint_coordinates(results, joint_names_right, get_z)
    return [joint_coordinates_left, joint_coordinates_right]

In [26]:
def calculate_angle(joint_coordinates):
    pt1 = np.array(joint_coordinates[0]) # First
    pt2 = np.array(joint_coordinates[1]) # Mid
    pt3 = np.array(joint_coordinates[2]) # End
    
    # x: 0, y: 1
    # arctan2 measures the counterclockwise angle θ, in radians, between the positive 
    # x-axis and the point (x, y)
    angle_radians = np.arctan2(pt3[1] - pt2[1], pt3[0] - pt2[0]) - \
                    np.arctan2(pt1[1] - pt2[1], pt1[0]- pt2[0])
    angle_degree = np.abs(angle_radians*180.0/np.pi)
    
    if angle_degree > 180.0: # Convert to 0-180 degree
        angle_degree = 360 - angle_degree
        
    return angle_degree

def calculate_angle_two_side(joint_coordinates_left, joint_coordinates_right):
    angle_left = calculate_angle(joint_coordinates_left)
    angle_right = calculate_angle(joint_coordinates_right)
    
    return angle_left, angle_right

In [27]:
def counting(angle, stage, counter, max_angle, min_angle):
    if angle[0] > max_angle:
        stage[0] = "down"
    if angle[0] < min_angle and stage[0] == 'down':
        stage[0] = "up"
        counter[0] += 1
    if angle[1] > max_angle:
        stage[1] = "down"
    if angle[1] < min_angle and stage[1] == 'down':
        stage[1] = "up"
        counter[1] += 1
    return stage, counter

In [28]:
WEBCAM_DIM = [640, 480]
COLOR_REPS = create_color()
COlOR_STAGE = create_color()
COLOR_ANGLE = create_color()
COLOR_ACTION = create_color()

def show_status_box(image, counter, stage, angle, mid_point, action, is_two_side=False):
    # Render curl counter   
    # Setup status box
    BOX_WIDTH = 120
    BOX_HEIGHT = 35
    BOX_COLOR = (255, 255, 255)
    
    TEXT_REPS_START_LEFT = [10, 12]
    TEXT_STAGE_START_LEFT = [65, 12]
    TEXT_NUM_REPS_START_LEFT = [TEXT_REPS_START_LEFT[0], 30]
    TEXT_NUM_STAGE_START_LEFT = [TEXT_STAGE_START_LEFT[0], 30]
    
    TEXT_REPS_START_RIGHT = [WEBCAM_DIM[0] - BOX_WIDTH + TEXT_REPS_START_LEFT[0], 
                             TEXT_REPS_START_LEFT[1]]
    TEXT_STAGE_START_RIGHT = [WEBCAM_DIM[0] - BOX_WIDTH + TEXT_STAGE_START_LEFT[0], 
                              TEXT_STAGE_START_LEFT[1]]
    TEXT_NUM_REPS_START_RIGHT = [WEBCAM_DIM[0] - BOX_WIDTH + TEXT_REPS_START_LEFT[0],
                                 TEXT_NUM_REPS_START_LEFT[1]]
    TEXT_NUM_STAGE_START_RIGHT = [WEBCAM_DIM[0] - BOX_WIDTH + TEXT_STAGE_START_LEFT[0],
                                  TEXT_NUM_REPS_START_LEFT[1]]
    
    cv2.rectangle(image, (0, 0), (WEBCAM_DIM[0], BOX_HEIGHT), BOX_COLOR, -1, )
    cv2.putText(image, 'REPS', TEXT_REPS_START_LEFT, 
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_REPS, 2, cv2.LINE_AA)
    cv2.putText(image, 'STAGE', TEXT_STAGE_START_LEFT, 
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COlOR_STAGE, 2, cv2.LINE_AA)
    cv2.putText(image, 'REPS', TEXT_REPS_START_RIGHT, 
        cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_REPS, 2, cv2.LINE_AA)
    cv2.putText(image, 'STAGE', TEXT_STAGE_START_RIGHT, 
        cv2.FONT_HERSHEY_SIMPLEX, 0.5, COlOR_STAGE, 2, cv2.LINE_AA)
    
    BOX_ACTION_WIDTH = 75
    cv2.putText(image, 'ACTION', (int(WEBCAM_DIM[0]//2) - 25, 12), 
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_ACTION, 2, cv2.LINE_AA)
    cv2.putText(image, action, (int(WEBCAM_DIM[0]//2 - BOX_ACTION_WIDTH), 30), 
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_ACTION, 2, cv2.LINE_AA)
    
    ANGLE_TEXT_SIZE = 2
    
    if is_two_side:
        cv2.putText(image, str(counter[0]), TEXT_NUM_REPS_START_LEFT, 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_REPS, 2, cv2.LINE_AA)
        cv2.putText(image, str(stage[0]), TEXT_NUM_STAGE_START_LEFT, 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, COlOR_STAGE, 2, cv2.LINE_AA)
        cv2.putText(image, str(int(angle[0])),
                    tuple(np.multiply(mid_point[0], WEBCAM_DIM).astype(int)), 
                    cv2.FONT_HERSHEY_SIMPLEX, ANGLE_TEXT_SIZE, COLOR_ANGLE, 2, cv2.LINE_AA)
        
        cv2.putText(image, str(counter[1]), TEXT_NUM_REPS_START_RIGHT, 
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_REPS, 2, cv2.LINE_AA)
        cv2.putText(image, str(stage[1]), TEXT_NUM_STAGE_START_RIGHT, 
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, COlOR_STAGE, 2, cv2.LINE_AA)
        cv2.putText(image, str(int(angle[1])),
                    tuple(np.multiply(mid_point[1], WEBCAM_DIM).astype(int)), 
                    cv2.FONT_HERSHEY_SIMPLEX, ANGLE_TEXT_SIZE, COLOR_ANGLE, 2, cv2.LINE_AA)
    else:  
        counter = counter[0]
        stage = stage[0]
        angle = angle[0]
        mid_point = mid_point[0]
        
        cv2.putText(image, str(counter), TEXT_NUM_REPS_START_LEFT, 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, COLOR_REPS, 2, cv2.LINE_AA)
        cv2.putText(image, stage, TEXT_NUM_STAGE_START_LEFT, 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, COlOR_STAGE, 2, cv2.LINE_AA)
        cv2.putText(image, str(int(angle)),
                    tuple(np.multiply(mid_point, WEBCAM_DIM).astype(int)), 
                    cv2.FONT_HERSHEY_SIMPLEX, ANGLE_TEXT_SIZE, COLOR_ANGLE, 2, cv2.LINE_AA)

In [None]:
# Curl counter variables
counter = [0, 0]
stage = [None, None]
angle = [0, 0]
mid_point = [None, None]

cap = cv2.VideoCapture(0)

# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()
        image, results = mediapipe_detection(frame, holistic) # Make detections
        draw_styled_landmarks(image, results) # Draw landmarks

        action = 'Squat'         
        action_angle = ACTIONS[action]['ANGLE']            
        joint_names_left, joint_names_right = ACTIONS[action]['JOINTS']
        
        # Extract landmarks (Joint, coordinate). Determining Joints
        try:  
            joint_coordinates_left, joint_coordinates_right = get_three_joint_coordinates_two_sides(results, joint_names_left,
                            joint_names_right, get_z=False)
            mid_point = [joint_coordinates_left[1], joint_coordinates_right[1]]
            angle = calculate_angle_two_side(joint_coordinates_left, joint_coordinates_right)
            stage, counter = counting(angle, stage, counter,
                                      min_angle=action_angle[0], 
                                      max_angle=action_angle[1])
            show_status_box(image, counter, stage, angle, mid_point, action, 
                            is_two_side=True)       
        except:
            pass
        
        cv2.imshow('Mediapipe Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

In [29]:
ACTION_SURE_LIMIT = 5
MOST_OCCURRING_ACTION_LIMIT = ACTION_SURE_LIMIT//2
MAX_REPS = 5

def do_counting(cap, action, max_reps):
    # Curl counter variables
    counter = [0, 0]
    stage = [None, None]
    angle = [0, 0]
    mid_point = [None, None]
    cv2.destroyWindow('Detection')
    
    # Set mediapipe model 
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            image, results = mediapipe_detection(frame, holistic) # Make detections
            draw_styled_landmarks(image, results) # Draw landmarks

            action_angle = ACTIONS[action]['ANGLE']            
            joint_names_left, joint_names_right = ACTIONS[action]['JOINTS']
            
            # Extract landmarks (Joint, coordinate). Determining Joints
            try:  
                joint_coordinates_left, joint_coordinates_right = get_three_joint_coordinates_two_sides(results, joint_names_left,
                                joint_names_right, get_z=False)
                mid_point = [joint_coordinates_left[1], joint_coordinates_right[1]]
                angle = calculate_angle_two_side(joint_coordinates_left, joint_coordinates_right)
                stage, counter = counting(angle, stage, counter,
                                        min_angle=action_angle[0], 
                                        max_angle=action_angle[1])
                show_status_box(image, counter, stage, angle, mid_point, action, 
                                is_two_side=True)     
            except:
                pass
            
            cv2.imshow('Counting', image)

            if (counter[0] > max_reps - 1 or counter[1] > max_reps - 1):
                do_detection(action_sure_limit=ACTION_SURE_LIMIT, SEQUENCE_LENGTH=SEQUENCE_LENGTH, model=model,
                             is_detection=False)
            # else:
            #     cap.release()
            #     cv2.destroyAllWindows()
            
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break

        cap.release()
        cv2.destroyAllWindows()
    
# do_counting(cap, action='Squat', max_reps=4)

In [30]:

def do_detection(action_sure_limit, SEQUENCE_LENGTH, model, is_detection=True):
    if is_detection == False:
        cv2.destroyWindow('Counting')
    
    # Detection variables
    sequences = []
    actions = []
    threshold = 0.7
    todo_count = False
    cap = cv2.VideoCapture(0)
    
    # Set mediapipe model 
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read() # Read feed
            image, results = mediapipe_detection(frame, holistic) # Make detections
            draw_styled_landmarks(image, results) # Draw landmarks
            
            # Prediction logic
            keypoints = extract_keypoints(results)
            sequences.append(keypoints)
            sequences = sequences[-SEQUENCE_LENGTH:]
            
            if len(sequences) == SEQUENCE_LENGTH:
                res_prob = model.predict(np.expand_dims(sequences, axis=0))[0]
                res_max_prob_idx = np.argmax(res_prob)
                res_label = list(ACTIONS.keys())[res_max_prob_idx]
                
                actions.append(res_label)
                
                # Viz probabilities
                image = prob_viz(res_prob, list(ACTIONS.keys()), image) 
                
                # Check if actions are the same for the last `action_sure_limit` frames
                # If so, then we are sure of the action. If not, then we are not sure of the action
                if (actions == list(np.full((action_sure_limit,), res_label))):
                    todo_count = True
                    
            actions = actions[-action_sure_limit:]
            cv2.imshow('Detection', image)
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
            
            if (todo_count == True):
                do_counting(cap, action=res_label, max_reps=MAX_REPS)
                break
            
        cap.release()
        cv2.destroyAllWindows()
        return actions

actions = do_detection(action_sure_limit=ACTION_SURE_LIMIT, 
                       SEQUENCE_LENGTH=SEQUENCE_LENGTH, model=model, is_detection=True)



error: OpenCV(4.6.0) D:\a\opencv-python\opencv-python\opencv\modules\imgproc\src\color.cpp:182: error: (-215:Assertion failed) !_src.empty() in function 'cv::cvtColor'


In [108]:
actions

['PushUp', 'PushUp', 'PushUp', 'PushUp', 'PushUp', 'PushUp', 'PushUp']