In [1]:
class Config:
    """Configuration manager for eye tracking visualization.

    This class holds all configuration parameters for eye gaze tracking
    visualization:
    - File paths and validation
    - Screen resolution and DPI settings
    - Point visualization parameters
    - Gaze movement smoothing parameters

    Constructing an instance checks that the input files exist and raises
    ``FileNotFoundError`` otherwise.
    """

    def __init__(self):
        # Internal cache for computed values (not read by any method below)
        self._cache = {}

        # Input/Output file paths
        self.video_path = "assets/Test Your Awareness.avi"  # Source video
        self.data_path = "assets/DataPOR.csv"               # Eye tracking data
        self.output_path = "assets/LatestEyeTracking.avi"   # Output video

        # Validate file existence on initialization
        self._verify_files()

        # Resolution the gaze coordinates are expressed in; consumers divide
        # raw coordinates by these values when rescaling to the video size.
        self._source_resolution = (1600, 1050)

        # DPI and pixel conversion settings
        self._dpi = 96  # Standard screen DPI
        self._cm_to_pixels = self._dpi / 2.54  # 1 inch = 2.54 cm

        # Gaze point visualization settings
        self._points_config = {
            'color': (0, 0, 255),  # BGR format (red)
            'radius': int(1.2 * self._cm_to_pixels),    # ~1.2 cm in pixels
            'thickness': int(0.1 * self._cm_to_pixels)  # ~0.1 cm in pixels
        }

        # Gaze point offset settings (each 1 cm expressed in pixels)
        self._offset = {
            'vertical': int(1.0 * self._cm_to_pixels),
            'horizontal': int(1.0 * self._cm_to_pixels)
        }

        # Gaze movement smoothing parameters
        self._smoothing_config = {
            'smoothing_factor': 0.8,        # Weight of the previous position
            'velocity_weight': 0.2,         # Velocity influence on movement
            'position_history_size': 3      # Number of positions kept
        }

    @property
    def source_resolution(self):
        """Get the source resolution as a ``(width, height)`` tuple."""
        return self._source_resolution

    @property
    def points_config(self):
        """Get point visualization settings (color, radius, thickness)."""
        return self._points_config

    @property
    def offset(self):
        """Get calibration offset values in pixels."""
        return self._offset

    @property
    def smoothing_factor(self):
        """Get the main smoothing factor for gaze movement."""
        return self._smoothing_config['smoothing_factor']

    @property
    def velocity_weight(self):
        """Get the velocity influence weight for movement calculation."""
        return self._smoothing_config['velocity_weight']

    @property
    def position_history_size(self):
        """Get the number of historical positions to use for smoothing."""
        return self._smoothing_config['position_history_size']

    def _verify_files(self):
        """Verify that all required input files exist.

        Raises:
            FileNotFoundError: If any required file is missing. The message
                lists every missing file, not just the first one.
        """
        required = [
            (self.video_path, "Source video file"),
            (self.data_path, "Eye tracking data file")
        ]
        missing_files = [
            f"{desc}: {path}"
            for path, desc in required
            if not os.path.exists(path)
        ]

        if missing_files:
            raise FileNotFoundError(
                "Required files not found:\n" + "\n".join(missing_files)
            )

    def get_output_paths(self, base_name=None):
        """Generate paths for all output files.

        Args:
            base_name (str, optional): Base name for output files.
                Defaults to ``output_path`` with its extension removed.

        Returns:
            dict: Paths under the keys ``'video'``, ``'heatmap'`` and
            ``'stats'``.
        """
        if base_name is None:
            base_name = os.path.splitext(self.output_path)[0]

        return {
            'video': f"{base_name}.avi",           # Main output video
            'heatmap': f"{base_name}_heatmap.png", # Final heatmap image
            'stats': f"{base_name}_stats.json"     # Analysis statistics
        }

    def to_dict(self):
        """Export configuration as a dictionary for logging or saving.

        The nested dictionaries are copies, so modifying the returned value
        does not alter this configuration.

        Returns:
            dict: Snapshot of all configuration parameters.
        """
        return {
            'video_path': self.video_path,
            'data_path': self.data_path,
            'output_path': self.output_path,
            'source_resolution': self._source_resolution,
            'dpi': self._dpi,
            'points_config': dict(self._points_config),
            'offset': dict(self._offset),
            'smoothing_config': dict(self._smoothing_config)
        }

    def validate_paths(self):
        """Validate and prepare output directory structure.

        Creates the output directory if needed and verifies write
        permissions by creating and removing a probe file in it.

        Raises:
            PermissionError: If the output directory is not writable.
        """
        # An output path without a directory part targets the current
        # directory; name it explicitly so error messages stay meaningful.
        output_dir = os.path.dirname(self.output_path) or os.curdir
        if not os.path.exists(output_dir):
            os.makedirs(output_dir, exist_ok=True)

        # Verify write permissions with a test file
        test_file = os.path.join(output_dir, '.test_write')
        try:
            with open(test_file, 'w') as f:
                f.write('')
            os.remove(test_file)
        except (IOError, OSError) as e:
            raise PermissionError(
                f"Cannot write to output directory: {output_dir}. Error: {e}"
            ) from e

