<a href="https://colab.research.google.com/github/DavidePanza/Images_WebScraper/blob/main/notebooks/development/dino2_dev_HF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
from torchvision import transforms
import torch
import time
from PIL import Image
import skimage.io as io
import pickle
import os
import numpy as np
from pathlib import Path
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoImageProcessor
from torch.utils.data import DataLoader
from pytorch_metric_learning import losses

!pip install fiftyone -q
import fiftyone as fo

drive.mount('/gdrive')

# Initialize model

In [None]:
def pad_to_square(image, fill=0):
    """Centre `image` on a square canvas whose side equals its longer edge.

    Args:
        image: PIL image to pad. It is pasted onto a new canvas and is not
            modified itself.
        fill: Colour of the added border, forwarded to ``Image.new``.

    Returns:
        A new image with the same mode as `image` and size ``(side, side)``,
        where ``side`` is the larger of the input width and height.
    """
    width, height = image.size
    side = max(image.size)

    # Integer division: when the gap is odd, the extra pixel of padding ends up
    # on the right / bottom edge.
    x_offset = (side - width) // 2
    y_offset = (side - height) // 2

    canvas = Image.new(image.mode, (side, side), fill)
    canvas.paste(image, (x_offset, y_offset))
    return canvas


def make_transform(use_padding=True):
  """Build the torchvision preprocessing pipeline applied to each image.

  Args:
    use_padding: If True, the image is first padded to a square with
      `pad_to_square` and then resized with ``Resize(224)``, so nothing is
      cropped away. If False, the image goes through ``Resize(256)`` followed
      by ``CenterCrop(224)``.

  Returns:
    A ``transforms.Compose`` that ends with ``ToTensor`` and a per-channel
    ``Normalize``.
  """
  def _pad_with_zeros(img):
    # fill=0 means zero-valued pixels, i.e. black (not gray) for RGB input.
    return pad_to_square(img, fill=0)

  # Geometry stage: the only part that differs between the two modes.
  if use_padding:
    geometry = [_pad_with_zeros, transforms.Resize(224)]
  else:
    geometry = [transforms.Resize(256), transforms.CenterCrop(224)]

  # Shared tail: convert to a tensor and normalise with fixed per-channel
  # mean/std (the values commonly used for ImageNet-pretrained models).
  to_normalized_tensor = [
    transforms.ToTensor(),
    transforms.Normalize(
      mean=[0.485, 0.456, 0.406],
      std=[0.229, 0.224, 0.225],
    ),
  ]

  return transforms.Compose(geometry + to_normalized_tensor)

In [None]:
# Hyperparameters
EMBEDDING_DIM = 512  # Default size of the projected feature vector


class DINOv2Embeddings(nn.Module):
    """DINOv2 backbone followed by a linear projection that emits unit-length embeddings.

    The CLS token of the backbone's last hidden state is projected by a linear
    layer and L2-normalised.

    NOTE: `embedding` is a freshly constructed ``nn.Linear``; this class loads
    pretrained weights for the backbone only. Unless trained weights for the
    projection are loaded afterwards, the output is a randomly initialised
    projection of the CLS token and depends on the RNG state at construction.

    Args:
        model_name: Hugging Face checkpoint passed to ``AutoModel.from_pretrained``.
        embedding_dim: Output size of the projection layer.
    """

    def __init__(self, model_name="facebook/dinov2-base", embedding_dim=EMBEDDING_DIM):
        super().__init__()
        self.backbone = AutoModel.from_pretrained(model_name)
        # Project from the backbone's hidden size down (or up) to embedding_dim
        self.embedding = nn.Linear(self.backbone.config.hidden_size, embedding_dim)

    def forward(self, x, labels=None):
        """Return L2-normalised embeddings for a batch of inputs.

        Args:
            x: Batch forwarded unchanged to the backbone.
            labels: Accepted for call-site compatibility but not used here.

        Returns:
            Tensor with one row per batch item, each row scaled to unit L2 norm.
        """
        # Index 0 along the token axis of last_hidden_state is the CLS token
        cls_features = self.backbone(x).last_hidden_state[:, 0]
        projected = self.embedding(cls_features)
        return F.normalize(projected, p=2, dim=1)

# Extract the Embeddings

In [None]:
# Start the wall-clock timer for the whole extraction run
st = time.time()

