<a href="https://colab.research.google.com/github/Victoooooor/SimpleJobs/blob/main/movenet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#@title
!pip install -q imageio
!pip install -q opencv-python
!pip install -q git+https://github.com/tensorflow/docs

In [None]:
#@title
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow_docs.vis import embed
import numpy as np
import cv2
import os
# Import matplotlib libraries
from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection
import matplotlib.patches as patches
import imageio
from IPython.display import HTML, display
from google.colab import files
import sys
import time
import shutil
from google.colab.patches import cv2_imshow
import copy
from base64 import b64encode

In [None]:
#@title
# Maps each body keypoint name to its integer index (0-16). The index is the
# position of that keypoint along the 17-entry keypoint axis used below.
KEYPOINT_DICT = {
    name: index
    for index, name in enumerate((
        'nose',
        'left_eye', 'right_eye',
        'left_ear', 'right_ear',
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
    ))
}

# Skeleton edges ("bones"), keyed by the (start, end) pair of keypoint indices
# they connect and mapped to the single-letter matplotlib color used to draw
# them. Insertion order is the order in which edges are emitted for drawing.
KEYPOINT_EDGE_INDS_TO_COLOR = {
    (start, end): color
    for start, end, color in (
        (0, 1, 'm'),
        (0, 2, 'c'),
        (1, 3, 'm'),
        (2, 4, 'c'),
        (0, 5, 'm'),
        (0, 6, 'c'),
        (5, 7, 'm'),
        (7, 9, 'm'),
        (6, 8, 'c'),
        (8, 10, 'c'),
        (5, 6, 'y'),
        (5, 11, 'm'),
        (6, 12, 'c'),
        (11, 12, 'y'),
        (11, 13, 'm'),
        (13, 15, 'm'),
        (12, 14, 'c'),
        (14, 16, 'c'),
    )
}

def _keypoints_and_edges_for_display(keypoints_with_scores,
                                     height,
                                     width,
                                     keypoint_threshold=0.11):
  """Returns high confidence keypoints and edges for visualization.

  Args:
    keypoints_with_scores: A numpy array whose last two axes are
      [17, 3], each keypoint row being (y, x, score) with normalized
      coordinates. The usual shape is [1, 1, 17, 3]; any leading axes (batch,
      instance) are flattened, so every instance in the array is processed.
    height: height of the image in pixels.
    width: width of the image in pixels.
    keypoint_threshold: minimum confidence score for a keypoint to be
      visualized.

  Returns:
    A (keypoints_xy, edges_xy, edge_colors) containing:
      * the coordinates of all keypoints of all detected entities, as an
        array of shape [num_keypoints, 2] in (x, y) pixel order;
      * the coordinates of all skeleton edges of all detected entities, as an
        array of shape [num_edges, 2, 2];
      * the colors in which the edges should be plotted.
  """
  keypoints_all = []
  keypoint_edges_all = []
  edge_colors = []

  # Collapse every leading axis into a single instance axis. Deriving the loop
  # bound from one axis while indexing another either skipped instances or
  # raised IndexError for anything other than a [1, 1, 17, 3] input.
  instances = np.asarray(keypoints_with_scores)
  instances = instances.reshape((-1,) + instances.shape[-2:])

  for instance in instances:
    kpts_scores = instance[:, 2]
    # Column 1 is x and column 0 is y; scale both to pixel coordinates.
    kpts_absolute_xy = np.stack(
        [width * instance[:, 1], height * instance[:, 0]], axis=-1)
    is_confident = kpts_scores > keypoint_threshold
    keypoints_all.append(kpts_absolute_xy[is_confident, :])

    # An edge is drawn only when both of its endpoints are confident.
    for (start, end), color in KEYPOINT_EDGE_INDS_TO_COLOR.items():
      if is_confident[start] and is_confident[end]:
        keypoint_edges_all.append(kpts_absolute_xy[[start, end], :])
        edge_colors.append(color)

  if keypoints_all:
    keypoints_xy = np.concatenate(keypoints_all, axis=0)
  else:
    # Same rank as the concatenated result above: [num_keypoints, 2].
    keypoints_xy = np.zeros((0, 2))

  if keypoint_edges_all:
    edges_xy = np.stack(keypoint_edges_all, axis=0)
  else:
    edges_xy = np.zeros((0, 2, 2))
  return keypoints_xy, edges_xy, edge_colors


