In [26]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [27]:
import sys

sys.path.append("../")

import os
import cv2
import numpy as np
import pandas as pd
import seaborn as sns
from math import ceil
from tqdm import tqdm
from typing import Dict
from pathlib import Path
import concurrent.futures
from justpfm import justpfm
import multiprocessing as mp
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from scipy.optimize import curve_fit
from skimage.measure import shannon_entropy

os.environ["KMP_DUPLICATE_LIB_OK"] = "True"
os.environ["OMP_NUM_THREADS"] = "1"
tqdm.pandas()

from src.utils.gaze_data import get_gaze_data
from src.utils.file import get_files_recursive, get_ids_from_file_path, get_set_str
from src.config import (
    SALIENCY_MAP_PFM_PATH, 
    RAW_GAZE_FRAME_WIDTH,
    RAW_GAZE_FRAME_HEIGHT,
    IMAGE_WIDTH,
    IMAGE_HEIGHT,
    DEPTH_MAP_PFM_PATH,
    DEPTH_SEG_PFM_PATH,
    SETS_PATH,
)

FRAME_STEP = 5

N_NSEC_IN_SEC = 1e9
FPS = 25
SEED = 0

## Get Fixation Data

In [None]:
def get_fixation_data(
    frame_step: int = FRAME_STEP,
) -> pd.DataFrame:
    """
    Get fixation data for videos.

    Returns:
        pd.DataFrame: Fixation data for videos
    """
    fixation_data_1 = get_gaze_data(
        experiment_ids=[1],
        set_ids=[0],
        fixation=True,
    )
    fixation_data_2 = get_gaze_data(
        experiment_ids=[2],
        fixation=True,
    )
    fixation_data = pd.concat([fixation_data_1, fixation_data_2])

    # Sort data and add Fixation Id
    fixation_data = fixation_data.sort_values(by=["SequenceId", "StartTimestamp_ns"])
    fixation_data["FixationId"] = np.arange(len(fixation_data))

    # Scale fixation coordinates
    fixation_data["X_px"] = fixation_data["X_px"] * IMAGE_WIDTH / RAW_GAZE_FRAME_WIDTH
    fixation_data["Y_px"] = fixation_data["Y_px"] * IMAGE_HEIGHT / RAW_GAZE_FRAME_HEIGHT

    # Get Frame Id per step
    fixation_data["FrameId"] = (fixation_data["TimeSinceStart_ns"] / N_NSEC_IN_SEC * FPS).astype(int)
    fixation_data["FrameId"] = (fixation_data["FrameId"] / frame_step).astype(int) * frame_step

    fixation_data = fixation_data[[
        "FixationId",
        "ExperimentId",
        "SetId",
        "SequenceId",
        "FrameId",
        "X_px",
        "Y_px",
    ]]

    return fixation_data

fixation_data = get_fixation_data()
fixation_data

## Get Null Fixation Data

In [13]:
def get_global_saliency_map() -> np.ndarray:
    """
    Get the global saliency map by merging all saliency maps

    Returns:
        np.ndarray: global saliency map
    """
    saliency_file_paths = Path(SALIENCY_MAP_PFM_PATH).rglob("*.pfm")
    saliency_file_paths = sorted(saliency_file_paths)

    global_saliency_map = None
    for saliency_file_path in tqdm(
        saliency_file_paths, desc="⌛ Loading saliency maps..."
    ):
        # Load map and merge
        saliency_map = justpfm.read_pfm(saliency_file_path)
        if global_saliency_map is None:
            global_saliency_map = saliency_map
        else:
            global_saliency_map += saliency_map

    # To unit distribution
    global_saliency_map /= global_saliency_map.sum()
    global_saliency_map = np.squeeze(global_saliency_map)

    return global_saliency_map


