<a href="https://colab.research.google.com/github/deepmind/perception_test/blob/main/data_visualisation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#@title Prerequisites
import sys
import os
import io
import json
import random
import imageio
import colorsys
import cv2
import numpy as np
import moviepy.editor as mvp
import matplotlib.pyplot as plt
import librosa.display

from IPython.display import Audio
from google.colab.patches import cv2_imshow
from scipy.io import wavfile
from typing import Tuple, List

In [None]:
#@title Download Dataset Sample
!mkdir data
!wget https://storage.googleapis.com/dm-perception-test/zip_data/sample_annotations.zip
!unzip sample_annotations.zip -d data/annotations
!rm sample_annotations.zip

!wget https://storage.googleapis.com/dm-perception-test/zip_data/sample_videos.zip
!unzip sample_videos.zip -d data/
!rm sample_videos.zip

In [3]:
#@title Utility Functions
def load_db_json(db_path: str) -> dict:
    """
    Read a JSON file whose top-level value is an object.

    Args:
        db_path (str): Location of the JSON file on disk.

    Returns:
        dict: The parsed top-level JSON object.

    Raises:
        FileNotFoundError: If `db_path` is not an existing regular file.
        TypeError: If the parsed JSON is not a dictionary.
    """
    if os.path.isfile(db_path):
        with open(db_path, 'r') as handle:
            parsed = json.load(handle)
    else:
        raise FileNotFoundError(f"No such file: '{db_path}'")

    # Only a top-level JSON object is accepted.
    if isinstance(parsed, dict):
        return parsed
    raise TypeError("JSON file is not formatted as a dictionary.")


def load_mp4_to_frames(filename: str) -> np.array:
    """
    Loads an MP4 video file and returns its frames as a NumPy array.

    The frame count reported by the container is only used to pre-allocate
    the buffer; the returned array always holds exactly the frames that were
    actually decoded (no uninitialised trailing frames, no overflow error).

    Args:
        filename (str): Path to the MP4 video file.

    Returns:
        np.array: uint8 array of decoded frames, one frame per entry along
        the first axis, in the channel order produced by `cv2.VideoCapture`.

    Raises:
        FileNotFoundError: If `filename` does not exist.
    """
    if not os.path.exists(filename):
        raise FileNotFoundError(f"File '{filename}' does not exist.")

    cap = cv2.VideoCapture(filename)
    try:
        # The reported count may be inaccurate (or negative) for some files,
        # so it is treated as a capacity hint only.
        reported_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))

        frames = np.empty((reported_frames, height, width, 3), dtype=np.uint8)
        overflow = []  # frames decoded beyond the reported count

        idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if idx < reported_frames:
                frames[idx] = frame
            else:
                overflow.append(frame)
            idx += 1
    finally:
        # Always free the decoder, even if reading fails part-way.
        cap.release()

    if not overflow:
        # Drop any pre-allocated slots that were never filled.
        return frames[:idx]

    extra = np.stack(overflow).astype(np.uint8)
    if reported_frames == 0:
        return extra
    return np.concatenate([frames, extra], axis=0)


def get_video_frames(video_item: dict, video_path: str) -> np.array:
    """
    Loads frames of a video specified by an item from the dataset.

    The file is expected at `<video_path>/<video_id>.mp4`, where `video_id`
    is read from the item's metadata.

    Args:
        video_item (dict): Item from dataset containing metadata
        (`metadata.video_id` and `metadata.num_frames` are read).
        video_path (str): Path to the directory containing videos.

    Returns:
        np.array: Frames of the video as a NumPy array.

    Raises:
        AssertionError: If the number of decoded frames differs from
        `metadata.num_frames`.
    """
    metadata = video_item['metadata']
    video_file_path = os.path.join(video_path, metadata['video_id']) + '.mp4'
    frames = load_mp4_to_frames(video_file_path)

    # Sanity-check the decoded video against the annotation metadata and
    # report both numbers in the failure message.
    expected_frames = metadata['num_frames']
    assert expected_frames == frames.shape[0], (
        f"Frame count mismatch for '{video_file_path}': metadata reports "
        f"{expected_frames} frames, decoded {frames.shape[0]}.")
    return frames