In [2]:
class PrecisePositionTracker:
    """Manages gaze position tracking with motion smoothing.

    The tracker:
    - Keeps the most recent smoothed positions in a fixed-size ring buffer
    - Maintains an exponentially smoothed velocity estimate
    - Blends each new sample with the previous smoothed position
    - Reports the horizontal movement direction
    """

    def __init__(self, config):
        """Initialize the position tracker with configuration parameters.

        Args:
            config: Object exposing ``smoothing_factor``, ``velocity_weight``
                and ``position_history_size``.

        Raises:
            ValueError: If ``position_history_size`` is smaller than 1.
        """
        self.config = config

        # Smoothing parameters from config
        self._smoothing_factor = config.smoothing_factor
        self._velocity_weight = config.velocity_weight
        self._history_size = config.position_history_size
        if self._history_size < 1:
            raise ValueError("position_history_size must be at least 1")

        # Ring buffer of smoothed positions; ``current_index`` is the slot
        # the next position will be written to.
        self.position_history = np.zeros((self._history_size, 2), dtype=np.float32)
        self.current_index = 0
        self.is_history_filled = False

        # Smoothed velocity estimate
        self.velocity = np.zeros(2, dtype=np.float32)

        # Constants for movement detection
        self._MOVEMENT_THRESHOLD = 0.1  # Minimal movement detection threshold
        self._MIN_VELOCITY_NORM = 1e-5  # Minimum velocity magnitude

    def _advance_index(self):
        """Move the write slot forward, flagging the buffer as filled on wrap."""
        self.current_index = (self.current_index + 1) % self._history_size
        if self.current_index == 0:
            self.is_history_filled = True

    def update(self, new_position):
        """Update position tracking with new gaze coordinates.

        The first sample after construction or ``reset`` is stored and
        returned unchanged. Every later sample:
        1. Yields an instantaneous velocity against the last stored position
        2. Updates the smoothed velocity (exponential moving average)
        3. Is blended with the last stored position and the velocity
        4. Is written to the position history

        Args:
            new_position (tuple): New (x, y) gaze position coordinates

        Returns:
            tuple: Smoothed (x, y) position, truncated to integers
        """
        new_position = np.array(new_position, dtype=np.float32)

        # First position initialization
        if not self.is_history_filled and self.current_index == 0:
            self.position_history.fill(0)
            self.position_history[0] = new_position
            # Advancing through the shared helper keeps a one-slot buffer
            # valid: the index wraps back to 0 instead of pointing past the end.
            self._advance_index()
            return tuple(map(int, new_position))

        # Most recently written slot (index -1 wraps to the last row)
        last_position = self.position_history[self.current_index - 1]
        current_velocity = new_position - last_position

        # Update smoothed velocity using exponential moving average
        self.velocity = (
            self.velocity * (1 - self._velocity_weight) +
            current_velocity * self._velocity_weight
        )

        # Apply position smoothing with velocity consideration
        smoothed_position = (
            last_position * self._smoothing_factor +
            new_position * (1 - self._smoothing_factor) +
            self.velocity * self._velocity_weight
        )

        # Update position history
        self.position_history[self.current_index] = smoothed_position
        self._advance_index()

        # Return integer coordinates for pixel rendering
        return tuple(map(int, smoothed_position))

    def get_movement_direction(self):
        """Report horizontal movement direction from the smoothed velocity.

        Returns:
            int: Movement direction indicator
                 1 = moving right
                 -1 = moving left
                 0 = horizontal speed below the movement threshold
        """
        velocity_x = self.velocity[0]

        # Only the x component is considered
        if abs(velocity_x) < self._MOVEMENT_THRESHOLD:
            return 0

        return 1 if velocity_x > 0 else -1

    def get_average_velocity(self):
        """Calculate average velocity over the stored history.

        Returns:
            tuple: Average (dx, dy) between consecutive stored positions, as
            Python floats. ``(0.0, 0.0)`` until the history has been filled
            once, or when it holds fewer than two positions.
        """
        if not self.is_history_filled or self._history_size < 2:
            return (0.0, 0.0)

        # The buffer is circular: the oldest entry sits at ``current_index``.
        # Rotate it to chronological order so no difference is taken across
        # the wrap point (newest -> oldest).
        ordered = np.roll(self.position_history, -self.current_index, axis=0)
        mean_step = np.diff(ordered, axis=0).mean(axis=0)

        return (float(mean_step[0]), float(mean_step[1]))

    def reset(self):
        """Reset tracker to initial state.

        Clears the history and the velocity, so the next ``update`` call is
        treated as a first sample.
        """
        self.position_history.fill(0)
        self.velocity.fill(0)
        self.current_index = 0
        self.is_history_filled = False

    def get_smoothing_info(self):
        """Get current smoothing parameters for debugging or analysis.

        Returns:
            dict: Current smoothing parameters and state
        """
        return {
            'smoothing_factor': self._smoothing_factor,
            'velocity_weight': self._velocity_weight,
            'history_size': self._history_size,
            'current_velocity': tuple(self.velocity),
            'is_history_filled': self.is_history_filled
        }

