In [1]:
!pip install tensorflow opencv-python matplotlib sklearn gTTS ipython



In [1]:
import tensorflow as tf
import numpy as np
import cv2
from gtts import gTTS
from IPython.display import Audio
import threading
import time

In [5]:
# Earlier classifier variants, kept commented out for reference:
# model = tf.keras.models.load_model('pushup-counter-softmax.h5')
# model = tf.keras.models.load_model('pushup-counter-with-reduced-landmark-and-sigmoid.h5')
# Active pose classifier; later cells call model.predict() on a (1, 51) keypoint vector.
model = tf.keras.models.load_model('pushup-counter-movenet.h5')

In [6]:
# NOTE: despite its name, this variable holds the path of a .tflite file, not a folder.
movenet_folder = 'models/movenet_lightning.tflite'
# Single TFLite interpreter instance, used as a global by processPose().
interpreter = tf.lite.Interpreter(model_path=movenet_folder)
# Allocate tensors once up front so processPose() can call set_tensor()/invoke().
interpreter.allocate_tensors()

In [7]:
def processPose(image, min_score=0.2):
    """Run the global TFLite pose `interpreter` on one image and return its keypoints.

    The image is padded/resized to 192x192, cast to float32 and fed to the
    interpreter. The first output tensor is reshaped to 17 rows of 3 values and
    every row whose third value (the score) is below ``min_score`` is zeroed.

    Args:
        image: H x W x C image array (e.g. a frame from ``cv2.imread`` or
            ``cv2.VideoCapture.read``).
        min_score: rows with a score strictly below this are replaced by
            ``[0, 0, 0]``. Defaults to 0.2.

    Returns:
        A float32 ``numpy`` array of shape (17, 3).

    Raises:
        ValueError: if ``image`` is ``None`` (what OpenCV hands back when a file
            or camera frame could not be read).

    NOTE(review): MoveNet's published output order is (y, x, score) per row; the
    model file is not visible here, so confirm before relying on column order.
    NOTE(review): OpenCV frames are BGR while MoveNet is documented for RGB
    input; no channel swap is done here, so as not to change what the
    downstream classifier sees -- confirm how the training data was produced.
    """
    if image is None:
        raise ValueError("processPose() got None instead of an image "
                         "(unreadable file or failed camera read?)")

    img = tf.image.resize_with_pad(np.expand_dims(image, axis=0), 192, 192)
    img = tf.cast(img, dtype=tf.float32)

    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    interpreter.set_tensor(input_details[0]['index'], np.array(img))
    # Invoke inference.
    interpreter.invoke()
    # Get the model prediction.
    keypoints_with_scores = interpreter.get_tensor(output_details[0]['index'])

    # Copy into a float32 (17, 3) array so the result dtype no longer depends on
    # which rows get zeroed (a list of [0, 0, 0] rows used to yield an int array).
    keypoints = np.array(keypoints_with_scores, dtype=np.float32).reshape(17, 3)
    # Blank out low-confidence keypoints in one vectorized step.
    keypoints[keypoints[:, 2] < min_score] = 0
    return keypoints

In [20]:
def allEqual(iterable):
    """Return True when every element of ``iterable`` equals the first one.

    An empty iterable counts as "all equal". Works on any iterable, including
    one-shot iterators, and stops at the first mismatch.
    """
    remaining = iter(iterable)
    nothing = object()  # sentinel distinguishing "empty" from a real first item

    head = next(remaining, nothing)
    if head is nothing:
        return True

    return not any(item != head for item in remaining)

In [22]:
def text_to_speech(sentence):
    """Synthesize ``sentence`` with gTTS and autoplay it in the notebook output.

    gTTS produces MP3 audio, so the clip is written to a uniquely named
    temporary ``.mp3`` file. The previous fixed ``'1.wav'`` path had the wrong
    extension and was shared by every call, so overlapping calls from
    different threads could overwrite each other's audio.

    Args:
        sentence: text to speak.
    """
    import os
    import tempfile

    tts = gTTS(sentence)  # text to convert to speech

    # Reserve a unique file name per call; close our handle so gTTS can write it.
    fd, sound_file = tempfile.mkstemp(suffix='.mp3')
    os.close(fd)
    try:
        tts.save(sound_file)
        # Audio reads and embeds the file contents when constructed with a
        # filename, so the temporary file can be removed right after display.
        display(Audio(filename=sound_file, autoplay=True))
    finally:
        os.remove(sound_file)

In [28]:
# Smoke test on a single static image before moving on to the live camera feed.
# NOTE(review): cv2.imread is documented to return None for an unreadable path;
# that case is not checked here.
image = cv2.imread('gambar4.jpg')
# Flatten the (17, 3) keypoints into the (1, 51) row passed to the classifier.
keypoints = processPose(image).flatten().reshape(1, 51)
predd = model.predict(keypoints)
# Index of the highest-scoring class and the score the model gave it.
posee = np.argmax(predd)
confidence = predd[0][posee]
print(predd)
print(posee)
print(confidence)