def get_audio(audio_item: dict, audio_path: str) -> np.array:
    """
    Read the WAV file belonging to a dataset item.

    The file is looked up at `<audio_path>/<video_id>.wav`, and its length
    and sample rate are checked against the item's metadata.

    Args:
        audio_item (dict): Dataset item; `metadata.video_id`,
        `metadata.audio_samples` and `metadata.audio_sample_rate` are read.
        audio_path (str): Directory holding the audio files.

    Returns:
        np.array: The audio samples converted to float32.

    Raises:
        AssertionError: If the sample count or sample rate of the file
        disagrees with the metadata.
    """
    meta = audio_item['metadata']
    wav_path = os.path.join(audio_path, meta['video_id']) + '.wav'
    rate, samples = wavfile.read(wav_path)

    # The file on disk must agree with what the annotations claim.
    assert meta['audio_samples'] == samples.shape[0]
    assert meta['audio_sample_rate'] == rate

    return samples.astype(np.float32)

In [4]:
#@title Visualisation functions
def get_colors(num_colors: int) -> List[Tuple[int, int, int]]:
    """
    Generate a palette for visualizing different objects and points.

    Hues are spread evenly around the colour wheel; lightness and saturation
    are drawn from `np.random` (so exact shades vary between calls). The
    palette order is shuffled with a fixed seed, using a private RNG so the
    global `random` state is left untouched.

    Parameters:
    - num_colors (int): The number of colors to generate. Zero or negative
    values yield an empty list.

    Returns:
    - List[Tuple[int, int, int]]: `num_colors` tuples of three integer
    components in the 0-255 range.
    """
    colors = []
    # Integer stepping guarantees exactly `num_colors` entries; a float-step
    # arange can produce one element too many because of rounding.
    for i in range(num_colors):
        hue = i / num_colors
        lightness = (50 + np.random.rand() * 10) / 100.
        saturation = (90 + np.random.rand() * 10) / 100.
        red, green, blue = colorsys.hls_to_rgb(hue, lightness, saturation)
        colors.append((int(red * 255), int(green * 255), int(blue * 255)))

    # Same shuffle order as seeding the global RNG with 0, without the
    # side effect of resetting it for the rest of the notebook.
    random.Random(0).shuffle(colors)
    return colors


def display_video(video: np.array, fps: int=30):
    """
    Write the frames to a temporary MP4 file and show it in the notebook.

    The last (channel) axis of `video` is reversed before writing.

    Parameters:
    - video (np.array): The input video frames, laid out as
    (num_frames, height, width, channels).
    - fps (int): Frames per second for the video playback. Default is 30.
    """
    tmp_path = 'tmp_video_display.mp4'
    # macro_block_size=None avoids imageio auto-resizing the frames.
    writer_options = {'macro_block_size': None}
    channel_reversed = video[:, :, :, ::-1]
    imageio.mimwrite(tmp_path, channel_reversed, fps=fps, **writer_options)
    display(mvp.ipython_display(tmp_path))


def display_frame(frame: np.array):
    """
    Display a single frame using the `cv2_imshow` helper imported from
    `google.colab.patches`.

    Parameters:
    - frame (np.array): The frame to be displayed; it is passed to
    `cv2_imshow` unchanged.
    """
    cv2_imshow(frame)

def paint_box(video: np.array, track: dict, color=(255, 0, 0)) -> np.array:
    """
    Draw one track's bounding box and caption on every frame it annotates.

    The caption is "<id> : <label>". Box values are scaled to pixels using
    the frame size: entries 0 and 2 are multiplied by the frame height,
    entries 1 and 3 by the frame width.
    NOTE(review): the expected layout of `track['bounding_boxes']` (the array
    is transposed before iteration) should be confirmed against the dataset.

    Parameters:
    - video (np.array): The input video frames, indexed as
    (num_frames, height, width, channels). Modified in place.
    - track (dict): Track with 'id', 'label', 'bounding_boxes' and
    'frame_ids' entries.
    - color (Tuple[int, int, int], optional): Colour of the rectangle and
    caption. Default is (255, 0, 0).

    Returns:
    - np.array: The same `video` array with the box and caption painted.
    """
    _, height, width, _ = video.shape
    caption = str(track['id']) + ' : ' + track['label']
    boxes = np.array(track['bounding_boxes']).T

    for frame_idx, box in zip(track['frame_ids'], boxes):
        # Work on a copy of the frame, then write it back.
        canvas = np.array(video[frame_idx])
        corner_a = (int(round(box[3] * width)), int(round(box[2] * height)))
        corner_b = (int(round(box[1] * width)), int(round(box[0] * height)))
        canvas = cv2.rectangle(canvas, corner_a, corner_b,
                               color=color, thickness=2)
        # Caption is anchored 20 pixels below the first corner.
        caption_origin = (corner_a[0], corner_a[1] + 20)
        canvas = cv2.putText(canvas, caption, caption_origin,
                             cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 2)
        video[frame_idx] = canvas

    return video