In [3]:
class EyeTrackingDataProcessor:
    """Processes and manages eye tracking data from raw input.

    This class handles:
    - Loading the eye tracking data file
    - Scaling gaze coordinates to the target video resolution
    - Stepping through the data in sync with the video frames
    - Smoothing positions through a ``PrecisePositionTracker``
    """

    # Logical name -> column header in the data file
    REQUIRED_COLUMNS = {
        'left_x': "L POR X [px]",
        'left_y': "L POR Y [px]",
        'right_x': "R POR X [px]",
        'right_y': "R POR Y [px]",
        'left_pos_x': "L EPOS X",
        'left_pos_y': "L EPOS Y",
        'left_pos_z': "L EPOS Z",
        'right_pos_x': "R EPOS X",
        'right_pos_y': "R EPOS Y",
        'right_pos_z': "R EPOS Z"
    }

    def __init__(self, config):
        """Initialize the data processor with configuration.

        Args:
            config: Configuration object providing ``data_path``,
                ``source_resolution`` and the smoothing parameters
        """
        self.config = config

        # Initialize data structures
        self.data = None
        self.eye_data = None
        self.frame_count = 0
        self.current_row = 0   # Fractional row cursor into ``eye_data``
        self.increment = 0     # Data rows advanced per video frame

        # Create position tracker instance
        self.position_tracker = PrecisePositionTracker(config)

        # Data validation flags
        self._data_loaded = False
        self._processing_initialized = False

        # Scratch buffer (not used by the methods of this class)
        self._coord_buffer = np.zeros(2, dtype=np.float32)

    def load_data(self):
        """Load and preprocess eye tracking data from file.

        The file is read as tab-separated UTF-8 text, keeping only the
        required columns as float32.

        Raises:
            FileNotFoundError: If data file is not found
            ValueError: If required columns are missing or the file cannot
                be parsed
        """
        columns = list(self.REQUIRED_COLUMNS.values())
        try:
            self.data = pd.read_csv(
                self.config.data_path,
                delimiter='\t',
                encoding='utf-8',
                usecols=columns,
                dtype={col: np.float32 for col in columns},
                low_memory=False
            )

            # Validate required columns
            missing_cols = set(columns) - set(self.data.columns)
            if missing_cols:
                raise ValueError(f"Missing required columns: {missing_cols}")

            # Skip the first row and work on an independent float32 copy.
            # NOTE(review): pd.read_csv already consumes the header line, so
            # this drops the first data record - confirm that is intended.
            self.eye_data = self.data.iloc[1:].astype(np.float32)

            self._data_loaded = True

        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Eye tracking data file not found: {self.config.data_path}"
            ) from e
        except Exception as e:
            raise ValueError(f"Error loading eye tracking data: {str(e)}") from e

    def initialize_processing(self, frame_count):
        """Initialize data processing for video synchronization.

        Computes how many data rows correspond to one video frame.

        Args:
            frame_count (int): Total number of video frames to process

        Raises:
            RuntimeError: If data hasn't been loaded
            ValueError: If ``frame_count`` is not positive
        """
        if not self._data_loaded:
            raise RuntimeError("Data must be loaded before initializing processing")

        # A zero frame count would otherwise surface as a ZeroDivisionError,
        # and a negative one would walk the data backwards.
        if frame_count <= 0:
            raise ValueError(f"frame_count must be positive, got {frame_count}")

        self.frame_count = frame_count
        self.increment = len(self.eye_data) / frame_count
        self._processing_initialized = True

    def get_coordinates(self, target_resolution):
        """Get processed coordinates for current frame.

        Each call consumes one frame's worth of data by advancing the row
        cursor by ``increment``.

        Args:
            target_resolution (tuple): Target video resolution (width, height)

        Returns:
            tuple: Processed (x, y) coordinates or None if end of data

        Raises:
            RuntimeError: If processing hasn't been initialized
        """
        if not self._processing_initialized:
            raise RuntimeError("Processing must be initialized before getting coordinates")

        # Every row up to and including the last one is valid data
        row_index = int(self.current_row)
        if row_index >= len(self.eye_data):
            return None

        # Get current data point
        current_data = self.eye_data.iloc[row_index]

        # Process coordinates
        coords = self._process_coordinates(current_data, target_resolution)

        # Update row counter
        self.current_row += self.increment

        return coords

    def _process_coordinates(self, point_data, target_resolution):
        """Convert one raw sample into a smoothed pixel position.

        Args:
            point_data (pd.Series): Raw eye tracking data point
            target_resolution (tuple): Target video resolution (width, height)

        Returns:
            tuple: Processed (x, y) coordinates
        """
        cols = self.REQUIRED_COLUMNS

        # Average left and right eye coordinates
        x = (float(point_data[cols['left_x']]) +
             float(point_data[cols['right_x']])) / 2
        y = (float(point_data[cols['left_y']]) +
             float(point_data[cols['right_y']])) / 2

        # Scale coordinates to target resolution
        x = x / self.config.source_resolution[0] * target_resolution[0]
        y = y / self.config.source_resolution[1] * target_resolution[1]

        # Track position with smoothing
        return self.position_tracker.update((int(x), int(y)))

    def get_processing_stats(self):
        """Get current processing statistics.

        Returns:
            dict: Processing statistics and state information
        """
        return {
            'total_rows': len(self.eye_data) if self._data_loaded else 0,
            'current_row': self.current_row,
            'frame_count': self.frame_count,
            'increment': self.increment,
            'is_initialized': self._processing_initialized,
            'tracker_info': self.position_tracker.get_smoothing_info()
        }

    def reset(self):
        """Rewind to the first data row and reset the position tracker."""
        self.current_row = 0
        self.position_tracker.reset()

