# Openpose light weight Test
 - https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch

# lightweight-human-pose-estimation.pytorch Setting

In [None]:
%cd /content/

import os
from os.path import exists, join, basename, splitext

git_repo_url = 'https://github.com/Daniil-Osokin/lightweight-human-pose-estimation.pytorch.git'
git_path = '/content/lightweight-human-pose-estimation.pytorch'
pre_model_path = os.path.join(git_path,'pre_model')

project_name = splitext(basename(git_repo_url))[0]
if not exists(project_name):
  !git clone -q --depth 1 $git_repo_url
  !wget "http://images.cocodataset.org/annotations/annotations_trainval2017.zip" -P $git_path
  !wget "https://download.01.org/opencv/openvino_training_extensions/models/human_pose_estimation/checkpoint_iter_370000.pth" -P $pre_model_path
  !mkdir coco && unzip "/content/lightweight-human-pose-estimation.pytorch/annotations_trainval2017.zip" -d "/content/coco/"

%cd $git_path

In [None]:
!pip install -r requirements.txt

# Demo - mp4
 - colab ( demo_test.py )

In [1]:
!rm -f demo_test.py

## create demo_test.py 

In [None]:
%%writefile demo_test.py
import argparse

import cv2
import numpy as np
import torch

from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.keypoints import extract_keypoints, group_keypoints
from modules.load_state import load_state
from modules.pose import Pose, track_poses
from val import normalize, pad_width


class ImageReader(object):
    def __init__(self, file_names):
        self.file_names = file_names
        self.max_idx = len(file_names)

    def __iter__(self):
        self.idx = 0
        return self

    def __next__(self):
        if self.idx == self.max_idx:
            raise StopIteration
        img = cv2.imread(self.file_names[self.idx], cv2.IMREAD_COLOR)
        if img.size == 0:
            raise IOError('Image {} cannot be read'.format(self.file_names[self.idx]))
        self.idx = self.idx + 1
        return img


class VideoReader(object):
    def __init__(self, file_name):
        self.file_name = file_name
        try:  # OpenCV needs int to read from webcam
            self.file_name = int(file_name)
        except ValueError:
            pass

    def __iter__(self):
        self.cap = cv2.VideoCapture(self.file_name)
        if not self.cap.isOpened():
            raise IOError('Video {} cannot be opened'.format(self.file_name))
        return self

    def __next__(self):
        was_read, img = self.cap.read()
        if not was_read:
            raise StopIteration
        return img


