In [1]:
#step 1: Import
import cv2
import mediapipe as mp
import numpy as np
from PIL import ImageFont, ImageDraw, Image
import math
import os
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
mp_pose = mp.solutions.pose
fontpath = "fonts/gulim.ttc"
font = ImageFont.truetype(fontpath, 35)
train_data_path = "C:\jupyter-notebook\Mediapipe\Train Video"   #train video folder
result_data_path = "C:\jupyter-notebook\Mediapipe\Result Image" #result image folder

In [2]:
#step 2: Define Function
def calculateangle(a,b,c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    
    
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])   #arc tangent of x1/x2.
    angle = np.abs(radians*180.0 / np.pi)   # Radian to degree
    if angle > 180.0:
        angle = 360 - angle   #180도이상 안펴짐
        
    return angle

def calculatedistance(a,b):
    a = np.array(a)
    b = np.array(b)
    distance = math.sqrt((a[0] - b[0])*(a[0] - b[0]) + (a[1] - b[1])*(a[1] - b[1]))
    return distance


def DrawFeedback(fb1, fb2, fb3, fb4, longfemur, cnt, frame, image):
    #fb1 >> feedback angle2
    #fb2 >> distance between ankle
    #fb3 >> whether ankle z value  difference is high or not
    #fb3 >> whether shoulder z value  difference is high or not
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(image)
    draw = ImageDraw.Draw(img_pil, 'RGBA')
    draw.rectangle((0,0,640,90), outline=(0,0,0,255), fill=(160,160,160,180),width=3)
    draw.rectangle((0,90,640,180), outline=(0,0,0,255), fill=(160,160,160,180),width=3)
    draw.rectangle((0,180,640,270), outline=(0,0,0,255), fill=(160,160,160,180),width=3)    
    draw.rectangle((0,270,640,360), outline=(0,0,0,255), fill=(160,160,160,180),width=3)
    draw.rectangle((0,360,640,450), outline=(0,0,0,255), fill=(160,160,160,180),width=3)
    if(fb1 == 0):
        draw.text((5, 25), "Feedback1 : 이상없음", font = font, fill=(255,255,255,255))
    elif(fb1 == 1):
        draw.text((5, 25), "Feedback1 : 허리를 더 펴주세요", font = font, fill=(255,255,255,255))
    if(fb2 == 0):
        draw.text((5, 115), "Feedback2 : 이상없음", font = font, fill=(255,255,255,255))
    elif(fb2 == 1):
        draw.text((5, 115), "Feedback2 : 보폭을 넓혀주세요", font = font, fill=(255,255,255,255))
    elif(fb2 == 2):
        draw.text((5, 115), "Feedback2 : 보폭을 좁혀주세요", font = font, fill=(255,255,255,255))
    if(fb3 == 0):
        draw.text((5, 205), "Feedback3 : 이상없음", font = font, fill=(255,255,255,255))
    elif(fb3 == 1):
        draw.text((5, 205), "Feedback3 : 발 균형을 맞춰주세요", font = font, fill=(255,255,255,255))
    if(fb4 == 0):
        draw.text((5, 295), "Feedback4 : 이상없음", font = font, fill=(255,255,255,255))
    elif(fb4 == 1):
        draw.text((5, 295), "Feedback4 : 어깨 균형을 맞춰주세요", font = font, fill=(255,255,255,255))
    if(longfemur):
        draw.text((5, 390), "대퇴골이 경골보다 기므로 어께보다 발을 넓혀주세요", font = ImageFont.truetype(fontpath, 20), fill=(255,255,255,255))
    else:
        draw.text((5, 390), "경골이 대퇴골보다 기므로 어께랑 비슷하게 발을 맞춰주세요", font = ImageFont.truetype(fontpath, 20), fill=(255,255,255,255))
                 
    image = np.array(img_pil)
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    cv2.imwrite(os.path.join(result_data_path, f"phase{cnt+1}-{frame+1}.png"), image)
    return



In [4]:
#step 3: Train data extracting

