# Detect keypoints on video with OpenPose

## Get access to data

- Mount GoogleDrive
- Update path to directories
- Read names from data.txt

In [12]:
# Upload the data (e.g. to Google Drive) and point the two paths below at it.

import os
from os.path import exists

# Both paths keep a trailing separator because later cells build file names by
# plain string concatenation (INPUT_PATH + name). os.path.join(directory, "")
# appends the separator of the current OS instead of a hard-coded backslash,
# so the notebook runs unchanged on Windows, Linux and macOS.
INPUT_PATH = os.path.join("new_data", "")
OUTPUT_PATH = os.path.join("output_new_data", "")

# Pure-Python replacement for `!mkdir`: no shell is involved and nothing fails
# when the directory already exists.
os.makedirs(OUTPUT_PATH, exist_ok=True)

# The input directory has to look like this:

# - /work_directory
#    - data.txt
#    - 1.mov
#    - 2.mov
#    ...

In [13]:
# data.txt lists one video file name per line.
# Blank lines and surrounding whitespace are ignored, so the file may or may
# not end with a newline; the last name is never dropped.

with open(INPUT_PATH + 'data.txt') as f:
    names = [line.strip() for line in f if line.strip()]

names

['1.mp4', '2.mp4', '3.mp4', '4.mp4', '5.mp4', '6.mp4', '7.mp4', '8.mp4']

## Install OpenPose-pytorch

- Install ffmpeg
- Install [openpose-pytorch](https://github.com/prasunroy/openpose-pytorch)



In [3]:
!pip install ffmpeg-python
!pip install git+https://github.com/prasunroy/openpose-pytorch.git





Collecting git+https://github.com/prasunroy/openpose-pytorch.git
  Cloning https://github.com/prasunroy/openpose-pytorch.git to c:\users\андрей токарев\appdata\local\temp\pip-req-build-rqwrnehl
  Resolved https://github.com/prasunroy/openpose-pytorch.git to commit a8bc85685512537c5b024ee135bfa3eed487cefb
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'


  Running command git clone --filter=blob:none --quiet https://github.com/prasunroy/openpose-pytorch.git 'C:\Users\Андрей Токарев\AppData\Local\Temp\pip-req-build-rqwrnehl'


In [4]:
import cv2

import ffmpeg
import json

from openpose.body.estimator import BodyPoseEstimator
from openpose.utils import draw_body_connections, draw_keypoints

In [8]:
import subprocess
from typing import NamedTuple

class FFProbeResult(NamedTuple):
    """Outcome of a single run of the ``ffprobe`` command line tool."""
    return_code: int  # exit status of the ffprobe process
    json: str  # captured standard output (ffprobe is asked for JSON output)
    error: str  # captured standard error

def ffprobe(file_path) -> FFProbeResult:
    """Run ``ffprobe`` on a media file and capture its JSON report.

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        FFProbeResult holding the process exit code, the captured stdout
        (JSON text describing the container format and its streams) and the
        captured stderr. The exit code is not interpreted here; callers must
        check it themselves.

    Raises:
        FileNotFoundError: If the ``ffprobe`` executable cannot be found.
    """
    command_array = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        file_path,
    ]
    # Decode explicitly as UTF-8 instead of the locale code page: the report
    # echoes the file name and metadata tags, which may contain non-ASCII text
    # that a legacy Windows code page would garble or fail to decode.
    # errors="replace" keeps a stray undecodable byte from aborting the probe.
    result = subprocess.run(
        command_array,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        encoding="utf-8",
        errors="replace",
    )
    return FFProbeResult(return_code=result.returncode,
                         json=result.stdout,
                         error=result.stderr)


In [14]:
# Only the first video listed in data.txt is processed.
input_name = INPUT_PATH + names[0]
output_name = OUTPUT_PATH + names[0]

ffprobe_result = ffprobe(input_name)
# Fail early with a readable message instead of a confusing JSONDecodeError or
# KeyError further down when ffprobe could not read the file.
if ffprobe_result.return_code != 0:
    raise RuntimeError(
        "ffprobe failed for %r (exit code %s): %s"
        % (input_name, ffprobe_result.return_code, ffprobe_result.error)
    )
info = json.loads(ffprobe_result.json)

# Use the first video stream; audio/subtitle streams are skipped.
videoinfo = next(
    (stream for stream in info.get("streams", [])
     if stream.get("codec_type") == "video"),
    None,
)
if videoinfo is None:
    raise ValueError("No video stream found in %r" % input_name)

