# Generating Landmarks using Mediapipe and YOLOv5

Mediapipe currently does not support detection of multiple persons in a single frame. YOLOv5 is therefore used to detect the various subjects and crop out the bounding box around each of them. Mediapipe then detects the hand, pose and facial landmarks in each crop using the holistic model, saving the information in a CSV file, with each frame having a separate CSV file.

# TO-DOs
### Features
1) Calculate FPS (DONE)

2) Assign "index" to person ID

3) Store Landmark values and impute missing values

4) Feature engineering for relevant angles and distances

5) Train model based on engineered features

6) Create a window based detection algorithm on model output

7) Store prediction and relevant intermediate data in CSV, one file for each frame

### Issues
1) Slow inference by YOLOv5 model

2)

In [1]:
#Import relevant libraries
import torch
import cv2
import pandas as pd
import mediapipe as mp
import numpy as np
from mediapipe.python.solutions import pose as mp_pose
from sort import *
# import PIL
# from PIL import Image
# from matplotlib import pyplot
# import matplotlib.image as mpimg
%matplotlib inline

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the pretrained YOLOv5-small detector from the Ultralytics hub
# (alternatives: yolov5n - yolov5x6, or a custom checkpoint).
yolo_model = torch.hub.load(repo_or_dir='ultralytics/yolov5', model='yolov5s')
# Keep only class 0, since we are only interested in detecting persons.
yolo_model.classes = [0]

