In [1]:
import os
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models

In [2]:
# initialize mediapipe pose estimation model
# `pose` is a single module-level Pose instance built with default options;
# extract_keypoints() reads it as a global for every image/frame it processes.
# NOTE(review): with default options MediaPipe presumably treats successive inputs as a
# video stream (static_image_mode=False) — confirm this is intended for the unrelated
# still images fed through load_images().
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
# Alternative explicit configuration, currently disabled:
# pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)

In [3]:
# get keypoints from image with mediapipe
def extract_keypoints(image):
    """Run the global MediaPipe ``pose`` model on a BGR image and flatten its landmarks.

    Args:
        image: image array in BGR channel order (it is converted to RGB before inference).

    Returns:
        1-D numpy array holding x, y, z for each detected landmark in order,
        or an all-zero vector of length 33*3 when no pose landmarks were found.
    """
    detection = pose.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    # no landmarks (joints) detected -> fixed-size zero vector
    if not detection.pose_landmarks:
        return np.zeros(33*3)

    # one [x, y, z] row per landmark, flattened row by row
    coords = [[lm.x, lm.y, lm.z] for lm in detection.pose_landmarks.landmark]
    return np.array(coords).flatten()


In [5]:
# load images and their labels
def load_images(folder, exercises=("push up", "barbell biceps curl", "squat")):
    """Build a keypoint dataset from a folder of per-exercise image sub-folders.

    Each immediate sub-folder of ``folder`` whose name is listed in ``exercises`` is
    scanned; every file that OpenCV can read is passed through extract_keypoints()
    and labelled with the sub-folder name.

    Args:
        folder: root directory containing one sub-directory per exercise.
        exercises: iterable of sub-directory names to include. Defaults to the three
            classes this notebook was written for.

    Returns:
        (data, labels): two numpy arrays of equal length — the keypoint vectors and
        the matching exercise names.
    """
    wanted = set(exercises)
    data = []
    labels = []

    # os.listdir returns entries in arbitrary order; sorting keeps the dataset order
    # (and therefore the seeded train/test split downstream) reproducible.
    for exercise in sorted(os.listdir(folder)):
        if exercise not in wanted:
            continue
        exercise_folder = os.path.join(folder, exercise)
        if not os.path.isdir(exercise_folder):
            continue

        for img_file in sorted(os.listdir(exercise_folder)):
            image = cv2.imread(os.path.join(exercise_folder, img_file))
            # imread yields None for files it cannot decode; skip those
            if image is None:
                continue
            data.append(extract_keypoints(image))
            labels.append(exercise)

    return np.array(data), np.array(labels)

In [4]:
# load images from workout data folder
# NOTE: the dataset load below is disabled by wrapping it in a string literal, so `data`
# is never assigned by this cell. The train/test split cell reads `data` and cannot run
# on a fresh kernel until the load is restored.
""" base_folder = "./workout_data/images"
data,labels = load_images(base_folder) """
# Stand-in while loading is disabled: just the three class names, which the label
# encoder in the next cell is fitted on.
labels = ['barbell biceps curl', 'push up', 'squat']

In [5]:
# encode exercise names into numerical format
from sklearn.preprocessing import LabelEncoder
# label_encoder is kept as a global: classify_image / classify_video and the webcam
# cell call its inverse_transform to map predicted class indices back to names.
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

In [None]:
# split data into training and test sets
from sklearn.model_selection import train_test_split
# 80/20 split with a fixed random_state.
# NOTE: `data` is only produced by the load_images call, which is currently disabled
# in the load cell, so this cell needs that call re-enabled before it can run.
trainData, testData, trainLabel, testLabel = train_test_split(data, labels_encoded, test_size=0.2, random_state=42)

In [None]:
# build neural network to classify exercises based on keypoints
# input width comes from the training matrix, output width from the number of distinct labels
n_features = trainData.shape[1]
n_classes = len(np.unique(labels_encoded))

# two ReLU hidden layers followed by a softmax over the exercise classes
model = models.Sequential([
    layers.Dense(128, activation="relu", input_shape=(n_features,)),
    layers.Dense(64, activation="relu"),
    layers.Dense(n_classes, activation="softmax"),
])