def gaussian_2d(
    x: np.ndarray,
    y: np.ndarray,
    x0: float,
    y0: float,
    sigma_x: float,
    sigma_y: float,
    A: float,
) -> np.ndarray:
    """
    A 2D Gaussian function

    Args:
        x (np.ndarray): x values
        y (np.ndarray): y values
        x0 (float): x center
        y0 (float): y center
        sigma_x (float): x standard deviation
        sigma_y (float): y standard deviation
        A (float): amplitude
    """

    return A * np.exp(
        -((x - x0) ** 2 / (2 * sigma_x**2) + (y - y0) ** 2 / (2 * sigma_y**2))
    )


def fit_gaussian(
    data: np.ndarray,
    width: int,
    height: int,
) -> np.ndarray:
    """
    Fit a 2D Gaussian to the given data

    Args:
        data (np.ndarray): 2D data to fit
        width (int): width of the data
        height (int): height of the data

    Returns:
        np.ndarray: fitted Gaussian
    """
    # Create a 2D grid
    x, y = np.meshgrid(np.arange(width), np.arange(height))

    # Flatten the arrays to fit with curve_fit
    x_data = np.vstack((x.ravel(), y.ravel()))
    z_data = data.ravel()

    # Start with an initial guess for (x0, y0, sigma_x, sigma_y, A)
    initial_guess = (width // 2, height // 2, 20, 30, 1)

    # Fit the Gaussian model to the data
    popt, _ = curve_fit(
        lambda xdata, x0, y0, sigma_x, sigma_y, A: gaussian_2d(
            xdata[0], xdata[1], x0, y0, sigma_x, sigma_y, A
        ),
        x_data,
        z_data,
        p0=initial_guess,
    )

    # Get the optimized parameters
    x0, y0, sigma_x, sigma_y, A = popt
    print(
        f"Optimized parameters: x0={x0}, y0={y0}, sigma_x={sigma_x}, sigma_y={sigma_y}, A={A}"
    )

    # Generate the fitted Gaussian using the optimized parameters
    fitted_gaussian = gaussian_2d(x, y, x0, y0, sigma_x, sigma_y, A)

    return fitted_gaussian

In [None]:
global_saliency_map = get_global_saliency_map()
fitted_gaussian = fit_gaussian(
    data=global_saliency_map,
    width=global_saliency_map.shape[1],
    height=global_saliency_map.shape[0],
)

plt.figure(figsize=(10, 4))
plt.subplot(1, 2, 1)
plt.imshow(global_saliency_map, cmap="jet")
plt.colorbar(label="Saliency map")
plt.title("Global saliency map")
plt.subplot(1, 2, 2)
plt.imshow(fitted_gaussian, cmap="jet")
plt.colorbar(label="Saliency map")
plt.title("Fitted gaussian distribution")
plt.tight_layout()
plt.show()

In [15]:
def get_null_fixation_data(fixation_data: pd.DataFrame, fitted_gaussian: np.ndarray) -> pd.DataFrame:
    """
    Get a null fixation data by sampling from a fitted Gaussian distribution

    Args:
        fixation_data (pd.Dataframe): The fixation data
        fitted_gaussian (np.ndarray): The fitted Gaussian distribution

    Returns:
        pd.DataFrame: The null fixation data
    """
    # Get the x and y coordinates
    x, y = np.meshgrid(np.arange(fitted_gaussian.shape[1]), np.arange(fitted_gaussian.shape[0]))

    # Flatten the arrays
    x = x.ravel()
    y = y.ravel()
    z = fitted_gaussian.ravel()

    # Normalize the z values
    z /= z.sum()

    # Sample from the fitted Gaussian distribution
    sampled_indices = np.random.choice(np.arange(len(x)), size=len(fixation_data), p=z)
    sampled_x = x[sampled_indices]
    sampled_y = y[sampled_indices]

    # Create a new DataFrame
    null_fixation_data = fixation_data.copy()
    null_fixation_data["X_px"] = sampled_x
    null_fixation_data["Y_px"] = sampled_y

    return null_fixation_data

null_fixation_data = get_null_fixation_data(fixation_data=fixation_data, fitted_gaussian=fitted_gaussian)

In [None]:
subsampled_null_fixation_data = null_fixation_data.sample(n=10_000, random_state=SEED)
plt.figure(figsize=(20, 6))
plt.imshow(fitted_gaussian, cmap="magma")
plt.colorbar(label="Saliency map")
plt.title("Fitted gaussian distribution")
plt.scatter(subsampled_null_fixation_data["X_px"], subsampled_null_fixation_data["Y_px"], color="green", s=2)
plt.axis("off")
plt.tight_layout()
plt.show()

## Get Feature Data

In [17]:
def get_feature_data(
    folder_path: str, 
    file_extension: str,
) -> Dict[int, Dict[int, Dict[int, np.ndarray]]]:
    file_paths = get_files_recursive(folder_path, f"*.{file_extension}")
    feature_data = {}
    for file_path in file_paths:
        # Get ids and create empty dict
        experiment_id, set_id, sequence_id = get_ids_from_file_path(file_path)
        if experiment_id not in feature_data:
            feature_data[experiment_id] = {}
        if set_id not in feature_data[experiment_id]:
            feature_data[experiment_id][set_id] = {}
        if sequence_id not in feature_data[experiment_id][set_id]:
            feature_data[experiment_id][set_id][sequence_id] = {}

        # Load data and add to dict
        data = justpfm.read_pfm(file_path)
        feature_data[experiment_id][set_id][sequence_id] = data

    return feature_data


def get_feature_value(
    data: Dict[int, Dict[int, Dict[int, np.ndarray]]] | np.ndarray,
    row: pd.Series,
    transform: callable = None,
    r: int = 5,
) -> float:
    # Retrieve image
    if isinstance(data, Dict):
        experiment_id = row["ExperimentId"]
        set_id = row["SetId"]
        sequence_id = row["SequenceId"]
        data = data[experiment_id][set_id][sequence_id]

    # Get the shape and define the region of interest
    x = row["X_px"]
    y = row["Y_px"]
    height, width = data.shape[:2]
    x_min = int(max(0, x - r))
    x_max = int(min(width, x + r + 1))
    y_min = int(max(0, y - r))
    y_max = int(min(height, y + r + 1))

    # Apply transform if available
    if transform is not None:
        data = transform(data)

    # Extract the region of interest and calculate the mean
    roi = data[y_min:y_max, x_min:x_max]
    value = roi.mean()

    return value


def get_brightness(image: np.ndarray) -> float:
    brightness = np.mean(image, axis=2)[..., np.newaxis]
    return brightness


def get_colorfulness(image: np.ndarray) -> np.ndarray:
    rg = np.abs(image[:, :, 0] - image[:, :, 1])
    yb = np.abs(0.5 * (image[:, :, 0] + image[:, :, 1]) - image[:, :, 2])
    colorfulness_map = np.sqrt(rg ** 2 + yb ** 2)
    return colorfulness_map


def get_saturation(image: np.ndarray) -> np.ndarray:
    hsv_image = mcolors.rgb_to_hsv(image / 255.0)
    saturation_map = hsv_image[:, :, 1]
    return saturation_map


def get_hue(image: np.ndarray) -> np.ndarray:
    hsv_image = mcolors.rgb_to_hsv(image / 255.0)
    hue_map = hsv_image[:, :, 0]
    return hue_map


def get_gradient_magnitude(image: np.ndarray) -> np.ndarray:
    grayscale_image = np.mean(image, axis=2).astype(np.uint8)
    gradient_x = cv2.Sobel(grayscale_image, cv2.CV_64F, 1, 0, ksize=3)
    gradient_y = cv2.Sobel(grayscale_image, cv2.CV_64F, 0, 1, ksize=3)
    gradient_magnitude = np.sqrt(gradient_x**2 + gradient_y**2)
    return gradient_magnitude


def get_local_contrast(image: np.ndarray, patch_size: int = 10) -> np.ndarray:
    grayscale_image = np.mean(image, axis=2)
    kernel = np.ones((patch_size, patch_size)) / (patch_size ** 2)
    local_mean = cv2.filter2D(grayscale_image, -1, kernel)
    local_contrast = np.sqrt((grayscale_image - local_mean) ** 2)
    return local_contrast


def get_local_sharpness(image: np.ndarray, patch_size: int = 10) -> np.ndarray:
    grayscale_image = np.mean(image, axis=2).astype(np.uint8)
    laplacian = cv2.Laplacian(grayscale_image, cv2.CV_64F)
    kernel = np.ones((patch_size, patch_size)) / (patch_size ** 2)
    local_sharpness = cv2.filter2D(np.abs(laplacian), -1, kernel)
    return local_sharpness

def get_local_entropy(image: np.ndarray, patch_size: int = 10) -> np.ndarray:
    grayscale_image = np.mean(image, axis=2).astype(np.uint8)
    entropy_map = np.zeros(grayscale_image.shape)
    for i in range(0, grayscale_image.shape[0], patch_size):
        for j in range(0, grayscale_image.shape[1], patch_size):
            patch = grayscale_image[i:i+patch_size, j:j+patch_size]
            entropy_map[i:i+patch_size, j:j+patch_size] = shannon_entropy(patch)
    return entropy_map

In [None]:
def process_frame(group, video):
    # Get the frame data
    frame_id = group["FrameId"].values[0]
    video.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
    ret, frame = video.read()
    if not ret:
        raise ValueError(f"❌ Frame {frame_id} could not be read.")

    # Apply the feature extraction for all fixations in this frame
    group["Brightness"] = group.apply(
        lambda x: get_feature_value(data=frame, row=x, transform=get_brightness), axis=1
    )
    
    return group


def process_video(group, sets_path):
    # Get group ids and video
    experiment_id = group["ExperimentId"].values[0]
    set_id = group["SetId"].values[0]
    sequence_id = group["SequenceId"].values[0]
    set_str = get_set_str(experiment_id=experiment_id, set_id=set_id)
    video_path = f"{sets_path}/experiment{experiment_id}/{set_str}/scene{sequence_id:02}.mp4"
    video = cv2.VideoCapture(video_path)

    # Process frames
    grouped_data = group.groupby("FrameId")
    frame_rows = [row for _, row in grouped_data]
    results = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        with tqdm(total=len(frame_rows), desc=f"Processing video {sequence_id}") as bar:
            futures = [executor.submit(process_frame, row, video) for row in frame_rows]
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())
                bar.update(1)

    video.release()

    return pd.concat(results)


