In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from transformers import AutoImageProcessor, AutoModel
from transformers.image_utils import load_image
from sklearn.decomposition import PCA
import math
from sklearn.decomposition import PCA

device = "cuda" if torch.cuda.is_available() else "cpu"
processor = AutoImageProcessor.from_pretrained("facebook/dinov3-vitb16-pretrain-lvd1689m", output_attentions=True)
model = AutoModel.from_pretrained("facebook/dinov3-vitb16-pretrain-lvd1689m").to(device)  # dinov3-vitb16-pretrain-lvd1689m
patch_size = model.config.patch_size
num_register_tokens = model.config.num_register_tokens

# === Extract patch features from image ===
def get_dino_features(image_path):
    image = load_image(image_path)
    original = image.resize((224, 224))
    inputs = processor(images=image, return_tensors="pt").to(device)

    with torch.inference_mode():
        outputs = model(**inputs)

    last_hidden = outputs.last_hidden_state
    _, _, H, W = inputs.pixel_values.shape
    num_patches_h, num_patches_w = H // patch_size, W // patch_size

    patch_tokens = last_hidden[:, 1 + num_register_tokens:, :]
    patch_grid = patch_tokens.unflatten(1, (num_patches_h, num_patches_w))
    patch_grid = patch_grid.view(1, -1, 768)
    return patch_grid, original.size

Missing colon in file PosixPath('/home/bongo/anaconda3/lib/python3.11/site-packages/matplotlib/mpl-data/matplotlibrc'), line 263 (' sans-serif')
2025-10-08 14:17:55.079810: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-10-08 14:17:55.107810: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-10-08 14:17:55.107831: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-10-08 14:17:55.108651: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting t

ValueError: Unrecognized image processor in facebook/dinov3-vitb16-pretrain-lvd1689m. Should have a `image_processor_type` key in its preprocessor_config.json of config.json, or one of the following `model_type` keys in its config.json: align, beit, bit, blip, blip-2, bridgetower, chinese_clip, clip, clipseg, conditional_detr, convnext, convnextv2, cvt, data2vec-vision, deformable_detr, deit, deta, detr, dinat, dinov2, donut-swin, dpt, efficientformer, efficientnet, flava, focalnet, git, glpn, groupvit, idefics, imagegpt, instructblip, layoutlmv2, layoutlmv3, levit, mask2former, maskformer, mgp-str, mobilenet_v1, mobilenet_v2, mobilevit, mobilevitv2, nat, oneformer, owlvit, perceiver, pix2struct, poolformer, pvt, regnet, resnet, sam, segformer, swiftformer, swin, swin2sr, swinv2, table-transformer, timesformer, tvlt, upernet, van, videomae, vilt, vit, vit_hybrid, vit_mae, vit_msn, xclip, yolos

In [2]:
from torchvision import transforms
import torch.nn.functional as F

def load_ground_truth( gt_path):
    """
    Load and process ground truth image
    Args:
        gt_path (str): Path to the ground truth image
    Returns:
        torch.Tensor: Processed ground truth tensor normalized to [0, 1]
    """
    try:
        # Load the ground truth image
        gt_img = Image.open(gt_path)

        # Convert to grayscale if image is RGB
        if gt_img.mode == 'RGB':
            gt_img = gt_img.convert('L')

        # Convert to tensor
        gt_tensor = transforms.ToTensor()(gt_img).squeeze(0)

        # Normalize to [0, 1]
        if gt_tensor.max() > 0:
            gt_tensor = (gt_tensor - gt_tensor.min()) / (gt_tensor.max() - gt_tensor.min())

        return gt_tensor
    except Exception as e:
        print(f"⚠️ Failed to load ground truth image: {str(e)}")
        return None    

In [3]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
import timm
import torchvision.transforms as T
from sklearn.cluster import KMeans
from VLM_model_dot import  MetricsTracker

import os
from file_managing import (
    load_selected_samples,
    get_actual_path,
    get_gt_path,
)