[[1.0000000e+00 2.7352547e-17]]
0
1.0


In [30]:
# Scratch check: any() is True as soon as one element is truthy.
any([0, 0, 0, 1])

True

In [39]:
def body_is_visible(keypoints):
    """Tell whether enough of the body was detected to classify the pose.

    ``keypoints`` is reshaped to 17 rows of 3 values, the third being the score.
    For each index in (5, 7, 9, 11, 13) -- the left-side body parts -- paired
    with the next index, at least one of the two scores must not be below 0.2.

    Returns:
        False if any pair has both scores below 0.2, True otherwise.
    """
    scores = keypoints.reshape(17, 3)[:, 2]
    left_side = (5, 7, 9, 11, 13)  # index + 1 is the matching right-side part

    # A pair is "hidden" only when both its left and right keypoints are weak.
    pair_hidden = (
        (scores[left] < 0.2) and (scores[left + 1] < 0.2)
        for left in left_side
    )
    return not any(pair_hidden)

In [44]:
# --- Live push-up counter -----------------------------------------------------
# Reads webcam frames, classifies each visible pose with `model`, and counts one
# push-up whenever a stable run of label 0 is followed by a stable run of a
# different label. Press 'q' in the OpenCV window to stop.
# (Label 0 is presumably the "down" position -- confirm against the training labels.)

threshold = 0.6              # minimum classifier confidence for a frame to count
history = []                 # most recent confident labels (at most 2 * num_frames_requirement)
fail_count = 0               # reset on every confident frame; not otherwise used yet
pushup_count = 0
num_frames_requirement = 3   # identical consecutive labels needed for a "stable" pose
pushup_down_done = False     # True once a stable label-0 phase has been seen

cap = cv2.VideoCapture(0)

try:
    while cap.isOpened():
        # Read feed
        ret, frame = cap.read()
        if not ret:
            # No frame was grabbed (camera unplugged / stream ended): stop
            # instead of passing None to OpenCV and TensorFlow.
            break
        #frame = cv2.flip(frame, 1)

        # Run pose estimation on the clean frame, before any overlay is drawn,
        # so the counter text never becomes part of the model input.
        keypoints = processPose(frame).flatten().reshape(1, 51)

        if body_is_visible(keypoints):
            y_pred = model.predict(keypoints)
            pushup_pose = np.argmax(y_pred)
            pose_confidence = y_pred[0][pushup_pose]

            print(pushup_pose, end=" ")  # per-frame debug trace of predicted labels

            if pose_confidence > threshold:
                # reset continuous fail count
                fail_count = 0

                history.append(pushup_pose)
                # Only the last 2 * num_frames_requirement labels are ever
                # inspected, so drop anything older to keep the list bounded.
                del history[:-2 * num_frames_requirement]

                if len(history) >= 2 * num_frames_requirement:
                    recent = history[-num_frames_requirement:]
                    previous = history[-2 * num_frames_requirement:-num_frames_requirement]

                    # A pose change is accepted only when a stable run of one
                    # label is immediately followed by a stable run of another.
                    stable_transition = (
                        allEqual(recent)
                        and allEqual(previous)
                        and recent[0] != previous[-1]
                    )

                    if stable_transition:
                        if pushup_pose == 0:
                            pushup_down_done = True
                        elif pushup_down_done:
                            pushup_count += 1
                            pushup_down_done = False
                            # Announce the new count without blocking the video loop.
                            threading.Thread(target=text_to_speech, args=[str(pushup_count)+" pushup"]).start()

        # Draw the overlay once per frame, after the count may have been updated,
        # so a stale and a fresh count are never drawn on top of each other.
        cv2.putText(frame, "push up count:"+str(pushup_count), (3,30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        cv2.imshow('OpenCV Feed', frame)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
finally:
    # Always free the camera and close the window, even if a frame raised.
    cap.release()
    cv2.destroyAllWindows()

1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 0 0 0 0 0 0 1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 1 1 1 1 0 0 0 1 1 1 1 

0 1 1 0 0 0 0 0 0 0 0 1 1 1 1 1 1 0 0 0 1 0 0 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 1 0 0 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 1 0 0 0 0 1 0 0 0 0 0 0 0 1 1 0 1 1 1 1 1 1 1 1 1 0 0 0 1 1 1 1 

1 1 0 0 0 0 0 0 1 1 0 1 1 1 1 1 1 0 0 0 1 1 1 1 

1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 

1 1 1 1 1 1 1 0 1 1 1 0 0 1 0 1 1 1 1 0 0 0 0 0 0 0 1 1 1 1 

1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 

1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 1 1 1 1 

1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 1 1 1 1 1 1 0 0 0 0 0 0 1 1 1 1 

1 0 0 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 1 1 1 0 

0 1 1 0 1 1 1 1 0 0 0 0 0 0 0 0 0 1 1 1 

0 0 0 1 0 1 1 0 1 1 0 0 0 0 0 0 0 0 0 0 1 