# Disabled alternative (kept for reference): LSTM over a sequence of pose landmarks
# def create_exercise_recognition_model(num_exercises, sequence_length=30):
#     input_layer = tf.keras.Input(shape=(sequence_length, 33, 3))  # Sequence of 33 pose landmarks, 3 coordinates each
    
#     x = tf.keras.layers.TimeDistributed(tf.keras.layers.Flatten())(input_layer)
#     x = tf.keras.layers.LSTM(128, return_sequences=True)(x)
#     x = tf.keras.layers.LSTM(64)(x)
#     x = tf.keras.layers.Dense(32, activation='relu')(x)
#     output = tf.keras.layers.Dense(num_exercises, activation='softmax')(x)

#     model = tf.keras.Model(inputs=input_layer, outputs=output)
#     return model

# # Create and compile the model
# num_exercises = 5  # Adjust based on the number of exercises you want to recognize
# sequence_length = 30  # Adjust based on your video length and frame rate
# model = create_exercise_recognition_model(num_exercises, sequence_length)


# compile model: labels are integer class ids, hence the sparse categorical loss
model.compile(
    optimizer='adam',
    loss="sparse_categorical_crossentropy",
    metrics=['accuracy'],
)

In [None]:
# train for 10 epochs; the held-out test split is also passed as validation data here
model.fit(trainData, trainLabel, epochs=10, validation_data=(testData, testLabel))
# earlier variant, disabled:
# model.fit(trainData, trainLabel, epochs=5, validation_data=(testData, testLabel), batch_size=32)


In [None]:
# evaluate on the same test split that was passed as validation_data during fit
loss, accuracy = model.evaluate(testData, testLabel)
print(f"test accuracy: {accuracy}")

In [6]:
# load the previously saved classifier from disk; the webcam cell and the
# classify_video call further down use `new_model` for inference
new_model = tf.keras.models.load_model('RepModel.keras')

  saveable.load_own_variables(weights_store.get(inner_path))


In [16]:
# use trained model to predict on new images
def classify_image(image_path, model):
    """Predict the exercise shown in a single image file.

    Uses the module-level extract_keypoints() and label_encoder.

    Args:
        image_path: path of the image to classify.
        model: object whose ``predict`` accepts a (1, n_features) array and returns
            per-class scores.

    Returns:
        The decoded label of the highest-scoring class.

    Raises:
        ValueError: if OpenCV cannot read ``image_path`` (missing or undecodable file).
    """
    image = cv2.imread(image_path)
    # imread signals failure by returning None instead of raising; fail early with a
    # clear message rather than letting the colour conversion crash on None.
    if image is None:
        raise ValueError(f"could not read image: {image_path}")

    # the model expects a batch dimension in front of the keypoint vector
    keypoints = np.expand_dims(extract_keypoints(image), axis=0)
    prediction = model.predict(keypoints)
    predicted_class = label_encoder.inverse_transform([np.argmax(prediction)])
    return predicted_class[0]