In [4]:
def calculate_metrics( pred_heatmap, gt_map):
    """
    Calculate comparison metrics between predicted heatmap and GT (following original metric.py)
    Args:
        pred_heatmap (torch.Tensor): Predicted heatmap
        gt_map (torch.Tensor): Ground truth map
    Returns:
        dict: Dictionary containing KLD, SIM, and NSS metrics
    """
    # Ensure inputs are proper tensors
    if not isinstance(pred_heatmap, torch.Tensor):
        pred_heatmap = torch.tensor(pred_heatmap)
    if not isinstance(gt_map, torch.Tensor):
        gt_map = torch.tensor(gt_map)

    # Flatten tensors and add batch dimension for compatibility
    pred = pred_heatmap.flatten().float().unsqueeze(0)  # [1, H*W]
    gt = gt_map.flatten().float().unsqueeze(0)          # [1, H*W]

    eps = 1e-10

    # Calculate KLD following original implementation
    # Normalize to probability distributions
    pred_norm = pred / pred.sum(dim=1, keepdim=True)
    gt_norm = gt / gt.sum(dim=1, keepdim=True)
    pred_norm += eps
    kld = F.kl_div(pred_norm.log(), gt_norm, reduction="batchmean").item()

    # Calculate SIM following original implementation
    pred_sim = pred / pred.sum(dim=1, keepdim=True)
    gt_sim = gt / gt.sum(dim=1, keepdim=True)
    sim = torch.minimum(pred_sim, gt_sim).sum().item() / len(pred_sim)

    # Calculate NSS following original implementation
    # First normalize by max values
    pred_nss = pred / pred.max(dim=1, keepdim=True).values
    gt_nss = gt / gt.max(dim=1, keepdim=True).values

    # Calculate z-score for prediction
    std = pred_nss.std(dim=1, keepdim=True)
    u = pred_nss.mean(dim=1, keepdim=True)
    smap = (pred_nss - u) / (std + eps)

    # Create fixation map from GT
    fixation_map = (gt_nss - torch.min(gt_nss, dim=1, keepdim=True).values) / (
        torch.max(gt_nss, dim=1, keepdim=True).values - torch.min(gt_nss, dim=1, keepdim=True).values + eps)
    fixation_map = (fixation_map >= 0.1).float()

    # Calculate NSS
    nss_values = smap * fixation_map
    nss = nss_values.sum(dim=1) / (fixation_map.sum(dim=1) + eps)
    nss = nss.mean().item()

    return {
        'KLD': kld,
        'SIM': sim,
        'NSS': nss
    }


def load_vlm_heatmap(heatmap_path: str, target_size: tuple) -> np.ndarray:
    """저장된 VLM 히트맵을 불러옵니다."""
    heatmap_img = Image.open(heatmap_path).convert('L')
    heatmap_img = heatmap_img.resize(target_size, resample=Image.Resampling.BILINEAR)
    heatmap_array = np.array(heatmap_img).astype(np.float32) / 255.0
    return heatmap_array

def cluster_and_select_affordance(
    dino_patch_tokens: torch.Tensor, 
    vlm_heatmap: np.ndarray, 
    original_image_size: tuple, 
    n_clusters: int = 5
) -> np.ndarray:
    """
    DINO 특징을 클러스터링하고, VLM 히트맵을 이용해 affordance 클러스터를 선택합니다.
    """
    features = dino_patch_tokens.squeeze(0).cpu().numpy() # 예: (1369, 768)

    # ✨ 해결책: 패치 개수로부터 h, w를 동적으로 계산
    num_patches = features.shape[0]
    h = w = int(np.sqrt(num_patches))
    
    # --- 1. DINO 특징 벡터에 K-Means 클러스터링 적용 ---
#     print(f"Performing K-Means clustering with K={n_clusters}...")
    features = dino_patch_tokens.squeeze(0).cpu().numpy() # (196, 768)
    kmeans = KMeans(n_clusters=n_clusters, random_state=0, n_init='auto').fit(features)
    cluster_ids = kmeans.labels_ # 각 패치가 속한 클러스터 ID (0~4)
    
    cluster_map = cluster_ids.reshape(h, w) # (14, 14)

    # --- 2. VLM 앵커 포인트가 속한 클러스터 ID 식별 ---
    # VLM 히트맵에서 가장 값이 높은 지점(앵커)의 좌표를 찾습니다.
    anchor_y_px, anchor_x_px = np.unravel_index(np.argmax(vlm_heatmap), vlm_heatmap.shape)
    
    # 앵커 좌표를 클러스터 맵 크기(14x14)에 맞게 스케일링
    anchor_y_map = int(anchor_y_px / original_image_size[1] * h)
    anchor_x_map = int(anchor_x_px / original_image_size[0] * w)
    
    # 앵커가 속한 클러스터 ID를 찾습니다.
    target_cluster_id = cluster_map[anchor_y_map, anchor_x_map]
#     print(f"VLM anchor belongs to Cluster ID: {target_cluster_id}")
    
    # --- 3. 최종 히트맵(마스크) 생성 ---
    # 타겟 클러스터에 속하는 모든 픽셀을 1로, 나머지를 0으로 설정
    final_mask_small = (cluster_map == target_cluster_id).astype(np.float32)
    
    # 원본 이미지 크기로 리사이즈하여 최종 히트맵 생성
    final_heatmap = np.array(Image.fromarray(final_mask_small).resize(original_image_size, resample=Image.Resampling.NEAREST))
    
    return final_heatmap, cluster_map