def paint_boxes(video: np.array, tracks: List[dict]) -> np.array:
    """
    Paint bounding boxes and labels on frames of a video for multiple tracks.

    Each track is drawn with `paint_box`, using a colour taken from the
    module-level `COLORS` palette (which must be defined before calling).
    When there are more tracks than palette entries the colours are reused
    cyclically instead of raising an IndexError.

    Parameters:
    - video (np.array): The input video frames.
    - tracks (List[dict]): A list of track information,
    where each track contains bounding box and frame information.

    Returns:
    - np.array: The modified video frames with painted bounding boxes
    and labels.
    """
    for i, track in enumerate(tracks):
        # Wrap around the palette so any number of tracks can be drawn.
        video = paint_box(video, track, COLORS[i % len(COLORS)])
    return video


def paint_point(video: np.array,
                track: dict, color: tuple = (255, 0, 0)) -> np.array:
    """
    Draw a filled circle for one tracked point on each frame it annotates.

    For the idx-th annotated frame, `track['points'][0][idx]` is scaled by
    the frame height and `track['points'][1][idx]` by the frame width.
    NOTE(review): confirm this row order against the dataset's point format.

    Args:
        video (np.array): The input video frames, indexed as
        (num_frames, height, width, channels). Modified in place.
        track (dict): Track with 'frame_ids' and 'points' entries.
        color (tuple, optional): The color of the painted point.
        Defaults to (255, 0, 0).

    Returns:
        np.array: The same `video` array with the points painted.
    """
    _, height, width, _ = video.shape
    for idx, frame_id in enumerate(track['frame_ids']):
        target = video[frame_id]
        center = (
            int(round(track['points'][1][idx] * width)),
            int(round(track['points'][0][idx] * height)),
        )
        # thickness=-1 asks OpenCV for a filled circle.
        video[frame_id] = cv2.circle(target, center, radius=10,
                                     color=color, thickness=-1)
    return video


def paint_points(video: np.array, tracks: List[dict]) -> np.array:
    """
    Paints multiple tracked points on each frame of a video.

    Each track is drawn with `paint_point`, using a colour taken from the
    module-level `COLORS` palette (which must be defined before calling).
    When there are more tracks than palette entries the colours are reused
    cyclically instead of raising an IndexError.

    Args:
        video (np.array): The input video frames.
        tracks (List[dict]): The list of tracks containing
        frame IDs and corresponding points.

    Returns:
        np.array: The video frames with painted points.
    """
    for i, track in enumerate(tracks):
        # Wrap around the palette so any number of tracks can be drawn.
        video = paint_point(video, track, COLORS[i % len(COLORS)])
    return video


def paint_action(video: np.array, action: dict,
                 labelled_frames: np.array,
                 color: tuple = (0, 255, 0)) -> np.array:
    """
    Write an "Action: <label>" caption on every frame of an action segment.

    The segment covers frames from `action['frame_ids'][0]` up to, but not
    including, `action['frame_ids'][1]`. Captions start at 90% of the frame
    height and 5% of the frame width; each caption already counted for a
    frame in `labelled_frames` shifts the new one by 40 pixels so that
    overlapping segments do not overwrite each other.

    Args:
        video (np.array): The input video frames, indexed as
        (num_frames, height, width, channels). Modified in place.
        action (dict): The action with 'label' and 'frame_ids' entries.
        labelled_frames (np.array): Per-frame count of captions drawn so
        far; incremented in place for every frame painted here.
        color (tuple, optional): The color of the painted label.
        Defaults to (0, 255, 0).

    Returns:
        np.array: The same `video` array with the captions painted.
    """
    _, height, width, _ = video.shape
    caption = 'Action: ' + action['label']
    first_frame, stop_frame = action['frame_ids']

    # Position components that do not depend on the frame.
    text_x = int(round(0.05 * width))
    base_y = round(0.9 * height)

    for frame_idx in range(first_frame, stop_frame):
        canvas = np.array(video[frame_idx])
        text_y = int(base_y - (40 * labelled_frames[frame_idx]))
        canvas = cv2.putText(canvas, caption, (text_x, text_y),
                             cv2.FONT_HERSHEY_SIMPLEX, 1.00, color, 2)
        video[frame_idx] = canvas
        labelled_frames[frame_idx] += 1

    return video


