In [None]:
import os

os.environ["CHECKPOINTS_PATH"] = "../checkpoints"
os.environ["TRACKING_RESULTS_PATH"] = "data/processed_tracking_results"

import itertools
import shutil
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
from src.api.models.pydantic import SimRoomClassDTO
from src.config import UNKNOWN_CLASS_ID
from ultralytics import YOLO

from experiment.prediction_utils import (
    calculate_metrics,
    create_confusion_matrix,
    evaluate_predictions,
    render_confusion_matrix,
    update_confusion_matrix,
)
from experiment.settings import (
    CLASS_ID_TO_NAME,
    FINAL_PREDICTIONS_PATH,
    FULLY_LABELED_RECORDINGS,
    GAZE_SEGMENTATION_RESULTS_PATH,
    LABELING_REC_DIFF_BACKGROUND_ID,
    LABELING_REC_SAME_BACKGROUND_ID,
    OBJECT_DATASETS_PATH,
    TRAINING_DATASETS_PATH,
    YOLO_MODELS_PATH,
    RECORDING_FRAMES_PATH,
    SIMROOM_ID,
    LABELING_VALIDATION_VIDEOS_PATH,
    RECORDINGS_PATH,
    PROCESSED_TRACKING_RESULTS_PATH,
)
from src.api.db import Session, engine
from src.api.repositories import simrooms_repo
from src.api.services import simrooms_service
import cv2
import numpy as np
from src.config import TOBII_GLASSES_FPS
import torch
from torchvision.ops import masks_to_boxes
from tqdm import tqdm

%matplotlib inline

In [None]:
def get_tracking_results_per_class(session: Session, labeling_recording_id: str):
    calibration_id = simrooms_repo.get_calibration_recording(
        session, simroom_id=SIMROOM_ID, recording_id=labeling_recording_id
    ).id
    tracked_classes = simrooms_repo.get_tracked_classes(session, calibration_id)

    if len(tracked_classes) != 15:
        raise ValueError(f"Expected 15 tracked classes but got {len(tracked_classes)}")

    tracking_results_per_class = {
        tracked_class.id: simrooms_repo.get_tracking_result_paths(
            session, calibration_id, tracked_class.id
        )
        for tracked_class in tracked_classes
    }

    return tracking_results_per_class, tracked_classes

# Draw graphs of Mask and Bounding Box sizes

In [None]:
def validate_tracking_results(session: Session, labeling_recording_id: str):
    tracking_results_per_class, tracked_classes = get_tracking_results_per_class(
        session, labeling_recording_id
    )

    for tracked_class in tracked_classes:
        class_id = tracked_class.id
        tracking_results = tracking_results_per_class[class_id]
        tracking_results = sorted(tracking_results, key=lambda x: int(x.stem))

        last_frame_idx = int(tracking_results[-1].stem)
        mask_size_per_frame = {frame_idx: None for frame_idx in range(last_frame_idx + 1)}
        box_size_per_frame = {frame_idx: None for frame_idx in range(last_frame_idx + 1)}

        for tracking_result in tracking_results:
            frame_idx = int(tracking_result.stem)
            result = np.load(tracking_result)
            mask_size = np.sum(result["mask"])
            x1, y1, x2, y2 = result["box"]
            box_size = (x2 - x1) * (y2 - y1)
            mask_size_per_frame[frame_idx] = mask_size
            box_size_per_frame[frame_idx] = box_size

        plot_frames = []
        plot_mask_sizes = []
        plot_box_sizes = []

        for frame_idx in range(last_frame_idx + 1):
            mask_size = mask_size_per_frame[frame_idx]
            box_size = box_size_per_frame[frame_idx]
            if mask_size is not None:
                plot_frames.append(frame_idx)
                plot_mask_sizes.append(mask_size)
                plot_box_sizes.append(box_size)

        if not plot_frames:
            continue

        plt.figure(figsize=(15, 6))
        plt.plot(plot_frames, plot_mask_sizes, "o", color=tracked_class.color)
        plt.xlabel("Frame Index")
        plt.ylabel("Mask Size")
        plt.title(f"Mask Size per Frame for Class: {CLASS_ID_TO_NAME[class_id]}")
        plt.grid(True)
        plt.show()

        plt.figure(figsize=(15, 6))
        plt.plot(plot_frames, plot_box_sizes, "o", color=tracked_class.color)
        plt.xlabel("Frame Index")
        plt.ylabel("Box Size")
        plt.title(f"Box Size per Frame for Class: {CLASS_ID_TO_NAME[class_id]}")
        plt.grid(True)
        plt.show()

In [None]:
with Session(engine) as session:
    validate_tracking_results(session, LABELING_REC_SAME_BACKGROUND_ID)

# Create Validation Videos

In [None]:
import subprocess
from src.utils import extract_frames_to_dir
from tqdm import tqdm
from src.api.utils import image_utils
import tempfile
from torchvision.ops import masks_to_boxes
import torch

