# Team 5 💣
1. Maximus Lee
2. Aloysius Woo
3. Lim Huai Fu
4. Tan Ai Xin
5. Jin Zhenglong

In [None]:
#@markdown Check type of GPU and VRAM available.
!nvidia-smi --query-gpu=name,memory.total,memory.free --format=csv,noheader

## Google Colab First-time Setup

### Install Environment
Est Time: 8 mins

In [None]:
# @title Install Python and other required packages

%cd /content
!sudo apt-get install python3.8
!sudo apt-get install python3.8-distutils

!update-alternatives --install /usr/local/bin/python3 python3 /usr/bin/python3.8 2
!apt-get update
!apt install software-properties-common
!sudo dpkg --remove --force-remove-reinstreq python3-pip python3-setuptools python3-wheel
!apt-get install python3-pip
!apt-get install imagemagick
!cat /etc/ImageMagick-6/policy.xml | sed 's/none/read,write/g'> /etc/ImageMagick-6/policy.xml
!apt update

In [None]:
#@title Clone Repository

# Empty default folder
# The clone below targets the current directory ("."), and git refuses to clone
# into a non-empty directory, so Colab's default contents are removed first.
!rm -rf /content/.config /content/sample_data

# NOTE(review): relies on the working directory already being /content (set by
# the install cell above) — confirm before running this cell on its own.
!git clone https://github.com/maximus-lee-678/ict3104_team_05.git .

In [None]:
#@title Install required python libraries

# Reference material for the MMPose installation steps:
# https://github.com/open-mmlab/mmpose
# https://blog.csdn.net/qq_21532607/article/details/130226728
# https://colab.research.google.com/github/open-mmlab/mmpose/blob/master/demo/MMPose_Tutorial.ipynb

# Everything is installed through the python3.8 interpreter explicitly, so the
# packages land in the Python 3.8 environment set up by the install cell.
%cd /content
!python3.8 -m pip install -r other_files/requirements/requirements_colab.txt

# OpenMMLab packages are installed through mim.
# NOTE(review): mmengine and mmpose are unpinned — consider pinning versions for reproducibility.
!python3.8 -m mim install mmengine
!python3.8 -m mim install mmpose
!python3.8 -m mim install "mmcv>=2.0.0"
!python3.8 -m mim install "mmdet>=3.0.0"

### Mount Drive 🗻

1. Run the first cell to authenticate.
2. The output will show a message of the form `Failure("Error opening URL: ...")`. Click on the URL inside that message.
3. Select the ICT3104 shared account.
4. Once successful, you can close the tab.
5. Run the next cell.
6. Done.

In [None]:
# @title First Cell
# Installs google-drive-ocamlfuse from its PPA and starts the authentication flow.
%cd -q /content
# NOTE(review): "2>&1 > /dev/null" discards stdout only; stderr is still shown
# because it is duplicated before stdout is redirected. If the intent was to
# silence everything, the order would need to be "> /dev/null 2>&1" — confirm.
!sudo add-apt-repository -y ppa:alessandro-strada/ppa 2>&1 > /dev/null
!sudo apt-get update -qq 2>&1 > /dev/null
!sudo apt -y install -qq google-drive-ocamlfuse 2>&1 > /dev/null
# Run without a mount point: per the instructions above, this prints the URL used to authenticate.
!google-drive-ocamlfuse

In [None]:
# @title Next Cell
!sudo apt-get install -qq w3m # to act as web browser
!xdg-settings set default-web-browser w3m.desktop # to set default browser
%cd -q /content
!mkdir gdrive
%cd -q gdrive
!mkdir MyDrive
%cd -q ..
%cd -q ..
!google-drive-ocamlfuse /content/gdrive/MyDrive

In [None]:
#@title Symbolic linking of Checkpoints folder 🔗
!mkdir /content/FollowYourPose/checkpoints

