In [1]:
from pathlib import Path
import os
import glob
import numpy as np
import quaternion
import pandas as pd
from parse import parse
from Salient360Toolbox import helper
from Salient360Toolbox.generation import scanpath as scanp_generate
from Salient360Toolbox.generation import saliency as sal_generate
from natsort import natsorted
from tqdm.notebook import tqdm_notebook
from joblib import Parallel, delayed


In [2]:
def get_file_paths(directory, ext):
    """Return the paths in ``directory`` matching the glob pattern ``ext``, naturally sorted.

    Args:
        directory: Folder to search (not recursive).
        ext: Glob pattern joined onto ``directory``, e.g. ``'*.csv'``.

    Returns:
        The matching paths as ordered by ``natsorted``.
    """
    pattern = os.path.join(directory, ext)
    return natsorted(glob.glob(pattern))

In [3]:
def parse_filename(filename):
    """Split a recording file name into its eleven hyphen-separated fields.

    The directory part and the file extension are discarded first; what is
    left must contain exactly eleven ``-``-separated tokens.

    Args:
        filename: Path (or bare name) of the recording file.

    Returns:
        A dict mapping, in this order, ``userName``, ``session_number``,
        ``environment_number``, ``world_rotation``, ``context_factor``,
        ``year``, ``month``, ``day``, ``hour``, ``minute`` and ``second`` to
        the corresponding token (all values are strings). ``None`` is returned,
        after printing a message, when the token count is not eleven.
    """
    field_names = (
        'userName',
        'session_number',
        'environment_number',
        'world_rotation',
        'context_factor',
        'year',
        'month',
        'day',
        'hour',
        'minute',
        'second',
    )

    # Keep only the file name, then drop its extension.
    stem, _extension = os.path.splitext(os.path.basename(filename))
    tokens = stem.split('-')

    if len(tokens) == len(field_names):
        # Tokens are positional: the n-th token belongs to the n-th field.
        return dict(zip(field_names, tokens))

    print(f"Filename format is incorrect: {filename}")
    return None

In [4]:
def process_head_rotations(df, world_rotation):
    """Parse the HMD rotation quaternions, apply the world-rotation correction, and store them on ``df``.

    Args:
        df: DataFrame with an ``HMDRotation`` column whose entries are strings
            of the form ``"(x, y, z, w)"``. The frame is modified in place.
        world_rotation: Angle in degrees. It is used as the middle Euler angle
            (the other two are zero) of the correction quaternion.

    Returns:
        The same ``df`` with four added float columns: ``xhead``, ``yhead``,
        ``zhead`` and ``whead``.

    Raises:
        ValueError: If an ``HMDRotation`` entry does not match the
            ``"(x, y, z, w)"`` pattern.
    """
    records = []
    for raw_rotation in df["HMDRotation"]:
        match = parse("({x}, {y}, {z}, {w})", raw_rotation)
        if match is None:
            # parse() returns None on a mismatch; fail with the offending value
            # instead of an opaque AttributeError on ``None.named``.
            raise ValueError(f"Unparseable HMDRotation value: {raw_rotation!r}")
        records.append(match.named)

    # The quaternion array is built from components ordered (w, x, y, z),
    # whereas the log stores them as (x, y, z, w).
    wxyz = np.ascontiguousarray(
        pd.DataFrame(records, columns=["w", "x", "y", "z"]).astype(float).to_numpy()
    )
    qs_hr = quaternion.as_quat_array(wxyz)

    rotation_correction = quaternion.from_euler_angles([
        np.deg2rad(0), np.deg2rad(world_rotation), np.deg2rad(0)
    ])
    # The correction is multiplied on the right of every head quaternion.
    corrected = quaternion.as_float_array(qs_hr * rotation_correction)

    # Assign plain arrays rather than Series: Series assignment aligns on index
    # labels and would silently write NaN if ``df`` does not carry a default
    # 0..N-1 index (e.g. after filtering rows without resetting the index).
    df["xhead"] = corrected[:, 1]
    df["yhead"] = corrected[:, 2]
    df["zhead"] = corrected[:, 3]
    df["whead"] = corrected[:, 0]

    return df

In [5]:
def _parse_vector_column(series):
    """Convert a column of ``"(x, y, z)"`` strings into an ``(N, 3)`` float array."""
    records = []
    for raw_vector in series:
        match = parse("({x}, {y}, {z})", raw_vector)
        if match is None:
            # parse() returns None on a mismatch; report the offending value
            # instead of failing with an AttributeError on ``None.named``.
            raise ValueError(
                f"Unparseable direction vector in column {series.name!r}: {raw_vector!r}"
            )
        records.append(match.named)
    return pd.DataFrame(records, columns=["x", "y", "z"]).astype(float).to_numpy()