train_up_data = np.zeros((10, 3, 10))
stage = None
state = None
up_point = 0
down_point = 0
#video load
for num in range(3):
    cap = cv2.VideoCapture(os.path.join(train_data_path, r"%d.avi" %(num+1)))
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)
    if not cap.isOpened():
        print('Train data no.%d load FAILED' %(num+1))
        continue
    with mp_pose.Pose(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            success, image = cap.read()
            if not success:
                print('Train data no.%d FRAME End' %(num+1))
                break
            
            #Recolor image(BGR >> RGB)
            image.flags.writeable = False
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            #RGB Image Processing(Make detection)
            results = pose.process(image)
            #Recolor back(RGB >> BGR)
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
            #Extract Landmarks
            try:
                landmarks = results.pose_landmarks.landmark
            
                #extracting 9 joints
                hip = [landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x, landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y]
                knee = [landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x, landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y]
                left_ankle = [landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].x, landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].y]
                right_ankle = [landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]
                left_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x, landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
                left_elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x, landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
                left_wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x, landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y]
                right_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
                right_elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
                right_wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
                #calculating 4 angle
                angle = calculateangle(hip, knee, left_ankle)
                angle2 = calculateangle(knee, hip, left_shoulder)
                left_angle = calculateangle(left_shoulder, left_elbow, left_wrist)
                right_angle = calculateangle(right_shoulder, right_elbow, right_wrist)
                #state detect
                if(left_angle <= 90 and right_angle <= 90 and abs(left_shoulder[1] - left_wrist[1]) < 0.15 and abs(right_shoulder[1] - right_wrist[1]) < 0.15):
                    state = "ready"
                else:
                    state = "wait"
            
                #Conter Logic
                if angle > 160:     
                    if(stage == 'down'):
                        counter +=1
                        print('Train data no.%d Count Detected!'%(num+1))
                        up_point = 0
                        down_point = 0
                    stage = "up"
                if angle < 105 and stage == 'up':      
                    stage = "down"

                #searching up point & down point
                if state == 'ready' and stage == 'up' and up_point == 0:
                    up_point = left_shoulder[1]   #assigning y value
                    data = [left_shoulder[1], angle, angle2]    #assigning first value
                    data = np.array(data).reshape(3,1)
                if state == 'ready' and stage == 'down' and down_point == 0:
                    down_point = left_shoulder[1]
                    stride = (up_point - down_point) / 10
    
                #when going up
                if state == 'ready' and stage == 'down':
                    for i in range(10):
                        point = down_point + stride*i
                        #finding up_data
                        if((point - 0.02) < left_shoulder[1] and left_shoulder[1] < (point + 0.02)):
                            train_up_data[num, 0, i] = left_shoulder[1]
                            train_up_data[num, 1, i] = angle
                            train_up_data[num, 2, i] = angle2
            except:    #if there is no detection
                pass
        print('Train data no.%d load COMPLETED' %(num+1))
    cap.release()    

Train data no.1 FRAME End
Train data no.1 load COMPLETED
Train data no.2 FRAME End
Train data no.2 load COMPLETED
Train data no.3 FRAME End
Train data no.3 load COMPLETED


In [13]:
#step 4: Main
#변수 선언 & 초기화
counter = 0
stage = None
state = None
longfemur = 2     #assigning value for defining only once
up_point = 0
down_point = 0
up_data = np.zeros((3,10))
pre_shoulder_y = 0
accuracy = 0

cap = cv2.VideoCapture(1)  #Video from webcam

cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)   #해상도조절
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

