In [1]:
__author__ = "Bruce Pannaman"
# Taken from https://github.com/rwightman/posenet-python

In [2]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
import warnings
warnings.filterwarnings('ignore')

import tensorflow as tf
import cv2
import time
import argparse
import statistics
import math
import posenet
import time
import IPython
from io import BytesIO
import PIL
from PIL import Image
from PIL import ImageColor
from PIL import ImageDraw
from PIL import ImageFont
from PIL import ImageOps

## Load Posenet Model

In [None]:
# with tf.Session() as sess:
#     model_cfg, model_outputs = posenet.load_model(101, sess)
#     output_stride = model_cfg['output_stride']

## Setup Webcam Methods

In [None]:
def get_frame(cam):
    """
    Grab the latest frame from the cv2.VideoCapture instances that is connected to my Mac's Webcam
    
    Returns a full size frame for viewing and another one that is 25% of the size for faster facial recognition scans per frame

    """
    # Capture frame-by-frame
    ret, frame = cam.read()

    # flip image for natural viewing
    frame = cv2.flip(frame, 1)
    
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
    small_frame = small_frame[:, :, ::-1]

    return frame, small_frame

In [None]:
def array_to_image(a, fmt='jpeg'):
    """
    Creates an image from the numpy array that you have been edited. Only use when you want to view it again.
    
    #Use 'jpeg' instead of 'png' (~5 times faster)
    """
    #Create binary stream object
    f = BytesIO()

    #Convert array to binary stream object
    PIL.Image.fromarray(a).save(f, fmt)

    return IPython.display.Image(data=f.getvalue())

In [None]:
def process_frame(frame):
    width, height, inference_time, results = yolo.inference(frame)
    for detection in results:
#         print(detection)
        id, name, confidence, x, y, w, h = detection
        color = colors[id]

        cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
        cv2.rectangle(frame, (x, y+h - 35), (x+w, y+h), color, cv2.FILLED)
        font = cv2.FONT_HERSHEY_DUPLEX
        cv2.putText(frame, classes[id], (x + 6, y+h - 6), font, 1.0, (255, 255, 255), 1)
    return frame

In [None]:
def process_frame2(cap, pa_status, calibrated, squat_count):        
        input_image, display_image, output_scale = posenet.read_cap(cap, scale_factor=0.4, output_stride=output_stride)


        heatmaps_result, offsets_result, displacement_fwd_result, displacement_bwd_result = sess.run(
            model_outputs, feed_dict={'image:0': input_image})

        pose_scores, keypoint_scores, keypoint_coords = posenet.decode_multi.decode_multiple_poses(
            heatmaps_result.squeeze(axis=0),
            offsets_result.squeeze(axis=0),
            displacement_fwd_result.squeeze(axis=0),
            displacement_bwd_result.squeeze(axis=0),
            output_stride=output_stride,
            max_pose_detections=2,
            min_pose_score=0.5)

        keypoint_coords *= output_scale

        # TODO this isn't particularly fast, use GL for drawing and display someday...
        overlay_image, body_part_coordinates = posenet.draw_skel_and_kp(
            display_image, pose_scores, keypoint_scores, keypoint_coords, pa_status, calibrated, squat_count,
            min_pose_score=0.15, min_part_score=0.1)

        return overlay_image, body_part_coordinates

## Pose Analysis