def draw_prediction_on_image(
    image, keypoints_with_scores, crop_region=None, close_figure=False,
    output_image_height=None):
  """Draws the keypoint predictions on image.

  Args:
    image: A numpy array with shape [height, width, channel] representing the
      pixel values of the input image.
    keypoints_with_scores: A numpy array with shape [1, 1, 17, 3] representing
      the keypoint coordinates and scores returned from the MoveNet model.
    crop_region: An optional dictionary with keys 'x_min', 'y_min', 'x_max'
      and 'y_max' giving a bounding box in normalized coordinates. If
      provided, this function will also draw the bounding box on the image.
    close_figure: Unused; the matplotlib figure is always closed before
      returning. Kept so existing callers keep working.
    output_image_height: An integer indicating the height of the output image.
      Note that the image aspect ratio will be the same as the input image.

  Returns:
    A numpy array with shape [out_height, out_width, 3] representing the
    image overlaid with keypoint predictions.
  """
  height, width, _ = image.shape
  aspect_ratio = float(width) / height
  fig, ax = plt.subplots(figsize=(12 * aspect_ratio, 12))
  try:
    # To remove the huge white borders
    fig.tight_layout(pad=0)
    ax.margins(0)
    # Turn off tick labels
    ax.set_yticklabels([])
    ax.set_xticklabels([])
    plt.axis('off')

    ax.imshow(image)
    line_segments = LineCollection([], linewidths=(4), linestyle='solid')
    ax.add_collection(line_segments)
    scat = ax.scatter([], [], s=60, color='#FF1493', zorder=3)

    (keypoint_locs, keypoint_edges,
     edge_colors) = _keypoints_and_edges_for_display(
         keypoints_with_scores, height, width)

    # Only populate the artists when there is something to draw.
    if keypoint_edges.shape[0]:
      line_segments.set_segments(keypoint_edges)
      line_segments.set_color(edge_colors)
    if keypoint_locs.shape[0]:
      scat.set_offsets(keypoint_locs)

    if crop_region is not None:
      xmin = max(crop_region['x_min'] * width, 0.0)
      ymin = max(crop_region['y_min'] * height, 0.0)
      rec_width = min(crop_region['x_max'], 0.99) * width - xmin
      rec_height = min(crop_region['y_max'], 0.99) * height - ymin
      rect = patches.Rectangle(
          (xmin, ymin), rec_width, rec_height,
          linewidth=1, edgecolor='b', facecolor='none')
      ax.add_patch(rect)

    fig.canvas.draw()
    # tostring_rgb() was deprecated in Matplotlib 3.8 and later removed;
    # buffer_rgba() exposes the rendered canvas as [height, width, 4].
    rgba = np.asarray(fig.canvas.buffer_rgba())
    # Drop alpha and copy, so the result is contiguous and does not alias the
    # canvas buffer of the figure that is about to be closed.
    image_from_plot = np.ascontiguousarray(rgba[..., :3])
  finally:
    # Always release the figure, even if rendering failed part-way.
    plt.close(fig)

  if output_image_height is not None:
    output_image_width = int(output_image_height / height * width)
    image_from_plot = cv2.resize(
        image_from_plot, dsize=(output_image_width, output_image_height),
        interpolation=cv2.INTER_CUBIC)
  return image_from_plot