def paint_actions(video: np.array,
                  actions: List[dict], labelled_frames: np.array) -> np.array:
    """
    Apply `paint_action` to every action in turn.

    Args:
        video (np.array): The input video frames.
        actions (List[dict]): The actions to caption, each with a label
        and frame IDs.
        labelled_frames (np.array): Per-frame caption counter shared across
        all actions; updated by each `paint_action` call.

    Returns:
        np.array: The video frames with all action captions painted.
    """
    for action in actions:
        video = paint_action(video, action, labelled_frames)
    return video


def paint_sound(video: np.array,
                sound: dict, labelled_frames: np.array,
                color: tuple = (0, 0, 255)) -> np.array:
    """
    Write a "Sound: <label> is_visible: <bool>" caption on every frame of a
    sound segment.

    The segment covers frames from `sound['frame_ids'][0]` up to, but not
    including, `sound['frame_ids'][1]`. Captions start at 90% of the frame
    height and 60% of the frame width; each caption already counted for a
    frame in `labelled_frames` shifts the new one by 40 pixels.

    Args:
        video (np.array): The input video frames, indexed as
        (num_frames, height, width, channels). Modified in place.
        sound (dict): The sound with 'label', 'frame_ids' and 'is_visible'
        entries.
        labelled_frames (np.array): Per-frame count of captions drawn so
        far; incremented in place for every frame painted here.
        color (tuple, optional): The color of the painted label.
        Defaults to (0, 0, 255).

    Returns:
        np.array: The same `video` array with the captions painted.
    """
    _, height, width, _ = video.shape
    caption = 'Sound: ' + sound['label'] + ' is_visible: ' + str(bool(sound['is_visible']))
    first_frame, stop_frame = sound['frame_ids']

    # Position components that do not depend on the frame.
    text_x = int(round(0.6 * width))
    base_y = round(0.9 * height)

    for frame_idx in range(first_frame, stop_frame):
        canvas = np.array(video[frame_idx])
        text_y = int(base_y - (40 * labelled_frames[frame_idx]))
        canvas = cv2.putText(canvas, caption, (text_x, text_y),
                             cv2.FONT_HERSHEY_SIMPLEX, 1.0, color, 2)
        video[frame_idx] = canvas
        labelled_frames[frame_idx] += 1

    return video


def paint_sounds(video: np.array,
                 sounds: List[dict], labelled_frames: np.array) -> np.array:
    """
    Apply `paint_sound` to every sound in turn.

    Args:
        video (np.array): The input video frames.
        sounds (List[dict]): The sounds to caption, each with a label,
        frame IDs, and visibility.
        labelled_frames (np.array): Per-frame caption counter shared across
        all sounds; updated by each `paint_sound` call.

    Returns:
        np.array: The video frames with all sound captions painted.
    """
    for sound in sounds:
        video = paint_sound(video, sound, labelled_frames)
    return video


def get_answer_tracks(example_data: dict, goq_ids: List) -> List[dict]:
    """
    Select the object tracks whose 'id' appears in `goq_ids`.

    Args:
        example_data (dict): Annotation data with an 'object_tracking' list.
        goq_ids (List): IDs of the tracks to keep.

    Returns:
        List[dict]: Matching tracks, in the order they appear in
        `example_data['object_tracking']`.
    """
    return [
        candidate
        for candidate in example_data['object_tracking']
        if candidate['id'] in goq_ids
    ]

In [5]:
#@title Loading Annotations
# Directory passed to get_video_frames, which looks up '<video_id>.mp4' in it.
video_path = './data/videos/'
# audio_path = './data/audios/'

# Load the sample annotation file into a dictionary
# (presumably keyed by video id; see how entries are selected below).
db_json_path = './data/annotations/sample.json'
db_dict = load_db_json(db_json_path)

In [6]:
#@title Show Example Annotations
# Use the entry at index 6 of the annotation dictionary as the running example.
video_id = list(db_dict)[6]
example_data = db_dict[video_id]

In [None]:
# Overview of which annotation tasks are present for the selected video.
print('---------------------------------------------------------------------')
print('Tasks annotated for this video: ')
for k, v in example_data.items():
    if v:
        print(f'{k} - available: yes - annotations: {len(v)}')
    else:
        print(f'{k} - available: no')