In [None]:
def draw_validation_video_frames(
    frames: list[Path],
    annotations_per_frame: dict[int, list[Path]],
    tracked_classes: list[SimRoomClassDTO],
):
    class_id_to_annotated_class = {
        anno_class.id: anno_class for anno_class in tracked_classes
    }

    # Iterate over frames and draw the annotations on them if they exist
    for frame in tqdm(frames, desc="Drawing annotations on frames"):
        frame_idx = int(frame.stem)
        frame_img = cv2.imread(str(frame))

        if annotations_per_frame.get(frame_idx) is not None:
            for annotation_path in annotations_per_frame[frame_idx]:
                annotation_file = np.load(annotation_path)
                class_id = int(annotation_file["class_id"])
                x1, y1, x2, y2 = annotation_file["box"]
                mask = annotation_file["mask"]

                # TEST
                eroded_mask = cv2.erode(
                    mask[0].astype(np.uint8), np.ones((3, 3), np.uint8), iterations=1
                )
                mask = cv2.dilate(eroded_mask, np.ones((3, 3), np.uint8), iterations=1)
                mask_torch = torch.from_numpy(mask).unsqueeze(0)

                # create empty 1920x1080 tensor
                full_mask = torch.zeros((1, 1080, 1920), dtype=torch.uint8)
                full_mask[:, y1:y2, x1:x2] = mask_torch

                boxes = masks_to_boxes(full_mask)
                x1, y1, x2, y2 = boxes[0].int().tolist()
                mask = full_mask[0, y1:y2, x1:x2].numpy()
                # TEST

                # Squeeze mask if it has an extra dimension
                if mask.ndim == 3 and mask.shape[0] == 1:
                    mask = mask[0]
                if mask.dtype != bool:
                    mask = mask.astype(bool)

                color = class_id_to_annotated_class[class_id].color
                class_name = class_id_to_annotated_class[class_id].class_name
                box = (x1, y1, x2, y2)

                frame_img = image_utils.draw_mask(frame_img, mask, box)
                frame_img = image_utils.draw_labeled_box(
                    frame_img, box, class_name, color
                )

        # Save the modified image back to its original location
        cv2.imwrite(str(frame), frame_img)

In [None]:
def create_validation_video(
    class_tracking_results: list[Path],
    tracked_class: SimRoomClassDTO,
    labeling_recording_id: str,
    result_videos_path: Path,
):
    # Extract frames from the video and save them to a temporary directory
    print(f"Extracting frames for {labeling_recording_id}")
    video_path = RECORDINGS_PATH / f"{labeling_recording_id}.mp4"
    tmp_frames_dir = tempfile.TemporaryDirectory()
    tmp_frames_path = Path(tmp_frames_dir.name)
    extract_frames_to_dir(
        video_path=video_path,
        frames_path=tmp_frames_path,
        print_output=False,
    )
    frames = sorted(list(tmp_frames_path.glob("*.jpg")), key=lambda x: int(x.stem))

    # Gather annotations for each frame
    print(f"Gathering annotations for {labeling_recording_id}")
    annotations_per_frame: dict[int, list[Path]] = {}
    for tracking_result in class_tracking_results:
        frame_idx = int(tracking_result.stem)
        if frame_idx not in annotations_per_frame:
            annotations_per_frame[frame_idx] = []

        annotations_per_frame[frame_idx].append(tracking_result)

    print(f"Drawing annotations for {labeling_recording_id}")
    draw_validation_video_frames(
        frames=frames,
        annotations_per_frame=annotations_per_frame,
        tracked_classes=[tracked_class],
    )

    print(f"Creating video for {labeling_recording_id}")
    cmd = f'ffmpeg -hwaccel cuda -y -pattern_type glob -framerate {TOBII_GLASSES_FPS} -i "{str(tmp_frames_path)!s}/*.jpg" -c:v libx264 -pix_fmt yuv420p "{result_videos_path}/{CLASS_ID_TO_NAME[tracked_class.id]}.mp4"'
    subprocess.run(cmd, shell=True)


def create_validation_videos(session, labeling_recording_id, selected_class_ids):
    result_videos_path = LABELING_VALIDATION_VIDEOS_PATH / labeling_recording_id
    result_videos_path.mkdir(parents=True, exist_ok=True)

    calibration_id = simrooms_repo.get_calibration_recording(
        session, simroom_id=SIMROOM_ID, recording_id=labeling_recording_id
    ).id
    tracked_classes = simrooms_repo.get_tracked_classes(session, calibration_id)

    tracking_results_per_class, _ = get_tracking_results_per_class(
        session, labeling_recording_id
    )

    for tracked_class in tracked_classes:
        class_id = tracked_class.id
        if class_id not in selected_class_ids:
            continue

        class_tracking_results = tracking_results_per_class[class_id]

        create_validation_video(
            class_tracking_results=class_tracking_results,
            tracked_class=tracked_class,
            labeling_recording_id=labeling_recording_id,
            result_videos_path=result_videos_path,
        )

In [None]:
selected_class_ids = [10]

with Session(engine) as session:
    create_validation_videos(session, LABELING_REC_SAME_BACKGROUND_ID, selected_class_ids)
    # create_validation_videos(session, LABELING_REC_DIFF_BACKGROUND_ID, selected_class_ids)