def get_fixation_features(data: pd.DataFrame, sets_path: str) -> pd.DataFrame:
    # Get depth features
    bar = tqdm(total=2, desc="⌛ Loading depth features...")
    depth_maps = get_feature_data(DEPTH_MAP_PFM_PATH, "pfm")
    bar.update(1)
    depth_segs = get_feature_data(DEPTH_SEG_PFM_PATH, "pfm")
    bar.update(1)
    bar.close()

    data["Depth"] = data.progress_apply(
        lambda x: get_feature_value(data=depth_maps, row=x),
        axis=1,
    )
    data["DepthSeg"] = data.progress_apply(
        lambda x: get_feature_value(data=depth_segs, row=x),
        axis=1,
    )

    # Group by experiment, set, and sequence
    grouped_data = data.groupby(["ExperimentId", "SetId", "SequenceId"])
    results = grouped_data.apply(lambda group: process_video(group, sets_path))
    results = results.reset_index(drop=True)

    return results

# Example usage
fixation_data = get_fixation_features(fixation_data, SETS_PATH)
null_fixation_data = get_fixation_features(null_fixation_data, SETS_PATH)

⌛ Loading depth features...: 100%|██████████| 2/2 [00:00<00:00,  8.15it/s]
100%|██████████| 337638/337638 [00:04<00:00, 69636.50it/s]
100%|██████████| 337638/337638 [00:04<00:00, 70513.13it/s]
Processing video 1:   0%|          | 0/585 [00:00<?, ?it/s]

