In [1]:
import torch
import numpy as np
from PIL import Image
import os
import sys
from tqdm.notebook import tqdm # Or from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.spatial.distance import cosine as cosine_distance
import lpips # Still needed for visual distance

from transformers import (
    CLIPProcessor, CLIPModel,
    SiglipProcessor, SiglipModel,
    BlipProcessor, BlipModel, BlipVisionModel, # Add BLIP classes
    AutoProcessor, AutoModel # Generic loaders can sometimes work but explicit is safer
)
# --- End Import ---

import collections
import re
import yaml
import argparse

In [2]:
# Backbone name for the LPIPS perceptual metric.
# NOTE(review): presumably passed to lpips.LPIPS(net=...); not used in the cells shown here.
LPIPS_NET_TYPE = "vgg"

# Hugging Face Hub checkpoints, one per vision backbone compared in this notebook.
# The mixed-case constant names are kept as they are because later cells refer to them.
ViT_MODEL_ID = "google/vit-base-patch16-224"
BLIP_MODEL_ID = "Salesforce/blip-image-captioning-base"
Siglip_MODEL_ID = "google/siglip-base-patch16-512"
CLIP_MODEL_ID = "openai/clip-vit-base-patch16"

In [3]:
# Run on the GPU when PyTorch can see one, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Each backbone is loaded through the generic Auto* classes: the processor
# first, then the model, which is moved to `device` straight away.

# CLIP
clip_processor = AutoProcessor.from_pretrained(CLIP_MODEL_ID)
clip_model = AutoModel.from_pretrained(CLIP_MODEL_ID).to(device)

# SigLIP
siglip_processor = AutoProcessor.from_pretrained(Siglip_MODEL_ID)
siglip_model = AutoModel.from_pretrained(Siglip_MODEL_ID).to(device)

# BLIP
# NOTE(review): the load-time warning recorded for this cell reports that some
# BlipModel weights were newly initialized from this captioning checkpoint.
# Confirm whether the projection used by get_image_features is among them; if
# it is, BLIP embeddings depend on random initialisation (no seed is set here).
blip_processor = AutoProcessor.from_pretrained(BLIP_MODEL_ID)
blip_model = AutoModel.from_pretrained(BLIP_MODEL_ID).to(device)

