In [None]:
# Dataset parent folders keyed by integer index; each parent is expected to
# contain an 'inpainted' and a 'masks' subfolder.
dataset = dict(enumerate((
    "./inpainting_dataset",
    "./trends_cv_data",
)))

In [None]:
# --- Dataset selection -------------------------------------------------------
# `demonstrative` is converted to 0/1 and used as the key into `dataset`.
demonstrative = True
images_dir = dataset[1 if demonstrative else 0]  # PARENT folder of 'inpainted' / 'masks'
pattern = "*.*"  # glob pattern

# --- Model -------------------------------------------------------------------
model_checkpoint = "lmms-lab/llava-onevision-qwen2-0.5b-ov"  # or local path
size = "05b"  # or "7b"

# --- Scoring / visualization -------------------------------------------------
max_side = 1024
show_n = 12           # number of images to visualize
sort_by_score = True  # if True, show top-N highest scores; else first N

# Echo the selected parent folder as the cell output.
images_dir

In [None]:
# Read the image preprocessing settings from config.json (resolved against the
# current working directory).

import json

# Explicit UTF-8: without it open() falls back to the platform's locale
# encoding, which is not guaranteed to be UTF-8 on every system.
with open('config.json', encoding='utf-8') as f:
    config = json.load(f)

# Both keys are required; a missing one raises KeyError here instead of
# failing later inside get_image().
image_grid_pinpoints = config["image_grid_pinpoints"]
image_aspect_ratio = config["image_aspect_ratio"]

In [None]:

from glob import glob
from PIL import Image
import os, torch
from custom_anomaly_detector import OVAnomalyDetector, build_ov_anomaly_detector
from utils_trends.image_processing import process_images
from llava.model.anomaly_expert import AnomalyOV
from utils_trends.vulnerability_map import predict_with_model, adversarial_recompute
from utils_trends.visualization import visualize_anomaly_gradcam
from utils_trends.metrics import compute_mask_anomaly_metrics, MetricsAverages

# Derive inpainted and masks subfolders from parent
inpainted_dir = os.path.join(images_dir, 'inpainted')
masks_dir = os.path.join(images_dir, 'masks')
if not os.path.isdir(inpainted_dir):
    # A regular exception (rather than SystemExit) reports the real problem
    # with a normal traceback and stops the cell.
    raise FileNotFoundError(f"Inpainted folder not found: {inpainted_dir}")
if not os.path.isdir(masks_dir):
    print(f"Warning: Masks folder not found: {masks_dir}. Mask visualization will be skipped.")

device = "cpu"
# Load model
model_name = "llava_qwen"

# Parameter notes kept from the original cell (presumably the signature and
# defaults of build_ov_anomaly_detector -- verify against custom_anomaly_detector):
#     vision_tower_name: str = "google/siglip-so400m-patch14-384",
#     anomaly_expert_path: str = "./pretrained_expert_7b.pth",
#     mm_projector_type: str = "mlp2x_gelu",
#     mm_hidden_size: int = 1152,
#     hidden_size: int = 896,
#     device: str = "cuda",
#     dtype: Any | None = None,
#     load_pretrained_projector: Any | None = None,
#     device_map: str = "auto"

# Alternative ways of obtaining the model, kept for reference:
# model, image_processor = build_ov_anomaly_detector(
#     vision_tower_name="google/siglip-so400m-patch14-384",
#     anomaly_expert_path='./pretrained_expert_05b.pth' if size != '7b' else './pretrained_expert_7b.pth',
#     mm_projector_type="mlp2x_gelu",
#     mm_hidden_size=1152,
#     hidden_size=896,
#     device=device,
#     dtype=torch.float32,
#     device_map="auto"
# )

# model.save_checkpoint("./zs_checkpoint.pt")
# model, image_processor = OVAnomalyDetector.load_from_checkpoint("./checkpoints/checkpoint_epoch_1.pt", device=device)
model, image_processor = OVAnomalyDetector.load_from_checkpoint("./zs_checkpoint.pt", device=device)

model.eval()

# Collect images ONLY from the inpainted subfolder.  The default pattern "*.*"
# matches any file name containing a dot, so the result is additionally
# filtered by extension to keep non-image files away from Image.open later on.
image_extensions = {
    ".png", ".jpg", ".jpeg", ".jfif", ".bmp", ".gif",
    ".webp", ".tif", ".tiff", ".ppm", ".pgm",
}
candidate_paths = [
    p for p in glob(os.path.join(inpainted_dir, '**', pattern), recursive=True)
    if os.path.isfile(p)
]
image_paths = sorted(
    p for p in candidate_paths
    if os.path.splitext(p)[1].lower() in image_extensions
)
skipped_files = len(candidate_paths) - len(image_paths)
if skipped_files:
    print(f"Skipped {skipped_files} file(s) without a known image extension.")