In [4]:
class PointVisualizer:
    """Manages the visualization of eye tracking points on video frames.

    This class handles:
    - Drawing the gaze point as a circle outline
    - Adding a blurred halo around it for visibility
    - Reusing per-frame mask buffers between calls
    - Coordinate validation
    """

    def __init__(self, config):
        """Initialize the point visualizer with configuration.

        Args:
            config: Configuration object whose ``points_config`` provides
                ``'color'``, ``'radius'`` and ``'thickness'``
        """
        self.config = config

        # Pre-calculate visualization parameters
        self._setup_visualization_params()

        # Mask buffers are created lazily from the first frame
        self._initialize_masks()

    def _setup_visualization_params(self):
        """Pre-calculate visualization parameters and the halo blur kernel."""
        points = self.config.points_config
        self._vis_params = {
            'color': points['color'],
            'radius': points['radius'],
            'thickness': points['thickness'],
            'halo_radius': points['radius'] + 10,
            'blur_size': (35, 35),
            'halo_alpha': 0.3,
            'point_alpha': 0.9
        }

        # Build the 2D kernel from ``blur_size`` so the parameter and the
        # kernel cannot drift apart. Sigma 0 lets OpenCV derive it from the
        # kernel size.
        blur_width, blur_height = self._vis_params['blur_size']
        kernel_x = cv2.getGaussianKernel(blur_width, 0)
        kernel_y = cv2.getGaussianKernel(blur_height, 0)
        self._blur_kernel = kernel_y * kernel_x.T

    def _initialize_masks(self):
        """Declare the mask buffers; they are allocated on first use."""
        self._masks = {
            'temp': None,  # Will be initialized with first frame
            'halo': None,
            'main': None
        }

    def _ensure_masks(self, frame):
        """Allocate the mask buffers so they match ``frame``.

        Buffers are created on first use and re-created whenever the frame
        shape or dtype changes, because blending requires masks with the
        same shape and type as the frame.

        Args:
            frame (np.ndarray): Frame the masks must match
        """
        reference = self._masks['temp']
        if (reference is not None
                and reference.shape == frame.shape
                and reference.dtype == frame.dtype):
            return

        for key in self._masks:
            self._masks[key] = np.zeros(frame.shape, dtype=frame.dtype)

    def process_frame(self, frame, point):
        """Process a video frame by adding gaze point visualization.

        Args:
            frame (np.ndarray): Input video frame
            point (tuple): (x, y) coordinates of gaze point

        Returns:
            np.ndarray: A new frame with the halo and gaze point blended in,
            or the input frame itself (unmodified) when ``point`` is None or
            lies outside the frame.
        """
        # Validate coordinates
        if not self._are_coordinates_valid(point, frame.shape):
            return frame

        # Make sure the masks exist and match this frame
        self._ensure_masks(frame)

        # Clear previous masks
        for mask in self._masks.values():
            mask.fill(0)

        # Draw halo effect
        cv2.circle(
            self._masks['halo'],
            point,
            self._vis_params['halo_radius'],
            self._vis_params['color'],
            -1  # Filled circle
        )

        # Soften the halo
        self._apply_blur(self._masks['halo'])

        # Draw main point
        cv2.circle(
            self._masks['main'],
            point,
            self._vis_params['radius'],
            self._vis_params['color'],
            self._vis_params['thickness']
        )

        # Blend the layers onto a copy of the frame
        return self._combine_layers(frame)

    def _apply_blur(self, mask):
        """Filter ``mask`` in place with the pre-calculated gaussian kernel.

        Args:
            mask (np.ndarray): Mask to blur
        """
        cv2.filter2D(mask, -1, self._blur_kernel, mask)

    def _combine_layers(self, frame):
        """Blend the halo and point masks onto a copy of the frame.

        Args:
            frame (np.ndarray): Original frame (left untouched)

        Returns:
            np.ndarray: Combined frame with all effects
        """
        # Create working copy
        result = frame.copy()

        # Add halo effect
        cv2.addWeighted(
            result, 1.0,
            self._masks['halo'], self._vis_params['halo_alpha'],
            0, result
        )

        # Add main point
        cv2.addWeighted(
            result, 1.0,
            self._masks['main'], self._vis_params['point_alpha'],
            0, result
        )

        return result

    @staticmethod
    def _are_coordinates_valid(point, frame_shape):
        """Validate point coordinates against frame dimensions.

        Args:
            point (tuple): (x, y) coordinates to validate, or None
            frame_shape (tuple): Shape of the frame, starting with
                (height, width)

        Returns:
            bool: True if the point is not None and lies inside the frame
        """
        if point is None:
            return False

        x, y = point
        height, width = frame_shape[:2]

        return (0 <= x < width) and (0 <= y < height)

    def get_visualization_params(self):
        """Get current visualization parameters.

        Returns:
            dict: A copy of the visualization parameters plus the current
            mask shape (None before the first processed frame) and the blur
            kernel shape
        """
        temp_mask = self._masks['temp']
        return {
            'visualization_params': self._vis_params.copy(),
            'frame_info': {
                'mask_shape': temp_mask.shape if temp_mask is not None else None,
                'blur_kernel_size': self._blur_kernel.shape
            }
        }

