In [None]:
import pandas as pd
import os
import sys
import torch
import re
import json
import math
import os

import numpy as np
from torchvision.transforms import GaussianBlur
from torchvision import transforms
import torch.nn.functional as F
import torchvision.transforms as T
import cv2

from PIL import Image, ImageDraw, ImageFont, ImageFilter
from pathlib import Path

# Current working directory (the folder the notebook kernel was started in).
current_dir = Path.cwd()
# Its parent folder.
parent_dir = current_dir.parent
# Make the parent folder importable. sys.path entries must be strings, and the
# membership check keeps re-runs of this cell from piling up duplicate entries.
if str(parent_dir) not in sys.path:
    sys.path.append(str(parent_dir))
print(f"Added to sys.path: {parent_dir}")
# The project imports below must come after the path setup above.
from VLM_model_dot_relative import QwenVLModel, MetricsTracker
from file_managing import (
    load_selected_samples,
    get_actual_path,
    get_gt_path,
)
from config import AGD20K_PATH

In [2]:
def load_ground_truth(gt_path):
    """
    Load a ground-truth map from disk as a single-channel tensor.

    Args:
        gt_path (str): Path to the ground truth image
    Returns:
        torch.Tensor | None: Ground-truth tensor min-max scaled to [0, 1].
        A constant image (max == min) is returned as loaded instead of being
        divided by zero. Returns None (after printing a warning) when the
        image cannot be loaded or processed.
    """
    try:
        # The context manager releases the file handle once the pixels are read.
        with Image.open(gt_path) as gt_img:
            # Collapse every multi-band mode (RGB, RGBA, LA, ...) and palette
            # images to one luminance channel; checking only for 'RGB' would
            # let e.g. RGBA maps through with four channels.
            if gt_img.mode == 'P' or len(gt_img.getbands()) > 1:
                gt_img = gt_img.convert('L')

            # Convert to tensor and drop the leading channel dimension
            gt_tensor = transforms.ToTensor()(gt_img).squeeze(0)

        # Min-max normalize to [0, 1]; skip when the range is empty so a
        # constant map does not turn into NaN.
        lowest, highest = gt_tensor.min(), gt_tensor.max()
        if highest > lowest:
            gt_tensor = (gt_tensor - lowest) / (highest - lowest)

        return gt_tensor

    except Exception as e:
        # Best-effort loader: callers treat None as "no ground truth".
        print(f"⚠️ Failed to load ground truth image: {str(e)}")
        return None

def create_heatmap_from_dots_v2(image_size, dots, base_size=640, base_sigma=60):
    """
    Create a heatmap from dot coordinates using Gaussian kernels with dynamic sigma.

    One isotropic Gaussian is added per dot and the sum is min-max scaled to
    [0, 1]. Sigma grows linearly with the mean image side so the blob covers a
    similar fraction of the image at any resolution.

    Args:
        image_size (tuple): Size of the image (height, width)
        dots (list): List of dot coordinates [x, y]; out-of-bounds dots are
            clamped to the nearest pixel inside the image
        base_size (int): Reference mean side length at which sigma == base_sigma
        base_sigma (int): Sigma (in pixels) used at the reference size
    Returns:
        torch.Tensor: (height, width) float heatmap; all zeros when `dots` is empty
    """
    height, width = image_size

    # Dynamic sigma based on image dimensions (simple linear scaling).
    # Floor at 1: int() truncates to 0 for very small images, which would
    # divide by zero below and fill the heatmap with NaN.
    scale_factor = ((height + width) / 2) / base_size
    sigma = max(1, int(base_sigma * scale_factor))
    two_sigma_sq = 2 * sigma ** 2

    heatmap = torch.zeros((height, width))

    # The coordinate grids are the same for every dot, so build them once.
    y_grid, x_grid = torch.meshgrid(
        torch.arange(height, dtype=torch.float32),
        torch.arange(width, dtype=torch.float32),
        indexing='ij'
    )

    for dot in dots:
        # Convert coordinates to integers
        x, y = map(int, dot)
        # Ensure coordinates are within image bounds
        x = max(0, min(x, width - 1))
        y = max(0, min(y, height - 1))
        # Accumulate the Gaussian centered at the dot
        heatmap += torch.exp(
            -((x_grid - x) ** 2 + (y_grid - y) ** 2) / two_sigma_sq
        )

    # Normalize heatmap (left untouched when no dot contributed anything)
    if heatmap.max() > 0:
        heatmap = (heatmap - heatmap.min()) / (heatmap.max() - heatmap.min() + 1e-10)
    return heatmap