with mp_pose.Pose(
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5) as pose:  #Mediapipe 적용
    while cap.isOpened():
        success, image = cap.read()  #Webcam에서 image 가져옴
        if not success:
            print("Camera Input ERROR")
            continue
            
        #Recolor image(BGR >> RGB)
        image.flags.writeable = False
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        #RGB Image Processing(Make detection)
        results = pose.process(image)  #results에 모든 joint들의 좌표들이 들어가짐
        #Recolor back(RGB >> BGR)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        #Extract Landmarks
        try:
            landmarks = results.pose_landmarks.landmark
            
            #extracting 10 joints
            hip = [landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x, landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y]
            knee = [landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x, landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y]
            left_ankle = [landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].x, landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].y]
            right_ankle = [landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]
            left_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x, landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
            left_elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x, landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
            left_wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].x, landmarks[mp_pose.PoseLandmark.LEFT_WRIST.value].y]
            right_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            right_elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            right_wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x, landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
            #calculating 4 angle
            angle = calculateangle(hip, knee, left_ankle)
            angle2 = calculateangle(knee, hip, left_shoulder)
            left_angle = calculateangle(left_shoulder, left_elbow, left_wrist)
            right_angle = calculateangle(right_shoulder, right_elbow, right_wrist)
            
            ########## State & Counter Logic ########
            #detectiong up/down
            if(pre_shoulder_y - right_shoulder[1] > 0):
                direction = 1    #going up
            else:
                direction = 0   #going down
            pre_shoulder_y = right_shoulder[1]  
            
            #pose detect
            if(left_angle <= 90 and right_angle <= 90 and abs(left_shoulder[1] - left_wrist[1]) < 0.15 and abs(right_shoulder[1] - right_wrist[1]) < 0.15):
                state = "ready"
            else:
                state = "wait"
                
            #Conter Logic
            if angle > 175:     
                if(stage == 'down'):
                    counter +=1
                    ##########squat accuracy logic ##########
                    error_up = abs(train_up_data[2, 2:, :] - up_data[2:, :])  #train의 angle2 10개와 실제 데이터를 빼서 오차를구함
                    error_up_avg = np.sum(np.ones((1, 10)) - error_up/train_up_data[2, 2:, :])/10*100  #얻은 오차를 기반으로 percentage구함
                    accuracy = round(error_up_avg)
                    #reset values
                    up_point = 0
                    down_point = 0
                    up_data = np.zeros((3,10))
                stage = "up"
            if angle < 105 and stage == 'up':      
                stage = "down"

            #searching up point & down point
            if state == 'ready' and stage == 'up' and up_point == 0:
                if(longfemur == 2):
                    #distance(calculate only once)
                    L1 = calculatedistance(hip, knee)
                    L2 = calculatedistance(knee, left_ankle)
                    #대퇴골이 경골보다 긴지 측정
                    if(L1 >= L2):
                        longfemur = 1
                    else:
                        longfemur = 0
                up_point = left_shoulder[1]   #assigning y value
                data = [left_shoulder[1], angle, angle2]    #assigning first value
                data = np.array(data).reshape(3,1)
            if state == 'ready' and stage == 'down' and down_point == 0:
                down_point = left_shoulder[1]
                stride = (up_point - down_point) / 10

            #when going up
            if state == 'ready' and stage == 'down':
                for i in range(10):
                    point = down_point + stride*i
                    #finding up_data
                    if((point - 0.02) < left_shoulder[1] and left_shoulder[1] < (point + 0.02)):
                        up_data[0, i] = left_shoulder[1]
                        up_data[1, i] = angle
                        up_data[2, i] = angle2
                        ###########feedback logic ###########
                        fb1, fb2, fb3, fb4 = 0, 0, 0, 0
                        L3 = calculatedistance(left_ankle, right_ankle)
                        L4 = calculatedistance(left_shoulder, right_shoulder)
                        left_shoulder_z = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].z
                        right_shoulder_z = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].z
                        left_ankle_z = landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].z
                        right_ankle_z = landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].z
                        dif = abs(left_ankle_z - right_ankle_z)
                        dif2 = abs(left_shoulder_z - right_shoulder_z)
                        #if(train_up_data[2, 2, i] >= angle2 + 10):
                        if(train_up_data[2, 1, i]*1.05  <= angle):
                            fb1 = 1
                        if(longfemur):
                            if(L3 < L4):
                                fb2 = 1  #대퇴골이 더 긴경우 어께보다 보폭을 넓혀야됨
                            elif(L3 > L4*1.25):
                                fb2 = 2  #너무 넓힌 경우 좁혀야됨
                        else:
                            if(L3 > L4*1.2):
                                fb2 = 2  #대퇴골이 더 짧은경우 어께랑 비슷하게 해야됨 (범위는 b로 결정)
                            elif(L3  < L4*0.8):
                                fb2 = 1
                        
                        if(dif >= 0.1):
                            fb3 = 1
                        if(dif2 >= 0.05):
                            fb4 = 1
            
                        if(os.path.isfile(os.path.join(result_data_path, f"phase{counter+1}-{i+1}.png"))): #break if file already exists
                            break
                        else:
                            DrawFeedback(fb1, fb2, fb3, fb4, longfemur ,counter, i, image)
                            break
                
        except:    #if there is no detection
            pass
        #Drawing the results
        mp_drawing.draw_landmarks(
        image,
        results.pose_landmarks,
        mp_pose.POSE_CONNECTIONS,
        landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())
        
        flip_image = cv2.flip(image, 1)        
        #status box setup
        cv2.rectangle(flip_image, (0,0), (350,73), (160, 160, 160), -1)
        #Visualize data
        cv2.putText(flip_image, 'Cnt', (15, 12), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
        cv2.putText(flip_image, str(counter), (10, 60), 
                   cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(flip_image, 'Stage', (65, 12), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
        cv2.putText(flip_image, stage, (60, 60), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(flip_image, 'State', (150, 12), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
        cv2.putText(flip_image, state, (150, 60), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(flip_image, 'Accuracy', (250, 12), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
        cv2.putText(flip_image, str(accuracy), (270, 60), 
                   cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 2, cv2.LINE_AA)
        
        #Plotting image
        cv2.imshow('MediaPipe Result', flip_image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
cap.release()
cv2.destroyAllWindows()