## YouTube Scraper, Video Extractor and MoveNet Processor

This notebook infers pose keypoints from a YouTube video (or any other video file) using MoveNet and TensorFlow Lite. The result is a CSV with 17 keypoints × 3 values (x, y, score), i.e. 51 keypoint columns per frame, plus the file name and class label columns.
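
To make the output layout concrete, here is a minimal sketch that builds the keypoint column names in the order the final CSV uses (x, y, score for each keypoint). The `KEYPOINT_NAMES` list and the `keypoint_columns` helper are assumptions made for illustration: the list mirrors the standard 17-keypoint MoveNet order, whereas the notebook itself reads these names from the `BodyPart` enum in the TensorFlow examples repository.

```python
# Assumed keypoint order (standard 17-keypoint MoveNet/COCO layout).
# The notebook obtains these names from `data.BodyPart` instead.
KEYPOINT_NAMES = [
    "NOSE", "LEFT_EYE", "RIGHT_EYE", "LEFT_EAR", "RIGHT_EAR",
    "LEFT_SHOULDER", "RIGHT_SHOULDER", "LEFT_ELBOW", "RIGHT_ELBOW",
    "LEFT_WRIST", "RIGHT_WRIST", "LEFT_HIP", "RIGHT_HIP",
    "LEFT_KNEE", "RIGHT_KNEE", "LEFT_ANKLE", "RIGHT_ANKLE",
]


def keypoint_columns(names=KEYPOINT_NAMES):
    """Returns the x, y, score column names for every keypoint, in order."""
    columns = []
    for name in names:
        columns += [f"{name}_x", f"{name}_y", f"{name}_score"]
    return columns


# Full header: the file name first, then 17 * 3 = 51 keypoint columns.
columns = ["file_name"] + keypoint_columns()
print(len(keypoint_columns()), "keypoint columns")
print(columns[:4])
```

The class label columns (`class_no`, `class_name`) are appended after these columns when the CSV is written.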

## Preparation

In this section, you'll import the necessary libraries and define several functions to download the YouTube video and preprocess its frames.

In [None]:
!pip install -q opencv-python

In [None]:
!pip install pytube@git+https://github.com/priyankaj1311/pytube.git@master_copy
from pytube import YouTube

Collecting pytube@ git+https://github.com/priyankaj1311/pytube.git@master_copy
  Cloning https://github.com/priyankaj1311/pytube.git (to revision master_copy) to /tmp/pip-install-rtw2fo6b/pytube_9f6eb9f9c6414bceb072cabf2bfac42b
  Running command git clone --filter=blob:none --quiet https://github.com/priyankaj1311/pytube.git /tmp/pip-install-rtw2fo6b/pytube_9f6eb9f9c6414bceb072cabf2bfac42b
  Running command git checkout -b master_copy --track origin/master_copy
  Switched to a new branch 'master_copy'
  Branch 'master_copy' set up to track remote branch 'master_copy' from 'origin'.
  Resolved https://github.com/priyankaj1311/pytube.git to commit 68468bd1c0a2d3cd4166d928f4f7f44913c1f77f
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pytube
  Building wheel for pytube (setup.py) ... done
  Created wheel for pytube: filename=pytube-15.0.0-py3-none-any.whl size=57583 sha256=18b3854c979a533725db6b3cb64b59aaed9fd3d25c454a6b1311f1d3f0c

In [None]:
import csv
import cv2
import itertools
import numpy as np
import pandas as pd
import os
import sys
import tempfile
import tqdm

from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection

import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import keras
from IPython.display import Image, display

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

### Code to run pose estimation using MoveNet

In [None]:
#@title Functions to run pose estimation with MoveNet

#@markdown You'll download the MoveNet Thunder model from [TensorFlow Hub](https://tfhub.dev/s?q=movenet), and reuse some inference and visualization logic from the [MoveNet Raspberry Pi (Python)](https://github.com/tensorflow/examples/tree/master/lite/examples/pose_estimation/raspberry_pi) sample app to detect landmarks (ear, nose, wrist etc.) from the input images.

#@markdown *Note: You should use the most accurate pose estimation model (i.e. MoveNet Thunder) to detect the keypoints and use them to train the pose classification model to achieve the best accuracy. When running inference, you can use a pose estimation model of your choice (e.g. either MoveNet Lightning or Thunder).*

# Download model from TF Hub and check out inference code from GitHub
!wget -q -O movenet_thunder.tflite https://tfhub.dev/google/lite-model/movenet/singlepose/thunder/tflite/float16/4?lite-format=tflite
!git clone https://github.com/tensorflow/examples.git
pose_sample_rpi_path = os.path.join(os.getcwd(), 'examples/lite/examples/pose_estimation/raspberry_pi')
sys.path.append(pose_sample_rpi_path)

# Load MoveNet Thunder model
import utils
from data import BodyPart
from ml import Movenet
movenet = Movenet('movenet_thunder')

# Define function to run pose estimation using MoveNet Thunder.
# You'll apply MoveNet's cropping algorithm and run inference multiple times on
# the input image to improve pose estimation accuracy.
def detect(input_tensor, inference_count=3):
  """Runs detection on an input image.

  Args:
    input_tensor: A [height, width, 3] Tensor of type tf.float32.
      Note that height and width can be anything since the image will be
      immediately resized according to the needs of the model within this
      function.
    inference_count: Number of times the model should run repeatedly on the
      same input image to improve detection accuracy.

  Returns:
    A Person entity detected by the MoveNet.SinglePose.
  """
  # Detect pose using the full input image. Keep the result so that
  # inference_count=1 still returns a valid Person.
  person = movenet.detect(input_tensor.numpy(), reset_crop_region=True)

  # Repeatedly use the previous detection result to identify the region of
  # interest and crop to that region to improve detection accuracy.
  for _ in range(inference_count - 1):
    person = movenet.detect(input_tensor.numpy(),
                            reset_crop_region=False)

  return person

Cloning into 'examples'...
remote: Enumerating objects: 23745, done.
remote: Total 23745 (delta 0), reused 0 (delta 0), pack-reused 23745
Receiving objects: 100% (23745/23745), 44.08 MiB | 22.53 MiB/s, done.
Resolving deltas: 100% (12948/12948), done.


In [None]:
#@title Functions to download YouTube video.
YouTubeVideo = "https://youtu.be/wYzGtkcttVE" #@param {type:"string"}
filename = "woozooDance" #@param {type:"string"}
video_path = "exercise/woozooDance/video" #@param {type:"string"}

def download_youtube_video(url, output_path, filename='filename'):
    # Ensure the output directory exists
    os.makedirs(output_path, exist_ok=True)

    yt = YouTube(url)
    stream = yt.streams.get_highest_resolution()
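    # The later cells open the video at "<output_path>/<filename>" with no
    # ".mp4" suffix, which suggests download() writes `filename` as given.
    # Check that same path so an existing download is actually detected.
    file_path = os.path.join(output_path, filename)

    if not os.path.exists(file_path):
        stream.download(output_path=output_path, filename=filename)
    else:
        print(f"File '{filename}' already exists in '{output_path}', skipping download.")
        print(f"Existing file path: {file_path}")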

download_youtube_video(YouTubeVideo, video_path, filename)

In [None]:
#@title Functions to see video properties.

video_path = "exercise/woozooDance/video/woozooDance" #@param {type:"string"}

def get_video_properties(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Unable to open video file.")
        # Return one None per value so the caller's 5-way unpacking still works.
        return None, None, None, None, None
    # Get frames per second (fps)
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Get total frame count
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Calculate duration in seconds (guard against a missing or zero FPS value)
    duration = total_frames / fps if fps > 0 else 0.0
    # Get video width and height resolution
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    cap.release()
    return duration, total_frames, fps, width, height

duration, total_frames, fps, width, height = get_video_properties(video_path)

if duration is not None and total_frames is not None and fps is not None:
    print(f"Video Duration: {duration:.2f} seconds")
    print(f"Total Frame Count: {total_frames}")
    print(f"Frames Per Second (FPS): {fps:.2f}")
    print(f"Resolution: width is {width} and height is {height}")

Video Duration: 88.10 seconds
Total Frame Count: 2643
Frames Per Second (FPS): 30.00
Resolution: width is 606 and height is 360
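
The numbers printed above are linked by simple arithmetic, and the same arithmetic predicts how many frames the extraction cell that follows will write. The sketch below works through both; `video_duration_seconds` and `sampled_frame_indices` are hypothetical helper names introduced here, and the second one only mirrors the loop conditions of `extract_frames` (inclusive `end_frame`, frames kept when `frame_count % frame_interval == 0`).

```python
def video_duration_seconds(total_frames, fps):
    """Duration implied by a frame count and a frame rate."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    return total_frames / fps


def sampled_frame_indices(start_frame, end_frame, frame_interval, total_frames):
    """Frame indices kept by a loop that saves every frame whose index is a
    multiple of `frame_interval` between `start_frame` and `end_frame`
    (both inclusive), without reading past the end of the video."""
    last = min(end_frame, total_frames - 1)
    return [i for i in range(start_frame, last + 1) if i % frame_interval == 0]


# Values reported for the example video, and the extraction settings
# used in the next cell.
duration = video_duration_seconds(total_frames=2643, fps=30.0)
indices = sampled_frame_indices(start_frame=0, end_frame=2600,
                                frame_interval=2, total_frames=2643)

print(f"{duration:.2f} seconds")
print(len(indices), "frames, from", indices[0], "to", indices[-1])
```

With these settings the helper yields 1301 indices, which agrees with the 1301 images reported by the progress bar in the preprocessing step. Note that the frame count reported by a video container may be an estimate, so the real number of decodable frames can differ slightly.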


In [None]:
#@title Functions to extract frames.

video_path = "exercise/woozooDance/video/woozooDance" #@param {type:"string"}
start_frame = 0 #@param {type:"integer"}
end_frame = 2600 #@param {type:"integer"}
frame_interval = 2 #@param {type:"integer"}
output_path = "exercise/woozooDance/inputFrames/" #@param {type:"string"}

def extract_frames(video_path, start_frame, end_frame, frame_interval, output_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error: Unable to open video file.")
        return

    frame_count = 0
    os.makedirs(output_path, exist_ok=True)

    print("Starting frame extraction process...")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count < start_frame:
            frame_count += 1
            continue
        elif frame_count > end_frame:
            break

        # Process every `frame_interval`-th frame
        if frame_count % frame_interval == 0:
            print(f"Processing frame {frame_count}")

            # Save the frame directly to the output path
            frame_filename = os.path.join(output_path, f"frame_{frame_count}.jpg")
            cv2.imwrite(frame_filename, frame)
            print(f"Saved frame {frame_count}")

        frame_count += 1

    cap.release()
    print("Frame extraction process completed.")

extract_frames(video_path, start_frame, end_frame, frame_interval, output_path)


Starting frame extraction process...
Processing frame 0
Saved frame 0
Processing frame 2
Saved frame 2
Processing frame 4
Saved frame 4
Processing frame 6
Saved frame 6
Processing frame 8
Saved frame 8
Processing frame 10
Saved frame 10
Processing frame 12
Saved frame 12
Processing frame 14
Saved frame 14
Processing frame 16
Saved frame 16
Processing frame 18
Saved frame 18
Processing frame 20
Saved frame 20
Processing frame 22
Saved frame 22
Processing frame 24
Saved frame 24
Processing frame 26
Saved frame 26
Processing frame 28
Saved frame 28
Processing frame 30
Saved frame 30
Processing frame 32
Saved frame 32
Processing frame 34
Saved frame 34
Processing frame 36
Saved frame 36
Processing frame 38
Saved frame 38
Processing frame 40
Saved frame 40
Processing frame 42
Saved frame 42
Processing frame 44
Saved frame 44
Processing frame 46
Saved frame 46
Processing frame 48
Saved frame 48
Processing frame 50
Saved frame 50
Processing frame 52
Saved frame 52
Processing frame 54
Saved fr

In [None]:
#@title Functions to visualize the pose estimation results.

def draw_prediction_on_image(
    image, person, crop_region=None, close_figure=True,
    keep_input_size=False):
  """Draws the keypoint predictions on image.

  Args:
    image: A numpy array with shape [height, width, channel] representing the
      pixel values of the input image.
    person: A person entity returned from the MoveNet.SinglePose model.
    crop_region: Unused in this function.
    close_figure: Whether to close the plt figure after the function returns.
    keep_input_size: Whether to keep the size of the input image.

  Returns:
    A numpy array with shape [out_height, out_width, channel] representing the
    image overlaid with keypoint predictions.
  """
  # Draw the detection result on top of the image.
  image_np = utils.visualize(image, [person])

  # Plot the image with detection results.
  height, width, channel = image.shape
  aspect_ratio = float(width) / height
  fig, ax = plt.subplots(figsize=(12 * aspect_ratio, 12))
  im = ax.imshow(image_np)

  if close_figure:
    plt.close(fig)

  if not keep_input_size:
    image_np = utils.keep_aspect_ratio_resizer(image_np, (512, 512))

  return image_np

## Preprocess the input images into CSV

Because the input for our pose classifier is the *output* landmarks from the MoveNet model, we need to generate our training dataset by running labeled images through MoveNet and then capturing all the landmark data and ground truth labels into a CSV file.
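
As a rough illustration of how the resulting CSV can be consumed, the sketch below turns a landmark table into an array of shape `(frames, 17, 3)` and applies the same "every keypoint score must reach the threshold" rule that the preprocessor uses when deciding which images to keep. The helper names `landmarks_to_array` and `confident_mask`, the placeholder column names, and the synthetic data are all assumptions for the example; only the `file_name`, `class_no`, and `class_name` column names come from the notebook.

```python
import numpy as np
import pandas as pd

NUM_KEYPOINTS = 17
NON_FEATURE_COLUMNS = ("file_name", "class_no", "class_name")


def landmarks_to_array(df):
    """Reshapes the 51 keypoint columns into (rows, 17, 3) = (x, y, score)."""
    feature_cols = [c for c in df.columns if c not in NON_FEATURE_COLUMNS]
    values = df[feature_cols].to_numpy(dtype=np.float32)
    return values.reshape(len(df), NUM_KEYPOINTS, 3)


def confident_mask(landmarks, detection_threshold=0.1):
    """True for rows whose lowest keypoint score reaches the threshold."""
    return landmarks[:, :, 2].min(axis=1) >= detection_threshold


# Synthetic stand-in for the real CSV: 4 frames with random values in [0, 1).
rng = np.random.default_rng(0)
kp_cols = [f"kp{i}_{s}" for i in range(NUM_KEYPOINTS) for s in ("x", "y", "score")]
demo = pd.DataFrame(rng.random((4, 51), dtype=np.float32), columns=kp_cols)
demo.insert(0, "file_name", [f"pose_class/frame_{i}.jpg" for i in range(4)])
demo["class_no"] = 0
demo["class_name"] = "pose_class"

landmarks = landmarks_to_array(demo)
print(landmarks.shape)
print(confident_mask(landmarks, detection_threshold=0.2))
```

With the real file, `demo` would be replaced by `pd.read_csv(csvs_out_path)`.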

In [None]:
#@title Functions to extract keypoints from frames and save them to CSV

# Assume detect and draw_prediction_on_image functions are defined

class MoveNetPreprocessor(object):
    """Helper class to preprocess pose sample images for classification."""

    def __init__(self, images_in_folder, images_out_folder, csvs_out_path):
        """Creates a preprocessor to detect poses from images and save them as CSV."""
        self._images_in_folder = images_in_folder
        self._images_out_folder = images_out_folder
        self._csvs_out_path = csvs_out_path
        self._messages = []

        # Create a temp dir to store the pose CSVs per class
        self._csvs_out_folder_per_class = tempfile.mkdtemp()

    def process(self, per_pose_class_limit=None, detection_threshold=0.1):
        """Preprocesses images in the given folder.
        Args:
          per_pose_class_limit: Number of images to load. As preprocessing usually
            takes time, this parameter can be specified to reduce the size of
            the dataset for testing.
          detection_threshold: Only keep images in which every landmark has a
            confidence score at or above this threshold.
        """
        pose_class_name = "pose_class"
        print('Preprocessing', pose_class_name, file=sys.stderr)

        # Paths for the pose class.
        images_out_folder = self._images_out_folder
        csv_out_path = os.path.join(self._csvs_out_folder_per_class, pose_class_name + '.csv')
        if not os.path.exists(images_out_folder):
            os.makedirs(images_out_folder)

        # Detect landmarks in each image and write it to a CSV file
        with open(csv_out_path, 'w', newline='') as csv_out_file:
            csv_out_writer = csv.writer(csv_out_file, delimiter=',', quoting=csv.QUOTE_MINIMAL)
            # Get list of images
            image_names = sorted([n for n in os.listdir(self._images_in_folder) if not n.startswith('.')])
            if per_pose_class_limit is not None:
                image_names = image_names[:per_pose_class_limit]

            valid_image_count = 0

            # Detect pose landmarks from each image
            for image_name in tqdm.tqdm(image_names):
                image_path = os.path.join(self._images_in_folder, image_name)

                try:
                    image = tf.io.read_file(image_path)
                    image = tf.io.decode_jpeg(image)
                except Exception:
                    self._messages.append('Skipped ' + image_path + '. Invalid image.')
                    continue
                # The decoded tensor has shape [height, width, channels].
                image_height, image_width, channel = image.shape

                # Skip images that aren't RGB because MoveNet requires RGB images
                if channel != 3:
                    self._messages.append('Skipped ' + image_path + '. Image isn\'t in RGB format.')
                    continue
                person = detect(image)

                # Save landmarks if all landmarks were detected
                min_landmark_score = min([keypoint.score for keypoint in person.keypoints])
                should_keep_image = min_landmark_score >= detection_threshold
                if not should_keep_image:
                    self._messages.append('Skipped ' + image_path + '. No pose was confidently detected.')
                    continue

                valid_image_count += 1

                # Draw the prediction result on top of the image for debugging later
                output_overlay = draw_prediction_on_image(image.numpy().astype(np.uint8), person,
                                                          close_figure=True, keep_input_size=True)

                # Write detection result into an image file
                output_frame = cv2.cvtColor(output_overlay, cv2.COLOR_RGB2BGR)
                cv2.imwrite(os.path.join(images_out_folder, image_name), output_frame)

                # Get landmarks and scale it to the same size as the input image
                pose_landmarks = np.array(
                    [[keypoint.coordinate.x, keypoint.coordinate.y, keypoint.score] for keypoint in person.keypoints],
                    dtype=np.float32)

                # Write the landmark coordinates to its per-class CSV file
                coordinates = pose_landmarks.flatten().astype(str).tolist()
                csv_out_writer.writerow([image_name] + coordinates)

            if not valid_image_count:
                raise RuntimeError('No valid images found for the "{}" class.'.format(pose_class_name))

        # Print the messages collected during preprocessing.
        print('\n'.join(self._messages))

        # Combine all per-class CSVs into a single output file
        all_landmarks_df = self._all_landmarks_as_dataframe()
        all_landmarks_df.to_csv(self._csvs_out_path, index=False)

    def _all_landmarks_as_dataframe(self):
        """Merge all per-class CSVs into a single dataframe."""
        total_df = None
        csv_out_path = os.path.join(self._csvs_out_folder_per_class, 'pose_class.csv')
        per_class_df = pd.read_csv(csv_out_path, header=None)

        # Add the labels
        per_class_df['class_no'] = [0] * len(per_class_df)
        per_class_df['class_name'] = ['pose_class'] * len(per_class_df)

        # Append the folder name to the filename column (first column)
        per_class_df[per_class_df.columns[0]] = (os.path.join('pose_class', '')
            + per_class_df[per_class_df.columns[0]].astype(str))

        total_df = per_class_df

        list_name = [[bodypart.name + '_x', bodypart.name + '_y', bodypart.name + '_score'] for bodypart in BodyPart]
        header_name = []
        for columns_name in list_name:
            header_name += columns_name
        header_name = ['file_name'] + header_name
        header_map = {total_df.columns[i]: header_name[i] for i in range(len(header_name))}

        total_df.rename(header_map, axis=1, inplace=True)

        return total_df
# Example usage

images_in_folder = "exercise/woozooDance/inputFrames" #@param {type:"string"}
images_out_folder = "exercise/woozooDance/annotatedFrames" #@param {type:"string"}
csvs_out_path = "exercise/csv/woozooDance.csv"#@param {type:"string"}

preprocessor = MoveNetPreprocessor(
    images_in_folder,
    images_out_folder,
    csvs_out_path
)

preprocessor.process(per_pose_class_limit=None, detection_threshold=0.2)

Preprocessing pose_class
100%|██████████| 1301/1301 [1:12:54<00:00,  3.36s/it]

Skipped /content/drive/Shareddrives/Capstone C241-PS416/ML/Data/keypointsDataset/woozooDance/inputFrames/frame_0.jpg. No pose was confidently detected.
Skipped /content/drive/Shareddrives/Capstone C241-PS416/ML/Data/keypointsDataset/woozooDance/inputFrames/frame_10.jpg. No pose was confidently detected.
Skipped /content/drive/Shareddrives/Capstone C241-PS416/ML/Data/keypointsDataset/woozooDance/inputFrames/frame_1006.jpg. No pose was confidently detected.
Skipped /content/drive/Shareddrives/Capstone C241-PS416/ML/Data/keypointsDataset/woozooDance/inputFrames/frame_1010.jpg. No pose was confidently detected.
Skipped /content/drive/Shareddrives/Capstone C241-PS416/ML/Data/keypointsDataset/woozooDance/inputFrames/frame_102.jpg. No pose was confidently detected.
Skipped /content/drive/Shareddrives/Capstone C241-PS416/ML/Data/keypointsDataset/woozooDance/inputFrames/frame_1020.jpg. No pose was confidently detected.
Skipped /content/drive/Shareddrives/Capstone C241-PS416/ML/Data/keypointsDat
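
The skip messages above appear in the order `frame_0`, `frame_10`, `frame_1006`, ... because `sorted()` compares the file names as plain strings, so the CSV rows follow that order rather than the order of the frames in the video. If the rows are later needed in temporal order, one option is to sort by the number embedded in the file name. The following is a minimal sketch; `frame_number` is a hypothetical helper and assumes names of the form `frame_<n>.jpg` as written by `extract_frames`.

```python
import re


def frame_number(file_name):
    """Extracts <n> from names such as 'frame_<n>.jpg' or 'pose_class/frame_<n>.jpg'."""
    match = re.search(r"frame_(\d+)", file_name)
    if match is None:
        raise ValueError(f"No frame number found in {file_name!r}")
    return int(match.group(1))


names = ["frame_0.jpg", "frame_10.jpg", "frame_1006.jpg",
         "frame_102.jpg", "frame_2.jpg"]

# Plain string sort: the order seen in the log above.
print(sorted(names))

# Numeric sort: the order of the frames in the video.
print(sorted(names, key=frame_number))
```

The same key works on the `file_name` column of the CSV, for example by building a numeric column with `df["file_name"].map(frame_number)` and sorting on it.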