print('---------------------------------------------------------------------')
print('Video Metadata')
print('---------------------------------------------------------------------')
for k, v in example_data['metadata'].items():
    print(f'{k} : {v}')
print('---------------------------------------------------------------------')
print('Object Tracking data')
print('---------------------------------------------------------------------')
# A task may have no annotations for a given video (see the availability
# listing above), so only index the first entry when there is one.
if example_data['object_tracking']:
    for k, v in example_data['object_tracking'][0].items():
        print(f'{k} : {v}')
print('---------------------------------------------------------------------')
print('Multiple-Choice VQA')
print('---------------------------------------------------------------------')
if example_data['mc_question']:
    for k, v in example_data['mc_question'][0].items():
        print(f'{k} : {v}')
print('---------------------------------------------------------------------')

In [None]:
#@title Visualising Object Tracks
if example_data['object_tracking']:
    frames = get_video_frames(example_data, video_path)

    # Palette read by paint_boxes through the global COLORS name.
    COLORS = get_colors(num_colors=100)
    show_all_tracks = True  #@param {type: "boolean"}
    show_track = 2  #@param {type: "integer"}

    if show_all_tracks:
        frames = paint_boxes(frames, example_data['object_tracking'])
        # With every track painted, the first track's frame ids select
        # which frames are shown.
        reference_track = example_data['object_tracking'][0]
    else:
        # Show the frames annotated for the track that was actually painted,
        # not those of track 0.
        reference_track = example_data['object_tracking'][show_track]
        frames = paint_box(frames, reference_track)

    # Keep only the annotated frames and play them back at 1 frame per second.
    annotated_frames = np.array(
        [frames[frame_idx] for frame_idx in reference_track['frame_ids']])
    display_video(annotated_frames, 1)
    del frames # managing RAM in Colab

In [None]:
#@title Visualising Point Tracks
if example_data['point_tracking']:
    frames = get_video_frames(example_data, video_path)
    # Palette read by paint_points through the global COLORS name.
    COLORS = get_colors(num_colors=100)
    frames = paint_points(video=frames,
                          tracks=example_data['point_tracking'])
    # Play back at the frame rate recorded in the metadata.
    display_video(frames, fps=example_data['metadata']['frame_rate'])
    del frames # managing RAM in Colab

In [None]:
#@title Visualising Action & Sound Segments
if example_data['action_localisation']:
    frames = get_video_frames(example_data, video_path)
    # One counter per frame, so overlapping action captions get stacked.
    labelled_frames = np.zeros(len(frames))
    frames = paint_actions(video=frames,
                           actions=example_data['action_localisation'],
                           labelled_frames=labelled_frames)
    display_video(frames, fps=example_data['metadata']['frame_rate'])
    del frames