In [5]:
class VideoProcessor:
    """Handles video reading, writing and final encoding with audio.

    This class manages:
    - Video capture and an intermediate video writer
    - Re-encoding the result with FFmpeg, taking audio from the source video
    - Frame counters and timing statistics
    """

    # Class constants for video processing
    VIDEO_CODEC = 'mp4v'  # FourCC used for the temporary video
    FINAL_CODEC = 'libx264'  # H.264 codec for final output
    QUALITY_CRF = '17'  # High quality (15-18 is visually lossless)

    def __init__(self, config):
        """Initialize video processor with configuration.

        Args:
            config: Configuration object providing ``video_path`` and
                ``output_path``
        """
        self.config = config

        # Video handling objects
        self.cap = None  # Video capture
        self.out = None  # Video writer

        # Video properties
        self.frame_size = None
        self.fps = None
        self.frame_count = None
        self.current_frame = 0

        # Temporary file handling
        self.temp_video_path = "temp_output.mp4"

        # True once a writer has been opened and its output still has to be
        # merged into the final file by ``release``.
        self._finalize_pending = False

        self._buffer = None  # Reserved frame buffer (not used by this class)
        self._processing_stats = {
            'frames_processed': 0,
            'start_time': None,
            'dropped_frames': 0
        }

    def open_video(self):
        """Open and initialize video capture and writer.

        Returns:
            int: Total number of frames in video

        Raises:
            RuntimeError: If the video cannot be opened or the writer cannot
                be created
        """
        try:
            self.cap = cv2.VideoCapture(self.config.video_path)
            if not self.cap.isOpened():
                raise RuntimeError(f"Cannot open video file: {self.config.video_path}")

            # Get video properties
            self.frame_size = (
                int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            )
            # Keep the fractional frame rate (e.g. 29.97): truncating it
            # stretches the written video and desynchronizes it from the
            # audio copied from the source.
            self.fps = float(self.cap.get(cv2.CAP_PROP_FPS))
            self.frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Initialize video writer
            self._initialize_writer()

            # Initialize performance tracking
            self._processing_stats['start_time'] = time.time()

            return self.frame_count

        except Exception as e:
            self.release()  # Clean up on error
            raise RuntimeError(f"Error initializing video processing: {str(e)}") from e

    def _initialize_writer(self):
        """Create the writer for the temporary video.

        Raises:
            RuntimeError: If the writer cannot be created or opened
        """
        try:
            fourcc = cv2.VideoWriter_fourcc(*self.VIDEO_CODEC)
            self.out = cv2.VideoWriter(
                self.temp_video_path,
                fourcc,
                self.fps,
                self.frame_size,
                True  # isColor
            )

            if not self.out.isOpened():
                raise RuntimeError("Failed to initialize video writer")

            # From here on there is output for ``release`` to finalize
            self._finalize_pending = True

        except Exception as e:
            raise RuntimeError(f"Error creating video writer: {str(e)}") from e

    def read_frame(self):
        """Read next frame from video.

        Returns:
            tuple: (success, frame) where success is bool and frame is
            np.ndarray, or ``(False, None)`` when no frame is available
        """
        if self.cap is None:
            return False, None

        if not self.cap.grab():
            return False, None

        success, frame = self.cap.retrieve()
        if not success:
            self._processing_stats['dropped_frames'] += 1
            return False, None

        self.current_frame += 1
        self._processing_stats['frames_processed'] += 1
        return True, frame

    def write_frame(self, frame):
        """Write frame to output video.

        Does nothing when no writer is open.

        Args:
            frame (np.ndarray): Processed frame to write
        """
        if self.out is not None:
            self.out.write(frame)

    def release(self):
        """Release resources and finalize video with audio.

        This method:
        1. Releases video capture and writer
        2. If a writer had been opened, merges audio from the original video
           and applies the final compression settings
        3. Falls back to the video without audio if the merge fails

        Finalization runs at most once per opened writer, so calling this
        method again, or from a failed ``open_video``, only frees resources.
        """
        # Release video objects
        if self.cap is not None:
            self.cap.release()
            self.cap = None
        if self.out is not None:
            self.out.release()
            self.out = None
        cv2.destroyAllWindows()

        if not self._finalize_pending:
            return
        self._finalize_pending = False

        try:
            self._merge_audio_and_finalize()
        except Exception as e:
            print(f"Warning: Error during audio merging: {e}")
            print("Falling back to video without audio")
            self._fallback_to_video_only()

    def _build_ffmpeg_command(self):
        """Build the FFmpeg argument list for the final encode."""
        return [
            'ffmpeg',
            '-y',  # Overwrite output
            '-i', self.temp_video_path,  # Processed video
            '-i', self.config.video_path,  # Original video (audio source)
            # Select streams explicitly instead of relying on FFmpeg's
            # automatic choice between the two inputs: video from the
            # processed file, audio from the original if it has any.
            '-map', '0:v:0',
            '-map', '1:a:0?',
            '-c:v', self.FINAL_CODEC,
            '-preset', 'slower',  # Higher quality encoding
            '-crf', self.QUALITY_CRF,
            '-c:a', 'copy',  # Copy audio without re-encoding
            '-movflags', '+faststart',  # Enable streaming
            '-pix_fmt', 'yuv420p',  # Ensure compatibility
            self.config.output_path
        ]

    def _merge_audio_and_finalize(self):
        """Merge audio and apply final video compression.

        Raises:
            RuntimeError: If FFmpeg exits with a non-zero status
        """
        try:
            # Execute FFmpeg command
            subprocess.run(self._build_ffmpeg_command(), check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            # Decode leniently so unusual bytes in FFmpeg's output cannot
            # mask the real failure.
            raise RuntimeError(
                f"FFmpeg error: {e.stderr.decode(errors='replace')}"
            ) from e

        # Clean up temporary file
        if os.path.exists(self.temp_video_path):
            os.remove(self.temp_video_path)

    def _fallback_to_video_only(self):
        """Fall back to processed video without audio if merging fails."""
        if os.path.exists(self.temp_video_path):
            shutil.move(self.temp_video_path, self.config.output_path)

    def get_processing_stats(self):
        """Get current processing statistics.

        Returns:
            dict: Copy of the counters; once processing has started it also
            contains ``elapsed_time``, ``fps_average`` and
            ``progress_percent``
        """
        stats = self._processing_stats.copy()
        if stats['start_time']:
            elapsed_time = time.time() - stats['start_time']
            stats.update({
                'elapsed_time': elapsed_time,
                'fps_average': (stats['frames_processed'] / elapsed_time
                              if elapsed_time > 0 else 0),
                'progress_percent': (self.current_frame / self.frame_count * 100
                                   if self.frame_count else 0)
            })
        return stats

In [6]:
import numba
import cv2
import numpy as np
import time
from threading import Lock

@numba.jit(nopython=True)
def calculate_grid_position(x, y, cell_width, cell_height, grid_rows, grid_cols):
    """Return the (row, col) grid cell containing the point (x, y).

    Each index is the truncated quotient of the coordinate by the cell size,
    clamped to the valid range ``0 .. count - 1``.
    """
    # Column index, clamped below at 0 and then above at the last column
    col = int(x / cell_width)
    if col < 0:
        col = 0
    last_col = grid_cols - 1
    if col > last_col:
        col = last_col

    # Row index, clamped the same way
    row = int(y / cell_height)
    if row < 0:
        row = 0
    last_row = grid_rows - 1
    if row > last_row:
        row = last_row

    return row, col

class HeatmapVisualizer:
    """Grid-based gaze dwell-time heatmap for eye-tracking videos.

    The frame is divided into ``grid_cols`` x ``grid_rows`` cells. Every
    accepted gaze sample adds one frame duration to the cell it falls in.
    ``overlay_heatmap`` tints each visited cell according to its share of the
    total dwell time and prints that share as a percentage.
    """

    def __init__(self, frame_size, grid_size=(12, 8), fps=30.0):
        """Create a visualizer for frames of the given size.

        Args:
            frame_size: ``(width, height)`` of the video frames in pixels.
            grid_size: ``(columns, rows)`` of the analysis grid.
            fps: Frame rate used to convert one gaze sample into seconds of
                dwell time. Defaults to 30 frames per second.

        Raises:
            ValueError: If the grid is empty, if the frame is smaller than
                the grid (cells would be less than one pixel), or if ``fps``
                is not positive.
        """
        self.frame_width, self.frame_height = frame_size
        self.grid_cols, self.grid_rows = grid_size

        if self.grid_cols < 1 or self.grid_rows < 1:
            raise ValueError(f"grid_size must be at least (1, 1), got {grid_size!r}")
        if fps <= 0:
            raise ValueError(f"fps must be positive, got {fps!r}")
        self.fps = float(fps)

        # Guards the counters against concurrent update/overlay calls
        self.lock = Lock()

        # Grid geometry (integer cell size; leftover pixels on the right and
        # bottom edges are attributed to the last column/row by the clamping
        # in calculate_grid_position)
        self.cell_width = self.frame_width // self.grid_cols
        self.cell_height = self.frame_height // self.grid_rows
        if self.cell_width < 1 or self.cell_height < 1:
            raise ValueError(
                f"frame_size {frame_size!r} is too small for grid_size {grid_size!r}"
            )

        # Dwell-time counters
        self._init_counters()

        # Colour ramp and Gaussian kernel
        self._setup_colors()
        self._setup_kernel()

    def _init_counters(self):
        """Reset the per-cell and total dwell-time counters."""
        self.grid_times = np.zeros((self.grid_rows, self.grid_cols), dtype=np.float32)
        self.frame_time = 1.0 / self.fps  # Seconds represented by one sample
        self.total_time = 0.0
        self.last_cell = None

    def _setup_colors(self):
        """Build the 256-entry BGR colour ramp used to tint cells."""
        # Anchor colours, in BGR order
        self.colors = np.array([
            [180, 180, 120],  # Soft blue-green
            [140, 200, 100],  # Softened bright green
            [100, 140, 200],  # Soft orange
            [100, 120, 220]   # Softened bright red
        ], dtype=np.uint8)

        steps = 256
        self.color_gradient = np.zeros((steps, 3), dtype=np.uint8)

        # Interpolate in floating point so uint8 arithmetic cannot wrap
        anchors = self.colors.astype(np.float64)

        # Piecewise-linear blend across three segments of the ramp
        for i in range(steps):
            t = i / (steps - 1)
            if t < 0.33:
                start, end = anchors[0], anchors[1]
                factor = t / 0.33
            elif t < 0.66:
                start, end = anchors[1], anchors[2]
                factor = (t - 0.33) / 0.33
            else:
                start, end = anchors[2], anchors[3]
                factor = (t - 0.66) / 0.34

            self.color_gradient[i] = np.round(
                start * (1 - factor) + end * factor
            ).astype(np.uint8)

    def _setup_kernel(self):
        """Precompute a 2-D Gaussian kernel sized relative to the cell width.

        The kernel side is odd, at most 35 and at least 1; it is stored in
        ``gaussian_kernel`` with its side length in ``kernel_size``.
        """
        # Clamp to >= 1 so narrow cells never yield a non-positive size
        kernel_size = max(1, min(35, self.cell_width - 1))
        if kernel_size % 2 == 0:
            kernel_size -= 1
        sigma = kernel_size / 6.0

        kernel_1d = cv2.getGaussianKernel(kernel_size, sigma)
        # Outer product of the 1-D kernel with itself gives the 2-D kernel
        self.gaussian_kernel = (kernel_1d * kernel_1d.T).astype(np.float32)
        self.kernel_size = kernel_size

    def update_heatmap(self, coords):
        """Record one gaze sample.

        Args:
            coords: ``(x, y)`` pixel position as a tuple or list. Anything
                else, non-numeric entries, NaN values and positions outside
                the frame are ignored.
        """
        if not isinstance(coords, (tuple, list)) or len(coords) != 2:
            return

        try:
            x, y = float(coords[0]), float(coords[1])
        except (TypeError, ValueError):
            # Missing or non-numeric sample: nothing to record
            return

        # NaN fails both comparisons and is therefore rejected here as well
        if not (0 <= x < self.frame_width and 0 <= y < self.frame_height):
            return

        with self.lock:
            row, col = calculate_grid_position(
                x, y,
                self.cell_width, self.cell_height,
                self.grid_rows, self.grid_cols
            )

            # Accumulate plain dwell time only. No extra weighting is applied,
            # so grid_times always sums to total_time and the percentages
            # shown by overlay_heatmap sum to 100.
            self.grid_times[row, col] += self.frame_time
            self.total_time += self.frame_time

    def overlay_heatmap(self, frame, alpha=0.5):
        """Return a copy of ``frame`` with the heatmap blended on top.

        Args:
            frame: Image array to annotate; ``None`` is returned unchanged.
            alpha: Weight of the heatmap colour in the blend (the frame gets
                ``1 - alpha``).

        Returns:
            The annotated copy. If drawing fails, the error is printed and an
            unannotated copy of ``frame`` is returned instead.
        """
        if frame is None:
            return frame

        with self.lock:
            try:
                output = frame.copy()

                if self.total_time > 0:
                    # Share of total dwell time per cell, in percent
                    percentages = (self.grid_times / self.total_time) * 100
                    # Loop-invariant: the busiest cell defines the top of the ramp
                    peak = float(np.max(percentages))

                    heatmap_overlay = np.zeros((*frame.shape[:2], 3), dtype=np.uint8)

                    for row in range(self.grid_rows):
                        for col in range(self.grid_cols):
                            if percentages[row, col] > 0:
                                # Pixel bounds of the cell
                                y1 = row * self.cell_height
                                y2 = (row + 1) * self.cell_height
                                x1 = col * self.cell_width
                                x2 = (col + 1) * self.cell_width

                                # Colour chosen relative to the busiest cell
                                normalized = percentages[row, col] / peak
                                color_idx = int(np.clip(normalized * 255, 0, 255))
                                heatmap_overlay[y1:y2, x1:x2] = self.color_gradient[color_idx]

                    # Blend only where a cell was coloured
                    mask = (heatmap_overlay > 0).any(axis=2)
                    if mask.any():
                        output[mask] = cv2.addWeighted(
                            output[mask], 1-alpha,
                            heatmap_overlay[mask], alpha,
                            0
                        )

                    # Grid lines and percentage labels
                    self._add_grid_overlay(output, percentages)

                    # Total dwell time in the top-left corner
                    cv2.putText(output, f"Temps total: {self.total_time:.1f}s",
                              (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                              (255, 255, 255), 2, cv2.LINE_AA)

                return output

            except Exception as e:
                # Best effort: a drawing failure must not interrupt processing
                print(f"Error in overlay_heatmap: {e}")
                return frame.copy()

    def _add_grid_overlay(self, frame, percentages):
        """Draw cell borders and percentage labels onto ``frame`` in place.

        Args:
            frame: Image array to draw on.
            percentages: Per-cell values indexed as ``[row, col]``; cells
                below 0.5 get a border but no label.
        """
        try:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = min(0.4, self.cell_width / 100)

            for row in range(self.grid_rows):
                for col in range(self.grid_cols):
                    # Cell border
                    x1 = col * self.cell_width
                    y1 = row * self.cell_height
                    x2 = x1 + self.cell_width
                    y2 = y1 + self.cell_height
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 255), 1)

                    # Label only cells above the visibility threshold
                    percentage = percentages[row, col]
                    if percentage >= 0.5:
                        text = f"{percentage:.1f}%"
                        (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, 1)
                        text_x = x1 + (self.cell_width - text_w) // 2
                        text_y = y1 + (self.cell_height + text_h) // 2

                        # Thick black pass under a thin white pass for legibility
                        cv2.putText(frame, text, (int(text_x), int(text_y)),
                                  font, font_scale, (0, 0, 0), 3, cv2.LINE_AA)
                        cv2.putText(frame, text, (int(text_x), int(text_y)),
                                  font, font_scale, (255, 255, 255), 1, cv2.LINE_AA)

        except Exception as e:
            # Best effort: keep whatever was drawn so far
            print(f"Error in _add_grid_overlay: {e}")

    def save_final_visualization(self, output_path, frame):
        """Overlay the heatmap on ``frame`` and write the result to disk.

        Args:
            output_path: Destination image path.
            frame: Base image; nothing is written when it is ``None``.

        Failures are printed rather than raised.
        """
        if frame is None:
            return

        try:
            # overlay_heatmap works on its own copy, so frame is left untouched
            final_frame = self.overlay_heatmap(frame)
            if final_frame is not None and not cv2.imwrite(output_path, final_frame):
                # imwrite signals failure through its return value
                print(f"Error saving visualization: could not write {output_path}")

        except Exception as e:
            print(f"Error saving visualization: {e}")