In [None]:
metrics_tracker_dino = MetricsTracker(name="only_ego")

json_path = os.path.join("selected_samples.json")
data = load_selected_samples(json_path)
missing_gt = 0
processed_count = 0

# Get total number of samples
total_samples = len(data['selected_samples'])

# Process each sample
print(f"Processing {total_samples} samples...")
print("=" * 50)    
for pair_key, sample_info in data["selected_samples"].items():
    processed_count += 1
    print(f"--- Start  {processed_count}  / {total_samples}", "-"*80) 
    original_image_path = sample_info['image_path'].replace("${AGD20K_PATH}",'/home/DATA/AGD20K')
    
    file_name = os.path.basename(sample_info['image_path'])
    action_name = sample_info['image_path'].split('/')[4]
    if file_name.count("_") ==1:
        item_name = file_name.split("_")[0]
    else:
        item_name = file_name.split("_")[0] + "_" + file_name.split("_")[1]
    AGD20K_PATH = '/home/DATA/AGD20K'
    vlm_heatmap_path = f"/home/bongo/porter_notebook/research/new_qwen_AG/32B_dino_1_power2/heatmaps/{file_name.split('.')[0]}_{action_name}_heatmap.jpg"
    gt_path =  f"{AGD20K_PATH}/Seen/testset/GT/{action_name}/{item_name}/{file_name.split('.')[0]}.png"
    dot_path = f"/home/bongo/porter_notebook/research/new_qwen_AG/32B_dino_1_power2/dots_only//{file_name.split('.')[0]}_{action_name}_dots.jpg"
    print(item_name, action_name, file_name)
    output_path = f"dino_fusion/{file_name.split('.')[0]}_{action_name}.png"
    # --- 2. VLM 히트맵 로드 및 DINO 특징 추출 ---
    original_image = Image.open(original_image_path).convert('RGB')
    dot_image = Image.open(dot_path).convert('RGB')
    vlm_heatmap_image =  Image.open(vlm_heatmap_path).convert('RGB')
#     print("Loading VLM heatmap...")
    vlm_heatmap = load_vlm_heatmap(vlm_heatmap_path, original_image.size)

    # PCA Guide
    heatmap_4d = torch.from_numpy(vlm_heatmap).float().unsqueeze(0).unsqueeze(0)
    guidance_heatmap_14x14 = F.adaptive_avg_pool2d(heatmap_4d, (14, 14)).squeeze()
    # 2. 가이드 히트맵을 기반으로 마스크 생성
    flat_heatmap = guidance_heatmap_14x14.flatten() # 1D로 변환 (196,)
    threshold = 0.5 # 활성화 기준으로 삼을 임계값 (조정 필요)
    mask = flat_heatmap > threshold


