#LICENSE

Copyright 2024 DeepMind Technologies Limited.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# Prerequisites

In [None]:
# @title Install libraries
# youtube-dl (installed from the master archive) resolves the direct stream URL
# of the YouTube clips used by the Kinetics explorer.
!pip install --upgrade https://github.com/ytdl-org/youtube-dl/archive/master.zip --quiet

# mediapy reads, resizes and displays videos; ego4d provides the Ego4D download
# CLI; awscli provides the `aws configure` command used for Ego4D credentials.
!pip install mediapy ego4d awscli --quiet

# ffmpeg is invoked by the download helpers to cut and re-encode clips.
!apt-get -qq update
!apt-get -qq install ffmpeg

In [None]:
# @title Imports
import json
import os

from typing import Any, List, Dict, Union

from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt

import mediapy as mpy
import numpy as np
import pandas as pd

In [None]:
# @title Functions common to both datasets

def create_count_video(
  frames: Union[List[np.ndarray], np.ndarray],
  start: int,
  end: int,
  count: int,
  desc: str,
  output_file: str = "/tmp/output.mp4",) -> None:
  """
  Renders the video frames below a count strip, saves the result and shows it.

  The strip is a one-row heat map (tiled to 32 rows) whose value ramps from 0
  up to `count` over the frames in [start, end) and is 0 everywhere else. A red
  dot on the strip marks the frame currently displayed.

  Args:
    frames: The video frames, either a list of numpy arrays or a single array
      indexed by frame.
    start: The frame index where the count should start. Clipped to
      [0, len(frames)].
    end: The frame index where the count should end (exclusive). Clipped to
      [0, len(frames)].
    count: The final count to be reached at the end frame.
    desc: A description displayed as the title above the video.
    output_file: The path to save the output video. Defaults to
      '/tmp/output.mp4'.

  Raises:
    ValueError: If `frames` is empty.
  """
  num_frames = len(frames)
  if num_frames == 0:
    # Fail before a figure is allocated; the original failed later with an
    # opaque IndexError on frames[0].
    raise ValueError("create_count_video() requires at least one frame.")

  # Per-frame count: ceil of a linear ramp inside [start, end), zero outside.
  start, end = (int(v) for v in np.clip([start, end], 0, num_frames))
  labels = np.zeros(num_frames)
  span = end - start
  if span > 0:
    labels[start:end] = np.ceil(count * np.arange(span) / span)

  # Repeat the 1-D signal over 32 rows so it is visible as a strip.
  strip_height = 32
  signal_2d = np.tile(labels[np.newaxis, :], [strip_height, 1])

  fig, axs = plt.subplots(2, 1,
                          figsize=(5, 5),
                          gridspec_kw={'height_ratios': [1, 10]})
  strip_ax, video_ax = axs

  try:
    frames = mpy.resize_video(frames, (320, 320))
    im = video_ax.imshow(frames[0])
    video_ax.set_title(desc, fontsize=16)
    video_ax.set_xticks([])
    video_ax.set_yticks([])

    strip_ax.imshow(signal_2d*2, cmap='GnBu')
    # Tick removal does not depend on the frame, so do it once up front.
    strip_ax.set_xticks([])
    strip_ax.set_yticks([])
    red_dot, = strip_ax.plot([], [], 'ro')

    plt.tight_layout()
    video_ax.axis('off')
    video_ax.grid(False)

    def update_count_plot(i):
      """Shows frame `i` and moves the marker to column `i` of the strip."""
      im.set_data(frames[i])
      # Row strip_height // 2 is the vertical middle of the strip.
      red_dot.set_data([i], [strip_height // 2])

    # An integer `frames` iterates range(num_frames), so frame 0 is rendered
    # too and a single-frame clip still yields one frame.
    anim = FuncAnimation(
        fig,
        update_count_plot,
        frames=num_frames,
        interval=30,
        blit=False,
    )
    anim.save(output_file, dpi=100, fps=24)
  finally:
    # Release the figure even when rendering or encoding fails.
    fig.clear()
    plt.close(fig)

  # Display the file that was just written, honouring `output_file`.
  mpy.show_video(mpy.read_video(output_file))

# Download OVR Annotations

In [None]:
# URLs of the OVR annotation releases for the Kinetics and Ego4D subsets.
PATH_TO_KINETICS_ANNOTATIONS = "https://storage.googleapis.com/semantic_repetitions/ovr_kinetics_release.json"
PATH_TO_EGO4D_ANNOTATIONS = "https://storage.googleapis.com/semantic_repetitions/ovr_ego4d_release.json"

# Fetch both JSON files; they are opened by bare file name further down, so
# they are expected to land in the current working directory.
!wget -q  $PATH_TO_KINETICS_ANNOTATIONS $PATH_TO_EGO4D_ANNOTATIONS

# URLs of the Kinetics-700-2020 train / val / test annotation CSVs.
PATH_TO_KINETICS_700_2020_ANNOTATIONS_TRAIN = "https://s3.amazonaws.com/kinetics/700_2020/annotations/train.csv"
PATH_TO_KINETICS_700_2020_ANNOTATIONS_VAL = "https://s3.amazonaws.com/kinetics/700_2020/annotations/val.csv"
PATH_TO_KINETICS_700_2020_ANNOTATIONS_TEST = "https://s3.amazonaws.com/kinetics/700_2020/annotations/test.csv"


# Fetch the three split CSVs (read below as train.csv, val.csv and test.csv).
!wget -q $PATH_TO_KINETICS_700_2020_ANNOTATIONS_TRAIN $PATH_TO_KINETICS_700_2020_ANNOTATIONS_VAL $PATH_TO_KINETICS_700_2020_ANNOTATIONS_TEST

# Read the three Kinetics split CSVs and index the combined table by YouTube id
# so that a clip's row can be looked up directly from its id.
dfs = [pd.read_csv(f"{split}.csv") for split in ["train", "val", "test"]]
df = pd.concat(dfs).set_index('youtube_id')

# Parse the OVR annotation releases for each dataset.
with open('ovr_kinetics_release.json') as kinetics_file:
  kinetics_data = json.load(kinetics_file)

with open('ovr_ego4d_release.json') as ego4d_file:
  ego4d_data = json.load(ego4d_file)

# Kinetics Explorer

In [None]:
# @title Kinetics Specific Functions

def download_kinetics_video(
    video_id: str,
    start_time: float,
    end_time: float,
    output_path: str = '/tmp/video.mp4',
    fps: int = 25
) -> None:
  """
  Downloads a specific segment of a Kinetics video from YouTube using youtube-dl
  and ffmpeg.

  youtube-dl resolves the direct stream URL (format code 18) and ffmpeg seeks
  to `start_time`, re-encodes `end_time - start_time` seconds with libx264 at
  `fps` and writes the result to `output_path`. The command's exit status is
  not inspected; callers detect failure by checking whether `output_path`
  exists afterwards.

  Args:
    video_id: The YouTube video ID.
    start_time: The start time of the segment to download (in seconds).
    end_time: The end time of the segment to download (in seconds).
    output_path: The path to save the downloaded video. Defaults to
      '/tmp/video.mp4'.
    fps: The desired frame rate of the downloaded video. Defaults to 25.
  """
  # Local import keeps this cell self-contained.
  import shlex

  duration = end_time - start_time
  # Quote everything interpolated into the shell command so that unusual ids
  # or paths (spaces, shell metacharacters) cannot alter the command.
  video_url = shlex.quote(f"https://youtube.com/v/{video_id}")
  command = (
      f"ffmpeg -y -ss {start_time} "
      # Double quotes keep the resolved stream URL as a single argument and
      # stop the shell from word-splitting or globbing it.
      f'-i "$(youtube-dl -f 18 --get-url {video_url})" '
      f"-t {duration} -c:v libx264 -r {fps} -vsync 0 "
      f"{shlex.quote(output_path)}"
  )
  os.system(command)


def get_random_ovrc_kinetics_video(
    anno_list: List[Dict[str, Any]],
    visualize_only_repetition: bool,
    tmp_output_path: str = '/tmp/output.mp4',
    fps: int = 25
) -> None:
  """
  Fetches a random video from the Kinetics dataset with repetition annotations,
  downloads the relevant segment, and optionally visualizes only the repetition.

  The clip start time is looked up in the module-level `df` table (Kinetics
  annotations indexed by YouTube id). If the id is missing from that table, or
  the download produces no file, a message is printed and nothing is shown.

  Args:
    anno_list: A list of dictionaries containing annotation data for Kinetics
      videos. Each entry is read for its 'video_id' and 'ovr_annotations' keys;
      each OVR annotation for 'start_time', 'end_time', 'count' and
      'description'.
    visualize_only_repetition: If True, only the repetition segment is
      visualized. If False, the entire video with a count overlay is shown.
    tmp_output_path: The temporary path to save the downloaded video. Defaults
      to '/tmp/output.mp4'.
    fps: The frame rate to use for video processing. Defaults to 25.
  """
  DELTA = 10  # All Kinetics videos are 10s long.

  # Remove any stale download so the existence check below reflects this call.
  if os.path.exists(tmp_output_path):
    os.remove(tmp_output_path)

  video_anno = np.random.choice(anno_list)
  yt_id = video_anno['video_id']
  if yt_id not in df.index:
    print(f"Video {yt_id} not found in Kinetics annotations. Possibly deleted.")
    return

  yt_clip_start_time = df.loc[yt_id, 'time_start']
  if isinstance(yt_clip_start_time, pd.Series):
    # The index is built from concatenated splits and is not guaranteed to be
    # unique; a repeated id yields a Series, which would corrupt the ffmpeg
    # command. Use the first matching row.
    yt_clip_start_time = yt_clip_start_time.iloc[0]

  # One video can carry several repetition annotations; pick one at random.
  ovr_anno = np.random.choice(video_anno['ovr_annotations'])

  download_kinetics_video(yt_id,
                          yt_clip_start_time,
                          yt_clip_start_time + DELTA,
                          tmp_output_path, fps=fps)
  if not os.path.exists(tmp_output_path):
    print(f"Failed to get video {yt_id} at this time.")
    return
  frames = mpy.read_video(tmp_output_path)

  # Convert the annotation's times (seconds) into frame indices.
  start_idx = int(fps * ovr_anno['start_time'])
  end_idx = int(fps * ovr_anno['end_time'])

  if visualize_only_repetition:
    # Fixed: the two title parts previously ran together as "Count: N,Desc:".
    mpy.show_video(frames[start_idx:end_idx],
                   title=f"Count: {ovr_anno['count']}, "
                         f"Desc: {ovr_anno['description']}")
  else:
    create_count_video(frames,
                       start_idx,
                       end_idx,
                       ovr_anno['count'],
                       ovr_anno['description'])


In [None]:
# @title Visualize samples from OVRC-Kinetics dataset.
# Each iteration draws a new random annotation; iterations whose video is
# missing or fails to download only print a message, so fewer than
# `num_videos` videos may actually be displayed.
num_videos = 10 # @param {type:"slider", min:1, max:25, step:1}
visualize_only_repetition = False # @param {type:"boolean"}
for _ in range(num_videos):
  get_random_ovrc_kinetics_video(kinetics_data, visualize_only_repetition)

# Ego4D Explorer

In [None]:
# Configure using your Ego4D credentials  (https://ego4d-data.org/docs/start-here/)
# Get AWS Access Key ID and AWS Secret Access Key to access Ego4D data.
!aws configure

In [None]:
# @title Ego4D Functions

def download_ego4d_video(
  video_id: str,
  start_time: float,
  end_time: float,
  output_path: str = '/tmp/video.mp4',
  fps: int = 30
) -> None:
  """
  Downloads a specific segment of an Ego4D video using the ego4d CLI and ffmpeg.

  The full-scale video is first downloaded under /tmp/ego4d, then ffmpeg cuts
  and re-encodes the requested segment to `output_path`, and finally the
  full-scale source file is deleted. Command exit statuses are not inspected;
  callers detect failure by checking whether `output_path` exists afterwards.

  Args:
    video_id: The unique identifier of the Ego4D video.
    start_time: The start time of the segment to download (in seconds).
    end_time: The end time of the segment to download (in seconds).
    output_path: The path to save the downloaded video. Defaults to
      '/tmp/video.mp4'.
    fps: The desired frame rate of the downloaded video. Defaults to 30.
  """
  # Local import keeps this cell self-contained.
  import shlex

  source_path = f"/tmp/ego4d/v2/full_scale/{video_id}.mp4"

  os.system("ego4d -y -o /tmp/ego4d --dataset full_scale "
            f"--video_uids {shlex.quote(video_id)}")

  duration = end_time - start_time
  # Fixed: the original concatenated "-t {duration}" directly with "-c:v",
  # producing a malformed command, so no clip was ever written.
  os.system(f"ffmpeg -y -ss {start_time} -i {shlex.quote(source_path)} "
            f"-t {duration} -c:v libx264 -r {fps} -vsync 0 "
            f"{shlex.quote(output_path)}")

  # Drop the full-scale source once the segment has been extracted.
  if os.path.isfile(source_path):
    os.remove(source_path)


def get_random_ovrc_ego4d_video(
  anno_list: List[Dict[str, Any]],
  visualize_only_repetition: bool,
  tmp_output_path: str = '/tmp/output.mp4',
  fps: int = 30
) -> None:
  """
  Fetches a random video from the Ego4D dataset with repetition annotations,
  downloads the relevant segment, and optionally visualizes only the repetition.

  The downloaded window is centred on the annotation's 'timestamp_sec' and
  clamped so it never starts before 0. If the download produces no file, a
  message is printed and nothing is shown.

  Args:
    anno_list: A list of dictionaries containing annotation data for Ego4D
      videos. Each entry is read for its 'video_id', 'timestamp_sec' and
      'ovr_annotations' keys; each OVR annotation for 'start_time',
      'end_time', 'count' and 'description'.
    visualize_only_repetition: If True, only the repetition segment is
      visualized. If False, the entire video with a count overlay is shown.
    tmp_output_path: The temporary path to save the downloaded video. Defaults
      to '/tmp/output.mp4'.
    fps: The frame rate to use for video processing. Defaults to 30.
  """
  # Window length in seconds: 10 s plus two frames' worth of time.
  DELTA = 10 + 2/fps

  # Remove any stale download so the existence check below reflects this call.
  if os.path.exists(tmp_output_path):
    os.remove(tmp_output_path)

  # Random annotation sampling.
  video_anno = np.random.choice(anno_list)
  video_id = video_anno['video_id']
  clip_start_time = video_anno['timestamp_sec']
  # One video can carry several repetition annotations; pick one at random.
  ovr_anno = np.random.choice(video_anno['ovr_annotations'])

  download_ego4d_video(video_id,
                       max(0, clip_start_time - DELTA/2),
                       clip_start_time + DELTA/2,
                       tmp_output_path,
                       fps=fps)
  if not os.path.exists(tmp_output_path):
    print(f"Failed to download video {video_id}")
    return

  # Visualize whole clip including non-repeating part.
  # NOTE(review): mediapy's resize_video takes (height, width), so (640, 360)
  # yields a portrait output; confirm whether (360, 640) was intended.
  frames = mpy.resize_video(mpy.read_video(tmp_output_path), (640, 360))

  # Convert the annotation's times (seconds) into frame indices.
  # NOTE(review): presumably these times are relative to the start of the
  # downloaded window - confirm, especially when the window start was clamped
  # to 0 above.
  start_idx = int(fps * ovr_anno['start_time'])
  end_idx = int(fps * ovr_anno['end_time'])

  if visualize_only_repetition:
    # Fixed: the two title parts previously ran together as "Count: N,Desc:".
    mpy.show_video(frames[start_idx:end_idx],
                   title=f"Count: {ovr_anno['count']}, "
                         f"Desc: {ovr_anno['description']}")
  else:
    create_count_video(frames,
                       start_idx,
                       end_idx,
                       ovr_anno['count'],
                       ovr_anno['description'])


In [None]:
# @title Visualize samples from OVRC-Ego4D dataset.
# Each iteration draws a new random annotation; iterations whose video fails to
# download only print a message, so fewer than `num_videos` videos may actually
# be displayed.
num_videos = 10 # @param {type:"slider", min:1, max:25, step:1}
visualize_only_repetition = False # @param {type:"boolean"}
for _ in range(num_videos):
  get_random_ovrc_ego4d_video(ego4d_data, visualize_only_repetition)