In [7]:
import cv2
import pandas as pd
import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
import psutil
import logging
import warnings
from tqdm.notebook import tqdm
import gc
import time
import subprocess
import shutil
import numba

# Warning and logging configuration: suppress UserWarning and FutureWarning
# messages and request INFO-level logging through logging.basicConfig.
warnings.filterwarnings('ignore', category=UserWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
logging.basicConfig(level=logging.INFO)

def process_frame_batch(data):
    """Annotate a single frame with the eye-tracking visualizations.

    Args:
        data: 5-tuple ``(frame, coords, config, heatmap_visualizer,
            point_visualizer)``. ``config`` is unpacked but not used here.

    Returns:
        When ``coords`` is not ``None``: the frame after the heatmap update,
        the point visualizer and the heatmap overlay have been applied; the
        result is also shown in the 'Eye Tracking Analysis' window. When
        ``coords`` is ``None``, or when any step raises, a plain copy of the
        input frame (the error is printed in the latter case).
    """
    frame, coords, _config, heatmap_visualizer, point_visualizer = data

    try:
        if coords is None:
            # No gaze sample for this frame: pass it through untouched
            return frame.copy()

        # Apply the visualizations
        heatmap_visualizer.update_heatmap(coords)
        annotated = heatmap_visualizer.overlay_heatmap(
            point_visualizer.process_frame(frame, coords)
        )

        # Live preview
        cv2.imshow('Eye Tracking Analysis', annotated)
        cv2.waitKey(1)

        return annotated

    except Exception as e:
        print(f"Erreur de traitement de frame : {e}")
        return frame.copy()

def optimize_system_resources():
    """Apply process-level tuning before processing starts.

    In order: sets the niceness of the current process to 10, enables the
    garbage collector with thresholds (700, 10, 5), sets OpenCV's thread
    count to the CPU count, and configures NumPy print options.

    Returns:
        bool: ``True`` when every step succeeded; ``False`` (after printing
        the error) as soon as one of them raises.
    """
    try:
        # Process priority
        current_process = psutil.Process(os.getpid())
        if hasattr(current_process, "nice"):
            current_process.nice(10)

        # Garbage collector
        gc.enable()
        gc.set_threshold(700, 10, 5)

        # OpenCV threading
        cv2.setNumThreads(multiprocessing.cpu_count())

        # NumPy display settings
        np.set_printoptions(precision=3, suppress=True)
    except Exception as e:
        print(f"Erreur d'optimisation des ressources : {e}")
        return False

    return True

# Driver cell: reads the source video frame by frame, annotates each frame
# with the gaze point and heatmap, writes the result, then saves a final
# heatmap image and prints timing statistics.
try:
    # Apply the system-level tuning
    optimize_system_resources()

    # Load the configuration
    config = Config()
    print("Configuration chargée avec succès")
    
    # Create the processors; n_threads is the CPU count minus one (at least 1)
    n_threads = max(1, multiprocessing.cpu_count() - 1)
    video_processor = VideoProcessor(config)
    data_processor = EyeTrackingDataProcessor(config)
    
    # Initialise processing
    print("Initialisation du traitement...")
    frame_count = video_processor.open_video()
    data_processor.load_data()
    data_processor.initialize_processing(frame_count)
    
    # Create the visualizers (a single instance of each for the whole run)
    heatmap_visualizer = HeatmapVisualizer(video_processor.frame_size)
    point_visualizer = PointVisualizer(config)
    
    # Progress-tracking variables
    last_frame = None
    frames_processed = 0
    start_time = time.time()
    
    print(f"Démarrage du traitement avec {n_threads} threads...")
    
    # Set up the preview window
    cv2.namedWindow('Eye Tracking Analysis', cv2.WINDOW_NORMAL)
    
    # Main processing loop.
    # NOTE(review): the executor is created but nothing is ever submitted to
    # it; every frame is processed sequentially by the direct call below.
    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        # Progress bar for Jupyter
        for _ in tqdm(range(frame_count), desc="Traitement des frames", unit="frame"):
            # Read the next frame; stop when the reader reports no frame
            ret, frame = video_processor.read_frame()
            if not ret:
                break
            
            # Fetch the gaze coordinates for this frame
            coords = data_processor.get_coordinates(video_processor.frame_size)
            
            # Annotate the frame
            processed_frame = process_frame_batch((
                frame, coords, config, heatmap_visualizer, point_visualizer
            ))
            
            # Remember the latest frame and write it out
            if processed_frame is not None:
                last_frame = processed_frame.copy()
                video_processor.write_frame(processed_frame)
                frames_processed += 1
            
            # Periodic memory clean-up, every 100 processed frames.
            # NOTE(review): the condition is also true while frames_processed
            # is still 0.
            if frames_processed % 100 == 0:
                gc.collect()
                # Print running performance statistics
                elapsed_time = time.time() - start_time
                fps = frames_processed / elapsed_time if elapsed_time > 0 else 0
                print(f"\rPerformance - FPS moyen: {fps:.2f}", end='')
    
    # Save the final visualization.
    # NOTE(review): last_frame already carries the heatmap overlay from
    # process_frame_batch, and save_final_visualization overlays it again.
    if last_frame is not None:
        final_output = os.path.splitext(config.output_path)[0] + "_final_heatmap.png"
        heatmap_visualizer.save_final_visualization(final_output, last_frame)
        print(f"\nHeatmap finale sauvegardée : {final_output}")
    
    # Clean-up and finalisation.
    # NOTE(review): release() is only reached when no exception occurred
    # above; it is not part of the finally clause.
    video_processor.release()
    
    # Final statistics
    total_time = time.time() - start_time
    average_fps = frames_processed / total_time if total_time > 0 else 0
    print(f"\nTraitement terminé !")
    print(f"Temps total : {total_time:.2f} secondes")
    print(f"Frames traitées : {frames_processed}")
    print(f"FPS moyen : {average_fps:.2f}")
    
except Exception as e:
    # Report the failure, then re-raise it
    print(f"Erreur lors du traitement : {str(e)}")
    raise e
finally:
    # Wait briefly before closing the preview windows
    cv2.waitKey(1000)
    cv2.destroyAllWindows()
    gc.collect()

Configuration chargée avec succès
Initialisation du traitement...
Démarrage du traitement avec 15 threads...


Traitement des frames:   0%|          | 0/1709 [00:00<?, ?frame/s]

Performance - FPS moyen: 10.96
Heatmap finale sauvegardée : assets/LatestEyeTracking_final_heatmap.png

Traitement terminé !
Temps total : 175.49 secondes
Frames traitées : 1709
FPS moyen : 9.74