#     print("Extracting DINO features...")
    dino_patch_tokens, _ = get_dino_features(original_image_path)

    # 3. 마스크를 사용해 관심 영역의 패치만 선택
    all_patches = dino_patch_tokens.cpu().reshape(-1, 768) # (196, 768)
    selected_patches = all_patches[mask]


    ## PCA !!!
    # patch_grid가 (196, 768) 또는 (1, 196, 768) 모양이라고 가정
    X = dino_patch_tokens.cpu().reshape(-1, 768).numpy()
    # 1️⃣ n_components를 1로 변경
    pca = PCA(n_components=1)
    X1 = pca.fit_transform(selected_patches) # 이제 X1의 shape은 (196, 1)이 됩니다.
    X1_normalized = (X1 - X1.min()) / (X1.max() - X1.min())

    # --- 시각화 ---
    # 전체 히트맵을 담을 빈 배열 생성
    heatmap = np.zeros_like(flat_heatmap, dtype=float)
    
    # 선택된 위치에만 PCA 결과 값을 다시 채워 넣기
    heatmap[mask] = X1_normalized.flatten()
    

    # 1D 맵을 2D 히트맵 형태로 변경
    # heatmap = X1_normalized.reshape(14, 14)
    # np.kron을 사용해 히트맵 확대
    dino_attention_heatmap =np.array(Image.fromarray(heatmap).resize(original_image.size, resample=Image.Resampling.BILINEAR))

    # with torch.no_grad():
    #     features = dino_patch_tokens.squeeze(0).cpu().numpy() # 예: (1369, 768)

    #     # ✨ 해결책: 패치 개수로부터 h, w를 동적으로 계산
    #     num_patches = features.shape[0]
    #     h = w = int(np.sqrt(num_patches))

    #     # 패치 토큰의 norm을 사용하여 어텐션 맵 계산
    #     attn_map = torch.norm(dino_patch_tokens, dim=-1).reshape(h, w)
    #     # 0~1 범위로 정규화
    #     attn_map = (attn_map - attn_map.min()) / (attn_map.max() - attn_map.min())
    #     # 원본 이미지 크기로 리사이즈 후 1에서 빼서 값을 반전시킴
    #     dino_attention_heatmap = np.array(Image.fromarray(attn_map.squeeze(0).cpu().numpy()).resize(original_image.size, resample=Image.Resampling.BILINEAR))


    vlm_heatmap = vlm_heatmap + vlm_heatmap.mean()*0.75
    weighted_dino_heatmap = dino_attention_heatmap
    vlm_fused_heatmap = vlm_heatmap * weighted_dino_heatmap
    

    
    # Calculate metrics if GT is available
    metrics = None
    gt_map = load_ground_truth(gt_path)
    if gt_map is not None:
        metrics_dino  = calculate_metrics(vlm_fused_heatmap, gt_map)
        metrics_tracker_dino.update(metrics_dino)
    else:
        print("NO GT!!!")
        continue
    metrics_tracker_dino.print_metrics(metrics_dino, vlm_heatmap_path.split('/')[-1])
    
    # --- 4. 결과 시각화 ---
    # ✨ 레이아웃을 1x4에서 1x5로 변경하고, figsize을 조정합니다.
    fig, ax = plt.subplots(1, 6, figsize=(25, 5))

    # --- Plot 1: 원본 이미지 (ax[0]) ---
    ax[0].imshow(original_image)
    ax[0].set_title('Original Image')
    ax[0].axis('off')

    # --- ✨ Plot 5: 최종 퓨전 히트맵 (기존 ax[3] -> ax[4]로 이동) ---
    ax[1].imshow(dot_image)
    ax[1].set_title('Dot image')
    ax[1].axis('off')
    
    # --- Plot 2: dot 히트맵 (ax[1]) ---
    ax[2].imshow(original_image)
    ax[2].imshow(vlm_heatmap_image , cmap='jet', alpha=0.5)
    ax[2].set_title('dot (Input)')
    ax[2].axis('off')

    # --- Plot 2: VLM 히트맵 (ax[1]) ---
    ax[3].imshow(original_image)
    ax[3].imshow(vlm_fused_heatmap, cmap='jet', alpha=0.5)
    ax[3].set_title('VLM Heatmap (Input)')
    ax[3].axis('off')
    
    # --- ✨ Plot 3: DINO 원본 히트맵 (새로 추가된 부분) ---
    # 이 dino_attention_heatmap 변수는 클러스터링 전에 미리 계산해 두어야 합니다.
    # (예: dino_attention_heatmap = generate_dino_heatmap(original_image_path, dino_model) )
    ax[4].imshow(original_image)
    ax[4].imshow(dino_attention_heatmap, cmap='jet', alpha=0.5)
    ax[4].set_title('DINO Heatmap (Attention)')
    ax[4].axis('off')


    # --- ✨ Plot 5: 최종 퓨전 히트맵 (기존 ax[3] -> ax[4]로 이동) ---
    ax[5].imshow(original_image)
    ax[5].imshow(gt_map, cmap='jet', alpha=0.5)
    ax[5].set_title('GT')
    ax[5].axis('off')
#     전체 레이아웃 정리 및 출력
    plt.tight_layout()
    plt.show()

    # # --- 5. 최종 퓨전 히트맵 이미지로 저장 ---
    # fused_heatmap_img = Image.fromarray((fused_heatmap * 255).astype(np.uint8))
    # fused_heatmap_img.save(output_path)
    # print(f"Fused heatmap saved to {output_path}")
    # break


Processing 123 samples...
--- Start  1  / 123 --------------------------------------------------------------------------------
skis jump skis_002829.jpg


NameError: name 'X1_selected' is not defined

In [11]:
np.shape(X)

(196, 768)

In [14]:
mask.sum()

tensor(54)

In [12]:
np.shape(all_patches)

torch.Size([196, 768])

In [None]:
np.shape(selected_patches)

torch.Size([54, 768])

In [9]:
np.shape(X1_normalized)

(54, 1)

In [None]:
vit b 16 result
Cumulative only_ego  Averages over 121 samples:
Average - KLD: 1.3131 | SIM: 0.3883 | NSS: 1.1577
==================================================