Using cache found in /Users/chanchialer/.cache/torch/hub/ultralytics_yolov5_master
[31m[1mrequirements:[0m YOLOv5 requirements "scipy>=1.4.1" "seaborn>=0.11.0" not found, attempting AutoUpdate...

[31m[1mrequirements:[0m 2 packages updated per /Users/chanchialer/.cache/torch/hub/ultralytics_yolov5_master/requirements.txt
[31m[1mrequirements:[0m ⚠️ [1mRestart runtime or rerun command for updates to take effect[0m

YOLOv5 🚀 2022-10-15 Python-3.8.8 torch-1.12.1 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients
Adding AutoShape... 


In [3]:
# mp_drawing = mp.solutions.drawing_utils
# mp_hol = mp.solutions.holistic
# mp_pose = mp.solutions.pose

In [5]:
#Helper functions

#generates a new list containing xmin, ymin, xmax, ymax, index, confidence
#class is not included as an output since we are dealing with only persons
def bbs_info_generator(bbs_with_ids, bbs_with_confidence):
    """Append a detection confidence to every tracked bounding box.

    Rows are paired purely by position: row ``i`` of the output is a copy of
    ``bbs_with_ids[i]`` followed by element 4 of ``bbs_with_confidence[i]``.

    Args:
        bbs_with_ids: list of lists; the caller passes
            ``[xmin, ymin, xmax, ymax, index]`` rows.
        bbs_with_confidence: list of sequences whose element 4 is the value
            appended to the matching row.

    Returns:
        A new list of extended rows, or ``None`` when the two inputs do not
        have the same length. The inputs themselves are left unmodified.
    """
    if len(bbs_with_ids) != len(bbs_with_confidence):
        return None
    return [
        tracked_box + [detection[4]]
        for tracked_box, detection in zip(bbs_with_ids, bbs_with_confidence)
    ]

#Stores information about landmarks for a person in a nested list
#each item in the list represents a landmark and contains the [x,y,z,visibility,index] of the landmark
def _landmark_rows(landmark_list, expected_count, index):
    """Return one [x, y, z, visibility, index] row per landmark, or NaN rows if the set is missing."""
    if landmark_list is None:
        # Pad with NaN placeholders so every person yields the same row count.
        return [[np.nan, np.nan, np.nan, np.nan, index] for _ in range(expected_count)]
    return [[lm.x, lm.y, lm.z, lm.visibility, index] for lm in landmark_list.landmark]


def person_landmark_info_generator(results, index):
    """Flatten one person's holistic results into a list of landmark rows.

    The landmark sets are emitted in a fixed order: pose, face, left hand,
    right hand. A set that was not detected (attribute is ``None``) is
    replaced by NaN rows so the output layout stays stable: 33 rows for pose,
    468 for face and 21 for each hand.

    Args:
        results: object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``; each is
            either ``None`` or has a ``landmark`` iterable whose items carry
            ``x``, ``y``, ``z`` and ``visibility``.
        index: person/track identifier stored as the last element of every row.

    Returns:
        A list of ``[x, y, z, visibility, index]`` rows. When every detected
        set has its padding length, that is 33 + 468 + 21 + 21 = 543 rows.
    """
    # (landmark set, number of NaN rows to emit when the set is missing)
    landmark_groups = (
        (results.pose_landmarks, 33),
        (results.face_landmarks, 468),
        (results.left_hand_landmarks, 21),
        (results.right_hand_landmarks, 21),
    )
    person_landmark_info = []
    for landmark_list, expected_count in landmark_groups:
        person_landmark_info.extend(_landmark_rows(landmark_list, expected_count, index))
    return person_landmark_info

In [6]:
# df = pd.DataFrame(person_landmark_info, columns=[
#     str("person" + str(index) + ".x"), str("person" + str(index) + ".y"), 
#     str("person" + str(index) + ".z"), str("person" + str(index) + ".visibility")])

In [7]:
video_path ="/Users/chanchialer/Documents/GitHub/RCP/RCP 2.0/test.mp4"
# video_path ="/Users/chanchialer/Downloads/Baby.mp4"

# Read a single frame to learn the video dimensions; later cells rely on
# `w`, `h` and `size` (width, height) being defined.
cap = cv2.VideoCapture(video_path)
try:
    ret, frame = cap.read()
finally:
    # This capture is only used for probing; the processing cell opens its own.
    cap.release()

if not ret:
    # Fail here with a clear message instead of a later NameError on `size`.
    raise IOError(f"Could not read a frame from video: {video_path!r}")

h, w, _ = frame.shape
size = (w, h)
print(size)

(1280, 720)


In [8]:
# `time` was never imported explicitly in this notebook (presumably it leaked
# in through `from sort import *`); import it here so the cell is self-contained.
import time

cap = cv2.VideoCapture(video_path)

# used to record the time when we processed last frame
prev_frame_time = 0

# used to record the time at which we processed current frame
new_frame_time = 0

#create instance of SORT
mot_tracker = Sort()

mp_hol = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

#For saving the video file as output.avi
out = cv2.VideoWriter("output.avi", cv2.VideoWriter_fourcc(*"MJPG"), 20, size)

# Extra pixels added on every side of a person's bounding box so the whole
# body stays inside the crop handed to the holistic model.
MARGIN = 10

# all_frame_landmark_info[frame][person] is the row list returned by
# person_landmark_info_generator for that person in that frame.
all_frame_landmark_info = []

frame_count = 0
try:
    # NOTE(review): one Holistic instance is shared by every person crop; in
    # video mode it may carry tracking state from one crop to the next.
    # Consider static_image_mode=True or one instance per track id - confirm.
    with mp_hol.Holistic(min_detection_confidence=0.3, min_tracking_confidence=0.2) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_count += 1

            # OpenCV decodes frames as BGR. The detector is fed an RGB copy
            # (as before), and MediaPipe's Holistic.process documents RGB input.
            rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # get detections (.cpu() is a no-op on CPU and keeps this working on GPU)
            result = yolo_model(rgb_image)
            detections = result.pred[0].cpu().numpy()

            # update SORT
            track_bbs_ids = mot_tracker.update(detections)

            # All drawing happens on the original BGR frame.
            image = frame
            frame_h, frame_w = image.shape[:2]

            # NOTE(review): tracks and detections are paired by position only, and
            # the whole frame is skipped (None) when their counts differ - verify
            # that SORT returns tracks in detection order.
            bbs_info = bbs_info_generator(track_bbs_ids.tolist(), result.xyxy[0].tolist())

            all_person_landmark_info = []

            if bbs_info is not None:
                for (xmin, ymin, xmax, ymax, index, confidence) in bbs_info:
                    # Expand the box by MARGIN on every side and clamp it to the
                    # frame. The previous slice added MARGIN to both edges, which
                    # shifted the crop instead of enlarging it.
                    x1 = max(int(xmin) - MARGIN, 0)
                    y1 = max(int(ymin) - MARGIN, 0)
                    x2 = min(int(xmax) + MARGIN, frame_w)
                    y2 = min(int(ymax) + MARGIN, frame_h)
                    if x2 <= x1 or y2 <= y1:
                        # Box lies entirely outside the frame: nothing to process.
                        continue

                    # Contiguous RGB copy of the crop, marked read-only as the
                    # original did for the image before running the models.
                    person_rgb = np.ascontiguousarray(rgb_image[y1:y2, x1:x2])
                    person_rgb.flags.writeable = False
                    results = holistic.process(person_rgb)

                    #Draw bounding box on image
                    cv2.rectangle(image, (int(xmin), int(ymin)), (int(xmax), int(ymax)), (255, 0, 0), 2)
                    cv2.putText(image, str(int(index)), (int(xmax), int(ymax)), cv2.FONT_HERSHEY_PLAIN, 1, (255, 0, 0), 2)

                    #Draw pose landmarks on the same region the model saw, so the
                    #crop-relative landmark coordinates line up with the frame.
                    #(Face and hand landmarks can be drawn the same way with
                    #mp_hol.FACEMESH_CONTOURS / mp_hol.HAND_CONNECTIONS.)
                    mp_drawing.draw_landmarks(
                        image[y1:y2, x1:x2],
                        results.pose_landmarks,
                        mp_hol.POSE_CONNECTIONS,
                        mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
                        mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
                    )

                    #collect information on landmarks of the person detected
                    person_landmark_info = person_landmark_info_generator(results, index)
                    all_person_landmark_info.append(person_landmark_info)

            all_frame_landmark_info.append(all_person_landmark_info)

            # FPS overlay; guard against a zero interval between two frames.
            new_frame_time = time.time()
            elapsed = new_frame_time - prev_frame_time
            fps = 1 / elapsed if elapsed > 0 else 0
            prev_frame_time = new_frame_time
            cv2.putText(image, str(int(fps)), (10, 70), cv2.FONT_HERSHEY_PLAIN, 3, (0, 0, 0), 3)
            cv2.putText(image, str(frame_count), (frame_w - 140, 70), cv2.FONT_HERSHEY_PLAIN, 3, (0, 0, 0), 3)

            # Write the frame into the file 'output.avi'
            out.write(image)
            cv2.imshow("image", image)

            # if the 'q' key is pressed, stop the loop
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break
finally:
    # Release the capture, the writer and the preview window even on errors.
    cap.release()
    out.release()
    cv2.destroyAllWindows()
    del mot_tracker

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


In [64]:
def _flatten_person(frame_people, person_id):
    """Concatenate the first four values of every landmark row whose id (element 4) equals person_id."""
    return [
        value
        for person_rows in frame_people
        for row in person_rows
        if row[4] == person_id
        for value in row[0:4]
    ]


# One flat list per frame for each of the two tracked ids (1 and 2); a frame
# in which the id does not appear contributes an empty list.
all_person1_info = [_flatten_person(frame_people, 1) for frame_people in all_frame_landmark_info]
all_person2_info = [_flatten_person(frame_people, 2) for frame_people in all_frame_landmark_info]

In [65]:
# Number of landmark rows per person, taken from the first person found in any
# frame. Indexing [0][0] directly raised IndexError when the first frame had no
# person. The fallback is the fixed 33 + 468 + 21 + 21 row layout that
# person_landmark_info_generator pads to.
landmarks_per_person = next(
    (len(person) for frame_people in all_frame_landmark_info for person in frame_people),
    33 + 468 + 21 + 21,
)

# Column names x1, y1, z1, v1, x2, ... : one (x, y, z, visibility) quadruple per landmark.
landmarks = [
    '{}{}'.format(axis, val)
    for val in range(1, landmarks_per_person + 1)
    for axis in ('x', 'y', 'z', 'v')
]

In [66]:
# One row per frame for person 1, one column per landmark coordinate.
df_all_person1_info = pd.DataFrame(all_person1_info, columns=landmarks)

In [67]:
# One row per frame for person 2, one column per landmark coordinate.
df_all_person2_info = pd.DataFrame(all_person2_info, columns=landmarks)

In [68]:
# Replace missing landmark values for person 1 with 0.
df_all_person1_info = df_all_person1_info.fillna(value=0)

In [69]:
# Replace missing landmark values for person 2 with 0.
df_all_person2_info = df_all_person2_info.fillna(value=0)

In [70]:
# Prepend two columns filled with a single space; "reach" ends up first,
# followed by "point", because each insert goes to position 0.
for label_column in ("point", "reach"):
    df_all_person1_info.insert(loc=0, column=label_column, value=" ")

In [76]:
# Prepend two columns filled with a single space; "reach" ends up first,
# followed by "point", because each insert goes to position 0.
for label_column in ("point", "reach"):
    df_all_person2_info.insert(loc=0, column=label_column, value=" ")

In [77]:
# Save person 1's table without the DataFrame index column.
df_all_person1_info.to_csv(path_or_buf='person1.csv', index=False)

In [78]:
# Save person 2's table without the DataFrame index column.
df_all_person2_info.to_csv(path_or_buf='person2.csv', index=False)