def process_eye_directions(df):
    """Parse the right-eye, left-eye and combined gaze direction vectors onto ``df``.

    Args:
        df: DataFrame with ``RightEyeForward``, ``LeftEyeForward`` and
            ``CombinedGazeForward`` columns whose entries are strings of the
            form ``"(x, y, z)"``. The frame is modified in place.

    Returns:
        The same ``df`` with nine added float columns:
        ``rightgaze{x,y,z}``, ``leftgaze{x,y,z}`` and ``meangazedir{x,y,z}``.

    Raises:
        ValueError: If an entry does not match the ``"(x, y, z)"`` pattern.
    """
    source_columns = [
        ("rightgaze", "RightEyeForward"),
        ("leftgaze", "LeftEyeForward"),
        ("meangazedir", "CombinedGazeForward"),
    ]

    # Parse everything before touching ``df`` so a malformed value does not
    # leave the frame half-updated.
    parsed = [
        (prefix, _parse_vector_column(df[column]))
        for prefix, column in source_columns
    ]

    for prefix, directions in parsed:
        for position, axis in enumerate(["x", "y", "z"]):
            # Arrays are assigned positionally; assigning a Series would align
            # on index labels and produce NaN for a non-default index.
            df[f"{prefix}{axis}"] = directions[:, position]

    return df

In [6]:

def generate_visualization_maps(fix_list, dimension, savename, savepath, path_stim):
    """Build the saliency/fixation/scanpath outputs for one recording and write them to disk.

    Every output name is ``savepath`` with a suffix appended (``_fixmap``,
    ``_salmap``, ``_csalmap``, ``_bsalmap``, ``_bscanpath``, ``_fixation.csv``).
    The module-level settings ``GAUSS_SIGMA``, ``SALIENCY_CMAP`` and
    ``FIXATION_CMAP`` are read when the function runs.

    Args:
        fix_list: 2-D array of fixations indexed as ``fix_list[:, cols]``; it
            needs at least five columns. The meaning of each column is defined
            by Salient360Toolbox and is not visible here.
        dimension: Output size passed through to the toolbox.
        savename: Name passed to ``helper.getSaliencyMap``.
        savepath: Prefix for every output path; also passed to
            ``helper.getSaliencyMap`` as ``path_save``.
        path_stim: Stimulus image path used for the blended outputs.

    Returns:
        None.
    """
    # Column selections used by the toolbox calls below.
    leading_two_cols = fix_list[:, :2]
    saliency_cols = fix_list[:, [2, 3, 4, 0, 1]]

    sal_map = helper.getSaliencyMap(
        saliency_cols,
        dimension,
        name=savename,
        path_save=savepath,
        gauss_sigma=GAUSS_SIGMA,
        force_return_data=True,
        force_generate=False,
        caching=False,
    )
    fix_map = helper.getFixationMap(leading_two_cols, dimension)

    sal_image = sal_generate.toImage(sal_map, cmap=SALIENCY_CMAP)
    fix_map_img = sal_generate.toImage(fix_map, cmap=FIXATION_CMAP, reverse=True)

    # Plain (non-blended) images, written in this order.
    plain_outputs = (
        (fix_map_img, "_fixmap"),
        (sal_image / sal_map.max() * 255, "_salmap"),
        (sal_image[:, :, [2, 1, 0]], "_csalmap"),  # channel order reversed
    )
    for image, suffix in plain_outputs:
        sal_generate.saveImage(image, savepath + suffix)

    # Outputs blended with the stimulus image.
    sal_generate.saveImage(sal_map, savepath + "_bsalmap", blend=path_stim)
    scanp_generate.toImage(
        leading_two_cols,
        dimension,
        savepath + "_bscanpath",
        blend=path_stim,
    )

    # Write every column of the fixation list to CSV.
    all_columns = np.arange(fix_list.shape[1])
    scanp_generate.toFile(
        fix_list,
        savepath + "_fixation.csv",
        saveArr=all_columns,
        mode="w",
    )

