<a href="https://colab.research.google.com/github/1ForrestWargo1/alarming_poses/blob/main/InnovationWeek2-mask.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from enum import Enum

# class syntax
class Color(Enum):
    nose = 0
    left_eye = 1
    right_eye = 2
    left_ear = 3
    right_ear = 4
    left_shoulder = 5
    right_shoulder = 6
    left_elbow = 7
    right_elbow = 8
    left_hand = 9
    right_hand = 10
    left_hip = 11
    right_hip = 12
    left_knee = 13
    right_knee = 14
    left_foot = 15
    right_foot = 16

In [None]:
p_map = {'nose':0,'left_foot': 16, "right_foot": 15,'left_eye':2,"right_shoulder":5,'left_shoulder':6,'left_hip':12,'left_hand':9,'left_ear':4}

In [6]:
! git clone https://github.com/1ForrestWargo1/alarming_yolov7_poses.git
%cd /content/alarming_yolov7_poses


fatal: destination path 'alarming_yolov7_poses' already exists and is not an empty directory.
/content/alarming_yolov7_poses


In [7]:
! git pull https://github.com/1ForrestWargo1/alarming_yolov7_poses.git

From https://github.com/1ForrestWargo1/alarming_yolov7_poses
 * branch            HEAD       -> FETCH_HEAD
Already up to date.


In [8]:
! curl -L https://github.com/WongKinYiu/yolov7/releases/download/v0.1/yolov7-w6-pose.pt -o yolov7-w6-pose.pt


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  153M  100  153M    0     0  96.6M      0  0:00:01  0:00:01 --:--:--  145M


In [9]:
import torch
from torchvision import transforms

from utils.datasets import letterbox
from utils.general import non_max_suppression_kpt
from utils.plots import output_to_keypoint, plot_skeleton_kpts

import matplotlib.pyplot as plt
import cv2
import numpy as np

In [10]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

def load_model():
    model = torch.load('yolov7-w6-pose.pt', map_location=device)['model']
    # Put in inference mode
    model.float().eval()

    if torch.cuda.is_available():
        # half() turns predictions into float16 tensors
        # which significantly lowers inference time
        model.half().to(device)
    return model

pose_model = load_model()

In [11]:
def run_inference(image,model):
    # Resize and pad image
    image = letterbox(image, 960, stride=64, auto=True)[0] # shape: (567, 960, 3)
    # Apply transforms
    image = transforms.ToTensor()(image) # torch.Size([3, 567, 960])
    if torch.cuda.is_available():
      image = image.half().to(device)
    # Turn image into batch
    image = image.unsqueeze(0) # torch.Size([1, 3, 567, 960])
    with torch.no_grad():
      output, other = model(image)
    return output, image


In [12]:
def parse_video(filename, save_file = None,privacy = True, do_draw_avatar = True, do_estimate_activity = True):
    cap = cv2.VideoCapture(filename)
    # VideoWriter for saving the video
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    if not save_file:
      save_file =  'test_output.mp4'
    out = cv2.VideoWriter(save_file, fourcc, 30.0, (int(cap.get(3)), int(cap.get(4))))
    while cap.isOpened():
        (ret, frame) = cap.read()
        if ret == True:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            if do_estimate_activity or do_draw_avatar:
              pose_output,frame = run_inference(frame,pose_model)
              pose_output = non_max_suppression_kpt(pose_output, 
                                     0.25, # Confidence Threshold
                                     0.65, # IoU Threshold
                                     nc=pose_model.yaml['nc'], # Number of Classes
                                     nkpt=pose_model.yaml['nkpt'], # Number of Keypoints
                                     kpt_label=True)
  
              with torch.no_grad():
                    pose_output = output_to_keypoint(pose_output)
              nimg = frame[0].permute(1, 2, 0) * 255
              nimg = nimg.cpu().numpy().astype(np.uint8)
              frame = cv2.cvtColor(nimg, cv2.COLOR_RGB2BGR)
            else:
              pose_output = None

            frame = apply_privacy(frame,bboxes = None)
            frame = estimate_activity(frame,pose_output)
            frame = draw_avatar(frame,pose_output)
            #frame = draw_keypoints(pose_output, frame,set_standing_angle = 0,p_img = False)

            frame = cv2.resize(frame, (int(cap.get(3)), int(cap.get(4))))
            out.write(frame)
        else:
            break
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