In [None]:
#@title Visualising Action & Sound Segments
if example_data['action_localisation']:
    # Reverse the channel axis of the loaded frames before plotting.
    frames = get_video_frames(example_data, video_path)[:,:,:,::-1]

    # Collect label and start/end time of every action; timestamps are
    # divided by 1e6 (presumably microseconds to seconds -- TODO confirm).
    actions = example_data['action_localisation']
    action_labels = [action['label'] for action in actions]
    action_start_times = np.array(
        [action['timestamps'][0]/1e6 for action in actions])
    action_end_times = np.array(
        [action['timestamps'][1]/1e6 for action in actions])

    plt.figure(figsize=(20, 15))
    # Strip of frames
    plt.subplot(4,1,2)
    plt.title("Video Frames")
    f_size = frames[0].shape
    # cv2.resize takes (width, height); thumbnails are a quarter of each side.
    small = tuple(reversed((np.array(f_size[:2]) / 4).astype(int)))
    num_frames = example_data['metadata']['num_frames']
    # Sample frames a quarter of the video apart; the step is clamped to 1
    # so videos shorter than four frames do not produce a zero range step.
    step = max(1, num_frames // 4)
    thumbnails = [cv2.resize(frames[i], small)
                  for i in range(0, num_frames, step)]
    strip = np.concatenate(thumbnails, axis=1)
    # Draw the finished strip once instead of re-drawing it per thumbnail.
    plt.imshow(strip)

    del frames

    # One horizontal bar per action, spanning its start to end time.
    plt.subplot(4,1,3)
    plt.title("Action Events")
    plt.barh(range(len(action_start_times)),
            action_end_times-action_start_times,
            left=action_start_times)
    plt.yticks(range(len(action_start_times)), action_labels)

    plt.show()

In [None]:
#@title Visualising Action & Sound Segments
if example_data['sound_localisation']:
    frames = get_video_frames(example_data, video_path)
    # One counter per frame, so overlapping sound captions get stacked.
    labelled_frames = np.zeros(len(frames))
    frames = paint_sounds(video=frames,
                          sounds=example_data['sound_localisation'],
                          labelled_frames=labelled_frames)
    display_video(frames, fps=example_data['metadata']['frame_rate'])
    del frames

In [None]:
#@title Visualising Action & Sound Segments
if example_data['sound_localisation']:
    # Reverse the channel axis of the loaded frames before plotting.
    frames = get_video_frames(example_data, video_path)[:,:,:,::-1]

    # Collect label and start/end time of every sound event; timestamps are
    # divided by 1e6 (presumably microseconds to seconds -- TODO confirm).
    audio_events = example_data['sound_localisation']
    audio_labels = [audio_event['label'] for audio_event in audio_events]
    audio_start_times = np.array(
        [audio_event['timestamps'][0]/1e6 for audio_event in audio_events])
    audio_end_times = np.array(
        [audio_event['timestamps'][1]/1e6 for audio_event in audio_events])

    plt.figure(figsize=(20, 15))
    # Strip of frames
    plt.subplot(4,1,2)
    plt.title("Video Frames")
    f_size = frames[0].shape
    # cv2.resize takes (width, height); thumbnails are a quarter of each side.
    small = tuple(reversed((np.array(f_size[:2]) / 4).astype(int)))
    num_frames = example_data['metadata']['num_frames']
    # Sample frames a quarter of the video apart; the step is clamped to 1
    # so videos shorter than four frames do not produce a zero range step.
    step = max(1, num_frames // 4)
    thumbnails = [cv2.resize(frames[i], small)
                  for i in range(0, num_frames, step)]
    strip = np.concatenate(thumbnails, axis=1)
    # Draw the finished strip once instead of re-drawing it per thumbnail.
    plt.imshow(strip)

    del frames

    # Plot audio events
    plt.subplot(4,1,3)
    plt.title("Audio Events")
    plt.barh(range(len(audio_start_times)),
            audio_end_times-audio_start_times,
            left=audio_start_times)
    plt.yticks(range(len(audio_start_times)), audio_labels)

    plt.show()


In [None]:
#@title Visualising Multiple-Choice Video Question-Answering
# Print every multiple-choice question with its options, the correct answer
# and its descriptive tags; nothing is printed when there are no questions.
for question in example_data['mc_question'] or []:
    print('---------------------------------')
    print('Question: ', question['question'])
    print('Options: ', question['options'])
    print('Correct Answer ID: ', question['answer_id'], ' - ',
          question['options'][question['answer_id']])
    print('Question info: ')
    print('Reasoning: ', question['reasoning'])
    print('Tag: ', question['tag'])
    print('area: ', question['area'])
    print('---------------------------------')

In [None]:
#@title Visualising Grounded Video Question-Answering

# loading an example that has grounded question annotations
video_id = list(db_dict.keys())[7]
example_data = db_dict[video_id]

# visualising grounded question annotations (first question only)
if example_data['grounded_question']:
    question = example_data['grounded_question'][0]
    print('---------------------------------')
    print('Question: ', question['question'])
    print('Answer IDs: ', question['answers'])
    print('Question info: ')
    print('Reasoning: ', question['reasoning'])
    print('area: ', question['area'])
    print('---------------------------------')

    # Object tracks whose ids are listed as answers to the question.
    answer_tracks = get_answer_tracks(example_data, question['answers'])
    if answer_tracks:
        frames = get_video_frames(example_data, video_path)
        # paint_boxes reads the global COLORS palette; build it here so this
        # cell does not depend on an earlier visualisation cell having run.
        COLORS = get_colors(num_colors=100)
        frames = paint_boxes(frames, answer_tracks)

        # Show only the frames annotated for the first answer track,
        # played back at 1 frame per second.
        annotated_frames = np.array(
            [frames[frame_idx] for frame_idx in answer_tracks[0]['frame_ids']])
        display_video(annotated_frames, 1)
        del frames # managing RAM in Colab
    else:
        # Nothing to draw: avoid indexing an empty track list.
        print('No object tracks match the answer IDs of this question.')