def _load_label_font(font_size):
    """Return a TrueType font at `font_size`, falling back to PIL's built-in default font."""
    candidate_paths = (
        "/usr/share/fonts/truetype/nanum/NanumGothic.ttf",   # supports Korean
        "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",   # fallback
    )
    for font_path in candidate_paths:
        try:
            return ImageFont.truetype(font_path, font_size)
        except Exception:
            # Font missing or unusable: try the next candidate. Catching
            # Exception (not a bare except) lets Ctrl-C still interrupt.
            continue
    # Last resort: default font
    return ImageFont.load_default()


def _truncate_to_width(draw, text, font, max_width):
    """Shorten `text` one character at a time until `text + "..."` fits in `max_width` pixels."""
    shortened = text
    while draw.textlength(shortened + "...", font=font) > max_width and len(shortened) > 0:
        shortened = shortened[:-1]
    if shortened != text:
        shortened += "..."
    return shortened


def _draw_header_cell(draw, font, title, filename, section_x, section_width,
                      box_top, box_bottom, title_y, filename_y):
    """Draw one header cell: a white box with a centered title and a centered, truncated filename."""
    draw.rectangle([section_x, box_top, section_x + section_width, box_bottom],
                   fill='white', outline='lightgray')

    title_width = draw.textlength(title, font=font)
    title_x = section_x + (section_width - title_width) // 2
    draw.text((title_x, title_y), title, fill='black', font=font)

    # Keep a 20 px margin so long filenames never touch the cell border
    label = _truncate_to_width(draw, filename, font, section_width - 20)
    label_width = draw.textlength(label, font=font)
    label_x = section_x + (section_width - label_width) // 2
    draw.text((label_x, filename_y), label, fill='black', font=font)