In [13]:
def estimate_activity(frame,pose_output):
  return frame

  




In [14]:
def apply_privacy(frame,bboxes):
  return frame

In [15]:
def draw_avatar(frame,pose_output):
  return frame

In [None]:
import numpy as np
def calc_shoulder_to_foot_dist(points):
  foot_points = np.array([points[p_map["left_foot"]*3:p_map["left_foot"]*3+3],points[p_map["right_foot"]*3:p_map["right_foot"]*3+3]])
  shoulder_points = np.array([points[p_map["left_shoulder"]*3:p_map["left_shoulder"]*3+3],points[p_map["right_shoulder"]*3:p_map["right_shoulder"]*3+3]])
  if sum(foot_points[:,2]) < 0.1 or sum(shoulder_points[:,2]) < 0.1:
    return [-1,-1,-1],[-1,-1],[-1,-1]

  foot_conf = foot_points[:,2]/sum(foot_points[:,2])
  shoulder_conf = shoulder_points[:,2]/sum(shoulder_points[:,2])

  avg_foot = foot_points[0,:2]*foot_conf[0] + foot_points[1,:2]*foot_conf[1]
  avg_shoulder = shoulder_points[0,:2]*shoulder_conf[0] + shoulder_points[1,:2]*shoulder_conf[1]
  return [sum(abs(avg_foot-avg_shoulder)),abs(avg_foot[0]-avg_shoulder[0]),abs(avg_foot[1]-avg_shoulder[1])],avg_foot,avg_shoulder

  



In [None]:
def add_distance_chart(nimg,shoulder_to_foot_dist,idx):
      thickness = 5
      gap = thickness*2

      cv2.line(nimg, [0,0+3*idx*gap], [int(shoulder_to_foot_dist[0]),3*idx*gap], (255,0,255), thickness) 
      cv2.line(nimg, [0,gap+3*idx*gap], [int(shoulder_to_foot_dist[1]),gap+3*idx*gap], (0,0,255), thickness) 
      cv2.line(nimg, [0,2*gap+3*idx*gap], [int(shoulder_to_foot_dist[2]),2*gap+3*idx*gap], (255,0,0), thickness) 
      return nimg
def draw_posture_lines(nimg,standing_line,current_line):
  thickness = 5
  cv2.line(nimg, (100,100), np.array((100,100)+standing_line*[100,100]).astype(int),(0,0,0), thickness)
  cv2.line(nimg, (200,100), np.array((200,100)+current_line*[100,100]).astype(int),(0,0,0), thickness)





In [None]:
standing_angle = []
standing_std = None
standing_mean = None

In [None]:
standing_mean = np.mean(standing_angle,axis = 0)
standing_std = np.std(standing_angle,axis = 0)
print(standing_mean,standing_std)

