# 姿勢解析プログラム

In [2]:
### 必要なライブラリのインポート

In [3]:
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow_docs.vis import embed
import numpy as np
import cv2

# Import matplotlib libraries
from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection
import matplotlib.patches as patches

# Some modules to display an animation using imageio
import imageio
from IPython.display import HTML, display

import math

from PIL import Image
from os.path import dirname,basename

import moviepy.editor as mp

### Helper functions for visualization from Movenet tutorial

https://www.tensorflow.org/hub/tutorials/movenet?hl=ja#helper_functions_for_visualization

In [4]:
# Dictionary that maps from joint names to keypoint indices.
KEYPOINT_DICT = {
    'nose': 0,
    'left_eye': 1,
    'right_eye': 2,
    'left_ear': 3,
    'right_ear': 4,
    'left_shoulder': 5,
    'right_shoulder': 6,
    'left_elbow': 7,
    'right_elbow': 8,
    'left_wrist': 9,
    'right_wrist': 10,
    'left_hip': 11,
    'right_hip': 12,
    'left_knee': 13,
    'right_knee': 14,
    'left_ankle': 15,
    'right_ankle': 16
}

# Maps bones to a matplotlib color name.
KEYPOINT_EDGE_INDS_TO_COLOR = {
    (0, 1): 'm',
    (0, 2): 'c',
    (1, 3): 'm',
    (2, 4): 'c',
    (0, 5): 'm',
    (0, 6): 'c',
    (5, 7): 'm',
    (7, 9): 'm',
    (6, 8): 'c',
    (8, 10): 'c',
    (5, 6): 'y',
    (5, 11): 'm',
    (6, 12): 'c',
    (11, 12): 'y',
    (11, 13): 'm',
    (13, 15): 'm',
    (12, 14): 'c',
    (14, 16): 'c'
}

def _keypoints_and_edges_for_display(keypoints_with_scores,
                                     height,
                                     width,
                                     keypoint_threshold=0.11):
  """Returns high confidence keypoints and edges for visualization.

  Args:
    keypoints_with_scores: A numpy array with shape [1, 1, 17, 3] representing
      the keypoint coordinates and scores returned from the MoveNet model.
    height: height of the image in pixels.
    width: width of the image in pixels.
    keypoint_threshold: minimum confidence score for a keypoint to be
      visualized.

  Returns:
    A (keypoints_xy, edges_xy, edge_colors) containing:
      * the coordinates of all keypoints of all detected entities;
      * the coordinates of all skeleton edges of all detected entities;
      * the colors in which the edges should be plotted.
  """
  keypoints_all = []
  keypoint_edges_all = []
  edge_colors = []
  num_instances, _, _, _ = keypoints_with_scores.shape
  for idx in range(num_instances):
    kpts_x = keypoints_with_scores[0, idx, :, 1]
    kpts_y = keypoints_with_scores[0, idx, :, 0]
    kpts_scores = keypoints_with_scores[0, idx, :, 2]
    kpts_absolute_xy = np.stack(
        [width * np.array(kpts_x), height * np.array(kpts_y)], axis=-1)
    kpts_above_thresh_absolute = kpts_absolute_xy[
        kpts_scores > keypoint_threshold, :]
    keypoints_all.append(kpts_above_thresh_absolute)

    for edge_pair, color in KEYPOINT_EDGE_INDS_TO_COLOR.items():
      if (kpts_scores[edge_pair[0]] > keypoint_threshold and
          kpts_scores[edge_pair[1]] > keypoint_threshold):
        x_start = kpts_absolute_xy[edge_pair[0], 0]
        y_start = kpts_absolute_xy[edge_pair[0], 1]
        x_end = kpts_absolute_xy[edge_pair[1], 0]
        y_end = kpts_absolute_xy[edge_pair[1], 1]
        line_seg = np.array([[x_start, y_start], [x_end, y_end]])
        keypoint_edges_all.append(line_seg)
        edge_colors.append(color)
  if keypoints_all:
    keypoints_xy = np.concatenate(keypoints_all, axis=0)
  else:
    keypoints_xy = np.zeros((0, 17, 2))

  if keypoint_edges_all:
    edges_xy = np.stack(keypoint_edges_all, axis=0)
  else:
    edges_xy = np.zeros((0, 2, 2))
  return keypoints_xy, edges_xy, edge_colors