def to_gif(images, fps):
  """Saves the image sequence as ./animation.gif and returns its embed.

  Args:
    images: the frames to write, passed straight to imageio.mimsave.
    fps: frame rate forwarded to imageio.mimsave.

  Returns:
    Whatever embed.embed_file returns for the written GIF.
  """
  gif_path = './animation.gif'
  imageio.mimsave(gif_path, images, fps=fps)
  embedded = embed.embed_file(gif_path)
  return embedded

def progress(value, max=100):
  """Builds an HTML <progress> bar for display in the notebook.

  Args:
    value: current progress value, also used as the fallback text.
    max: value that represents completion (the name shadows the builtin but is
      kept because it is part of the public signature).

  Returns:
    An IPython HTML object wrapping a full-width <progress> element.
  """
  # Attributes are separated by whitespace only; a comma after the max
  # attribute is a markup error, so none is emitted.
  return HTML("""
      <progress
          value='{value}'
          max='{max}'
          style='width: 100%'
      >
          {value}
      </progress>
  """.format(value=value, max=max))

def show_video(video_path, video_width = 600):
  """Returns an inline HTML video player for the file at video_path.

  The whole file is read and embedded in the page as a base64 data URL, so
  this is only suitable for videos small enough to hold in memory.

  Args:
    video_path: path of the video file to embed.
    video_width: width attribute of the generated <video> element.

  Returns:
    An IPython HTML object containing the <video> element.
  """
  # Read-only access is enough; the context manager closes the handle, which
  # the previous bare open(...).read() never did.
  with open(video_path, "rb") as video_handle:
    video_file = video_handle.read()

  video_url = f"data:video/mp4;base64,{b64encode(video_file).decode()}"
  return HTML(f"""<video width={video_width} controls><source src="{video_url}"></video>""")


# Run MoveNet on a single image and draw the detected poses on it.
def get_pose(image, thresh = 0.2):
  """Runs the global `movenet` model on one image and draws the detections.

  Args:
    image: a [height, width, 3] image; it is batched, padded/resized to
      256x256 and cast to int32 before being passed to `movenet`.
    thresh: minimum instance score (last value of each detection row) for a
      detection to be kept.

  Returns:
    A (merged_img, ppl) tuple: the image with every kept detection drawn on
    it (the unmodified input image when nothing was kept), and the list of
    kept detections, each a [17, 3] array of keypoint rows whose first two
    columns were rescaled from the padded square back to the original image.
  """
  batched = tf.expand_dims(image, axis=0)
  image_origin = copy.copy(batched)
  model_input = tf.cast(
      tf.image.resize_with_pad(batched, 256, 256), dtype=tf.int32)
  _, image_height, image_width, channel = image_origin.shape

  if channel != 3:
    sys.exit('Image isn\'t in RGB format.')
  output = movenet(model_input)
  # The first 51 values of each of the 6 detection rows are 17 keypoint
  # triples.
  people = output['output_0'].numpy()[:, :, :51].reshape((6, 17, 3))

  # resize_with_pad letterboxes the shorter side, so stretch that axis back
  # out around the image centre (0.5) to undo the padding.
  if image_width != image_height:
    centered = people - 0.5
    if image_width > image_height:
      people[:, :, 0] = 0.5 + image_width/image_height * centered[:, :, 0]
    else:
      people[:, :, 1] = 0.5 + image_height/image_width * centered[:, :, 1]

  # Keep only the detections whose instance score clears the threshold.
  ppl = [
      people[person_idx]
      for person_idx in range(6)
      if output['output_0'][0, person_idx, -1] > thresh
  ]

  if not ppl:
    print('No pose was confidentlly detected.')

  # Draw the kept detections one after another onto the same image.
  merged_img = np.squeeze(image_origin.numpy(), axis=0)
  for person in ppl:
    merged_img = draw_prediction_on_image(
        merged_img, np.array([[person]]), output_image_height=image_height)
  return merged_img, ppl