print(f"Found {len(image_paths)} inpainted images in {inpainted_dir}")
if not image_paths:
    raise FileNotFoundError("No inpainted images found. Adjust parent folder or pattern.")

In [None]:
# No sampling: every discovered inpainted image is used.  The `_subset` name is
# kept because later cells refer to it.
image_paths_subset = image_paths
print(f"Using all {len(image_paths_subset)} images for scoring and mask visualization.")

# Sanity check: count the images that have no same-named file in masks_dir.
missing_masks = sum(
    not os.path.isfile(os.path.join(masks_dir, os.path.basename(path)))
    for path in image_paths_subset
)
if missing_masks:
    print(f"Warning: {missing_masks} masks missing out of {len(image_paths_subset)} images.")

In [None]:
def get_image(img_path:str, device, max_side=1024):
    """Load an inpainted image and its mask and preprocess both into tensors.

    The mask is looked up in the module-level ``masks_dir`` under the same file
    name as ``img_path``.  Image and mask are both run through
    ``process_images`` with the module-level ``image_processor``,
    ``image_aspect_ratio`` and ``image_grid_pinpoints``.

    Args:
        img_path: Path of the image file to load.
        device: Target passed to ``.to(device)`` on each processed tensor.
        max_side: Largest allowed side length in pixels; bigger images are
            downscaled (aspect ratio preserved) before preprocessing.

    Returns:
        ``(image_tensor, mask_tensor)``.  ``mask_tensor`` is ``None`` when no
        mask file exists for the image, so callers must check for that.
    """
    img = Image.open(img_path).convert('RGB')
    # Load corresponding mask (single full-image mask)
    mask_path = os.path.join(masks_dir, os.path.basename(img_path))
    mask_img = Image.open(mask_path).convert('L') if os.path.isfile(mask_path) else None

    if max(img.size) > max_side:
        if img.width > img.height:
            new_w = max_side
            # max(1, ...) keeps extreme aspect ratios from rounding a side to 0.
            new_h = max(1, int(max_side * img.height / img.width))
        else:
            new_h = max_side
            new_w = max(1, int(max_side * img.width / img.height))
        img = img.resize((new_w, new_h))

    # Keep the mask pixel-aligned with the (possibly downscaled) image so both
    # enter preprocessing at the same size.  Nearest-neighbour avoids creating
    # intermediate grey levels in the mask.
    if mask_img is not None and mask_img.size != img.size:
        mask_img = mask_img.resize(img.size, Image.NEAREST)

    image_tensor = process_images([img], image_processor, image_aspect_ratio, image_grid_pinpoints).to(device)
    if mask_img is None:
        # No mask on disk: previously None was handed to process_images.
        return image_tensor, None
    mask_tensor = process_images([mask_img], image_processor, image_aspect_ratio, image_grid_pinpoints).to(device)
    return image_tensor, mask_tensor


In [None]:
# Score the inpainted images, run the adversarial recompute on each, and
# evaluate both the original anomaly maps and the output difference maps
# against the masks.
import numpy as np

scores = []           # (image path, anomaly score) for every processed image
metrics_results = []  # (image path, {'original': ..., 'output': ...}) per image
# Stop after this many images (idx is 1-based).  None never equals idx, so
# setting it to None processes every image.
break_at = 3

metrics_agg_original = MetricsAverages()  # For original anomaly maps
metrics_agg_output = MetricsAverages()    # For output difference maps

# Attack hyperparameters
attack_epsilon = 0.05  # norm bound
attack_top_k = 0.50    # fraction for M_topk
# NOTE(review): earlier comments listed both ['fgsm', 'pgd', 'deepfool'] and
# 'random' / 'structured' as options -- confirm the accepted values against
# adversarial_recompute.
attack_noise_mode = 'pgd'


def f2(x):
    """Format a metric with two decimals; None and float NaN become 'nan'."""
    return 'nan' if (x is None or (isinstance(x, float) and np.isnan(x))) else f"{float(x):.2f}"


def _mask_metrics(maps, mask_tensor):
    """Compare `maps` with the mask; returns an empty dict when there is no mask."""
    if mask_tensor is None:
        return {}
    return compute_mask_anomaly_metrics(
        anomaly_maps=maps,
        mask_image=mask_tensor,
        top_k=0.50,
        aggregate='mean',
        white_threshold=None,
        inpainted_is_white=True,
    )