# Collect every image file found (recursively) under the cropped-body folder
img_dir = Path("/gdrive/MyDrive/DSR/Jaguars_Project/images/cropped_body")
image_extensions = (".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".gif")
# Sorted so the processing order (and which duplicate name wins) is deterministic
image_paths = sorted(
    str(path) for path in img_dir.rglob("*") if path.suffix.lower() in image_extensions
)

transform = make_transform(use_padding=True)

# Maps image file name (without directories) -> 1-D numpy embedding
img_embedding = {}
# Paths that raised during loading / inference, reported at the end
failed_paths = []

# DINOv2Embeddings builds its projection layer with random initial weights and
# no trained weights are loaded here, so fix the seed to make the embeddings
# reproducible between runs.
torch.manual_seed(0)

# Run on the GPU when the runtime provides one
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DINOv2Embeddings().to(device)
model.eval()  # Set model to evaluation mode

for i, image_path in enumerate(image_paths):
    # Progress
    print(f'Processing image {i+1}/{len(image_paths)}: {image_path}')

    # Embeddings are keyed by file name only, so files with the same name in
    # different sub-folders collide; make the overwrite visible.
    image_name = Path(image_path).name
    if image_name in img_embedding:
        print(f"Warning: duplicate image name {image_name}; overwriting earlier embedding")

    try:
        # Load inside a context manager so the file handle is closed promptly;
        # convert() returns a fully loaded image that stays valid afterwards.
        with Image.open(image_path) as raw_img:
            img = raw_img.convert('RGB')  # Ensure RGB format
        img_t = transform(img)
        img_unsqueezed = img_t.unsqueeze(0).to(device)  # Add batch dimension

        # Forward pass with no gradients
        with torch.no_grad():
            embedding = model(img_unsqueezed)

        # Drop the batch dimension, move to host memory and store as numpy
        img_embedding[image_name] = embedding.squeeze(0).cpu().numpy()

    except Exception as e:
        # Best effort: report the failure and carry on with the next image
        print(f"Error processing {image_path}: {str(e)}")
        failed_paths.append(image_path)
        continue


# get the end time
et = time.time()

# get the execution time
elapsed_time = et - st
print('Execution time:', elapsed_time, 'seconds')
print(f'Embedded {len(img_embedding)} images; {len(failed_paths)} failed')

In [None]:
# Persist the {image_name: embedding} dict so the extraction does not have to be re-run.
# `pickle` is already imported in the notebook's first cell.
embedding_path = Path('/gdrive/MyDrive/DSR/Jaguars_Project/datasets/img_embedding_v9.pkl')

# Make sure the target folder exists; open() would otherwise raise FileNotFoundError
embedding_path.parent.mkdir(parents=True, exist_ok=True)

with embedding_path.open('wb') as f:
    pickle.dump(img_embedding, f)


# Add Embeddings to Dataset

In [None]:
# Import the previously exported FiftyOne dataset (version 9) from Drive.
input_dir = Path("/gdrive/MyDrive/DSR/Jaguars_Project/datasets/dataset_v9")
# Folder handed to the importer as `rel_dir`, i.e. the base directory for the
# samples' media paths — presumably the export stored them relative to it; confirm.
image_dir = Path("/gdrive/MyDrive/DSR/Jaguars_Project/images/raw_images")

dataset = fo.Dataset.from_dir(
    dataset_type=fo.types.FiftyOneDataset,
    dataset_dir=str(input_dir),
    rel_dir=image_dir,
)


Importing samples...


INFO:fiftyone.utils.data.importers:Importing samples...


 100% |███████████████| 4300/4300 [571.2ms elapsed, 0s remaining, 7.6K samples/s]      


INFO:eta.core.utils: 100% |███████████████| 4300/4300 [571.2ms elapsed, 0s remaining, 7.6K samples/s]      


In [None]:
# Declare the sample field that will hold the embedding vectors. The check keeps
# the cell safe to re-run when the field has already been added.
embedding_field = "dino2_embedding_v2"
field_missing = not dataset.has_sample_field(embedding_field)

if field_missing:
    # VectorField stores a 1-D numeric array per sample
    dataset.add_sample_field(embedding_field, fo.VectorField, embedded_doc_type=None)

In [None]:
# Create a mapping from image_name to sample.
# Samples whose metadata is missing entirely (None) or lacks "image_name" are
# skipped; without the None check the membership test would raise TypeError.
sample_map = {
    sample.metadata["image_name"]: sample
    for sample in dataset
    if sample.metadata is not None and "image_name" in sample.metadata
}

In [None]:
# Attach each embedding to the sample that has the same image name.
# save_context() batches the database writes; the previous hand-rolled buffer
# still called sample.save() once per sample, so it did not batch anything.
matched_count = 0
unmatched_names = []

with dataset.save_context() as save_ctx:
    for img_name, embedding in img_embedding.items():
        sample = sample_map.get(img_name)
        if sample is None:
            # No sample in the dataset carries this image name
            unmatched_names.append(img_name)
            continue

        sample["dino2_embedding_v2"] = embedding.astype(np.float32)
        save_ctx.save(sample)  # queued and flushed in batches by the context
        matched_count += 1

print(
    f"Stored {matched_count} embeddings; "
    f"{len(unmatched_names)} embeddings had no matching sample"
)

# Store Dataset

In [None]:
# Export the updated dataset as version 10 (labels/metadata only, no media).
storage_dir = Path('/gdrive/MyDrive/DSR/Jaguars_Project/datasets/dataset_v10')
storage_dir.mkdir(parents=True, exist_ok=True)

# Sample paths are written relative to this directory.
# NOTE(review): the dataset was imported with rel_dir set to
# .../images/raw_images, while the export uses .../images/cropped_body —
# confirm that the differing base directory is intended.
base_dir = Path('/gdrive/MyDrive/DSR/Jaguars_Project/images/cropped_body')

dataset.export(
    dataset_type=fo.types.FiftyOneDataset,
    export_dir=str(storage_dir),  # directory the dataset files are written to
    rel_dir=base_dir,
    export_media=False,  # turn this to True if you want to store also the images
)