def draw_prediction_on_image(
    image, keypoints_with_scores, crop_region=None, close_figure=False,
    output_image_height=None):
  """Draws the keypoint predictions on image.

  Args:
    image: A numpy array with shape [height, width, channel] representing the
      pixel values of the input image.
    keypoints_with_scores: A numpy array with shape [1, 1, 17, 3] representing
      the keypoint coordinates and scores returned from the MoveNet model.
    crop_region: A dictionary that defines the coordinates of the bounding box
      of the crop region in normalized coordinates (see the init_crop_region
      function below for more detail). If provided, this function will also
      draw the bounding box on the image.
    output_image_height: An integer indicating the height of the output image.
      Note that the image aspect ratio will be the same as the input image.

  Returns:
    A numpy array with shape [out_height, out_width, channel] representing the
    image overlaid with keypoint predictions.
  """
  height, width, channel = image.shape
  aspect_ratio = float(width) / height
  fig, ax = plt.subplots(figsize=(12 * aspect_ratio, 12))
  # To remove the huge white borders
  fig.tight_layout(pad=0)
  ax.margins(0)
  ax.set_yticklabels([])
  ax.set_xticklabels([])
  plt.axis('off')

  im = ax.imshow(image)
  line_segments = LineCollection([], linewidths=(4), linestyle='solid')
  ax.add_collection(line_segments)
  # Turn off tick labels
  scat = ax.scatter([], [], s=60, color='#FF1493', zorder=3)

  (keypoint_locs, keypoint_edges,
   edge_colors) = _keypoints_and_edges_for_display(
       keypoints_with_scores, height, width)

  line_segments.set_segments(keypoint_edges)
  line_segments.set_color(edge_colors)
  if keypoint_edges.shape[0]:
    line_segments.set_segments(keypoint_edges)
    line_segments.set_color(edge_colors)
  if keypoint_locs.shape[0]:
    scat.set_offsets(keypoint_locs)

  if crop_region is not None:
    xmin = max(crop_region['x_min'] * width, 0.0)
    ymin = max(crop_region['y_min'] * height, 0.0)
    rec_width = min(crop_region['x_max'], 0.99) * width - xmin
    rec_height = min(crop_region['y_max'], 0.99) * height - ymin
    rect = patches.Rectangle(
        (xmin,ymin),rec_width,rec_height,
        linewidth=1,edgecolor='b',facecolor='none')
    ax.add_patch(rect)

  fig.canvas.draw()
  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
#  image_from_plot = np.frombuffer(fig.canvas.buffer_rgba(), dtype=np.uint8)
  image_from_plot = image_from_plot.reshape(
      fig.canvas.get_width_height()[::-1] + (3,))
  plt.close(fig)
  if output_image_height is not None:
    output_image_width = int(output_image_height / height * width)
    image_from_plot = cv2.resize(
        image_from_plot, dsize=(output_image_width, output_image_height),
         interpolation=cv2.INTER_CUBIC)
  return image_from_plot

def to_gif(images, duration):
  """Converts image sequence (4D numpy array) to gif."""
  imageio.mimsave('./tmp/animation.gif', images, duration=duration)
  return embed.embed_file('./tmp/animation.gif')

def progress(value, max=100):
  return HTML("""
      <progress
          value='{value}'
          max='{max}',
          style='width: 100%'
      >
          {value}
      </progress>
  """.format(value=value, max=max))


### Tensorflowのモデル読み込み+推論関数定義

In [5]:
# movenet_lightning
module = hub.load("https://tfhub.dev/google/movenet/singlepose/lightning/4")
input_size = 192

# movenet_thunder
# module = hub.load("https://tfhub.dev/google/movenet/singlepose/thunder/4")
# input_size = 256

def movenet(input_image):
    model = module.signatures['serving_default']
    # SavedMode format expects tensor type of int32
    input_image = tf.cast(input_image, dtype=tf.int32)
    # Run mode inference
    outputs = model(input_image)
    # Out put is a [1, 1, 17, 3] tensor.
    keypoints_with_scores = outputs['output_0'].numpy()
    return keypoints_with_scores



### mp4->animationGIF変換関数の定義