In [None]:
def process_frame(group, video):
    # Get the frame data
    frame_id = group["FrameId"].values[0]
    video.set(cv2.CAP_PROP_POS_FRAMES, frame_id)
    ret, frame = video.read()
    if not ret:
        raise ValueError(f"❌ Frame {frame_id} could not be read.")

    # Apply the feature extraction for all fixations in this frame
    group["Brightness"] = group.apply(
        lambda x: get_feature_value(data=frame, row=x, transform=get_brightness), axis=1
    )
    
    return group


def process_video(group, sets_path):
    # Get group ids and video
    experiment_id = group["ExperimentId"].values[0]
    set_id = group["SetId"].values[0]
    sequence_id = group["SequenceId"].values[0]
    set_str = get_set_str(experiment_id=experiment_id, set_id=set_id)
    video_path = f"{sets_path}/experiment{experiment_id}/{set_str}/scene{sequence_id:02}.mp4"
    video = cv2.VideoCapture(video_path)

    # Process frames
    grouped_data = group.groupby("FrameId")
    results = grouped_data.progress_apply(process_frame, video=video)

    video.release()

    return pd.concat(results)


def get_fixation_features(data: pd.DataFrame, sets_path: str) -> pd.DataFrame:
    # Get depth features
    bar = tqdm(total=2, desc="⌛ Loading depth features...")
    depth_maps = get_feature_data(DEPTH_MAP_PFM_PATH, "pfm")
    bar.update(1)
    depth_segs = get_feature_data(DEPTH_SEG_PFM_PATH, "pfm")
    bar.update(1)
    bar.close()

    data["Depth"] = data.progress_apply(
        lambda x: get_feature_value(data=depth_maps, row=x),
        axis=1,
    )
    data["DepthSeg"] = data.progress_apply(
        lambda x: get_feature_value(data=depth_segs, row=x),
        axis=1,
    )

    # Group by experiment, set, and sequence
    grouped_data = data.groupby(["ExperimentId", "SetId", "SequenceId"])
    results = grouped_data.apply(lambda group: process_video(group, sets_path))
    results = results.reset_index(drop=True)

    return results