In [None]:
class PoseAnalysis:
    def __init__(self):
        self.squat_measurement_needs = ["leftShoulder", "rightShoulder", "leftHip", "rightHip", "leftKnee", "rightKnee", "leftAnkle", "rightAnkle"]
        self.status = "Attempting Calibration"
        self.calibrated = 0
        self.last_calibrated = time.time()
        self.squat_counter = 0
        self.cal_shoulder_hip = None
        self.cal_hip_knee = None
        self.cal_knee_ankle = None
        self.cal_hip_ankle = None
        self.minimum_squat_distance = None
        self.user_up = True
        
        
    def calculate_mean_height(self, measurements, body_part):
        both_sides = [measurements["left%s" % body_part]["coordinates"][0], measurements["left%s" % body_part]["coordinates"][0]]
        return statistics.mean(both_sides)
    
    def get_body_ratios(self, measurements):    
        shoulders = self.calculate_mean_height(measurements, "Shoulder")
        hips = self.calculate_mean_height(measurements, "Hip")
        knees = self.calculate_mean_height(measurements, "Knee")
        ankles = self.calculate_mean_height(measurements, "Ankle")
        
        # cartesian coordinates start on top left not bottom left so subtraction of y axis has to be reversed
        return (hips - shoulders), (knees- hips), (ankles - knees), (ankles-hips)
        
    def calibrate(self, measurements):
        s_h, h_k, k_a, h_a = self.get_body_ratios(measurements)
        
        # Reset Calibration if there has been too much time spent moving about 
        if time.time() - self.last_calibrated > 8:
            print(time.time() - self.last_calibrated)
            self.calibrated == 0

        self.last_calibrated = time.time()
        
        #Ignoring Invalid distances
        if any([x < 0 for x in [s_h, h_k, k_a, h_a]]):
            pass
            
        # First calibration
        elif self.calibrated == 0:
            self.cal_shoulder_hip = s_h
            self.cal_hip_knee = h_k
            self.cal_knee_ankle = k_a
            self.cal_hip_ankle = h_a
            self.calibrated += 1
        
        # Subsequent Calibration
        elif self.calibrated > 0 and self.calibrated < 3:
            self.cal_shoulder_hip = statistics.mean([self.cal_shoulder_hip, s_h])
            self.cal_hip_knee = statistics.mean([self.cal_hip_knee, h_k])
            self.cal_knee_ankle = statistics.mean([self.cal_knee_ankle, k_a])
            self.cal_hip_ankle = statistics.mean([self.cal_hip_ankle, h_a])
            self.calibrated += 1
            
        # Last Calibration run
        else:
            self.calibrated += 1
            self.minimum_squat_distance = math.sqrt(math.pow(self.cal_hip_knee, 2) + math.pow(self.cal_knee_ankle, 2))
            print("Successfully calibrated")
            return True, "Calibration Complete"
        
        return True, "Calibration # %d" % self.calibrated
    
    
    def assess_squat(self, measurements):
        # Don't assess if the camera can't see all of the joints needed
        if any([measurements[x]["visible"] == False for x in self.squat_measurement_needs]):
            self.status = "Step Back for full visibility"
            return False, "Not all body parts visible"
        
        # Not finished Calibrating
        elif self.calibrated <= 3:
            self.status = "Calibrating # %d" % self.calibrated
            return True, self.calibrate(measurements)
        
        elif self.minimum_squat_distance is not None:
            s_h, h_k, k_a, h_a = self.get_body_ratios(measurements)
            
            # Invalid measurement
            if h_a < 0:
                self.status = "Invalid Measurement"
                return False, self.status
            
            # If shoulders are dipped
            if s_h < (self.cal_shoulder_hip * 0.7):
                self.status = "Don't bend at the waist!!"
                return True, self.status
                
            # If the user is now in up Position
            if h_a > self.minimum_squat_distance:
                self.status = "Assessing new Squat"
                self.user_up = True
                return True, self.status
            
            # User is below minimum Squat position
            elif h_a <= self.minimum_squat_distance and self.user_up is True:
                self.squat_counter += 1
                self.status = "Successful Squat"
                self.user_up = False
                return True, self.status
            
            # User is below up threshold but not low enough
            else:
                self.status = "Squat not counted (up = %s)" % str(self.user_up)
                return False, self.status
        

## Webcam Operations

In [None]:
d = IPython.display.display("", display_id=1)
d2 = IPython.display.display("", display_id=2)

print("starting webcam...")
cam = cv2.VideoCapture(1)
counter = 0
pa = PoseAnalysis()

with tf.Session() as sess:
    model_cfg, model_outputs = posenet.load_model(101, sess)
    output_stride = model_cfg['output_stride']
    
    while True:
        t1 = time.time()
        frame, body_part_coordinates= process_frame2(cam, pa.status, pa.calibrated >= 3, pa.squat_counter)
        if counter > 50:
            if pa.calibrated < 3 and counter % 20 == 0:
                success, message = pa.assess_squat(body_part_coordinates)
            elif counter % 3 == 0:
                success, message = pa.assess_squat(body_part_coordinates)


        # For Jupyter
        im = array_to_image(frame)
        d.update(im)
        # For Normal Python
        # cv2.imshow("preview", frame)


        t2 = time.time()
        s = f"""{int(1/(t2-t1))} FPS"""
        if pa.calibrated < 3:
            message = "<h1 style='font-size: 45;'>Calibrating</h1>"
        else:
            message = "<h1 style='font-size: 45;'>%s</h1>" % "Squat Count = %d" % pa.squat_counter
#         d2.update( IPython.display.HTML(message) )
        counter += 1
        
        key = cv2.waitKey(20)
        if key == 27:  # exit on ESC
            break

cv2.destroyWindow("preview")
vc.release()


In [None]:
"""
{
	'nose': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftEye': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightEye': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftEar': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightEar': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftShoulder': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightShoulder': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftElbow': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightElbow': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftWrist': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightWrist': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftHip': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightHip': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftKnee': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightKnee': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'leftAnkle': {
		'coordinates': array([0., 0.]),
		'visible': False
	},
	'rightAnkle': {
		'coordinates': array([0., 0.]),
		'visible': False
	}
}
"""