# ViT -- note the processor is bound to the generic name `processor`,
# which the configuration cell further down relies on.
processor = AutoProcessor.from_pretrained(ViT_MODEL_ID)
ViT_model = AutoModel.from_pretrained(ViT_MODEL_ID).to(device)

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
`BlipModel` is going to be deprecated in future release, please use `BlipForConditionalGeneration`, `BlipForQuestionAnswering` or `BlipForImageTextRetrieval` depending on your usecase.
Some weights of BlipModel were not initialized from the model checkpoint at Salesforce/blip-image-captioning-base and are newly initialized: ['logit_scale', 'text_model.embeddings.LayerNorm.bias', 'text_model.embeddings.LayerNorm.weight', 'text_model.embeddings.position_embeddings.weight', 'text_model.embeddings.word_embeddings.weight', 'text_model.encoder.layer.0.attention.output.LayerNorm.bias', 'text_model.encoder.layer.0.attention.output.LayerNorm.weight', 'text_model.encoder.layer.

In [4]:
def get_image_embedding(image_path, model, processor, model_type, device, VISION_MODEL_ID):
    """
    Load one image and return its L2-normalised embedding as a NumPy array.

    Args:
        image_path: Path (str or Path) of the image file to embed.
        model: Loaded vision model. For "CLIP", "SigLIP" and "BLIP" it must
            expose ``get_image_features``; for "ViT" the model is called
            directly and the first token of ``last_hidden_state`` is used.
        processor: Matching image processor, called as
            ``processor(images=..., return_tensors="pt")``.
        model_type: One of "CLIP", "SigLIP", "BLIP" or "ViT". The literal
            "google/vit-base-patch16-224" is accepted as an alias for "ViT".
        device: torch.device the preprocessed inputs are moved to.
        VISION_MODEL_ID: Checkpoint identifier; only used in the error message.

    Returns:
        The embedding as a NumPy array with singleton dimensions squeezed out,
        or None when loading, preprocessing or inference fails. Failures are
        printed rather than raised so a batch run can continue.
    """
    try:
        # The context manager releases the file handle even if decoding fails;
        # convert() returns a separate image object that outlives the `with`.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        # --- Preprocessing ---
        # Image-only call, so no `padding` argument is passed: it is a
        # text-side option and is not needed to build `pixel_values`.
        inputs = processor(images=image, return_tensors="pt").to(device)

        with torch.no_grad():
            if model_type in ("CLIP", "SigLIP", "BLIP"):
                image_features = model.get_image_features(**inputs)
            elif model_type in ("google/vit-base-patch16-224", "ViT"):
                # No get_image_features here; take the first ([CLS]) token.
                image_features = model(**inputs).last_hidden_state[:, 0, :]
            else:
                raise ValueError(f"Unknown model_type '{model_type}' for feature extraction")

            # isinstance() is already False for None, so one check covers both cases.
            if not isinstance(image_features, torch.Tensor):
                print(f"Warning: image_features are None or not a Tensor for {image_path}. Skipping normalization.")
                return None

            # --- Normalization ---
            # L2-normalise so cosine distance is well behaved; the epsilon
            # guards against division by zero for an all-zero feature vector.
            image_features = image_features / (image_features.norm(p=2, dim=-1, keepdim=True) + 1e-6)

        return image_features.cpu().numpy().squeeze()

    except Exception as e:
        # Model type and checkpoint id are included to make failures traceable.
        print(f"Error processing {image_path} with {model_type} model ({VISION_MODEL_ID}): {e}")
        return None

In [5]:
import os
import numpy as np
from pathlib import Path
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import matplotlib.cm as cm # For color mapping for sizes
import torch
# Assuming PIL is used within get_image_embedding

# --- Configuration ---
# Root of the image sweep; the collection loop looks for "size_*" sub-directories
# here and reads the PNG files inside each of them.
BASE_IMAGE_DIR = Path("../test_images/3D_shape_sweep_only")

# Plots are written here; the directory is created on first run.
OUTPUT_PLOT_DIR = Path("embedding_visualizations")
OUTPUT_PLOT_DIR.mkdir(exist_ok=True, parents=True)

# Expected embedding length per model. The collection loop drops any embedding
# whose length differs from the value listed here.
MODEL_DIMS = dict(CLIP=512, SigLIP=768, BLIP=512, ViT=768)

# The models, processors and *_MODEL_ID constants used below must already exist
# in the kernel (they are created by the model-loading cell).
# DEVICE repeats the expression used for the lowercase `device` in that cell.
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Everything the collection loop needs for one model, keyed by display name.
MODEL_INFO = {
    "CLIP": dict(model=clip_model, processor=clip_processor, id=CLIP_MODEL_ID, dim=MODEL_DIMS["CLIP"]),
    "SigLIP": dict(model=siglip_model, processor=siglip_processor, id=Siglip_MODEL_ID, dim=MODEL_DIMS["SigLIP"]),
    "BLIP": dict(model=blip_model, processor=blip_processor, id=BLIP_MODEL_ID, dim=MODEL_DIMS["BLIP"]),
    "ViT": dict(model=ViT_model, processor=processor, id=ViT_MODEL_ID, dim=MODEL_DIMS["ViT"]),
}

# Matplotlib marker used for each model's scatter plot.
MARKERS = dict(CLIP="o", SigLIP="s", ViT="D", BLIP="^")

# Path for a combined plot (the per-model cells below save their own files).
OUTPUT_PLOT_FILE = OUTPUT_PLOT_DIR.joinpath("all_models_by_size_color_marker_tsne.png")

# t-SNE settings shared by every model.
TSNE_PERPLEXITY = 30  # upper bound; lowered automatically for tiny sample counts
TSNE_N_ITER = 350     # number of optimisation iterations

# --- Helper Function ---
# get_image_embedding(image_path, model, processor, model_type, device, VISION_MODEL_ID)
# is defined in an earlier cell and must have been run before this one.
# It returns the embedding as a NumPy array, or None if the image could not be processed.


# --- Data Collection (Grouped by Model, Across All Sizes) ---
# Walks every "size_*" directory, embeds each PNG with every model in MODEL_INFO
# and stores the embeddings together with the directory name they came from.
print("Collecting embeddings for ALL models across all sizes...")
model_embeddings_dict = {model_name: [] for model_name in MODEL_INFO}
# Size label of each stored embedding, index-aligned with model_embeddings_dict.
model_size_labels_dict = {model_name: [] for model_name in MODEL_INFO}
unique_size_labels = sorted(d.name for d in BASE_IMAGE_DIR.glob("size_*") if d.is_dir())
if not unique_size_labels:
    # Raising (instead of calling exit()) stops this cell with a clear message
    # without tearing down the notebook kernel.
    raise FileNotFoundError(f"No 'size_*' directories found in {BASE_IMAGE_DIR}.")

for size_label in unique_size_labels:
    size_dir = BASE_IMAGE_DIR / size_label
    print(f"  Processing directory: {size_label}")
    # Sorted so the sample order (and therefore the t-SNE input) does not depend
    # on the file system's listing order.
    image_paths = sorted(size_dir.glob("*.png"))

    if not image_paths:
        print("    No PNG images found. Skipping.")
        continue

    for img_path in image_paths:
        for model_name, info in MODEL_INFO.items():
            try:
                embedding = get_image_embedding(
                    img_path, info["model"], info["processor"], model_name, DEVICE, info["id"]
                )
            except Exception as e:
                print(f"    Error processing {img_path.name} with {model_name}: {e}")
                continue

            # The helper signals failure by returning None (it prints the cause
            # itself), so skip explicitly instead of failing on .flatten().
            if embedding is None:
                print(f"    No embedding returned for {img_path.name} with {model_name}. Skipping.")
                continue

            # Tolerate a helper that hands back a tensor instead of an array.
            if isinstance(embedding, torch.Tensor):
                embedding = embedding.detach().cpu().numpy()
            embedding = np.asarray(embedding).flatten()

            # Validate dimension
            expected_dim = info["dim"]
            if embedding.shape[0] != expected_dim:
                print(f"    Warning: Embedding dim mismatch for {model_name} ({img_path.name}). Expected {expected_dim}, got {embedding.shape[0]}. Skipping this embedding.")
                continue

            model_embeddings_dict[model_name].append(embedding)
            model_size_labels_dict[model_name].append(size_label)

# Per-model totals make silently skipped images visible at a glance.
for model_name, collected in model_embeddings_dict.items():
    print(f"  Collected {len(collected)} embeddings for {model_name}.")

Collecting embeddings for ALL models across all sizes...
  Processing directory: size_0.10


  return self.preprocess(images, **kwargs)


  Processing directory: size_0.15
  Processing directory: size_0.20
  Processing directory: size_0.25
  Processing directory: size_0.30
  Processing directory: size_0.35
  Processing directory: size_0.40
  Processing directory: size_0.45
  Processing directory: size_0.50
  Processing directory: size_0.55
  Processing directory: size_0.60
  Processing directory: size_0.65
  Processing directory: size_0.70
  Processing directory: size_0.75
  Processing directory: size_0.80
  Processing directory: size_0.85
  Processing directory: size_0.90
  Processing directory: size_0.95
  Processing directory: size_1.00
  Processing directory: size_1.05
  Processing directory: size_1.10
  Processing directory: size_1.15
  Processing directory: size_1.20


In [6]:
import inspect

# scikit-learn 1.5 renamed TSNE's `n_iter` argument to `max_iter` and later
# removed the old name; use whichever one the installed version accepts.
_TSNE_ITER_KWARG = "max_iter" if "max_iter" in inspect.signature(TSNE).parameters else "n_iter"

# Position of each size label in the sorted label list, built once so the
# colour lookup is a dict access instead of a list scan per point.
size_label_to_index = {label: idx for idx, label in enumerate(unique_size_labels)}

# One t-SNE projection and one scatter plot per model, coloured by size label.
for model_name, embeddings_list in model_embeddings_dict.items():
    if not embeddings_list:
        print(f"  No embeddings collected for {model_name}. Skipping t-SNE.")
        continue

    embeddings_array = np.array(embeddings_list)
    n_samples, emb_dim = embeddings_array.shape
    print(f"  Processing {model_name}: {n_samples} samples, Dim={emb_dim}")

    if n_samples <= 1:
        print(f"    Only {n_samples} embedding(s). Cannot run t-SNE. Skipping.")
        continue

    # TSNE requires perplexity < n_samples, so cap it for small sample counts.
    effective_perplexity = min(TSNE_PERPLEXITY, n_samples - 1)
    if effective_perplexity != TSNE_PERPLEXITY:
        print(f"    Adjusting perplexity for {model_name} from {TSNE_PERPLEXITY} to {effective_perplexity}.")

    print(f"    Running t-SNE for {model_name} (perplexity={effective_perplexity}, n_iter={TSNE_N_ITER})...")
    # NOTE(review): the recorded run of this cell shows joblib/loky printing a
    # traceback while counting physical cores with n_jobs=-1; it is a warning,
    # not a failure -- confirm whether it should be silenced for that machine.
    tsne = TSNE(
        n_components=2, random_state=42, perplexity=effective_perplexity,
        init='pca', learning_rate='auto', n_jobs=-1,
        **{_TSNE_ITER_KWARG: TSNE_N_ITER}
    )
    embeddings_2d = tsne.fit_transform(embeddings_array)

    # Colour value of a point = index of its size label in unique_size_labels.
    model_point_colors = np.array(
        [size_label_to_index[label] for label in model_size_labels_dict[model_name]]
    )

    # Explicit figure/axes objects keep each plot independent of pyplot's
    # "current figure" state while looping.
    fig, ax = plt.subplots(figsize=(12, 8))
    scatter = ax.scatter(
        embeddings_2d[:, 0], embeddings_2d[:, 1],
        marker=MARKERS.get(model_name, "o"), s=100, alpha=0.5,
        c=model_point_colors, vmin=0, vmax=len(unique_size_labels) - 1,
    )
    ax.set_title(f"t-SNE Visualization for {model_name} (perplexity={effective_perplexity})")
    ax.set_xlabel("t-SNE 1")
    ax.set_ylabel("t-SNE 2")
    ax.grid(True)
    ax.tick_params(axis="x", labelrotation=45)
    fig.colorbar(scatter, ax=ax, label="Size Labels")
    fig.tight_layout()

    image_name = f"{model_name}_tsne.png"
    fig.savefig(OUTPUT_PLOT_DIR / image_name)
    plt.close(fig)  # free the figure; many are created in this loop
    print(f"    t-SNE plot saved to {OUTPUT_PLOT_DIR / image_name}")

  Processing CLIP: 552 samples, Dim=512
    Running t-SNE for CLIP (perplexity=30, n_iter=350)...


[WinError 2] The system cannot find the file specified
  File "d:\anaconda\envs\ve\Lib\site-packages\joblib\externals\loky\backend\context.py", line 257, in _count_physical_cores
    cpu_info = subprocess.run(
               ^^^^^^^^^^^^^^^
  File "d:\anaconda\envs\ve\Lib\subprocess.py", line 548, in run
    with Popen(*popenargs, **kwargs) as process:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\anaconda\envs\ve\Lib\subprocess.py", line 1026, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "d:\anaconda\envs\ve\Lib\subprocess.py", line 1538, in _execute_child
    hp, ht, pid, tid = _winapi.CreateProcess(executable, args,
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  Processing SigLIP: 552 samples, Dim=768
    Running t-SNE for SigLIP (perplexity=30, n_iter=350)...




  Processing BLIP: 552 samples, Dim=512
    Running t-SNE for BLIP (perplexity=30, n_iter=350)...




  Processing ViT: 552 samples, Dim=768
    Running t-SNE for ViT (perplexity=30, n_iter=350)...




In [None]:
import torch
import torchvision.models as models
from PIL import Image

def get_cnn_embedding_vgg16(image_path, feature_extractor, pooling_layer, transform, device):
    """
    Embed one image with a convolutional feature extractor followed by pooling.

    The image is loaded as RGB, preprocessed with `transform`, passed through
    `feature_extractor` and then `pooling_layer` without gradient tracking,
    and the result is returned with all size-1 dimensions removed.

    Args:
        image_path (str or Path): Path to the image file.
        feature_extractor (torch.nn.Module): Convolutional stack, e.g. the
            `features` module of a torchvision VGG16.
        pooling_layer (torch.nn.Module): Pooling applied to the feature maps,
            e.g. AdaptiveAvgPool2d((1, 1)) for global average pooling.
        transform (callable): Preprocessing that maps a PIL image to a tensor.
        device (torch.device): Device the input batch is moved to.

    Returns:
        np.ndarray: The squeezed embedding (a vector of length 512 for VGG16
        features with global average pooling), or None if anything fails.
        No L2 normalisation is applied.
    """
    try:
        rgb_image = Image.open(image_path).convert('RGB')

        # Preprocess, then prepend a batch axis: [C, H, W] -> [1, C, H, W].
        preprocessed = transform(rgb_image)
        model_input = torch.unsqueeze(preprocessed, 0).to(device)

        # Both modules are switched to evaluation mode before the forward pass.
        for module in (feature_extractor, pooling_layer):
            module.eval()

        with torch.no_grad():
            # Conv features -> pooled to one value per channel.
            feature_maps = feature_extractor(model_input)
            pooled = pooling_layer(feature_maps)

        # Drop the singleton axes and hand the result back as a CPU NumPy array.
        return pooled.squeeze().detach().cpu().numpy()

    except Exception as err:
        print(f"Error getting VGG16 embedding for {image_path}: {err}")
        return None
    
# 1. Load VGG16 with its ImageNet-1k weights and put it in evaluation mode.
#    (The log message previously said "ResNet50", which is not what is loaded.)
print("Loading VGG16 model...")
weights = models.VGG16_Weights.IMAGENET1K_V1 # Use recommended weights enum
vgg_model = models.vgg16(weights=weights).to(device)
vgg_model.eval() # Set to evaluation mode

# 2. Isolate the feature extractor part (convolutional layers)
# The output of this has 512 channels for VGG16
feature_extractor = vgg_model.features

# 3. Define the Global Average Pooling layer
# Output size (1, 1) means pool each channel down to a single value
pooling_layer = torch.nn.AdaptiveAvgPool2d((1, 1)).to(device)

# 4. Get the correct preprocessing transforms for these weights
preprocess_transform = weights.transforms()
print("VGG16 Preprocessing Transforms:")
print(preprocess_transform)


Loading ResNet50 model...
VGG16 Preprocessing Transforms:
ImageClassification(
    crop_size=[224]
    resize_size=[256]
    mean=[0.485, 0.456, 0.406]
    std=[0.229, 0.224, 0.225]
    interpolation=InterpolationMode.BILINEAR
)


In [13]:
# Collect VGG16 embeddings for every PNG in every "size_*" directory.
# Despite the `_dict` suffix both containers are plain lists; the names are
# kept because the VGG16 t-SNE cell reads them.
vgg_embeddings_dict = []
vgg_size_labels_dict = []  # size label of each embedding, index-aligned with the list above
for size_label in unique_size_labels:
    size_dir = BASE_IMAGE_DIR / size_label
    print(f"  Processing directory: {size_label}")
    # Sorted so the sample order does not depend on the file system's listing order.
    image_paths = sorted(size_dir.glob("*.png"))

    if not image_paths:
        print("    No PNG images found. Skipping.")
        continue

    for img_path in image_paths:
        try:
            embedding = get_cnn_embedding_vgg16(
                img_path, feature_extractor, pooling_layer, preprocess_transform, device
            )
        except Exception as e:
            print(f"    Error processing {img_path.name} with VGG16: {e}")
            continue

        # The helper returns None on failure (it prints the cause itself), so
        # skip explicitly instead of failing on .flatten().
        if embedding is None:
            print(f"    No embedding returned for {img_path.name} with VGG16. Skipping.")
            continue

        # Tolerate a helper that hands back a tensor instead of an array.
        if isinstance(embedding, torch.Tensor):
            embedding = embedding.detach().cpu().numpy()
        embedding = np.asarray(embedding).flatten()

        # Validate dimension
        expected_dim = 512  # VGG16 output size
        if embedding.shape[0] != expected_dim:
            print(f"    Warning: Embedding dim mismatch for VGG16 ({img_path.name}). Expected {expected_dim}, got {embedding.shape[0]}. Skipping this embedding.")
            continue

        vgg_embeddings_dict.append(embedding)
        vgg_size_labels_dict.append(size_label)

  Processing directory: size_0.10
  Processing directory: size_0.15
  Processing directory: size_0.20
  Processing directory: size_0.25
  Processing directory: size_0.30
  Processing directory: size_0.35
  Processing directory: size_0.40
  Processing directory: size_0.45
  Processing directory: size_0.50
  Processing directory: size_0.55
  Processing directory: size_0.60
  Processing directory: size_0.65
  Processing directory: size_0.70
  Processing directory: size_0.75
  Processing directory: size_0.80
  Processing directory: size_0.85
  Processing directory: size_0.90
  Processing directory: size_0.95
  Processing directory: size_1.00
  Processing directory: size_1.05
  Processing directory: size_1.10
  Processing directory: size_1.15
  Processing directory: size_1.20


In [9]:
# t-SNE for VGG16
import inspect

# scikit-learn 1.5 renamed TSNE's `n_iter` argument to `max_iter` and later
# removed the old name; use whichever one the installed version accepts.
_TSNE_ITER_KWARG = "max_iter" if "max_iter" in inspect.signature(TSNE).parameters else "n_iter"

if len(vgg_embeddings_dict) <= 1:
    # Checked before building the array: with zero or one embedding t-SNE
    # cannot run, so the rest of the cell is actually skipped.
    print(f"    Only {len(vgg_embeddings_dict)} embedding(s). Cannot run t-SNE. Skipping.")
else:
    vgg_embeddings_array = np.array(vgg_embeddings_dict)
    n_samples, emb_dim = vgg_embeddings_array.shape
    print(f"Processing VGG16: {n_samples} samples, Dim={emb_dim}")

    # TSNE requires perplexity < n_samples, so cap it for small sample counts.
    effective_perplexity = min(TSNE_PERPLEXITY, n_samples - 1)
    if effective_perplexity != TSNE_PERPLEXITY:
        print(f"    Adjusting perplexity for VGG16 from {TSNE_PERPLEXITY} to {effective_perplexity}.")

    vgg_tsne = TSNE(
        n_components=2, random_state=42, perplexity=effective_perplexity,
        init='pca', learning_rate='auto', n_jobs=-1,
        **{_TSNE_ITER_KWARG: TSNE_N_ITER}
    )
    vgg_embeddings_2d = vgg_tsne.fit_transform(vgg_embeddings_array)

    # Colours come from the VGG16 label list itself. The previous version read
    # model_size_labels_dict[model_name], i.e. whatever model the earlier loop
    # happened to finish on, which is only correct by coincidence.
    size_label_to_index = {label: idx for idx, label in enumerate(unique_size_labels)}
    vgg_point_colors = np.array([size_label_to_index[label] for label in vgg_size_labels_dict])

    # Plotting
    fig, ax = plt.subplots(figsize=(12, 8))
    scatter = ax.scatter(
        vgg_embeddings_2d[:, 0], vgg_embeddings_2d[:, 1],
        marker="*", s=100, alpha=0.5,
        c=vgg_point_colors, vmin=0, vmax=len(unique_size_labels) - 1,
    )
    ax.set_title(f"t-SNE Visualization for VGG16 (perplexity={effective_perplexity})")
    ax.set_xlabel("t-SNE 1")
    ax.set_ylabel("t-SNE 2")
    ax.grid(True)
    ax.tick_params(axis="x", labelrotation=45)
    fig.colorbar(scatter, ax=ax, label="Size Labels")
    fig.tight_layout()

    image_name = "VGG16_tsne.png"
    fig.savefig(OUTPUT_PLOT_DIR / image_name)
    plt.close(fig)
    print(f"t-SNE plot saved to {OUTPUT_PLOT_DIR / image_name}")
    print("VGG16 t-SNE visualization complete.")


Processing VGG16: 552 samples, Dim=512




t-SNE plot saved to embedding_visualizations\VGG16_tsne.png
VGG16 t-SNE visualization complete.