def get_vid(filename, fhandle, desti = 'processed.mp4', interval = 5):
  """Annotates every frame of a video with poses and logs keypoints as CSV.

  Each frame is passed through get_pose() and the drawn result is written to
  `desti`. On every `interval`-th frame one line is written to `fhandle`
  containing the comma-terminated (x, y) values of all kept detections; the
  line is empty when nothing was detected on that frame.

  Args:
    filename: path of the input video.
    fhandle: writable binary file handle that receives the CSV lines.
    desti: path of the annotated output video (mp4v codec).
    interval: a CSV line is written for frames 0, interval, 2*interval, ...

  Returns:
    The path of the annotated video (`desti`).

  Raises:
    SystemExit: if the input video cannot be opened.
  """
  video_file = desti
  video = cv2.VideoCapture(filename)
  if not video.isOpened():
    video.release()
    sys.exit('video does not exist')
  # Keep the fractional rate (e.g. 29.97): truncating it to an int makes the
  # output drift against the original audio track.
  fps = video.get(cv2.CAP_PROP_FPS)
  frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
  frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
  fourcc = cv2.VideoWriter_fourcc(*'mp4v')
  video_writer = cv2.VideoWriter(
      video_file, fourcc, fps, (frame_width, frame_height))
  print("Frames per second using video.get(cv2.CAP_PROP_FPS) : {0}".format(fps))

  try:
    frame_counter = 0
    while True:
      ret, frame = video.read()
      if not ret:
        break
      # OpenCV decodes to BGR, while the model and the matplotlib drawing
      # treat the image as RGB; convert on the way in and back on the way out.
      rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      new_frame, data = get_pose(tf.convert_to_tensor(rgb_frame))
      video_writer.write(cv2.cvtColor(new_frame, cv2.COLOR_RGB2BGR))

      if frame_counter % interval == 0:
        # get_pose returns a plain list that is empty when nothing was
        # detected; force a [people, 17, 3] array so the axis-2 operations
        # below also work for zero people.
        data = np.asarray(data).reshape(-1, 17, 3)
        # Drop the score column, then swap the remaining columns to (x, y).
        data = np.delete(data, 2, 2)
        data[:, :, [0, 1]] = data[:, :, [1, 0]]
        np.savetxt(fhandle, data.flatten(),
                   fmt='%.18e', newline=',')
        fhandle.write(b"\n")
      frame_counter += 1
  finally:
    # Release the capture and finalize the output file even on failure.
    video.release()
    video_writer.release()
    cv2.destroyAllWindows()
  return video_file

In [None]:
#@title
# TF Hub handle of the MoveNet multipose lightning model.
MOVENET_MODEL_URL = "https://tfhub.dev/google/movenet/multipose/lightning/1"

model = hub.load(MOVENET_MODEL_URL)
# `movenet` is the inference callable that get_pose() invokes.
movenet = model.signatures['serving_default']


In [None]:
#params
interval = 5 #meaning save to csv every 5 frames
uploaded = files.upload()
# Fail with a readable message instead of a bare StopIteration when the
# upload returned nothing (e.g. the dialog was cancelled).
if not uploaded:
  raise ValueError('No file was uploaded; upload a video to process.')
# Use the first uploaded entry's key as the input file name.
filename = next(iter(uploaded))

In [None]:
#@title
text_name = 'pose.csv'
try:
  os.remove(text_name)
except:
  None
with open(text_name, "ab") as csv:
    # numpy.savetxt(csv, a)
  gen = get_vid(filename, csv, interval = interval)
  csv.close()
audiofile = '_sound.mp3'
withsound = 'output.mp4'
!ffmpeg -i {filename} -f mp3 -ab 192000 -vn {audiofile}
!ffmpeg -i {gen} -i {audiofile} -map 0:0 -map 1:0 -c:v copy -c:a copy {withsound}
!zip -r file.zip {text_name} {withsound}
files.download('file.zip')

try:
  os.remove('file.zip')
  os.remove(text_name)
  os.remove(filename)
  os.remove(audiofile)
  os.remove(gen)
  os.remove(withsound)
except:
  None