def draw_dots_on_image( image_path, dots, gt_path, action, exo_path=None, exo_type=None, output_path=None):
    """
    Draw dots and create heatmap, save results side by side with GT.

    The saved figure is a 3-column x 2-row grid: the top row holds the ego image
    (and the exo image when given), the bottom row holds the dotted ego image,
    the generated heatmap and the ground truth, followed by a line of metrics.

    Relies on names defined elsewhere in this notebook: `current_dir`,
    `create_heatmap_from_dots_v2`, `draw_dots_on_single_image`,
    `load_ground_truth` and `calculate_metrics`.

    Args:
        image_path (str): Path to the ego image
        dots (list): List of dot coordinates [x, y]
        gt_path (str): Path to the ground truth image
        action (str): Action name for the filename
        exo_path (str, optional): Path to the exo image (shown next to the ego image)
        exo_type (str, optional): Type of exo image ('random' or 'selected')
        output_path (str, optional): Path to save the result image; when omitted
            a path under `<parent of current_dir>/dot_images[_<exo_type>]/` is built
    Returns:
        str: Path to the saved image
        torch.Tensor: Generated heatmap for metric calculation
    """
    # Load the ego image; copying inside the context manager lets the file
    # handle be closed right away instead of leaking one per call.
    with Image.open(image_path) as ego_src:
        ego_img = ego_src.copy()
    width, height = ego_img.size

    # Load exo image if provided
    exo_img = None
    exo_filename = None
    if exo_path:
        with Image.open(exo_path) as exo_src:
            exo_img = exo_src.copy()
        exo_filename = os.path.basename(exo_path)

    # Create heatmap from dots and turn it into a 3-channel image
    heatmap_tensor = create_heatmap_from_dots_v2((height, width), dots)
    heatmap_img = transforms.ToPILImage()(heatmap_tensor.unsqueeze(0).repeat(3, 1, 1))

    # Ego image with the dots painted on a copy
    dot_img = draw_dots_on_single_image(ego_img, dots, color='red', radius=15)

    # Font size, header height and spacing depend on the aspect ratio
    aspect_ratio = width / height
    if aspect_ratio > 2:  # Very wide images
        font_size = min(50, width // 12)
        header_height = 110
        spacing = 30
    elif aspect_ratio > 1.5:  # Moderately wide images
        font_size = min(55, width // 10)
        header_height = 120
        spacing = 35
    else:  # Normal images
        font_size = max(60, width // 8)
        header_height = 130
        spacing = 40

    # Footer strip for the metrics line. It must be tall enough for the font:
    # a fixed 40 px strip clips the text because font_size is usually >= 50.
    padding = 10
    footer_height = max(40, int(font_size * 1.25) + 2 * padding)

    # Canvas for the 3x2 layout
    combined_width = width * 3
    combined_height = height * 2 + header_height * 2 + spacing * 3 + footer_height
    combined_img = Image.new('RGB', (combined_width, combined_height), 'white')

    font = _load_label_font(font_size)

    # Get file names
    ego_filename = os.path.basename(image_path)
    gt_filename = os.path.basename(gt_path) if gt_path else "No GT"

    draw = ImageDraw.Draw(combined_img)

    # Top row headers depend on whether an exo image is shown
    if exo_img is not None:
        top_headers = [
            ("Ego", ego_filename),
            ("Exo", exo_filename),
            ("", "")  # Empty space
        ]
    else:
        top_headers = [
            ("Original", ego_filename),
            ("", ""),  # Empty space
            ("", "")   # Empty space
        ]

    # Bottom row headers (same for both versions)
    bottom_headers = [
        ("Dots", action+"_"+ego_filename),
        ("Heatmap", action+"_"+ ego_filename),
        ("GT", action+"_"+gt_filename)
    ]

    # Draw top row headers (empty cells are skipped)
    for idx, (title, filename) in enumerate(top_headers):
        if title:
            _draw_header_cell(
                draw, font, title, filename,
                section_x=idx * width, section_width=width,
                box_top=0, box_bottom=header_height,
                title_y=5, filename_y=header_height // 2 + 5,
            )

    # Draw bottom row headers, positioned below the first image row
    section_y = height + header_height + spacing
    for idx, (title, filename) in enumerate(bottom_headers):
        _draw_header_cell(
            draw, font, title, filename,
            section_x=idx * width, section_width=width,
            box_top=section_y - 10, box_bottom=section_y + header_height,
            title_y=section_y, filename_y=section_y + header_height // 2,
        )

    # Top row: Ego image and optionally Exo image
    top_image_y = header_height + spacing
    combined_img.paste(ego_img, (0, top_image_y))  # Ego
    if exo_img is not None:
        combined_img.paste(exo_img, (width, top_image_y))  # Exo

    # Bottom row: Dots, Heatmap, GT
    bottom_image_y = height + header_height * 2 + spacing * 2
    combined_img.paste(dot_img, (0, bottom_image_y))  # Image with dots
    combined_img.paste(heatmap_img, (width, bottom_image_y))  # Heatmap

    # Add GT image and calculate metrics. Skip the loader entirely when no
    # path was given so it does not print a spurious failure warning.
    gt_map = load_ground_truth(gt_path) if gt_path else None

    if gt_map is not None:
        gt_tensor = torch.as_tensor(gt_map)
        gt_img = transforms.ToPILImage()(gt_tensor.unsqueeze(0).repeat(3, 1, 1))
        combined_img.paste(gt_img, (width * 2, bottom_image_y))  # GT heatmap

        metrics = calculate_metrics(heatmap_tensor, gt_map)
        metrics_text = f"KLD: {metrics['KLD']:.4f} | SIM: {metrics['SIM']:.4f} | NSS: {metrics['NSS']:.4f}"
    else:
        # No usable GT: leave a blank white cell and say so in the footer
        blank_img = Image.new('RGB', (width, height), 'white')
        combined_img.paste(blank_img, (width * 2, bottom_image_y))
        metrics_text = "No GT provided"

    # Draw metrics text at the bottom with a white background box
    text_width = draw.textlength(metrics_text, font=font)
    text_x = (combined_width - text_width) // 2
    text_y = bottom_image_y + height + spacing
    draw.rectangle([text_x - padding, text_y - padding,
                    text_x + text_width + padding, text_y + font_size + padding],
                    fill='white', outline='gray')
    draw.text((text_x, text_y), metrics_text, fill='black', font=font)

    # Result folders live next to (not inside) the notebook's working directory
    script_dir = os.path.dirname(os.path.abspath(current_dir))
    if exo_type is None:
        res_dir = os.path.join(script_dir, 'dot_images')
        sub_dirs = ("with_exo", "only_ego")
    else:
        res_dir = os.path.join(script_dir, f'dot_images_{exo_type}')
        sub_dirs = ("with_exo", f"{exo_type}", "only_ego")
    os.makedirs(res_dir, exist_ok=True)
    for sub_dir in sub_dirs:
        os.makedirs(os.path.join(res_dir, sub_dir), exist_ok=True)

    # Generate output path if not provided
    if output_path is None:
        base_name, ext = os.path.splitext(ego_filename)
        if exo_img is not None and exo_type:
            # Format: skis_002829_jump_exo_random.jpg or skis_002829_jump_exo_selected.jpg
            output_filename = f"{base_name}_{action}_exo_{exo_type}{ext}"
            output_path = os.path.join(res_dir, f"with_exo/{output_filename}")
        elif exo_img is not None:
            # Fallback if exo_type not specified: tag with the exo file name
            output_filename = f"{base_name}_{action}_exo_{exo_filename}"
            output_path = os.path.join(res_dir, f"with_exo/{output_filename}")
        elif exo_type is not None:
            output_filename = f"{base_name}_{action}_exo_{exo_type}{ext}"
            output_path = os.path.join(res_dir, f"{exo_type}/{output_filename}")
        else:
            # Format: skis_002829_jump.jpg
            output_filename = f"{base_name}_{action}{ext}"
            output_path = os.path.join(res_dir, f"only_ego/{output_filename}")
    else:
        # Caller-chosen location: make sure its folder exists before saving
        custom_parent = os.path.dirname(output_path)
        if custom_parent:
            os.makedirs(custom_parent, exist_ok=True)

    # Save the combined image
    combined_img.save(output_path)

    return output_path, heatmap_tensor

def draw_dots_on_single_image( image, dots, color='red', radius=15):
    """
    Return a copy of `image` with a filled circle painted at every dot.

    The input image itself is left unmodified.

    Args:
        image (PIL.Image): Image to draw on
        dots (list): List of dot coordinates [x, y]
        color (str): Fill and outline color of the circles
        radius (int): Circle radius in pixels
    Returns:
        PIL.Image: Copy of the image with the dots drawn
    """
    canvas = image.copy()
    pen = ImageDraw.Draw(canvas)

    for point in dots:
        cx, cy = (int(coord) for coord in point)
        # Bounding box of the circle centered on (cx, cy)
        bounding_box = [cx - radius, cy - radius, cx + radius, cy + radius]
        pen.ellipse(bounding_box, fill=color, outline=color)

    return canvas


def calculate_metrics(pred_heatmap, gt_map):
    """
    Calculate comparison metrics between predicted heatmap and GT (following original metric.py)

    Both maps are flattened and treated as a batch of one. Every normalising
    denominator is floored at a tiny epsilon, so an all-zero prediction (for
    example when no dots were produced) or an all-zero GT gives finite numbers
    instead of NaN. For maps with a positive sum and maximum the floor has no
    effect on the result.

    Args:
        pred_heatmap (torch.Tensor): Predicted heatmap
        gt_map (torch.Tensor): Ground truth map with the same number of elements
    Returns:
        dict: Dictionary containing KLD, SIM, and NSS metrics as Python floats
    """
    # Ensure inputs are proper tensors
    if not isinstance(pred_heatmap, torch.Tensor):
        pred_heatmap = torch.tensor(pred_heatmap)
    if not isinstance(gt_map, torch.Tensor):
        gt_map = torch.tensor(gt_map)

    # Flatten tensors and add batch dimension for compatibility
    pred = pred_heatmap.flatten().float().unsqueeze(0)  # [1, H*W]
    gt = gt_map.flatten().float().unsqueeze(0)          # [1, H*W]

    eps = 1e-10

    # Probability distributions shared by KLD and SIM
    pred_prob = pred / pred.sum(dim=1, keepdim=True).clamp_min(eps)
    gt_prob = gt / gt.sum(dim=1, keepdim=True).clamp_min(eps)

    # KLD: eps is added to the prediction only, to keep log() finite
    kld = F.kl_div((pred_prob + eps).log(), gt_prob, reduction="batchmean").item()

    # SIM: histogram intersection, averaged over the batch dimension
    sim = torch.minimum(pred_prob, gt_prob).sum().item() / len(pred_prob)

    # NSS: first scale both maps by their maximum
    pred_nss = pred / pred.max(dim=1, keepdim=True).values.clamp_min(eps)
    gt_nss = gt / gt.max(dim=1, keepdim=True).values.clamp_min(eps)

    # Z-score the prediction
    std = pred_nss.std(dim=1, keepdim=True)
    u = pred_nss.mean(dim=1, keepdim=True)
    smap = (pred_nss - u) / (std + eps)

    # Binary fixation map: GT pixels at or above 10% of the GT range
    gt_low = torch.min(gt_nss, dim=1, keepdim=True).values
    gt_high = torch.max(gt_nss, dim=1, keepdim=True).values
    fixation_map = (gt_nss - gt_low) / (gt_high - gt_low + eps)
    fixation_map = (fixation_map >= 0.1).float()

    # Mean z-score over the fixated pixels
    nss_values = smap * fixation_map
    nss = nss_values.sum(dim=1) / (fixation_map.sum(dim=1) + eps)
    nss = nss.mean().item()

    return {
        'KLD': kld,
        'SIM': sim,
        'NSS': nss
    }

In [3]:
# NOTE(review): unpickling can execute arbitrary code — only load this file if it comes from a trusted source.
df_fin = pd.read_pickle('test_verify_qwen3_2b.pkl')
# Rows with an empty `final_dot` list fall back to the `dots` column.
# The mask is computed once and reused for both sides of the assignment.
no_final_dot = df_fin['final_dot'].map(len) == 0
df_fin.loc[no_final_dot, 'final_dot'] = df_fin.loc[no_final_dot, 'dots']
df_fin

Unnamed: 0,action,object,filename,dots,veri_result,veri_reason,final_dot
0,jump,skis,skis_002829.jpg,"[[280, 490], [720, 490], [280, 900], [720, 900]]","[Fail, Fail, Fail, Fail]","[The query point (280,490) is on the upper par...","[[280, 490], [720, 490], [280, 900], [720, 900]]"
1,jump,skateboard,skateboard_002387.jpg,"[[500, 350], [350, 650], [650, 650]]","[Fail, Fail, Fail]","[The query point (500,350) is on the deck of t...","[[500, 350], [350, 650], [650, 650]]"
2,jump,surfboard,surfboard_000658.jpg,"[[200, 200], [500, 500], [800, 800]]","[Fail, Fail, Fail]","[The point (200,200) is on the deck of the sur...","[[200, 200], [500, 500], [800, 800]]"
3,jump,snowboard,snowboard_001704.jpg,"[[250, 200], [500, 500], [750, 200]]","[Fail, Fail, Fail]","[The query point (250,200) is on the top surfa...","[[250, 200], [500, 500], [750, 200]]"


In [4]:
# Get total number of samples
# Get total number of samples
total_samples = len(df_fin)
metrics_tracker_ego = MetricsTracker(name="only_ego")

# Output folders are the same for every sample, so resolve and create them once.
script_dir = os.path.dirname(os.path.abspath(current_dir))
res_dir = os.path.join(script_dir, 'dot_images', 'heatmaps')
dot_res_dir = os.path.join(script_dir, 'dot_images', 'dots_only')
os.makedirs(res_dir, exist_ok=True)
os.makedirs(dot_res_dir, exist_ok=True)

# Process each sample
print(f"Processing {total_samples} samples...")
print("=" * 50)
for idx, row in df_fin.iterrows():
    object_name = row['object']
    action = row['action']
    filename = row['filename']
    image_path = f"{AGD20K_PATH}/Seen/testset/egocentric/{action}/{object_name}/{filename}"
    gt_path = get_gt_path(image_path)
    image_name = os.path.basename(image_path)
    print(f"Action : {action}, Object : {object_name} image_name : {image_name}")
    final_dots = row['final_dot']
    print(f"parsed dots!!! : {final_dots}")

    # Open the ego image once: read its size, rescale the dots and render the
    # dots-only picture, then let the context manager close the file.
    with Image.open(image_path) as ego_img:
        orig_width, orig_height = ego_img.size

        # `final_dots` are expressed on a 0-1000 scale per axis; map them back to pixels.
        dots = [
            [int(x * (orig_width / 1000)), int(y * (orig_height / 1000))]
            for x, y in final_dots
        ]
        dot_only_img = draw_dots_on_single_image(ego_img, dots, color='red', radius=15)
    print(f"restored_dots!!! : {dots}")

    # Side-by-side comparison figure (ego / dots / heatmap / GT)
    dot_image_path, heatmap_tensor = draw_dots_on_image(image_path, dots, gt_path, action)

    base_name = os.path.splitext(image_name)[0]
    ext = os.path.splitext(image_path)[1]

    # Save heatmap image
    heatmap_filename = f"{base_name}_{action}_heatmap{ext}"
    heatmap_path = os.path.join(res_dir, heatmap_filename)
    heatmap_img = transforms.ToPILImage()(heatmap_tensor.unsqueeze(0).repeat(3, 1, 1))
    heatmap_img.save(heatmap_path)

    # Save dot image separately for validation
    dot_only_filename = f"{base_name}_{action}_dots{ext}"
    dot_only_path = os.path.join(dot_res_dir, dot_only_filename)
    dot_only_img.save(dot_only_path)

    # Calculate metrics if GT is available and at least one dot exists
    metrics = None
    gt_map = load_ground_truth(gt_path)
    if gt_map is not None and len(dots) > 0:
        metrics = calculate_metrics(heatmap_tensor, gt_map)

    dot_images_res =  {
        'dots': dots,
        'dot_image_path': dot_image_path,
        'dot_only_image_path': dot_only_path,
        'heatmap_image_path': heatmap_path,
        'heatmap_tensor': heatmap_tensor,
        'metrics': metrics,
    }

    metrics_ego = dot_images_res['metrics']

    if metrics_ego:
        # Update and print metrics
        metrics_tracker_ego.update(metrics_ego)
        metrics_tracker_ego.print_metrics(metrics_ego, image_name)

Processing 4 samples...
Action : jump, Object : skis image_name : skis_002829.jpg
parsed dots!!! : [[280, 490], [720, 490], [280, 900], [720, 900]]
restored_dots!!! : [[157, 735], [403, 735], [157, 1350], [403, 1350]]

Metrics for only_ego skis_002829.jpg:
 only_ego Current - KLD: 1.1155 | SIM: 0.3736 | NSS: 1.4524

Cumulative only_ego  Averages over 1 samples:
Average - KLD: 1.1155 | SIM: 0.3736 | NSS: 1.4524

Action : jump, Object : skateboard image_name : skateboard_002387.jpg
parsed dots!!! : [[500, 350], [350, 650], [650, 650]]
restored_dots!!! : [[750, 332], [525, 617], [975, 617]]

Metrics for only_ego skateboard_002387.jpg:
 only_ego Current - KLD: 3.6953 | SIM: 0.3071 | NSS: 0.5182

Cumulative only_ego  Averages over 2 samples:
Average - KLD: 2.4054 | SIM: 0.3404 | NSS: 0.9853

Action : jump, Object : surfboard image_name : surfboard_000658.jpg
parsed dots!!! : [[200, 200], [500, 500], [800, 800]]
restored_dots!!! : [[160, 300], [401, 750], [642, 1200]]

Metrics for only_ego s

In [66]:
# Display the averages held by the tracker; the recorded output is a dict with KLD, SIM and NSS.
# NOTE(review): this cell's execution count (66) is far from the loop cell's (4), so the
# recorded numbers may come from a different run — restart and run all cells to confirm.
metrics_tracker_ego.get_averages()

{'KLD': 1.638624616398299,
 'SIM': 0.48650581333478377,
 'NSS': 1.5015618012109695}