# The encoder settings of the output mirror those of the input.
input_fps = videoinfo["avg_frame_rate"]
if input_fps in ("0/0", "0"):
    # ffprobe reports 0/0 when it cannot compute an average rate; fall back to
    # the stream's nominal rate rather than handing an invalid rate to ffmpeg.
    input_fps = videoinfo.get("r_frame_rate", input_fps)
input_pix_fmt = videoinfo["pix_fmt"]
input_vcodec = videoinfo["codec_name"]

In [15]:
from os import remove

class Writer:
    """Encode raw frames into a video file through an ffmpeg subprocess.

    Frames are piped to ffmpeg's stdin as raw ``bgr24`` bytes and encoded with
    the given pixel format and codec. The object can be called directly with a
    frame and may be used as a context manager so the encoder is always
    finalised:

        with Writer(path, fps, (height, width), pix_fmt, vcodec) as write:
            write(frame)

    Args:
        output_file: Path of the video file to create; an existing file is
            replaced.
        input_fps: Frame rate passed to ffmpeg as the rate of the raw input.
        input_framesize: Sequence whose first two items are (height, width),
            e.g. ``frame.shape[:2]``.
        input_pix_fmt: Pixel format of the encoded output.
        input_vcodec: Video codec of the encoded output.
    """

    def __init__(self, output_file, input_fps, input_framesize, input_pix_fmt,
                 input_vcodec):
        # Remove a stale file up front so a failed encode cannot leave the
        # result of an earlier run behind.
        if exists(output_file):
            remove(output_file)
        height, width = input_framesize[0], input_framesize[1]
        self.ff_proc = (
            ffmpeg
            .input('pipe:',
                   format='rawvideo',
                   pix_fmt="bgr24",
                   s='%sx%s' % (width, height),  # ffmpeg expects WIDTHxHEIGHT
                   r=input_fps
                   )
            .output(output_file, pix_fmt=input_pix_fmt, vcodec=input_vcodec)
            .overwrite_output()
            .run_async(pipe_stdin=True)
        )

    def __enter__(self):
        """Return the writer itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Finalise the encoder; exceptions from the block are not suppressed."""
        self.close()
        return False

    def __call__(self, frame):
        """Send one frame to the encoder.

        Args:
            frame: Object exposing ``tobytes()``; presumably a BGR uint8 image
                of the size given to the constructor (TODO confirm at call
                sites).
        """
        self.ff_proc.stdin.write(frame.tobytes())

    def close(self):
        """Close ffmpeg's stdin and wait for the process to finish.

        Returns:
            The exit code of the ffmpeg process, so callers can detect a
            failed encode (the value may be ignored, as before).
        """
        self.ff_proc.stdin.close()
        return self.ff_proc.wait()

In [16]:
estimator = BodyPoseEstimator(pretrained=True)
videoclip = cv2.VideoCapture(input_name)

print_keypoints = True  # print the keypoints of the first frame only
writer = None  # created lazily: the frame size is known only after the first read
try:
    if not videoclip.isOpened():
        raise IOError("Cannot open video: %s" % input_name)

    while videoclip.isOpened():
        flag, frame = videoclip.read()
        if not flag:
            # End of the stream (or a read error): stop processing.
            break

        keypoints = estimator(frame)
        if print_keypoints:
            print(keypoints)
            print_keypoints = False

        # Overlay the detected skeleton on the frame before it is encoded.
        frame = draw_body_connections(frame, keypoints, thickness=2, alpha=0.7)
        frame = draw_keypoints(frame, keypoints, radius=4, alpha=0.8)

        if writer is None:
            input_framesize = frame.shape[:2]
            writer = Writer(output_name, input_fps, input_framesize,
                            input_pix_fmt, input_vcodec)

        # cv2_imshow(frame)  # optional preview, disabled by default
        writer(frame)

        if cv2.waitKey(20) & 0xff == 27: # exit if pressed `ESC`
            break
finally:
    # Always release the capture and finalise the encoder, even when the loop
    # is interrupted by an exception; otherwise the ffmpeg subprocess stays
    # alive and the output file is left truncated.
    videoclip.release()
    # writer is still None when no frame could be read.
    if writer is not None:
        writer.close()
    cv2.destroyAllWindows()

[[[ 486  400    1]
  [ 543  450    1]
  [ 541  450    1]
  [ 457  534    1]
  [ 475  459    1]
  [ 552  454    1]
  [ 460  534    1]
  [ 467  432    1]
  [ 530  679    1]
  [ 576  821    1]
  [ 619  949    1]
  [ 518  682    1]
  [ 479  855    1]
  [ 458 1029    1]
  [   0    0    0]
  [ 494  383    1]
  [   0    0    0]
  [ 532  374    1]]]