nan nan


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  ret = _var(a, axis=axis, dtype=dtype, out=out, ddof=ddof,
  arrmean = um.true_divide(arrmean, div, out=arrmean, casting='unsafe',
  ret = ret.dtype.type(ret / rcount)


In [None]:
def draw_keypoints(output, image,set_standing_angle,p_img):
  global standing_angle
  output = non_max_suppression_kpt(output, 
                                     0.25, # Confidence Threshold
                                     0.65, # IoU Threshold
                                     nc=pose_model.yaml['nc'], # Number of Classes
                                     nkpt=pose_model.yaml['nkpt'], # Number of Keypoints
                                     kpt_label=True)
  
  with torch.no_grad():
        output = output_to_keypoint(output)
  #print("output after",output.shape,output)
  nimg = image[0].permute(1, 2, 0) * 255
  nimg = nimg.cpu().numpy().astype(np.uint8)
  nimg = cv2.cvtColor(nimg, cv2.COLOR_RGB2BGR)
  #print('nimg',nimg.shape)
  if p_img and len(output.shape) > 1:
    print("output shape",output.shape)
    print(output.shape)
    bboxs = get_bbox_from_skeleton(output[:, 7:])
    p_img.replace_boxes(nimg, bboxs)
    nimg = p_img.image
  return nimg


  for idx in range(output.shape[0]):

      plot_skeleton_kpts(nimg, output[idx, 7:].T, 3,write_number = True)
      shoulder_to_foot_dist, foot_point, shoulder_point = calc_shoulder_to_foot_dist(output[idx, 7:].T)
      if foot_point[0] == -1:
        continue
      #nimg = add_distance_chart(nimg,shoulder_to_foot_dist,idx)
      current_angle = calc_current_angle(foot_point,shoulder_point)
      if set_standing_angle:
        standing_angle.append(current_angle)
      else:
        angle_diff = (current_angle - standing_mean)**2
        print("angle diff",angle_diff)
        draw_posture_lines(nimg,standing_mean,current_angle)
        if angle_diff[0] > standing_std[0] or angle_diff[1] > standing_std[1]:
          cv2.line(nimg, [300,30], [300,60], (0,255,0), 10) 
        else:
          cv2.line(nimg, [300,30], [300,60], (255,255,255), 10) 


    
  
  
    
  





  #calc standing from example video
  #compare to laying down cangle
  #comapre to sitting angle adding waist -- these weights could be learned (?)


      #print("test",shoulder_to_foot_dist)
      #print("skeleton point", idx,output[idx, 7:].T, len(output[idx, 7:]) )

  return nimg

In [None]:
def calc_current_angle(foot_point,shoulder_point):
  #print(foot_point,shoulder_point)
  vec = abs(foot_point-shoulder_point)/sum(abs(foot_point-shoulder_point))
  #print(vec)
  return vec



In [None]:
standing_angle = []
standing_std = None
standing_mean = None
pose_estimation_video('/content/drive/MyDrive/Alarm/innovation week/walking.mov','/content/drive/MyDrive/Alarm/innovation week/save/standing_output.mp4')


In [None]:
pose_estimation_video('/content/drive/MyDrive/Alarm/innovation week/privacy_test.mov','/content/drive/MyDrive/Alarm/innovation week/save/privacy_test.mp4')


NameError: ignored

In [16]:
class private_image():
  def __init__(self):
    self.image = []

  def replace_boxes(self,new_image,bboxes):
    #print(bboxes)
    if len(self.image) == 0:
      self.image  = new_image*0
    for bbox in bboxes:
      x1,y1,x2,y2 = bbox
      #ensure all values are inside image
      x1 = max(0, x1)
      y1 = max(0, y1)
      x2 = min(self.image.shape[1], x2)
      y2 = min(self.image.shape[0], y2)
      new_image[y1:y2,x1:x2] =  self.image[y1:y2,x1:x2]
      self.image = new_image

def get_bbox_from_skeleton(xy_lists):
    print("xy list",xy_lists.shape)
    bboxes = []
    for list in xy_lists:
        x_list = []
        y_list = []
        for i in range(0, len(list), 3):
            x_list.append(list[i])
            y_list.append(list[i+1])
        bboxes.append([int(min(x_list))-100, int(min(y_list))-100, int(max(x_list))+100, int(max(y_list))+100])
    return bboxes


In [None]:
parse_video('/content/drive/MyDrive/Alarm/innovation week/privacy_test.mov','/content/drive/MyDrive/Alarm/innovation week/save/REFACTOR_TEST.mp4')

In [None]:
def pose_estimation_video(filename,save_file = None,privacy = True):
    global standing_angle, standing_mean, standing_std
    set_standing_angle = len(standing_angle) == 0
    if set_standing_angle:
      print("SETTING STANDING ANGLE")
    else:
      print("CALCULATING CURRENT ANGLE")


    cap = cv2.VideoCapture(filename)
    # VideoWriter for saving the video
    fourcc = cv2.VideoWriter_fourcc(*'MP4V')
    if not save_file:
      save_file =  'test_output.mp4'
    out = cv2.VideoWriter(save_file, fourcc, 30.0, (int(cap.get(3)), int(cap.get(4))))
    count =0
    if privacy:
      p_img = private_image() 
    else:
      p_img = None
    print('starting')
    while cap.isOpened():
        
        (ret, frame) = cap.read()
        count +=1
        #print(count)
        if ret == True:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            output, frame = run_inference(frame)
            
  

            frame = draw_keypoints(output, frame,set_standing_angle,p_img)
            frame = cv2.resize(frame, (int(cap.get(3)), int(cap.get(4))))
            out.write(frame)
            #cv2.imshow('Pose estimation', frame)
        else:
            break
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    if set_standing_angle:
      standing_angle = np.array(standing_angle)
      standing_mean = np.mean(standing_angle,axis = 0)
      standing_std = np.std(standing_angle,axis = 0)


    cap.release()
    out.release()
    cv2.destroyAllWindows()