# symbolic link edition
!ln -s /content/gdrive/MyDrive/checkpoints/* /content/FollowYourPose/checkpoints

# copy edition
# %cp -r /content/gdrive/MyDrive/ICT3104/checkpoints/* /content/FollowYourPose/checkpoints/

## Local (Windows) First-time Setup
You only need to run this cell once after cloning. Subsequent launches do not require running it again. \
To prevent outputs from being truncated in the local version of Jupyter Notebook, press [Shift-O].

In [None]:
# Clone FYP inference base model
# Downloads the FollowYourPose_v1 weights from Hugging Face into FollowYourPose/checkpoints.
# NOTE(review): the relative paths assume the notebook starts in the "demos" directory — confirm.

%cd ..\FollowYourPose
!mkdir checkpoints

%cd checkpoints
# Git LFS is set up before cloning so that LFS-tracked files in the repository are fetched.
!git lfs install
!git clone https://huggingface.co/YueMafighting/FollowYourPose_v1 .
# Return to the starting directory (two levels up, then into "demos").
%cd ..\..\demos

## Post-setup
Ensure either Google Colab setup or Windows Setup has been fully completed before proceeding.

In [None]:
#@title Folder Scaffolding & Library Imports
from pathlib import Path
import os
import sys

'''
how 2 Path
Try to surround file paths with Path() to reduce confusion
Because Path(str / str) does not work, try to use fstrings as inputs to Path(), instead of Path(var / var)
This is to reduce the impact of inevitable missed Path()s
If need to write filepath as a string, use .as_posix()
'''

# Check if running on google colab or locally
# The presence of the COLAB_RELEASE_TAG environment variable is used as the Colab marker.
if os.getenv("COLAB_RELEASE_TAG"):
  %cd /content
  # Make the Python 3.8 dist-packages importable from this kernel.
  # Check if the path is not already in sys.path before appending
  path_to_append = '/usr/local/lib/python3.8/dist-packages'
  if path_to_append not in sys.path:
      sys.path.append(path_to_append)

  IS_COLAB = True
  print('RUNNING IN COLAB.')
else:
  # When running locally we start in the "demos" directory, so move one level up.
  # Only done on the first run: ROOT_DIR_PATH already exists on re-runs, and
  # moving up again would leave the project root.
  if 'ROOT_DIR_PATH' not in globals():
     %cd ..

  IS_COLAB = False
  print('RUNNING LOCALLY.')

# First run: remember the current directory as the project root.
# Re-run: return to the remembered root regardless of where later cells moved to.
if 'ROOT_DIR_PATH' not in globals():
    # Root Directory
    ROOT_DIR_PATH = os.getcwd()
else:
   %cd {ROOT_DIR_PATH}

# Video
VIDEO_DIR_PATH = Path(f"{ROOT_DIR_PATH}/video")
VIDEO_SKELETON_DIR_PATH = Path(f"{ROOT_DIR_PATH}/video/Skeleton")

# I/O Files
TRAINING_CONTENT_DIR_PATH = Path(f"{ROOT_DIR_PATH}/training_content")
CUSTOM_MODEL_DIR_PATH = Path(f"{ROOT_DIR_PATH}/custom_model")
INFERENCE_OUTPUT_DIR_PATH = Path(f"{ROOT_DIR_PATH}/inference_output")
TEST_OUTPUT_DIR_PATH = Path(f"{ROOT_DIR_PATH}/test_output")

# Create the I/O folders from Python instead of shelling out to mkdir:
# exist_ok=True makes re-running this cell silent, and the call behaves the
# same on Colab and on Windows.
for _io_dir_path in (
    TRAINING_CONTENT_DIR_PATH,
    CUSTOM_MODEL_DIR_PATH,
    INFERENCE_OUTPUT_DIR_PATH,
    TEST_OUTPUT_DIR_PATH,
):
    _io_dir_path.mkdir(exist_ok=True)

MMPOSE_DIR_PATH = Path(f"{ROOT_DIR_PATH}/MMPose")
FYP_DIR_PATH = Path(f"{ROOT_DIR_PATH}/FollowYourPose")

# FYP
CONFIG_DIR_PATH = Path(f"{FYP_DIR_PATH}/configs")

CHECKPOINT_DIR_PATH = Path(f"{FYP_DIR_PATH}/checkpoints")

# Dataset Paths
CHARADES_LOOKUP_PATH = Path(f"{ROOT_DIR_PATH}/other_files/charades_lookup")
SIMS_LOOKUP_PATH = Path(f"{ROOT_DIR_PATH}/other_files/sims4action_lookup")

# Functions
def outputWidgetsLayout():
  """Build the layout used by the output placeholders.

  The layout is the same in both environments except that the local
  (non-Colab) variant additionally sets overflow='auto'.
  """
  layout_options = dict(
    margin='0px 0px 20px 0px',
    display='flex',
    align_items='flex-start',
    height="532px",
  )
  if not IS_COLAB:
    layout_options['overflow'] = 'auto'
  return widgets.Layout(**layout_options)

def run_command(command):
  """Run an external command and show its output in the notebook.

  Args:
    command: The program and its arguments as a list (entries may be
      path-like objects), or a single command string.

  On Colab the child's output is captured and printed line by line so that it
  appears in the cell output. Locally the command is handed to
  subprocess.run(..., shell=True) unchanged in behavior.
  """
  # Normalize path-like entries so every argv element is a plain string.
  if not isinstance(command, str):
    command = [str(part) for part in command]

  if IS_COLAB:
    # stderr is merged into stdout so only one pipe has to be drained.
    # Reading stdout to EOF before touching a separate stderr pipe deadlocks
    # as soon as the child fills the stderr pipe buffer (ffmpeg logs to stderr).
    with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) as process:
      for line in process.stdout:
        print(line, end="")
  else:
    subprocess.run(command, shell=True)

# Libraries
import shutil

import csv
import time
import datetime
import subprocess

import re
import yaml
import json

from ipywidgets import Dropdown, Output, Layout, widgets, Button, VBox, HBox
from IPython.display import display, Markdown, HTML, Video, Image, clear_output

from moviepy.editor import VideoFileClip, clips_array, TextClip, CompositeVideoClip, ColorClip
from moviepy.config import change_settings

import numpy as np
import cv2
import imageio
import fnmatch

import pandas as pd
import matplotlib.pyplot as plt

In [None]:
#@title Load default YAMLs

# Anchor the source folder on ROOT_DIR_PATH rather than the current working
# directory: later cells change directory, so a cwd-relative path would stop
# resolving if this cell were re-run after them.
src_dir = Path(f"{ROOT_DIR_PATH}/other_files/fyp_default_yamls")
dst_dir = CONFIG_DIR_PATH

# copy all contents of the source directory to the destination directory
# (dirs_exist_ok=True lets the copy merge into an existing configs folder)
shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)

### Configuration Utils

In [None]:
#@title Configuration Presets

# Expected structure of an inference yaml.
# Each leaf maps a key to the Python type its value must have; a nested dict
# describes a nested yaml section. Consumed by compare_dict_structure().
expected_inference_config = {
  "pretrained_model_path": str,
  "output_dir": str,
  "validation_data": {
      "prompts": list,
      "video_length": int,
      "width": int,
      "height": int,
      "num_inference_steps": int,
      "guidance_scale": float,
      "use_inv_latent": bool,
      "num_inv_steps": int,
      "dataset_set": str
  },
  "train_batch_size": int,
  "validation_steps": int,
  "resume_from_checkpoint": str,
  "seed": int,
  "mixed_precision": str,
  "gradient_checkpointing": bool,
  "enable_xformers_memory_efficient_attention": bool
}

# Expected structure of a training yaml (same key -> type convention as above).
expected_training_config = {
  "pretrained_model_path": str,
  "output_dir": str,
  "train_data": {
    "video_path": str,
    "n_sample_frames": int,
    "width": int,
    "sample_frame_rate": int
  },
  "learning_rate": float,
  "train_batch_size": int,
  "max_train_steps": int,
  "trainable_modules": list,
  "seed": int,
  "mixed_precision": str,
  "use_8bit_adam": bool,
  "gradient_checkpointing": bool,
  "enable_xformers_memory_efficient_attention": bool
}

# Define the options for boolean dropdown
boolean_dropdown = [True, False]

# Shared style/layout objects for the configuration form widgets
configs_config_style = {'description_width': '150px'}
configs_config_layout = widgets.Layout(width="500px")
configs_config_button_layout = widgets.Layout(margin='0px 0px 20px 354px')

In [None]:
#@title Configuration Functions

# Find all models and creates a list of the models' path in the directory
def create_model_list():
  """Return (label, path) options for the model dropdown.

  The first entry is always the "Default" stable-diffusion checkpoint; it is
  followed by one entry per sub-directory of CUSTOM_MODEL_DIR_PATH.
  """
  custom_models = [
    (entry, Path(f"{CUSTOM_MODEL_DIR_PATH}/{entry}"))
    for entry in os.listdir(CUSTOM_MODEL_DIR_PATH)
    if os.path.isdir(os.path.join(CUSTOM_MODEL_DIR_PATH, entry))
  ]
  default_model = ("Default", Path(f"{FYP_DIR_PATH}/checkpoints/stable-diffusion-v1-4"))
  return [default_model] + custom_models

def create_video_path_list():
  """Return (label, path) options for the training-video dropdown.

  The first entry is "None", pointing at TRAINING_CONTENT_DIR_PATH itself; it
  is followed by one entry per sub-directory of that folder.
  """
  video_folders = [
    (entry, Path(f"{TRAINING_CONTENT_DIR_PATH}/{entry}"))
    for entry in os.listdir(TRAINING_CONTENT_DIR_PATH)
    if os.path.isdir(os.path.join(TRAINING_CONTENT_DIR_PATH, entry))
  ]
  no_selection = ("None", Path(f"{TRAINING_CONTENT_DIR_PATH}/"))
  return [no_selection] + video_folders

# Check if the yaml is in the correct structure for inference
def compare_dict_structure(expected, yaml_config):
  """Check that a loaded yaml configuration has the expected keys and value types.

  Args:
    expected: Dict mapping each key to the Python type its value must have,
      or to a nested dict describing a nested section.
    yaml_config: The loaded configuration (a dict or another mapping).

  Returns:
    True only if every expected key is present with a matching type at every
    nesting level; False otherwise, including when yaml_config is not a mapping.
  """
  from collections.abc import Mapping, Sequence

  def value_matches(actual, expected_type):
    if expected_type is list:
      # Accept any list-like container (omegaconf hands lists over as
      # ListConfig), but not strings, which are sequences too.
      return isinstance(actual, Sequence) and not isinstance(actual, (str, bytes))
    if expected_type is float:
      # yaml parses "12" as an int; it is still a usable float value.
      return isinstance(actual, (int, float))
    return isinstance(actual, expected_type)

  # An empty yaml file loads as None; that is a structural mismatch, not a crash.
  if not isinstance(yaml_config, Mapping):
    return False

  for key, value in expected.items():
    if key not in yaml_config:
      return False
    if isinstance(value, dict):
      # Propagate the nested result; a mismatch in a sub-section fails the whole check.
      if not compare_dict_structure(value, yaml_config[key]):
        return False
    elif not value_matches(yaml_config[key], value):
      return False
  return True

## Video Browsing 👀

1. Play the Video Selection cell.
2. Pick a folder.
3. Pick a video.
4. Click **Display** to view the video.
5. The other buttons, **MMPose**, **FYP** and **Refresh** will be covered in the other sections.

In [None]:
# @title Video Selection { display-mode: "form" }

# Helper Functions
def getFolderContent(folder_name):
  """List the visible sub-folders of VIDEO_DIR_PATH.

  Args:
    folder_name: Accepted for call compatibility; the listing is always taken
      from VIDEO_DIR_PATH and this argument is not read.

  Returns:
    Names of the entries that are directories and do not start with ".".
  """
  return [
    entry
    for entry in os.listdir(VIDEO_DIR_PATH)
    if os.path.isdir(os.path.join(VIDEO_DIR_PATH, entry)) and not entry.startswith(".")
  ]

# Init
# Initial folder listing for the "Folder" dropdown.
# getFolderContent() lists VIDEO_DIR_PATH itself; the argument passed here is not used by it.
video_dir_folders = os.listdir(VIDEO_DIR_PATH)
video_dir_folder_content = getFolderContent(video_dir_folders)

# Create layout
layout_single_button = widgets.Layout(width='212px',margin='0px 0px 20px 90px')
layout_double_button = widgets.Layout(width='104px')
layout_hbox = widgets.Layout(margin='0px 0px 0px 88px')
layout_output = outputWidgetsLayout()

# Create widgets
# The video dropdown and the Display / MMPose / FYP buttons start disabled;
# the listeners below enable them once a video is selected.
video_output_placeholder = widgets.Output(layout=layout_output)
video_subdir_dropdown = widgets.Dropdown(options=video_dir_folder_content, description='Folder:', value=None, disabled=False)
video_dropdown = widgets.Dropdown(options=[], description='Video:', disabled=True)
video_display_button = widgets.Button(description="Display", disabled=True, layout=layout_double_button)
video_refresh_button = widgets.Button(description="Refresh", disabled=False, layout=layout_double_button)
video_mmpose_button = widgets.Button(description="MMPose", disabled=True, layout=layout_double_button)
video_fyp_button = widgets.Button(description="FYP", disabled=True, layout=layout_double_button)

# Two rows of two buttons each
video_hbox_1 = widgets.HBox([video_display_button, video_refresh_button], layout=layout_hbox)
video_hbox_2 = widgets.HBox([video_mmpose_button, video_fyp_button], layout=layout_hbox)

# Black 512x512 box with a white inner border, shown until a video is displayed
video_output_placeholder_content = HTML("""
  <div style="width: 512px; height: 512px; border-radius: 5%; background-color: black; margin: 0 auto; display: flex; justify-content: center; align-items: center;">
      <div style="width: 500px; height: 500px; border-radius: 5%; border: 2px solid white;" />
  </div>
""")

# Create listeners
## Update video dropdown options based on the selected folder
def video_subdir_select(change):
    """Refresh the video dropdown after the folder selection changes.

    Args:
        change: The change notification from the folder dropdown (not read;
            the current dropdown value is used instead).

    The video dropdown is filled with the .mp4/.avi files of the selected
    folder. When no folder is selected, or the folder holds no videos, the
    dropdown is emptied, its value is cleared and it is disabled.
    """
    selected_video_folder = video_subdir_dropdown.value

    if selected_video_folder is not None:
        selected_video_dir_path = Path(f"{VIDEO_DIR_PATH}/{selected_video_folder}")
        selected_video_dir_content = [
            file for file in os.listdir(selected_video_dir_path)
            if file.endswith(('.mp4', '.avi'))
        ]
    else:
        selected_video_dir_content = []

    video_dropdown.options = selected_video_dir_content
    if not selected_video_dir_content:
        video_dropdown.value = None

    # Disable the dropdown when there is nothing to choose from (the previous
    # code set disabled = False in both branches, so it was never disabled again).
    video_dropdown.disabled = not selected_video_dir_content

## Display the selected video
def display_selected_video(change):
    """Show the currently selected video in the output placeholder.

    Does nothing when no video is selected. Otherwise the placeholder's
    previous content is cleared and replaced by an embedded 512x512 player.
    """
    selected_video = video_dropdown.value
    if not selected_video:
        return

    video_path = Path(f"{VIDEO_DIR_PATH}/{video_subdir_dropdown.value}/{selected_video}")
    video_display = Video(video_path, width=512, height=512, embed=True)

    # Render inside the placeholder widget rather than the cell output
    with video_output_placeholder:
        clear_output()
        display(video_display)

## Refresh folder and directory
def refresh_folder_and_directory(change):
    """Re-scan the video directory and reset both dropdowns to "nothing selected"."""
    refreshed_folders = getFolderContent(os.listdir(VIDEO_DIR_PATH))

    # Clear the video list first, then offer the refreshed folder list
    video_dropdown.options = []
    video_dropdown.value = None

    video_subdir_dropdown.options = refreshed_folders
    video_subdir_dropdown.value = None



## Enable button when a valid video is picked
def enable_button(change):
    """Enable the Display / MMPose / FYP buttons only while a video is selected."""
    no_video_selected = not video_dropdown.value
    for action_button in (video_display_button, video_mmpose_button, video_fyp_button):
        action_button.disabled = no_video_selected

# Attach Listeners
# Dropdowns are observed on their 'value' trait; buttons react to clicks.
video_subdir_dropdown.observe(video_subdir_select, 'value')
video_display_button.on_click(display_selected_video)
video_refresh_button.on_click(refresh_folder_and_directory)
video_dropdown.observe(enable_button, 'value')


# Display fields
# The placeholder graphic is rendered into the Output widget first, then the
# Output widget, both dropdowns and the two button rows are shown in order.
with video_output_placeholder:
  display(video_output_placeholder_content)
display(video_output_placeholder)
display(video_subdir_dropdown)
display(video_dropdown)

display(video_hbox_1)
display(video_hbox_2)


## Inference: MMPOSE 웃

1. Play the Metadata Functions cell.
2. Play the Run MMPose Inference cell.
3. Go back to the Video Selection cell, select a folder and a video in it, then click **MMPose**. (Ensure the video shows a human performing an action.)
4. Wait for the **Video** box to be populated with the selected video.
5. Click **Start Inference**.
6. Done.

In [None]:
#@title Metadata Functions

# Reads the input
def read_video_metadata(input_file):
  """Read the "original_path" and "bg_path" tags stored in a video file.

  Args:
    input_file: Path of the video to inspect.

  Returns:
    Tuple (original_path, bg_path) taken from the file's format tags.

  Raises:
    subprocess.CalledProcessError: If ffprobe exits with an error.
    KeyError: If the file carries no tags or one of the two tags is missing.
  """
  field_names = ["original_path", "bg_path"]

  # Pass the arguments as a list instead of building a shell string: file names
  # containing quotes, spaces or "$" then reach ffprobe unmodified.
  cmd = [
    "ffprobe",
    "-v", "error",
    "-select_streams", "v:0",
    "-show_entries", f"format_tags={','.join(field_names)}",
    "-of", "json",
    str(input_file),
  ]
  metadata_info = subprocess.check_output(cmd).decode()
  metadata_dict = json.loads(metadata_info)

  metadata = metadata_dict['format']['tags']
  return metadata["original_path"], metadata["bg_path"]

def add_metadata_to_mp4(video_path, metadata):
  """Write key/value metadata tags into a video file, replacing it in place.

  Args:
    video_path: Path of the video to tag.
    metadata: Dict of tag name -> tag value.

  Raises:
    FileNotFoundError: If ffmpeg did not produce the tagged copy. The original
      file is left untouched in that case.
  """
  video_path = Path(video_path)

  metadata_args = []
  for key, value in metadata.items():
    metadata_args.extend(["-metadata", f"{key}={value}"])

  # The tagged copy is written next to the original as "<name>_temp.mp4"
  temp_output_file = video_path.with_name(f"{video_path.stem}_temp.mp4")

  # Use ffmpeg to add metadata (streams are copied, not re-encoded)
  command = [
      "ffmpeg",
      "-y",  # overwrite a temp file left behind by an earlier failed run instead of prompting
      "-i", str(video_path),
      "-c", "copy",
      "-movflags", "use_metadata_tags",
      "-map_metadata", "0",
      *metadata_args,
      str(temp_output_file)
  ]
  run_command(command)

  # Swap the tagged copy in only once it exists. os.replace raises before
  # touching the original when ffmpeg failed, so the source video is never lost.
  os.replace(temp_output_file, video_path)

In [None]:
# @title Run MMPose Inference { display-mode: "form" }

# Helper Functions
## Re-encode video due to H.264 video encoding error
def reencode_video(input_file):
  """Re-encode a video to H.264/AAC and replace the original file with the result.

  Args:
    input_file: Path of the video to re-encode.

  Raises:
    FileNotFoundError: If ffmpeg did not produce an output file. The original
      file is left untouched in that case.
  """
  temp_output_file = Path(f"{VIDEO_SKELETON_DIR_PATH}/output.mp4")
  command = [
    "ffmpeg",
    "-y",  # overwrite a temp file left behind by an earlier failed run instead of prompting
    "-i", str(input_file),
    "-c:v", "libx264",
    "-crf", "23",
    "-c:a", "aac",
    "-strict", "experimental",
    str(temp_output_file)
  ]
  run_command(command)

  # Swap the re-encoded file in only once it exists. os.replace raises before
  # touching the original when ffmpeg failed, so the source video is never lost.
  os.replace(temp_output_file, input_file)

# Create layout
layout_single_long_button = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets
# The text box is read-only and is filled in by the MMPose button of the Video
# Selection cell; the inference button stays disabled until a video is chosen.
selected_mmpose_video_input = widgets.Text(placeholder='Select a video above', description="Video:", disabled=True)
inf_mmpose_button = widgets.Button(description="Start Inference", disabled=True, layout=layout_single_long_button)

# Create listeners
## Retrieve video input
def update_mmpose_input_video(change):
    """Copy the selected video name into the MMPose form and toggle its start button."""
    selected_video = video_dropdown.value
    selected_mmpose_video_input.value = selected_video
    # The start button is usable only while a video is selected
    inf_mmpose_button.disabled = not selected_video

## Run mmpose
def run_mmpose_inference(button):
    selected_video = video_dropdown.value
    video_path = Path(f"{VIDEO_DIR_PATH}/{video_subdir_dropdown.value}/{selected_video}")

    %cd -q {MMPOSE_DIR_PATH}

    if not os.path.exists(VIDEO_SKELETON_DIR_PATH):
        os.mkdir(VIDEO_SKELETON_DIR_PATH)

    command = [
      "python",
      "demo/inferencer_demo.py", str(video_path),
      "--pose2d", "human",
      "--thickness", "4",
      "--radius", "0",
      "--vis-out-dir", str(VIDEO_SKELETON_DIR_PATH)
    ]
    run_command(command)

    # Change name for human background
    input_file = Path(f'{VIDEO_SKELETON_DIR_PATH}/{selected_video}')
    bg_file = Path(f'{VIDEO_SKELETON_DIR_PATH}/.{selected_video[:-4]}-bg.mp4')

    os.rename(input_file, bg_file)

    command = [
      "python",
      "demo/inferencer_demo.py", str(video_path),
      "--pose2d", "human",
      "--black-background",
      "--thickness", "4",
      "--radius", "0",
      "--vis-out-dir", str(VIDEO_SKELETON_DIR_PATH)
    ]
    run_command(command)

    reencode_video(bg_file)
    reencode_video(input_file)

    metadata = {
        "original_path": video_path.as_posix(),
        "bg_path": bg_file.as_posix()
    }
    add_metadata_to_mp4(input_file, metadata)

    clear_output()
    mmpose_inf_display()
    print("Done, Outputs:")
    print(f"With Background: {bg_file}")
    print(f"Black Background: {input_file}")

# Attach Listeners
# The MMPose button lives in the Video Selection cell; clicking it fills in this form.
video_mmpose_button.on_click(update_mmpose_input_video)
inf_mmpose_button.on_click(run_mmpose_inference)

# Display fields
def mmpose_inf_display():
  """Show the MMPose form (selected video text box and start button)."""
  display(selected_mmpose_video_input)
  display(inf_mmpose_button)

mmpose_inf_display()

## Inference: FYP 💃

1. Play the Inference Configuration cell.
2. Update the configurations and **Save**. (Details for each field will be stated above the cell).
3. Play the Run FYP Inference cell.
4. Go back to the Video Selection cell and click **Refresh**.
5. Select the **Skeleton** folder and a video in that folder then click **FYP**.
6. Once the Video box has been loaded, click **Start Inference** to begin the inference.
7. Once inference is completed, play the Post-Process Inference Functions cell.
8. Play the Post-Inference cell.
9. Play the Combine Gifs cell.
10. Input all the fields and click **Create**. (Details for each field will be stated above the cell).
11. Click the Gif Display cell.
12. Choose a folder and a gif and click **Display**.
13. Done.

**Inference Configuration** ⚙️

**pretrained_model_path**: The path that contains the model to be used for inference.

**output_dir**: The path where the inferred gifs are saved to. The box is for users to write the name of the inference folder.

**Validation Data**:
> **prompts**: A list of texts that the gifs will be generated based on.
>
> **video_length**: Number of frames referenced from the pose video.
>
> **width** and **height**: Resolution of the video.
>
> **num_inference_steps**: The higher the value, the more realistic the video, in exchange for higher memory usage, more computational resources and a longer inference time.
>
> **guidance_scale**: A scale used to control and predict noise.
>
> **use_inv_latent**: Whether to reverse engineer the process to determine the latent variables used to make up the real image. Unused in our current state.
>
> **num_inv_steps**: Adjust to optimize the inverse latent process.
>
> **dataset_set**: No need to be changed by the user.

**train_batch_size**: How much training can be done together at once. (A larger batch means faster training at the cost of higher memory usage.)

**resume_from_checkpoint**: The path that contains the checkpoint used for the model.

**seed**: A set inference seed to limit and control randomness and ensure reproducibility in case of error and/or for debugging.

**mixed_precision**: This is to set the type of precision for text encoding and VAE autoencoding weights. By default, this is set to single precision which is fp32. (High precision in exchange for more memory usage and computational resources used)

**gradient_checkpointing**: Reduces memory usage by doing some checkpoints for gradients, which increases the computational load. Decreases memory usage for increased time taken for inference to complete.

**enable_xformers_memory_efficient_attention**: Reduce memory usage in exchange for slight dip in inference performance.



In [None]:
# @title Inference Configuration { display-mode: "form" }

# Init
%cd -q {FYP_DIR_PATH}

# Initialize load_config with a default value
load_config = None

# Define a container for displayed widgets
displayed_widgets = []

# Get a list of all config files in the directory.
# ".gitignore" is filtered out here rather than removed afterwards, so the
# cell does not raise ValueError when the folder has no .gitignore.
config_files = [
  f for f in os.listdir(CONFIG_DIR_PATH)
  if f != ".gitignore" and os.path.isfile(os.path.join(CONFIG_DIR_PATH, f))
]

# Create a dropdown widget with the list of config files
config_files_dropdown = Dropdown(
  options=["- Select an Item -"] + config_files,
  description='Select a Config File:',
  layout=Layout(width="500px"),
  style={'description_width': '150px'}
)

# Function to clear displayed widgets (excluding the dropdown)
def clear_displayed_widgets():
    """Close every widget of the current config editor and show the file dropdown again."""
    for stale_widget in displayed_widgets:
        stale_widget.close()
    del displayed_widgets[:]
    # The dropdown itself is kept alive and re-displayed
    display(config_files_dropdown)

# Function to update the load_config variable based on the selected filename
def update_load_config(change):
    """Rebuild the inference-config editor for the yaml file picked in the dropdown.

    Args:
        change: Change notification of the dropdown's 'value' trait;
            change.new is the selected file name.

    Loads the selected yaml into the global load_config, validates it against
    expected_inference_config and, when valid, displays one input widget per
    setting plus a Save button that writes the edited values back to the file.
    """
    global load_config
    selected_filename = change.new
    if not selected_filename or selected_filename == "- Select an Item -":
        return

    clear_output(wait=True)  # Clear the output area
    clear_displayed_widgets()  # Clear previously displayed widgets

    # Load yaml file
    sample_yaml_path = Path(f"{CONFIG_DIR_PATH}/{selected_filename}")
    with open(sample_yaml_path, 'r') as yaml_file:
        load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)
    print(f"Editing config from: {sample_yaml_path}")

    # Check if the yaml configuration matches the expected inference config
    if not compare_dict_structure(expected_inference_config, load_config):
        print("The configuration for this yaml is not structured correctly for inference!")
        return

    # Create a list of model names and paths in the directory
    model_options = create_model_list()
    value_model_path = Path(load_config['pretrained_model_path'])
    # model_options holds (label, path) tuples, so compare against the paths
    # only; comparing against the tuples never matched and always fell back
    # to the default model.
    if value_model_path not in [model_path for _, model_path in model_options]:
        value_model_path = model_options[0][1]

    # Fall back to the local checkpoints folder when the stored path is not a directory here
    checkpoints_path = Path(load_config['resume_from_checkpoint'])
    if not os.path.isdir(checkpoints_path):
        checkpoints_path = Path(f"{CHECKPOINT_DIR_PATH}/{checkpoints_path.name}")

    validation_data = load_config['validation_data']

    def make_field(widget_cls, name, **kwargs):
        # Every form field shares the same description format, style and layout
        return widget_cls(description=f"{name}:", style=configs_config_style, layout=configs_config_layout, **kwargs)

    # =====================================
    ## Basic Data
    config_subheader1 = widgets.HTML(value="<h3>Basic Data</h3>")
    config_pretrained_model_path = make_field(Dropdown, "pretrained_model_path", options=model_options, value=value_model_path)
    config_output_dir_name = make_field(widgets.Text, "output_dir_name", value="")
    config_train_batch_size = make_field(widgets.IntText, "train_batch_size", value=load_config['train_batch_size'])
    config_validation_steps = make_field(widgets.IntText, "validation_steps", value=load_config['validation_steps'])
    config_seed = make_field(widgets.IntText, "seed", value=load_config['seed'])
    config_mixed_precision = make_field(widgets.Text, "mixed_precision", value=load_config['mixed_precision'])
    config_gradient_checkpointing = make_field(Dropdown, "gradient_checkpointing", options=boolean_dropdown, value=load_config['gradient_checkpointing'])
    config_enable_xformers_memory_efficient_attention = make_field(
        Dropdown,
        "enable_xformers_memory_efficient_attention",
        options=boolean_dropdown,
        value=load_config['enable_xformers_memory_efficient_attention'],
    )
    # =====================================
    ## Validation_data
    config_subheader2 = widgets.HTML(value="<h3>Validation Data</h3>")
    # One prompt per line; the text area is taller than the other fields
    config_prompts = widgets.Textarea(
        description="prompts:",
        value="\n".join(validation_data['prompts']),
        style=configs_config_style,
        layout=widgets.Layout(width="500px", height="100px"),
    )
    config_video_length = make_field(widgets.IntText, "video_length", value=validation_data['video_length'])
    config_width = make_field(widgets.IntText, "width", value=validation_data['width'])
    config_height = make_field(widgets.IntText, "height", value=validation_data['height'])
    config_num_inference_steps = make_field(widgets.IntText, "num_inference_steps", value=validation_data['num_inference_steps'])
    config_guidance_scale = make_field(widgets.FloatText, "guidance_scale", value=validation_data['guidance_scale'])
    config_use_inv_latent = make_field(Dropdown, "use_inv_latent", options=boolean_dropdown, value=validation_data['use_inv_latent'])
    config_num_inv_steps = make_field(widgets.IntText, "num_inv_steps", value=validation_data['num_inv_steps'])
    # =====================================
    ## Group widgets
    config_vbox = widgets.VBox([
        config_pretrained_model_path,
        config_output_dir_name,
        config_train_batch_size,
        config_validation_steps,
        config_seed,
        config_mixed_precision,
        config_gradient_checkpointing,
        config_enable_xformers_memory_efficient_attention,
    ])
    config_vbox_validation_data = widgets.VBox([
        config_prompts,
        config_video_length,
        config_width,
        config_height,
        config_num_inference_steps,
        config_guidance_scale,
        config_use_inv_latent,
        config_num_inv_steps
    ])

    # Create listeners
    def save_config(change):
        """Write the edited values back to the selected yaml file."""
        # Specify the folder path you want to check
        folder_path = Path(f'{INFERENCE_OUTPUT_DIR_PATH}/{config_output_dir_name.value}')

        config = {
            "pretrained_model_path": config_pretrained_model_path.value.as_posix(),
            "output_dir": folder_path.as_posix(),
            "validation_data": {
                # Blank lines in the text area are dropped
                "prompts": [prompt.strip() for prompt in config_prompts.value.splitlines() if prompt.strip()],
                "video_length": config_video_length.value,
                "width": config_width.value,
                "height": config_height.value,
                "num_inference_steps": config_num_inference_steps.value,
                "guidance_scale": config_guidance_scale.value,
                "use_inv_latent": config_use_inv_latent.value,
                "num_inv_steps": config_num_inv_steps.value,
                # Not editable in the form; carried over from the loaded file
                "dataset_set": load_config['validation_data']['dataset_set']
            },
            "train_batch_size": config_train_batch_size.value,
            "validation_steps": config_validation_steps.value,
            "resume_from_checkpoint": checkpoints_path.as_posix(),
            "seed": config_seed.value,
            "mixed_precision": config_mixed_precision.value,
            "gradient_checkpointing": config_gradient_checkpointing.value,
            "enable_xformers_memory_efficient_attention": config_enable_xformers_memory_efficient_attention.value
        }

        # Refuse to save when the output folder already exists
        if os.path.exists(folder_path) and os.path.isdir(folder_path):
            print("\r", f'There is already a folder with the name {config_output_dir_name.value}! Please rename your folder!', end="")
            return

        # Save updated config back into the yaml file
        with open(sample_yaml_path, "w") as file:
            yaml.dump(config, file, default_style='"', default_flow_style=False, sort_keys=False)

        print("\r", "Saving...", end="")
        time.sleep(2)
        print("\r", "Successfully saved!", end="")

    ## Button Widget and Attach Listener
    config_save_btn = widgets.Button(description="Save", layout=configs_config_button_layout)
    config_save_btn.on_click(save_config)

    # Display fields and remember them so the next selection can close them
    editor_widgets = [
        config_subheader1,
        config_vbox,
        config_subheader2,
        config_vbox_validation_data,
        config_save_btn
    ]
    display(*editor_widgets)
    displayed_widgets.extend(editor_widgets)

# Attach the event handler to the dropdown's 'value' trait
config_files_dropdown.observe(update_load_config, names='value')

# Display the dropdown widget; the editor widgets are displayed by
# update_load_config once a file is selected
display(config_files_dropdown)

In [None]:
# @title Run FYP Inference { display-mode: "form" }

# Create layout
# (same margin as the layout of the same name in the MMPose cell; redefined so
# this cell does not depend on that one having been run)
layout_single_long_button = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets
# The video text box is read-only and is filled in by the FYP button of the
# Video Selection cell; the config path is editable and defaults to pose_sample.yaml.
selected_fyp_video_input = widgets.Text(placeholder='Select a video above', description="Video:", disabled=True)
config_file_path = widgets.Text(value=Path(f"{CONFIG_DIR_PATH}/pose_sample.yaml").as_posix(),description="Config File:")
inf_fyp_button = widgets.Button(description="Start Inference", disabled=True, layout=layout_single_long_button)

# Create listeners
## Retrieve video input
def update_fyp_input_video(change):
    """Copy the selected video's full path into the FYP form and toggle its start button."""
    selected_video = video_dropdown.value
    skeleton_video_path = Path(f"{VIDEO_DIR_PATH}/{video_subdir_dropdown.value}/{selected_video}")
    selected_fyp_video_input.value = skeleton_video_path.as_posix()
    # The start button is usable only while a video is selected
    inf_fyp_button.disabled = not selected_video

## Run FYP
def run_fyp_inference(button):
    """Launch `txt2video.py` through `accelerate` for the chosen config and video.

    Click handler for the Start Inference button; `button` is not used.
    The run is refused when the config's `output_dir` already exists as a
    folder, so an earlier run's output folder is not reused.
    """
    # os.chdir instead of the `%cd` line magic: magics are not Python syntax, so
    # keeping one inside a function body ties the function to IPython.
    os.chdir(FYP_DIR_PATH)

    config_file_path_text = config_file_path.value
    video_file_path_text = selected_fyp_video_input.value

    # NOTE(review): yaml.FullLoader can build more than plain data types; if the
    # configs only hold scalars/lists/dicts, yaml.safe_load would be safer - confirm.
    with open(config_file_path_text, 'r') as yaml_file:
        load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)
    folder_path = load_config['output_dir']

    # Guard clause: never start a run into an existing output folder.
    if os.path.isdir(folder_path):
        print("\r", f"The folder path {folder_path} already exists! Please rename your folder!", end="")
        return

    # Overwrite any message left on the current output line.
    print("\r", "", end="")

    command = [
        "accelerate", "launch",
        "txt2video.py",
        f"--config={config_file_path_text}",
        f"--skeleton_path={video_file_path_text}"
    ]
    run_command(command)

## Pass on skeleton path
def get_skeleton_path():
    """Return the path currently shown in the "Video:" field as a `Path`."""
    return Path(selected_fyp_video_input.value)

## Pass on config path
def get_config_path():
    """Return the path currently typed in the "Config File:" field as a `Path`."""
    return Path(config_file_path.value)

# Attach Listeners
inf_fyp_button.on_click(run_fyp_inference)
# video_fyp_button is created in another cell; that cell must have been run first.
video_fyp_button.on_click(update_fyp_input_video)

# Display fields
display(selected_fyp_video_input)
display(config_file_path)
display(inf_fyp_button)

In [None]:
#@title Post-Process Inference Functions

# View the duration, number of frames and fps of a given gif
def gif_stats(gif_path):
  """Print the total duration, frame count and average FPS of a gif.

  Args:
    gif_path: Location of the gif; handed to `imageio.get_reader`.

  The FPS is reported as `nan` when the summed frame duration is zero,
  instead of raising ZeroDivisionError.
  """
  gif = imageio.get_reader(gif_path)
  try:
    # Each frame's 'duration' metadata is treated as milliseconds.
    duration = 0.0
    for frame in gif:
      duration += frame.meta['duration'] / 1000.0
    frame_count = len(gif)
  finally:
    # Release the file handle even if a frame lacks duration metadata.
    gif.close()

  fps = frame_count / duration if duration > 0 else float("nan")
  print(f"Duration: {duration}s, Frames: {frame_count}, FPS: {fps}")

# Combines all gifs with parameters to show superimposition and captions
def postprocess_gif(inf_path, pose_type, display_superimposed, display_captions, is_combine):
  """Write side-by-side "pose | generated" gifs into `<inf_path>/processed`.

  Args:
    inf_path: Inference run folder containing `raw/`, the pose gifs and,
      when `display_superimposed` is set, `superimposed/`.
    pose_type: One of "Pose", "Human" or "Human + Pose"; selects the pose gif.
    display_superimposed: Use the gifs in `superimposed/` instead of `raw/`.
    display_captions: Place every clip on a black canvas that is 40 px taller.
    is_combine: Write one gif holding the pose and every generated gif;
      otherwise write one "pose | gif" pair per generated gif.

  Raises:
    FileNotFoundError: If `<inf_path>/raw` contains no `.gif` file.
    KeyError: If `pose_type` is not one of the three supported names.
  """
  # Sorted so clip order is reproducible; os.listdir returns entries in arbitrary order.
  raw_gifs = [(gif, gif[:-4]) for gif in sorted(os.listdir(Path(f"{inf_path}/raw"))) if gif.endswith(".gif")]
  if not raw_gifs:
    raise FileNotFoundError(f"No .gif files found in {inf_path}/raw")

  # Frame arrays are indexed (height, width[, channels]); slicing the first two
  # axes also copes with frames that have no channel axis.
  reader = imageio.get_reader(Path(f"{inf_path}/raw/{raw_gifs[0][0]}"))
  try:
    height, width = reader.get_data(0).shape[:2]
  finally:
    reader.close()

  # Get pose type: display name -> (file name in inf_path, code used in output names)
  pose_type_dict = {
    "Pose": ("pose.gif", "P"),
    "Human": ("pose-human.gif", "H"),
    "Human + Pose": ("pose-bg.gif", "HP"),
  }
  pose_file, pose_code = pose_type_dict[pose_type]
  pose_path = Path(f"{inf_path}/{pose_file}")
  # resize takes (width, height); the pose clip is scaled to the generated gif size.
  pose_video = VideoFileClip(pose_path.as_posix()).resize((width, height))

  # Black canvas with a 40 px strip below the video, used when captions are enabled.
  black_clip = ColorClip(size=(pose_video.w, pose_video.h + 40), color=(0, 0, 0), duration=pose_video.duration)

  clips = [CompositeVideoClip([black_clip, pose_video]) if display_captions else pose_video]

  # Get clips
  video_path_template = "{}/superimposed/{}.gif" if display_superimposed else "{}/raw/{}.gif"
  for _, prompt in raw_gifs:
    file_path = Path(video_path_template.format(inf_path, prompt))
    gif_video = VideoFileClip(file_path.as_posix())
    if display_captions:
      txt_clip = TextClip(prompt, font="Amiri-bold", fontsize=30, color='white')
      txt_clip = txt_clip.set_duration(gif_video.duration)
      txt_clip = txt_clip.set_position(("center", "bottom"))
      # NOTE(review): the caption text is only composited when is_combine is set;
      # per-gif outputs get the black strip without text - confirm this is intended.
      components = [black_clip, txt_clip, gif_video] if is_combine else [black_clip, gif_video]
      clips.append(CompositeVideoClip(components))
    else:
      clips.append(gif_video)

  suffix = f"{pose_code}_{'S' if display_superimposed else 'X'}_{'C' if display_captions else 'X'}_{'C' if is_combine else 'X'}"

  # write_gif needs the target folder to exist.
  os.makedirs(Path(f"{inf_path}/processed"), exist_ok=True)

  if is_combine:
    gif_output_path = Path(f"{inf_path}/processed/all_combined_{suffix}.gif")
    result = clips_array([clips])
    result.write_gif(gif_output_path, fps=7.692, verbose=False, logger=None)
  else:
    # clips[0] is the pose clip; clips[i+1] belongs to raw_gifs[i].
    for i, (_, prompt) in enumerate(raw_gifs):
      gif_output_path = Path(f"{inf_path}/processed/{prompt}_{suffix}.gif")
      result = clips_array([[clips[0], clips[i+1]]])
      result.write_gif(gif_output_path, fps=7.692, verbose=False, logger=None)

# Matches the skeleton video duration and fps to be the same as gif
def postprocess_mmpose(skeleton_path, video_length, output_dir, ms_per_frame=130):
  """Re-time the skeleton video and its companion videos into gifs.

  Three gifs are written to `output_dir`: `pose.gif` (from `skeleton_path`),
  `pose-bg.gif` and `pose-human.gif` (from the two paths returned by
  `read_video_metadata(skeleton_path)`), each sped up or slowed down so that
  it lasts `video_length * ms_per_frame` milliseconds.

  Args:
    skeleton_path: Path of the skeleton video.
    video_length: `video_length` value from the inference config.
    output_dir: Folder that receives the three gifs.
    ms_per_frame: Milliseconds represented by one unit of `video_length`.

  Raises:
    ValueError: If `video_length` or the probed video duration is not positive.
  """
  if video_length <= 0:
    raise ValueError(f"video_length must be positive, got {video_length}")

  # Get the video duration using ffprobe
  duration = float(subprocess.check_output(['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(skeleton_path)]))
  if duration <= 0:
    raise ValueError(f"Could not determine a positive duration for {skeleton_path}")

  # Target duration in seconds, and the PTS multiplier that reaches it.
  desired_duration = video_length * ms_per_frame / 1000
  speedup_factor = desired_duration / duration
  fps = video_length / desired_duration

  original_path, bg_path = read_video_metadata(skeleton_path)
  print(original_path, bg_path)

  video_filter = f"setpts={speedup_factor}*PTS,fps={fps}"
  conversions = [
    (skeleton_path, "pose.gif"),
    (bg_path, "pose-bg.gif"),
    (original_path, "pose-human.gif"),
  ]
  for source_path, output_name in conversions:
    run_command([
      "ffmpeg",
      "-y",  # overwrite outputs so re-running the cell does not stop on existing gifs
      "-i", str(source_path),
      "-filter:v", video_filter,
      str(Path(f'{output_dir}/{output_name}'))
    ])

# Superimposes all gifs with pose in selected folder
def superimposeSkeleton(inf_folder_path, frame_size=(512, 512), fps=7.692):
  """Overlay `pose.gif` on every gif in `raw/` and save into `superimposed/`.

  Args:
    inf_folder_path: Inference run folder containing `raw/` and `pose.gif`.
    frame_size: (width, height) both inputs are resized to before blending.
    fps: Frame rate of the written gifs.

  For each frame, pose pixels whose gray value is at most 50 are replaced by
  the generated gif's pixels; every other pixel keeps the pose colour.
  """
  from contextlib import closing

  raw_path = Path(f"{inf_folder_path}/raw")
  skeleton = Path(f"{inf_folder_path}/pose.gif")
  output_folder_path = Path(f"{inf_folder_path}/superimposed")

  # Create the superimposed folder if it does not exist yet
  os.makedirs(output_folder_path, exist_ok=True)

  generated_gifs = [
    f for f in os.listdir(raw_path)
    if f.endswith(".gif") and os.path.isfile(os.path.join(raw_path, f))
  ]

  for human_gif in generated_gifs:
    human_path = Path(f"{raw_path}/{human_gif}")
    output_file_path = Path(f"{output_folder_path}/{human_gif}")

    # closing() guarantees both readers and the writer are released, even when
    # a frame fails to process part-way through.
    with closing(imageio.get_reader(skeleton)) as fg, \
         closing(imageio.get_reader(human_path)) as bg, \
         closing(imageio.get_writer(output_file_path, fps=fps, loop=0)) as output_gif:

      for i in range(min(len(fg), len(bg))):  # Process frames until one of the GIFs ends
        foreground = cv2.resize(fg.get_data(i), frame_size)
        background = cv2.resize(bg.get_data(i), frame_size)

        # Gray image used to find the dark (background) pixels of the pose frame.
        # NOTE(review): the BGR conversion code is kept as-is; if the frames are
        # RGB this weights red and blue differently from an RGB conversion, which
        # shifts what the threshold below keeps - confirm before changing.
        gray = cv2.cvtColor(foreground, cv2.COLOR_BGR2GRAY)
        foreground = foreground.astype(float)
        background = background.astype(float)

        # Dark pixels filter (0 to 255)
        black_mask = (gray <= 50)

        # Dark pose pixels show the generated gif; the rest show the pose
        outImage = np.where(black_mask[:, :, np.newaxis], background, foreground)

        # Convert the frame to uint8 and add it to the output GIF
        output_gif.append_data(outImage.astype(np.uint8))

In [None]:
#@title Post-Inference

# Load the inferred config file
# get_config_path() is defined in the "Run FYP Inference" cell, which must have been run first.
# NOTE(review): yaml.FullLoader can build more than plain data types; consider yaml.safe_load if the configs are plain data.
with open(get_config_path(), 'r') as yaml_file:
  load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

# Retrieve info needed from the config file
video_length = load_config['validation_data']['video_length']
output_dir = load_config['output_dir']

# Skeleton video path currently shown in the inference form
skeleton_path = get_skeleton_path()

# Create the three pose gifs in output_dir: pose.gif, pose-bg.gif and pose-human.gif
postprocess_mmpose(skeleton_path, video_length, output_dir)

# Superimposes the pose onto generated gifs
superimposeSkeleton(output_dir)

#### **Combine Gifs**

**Inference Directory Name**: Shows a dropdown of folders in the inference output folder that the user can select.

**Pose Type**: Shows 3 options as dropdown for user to select
> **Pose**: Skeleton pose video generated by MMPose with black background <br>
> **Human**: Original video <br>
> **Human + Pose**: Skeleton pose superimposed onto the original video

**Superimpose on Gif**: Boolean dropdown for user to indicate if they would like to see only the inferred gifs generated or the inferred gifs with the skeleton pose superimposed onto them.

**Show Captions**: Boolean dropdown for user to indicate if they would like the prompt to be indicated for the inferred gif generated.

**Combine all**: Boolean dropdown for user to indicate if they would like all the inferred gifs generated + the pose gif to be merged into 1 gif for display or separate it into individual inferred gif + skeleton pose gif.

In [None]:
#@title Combine Gifs

def getInferenceRunFolderContent(folder_name):
  """Return the names of the visible sub-folders of INFERENCE_OUTPUT_DIR_PATH.

  `folder_name` is not used; the inference output folder is always the one
  that gets listed. Entries starting with "." and plain files are left out.
  """
  return [
      entry
      for entry in os.listdir(INFERENCE_OUTPUT_DIR_PATH)
      if os.path.isdir(os.path.join(INFERENCE_OUTPUT_DIR_PATH, entry)) and not entry.startswith(".")
  ]

# The listing is passed as an argument, but getInferenceRunFolderContent ignores it and lists the folder itself.
inference_run_folders = os.listdir(INFERENCE_OUTPUT_DIR_PATH)
inference_run_folder_content = getInferenceRunFolderContent(inference_run_folders)

# Configure layouts
combine_gif_button_layout = widgets.Layout(margin='0px 0px 20px 210px', width="143px")
combine_gif_style = {'description_width': '200px'}
combine_gif_layout = widgets.Layout(width="350px")

# Create widgets
# The option dropdowns start with value=None so enable_button can tell when all of them are filled in.
gif_inference_folder_name = Dropdown(options=inference_run_folder_content, description="Inference Directory Name:", style=combine_gif_style, layout=combine_gif_layout)
gif_pose_type = widgets.Dropdown(options=["Pose", "Human", "Human + Pose"], description='Pose Type:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_inferred_type = widgets.Dropdown(options=boolean_dropdown, description='Superimpose on Gif:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_show_captions = widgets.Dropdown(options=boolean_dropdown, description='Show Captions:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_combine_all = widgets.Dropdown(options=boolean_dropdown, description='Combine all:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_create_button = widgets.Button(description="Create", disabled=True, layout=combine_gif_button_layout)

def clearTextOutput():
  """Clear the cell output, then draw the Combine Gifs form again."""
  clear_output()

  form_widgets = (
      gif_inference_folder_name,
      gif_pose_type,
      gif_inferred_type,
      gif_show_captions,
      gif_combine_all,
      gif_create_button,
  )
  for form_widget in form_widgets:
    display(form_widget)

# Create listeners
## Enable button when all dropdowns are populated
def enable_button(change):
    """Enable the Create button only once every option dropdown has a value.

    Observer callback; `change` is not used. The three boolean dropdowns are
    compared against None because False is a valid selection.
    """
    answers = (gif_inferred_type.value, gif_show_captions.value, gif_combine_all.value)
    form_complete = bool(gif_pose_type.value) and all(answer is not None for answer in answers)
    gif_create_button.disabled = not form_complete

## Display the selected gif
def combine_gifs(change):
  """Run `postprocess_gif` on the selected inference folder with the form's options.

  Click handler for the Create button; `change` is not used. Prints a notice
  instead when the selected folder does not exist.
  """
  run_folder = Path(f"{INFERENCE_OUTPUT_DIR_PATH}/{gif_inference_folder_name.value}")
  options = (
      gif_pose_type.value,
      gif_inferred_type.value,
      gif_show_captions.value,
      gif_combine_all.value,
  )

  # Bail out early when there is no such run folder.
  if not os.path.isdir(run_folder):
    print("\r", 'No such folder exists!', end="")
    return

  clearTextOutput()

  postprocess_gif(run_folder, *options)
  print("Done")

# Attach Listeners
# Every option dropdown re-evaluates whether the Create button can be enabled.
gif_pose_type.observe(enable_button, names='value')
gif_inferred_type.observe(enable_button, names='value')
gif_show_captions.observe(enable_button, names='value')
gif_combine_all.observe(enable_button, names='value')
gif_create_button.on_click(combine_gifs)

# Display fields (same widgets, same order as clearTextOutput)
display(gif_inference_folder_name)
display(gif_pose_type)
display(gif_inferred_type)
display(gif_show_captions)
display(gif_combine_all)
display(gif_create_button)

In [None]:
# @title Gif Display
def getInferenceRunFolderContent(folder_name):
  """List the non-hidden directories found directly in INFERENCE_OUTPUT_DIR_PATH.

  The `folder_name` argument is ignored. Names are returned in the order
  `os.listdir` yields them.
  """
  def is_visible_folder(name):
    # Keep directories only, and skip dot-prefixed (hidden) ones.
    return os.path.isdir(os.path.join(INFERENCE_OUTPUT_DIR_PATH, name)) and not name.startswith(".")

  return list(filter(is_visible_folder, os.listdir(INFERENCE_OUTPUT_DIR_PATH)))

# Init
# The listing is passed as an argument, but getInferenceRunFolderContent ignores it and lists the folder itself.
inference_run_folders = os.listdir(INFERENCE_OUTPUT_DIR_PATH)
inference_run_folder_content = getInferenceRunFolderContent(inference_run_folders)

# Create widgets
# layout_output, layout_double_button and layout_hbox come from an earlier cell.
gif_output_placeholder = widgets.Output(layout=layout_output)
gif_subdir_dropdown = widgets.Dropdown(options=inference_run_folder_content, description='Folder:', value=None, disabled=False)
# Starts empty and disabled; gif_subdir_select fills it once a folder is chosen.
gif_dropdown = widgets.Dropdown(options=[], description='Gif:', disabled=True)
gif_display_button = widgets.Button(description="Display", disabled=True, layout=layout_double_button)
gif_refresh_button = widgets.Button(description="Refresh", disabled=False, layout=layout_double_button)

gif_hbox_1 = widgets.HBox([gif_display_button, gif_refresh_button], layout=layout_hbox)

# Create listeners
## Update gif dropdown options based on the selected folder
def gif_subdir_select(change):
    """Fill the gif dropdown with the processed gifs of the selected run folder.

    Observer callback for the folder dropdown; `change` is not used. The gif
    dropdown is disabled and cleared when there is nothing to choose from:
    no folder selected, no `processed` sub-folder, or no `.gif` inside it.
    """
    selected_gif_folder = gif_subdir_dropdown.value

    selected_gif_dir_content = []
    if selected_gif_folder is not None:
        selected_GIF_DIR_PATH = Path(f"{INFERENCE_OUTPUT_DIR_PATH}/{selected_gif_folder}/processed")
        # The processed folder may not exist yet for this run; treat that as "no gifs".
        if os.path.isdir(selected_GIF_DIR_PATH):
            selected_gif_dir_content = sorted(
                file for file in os.listdir(selected_GIF_DIR_PATH) if file.endswith('.gif')
            )

    gif_dropdown.options = selected_gif_dir_content
    if selected_gif_dir_content:
        gif_dropdown.disabled = False
    else:
        gif_dropdown.disabled = True
        gif_dropdown.value = None

## Display the selected gif
def display_selected_gif(change):
    """Render the chosen processed gif inside the output placeholder.

    Click handler for the Display button; `change` is not used. Nothing
    happens while no gif is selected.
    """
    chosen_gif = gif_dropdown.value
    chosen_folder = gif_subdir_dropdown.value
    if not chosen_gif:
        return

    gif_path = Path(f"{INFERENCE_OUTPUT_DIR_PATH}/{chosen_folder}/processed/{chosen_gif}")
    rendered_gif = Image(filename=gif_path, embed=True)

    # Replace whatever the placeholder currently shows with the new gif.
    with gif_output_placeholder:
        clear_output()
        display(rendered_gif)

## Refresh folder and directory
def refresh_folder_and_directory(change):
    """Re-read the inference output folder and reset both dropdowns.

    Click handler for the Refresh button; `change` is not used. The gif
    dropdown is emptied first, then the folder dropdown receives the fresh
    folder list; both selections are cleared.
    """
    run_folder_names = getInferenceRunFolderContent(os.listdir(INFERENCE_OUTPUT_DIR_PATH))

    resets = ((gif_dropdown, []), (gif_subdir_dropdown, run_folder_names))
    for dropdown, fresh_options in resets:
        dropdown.options = fresh_options
        dropdown.value = None


## Enable button when a valid gif is picked
def enable_button(change):
    """Enable the Display button only while a gif is selected.

    Observer callback for the gif dropdown; `change` is not used.
    """
    gif_display_button.disabled = not gif_dropdown.value

# Attach Listeners
gif_subdir_dropdown.observe(gif_subdir_select, 'value')
gif_display_button.on_click(display_selected_gif)
gif_refresh_button.on_click(refresh_folder_and_directory)
# enable_button here is the one defined in this cell (it toggles the Display button).
gif_dropdown.observe(enable_button, 'value')


# Display fields
display(gif_output_placeholder)
display(gif_subdir_dropdown)
display(gif_dropdown)

display(gif_hbox_1)

## Training 🏋

1. Play the Init cell.
2. Play the Dataset Preload (Video) cell.
3. Select a dataset folder and click **Start Cutting**.
4. Play Dataset Preload (Metadata) cell.
5. Play the Training Configuration cell.
6. Update the configurations and click **Save**. (Details for each field will be shown above the cell).
7. Play Run Training cell.
8. Done.

In [None]:
# @title Initialise Charades Data { display-mode: "form" }

# Charades Data Class from csv
class CharadesData:
  """One row of a Charades annotation csv.

  `actions` maps each action class id to `[start_ms, end_ms]`. A class id that
  appears more than once in a row keeps only its last interval.
  """

  def __init__(self, row):
    """Unpack the 11 csv columns of `row` and parse the actions column."""
    video_id, subject, scene, quality, relevance, verified, script, objects, descriptions, actions, length = row
    self.id = video_id
    self.subject = subject
    self.scene = scene
    self.quality = quality
    self.relevance = relevance
    self.verified = verified
    self.script = script
    self.objects = objects.split(";")
    self.descriptions = descriptions
    self.length = length
    self.actions = {}

    # Convert actions in proper data structure ("class_id time_start time_end" -> class_id: [time_start, time_end])
    if len(actions) != 0:
      for substring in actions.split(';'):
        parts = substring.split()
        if not parts:
          # Tolerate a stray or trailing ';' in the column.
          continue
        key = parts[0]
        values = [self.convert_to_ms(parts[1]), self.convert_to_ms(parts[2])]
        self.actions[key] = values

  # For printing
  def __str__(self):
    return f"ID: {self.id}, Subject: {self.subject}, Scene: {self.scene}, Quality: {self.quality}, Relevance: {self.relevance}, Verified: {self.verified}, Script: {self.script}, Objects: {self.objects}, Descriptions: {self.descriptions}, Actions: {self.actions}, Length: {self.length}"

  # Helper function to convert time into ms
  def convert_to_ms(self, seconds):
    """Convert a seconds string such as "11.90" into whole milliseconds (11900).

    The value is parsed as a decimal number, so the fractional part is scaled
    correctly whatever its number of digits, and strings without a decimal
    point ("7") are accepted.
    """
    return int(round(float(seconds) * 1000))

  # Caption getter with template
  def getCaption(self, index):
    """Return the caption for the `index`-th action of this video.

    When `index` does not address an action (for example a video without
    any annotated action), a caption built from the scene and script alone
    is returned instead of raising IndexError.
    """
    action_ids = list(self.actions.keys())
    if not -len(action_ids) <= index < len(action_ids):
      return f"In a {self.scene} setting, within the context of '{self.script}'."
    return f"In a {self.scene} setting, within the context of '{self.script}', the action '{action_descriptions[action_ids[index]]}' is taking place."


action_descriptions = {}
charades_all = []

# Load classes lookup table: each line is "<class id> <description>"
with open(Path(f"{CHARADES_LOOKUP_PATH}/Charades_v1_classes.txt"), 'r') as file:
    for line in file:
        code, description = line.strip().split(' ', 1)
        action_descriptions[code] = description

# Load charades data: train rows first, then test rows, skipping each header line
for annotation_csv in ("Charades_v1_train.csv", "Charades_v1_test.csv"):
    with open(Path(f"{CHARADES_LOOKUP_PATH}/{annotation_csv}"), mode='r') as file:
        csv_reader = csv.reader(file)
        next(csv_reader, None)

        for row in csv_reader:
            charadeData = CharadesData(row)
            charades_all.append(charadeData)

clear_output()
print("Data load successful!")

In [None]:
# @title Initialise Sims Data { display-mode: "form" }

# Sims Data Class from csv
class SimsData:
  """Caption data decoded from a Sims clip name.

  The name is matched against "<activity>_S<subject><scene>_<camera>" and the
  three codes are looked up in `sims_const`. `isSims` is True only when the
  name matches and every lookup succeeds. All attributes are always defined,
  so an instance with `isSims == False` can still be inspected safely.
  """

  def __init__(self, filename):
    pattern = r'^([^_]*)_S(\d*)([^_]*\d*)_(fC\d*|m\d*)'
    match = re.match(pattern, filename)

    # Defaults for names that do not match the pattern.
    self.activity = None
    self.subject = None
    self.scene = None
    self.camera = None
    self.isSims = False

    if match:
      self.activity = sims_const["activity"].get(match.group(1))
      self.subject = sims_const["subject"].get(match.group(2))
      self.scene = sims_const["scene"].get(match.group(3))
      self.camera = match.group(4)

      # An unknown code yields None from .get and disqualifies the clip.
      self.isSims = all((self.activity, self.subject, self.scene, self.camera))

  # For printing
  def __str__(self):
    return f"Sims: {self.isSims}"

  # Caption getter with template
  def getCaption(self):
    """Return the caption sentence; only meaningful when `isSims` is True."""
    return f"In a {self.scene} setting, {self.subject} is {self.activity}"

# Lookup tables used by SimsData to turn the codes in a clip name into caption text.
sims_const = {}
# Subject number (the digits right after "_S" in the clip name) -> character description
sims_const["subject"] = {
  "1": "a male character with a medium skin tone, wearing a black cap and a multicolored vest",
  "2": "a male character with pale skin and red hair, sporting glasses and a beige blazer over an orange shirt",
  "3": "a male character with a tan complexion, gray hair styled with a headband, and dressed in a light gray cardigan",
  "4": "a male character with a dark complexion, a shaved head, and wearing a white blazer over a gray, horizontally striped shirt",
  "5": "a female character with a medium complexion, straight black hair to the shoulders, wearing a dress with a distinct pattern",
  "6": "a female character with a medium complexion, sporting grey hair, orange glasses, and a black and white dress",
  "7": "a female character with a light to medium skin tone, featuring a shaved head, and wearing a beige wrap-style garment",
  "8": "a female character with a medium skin tone and hair in a ponytail, dressed in a light-colored, sleeveless top with a gentle pattern"
}

# Scene code -> room name; both numbered variants of a room share one name
sims_const["scene"] = {
    "K1": "Kitchen",
    "K2": "Kitchen",
    "D1": "Dining Room",
    "D2": "Dining Room",
    "L1": "Living Room",
    "L2": "Living Room",
}

# Activity code (the text before the first "_" in the clip name) -> activity phrase
sims_const["activity"] = {
    "Co": "cooking",
    "Dr": "drinking",
    "Ea": "eating",
    "GS": "getting up and sitting Down",
    "RB": "reading book",
    "UC": "using computer",
    "UP": "using phone",
    "UT": "using tablet",
    "WA": "walking",
    "TV": "watching TV",
}

In [None]:
# @title Dataset Preload (Video) { display-mode: "form" }

# Init
dataset_dir_folders = os.listdir(VIDEO_DIR_PATH)

## Training env
# NOTE(review): video_cutting_select assigns a local variable of the same name, so this module-level value stays None.
training_dataset = None
# Timestamp taken when this cell runs: re-running the cell starts a new training branch folder.
training_idx = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
training_branch = Path(f"{TRAINING_CONTENT_DIR_PATH}/{training_idx}")

# Create layout
config_style = {'description_width': '100px'}
config_layout = widgets.Layout(width="300px")
config_button_layout = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets
# Hidden (dot-prefixed) entries are left out of the dataset folder choices.
dataset_folder_dropdown = widgets.Dropdown(options=[dir for dir in dataset_dir_folders if not dir.startswith(".")], description='Dataset Folder:', value=None, layout=config_layout, style=config_style)
dataset_cutting_button = widgets.Button(description="Start Cutting", disabled=True, layout=config_button_layout)

# Create listeners
def dataset_dir_select(change):
  """Enable the cutting button and show how many videos the chosen folder holds.

  Observer callback for the dataset folder dropdown; `change` is not used.
  When the selection is cleared, the button is disabled and its label reset
  instead of trying to list a non-existent ".../None" folder.
  """
  selected_folder = dataset_folder_dropdown.value
  if not selected_folder:
    dataset_cutting_button.disabled = True
    dataset_cutting_button.description = "Start Cutting"
    return

  dataset_folder_content = os.listdir(Path(f"{VIDEO_DIR_PATH}/{selected_folder}"))
  total_dataset_videos = sum(1 for file in dataset_folder_content if file.endswith(('.mp4', '.avi')))

  dataset_cutting_button.disabled = False
  dataset_cutting_button.description = f"Start Cutting ({total_dataset_videos})"

def video_cutting_select(change):
  """Cut every video of the selected dataset folder into training clips.

  Click handler for the cutting button; `change` is not used. For each
  `.mp4`/`.avi` file a folder `<training_branch>/<video>` is created, then:
    * Charades video with actions: one ffmpeg clip per action, named
      `<video>NN<ext>`, covering that action's [start, end] interval.
    * Charades video without actions, or Sims clip: the whole file is copied
      as `<video>01<ext>`.
    * Anything else: the folder is removed again and the video skipped.
  Videos whose folder already exists are skipped.
  """
  os.makedirs(training_branch, exist_ok=True)
  # NOTE(review): this name is local to the function; the module-level
  # `training_dataset` is not updated - confirm whether `global` was intended.
  training_dataset = Path(f"{VIDEO_DIR_PATH}/{dataset_folder_dropdown.value}")

  # Loop video files from selected dataset folder
  for video_file in os.listdir(training_dataset):
    video, ext = os.path.splitext(video_file)

    # Ignore non video files (Eg: .ipynb_checkpoint and csv)
    if ext not in (".mp4", ".avi"):
      continue

    video_folder = Path(f"{training_branch}/{video}")
    if os.path.exists(video_folder):
      print(f"Folder already exist for video_id: {video}. Skipping ...")
      continue
    os.mkdir(video_folder)

    input_video = Path(f"{training_dataset}/{video_file}")
    whole_video_copy = Path(f"{video_folder}/{video}01{ext}")

    # Retrieve charade object by ID
    charade_data = next((charade for charade in charades_all if charade.id == video), None)

    if not charade_data:
      sims_data = SimsData(video)
      if sims_data.isSims:
        print(f"{video}: Non-charades clip, no clipping required")
        shutil.copy(input_video, whole_video_copy)
      else:
        os.rmdir(video_folder)
        print(f"{video}: Not found in charades or sims, skip clipping")
    elif not charade_data.actions:    # If no clipping required, keep whole video
      print(f"No clipping needed for {video}")
      shutil.copy(input_video, whole_video_copy)
    else:
      print(f"Clipping {video}")
      charade_actions = charade_data.actions.items()
      total_charade_actions = len(charade_actions)
      for i, (class_id, timings) in enumerate(charade_actions):
        # timings is [start_ms, end_ms]; ffmpeg's -t expects a duration, not an end time.
        start_ms, end_ms = timings
        duration_ms = end_ms - start_ms

        output_video = Path(f"{video_folder}/{video}{i+1:02}{ext}")

        print(f"#{i+1}/{total_charade_actions}: {start_ms}ms to {end_ms}ms [I:{input_video}] [O:{output_video}]")
        command = [
          "ffmpeg",
          "-i", str(input_video),
          "-ss", f"{start_ms}ms",
          "-t", f"{duration_ms}ms",
          "-c:v", "libx264",
          "-c:a", "aac",
          str(output_video),
          "-loglevel", "quiet"
        ]

        run_command(command)

  print("Finished Clipping")

# Attach Listeners
# Selecting a folder enables the button; clicking it starts the cutting.
dataset_folder_dropdown.observe(dataset_dir_select, 'value')
dataset_cutting_button.on_click(video_cutting_select)

# Display fields
display(dataset_folder_dropdown)
display(dataset_cutting_button)

In [None]:
# @title Dataset Preload (Metadata) { display-mode: "form" }

# Writes <training_branch>/metadata.tsv with one row (part_id, clip_id, caption) per training clip.
training_metadata_file = Path(f"{training_branch}/metadata.tsv")
with open(training_metadata_file, 'w', newline='', encoding='utf-8') as tsvfile:
  fieldnames = ['part_id', 'clip_id', 'caption']
  writer = csv.DictWriter(tsvfile, fieldnames=fieldnames, delimiter='\t')
  writer.writeheader()

  # Sorted so the rows come out in a reproducible order; os.listdir order is arbitrary.
  for part_id in sorted(os.listdir(training_branch)):
    folder_path = os.path.join(training_branch, part_id)

    # Ignore non video files (Eg: .ipynb_checkpoint and csv)
    if not os.path.isdir(folder_path) or part_id.startswith("."):
      continue

    charade_data = next((charade for charade in charades_all if charade.id == part_id), None)

    sims_data = None
    if not charade_data:
      sims_data = SimsData(part_id)
      if not sims_data.isSims:
        print("Missing charades data, skipping ...")
        continue

    # Keep only clip files named <video>NN.mp4/.avi so the numeric sort key below
    # cannot fail on stray entries in the clip folder.
    clip_files = [
      clip for clip in os.listdir(folder_path)
      if os.path.splitext(clip)[1] in (".mp4", ".avi") and os.path.splitext(clip)[0][-2:].isdigit()
    ]

    # Sort by video sub-id to maintain order
    training_video_files = sorted(clip_files, key=lambda x: int(os.path.splitext(x)[0][-2:]))
    for i, clip in enumerate(training_video_files):
      caption = charade_data.getCaption(i) if charade_data else sims_data.getCaption()
      writer.writerow({
          'part_id': part_id,
          'clip_id': clip,
          'caption': caption
      })
print(f"TSV created: {training_metadata_file}")

#### **Training Configuration** ⚙️

**pretrained_model_path**: The path that contains the model to fine-tune. This will be a dropdown for the user to select from.

**output_dir**: The path where the newly fine tuned model is pushed to. The folder name itself is written by the user, whereas the path to the folder is currently fixed.

**Train Data**:
> **video_path**: The path that contains the training dataset.
>
> **n_sample_frames**: Determines how many frames are referenced for training.
>
> **width**: Resolution of the video.
>
> **sample_frame_rate**: The rate at which the frames are sampled from. If the sample frames are set to 10 and the frame rate is set to 2, then every second 2 frames are referenced.

**learning_rate**: The rate at which each step of the training is conducted.

**train_batch_size**: How much training can be done together at once. (Larger batch means faster training at the cost of higher memory usage)

**max_train_steps**: The number of training iterations that are run over the dataset to optimize training.

**trainable_modules**: The modules that are being trained (no change should be made by the user, as the training state stays the same unless requirements change).

**seed**: A set training seed to limit and control randomness and ensure reproducibility in case of error and/or for debugging.

**mixed_precision**: This is to set the type of precision for text encoding and VAE autoencoding weights. By default, this is set to single precision which is fp32. (High precision in exchange for more memory usage and computational resources used)

**use_8bit_adam**: Can be toggled true to reduce memory usage and computational resources used by using 8 bit precision for some part of ADAM optimization computations.

**gradient_checkpointing**: Reduces memory usage by doing some checkpoints for gradients, which increases the computational load. Decreases memory usage for increased time taken for training completion.

**enable_xformers_memory_efficient_attention**: Reduce memory usage in exchange for slight dip in training performance.



In [None]:
# @title Training Configuration { display-mode: "form" }

# Init
%cd -q {FYP_DIR_PATH}

# Initialize load_config with a default value
load_config = None

# Get a list of all files in the directory
config_files = [f for f in os.listdir(CONFIG_DIR_PATH) if os.path.isfile(os.path.join(CONFIG_DIR_PATH, f))]
config_files.remove(".gitignore")

# Define a container for displayed widgets
displayed_widgets = []

# Create a dropdown widget with the list of config files
config_files_dropdown = Dropdown(
  options=["- Select an Item -"] + config_files,
  description='Select a Config File:',
  layout=Layout(width="500px"),
  style={'description_width': '150px'}
)

# Function to clear displayed widgets (excluding the dropdown)
def clear_displayed_widgets():
    """Close every tracked widget, empty the tracking list, and show the dropdown again."""
    for shown_widget in displayed_widgets:
        shown_widget.close()
    del displayed_widgets[:]
    # The config file dropdown itself is not tracked, so it is displayed again here.
    display(config_files_dropdown)

# Function to update the load_config variable based on the selected filename
def update_load_config(change):
    """Load the selected training config and render an editable form for it.

    Observer for ``config_files_dropdown``. The chosen YAML file is parsed into the
    global ``load_config``; when its structure matches ``expected_training_config``
    a form with one widget per editable field and a Save button is displayed.
    Saving writes the edited values back to the same YAML file.

    Args:
        change: Change notification from the dropdown; ``change.new`` holds the
            newly selected file name.
    """
    global load_config
    selected_filename = change.new
    if selected_filename and selected_filename != "- Select an Item -":
        clear_output(wait=True)  # Clear the output area
        clear_displayed_widgets()  # Clear previously displayed widgets

        # Load yaml file
        sample_yaml_path = Path(f"{CONFIG_DIR_PATH}/{selected_filename}")
        # NOTE(review): FullLoader is fine for the project's own config files; switch to
        # yaml.safe_load if untrusted YAML could ever end up in CONFIG_DIR_PATH.
        with open(sample_yaml_path, 'r') as yaml_file:
          load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)
        print(f"Editing config from: {sample_yaml_path}")

        # Check if the yaml configuration matches expected training config
        if compare_dict_structure(expected_training_config, load_config):

          # Create a list of model names and paths in the directory
          model_options = create_model_list()
          if not model_options:
            # Without this guard model_options[0] below raises IndexError
            print("No models are available to select; add a model before editing this config.")
            return

          video_options = create_video_path_list()
          video_path_default = Path(load_config["train_data"]["video_path"])

          # Fall back to the training root when the configured folder no longer exists
          if not (os.path.exists(video_path_default) and os.path.isdir(video_path_default)):
            video_path_default = TRAINING_CONTENT_DIR_PATH

          # model_options holds (label, value) pairs, so the saved path has to be matched
          # against the values. Testing membership against the pairs themselves never
          # matches and would always discard the model stored in the config.
          saved_model_path = Path(load_config['pretrained_model_path'])
          value_model_path = next(
              (option[1] for option in model_options if Path(option[1]) == saved_model_path),
              model_options[0][1],
          )
          # =====================================
          ## Basic Data
          config_subheader1 = widgets.HTML(value="<h3>Basic Data</h3>")
          config_pretrained_model_path = Dropdown(options=model_options, description="pretrained_model_path:", value=value_model_path, style=configs_config_style, layout=configs_config_layout)
          # NOTE(review): the name always starts empty; saving with an empty name makes
          # output_dir point at CUSTOM_MODEL_DIR_PATH itself - confirm this is intended.
          config_output_dir_name = widgets.Text(description="output_dir_name:", value="", style=configs_config_style, layout=configs_config_layout)
          config_learning_rate = widgets.FloatText(description="learning_rate:", value=load_config["learning_rate"], style=configs_config_style, layout=configs_config_layout)
          # =====================================
          # train_data
          config_subheader2 = widgets.HTML(value="<h3>Train Data</h3>")
          config_video_path = Dropdown(options=video_options, description="video_path:", value=video_path_default, style=configs_config_style, layout=configs_config_layout)
          config_n_sample_frames = widgets.IntText(description="n_sample_frames:", value=load_config["train_data"]["n_sample_frames"], style=configs_config_style, layout=configs_config_layout)
          config_train_data_width = widgets.IntText(description="width:", value=load_config["train_data"]["width"], style=configs_config_style, layout=configs_config_layout)
          config_sample_frame_rate = widgets.IntText(description="sample_frame_rate:", value=load_config["train_data"]["sample_frame_rate"], style=configs_config_style, layout=configs_config_layout)
          # =====================================
          config_train_batch_size = widgets.IntText(description="train_batch_size:", value=load_config["train_batch_size"], style=configs_config_style, layout=configs_config_layout)
          config_max_train_steps = widgets.IntText(description="max_train_steps:", value=load_config["max_train_steps"], style=configs_config_style, layout=configs_config_layout)
          config_seed = widgets.IntText(description="seed:", value=load_config["seed"], style=configs_config_style, layout=configs_config_layout)
          config_mixed_precision = widgets.Text(description="mixed_precision:", value=load_config["mixed_precision"], style=configs_config_style, layout=configs_config_layout)
          # Label uses the YAML key like every other field (was "config_use_8bit_adam:")
          config_use_8bit_adam = Dropdown(options=boolean_dropdown, value=load_config["use_8bit_adam"], description="use_8bit_adam:", style=configs_config_style, layout=configs_config_layout)
          config_gradient_checkpointing = Dropdown(options=boolean_dropdown, value=load_config["gradient_checkpointing"], description="gradient_checkpointing:", style=configs_config_style, layout=configs_config_layout)
          config_enable_xformers_memory_efficient_attention = Dropdown(options=boolean_dropdown, value=load_config["enable_xformers_memory_efficient_attention"], description="enable_xformers_memory_efficient_attention:", style=configs_config_style, layout=configs_config_layout)
          # =====================================
          ## Button Widget
          config_save_btn = widgets.Button(description="Save", layout=configs_config_button_layout)
          ## Group widgets
          config_vbox = widgets.VBox([
              config_pretrained_model_path,
              config_output_dir_name,
              config_learning_rate,
              config_train_batch_size,
              config_max_train_steps,
              config_seed,
              config_mixed_precision,
              config_use_8bit_adam,
              config_gradient_checkpointing,
              config_enable_xformers_memory_efficient_attention
          ])
          config_vbox_train_data = widgets.VBox([
              config_video_path,
              config_n_sample_frames,
              config_train_data_width,
              config_sample_frame_rate,
          ])

          # Display fields
          display(
            config_subheader1,
            config_vbox,
            config_subheader2,
            config_vbox_train_data,
            config_save_btn
          )

          # Create listeners
          def save_config(change):
            """Write the form values back to the selected YAML file (Save button handler)."""
            config = {
                "pretrained_model_path": Path(config_pretrained_model_path.value).as_posix(),
                "output_dir": Path(f"{CUSTOM_MODEL_DIR_PATH}/{config_output_dir_name.value}").as_posix(),
                "train_data": {
                    "video_path": Path(config_video_path.value).as_posix(),
                    "n_sample_frames": config_n_sample_frames.value,
                    "width": config_train_data_width.value,
                    "sample_frame_rate": config_sample_frame_rate.value
                },
                "learning_rate": config_learning_rate.value,
                "train_batch_size": config_train_batch_size.value,
                "max_train_steps": config_max_train_steps.value,
                # Not editable in the form; carried over from the loaded file
                "trainable_modules": load_config["trainable_modules"],
                "seed": config_seed.value,
                "mixed_precision": config_mixed_precision.value,
                "use_8bit_adam": config_use_8bit_adam.value,
                "gradient_checkpointing": config_gradient_checkpointing.value,
                "enable_xformers_memory_efficient_attention": config_enable_xformers_memory_efficient_attention.value
            }

            # The training root is only the fallback entry, not a usable dataset folder
            if Path(config_video_path.value) == TRAINING_CONTENT_DIR_PATH:
              print("\r", "Please choose a training folder!", end="")

            else:
              #Save updated config back into yaml file
              with open(sample_yaml_path, "w") as file:
                yaml.dump(config, file, default_style='"', default_flow_style=False, sort_keys=False)

              print("\r", "Saving...", end="")
              time.sleep(2)
              print("\r", "Successfully saved!", end="")

          # Attach Listeners
          config_save_btn.on_click(save_config)

          # Track the rendered widgets so the next selection can close them
          displayed_widgets.extend([
              config_subheader1,
              config_vbox,
              config_subheader2,
              config_vbox_train_data,
              config_save_btn,
          ])

        else:
          print("The configuration for this yaml is not structured correctly for training!")

# Re-render the training form whenever a different config file is picked
# (update_load_config is called with each change of the dropdown's 'value' trait)
config_files_dropdown.observe(update_load_config, names='value')

# Show the selector; the form itself is displayed by update_load_config after a file is chosen
display(config_files_dropdown)


#### **Perform Training** ▶️

1. Running the cell immediately starts the training process.

2. Once the training process starts, there will be a 2 minute buffer to load the necessary data for training.

3. After the buffer, a progress bar appears, showing the training progress together with the percentage of completion.

4. Finally when the training is completed, the newly generated model will be saved in the output directory path set by the user in the Training Configuration section.

In [None]:
# @title Run Training { display-mode: "form" }

# Launch from the FollowYourPose project directory so the relative script and
# config paths in the command below resolve.
%cd -q {FYP_DIR_PATH}

# NOTE(review): this label is displayed but nothing in the active code writes to it;
# only the commented-out helper below updated it.
output_label = widgets.Label(value="")
display(output_label)

# The config path is fixed to configs/pose_train.yaml, independent of which file
# was edited in the Training Configuration cell.
command = [
    "accelerate", "launch", "train_followyourpose.py",
    "--config=configs/pose_train.yaml"
]
run_command(command)

# Previous implementation, kept for reference: streamed each output line into output_label.
# def run_command_and_display_output(command):
#     process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, shell=True)
#     for line in process.stdout:
#         output_label.value = line.strip()  # Update the label with the live output
#     process.wait()

# For cross-platform compatibility, we do not run with flag TORCH_DISTRIBUTED_DEBUG=DETAIL
# run_command_and_display_output('accelerate launch train_followyourpose.py --config="configs/pose_train.yaml"')
# !TORCH_DISTRIBUTED_DEBUG=DETAIL accelerate launch train_followyourpose.py --config="configs/pose_train.yaml"

## Testing (WIP)

Testing Section:

1. Play the Test Configuration cell.
2. Update the configurations and **Save**. (Details for each field will be stated above the cell).
3. Play the Convert Gif to Mp4 Function cell.
4. Play Run Test cell and select Skeleton Folder and **Start Test**.
5. Play Comparison MMPose Inference and select the model chosen in the configuration.
6. Play Score Function cell.
7. Play CSV Function cell.
8. Play Scoring Skeleton cell.
9. Select the model chosen in the configuration and any video.
10. Play Test Grading cell.
11. Select the same model and click **Display**.
12. Slide and choose a score for each prompt and click **Save**.
13. Done.

**Test Configuration** ⚙️

**pretrained_model_path**: The path that contains the model to be used for inference.

**Validation Data**:
> **prompts**: A list of texts that the gifs will be generated based on.
>
> **video_length**: Number of frames referenced from the pose video.
>
> **num_inference_steps**: The higher the value, the more realistic the video will be, in exchange for higher memory usage, more computational resources and more time spent on inference.


In [None]:
# @title Test Configuration { display-mode: "form" }

# Init
%cd -q {FYP_DIR_PATH}

# Initialize load_config with a default value
load_config = None

# Define a container for displayed widgets
displayed_widgets = []

# Get a list of all files in the directory
config_files = [f for f in os.listdir(CONFIG_DIR_PATH) if os.path.isfile(os.path.join(CONFIG_DIR_PATH, f))]
config_files.remove(".gitignore")

# Create a dropdown widget with the list of config files
config_files_dropdown = Dropdown(
  options=["- Select an Item -"] + config_files,
  description='Select a Config File:',
  layout=Layout(width="500px"),
  style={'description_width': '150px'}
)

checkpoints_path = ""

# Function to clear displayed widgets (excluding the dropdown)
def clear_displayed_widgets():
    """Close every tracked form widget, empty the tracking list and re-show the dropdown."""
    for stale_widget in tuple(displayed_widgets):
        stale_widget.close()
    del displayed_widgets[:]
    # The dropdown itself is never tracked, so it has to be rendered again explicitly
    display(config_files_dropdown)

# Function to update the load_config variable based on the selected filename
def update_load_config(change):
    """Load the selected test (inference) config and render an editable form for it.

    Observer for ``config_files_dropdown``. The chosen YAML file is parsed into the
    global ``load_config``; when its structure matches ``expected_inference_config``
    a form for the model path and the validation data is displayed. Saving updates
    ``load_config`` in place and writes it back to the same YAML file.

    Args:
        change: Change notification from the dropdown; ``change.new`` holds the
            newly selected file name.
    """
    global load_config
    selected_filename = change.new
    if selected_filename and selected_filename != "- Select an Item -":
        clear_output(wait=True)  # Clear the output area
        clear_displayed_widgets()  # Clear previously displayed widgets

        # Load yaml file
        sample_yaml_path = f"{CONFIG_DIR_PATH}/{selected_filename}"
        # NOTE(review): FullLoader is fine for the project's own config files; switch to
        # yaml.safe_load if untrusted YAML could ever end up in CONFIG_DIR_PATH.
        with open(sample_yaml_path, 'r') as yaml_file:
          load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)
        print(f"Editing config from: {sample_yaml_path}")

        # Check if the yaml configuration matches the expected inference config
        if compare_dict_structure(expected_inference_config, load_config):

          # Create a list of model names and paths in the directory
          model_options = create_model_list()
          if not model_options:
            # Without this guard model_options[0] below raises IndexError
            print("No models are available to select; add a model before editing this config.")
            return

          # model_options holds (label, value) pairs, so the saved path has to be matched
          # against the values. Testing membership against the pairs themselves never
          # matches and would always discard the model stored in the config.
          saved_model_path = Path(load_config['pretrained_model_path'])
          value_model_path = next(
              (option[1] for option in model_options if Path(option[1]) == saved_model_path),
              model_options[0][1],
          )

          # Re-root the checkpoint under CHECKPOINT_DIR_PATH when the stored path is not
          # an existing directory
          checkpoints_path = Path(load_config['resume_from_checkpoint'])
          if not os.path.isdir(checkpoints_path):
            checkpoints_path = Path(f"{CHECKPOINT_DIR_PATH}/{checkpoints_path.name}")

          # =====================================
          ## Basic Data
          config_subheader1 = widgets.HTML(value="<h3>Basic Data</h3>")
          config_pretrained_model_path = Dropdown(options=model_options, description="pretrained_model_path:", value=value_model_path, style=configs_config_style, layout=configs_config_layout)
          # =====================================
          ## Validation_data
          config_subheader2 = widgets.HTML(value="<h3>Validation Data</h3>")
          # One prompt per line; an empty/null prompt list renders as an empty text area
          config_prompts = widgets.Textarea(description="prompts:", value="\n".join(load_config['validation_data']['prompts'] or []), style=configs_config_style, layout=widgets.Layout(width="500px", height="100px"))
          config_video_length = widgets.IntText(description="video_length:", value=load_config['validation_data']['video_length'], style=configs_config_style, layout=configs_config_layout)
          config_num_inference_steps = widgets.IntText(description="num_inference_steps:", value=load_config['validation_data']['num_inference_steps'], style=configs_config_style, layout=configs_config_layout)
          # =====================================
          ## Group widgets
          config_vbox = widgets.VBox([
              config_pretrained_model_path
          ])
          config_vbox_validation_data = widgets.VBox([
              config_prompts,
              config_video_length,
              config_num_inference_steps
          ])

          # Create listeners
          def save_config(change):
            """Copy the form values into load_config and write it back to the YAML file."""
            # Path(...) accepts both str and Path option values, matching the training form
            load_config['pretrained_model_path'] = Path(config_pretrained_model_path.value).as_posix()
            # Blank lines in the text area are dropped
            load_config['validation_data']['prompts'] = [prompt.strip() for prompt in config_prompts.value.splitlines() if prompt.strip()]
            load_config['validation_data']['video_length'] = config_video_length.value
            load_config['validation_data']['num_inference_steps'] = config_num_inference_steps.value
            load_config['resume_from_checkpoint'] = checkpoints_path.as_posix()

            # Save updated config back into the yaml file
            with open(sample_yaml_path, "w") as file:
              yaml.dump(load_config, file, default_style='"', default_flow_style=False, sort_keys=False)

            print("\r", "Saving...", end="")
            time.sleep(2)
            print("\r", "Successfully saved!", end="")

          ## Button Widget and Attach Listener
          config_save_btn = widgets.Button(description="Save", layout=configs_config_button_layout)
          config_save_btn.on_click(save_config)

          # Display fields
          display(
            config_subheader1,
            config_vbox,
            config_subheader2,
            config_vbox_validation_data,
            config_save_btn
          )

          # Track the rendered widgets so the next selection can close them
          displayed_widgets.extend([
              config_subheader1,
              config_vbox,
              config_subheader2,
              config_vbox_validation_data,
              config_save_btn
          ])

        else:
          print("The configuration for this yaml is not structured correctly for testing!")

# Re-render the test form whenever a different config file is picked
# (update_load_config is called with each change of the dropdown's 'value' trait)
config_files_dropdown.observe(update_load_config, names='value')

# Show the selector; the form itself is displayed by update_load_config after a file is chosen
display(config_files_dropdown)

In [None]:
# @title Convert Gif to Mp4 Function { display-mode: "form" }
# Converts the gifs to mp4
def _gif_to_mp4(gif_path, mp4_path):
  """Re-encode one GIF as a libx264 MP4 and release the clip's reader afterwards."""
  clip = VideoFileClip(gif_path.as_posix())
  try:
    clip.write_videofile(mp4_path.as_posix(), codec='libx264')
  finally:
    # Clips keep their underlying reader open until closed explicitly
    clip.close()

def convertGifs(inf_path):
  """Convert the GIFs of one inference run into MP4 videos.

  Every visible file in ``{inf_path}/raw`` and the ``{inf_path}/pose-human.gif``
  reference clip are written as MP4 into ``{inf_path}/gif_to_video``.

  Args:
    inf_path: Output folder of a single inference run (str or Path).
  """
  raw_dir = Path(f"{inf_path}/raw")
  video_dir = Path(f"{inf_path}/gif_to_video")

  # Create the target folder up front: it must also exist when raw/ holds no GIFs,
  # because the pose-human conversion below writes into it as well.
  os.makedirs(video_dir, exist_ok=True)

  # Hidden files (names starting with ".") are not videos and are skipped
  raw_gifs = [
      f for f in os.listdir(raw_dir)
      if not f.startswith(".") and os.path.isfile(os.path.join(raw_dir, f))
  ]
  print(raw_gifs)

  for gif in raw_gifs:
    # Same base name as the GIF, with the extension swapped for .mp4
    _gif_to_mp4(raw_dir / gif, video_dir / f"{Path(gif).stem}.mp4")

  # Convert the pose human gif to mp4 for testing
  _gif_to_mp4(Path(f"{inf_path}/pose-human.gif"), video_dir / "pose-human.mp4")

def postprocess_gif_test(model_name):
  """Build side-by-side comparison GIFs for every pose folder of a tested model.

  For each pose folder under ``{TEST_OUTPUT_DIR_PATH}/{model_name}`` and each
  visible GIF in its ``raw`` folder, writes ``processed/<gif>`` showing the pose
  GIF, the superimposed GIF and the raw GIF next to each other, with the GIF's
  base name as a caption at the bottom.

  Args:
    model_name: Name of the model folder inside TEST_OUTPUT_DIR_PATH.
  """
  model_path = Path(f"{TEST_OUTPUT_DIR_PATH}/{model_name}")
  for pose_name in os.listdir(model_path):
    pose_path = Path(f"{model_path}/{pose_name}")
    if not os.path.isdir(pose_path):
      continue

    raw_path = Path(f"{pose_path}/raw")
    superimpose_path = Path(f"{pose_path}/superimposed")
    pose_gif_path = Path(f"{pose_path}/pose.gif")
    processed_path = Path(f"{pose_path}/processed")

    # Pose folders without raw inference output have nothing to post-process
    if not os.path.isdir(raw_path):
      continue
    os.makedirs(processed_path, exist_ok=True)

    pose_gif_source = None  # opened lazily, once the target size is known
    pose_gif_video = None
    try:
      for gif in os.listdir(raw_path):
        if gif.startswith("."):
          continue

        if pose_gif_video is None:
          # Size the pose clip after the first raw GIF. Frame arrays are shaped
          # (height, width[, channels]) while resize() expects (width, height).
          reader = imageio.get_reader(Path(f"{raw_path}/{gif}"))
          try:
            height, width = reader.get_data(0).shape[:2]
          finally:
            reader.close()
          pose_gif_source = VideoFileClip(pose_gif_path.as_posix())
          pose_gif_video = pose_gif_source.resize((width, height))

        superimpose_gif_video = None
        raw_gif_video = None
        try:
          superimpose_gif_video = VideoFileClip(Path(f"{superimpose_path}/{gif}").as_posix())
          raw_gif_video = VideoFileClip(Path(f"{raw_path}/{gif}").as_posix())
          clips = [pose_gif_video, superimpose_gif_video, raw_gif_video]

          # Black canvas three clips wide with a 40px strip below for the caption
          black_clip = ColorClip(size=(pose_gif_video.w * 3, pose_gif_video.h + 40), color=(0, 0, 0), duration=pose_gif_video.duration)
          txt_clip = TextClip(gif[:-4], font="Amiri-bold", fontsize=30, color='white')
          txt_clip = txt_clip.set_duration(pose_gif_video.duration)
          txt_clip = txt_clip.set_position(("center", "bottom"))

          gif_output_path = Path(f"{processed_path}/{gif}")
          result = CompositeVideoClip([black_clip, clips_array([clips]), txt_clip])
          result.write_gif(gif_output_path.as_posix(), fps=7.692, verbose=False, logger=None)
        finally:
          # Release the per-GIF readers before moving on to the next file
          for opened_clip in (superimpose_gif_video, raw_gif_video):
            if opened_clip is not None:
              opened_clip.close()
    finally:
      if pose_gif_source is not None:
        pose_gif_source.close()

In [None]:
# @title Run Test { display-mode: "form" }

dataset_dir_folders = os.listdir(VIDEO_DIR_PATH)

# Only visible sub-directories are valid dataset folders; hidden entries and plain
# files would make the folder listing in the callbacks fail.
visible_dataset_folders = [
    entry for entry in dataset_dir_folders
    if not entry.startswith(".") and os.path.isdir(os.path.join(VIDEO_DIR_PATH, entry))
]

# Create layout
layout_single_long_button = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets
config_file_path = widgets.Text(value='pose_test.yaml',description="Config File:", disabled=True)
# Single dropdown definition: previously a second assignment replaced the first one
# and silently dropped its hidden-folder filter.
video_dir_select = widgets.Dropdown(options=visible_dataset_folders, value=None, description="Folder", disabled=False)
# Stays disabled until a folder has been selected (see test_dataset_dir_select)
test_button = widgets.Button(description="Start Test", layout=layout_single_long_button, disabled=True)

# Create listeners
def test_dataset_dir_select(change):
  """Enable the test button and show how many videos the selected folder contains.

  Args:
    change: Change notification from ``video_dir_select`` (not inspected; the
      current widget value is read instead).
  """
  selected_folder = video_dir_select.value

  # Test the selected value, not the widget object (which is always truthy)
  if not selected_folder:
    test_button.disabled = True
    test_button.description = "Start Test"
    return

  dataset_folder_content = os.listdir(Path(f"{VIDEO_DIR_PATH}/{selected_folder}"))
  # Get all skeletons: visible .mp4/.avi files
  total_dataset_videos = len([
      f for f in dataset_folder_content
      if f.endswith(('.mp4', '.avi')) and not f.startswith('.')
  ])

  test_button.disabled = False
  test_button.description = f"Start Test ({total_dataset_videos})"

## Run FYP
def run_test(button):
  %cd -q {FYP_DIR_PATH}

  config_file_path_text = config_file_path.value
  full_config_file_path = Path(f"{CONFIG_DIR_PATH}/{config_file_path_text}")
  sample_yaml_path = str(full_config_file_path.as_posix())

  selected_folder = video_dir_select.value

  video_folder = Path(f"{VIDEO_DIR_PATH}/{selected_folder}")
  dataset_folder_content = os.listdir(video_folder)
  total_dataset_videos = [file for file in dataset_folder_content if (file.endswith('.mp4') or file.endswith('.avi'))]

  # Load yaml file
  with open(sample_yaml_path, 'r') as yaml_file:
    load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

  model_name = load_config["pretrained_model_path"].split("/")[-1]

  for video in total_dataset_videos:
    if not video.startswith("."):
      output_dir_folder_name = Path(f"{TEST_OUTPUT_DIR_PATH}/{model_name}/{video[:-4]}")
      load_config["output_dir"] = output_dir_folder_name
      print(output_dir_folder_name.as_posix())

      with open(sample_yaml_path, "w") as yaml_file:
        yaml.dump(load_config, yaml_file)

      current_skeleton_path = f"{video_folder}/{video}"
      print("\r", f"Now starting inference on {video}")
      # Start inference
      !accelerate launch txt2video.py \
          --config={sample_yaml_path}  \
          --skeleton_path={current_skeleton_path}

      clear_output()
      print(f"Inference on {video} has been completed!")

      # Retrieve info needed from the config file
      video_length = load_config['validation_data']['video_length']

      # Create a pose gif of all 3 types of pose (Human, Skeleton, Superimposed)
      postprocess_mmpose(current_skeleton_path, video_length, output_dir_folder_name)

      # Superimposes the pose onto generated gifs
      superimposeSkeleton(output_dir_folder_name)

      # Converts the raw gifs back to mp4 to get keypoints
      convertGifs(output_dir_folder_name)

      # Post process gifs
      postprocess_gif_test(model_name)

# Wire the callbacks: folder selection updates the button, the button starts the test
video_dir_select.observe(test_dataset_dir_select, 'value')
test_button.on_click(run_test)

# Render the controls top to bottom
for control in (config_file_path, video_dir_select, test_button):
  display(control)

In [None]:
# @title Comparison MMPose Inference { display-mode: "form" }

# Helper Functions
## Re-encode video due to H.264 video encoding error
def reencode_video(input_file):
  """Re-encode a video in place as H.264 video / AAC audio using ffmpeg.

  The result is first written to ``{TEST_OUTPUT_DIR_PATH}/output.mp4`` and only
  replaces ``input_file`` once that file actually exists.

  Args:
    input_file: Path of the video to re-encode; overwritten on success.

  Raises:
    FileNotFoundError: If ffmpeg produced no output. The input file is left untouched.
  """
  temp_output_file = f"{TEST_OUTPUT_DIR_PATH}/output.mp4"

  # A leftover temp file from an interrupted run must not be mistaken for fresh output
  if os.path.exists(temp_output_file):
    os.remove(temp_output_file)

  # !ffmpeg -i {input_file} -c:v libx264 -crf 23 -c:a aac -strict experimental {temp_output_file}
  command = [
    "ffmpeg",
    "-y",  # never stop to ask for overwrite confirmation
    "-i", input_file,
    "-c:v", "libx264",
    "-crf", "23",
    "-c:a", "aac",
    "-strict", "experimental",
    temp_output_file
  ]
  run_command(command)

  # Verify the result before touching the original; deleting the input first would
  # lose the video whenever ffmpeg fails.
  if not os.path.isfile(temp_output_file) or os.path.getsize(temp_output_file) == 0:
    raise FileNotFoundError(f"ffmpeg did not produce {temp_output_file}; {input_file} was left unchanged")

  # Replaces the original with the re-encoded file in a single step
  os.replace(temp_output_file, input_file)

def getTestRunFolderContent(folder_name):
  """Return the names of the visible sub-directories of TEST_OUTPUT_DIR_PATH.

  Args:
    folder_name: Not read by the function; accepted so existing calls keep working.

  Returns:
    List of directory names that do not start with ".".
  """
  return [
      entry
      for entry in os.listdir(TEST_OUTPUT_DIR_PATH)
      if os.path.isdir(os.path.join(TEST_OUTPUT_DIR_PATH, entry)) and not entry.startswith(".")
  ]

# Init
# Raw listing of the test output folder; getTestRunFolderContent ignores its
# argument and lists TEST_OUTPUT_DIR_PATH itself, keeping visible directories only.
test_run_folders = os.listdir(TEST_OUTPUT_DIR_PATH)
test_run_folder_content = getTestRunFolderContent(test_run_folders)

# Create layout
layout_single_long_button = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets
# The dropdown starts without a selection (value=None); the button is enabled from the start
test_folder_content_dropdown = widgets.Dropdown(options=test_run_folder_content, description='Folder:', value=None, disabled=False)
inf_mmpose_button = widgets.Button(description="Start Inference", disabled=False, layout=layout_single_long_button)

# Create listeners

## Run mmpose
def run_mmpose_inference(button):
    selected_video = test_folder_content_dropdown.value
    video_path = Path(f"{TEST_OUTPUT_DIR_PATH}/{selected_video}")
    test_folders = [f for f in os.listdir(video_path) if os.path.isdir(os.path.join(video_path, f))]

    %cd -q {MMPOSE_DIR_PATH}

    for folder in test_folders:
      if folder.startswith("."):
        continue

      mp4_path = Path(f"{video_path}/{folder}/gif_to_video")
      videos_to_process = [f for f in os.listdir(mp4_path) if os.path.isfile(os.path.join(mp4_path, f))]

      test_pose_path = Path(f"{video_path}/{folder}")

      generated_path = Path(f"{test_pose_path}/generated_poses")
      keypoints_folder = Path(f"{test_pose_path}/keypoints")

      os.makedirs(generated_path, exist_ok=True)
      os.makedirs(keypoints_folder, exist_ok=True)

      for video in videos_to_process:
        processed_video_path = Path(f"{mp4_path}/{video}")
        print(processed_video_path)

        command = [
          "python",
          "demo/inferencer_demo.py", str(processed_video_path),
          "--pose2d", "human",
          "--black-background",
          "--thickness", "4",
          "--radius", "0",
          "--vis-out-dir", str(generated_path),
          "--pred-out-dir", str(keypoints_folder)
        ]
        run_command(command)

        print("Done, Outputs:")
        print(f"Saved at: {generated_path}/{video}")

      clear_output()
      mmpose_inf_display()
      print("Done")
      print(f"Saved at: {generated_path}")

# Wire the button to the inference callback
inf_mmpose_button.on_click(run_mmpose_inference)

def mmpose_inf_display():
  """Render the folder dropdown followed by the inference button."""
  for control in (test_folder_content_dropdown, inf_mmpose_button):
    display(control)

# Initial render of the controls
mmpose_inf_display()

In [None]:
# @title Score Function { display-mode: "form" }

def calculate_score(skeleton1, skeleton2):
  """Score how closely two keypoint sequences match.

  Both arguments are lists of frames, each frame a dict with an ``"instances"``
  list whose items carry a ``"keypoints"`` list of coordinate lists. Instances are
  flattened in order and compared pairwise; extra instances on either side are
  ignored. For every pair, the mean Euclidean distance over the keypoints that are
  valid in both skeletons is computed, and these means are averaged.

  Args:
    skeleton1: First keypoint sequence.
    skeleton2: Second keypoint sequence.

  Returns:
    ``100 - mean_distance / 10``: 100 means identical poses, lower values mean the
    poses are further apart (the value can become negative). Returns 0.0 when no
    keypoint pair could be compared at all.
  """
  def collect_keypoints(skeleton):
    return [instance["keypoints"] for frame in skeleton for instance in frame["instances"]]

  all_keypoints1 = collect_keypoints(skeleton1)
  all_keypoints2 = collect_keypoints(skeleton2)

  # Perform keypoint-based similarity measurement (e.g., using Euclidean distance)
  def euclidean_distance(p1, p2):
      return np.linalg.norm(np.array(p1) - np.array(p2))

  similarity_scores = []
  for keypoints1, keypoints2 in zip(all_keypoints1, all_keypoints2):
    # Distances are collected per pair of instances. Keypoints whose x coordinate is
    # <= 0 in either skeleton are treated as out of bounds and left out entirely, so
    # they no longer count as a perfect match with distance 0.
    frame_distances = [
        euclidean_distance(point1, point2)
        for point1, point2 in zip(keypoints1, keypoints2)
        if point1[0] > 0 and point2[0] > 0
    ]
    if frame_distances:
      similarity_scores.append(sum(frame_distances) / len(frame_distances))

  # Nothing comparable (empty input or only out-of-bounds keypoints)
  if not similarity_scores:
    return 0.0

  # Scale the mean distance down by 10 and subtract it from a perfect score of 100
  overall_similarity = 100 - ((sum(similarity_scores) / len(similarity_scores)) / 10)

  return overall_similarity

In [None]:
# @title CSV Function { display-mode: "form" }

def create_or_append_csv(filename, data):
  """Append one row to a score CSV, writing the header first when the file is new.

  Args:
    filename: Path of the CSV file; created if it does not exist.
    data: Row values in the order Pose, Prompt, Pose Score, Environment Score.
  """
  header_row = ["Pose", "Prompt", "Pose Score", "Environment Score"]

  # Decide before opening: append mode creates the file, after which it always exists
  pending_rows = [data] if os.path.isfile(filename) else [header_row, data]

  with open(filename, 'a', newline='') as csv_file:
    csv.writer(csv_file).writerows(pending_rows)

def update_manual_score(csv_filename, pose, prompt, new_manual_score):
  """Set the "Environment Score" of the first row matching a pose and prompt.

  Args:
    csv_filename: Path of the score CSV written by create_or_append_csv.
    pose: Value to match in the "Pose" column.
    prompt: Value to match in the "Prompt" column.
    new_manual_score: Value stored in the "Environment Score" column.

  Returns:
    True when a matching row was found and the file was rewritten, False when no
    row matched (the file is then left untouched).
  """
  csv_header = ["Pose", "Prompt", "Pose Score", "Environment Score"]

  # newline='' leaves newline handling to the csv module, so line breaks inside
  # quoted fields are read back exactly as they were written.
  with open(csv_filename, mode='r', newline='') as csv_file:
    rows = list(csv.DictReader(csv_file))

  # Find the row that matches the given pose and prompt
  updated = False
  for row in rows:
    if row["Pose"] == pose and row["Prompt"] == prompt:
      row["Environment Score"] = new_manual_score
      updated = True
      break  # Stop searching once a match is found

  if not updated:
    return False

  # Write the updated data back to the CSV file
  with open(csv_filename, mode='w', newline='') as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=csv_header)
    writer.writeheader()
    writer.writerows(rows)

  return True

In [None]:
# @title Scoring Skeleton { display-mode: "form" }

def getTestRunFolderContent(folder_name):
  """Return the names of the visible sub-directories of TEST_OUTPUT_DIR_PATH.

  Args:
    folder_name: Not read by the function; accepted so existing calls keep working.

  Returns:
    List of directory names that do not start with ".".
  """
  return [
      entry
      for entry in os.listdir(TEST_OUTPUT_DIR_PATH)
      if os.path.isdir(os.path.join(TEST_OUTPUT_DIR_PATH, entry)) and not entry.startswith(".")
  ]

# Init
# Raw listing of the test output folder; getTestRunFolderContent ignores its
# argument and lists TEST_OUTPUT_DIR_PATH itself, keeping visible directories only.
test_run_folders = os.listdir(TEST_OUTPUT_DIR_PATH)
test_run_folder_content = getTestRunFolderContent(test_run_folders)

# Create widgets
# Model folder selector; starts without a selection
select_keypoints_dropdown = widgets.Dropdown(options=test_run_folder_content, description='Folder:', value=None, disabled=False)
# Pose selector; its options are filled in by folder_subdir_select_pose
skeleton_test_folder_dropdown = widgets.Dropdown(options=[], description='Pose to Score:', disabled=True)
similarity_score_button = widgets.Button(description="Start Scoring", disabled=False)

## Update skeleton folder dropdown options based on the selected model folder
def folder_subdir_select_pose(button):
  """Fill the pose dropdown with the pose folders of the selected model folder.

  Registered as observer of ``select_keypoints_dropdown``; the pose dropdown is
  disabled whenever there is nothing to choose from.

  Args:
    button: Change notification passed by the observer (unused).
  """
  selected_video_folder = select_keypoints_dropdown.value

  if selected_video_folder is not None:
    model_dir = f"{TEST_OUTPUT_DIR_PATH}/{selected_video_folder}"
    # Only pose folders are selectable; start_score also writes score.csv into this
    # directory, and that file must not show up as a pose.
    skeleton_test_folder_content = [
        f for f in os.listdir(model_dir)
        if not f.startswith(".") and os.path.isdir(os.path.join(model_dir, f))
    ]
  else:
    skeleton_test_folder_content = []

  skeleton_test_folder_dropdown.options = skeleton_test_folder_content
  if not skeleton_test_folder_content:
      skeleton_test_folder_dropdown.value = None
  # Selectable only when there is at least one pose (both branches used to enable it)
  skeleton_test_folder_dropdown.disabled = not skeleton_test_folder_content

def start_score(button):
  """Score every generated pose of the selected folder against the human pose.

  Reads ``keypoints/pose-human.json`` as the reference, compares every other JSON
  file in the same folder with it via calculate_score, and appends one row per
  file to ``score.csv`` in the selected model folder.

  Args:
    button: The clicked button (unused).
  """
  clear_outputs()
  selected_video_folder = select_keypoints_dropdown.value
  selected_skeleton = skeleton_test_folder_dropdown.value

  # The button is enabled before anything has been selected
  if not selected_video_folder or not selected_skeleton:
    print("Please select a folder and a pose to score first!")
    return

  keypoints_path =  Path(f"{TEST_OUTPUT_DIR_PATH}/{selected_video_folder}/{selected_skeleton}/keypoints")
  human_pose_path = Path(f"{keypoints_path}/pose-human.json")
  csv_path = Path(f"{TEST_OUTPUT_DIR_PATH}/{selected_video_folder}/score.csv")

  # Reference keypoints (second argument of calculate_score)
  with open(human_pose_path, 'r') as json_human:
    skeleton2 = json.load(json_human)

  for kp_file in os.listdir(keypoints_path):
    # Skip the reference itself and anything that is not a keypoint JSON file
    if kp_file == "pose-human.json" or not kp_file.endswith(".json"):
      continue

    pose_inf_path = Path(f"{keypoints_path}/{kp_file}")
    with open(pose_inf_path, 'r') as json_file1:
      skeleton1 = json.load(json_file1)

    calculated_score = calculate_score(skeleton1, skeleton2)

    # kp_file[:-5] drops the ".json" suffix; the manual score column starts empty
    create_or_append_csv(csv_path, [selected_skeleton, kp_file[:-5], calculated_score, ""])

def clear_outputs():
  """Wipe the cell output and render the three scoring controls again."""
  clear_output()
  for control in (select_keypoints_dropdown, skeleton_test_folder_dropdown, similarity_score_button):
    display(control)

# Wire the callbacks: the button starts scoring, a folder change refreshes the pose list
similarity_score_button.on_click(start_score)
select_keypoints_dropdown.observe(folder_subdir_select_pose, 'value')

# Render the controls top to bottom
for control in (select_keypoints_dropdown, skeleton_test_folder_dropdown, similarity_score_button):
  display(control)

In [None]:
# @title Test Grading { display-mode: "form" }

# Helper Functions
def getModelFolderContent():
  """List the visible sub-directories of TEST_OUTPUT_DIR_PATH.

  Returns:
    Names (not paths) of entries that are directories and do not start
    with ".", in the order ``os.listdir`` yields them.
  """
  return [
    entry
    for entry in os.listdir(TEST_OUTPUT_DIR_PATH)
    if not entry.startswith(".")
    and os.path.isdir(os.path.join(TEST_OUTPUT_DIR_PATH, entry))
  ]

# Init: folders offered in the dropdown, plus the state shared between the
# display and save callbacks (one HBox and one (pose, prompt) tuple per GIF).
model_dir_folders_content = getModelFolderContent()

model_modules, model_modules_metadata = [], []

# Create layout
layout_single_button = widgets.Layout(width="143px")
layout_style = dict(description_width='50px')
layout_style_2 = dict(description_width='120px')
layout_dropdown = widgets.Layout(width="200px")
layout_dropdown_2 = widgets.Layout(width="200px")
layout_hbox = widgets.Layout(margin='0px 0px 0px 88px')
layout_output = outputWidgetsLayout()

# Create widgets
model_output_placeholder = widgets.Output(layout=layout_output)
model_subdir_dropdown = widgets.Dropdown(
  options=model_dir_folders_content,
  description='Folder:',
  value=None,
  disabled=False,
  style=layout_style,
  layout=layout_dropdown,
)
# Display and Save start disabled; the callbacks enable them later.
model_display_button = widgets.Button(
  description="Display",
  disabled=True,
  layout=layout_single_button,
)
model_refresh_button = widgets.Button(
  description="Refresh",
  disabled=False,
  layout=layout_single_button,
)
model_save_button = widgets.Button(
  description="Save",
  disabled=True,
  layout=layout_single_button,
)
model_status_label = widgets.Label(value="", layout=layout_single_button)

# Static black frame shown in the output area before anything is displayed.
model_output_placeholder_content = widgets.HTML("""
  <div style="width: 512px; height: 276px; border-radius: 5%; background-color: black; margin: 0 auto; display: flex; justify-content: center; align-items: center;">
      <div style="width: 500px; height: 270px; border-radius: 5%; border: 2px solid white;" />
  </div>
""")

def _load_score_lookup(score_csv_path):
  """Read score.csv into a dict keyed by "<col0>-<col1>" holding the remaining columns.

  Blank or too-short rows are skipped instead of raising IndexError. A header
  row, if present, is stored like any other row under its own key.
  """
  score_lookup = {}
  with open(score_csv_path, newline='') as csvfile:
    for row in csv.reader(csvfile):
      if len(row) < 2:
        continue
      score_lookup[f"{row[0]}-{row[1]}"] = row[2:]
  return score_lookup


def _build_grading_module(gif_path, score_data):
  """Build one row of the grading view: GIF preview next to its two score inputs."""
  # read_bytes() opens and closes the file; the previous open().read() leaked the handle.
  image_widget = widgets.Image(value=Path(gif_path).read_bytes(), format="gif", height=276, width=276*3)

  # Pose score is shown rounded to two decimals; no value is passed when it is absent.
  pose_score = None
  if score_data and score_data[0]:
    pose_score = float("{:.2f}".format(float(score_data[0])))

  # Environment score falls back to the slider midpoint (4) when not graded yet.
  env_score = 4
  if score_data and len(score_data) > 1 and score_data[1]:
    env_score = int(score_data[1])

  tb_pose = widgets.FloatText(description="Pose Score:", value=pose_score, disabled=True, style=layout_style_2, layout=layout_dropdown_2)
  tb_env = widgets.IntSlider(
    value=env_score,
    min=1,
    max=7,
    step=1,
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d',
    description="Environment Score:",
    style=layout_style_2,
    layout=widgets.Layout(width="300px")
  )
  return widgets.HBox([image_widget, widgets.VBox([tb_pose, tb_env])])


def display_selected_model(change):
  """Show every processed GIF of the selected folder next to its scores.

  For each visible pose sub-folder that has a ``processed`` directory, one
  HBox per GIF is built and stored in the global ``model_modules``; the
  matching ``(pose, prompt)`` pair goes into ``model_modules_metadata`` so
  ``save_score`` can map widgets back to CSV rows. The Save button is enabled
  only when at least one module exists.

  Args:
    change: The clicked button (unused; required by the ``on_click`` signature).

  Raises:
    Whatever the file access or score parsing raises (e.g. FileNotFoundError
    when score.csv is missing); the status label is set to "Display failed" first.
  """
  global model_modules, model_modules_metadata
  selected_model_folder = model_subdir_dropdown.value

  if selected_model_folder:
    model_status_label.value = "Loading display ..."
    try:
      model_pose_path = Path(f"{TEST_OUTPUT_DIR_PATH}/{selected_model_folder}")
      score_dict = _load_score_lookup(Path(f"{model_pose_path}/score.csv"))

      model_modules = []
      model_modules_metadata = []
      # Sorted so the display order does not depend on the filesystem.
      for pose in sorted(os.listdir(model_pose_path)):
        process_path = Path(f"{model_pose_path}/{pose}/processed")
        # Skips hidden entries, plain files such as score.csv, and pose
        # folders that have no processed output yet.
        if pose.startswith(".") or not os.path.isdir(process_path):
          continue

        for gif in sorted(os.listdir(process_path)):
          if gif.startswith("."):
            continue

          prompt = Path(gif).stem  # file name without its extension
          score_data = score_dict.get(f"{pose}-{prompt}")

          model_modules_metadata.append((pose, prompt))
          model_modules.append(_build_grading_module(Path(f"{process_path}/{gif}"), score_data))

      # Replace whatever the output area currently shows
      with model_output_placeholder:
        clear_output(wait=True)
        display(widgets.VBox(model_modules))
    except Exception:
      # Do not leave the label stuck on "Loading display ..." after a failure.
      model_status_label.value = "Display failed"
      raise

    model_status_label.value = ""

  model_save_button.disabled = not model_modules

def refresh_folder_and_directory(change):
  """Reload the folder dropdown from disk and clear the current selection.

  Uses ``getModelFolderContent`` so the refreshed list follows the same rule
  as the initial one (visible directories only); a raw ``os.listdir`` would
  also offer hidden entries and plain files.

  Args:
    change: The clicked button (unused; required by the ``on_click`` signature).
  """
  model_subdir_dropdown.options = getModelFolderContent()
  # Assigning options may auto-select an entry; reset so the user picks explicitly.
  model_subdir_dropdown.value = None

def enable_folder_button(change):
  """React to a folder-dropdown change: lock Save, toggle Display.

  Save is always disabled here; Display is enabled exactly when the
  dropdown holds a truthy value.
  """
  model_save_button.disabled = True
  has_selection = bool(model_subdir_dropdown.value)
  model_display_button.disabled = not has_selection

def save_score(change):
  """Write the environment scores from the displayed sliders back to score.csv.

  Reads the selected folder's ``score.csv`` (columns ``Pose``, ``Prompt``,
  ``Pose Score``, ``Environment Score``), overwrites the environment score of
  every displayed GIF with its slider value, and rewrites the file. Rows that
  share a Pose/Prompt pair collapse into one. A displayed GIF that has no row
  yet gets a new row with an empty pose score instead of raising KeyError.

  Args:
    change: The clicked button (unused; required by the ``on_click`` signature).

  Raises:
    Whatever reading or writing the CSV raises; the status label is set to
    "Save failed" first.
  """
  model_status_label.value = "Saving ..."
  try:
    selected_model_folder = model_subdir_dropdown.value
    model_pose_path = Path(f"{TEST_OUTPUT_DIR_PATH}/{selected_model_folder}")
    model_score_path = Path(f"{model_pose_path}/score.csv")

    score_dict = {}
    with open(model_score_path, newline='') as csvfile:
      csv_reader = csv.DictReader(csvfile)
      headers = csv_reader.fieldnames

      for row in csv_reader:
        score_dict[f"{row['Pose']}-{row['Prompt']}"] = {
          "values": [row['Pose Score'], row['Environment Score']],
          "pose": row['Pose'],
          "prompt": row['Prompt']
        }

    # An empty file has no header row (fieldnames is None); fall back to the
    # column names this function reads so the rewritten file stays loadable.
    if not headers:
      headers = ['Pose', 'Prompt', 'Pose Score', 'Environment Score']

    for components, meta in zip(model_modules, model_modules_metadata):
      # Each module is HBox([image, VBox([pose_score, env_slider])]).
      _, score_box = components.children
      _, env_score = score_box.children
      pose, prompt = meta

      entry = score_dict.setdefault(
        f"{pose}-{prompt}",
        {"values": ["", ""], "pose": pose, "prompt": prompt},
      )
      entry["values"][1] = env_score.value

    with open(model_score_path, 'w', newline='') as csvfile:
      csv_writer = csv.writer(csvfile)
      csv_writer.writerow(headers)
      csv_writer.writerows(
        [entry["pose"], entry["prompt"]] + entry["values"] for entry in score_dict.values()
      )
  except Exception:
    # Do not leave the label stuck on "Saving ..." after a failure.
    model_status_label.value = "Save failed"
    raise

  model_status_label.value = "Saved"
  print("Done")

## Attach Listeners
model_save_button.on_click(save_score)
model_refresh_button.on_click(refresh_folder_and_directory)
model_display_button.on_click(display_selected_model)
model_subdir_dropdown.observe(enable_folder_button, 'value')

# Show the static placeholder frame inside the output area until a folder is displayed.
with model_output_placeholder:
  display(model_output_placeholder_content)

# Layout: a toolbar row, a horizontal rule, then the output area.
hr = widgets.HTML("<hr>", layout=widgets.Layout(margin="3px 0px 10px 0px"))
pane = widgets.HBox([
  model_subdir_dropdown,
  model_refresh_button,
  model_display_button,
  model_save_button,
  model_status_label,
])
vbox = widgets.VBox(
  [pane, hr, model_output_placeholder],
  layout=widgets.Layout(margin="10px 0px 10px 0px"),
)
display(vbox)

# NOTE(review): prints the raw folder list below the UI; looks like leftover
# debug output, so confirm whether it is still wanted.
print(getModelFolderContent())