for idx, img_path in enumerate(image_paths_subset, 1):
    image_tensor, mask_tensor = get_image(img_path, device, max_side=max_side)
    has_mask = mask_tensor is not None
    print(f"mask shape: {mask_tensor.shape if has_mask else None}")
    # Unpacking shape[3:] into two names requires a 5-D tensor.
    # NOTE(review): if the layout is (..., H, W) these two names are swapped;
    # the values are forwarded in the same order either way -- confirm against
    # process_images / predict_with_model.
    W, H = image_tensor.shape[3:]

    print(f"width: {W}, height: {H}, device: {image_tensor.device}")
    with torch.no_grad():
        score, attn_maps, anomaly_maps, vuln_map, image_tensor = predict_with_model(
            model=model,
            image_tensor=image_tensor,
            device=device,
            max_side=max_side,
            anomaly_map_size=(W,H),
        )
    print(f"Processed image tensor shape: {image_tensor.shape}")

    # Compute metrics on ORIGINAL anomaly map
    metrics_original = _mask_metrics(anomaly_maps, mask_tensor)
    if has_mask:
        metrics_agg_original.update(metrics_original)

    # Adversarial attack + recompute anomaly map
    attacked_image, anomaly_maps_adv, output_map, adv_score = adversarial_recompute(
        model=model,
        image_tensor=image_tensor,
        original_anomaly_maps=anomaly_maps,
        device=device,
        epsilon=attack_epsilon,
        top_k=attack_top_k,
        noise_mode=attack_noise_mode,
    )

    # Compute metrics on OUTPUT difference map
    metrics_output = _mask_metrics(output_map, mask_tensor)
    if has_mask:
        metrics_agg_output.update(metrics_output)
    metrics_results.append((img_path, {'original': metrics_original, 'output': metrics_output}))

    # Prepare metrics label with two decimals (showing both)
    metrics_label = (
        f"Original: IoU={f2(metrics_original.get('iou_topk'))} Mass={f2(metrics_original.get('mass_frac'))} "
        f"ROC={f2(metrics_original.get('roc_auc'))} PR={f2(metrics_original.get('pr_auc'))} | "
        f"Output Δ: IoU={f2(metrics_output.get('iou_topk'))} Mass={f2(metrics_output.get('mass_frac'))} "
        f"ROC={f2(metrics_output.get('roc_auc'))} PR={f2(metrics_output.get('pr_auc'))} "
        f"score={score:.4f} adv_Score={adv_score:.4f}"
    )
    print(f"Metrics for image {idx}:")
    print(f"  Original anomaly: IoU={f2(metrics_original.get('iou_topk'))}, Mass={f2(metrics_original.get('mass_frac'))}, ROC={f2(metrics_original.get('roc_auc'))}, PR={f2(metrics_original.get('pr_auc'))}")
    print(f"  Output Δ map:     IoU={f2(metrics_output.get('iou_topk'))}, Mass={f2(metrics_output.get('mass_frac'))}, ROC={f2(metrics_output.get('roc_auc'))}, PR={f2(metrics_output.get('pr_auc'))}")

    # Updated visualization with Adv Anomaly and Output |Δ| Map (single metrics label)
    # NOTE(review): mask_image may be None when the mask file is missing --
    # confirm visualize_anomaly_gradcam accepts that.
    fig = visualize_anomaly_gradcam(
        image_tensor,
        anomaly_maps,
        vuln_maps=vuln_map if (vuln_map is not None and len(vuln_map) > 0) else None,
        mask_image=mask_tensor,
        anomaly_maps_adv=anomaly_maps_adv,
        output_maps=output_map,
        alpha=0.5,
        metrics_label=metrics_label,
    )

    # Record the score BEFORE the early-exit check so the last processed image
    # is counted in `scores` just as it is in the metric averages.
    scores.append((img_path, score))
    if idx % 20 == 0:
        print(f"[{idx}] {img_path} -> {score:.6f}")
    if idx == break_at:
        break

print(f"\nScored {len(scores)} images")
scores_array = np.array([[s[0], s[1]] for s in scores], dtype=object)
print("\n=== Average Metrics ===")
print("Original anomaly maps:", str(metrics_agg_original))
print("Output difference maps:", str(metrics_agg_output))

In [None]:
# # Example usage:
# # Load a test image
# test_img_path = image_paths_subset[0] if image_paths_subset else image_paths[0]
# test_img = Image.open(test_img_path).convert('RGB')

# # Process the image
# image_tensor = process_images([test_img], image_processor, model.config)
# print(f"Image tensor shape: {image_tensor.shape}")

# # Get attention maps and anomaly maps from the model
# _, _, final_preds, attn_maps, anomaly_maps = model.get_anomaly_fetures_from_images(
#     image_tensor, 
#     with_attention_map=True,
#     anomaly_map_size=(350, 350)
# )

# print(f"Attention maps shape: {attn_maps.shape}")
# print(f"Anomaly maps shape: {anomaly_maps.shape}")

# # Visualize everything
# fig = visualize_attention_and_anomaly_maps(image_tensor, attn_maps, anomaly_maps)
# plt.show()