def infer_fast(net, img, net_input_height_size, stride, upsample_ratio, cpu,
               pad_value=(0, 0, 0), img_mean=(128, 128, 128), img_scale=1/256):
    height, width, _ = img.shape
    scale = net_input_height_size / height

    scaled_img = cv2.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
    scaled_img = normalize(scaled_img, img_mean, img_scale)
    min_dims = [net_input_height_size, max(scaled_img.shape[1], net_input_height_size)]
    padded_img, pad = pad_width(scaled_img, stride, pad_value, min_dims)

    tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).unsqueeze(0).float()
    if not cpu:
        tensor_img = tensor_img.cuda()

    stages_output = net(tensor_img)

    stage2_heatmaps = stages_output[-2]
    heatmaps = np.transpose(stage2_heatmaps.squeeze().cpu().data.numpy(), (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    stage2_pafs = stages_output[-1]
    pafs = np.transpose(stage2_pafs.squeeze().cpu().data.numpy(), (1, 2, 0))
    pafs = cv2.resize(pafs, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    return heatmaps, pafs, scale, pad


def run_demo(net, image_provider, height_size, cpu, track, smooth):
    net = net.eval()
    if not cpu:
        net = net.cuda()

    stride = 8
    upsample_ratio = 4
    num_keypoints = Pose.num_kpts
    previous_poses = []
    delay = 33

    frame = image_provider.__iter__().__next__()
    vid_writer = cv2.VideoWriter('output.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 10, (frame.shape[1],frame.shape[0]))

    for img in image_provider:
        orig_img = img.copy()
        heatmaps, pafs, scale, pad = infer_fast(net, img, height_size, stride, upsample_ratio, cpu)

        total_keypoints_num = 0
        all_keypoints_by_type = []
        for kpt_idx in range(num_keypoints):  # 19th for bg
            total_keypoints_num += extract_keypoints(heatmaps[:, :, kpt_idx], all_keypoints_by_type, total_keypoints_num)

        pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs, demo=True)
        for kpt_id in range(all_keypoints.shape[0]):
            all_keypoints[kpt_id, 0] = (all_keypoints[kpt_id, 0] * stride / upsample_ratio - pad[1]) / scale
            all_keypoints[kpt_id, 1] = (all_keypoints[kpt_id, 1] * stride / upsample_ratio - pad[0]) / scale
        current_poses = []
        for n in range(len(pose_entries)):
            if len(pose_entries[n]) == 0:
                continue
            pose_keypoints = np.ones((num_keypoints, 2), dtype=np.int32) * -1
            for kpt_id in range(num_keypoints):
                if pose_entries[n][kpt_id] != -1.0:  # keypoint was found
                    pose_keypoints[kpt_id, 0] = int(all_keypoints[int(pose_entries[n][kpt_id]), 0])
                    pose_keypoints[kpt_id, 1] = int(all_keypoints[int(pose_entries[n][kpt_id]), 1])
            pose = Pose(pose_keypoints, pose_entries[n][18])
            current_poses.append(pose)

        if track:
            track_poses(previous_poses, current_poses, smooth=smooth)
            previous_poses = current_poses
        for pose in current_poses:
            pose.draw(img)
        img = cv2.addWeighted(orig_img, 0.6, img, 0.4, 0)
        for pose in current_poses:
            cv2.rectangle(img, (pose.bbox[0], pose.bbox[1]),
                          (pose.bbox[0] + pose.bbox[2], pose.bbox[1] + pose.bbox[3]), (0, 255, 0))
            if track:
                cv2.putText(img, 'id: {}'.format(pose.id), (pose.bbox[0], pose.bbox[1] - 16),
                            cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255))
        #cv2.imshow('Lightweight Human Pose Estimation Python Demo', img)
        vid_writer.write(img)
        key = cv2.waitKey(delay)
        if key == 27:  # esc
            return
        elif key == 112:  # 'p'
            if delay == 33:
                delay = 0
            else:
                delay = 33
    vid_writer.release()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='''Lightweight human pose estimation python demo.
                       This is just for quick results preview.
                       Please, consider c++ demo for the best performance.''')
    parser.add_argument('--checkpoint-path', type=str, required=True, help='path to the checkpoint')
    parser.add_argument('--height-size', type=int, default=256, help='network input layer height size')
    parser.add_argument('--video', type=str, default='', help='path to video file or camera id')
    parser.add_argument('--images', nargs='+', default='', help='path to input image(s)')
    parser.add_argument('--cpu', action='store_true', help='run network inference on cpu')
    parser.add_argument('--track', type=int, default=1, help='track pose id in video')
    parser.add_argument('--smooth', type=int, default=1, help='smooth pose keypoints')
    args = parser.parse_args()

    if args.video == '' and args.images == '':
        raise ValueError('Either --video or --image has to be provided')

    net = PoseEstimationWithMobileNet()
    checkpoint = torch.load(args.checkpoint_path, map_location='cpu')
    load_state(net, checkpoint)

    frame_provider = ImageReader(args.images)
    if args.video != '':
        frame_provider = VideoReader(args.video)
    else:
        args.track = 0

    run_demo(net, frame_provider, args.height_size, args.cpu, args.track, args.smooth)


## run demo_test.py
 - upload video
 - run

In [None]:
%%shell
rm -rf videos
mkdir videos; cd videos
curl -LJO https://github.com/jeeenn/SportsFriends/raw/master/Data/yogaFriend_demo.mp4  > test.mp4
cd videos
mkdir save

In [None]:
# GPU
!python demo_test.py --checkpoint-path $pre_model_path/checkpoint_iter_370000.pth --video '/content/lightweight-human-pose-estimation.pytorch/videos/yogaFriend_demo.mp4'
# CPU
#!python demo_test.py --checkpoint-path $pre_model_path/checkpoint_iter_370000.pth --cpu --video '/content/lightweight-human-pose-estimation.pytorch/videos/yogaFriend_demo.mp4'

In [None]:
!ffmpeg -y -loglevel info -i 'output.avi' openpose.mp4

In [None]:
import io
import base64
from IPython.display import HTML
file_name = "openpose.mp4"
video_encoded = base64.b64encode(io.open(file_name, 'rb').read())
HTML(data='''<video width="{0}" height="{1}" alt="test" controls>
                      <source src="data:video/mp4;base64,{2}" type="video/mp4" />
                    </video>'''.format(640, 480, video_encoded.decode('ascii')))

# Real Time Pose Detection
  - colab ( ing )

In [None]:
import base64, logging
import numpy as np
from PIL import Image
from io import BytesIO

def data_uri_to_img(uri):
  try:
    image = base64.b64decode(uri.split(',')[1], validate=True)
    # make the binary image, a PIL image
    image = Image.open(BytesIO(image))
    # convert to numpy array
    image = np.array(image, dtype=np.uint8); 
    return image
  except Exception as e:
    logging.exception(e);print('\n')
    return None

def video_to_data_url(filename):
    ext = filename.split('.')[-1]
    prefix = 'data:video/{};base64,'.format(ext)
    with open(filename, 'rb') as f:
        vidoe = f.read()
    return prefix + base64.b64encode(vidoe).decode()


In [None]:
import cv2 as cv
import torch

from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.keypoints import extract_keypoints, group_keypoints
from modules.load_state import load_state
from modules.pose import Pose, track_poses
from val import normalize, pad_width

def infer_fast(net, img, net_input_height_size, stride, upsample_ratio, cpu,
               pad_value=(0, 0, 0), img_mean=(128, 128, 128), img_scale=1/256):
    height, width, _ = img.shape
    scale = net_input_height_size / height

    scaled_img = cv.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv.INTER_CUBIC)
    scaled_img = normalize(scaled_img, img_mean, img_scale)
    min_dims = [net_input_height_size, max(scaled_img.shape[1], net_input_height_size)]
    padded_img, pad = pad_width(scaled_img, stride, pad_value, min_dims)

    tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).unsqueeze(0).float()
    if not cpu:
        tensor_img = tensor_img.cuda()

    stages_output = net(tensor_img)

    stage2_heatmaps = stages_output[-2]
    heatmaps = np.transpose(stage2_heatmaps.squeeze().cpu().data.numpy(), (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    stage2_pafs = stages_output[-1]
    pafs = np.transpose(stage2_pafs.squeeze().cpu().data.numpy(), (1, 2, 0))
    pafs = cv2.resize(pafs, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)

    return heatmaps, pafs, scale, pad


In [None]:
def detect_key_point(frame):
  #(frameWidth, frameHeight) = frame.shape[:2]
  #inp = cv.dnn.blobFromImage(frame, inScale, (inWidth, inHeight),
  #                            (0, 0, 0), swapRB=False, crop=False)
  
  model_path = os.path.join(pre_model_path,'checkpoint_iter_370000.pth')
  height_size = 1  
  cpu = True

  net = PoseEstimationWithMobileNet()
  checkpoint = torch.load(model_path, map_location='cpu')
  load_state(net, checkpoint)
  #net = cv.dnn.readNetFromModelOptimizer(protoFile, weightsFile)
  net = net.eval()
  if not cpu:
      net = net.cuda()

  stride = 8
  upsample_ratio = 4
  num_keypoints = Pose.num_kpts
  previous_poses = []
  delay = 33
  orig_img = img.copy()
  heatmaps, pafs, scale, pad = infer_fast(net, frame, height_size, stride, upsample_ratio, cpu)

  total_keypoints_num = 0
  all_keypoints_by_type = []
  for kpt_idx in range(num_keypoints):  # 19th for bg
      total_keypoints_num += extract_keypoints(heatmaps[:, :, kpt_idx], all_keypoints_by_type, total_keypoints_num)

  pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs, demo=True)
  for kpt_id in range(all_keypoints.shape[0]):
      all_keypoints[kpt_id, 0] = (all_keypoints[kpt_id, 0] * stride / upsample_ratio - pad[1]) / scale
      all_keypoints[kpt_id, 1] = (all_keypoints[kpt_id, 1] * stride / upsample_ratio - pad[0]) / scale
  current_poses = []
  for n in range(len(pose_entries)):
      if len(pose_entries[n]) == 0:
          continue
      pose_keypoints = np.ones((num_keypoints, 2), dtype=np.int32) * -1
      for kpt_id in range(num_keypoints):
          if pose_entries[n][kpt_id] != -1.0:  # keypoint was found
              pose_keypoints[kpt_id, 0] = int(all_keypoints[int(pose_entries[n][kpt_id]), 0])
              pose_keypoints[kpt_id, 1] = int(all_keypoints[int(pose_entries[n][kpt_id]), 1])
      pose = Pose(pose_keypoints, pose_entries[n][18])
      current_poses.append(pose)

      if track:
          track_poses(previous_poses, current_poses, smooth=smooth)
          previous_poses = current_poses
      for pose in current_poses:
          pose.draw(img)
      img = cv.addWeighted(orig_img, 0.6, img, 0.4, 0)
      for pose in current_poses:
          cv.rectangle(img, (pose.bbox[0], pose.bbox[1]),
                        (pose.bbox[0] + pose.bbox[2], pose.bbox[1] + pose.bbox[3]), (0, 255, 0))
          if track:
              cv.putText(img, 'id: {}'.format(pose.id), (pose.bbox[0], pose.bbox[1] - 16),
                          cv.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255))