# Example usage
fixation_data = get_fixation_features(fixation_data, SETS_PATH)
null_fixation_data = get_fixation_features(null_fixation_data, SETS_PATH)

In [None]:
display(fixation_data)
display(null_fixation_data)

In [None]:
def plot_feature_boxplots(fixation_data: pd.DataFrame, null_fixation_data: pd.DataFrame, features: list):
    """
    Generate box plots for the specified features, comparing actual and null fixation data.

    Parameters:
        fixation_data (pd.DataFrame): DataFrame containing the actual fixation features.
        null_fixation_data (pd.DataFrame): DataFrame containing the null fixation features.
        features (list): List of feature names to plot.
    """
    # Add a column to differentiate the actual and null data
    fixation_data["Type"] = "Actual Fixations"
    null_fixation_data["Type"] = "Null Fixations"
    
    # Combine both datasets
    combined_data = pd.concat([fixation_data, null_fixation_data])

    for feature in features:
        plt.figure(figsize=(10, 6))
        sns.boxplot(x="Type", y=feature, data=combined_data)
        plt.title(f'Box Plot of {feature} - Actual vs Null Fixations')
        plt.ylabel(f'{feature}')
        plt.tight_layout()
        plt.show()

features_to_plot = [
    "Depth", "DepthSeg", "Brightness", "Colorfulness", "Saturation",
    "Hue", "GradientMagnitude", "LocalContrast", "LocalSharpness", "LocalEntropy"
]

# Plot box plots for each feature comparing actual and null fixation data
plot_feature_boxplots(fixation_data, null_fixation_data, features_to_plot)