In [9]:
def mp4_to_frames(path, max_frame_per_sec=-1, do_resize=False, max_long_side=1000):
    cap = cv2.VideoCapture(path)
    
    if not cap.isOpened():
        return None,None
    
    # fpsを取得
    fps = cap.get(cv2.CAP_PROP_FPS)

    if max_frame_per_sec == -1:
        # 全フレームを対象とする
        per_frame = -1
        # フレームの間隔（ミリ秒）を計算
        dur = int(1000 / fps)
    else:
        # 何枚おきにフレームを取得するか計算
        per_frame = math.ceil(fps / max_frame_per_sec)
        # フレームの間隔（ミリ秒）を計算
        dur = int(1000 / fps * per_frame)
        
    print(f'per_frame:{per_frame}')
    
    dur = int(1000 / fps * per_frame)
    print(f'dur:{dur}')
    
    frames = []
    i = 0
    while True:
        ret, frame = cap.read()
        if ret:
            # フレームの間引き
            if (i % per_frame) != 0:
                i += 1
                continue
            
            # BGRをRGBに変換
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pillow_image = Image.fromarray(rgb_frame)
            
            if do_resize:
                w, h = cal_resided_w_h(pillow_image.width,pillow_image.height,max_long_side)
                pillow_image = pillow_image.resize((w, h))
            frames.append(pillow_image)
            i += 1
        else:
            return frames,dur


def cal_resided_w_h(w,h,max_long_side):
    if w > max_long_side or h > max_long_side:
        if w > h:
            ret_w = max_long_side
            ret_h = int(h * max_long_side / w)
        else:
            ret_h = max_long_side
            ret_w = int(w * max_long_side / h)
    else:
        ret_w, ret_h = w,h
    return ret_w, ret_h

def save_gif(frames, save_path, dur):
    # see also: https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html#gif
    frames[0].save(save_path, format='GIF', save_all=True, append_images=frames[1:],duration=dur, loop=0 )

def mp4_to_gif(mp4_path, out_path, max_frame_per_sec=10, do_resize=True, max_long_side=1000):
    frames, dur = mp4_to_frames(mp4_path, max_frame_per_sec, do_resize, max_long_side)
    save_gif(frames, out_path, dur)



### GIF->MP4変換関数の定義

In [15]:
def gif2mp4(input_gif, output_mp4):
    with mp.VideoFileClip(input_gif) as movie_file:
        movie_file.write_videofile(output_mp4)
        movie_file.close()


# アニメーションGIFの推論実行関数の定義

In [11]:
def pred(image_path):

    #image_path = 'images/dance_input.gif'
    #image_path ='images/pexels-photo-4384679.jpeg'
    images = tf.io.read_file(image_path)
    images = tf.image.decode_gif(images)
    
    # フレーム間隔を取得
    duration = Image.open(image_path).info['duration']
    cnt = images.shape[0]#[0,:,:,:]
    results = []
    result_frames = []
    for idx in range(cnt):
        # 推論向け画像変換
        current_frame = tf.expand_dims(images[idx], axis=0)
        # フレームサイズを取得
        _, h, w, _ = current_frame.shape

        # 推論向けに正方形にリサイズ
        resized_current_frame = tf.image.resize_with_pad(current_frame, input_size, input_size)
        # 推論の実行
        keypoints_with_scores = movenet(resized_current_frame)

        # 推論結果のスコアを保存
        results.append(keypoints_with_scores)
        
        # 推論結果を重ねるために長辺をベースに正方形にリサイズしてパディング
        resize_size = h if h > w else w
        display_image = tf.cast(tf.image.resize_with_pad(current_frame, resize_size, resize_size), dtype=tf.int32)
        output_overlay = draw_prediction_on_image(np.squeeze(display_image.numpy(), axis=0), keypoints_with_scores)
    
        result_frames.append(output_overlay)
        
    output = np.stack(result_frames, axis=0)
    to_gif(output, duration=duration)
    # save_gif(output, '/tmp/animation.gif', duration)

    return output, results



# 入力のMP4をアニメーションGifに変換（リサイズ含む）

In [12]:
%%time
mp4_path = 'images/PXL_20240121_053909046.TS.mp4'
gif_path = dirname(mp4_path) + '/' + basename(mp4_path).split('.')[0]+'.gif'
mp4_to_gif(mp4_path, gif_path,do_resize=False)

per_frame:3
dur:100
CPU times: user 51.5 s, sys: 15.1 s, total: 1min 6s
Wall time: 53.5 s


# アニメーションGIFを推論し、姿勢情報を取得

In [13]:
%%time
output, keypoints_with_scores = pred(gif_path)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)

  image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dty

CPU times: user 1min 15s, sys: 1min 1s, total: 2min 16s
Wall time: 1min 45s


In [18]:
out_path = gif_path + '.mp4'
gif2mp4('tmp/animation.gif', out_path)

Moviepy - Building video images/PXL_20240121_053909046.gif.mp4.
Moviepy - Writing video images/PXL_20240121_053909046.gif.mp4



                                                            

Moviepy - Done !
Moviepy - video ready images/PXL_20240121_053909046.gif.mp4