In [17]:
def classify_video(video_path, model):
    """Classify every frame of a video and return the most frequently predicted exercise.

    Each frame is run through the module-level extract_keypoints() and ``model``; the
    decoded label (via the module-level label_encoder) is tallied, and the current
    leader is drawn on the frame shown in an OpenCV window. Pressing 'q' stops early.

    Args:
        video_path: path (or capture source) handed to cv2.VideoCapture.
        model: object whose ``predict`` accepts a (1, n_features) array.

    Returns:
        The label with the highest frame count (first one reached on ties), or None
        if the video could not be opened or yielded no frames.
    """
    vid = cv2.VideoCapture(video_path)
    if not vid.isOpened():
        print("Error opening video file")
        return None

    votes = {}  # predicted label -> number of frames
    try:
        while vid.isOpened():
            ret, frame = vid.read()
            if not ret:
                break

            keypoints = np.expand_dims(extract_keypoints(frame), axis=0)
            prediction = model.predict(keypoints)
            predicted_class = label_encoder.inverse_transform([np.argmax(prediction)])
            votes[predicted_class[0]] = votes.get(predicted_class[0], 0) + 1
            leader = max(votes, key=votes.get)

            print(predicted_class)
            # show the running majority rather than the raw per-frame prediction
            cv2.putText(frame, f'Predicted: {leader}', (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.imshow("Video classification", frame)

            # break loop on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # always free the capture and windows, even if inference raises mid-video
        vid.release()
        cv2.destroyAllWindows()

    return max(votes, key=votes.get) if votes else None


In [15]:
# Repetition counter
# Webcam loop in two phases: the first VOTE_FRAMES frames are classified to decide which
# exercise is being performed, then every later frame is handed to RepetitionCounter.
from rep_counting.pkg.kps_metrics import KpsMetrics
from rep_counting.rep_counter import RepetitionCounter

DEFAULT_CONFIG_DIR = "./rep_counting/smart_trainer_config/config.json"

# classifier label -> metric name passed to RepetitionCounter.set_metric
exercise_dict = {'barbell biceps curl': 'bicep_curls', 'push up': 'push_ups', 'squat': 'squats'}

# number of classified frames that vote on the exercise before rep counting starts
VOTE_FRAMES = 10

vid = cv2.VideoCapture(0, cv2.CAP_DSHOW)
if not vid.isOpened():
    print("Error opening video file")
dict1 = {}   # predicted label -> number of frames that voted for it
maxv = None  # label currently leading the vote (None until the first prediction)

rep_counter = RepetitionCounter(config_path=DEFAULT_CONFIG_DIR)
# rep_counter.set_metric("bicep_curls")

try:
    while vid.isOpened():
        ret, frame = vid.read()
        if not ret:
            break

        if sum(dict1.values()) < VOTE_FRAMES:
            # Voting phase: classify a *fresh* frame on each pass (previously the same
            # frame was classified ten times in an inner loop, so the vote was meaningless).
            keypoints = np.expand_dims(extract_keypoints(frame), axis=0)
            prediction = new_model.predict(keypoints)
            predicted_class = label_encoder.inverse_transform([np.argmax(prediction)])
            dict1[predicted_class[0]] = dict1.get(predicted_class[0], 0) + 1
            # leader is computed after the tally is updated, so it is never stale
            maxv = max(dict1, key=dict1.get)
            print(predicted_class)
        else:
            # Counting phase: the vote is final, track repetitions for the winning exercise.
            # NOTE(review): set_metric is still called on every frame, as before; whether
            # it could be hoisted depends on RepetitionCounter internals — confirm.
            rep_counter.set_metric(exercise_dict[maxv])

            kps_norm = rep_counter.update_metric(frame)
            frame = rep_counter.draw_kps_skeleton(frame, kps_norm, 5)
            metric = rep_counter.get_metric(rep_counter.current_metric_name)

            cv2.putText(frame, f'Reps: {str(metric.reptition_count)}', (10, frame.shape[0] - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

        cv2.putText(frame, f'Predicted: {maxv}', (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.imshow("Video classification", frame)

        # break loop on 'q' key press (waitKey also lets the window repaint)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            break
finally:
    # release the camera and close windows even if inference or rep counting raises
    vid.release()
    cv2.destroyAllWindows()

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
['barbell biceps curl']




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 18ms/step
['barbell biceps curl']
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 17ms/step
['barbell biceps curl']




In [14]:
import warnings

# Suppress specific UserWarning from protobuf
# Only UserWarnings whose message matches the regex below are ignored; other warnings still show.
warnings.filterwarnings("ignore", category=UserWarning, message=r'SymbolDatabase.GetPrototype\(\) is deprecated')


In [None]:
# NOTE: despite its name, path_test_img points at a video file (.mov), which is why
# classify_video is the call that is active below.
path_test_img = "./test_data/jessefront_curl.mov"
# single-image variant (classify_image also requires the model argument):
# predicted_exercise = classify_image(path_test_img, new_model)
predicted_exercise = classify_video(path_test_img, new_model)
print(f"predicted exercise: {predicted_exercise}")

In [139]:
# NOTE(review): save_format is assigned but not referenced anywhere in the visible
# notebook; the on-disk format is presumably chosen by the '.keras' extension — confirm.
save_format = 'keras'
# persist the trained classifier; the load_model cells read this same path
model.save('RepModel.keras')

In [None]:
# Reload the saved Keras model (this rebinds the global `model`) and wrap it in a
# TFLite converter.
model = tf.keras.models.load_model('RepModel.keras')
converter = tf.lite.TFLiteConverter.from_keras_model(model)

# Convert the model
tflite_model = converter.convert()

# Save the TFLite model
# The converted model is written in binary mode ('wb').
with open('RepModelLite.tflite', 'wb') as f:
    f.write(tflite_model)