In [7]:
def process_eye_tracking_data(csv_file_paths, parsed_values_list, texture_paths, 
                            eye, tracking, resample, filterSettings, parsingSettings, dimension,
                            start_index=1):
    """Process raw gaze recordings and write the gaze log and visualization maps for each.

    For every selected recording the raw CSV is cleaned, timestamps are made
    relative to the first sample, head rotations and eye directions are parsed,
    the result is saved as ``gazelog.csv`` in a per-recording folder under
    ``PARENT_WRITE_PATH``, and the toolbox outputs are generated from it.

    Args:
        csv_file_paths: Paths of the raw ``;``-separated recordings.
        parsed_values_list: One dict per recording, in the same order as
            ``csv_file_paths``, as produced by ``parse_filename``.
        texture_paths: Stimulus images; a recording's ``environment_number``
            ``n`` selects ``texture_paths[n - 1]``.
        eye, tracking, resample: Passed through to ``helper.loadRawData``.
        filterSettings: Passed to ``helper.loadRawData`` as ``filter``.
        parsingSettings: Passed to ``helper.loadRawData`` as ``parser``.
        dimension: Output size passed on to ``generate_visualization_maps``.
        start_index: Index of the first recording to process. The default of 1
            keeps the historical behaviour of skipping the first file.
            NOTE(review): that skip looks unintentional; pass ``start_index=0``
            to process every recording.

    Raises:
        ValueError: If ``csv_file_paths`` and ``parsed_values_list`` differ in
            length, since recordings would be paired with the wrong metadata.
    """
    if len(csv_file_paths) != len(parsed_values_list):
        raise ValueError(
            f"Got {len(csv_file_paths)} CSV paths but {len(parsed_values_list)} "
            "parsed file names; the two lists must be aligned one-to-one."
        )

    for k in tqdm_notebook(range(start_index, len(csv_file_paths))):
        path_raw_file = csv_file_paths[k]
        file_info = parsed_values_list[k]

        # environment_number is 1-based; texture_paths is 0-based.
        environment_index = int(file_info.get("environment_number"))
        path_stim = texture_paths[environment_index - 1]

        # Load the recording and drop every row containing an INVALID/missing value.
        df = (
            pd.read_csv(path_raw_file, sep=";")
            .replace("INVALID", np.nan)
            .dropna()
            .reset_index(drop=True)
        )
        if df.empty:
            # Nothing usable is left; reading the first timestamp would fail.
            print(f"No valid samples in file, skipping: {path_raw_file}")
            continue

        # Per-recording output folder. os.path.join gives the same strings as
        # the previous '\\' joins on Windows and valid paths elsewhere.
        savename = "-".join(file_info.values())
        write_path = os.path.join(PARENT_WRITE_PATH, savename)
        gazeLog_write_path = os.path.join(write_path, 'gazelog.csv')
        os.makedirs(write_path, exist_ok=True)

        # Timestamps relative to the first sample, scaled by 1e-6.
        # Assumes the second column of the raw file is CaptureTime - TODO confirm.
        initial_time = df.iloc[0, 1]
        df["ts"] = df["CaptureTime"].apply(lambda x: (x - initial_time) * 1e-6)

        world_rotation = int(file_info.get("world_rotation"))
        df = process_head_rotations(df, world_rotation)
        df = process_eye_directions(df)

        # The toolbox loader reads from disk, so the processed log is saved first.
        df.to_csv(gazeLog_write_path, columns=COLUMNS_TO_SAVE)

        # Only the fixation list is used below; the gaze samples are discarded.
        _, fix_list = helper.loadRawData(
            gazeLog_write_path,
            eye=eye,
            tracking=tracking,
            resample=resample,
            filter=filterSettings,
            parser=parsingSettings
        )

        # A trailing separator makes the "_suffix" outputs land inside write_path.
        savepath = os.path.join(write_path, "")
        generate_visualization_maps(
            fix_list, dimension, savename, savepath, path_stim
        )

In [8]:
texture_paths = get_file_paths('textures', '*.jpg')

# Recordings and their parsed file-name fields are collected together so that
# index k of one list always describes index k of the other. Files whose name
# does not match the expected format are left out of BOTH lists; keeping them
# in csv_file_paths only would shift every later recording onto the wrong
# metadata.
csv_file_paths = []
parsed_values_list = []

for path in get_file_paths('raw_gaze', '*.csv'):
    parsed_values = parse_filename(path)
    if parsed_values:
        csv_file_paths.append(path)
        parsed_values_list.append(parsed_values)

# Tracking can be HE (Head+Eye) or H (Head alone)
tracking = "HE"

# Targeted eye
eye = "B"

# Resampling rate
resample = 120

# Filter settings
filterSettings = {"name": "savgol", "params": {"win": 9, "poly": 2}}

# Gaze parsing settings
parsingSettings = {"name": "I-VT", "params": {"threshold": 120}}

# Dimensions of output images (Height, Width)
dimension = [8192, 16384]

# Constants and settings
PARENT_WRITE_PATH = 'processed_gaze'
COLUMNS_TO_SAVE = [
    "ts", "xhead", "yhead", "zhead", "whead",
    "rightgazex", "rightgazey", "rightgazez",
    "leftgazex", "leftgazey", "leftgazez",
    "meangazedirx", "meangazediry", "meangazedirz"
]

# Saliency map settings
GAUSS_SIGMA = 2
SALIENCY_CMAP = "coolwarm"
FIXATION_CMAP = "binary"

In [None]:
# Run the pipeline: the three data inputs are passed positionally, the
# processing settings by keyword.
process_eye_tracking_data(
    csv_file_paths,
    parsed_values_list,
    texture_paths,
    eye=eye,
    tracking=tracking,
    resample=resample,
    filterSettings=filterSettings,
    parsingSettings=parsingSettings,
    dimension=dimension,
)