In [None]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode

def take_photo(filename='photo.jpg', quality=0.8):
  js = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      // Wait for Capture to be clicked.
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();

      for (let i = 0; isOpened; i++) {
        canvas.getContext("2d").clearRect(0, 0, canvas.width, canvas.height);
        canvas.getContext('2d').drawImage(video, 0, 0);
        img = canvas.toDataURL('image/jpeg', quality);

        // jsLog(i + "sending");
        // Call a python function and send this image
        google.colab.kernel.invokeFunction('notebook.run_PoseDetection', [img], {});
        // jsLog(i + "SENT");

        // wait for X miliseconds second, before next capture
        await new Promise(resolve => setTimeout(resolve, 250));        
      }       

      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(js)
  data = eval_js('takePhoto({})'.format(quality))
  binary = b64decode(data.split(',')[1])
  with open(filename, 'wb') as f:
    f.write(binary)
  return filename

In [None]:
from google.colab import output
frame_count = 0
writer = None

# InvokeFunction
# takes the numpy image and runs detection, then shows the results by visualizing
def run_PoseDetection(uri): 
  global frame_count, writer

  image = data_uri_to_img(uri)     
  if writer is None:		
      fourcc = cv.VideoWriter_fourcc(*'DIVX')  
      writer = cv.VideoWriter(outVideo, fourcc, 2, (image.shape[1], image.shape[0]), True)
  try:        
    detect_key_point(image)

    image = cv.cvtColor(image, cv.COLOR_BGR2RGB) 
    frame_count+=1    
    name = '{0}.jpg'.format(frame_count)
    name = os.path.join(VIDEO_SAVE_DIR, name)
    cv.imwrite(name, image)

    if writer is not None:
      writer.write(image)
  except Exception as e:
    logging.exception(e)
    print('\n')

# register this function, so JS code could call this
output.register_callback('notebook.run_PoseDetection', run_PoseDetection)

In [None]:
import os

VIDEO_DIR = os.path.join(git_path, "videos")
VIDEO_SAVE_DIR = os.path.join(VIDEO_DIR, "save")

In [None]:
inVideo = os.path.join(VIDEO_DIR, "test.mp4")
outVideo= os.path.join(VIDEO_DIR, "out.avi")

In [None]:
%%shell
mkdir videos; cd videos
curl -LJO https://github.com/jeeenn/SportsFriends/raw/master/Data/yogaFriend_demo.mp4  > test.mp4
cd videos
mkdir save

In [None]:
frame_count = 0
data_url = video_to_data_url(inVideo)
try: 
  # put the JS code in cell and run it
  take_photo(data_url)    
  if writer is not None:
    writer.release()   
except Exception as e:
  logging.exception(e)
  print('\n')

writer = None