# DINOv3 visual search with FiftyOne

End-to-end notebook following https://docs.voxel51.com/tutorials/dinov3.html: install, authenticate to Hugging Face, load data, extract DINOv3 embeddings, visualize and search, train a small classifier, and generate PCA/CLS foreground masks.


## 1. Install required libraries

Uncomment if needed. Requires network access for pip and dataset downloads.


In [1]:
# !pip install --upgrade pip
# !pip install git+https://github.com/huggingface/transformers
# !pip install -q huggingface_hub
# !pip install fiftyone


## 2. Log in to Hugging Face
Needed to access gated models like DINOv3.


In [2]:
from huggingface_hub import notebook_login
notebook_login()


## 3. Load a dataset
Uses COCO validation via FiftyOne Zoo; swap with your dataset path if desired.


In [None]:
import fiftyone as fo
import fiftyone.zoo as foz
from pathlib import Path

# Point to your local image root. All JPGs in subfolders will be ingested.
data_root = Path("/home/trudes/Projects/Dinov3-WhaleID/Data")
image_paths = sorted([str(p) for p in data_root.rglob("*.jpg")])
print(f"Found {len(image_paths)} images under {data_root}")

# Clean up any empty prior dataset to avoid MediaTypeError
if fo.dataset_exists("whale-local-dinov3"):
    fo.delete_dataset("whale-local-dinov3")

# Create an image-only dataset from local files
dataset = fo.Dataset.from_images(image_paths, name="whale-local-dinov3")
dataset.persistent = True
dataset


Found 0 images under /Data
 100% |█████████████████████| 0/0 [1.7ms elapsed, ? remaining, ? samples/s]  


Name:        whale-local-dinov3
Media type:  None
Num samples: 0
Persistent:  True
Tags:        []
Sample fields:
    id:               fiftyone.core.fields.ObjectIdField
    filepath:         fiftyone.core.fields.StringField
    tags:             fiftyone.core.fields.ListField(fiftyone.core.fields.StringField)
    metadata:         fiftyone.core.fields.EmbeddedDocumentField(fiftyone.core.metadata.Metadata)
    created_at:       fiftyone.core.fields.DateTimeField
    last_modified_at: fiftyone.core.fields.DateTimeField

## 4. Build the DINOv3 model wrapper
Creates a transformers model and wraps it for FiftyOne embedding extraction.


In [4]:
import transformers
import fiftyone.utils.transformers as fouhft

transformers_model = transformers.AutoModel.from_pretrained("facebook/dinov3-vitl16-pretrain-lvd1689m")
model_config = fouhft.FiftyOneTransformerConfig(
    {"model": transformers_model, "name_or_path": "facebook/dinov3-vitl16-pretrain-lvd1689m"}
)
model = fouhft.FiftyOneTransformer(model_config)


Loading weights:   0%|          | 0/415 [00:00<?, ?it/s]

## 5. Compute embeddings
Writes embeddings to `embeddings_dinov3`.


In [5]:
dataset.compute_embeddings(model, embeddings_field="embeddings_dinov3")


MediaTypeError: Unsupported media type 'None'

## 6. Visualize embeddings (UMAP)
Compute a dense UMAP and store in the brain.


In [None]:
import fiftyone.brain as fob

viz = fob.compute_visualization(
    dataset,
    embeddings="embeddings_dinov3",
    brain_key="dino_dense_umap",
)
viz


## 7. Launch App and explore
Start the FiftyOne App, then compute similarity and view nearest neighbors to a query sample.


In [None]:
session = fo.launch_app(dataset, port=5151)
session


In [None]:
print(session.url)


In [None]:
idx = fob.compute_similarity(
    dataset,
    embeddings="embeddings_dinov3",
    metric="cosine",
    brain_key="dino_sim",
)
idx


In [None]:
query_id = dataset.first().id
view = dataset.sort_by_similarity(query_id, k=20)
session.view = view
view


## 8. Classification from embeddings
Derive image-level labels from detections, train a linear head, run inference, and evaluate.


In [None]:
from collections import Counter
from sklearn.preprocessing import normalize
from sklearn.linear_model import LogisticRegression
import numpy as np

# Auto-generate image-level labels from detections if present; otherwise skip
has_detections = dataset.has_field("detections")
ids        = dataset.values("id")
paths      = dataset.values("filepath")
embs       = dataset.values("embeddings_dinov3")

if has_detections:
    det_lists  = dataset.values("detections.detections.label")
    img_labels = [Counter(L).most_common(1)[0][0] if L else None for L in det_lists]
    dataset.set_values(
        "image_label",
        [fo.Classification(label=l) if l is not None else None for l in img_labels],
    )
    print("Image labels populated from detections.")
else:
    img_labels = [None for _ in ids]
    print("No detections field; skipping auto image labels. Add detections to use this section.")


In [None]:
# Prepare train data if labels exist
if any(img_labels):
    mask = [(x is not None) and (y is not None) for x, y in zip(embs, img_labels)]
    X = normalize(np.stack([x for x,m in zip(embs,mask) if m], axis=0))
    y = [lab for lab,m in zip(img_labels,mask) if m]
    print(f"Training samples: {len(y)}")
else:
    X, y = None, None
    print("No labels available; skip classifier training.")


In [None]:
if X is not None and len(y) > 1:
    clf = LogisticRegression(max_iter=2000, class_weight="balanced", n_jobs=-1).fit(X, y)
else:
    clf = None
    print("Classifier not trained (need at least 2 labeled samples).")


In [None]:
if clf is None:
    print("Skipping inference; classifier not trained.")
else:
    for sample in dataset.iter_samples(autosave=True, progress=True):
        v = sample["embeddings_dinov3"]
        if v is None:
            continue

        Xpred = normalize(np.asarray(v, dtype=np.float32).reshape(1, -1))
        p = clf.predict_proba(Xpred)[0]
        k = int(np.argmax(p))

        sample["predict_dinov3"] = fo.Classification(
            label=str(clf.classes_[k]),
            confidence=float(p[k]),
        )


In [None]:
if clf is None:
    print("Skipping evaluation; classifier not trained.")
else:
    results = dataset.evaluate_classifications(
        "predict_dinov3", gt_field="image_label", method="simple", eval_key="dino_simple"
    )
    results


## 9. PCA/CLS foreground segmentation
Helper to compute CLS/foreground masks and optional heatmaps.


In [None]:
import numpy as np
from PIL import Image, ImageOps
import torch
import torch.nn.functional as F
from transformers import AutoImageProcessor, AutoModel
from fiftyone import Segmentation, Heatmap

def build_pca_fg_masks(
    dataset: fo.Dataset,
    model_id: str = "facebook/dinov3-vits16-pretrain-lvd1689m",
    field: str = "pca_fg",
    heatmap_field: str | None = None,
    thresh: float = 0.5,
    smooth_k: int = 3,
    device: str | None = None,
):
    """
    Compute a DINOv3 PCA/CLS-style foreground mask for every sample and write to dataset.

    - ViT: cosine(sim) to CLS over patch tokens
    - ConvNeXt: cosine(sim) to global-avg feature over feature map
    - Masks are overlaid natively in the FiftyOne App.
    """
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    processor = AutoImageProcessor.from_pretrained(model_id)
    model = AutoModel.from_pretrained(model_id).to(device).eval()

    if not dataset.has_field(field):
        dataset.add_sample_field(field, fo.EmbeddedDocumentField, embedded_doc_type=fo.Segmentation)

    if heatmap_field and not dataset.has_field(heatmap_field):
        dataset.add_sample_field(heatmap_field, fo.EmbeddedDocumentField, embedded_doc_type=fo.Heatmap)

    mt = dict(dataset.mask_targets or {})
    mt[field] = {0: "background", 1: "foreground"}
    dataset.mask_targets = mt
    dataset.save()

    @torch.inference_mode()
    def _fg_mask(path: str):
        img = ImageOps.exif_transpose(Image.open(path).convert("RGB"))
        W0, H0 = img.size

        bf = processor(images=img, return_tensors="pt").to(device)
        last = model(**bf).last_hidden_state

        if last.ndim == 3:
            hs = last[0].float()
            num_reg = getattr(model.config, "num_register_tokens", 0)
            patch = getattr(model.config, "patch_size", 16)
            patches = hs[1 + num_reg :, :]
            _, _, Hc, Wc = bf["pixel_values"].shape
            gh, gw = Hc // patch, Wc // patch

            cls = hs[0:1, :]
            sims = (F.normalize(patches, dim=1) @ F.normalize(cls, dim=1).T).squeeze(1)
            fg = sims.detach().cpu().view(gh, gw)
        else:
            fm = last[0].float()
            C, gh, gw = fm.shape
            grid = F.normalize(fm.permute(1, 2, 0).reshape(-1, C), dim=1)
            gvec = F.normalize(fm.mean(dim=(1, 2), keepdim=True).squeeze().unsqueeze(0), dim=1)
            fg = (grid @ gvec.T).detach().cpu().reshape(gh, gw)

        fg01 = (fg - fg.min()) / (fg.max() - fg.min() + 1e-8)

        if smooth_k and smooth_k > 1:
            fg01 = F.avg_pool2d(fg01.unsqueeze(0).unsqueeze(0), smooth_k, 1, smooth_k // 2).squeeze()

        mask_small = (fg01 > thresh).to(torch.uint8).numpy()

        mask_full = Image.fromarray(mask_small * 255).resize((W0, H0), Image.NEAREST)
        soft_full = Image.fromarray((fg01.numpy() * 255).astype(np.uint8)).resize((W0, H0), Image.BILINEAR)

        mask = (np.array(mask_full) > 127).astype(np.uint8)
        soft = np.array(soft_full).astype(np.float32) / 255.0
        return mask, soft

    skipped = 0
    for s in dataset.iter_samples(autosave=True, progress=True):
        try:
            m, soft = _fg_mask(s.filepath)
            s[field] = Segmentation(mask=m)
            if heatmap_field:
                s[heatmap_field] = Heatmap(map=soft)
        except Exception:
            s[field] = None
            if heatmap_field:
                s[heatmap_field] = None
            skipped += 1

    print(f"✓ wrote masks to '{field}'" + (f" and heatmaps to '{heatmap_field}'" if heatmap_field else "") + f". skipped: {skipped}")


In [None]:
build_pca_fg_masks(dataset, field="pca_fg", heatmap_field="pca_fg_heat", thresh=0.5, smooth_k=3)


## Search for a specific label
Filter views to samples containing a target label (e.g., `whale`). Use detections or the image-level labels created earlier.


In [None]:
from fiftyone import ViewField as F

# Detection-level filter (skip if no detections field)
if dataset.has_field("detections"):
    whales = dataset.filter_labels("detections", F("label") == "whale")
    session.view = whales  # optional: show in App
    print("Detections filtered. Count:", len(whales))
else:
    whales = dataset
    print("Dataset has no detections field; skipping detection-level filter.")


In [None]:
# Image-level labels (from majority vote step)
if dataset.has_field("image_label"):
    whales_img = dataset.match(F("image_label.label") == "whale")
    session.view = whales_img  # optional
    print("Image-label filter count:", len(whales_img))
else:
    whales_img = dataset
    print("Dataset has no image_label field; skipping image-level filter.")


## Foreground-only embeddings (masking background)
Use the PCA/CLS mask to zero out background before embedding, then build similarity on foreground-only vectors.


In [None]:
import numpy as np
from PIL import Image
from transformers import AutoImageProcessor

fg_processor = AutoImageProcessor.from_pretrained("facebook/dinov3-vitl16-pretrain-lvd1689m")


In [None]:
def masked_cls_embedding(path, mask, model, processor, device=None):
    if mask is None:
        return None
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    img = Image.open(path).convert("RGB")
    m = mask
    if m.shape != (img.height, img.width):
        m = np.array(Image.fromarray(m.astype(np.uint8) * 255).resize((img.width, img.height), Image.NEAREST)) > 127

    arr = np.array(img)
    arr[~m] = 0
    inputs = processor(images=Image.fromarray(arr), return_tensors="pt").to(device)
    with torch.no_grad():
        out = model(**inputs).last_hidden_state
    cls = out[0, 0].detach().cpu().numpy()
    return cls


In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
transformers_model = transformers_model.to(device)

for sample in dataset.iter_samples(autosave=True, progress=True):
    seg = sample["pca_fg"] if sample.has_field("pca_fg") else None
    mask = None if seg is None else seg.mask
    emb = masked_cls_embedding(sample.filepath, mask, transformers_model, fg_processor, device=device)
    sample["embeddings_dinov3_fg"] = emb


In [None]:
# Similarity on foreground embeddings
fg_idx = fob.compute_similarity(
    dataset,
    embeddings="embeddings_dinov3_fg",
    metric="cosine",
    brain_key="dino_fg_sim",
)
fg_idx


In [None]:
query_id = dataset.first().id
fg_view = dataset.sort_by_similarity(query_id, k=30, brain_key="dino_fg_sim")
session.view = fg_view
fg_view


In [None]:
# Optional: visualize foreground-embedding UMAP
fg_viz = fob.compute_visualization(
    dataset,
    embeddings="embeddings_dinov3_fg",
    brain_key="dino_fg_umap",
)
fg_viz


## Cluster by pose, then by individual (foreground embeddings)
Use foreground embeddings (`embeddings_dinov3_fg`) to cluster images by pose first, then sub-cluster each pose to approximate individuals. Adjust `pose_k` and `individual_k` as needed.


In [None]:
import numpy as np
from sklearn.cluster import KMeans

pose_k = 3  # tweak as needed
embs = dataset.values("embeddings_dinov3_fg")
ids = dataset.values("id")
mask = [e is not None for e in embs]
X = np.stack([e for e,m in zip(embs,mask) if m], axis=0) if any(mask) else None
ids_masked = [i for i,m in zip(ids,mask) if m]

if X is None or len(ids_masked) < 2:
    print("Not enough embeddings for clustering.")
else:
    k = min(pose_k, max(2, len(ids_masked)))
    kmeans = KMeans(n_clusters=k, random_state=51, n_init="auto").fit(X)
    pose_clusters = kmeans.labels_.tolist()
    # Assign back to samples in the same order as ids_masked
    for sid, lbl in zip(ids_masked, pose_clusters):
        sample = dataset[sid]
        sample["pose_cluster"] = int(lbl)
        sample.save()
    print("Pose clusters sizes:", dict(zip(*np.unique(pose_clusters, return_counts=True))))


In [None]:
# Show 5 random examples per pose cluster
for pose_id in sorted(set(pose_clusters)):
    pose_view = dataset.match(F("pose_cluster") == pose_id)
    sample_count = pose_view.count()
    few = pose_view.shuffle(seed=pose_id).limit(5)
    print(f"Pose {pose_id}: showing {len(few)} of {sample_count}")
    for s in few:
        print("    ", s.filepath)


In [None]:
from sklearn.cluster import KMeans

individual_k = 5  # max clusters per pose; will cap to available samples
pose_vals = dataset.values("pose_cluster") if dataset.has_field("pose_cluster") else []
embs = dataset.values("embeddings_dinov3_fg")
ids = dataset.values("id")

if not pose_vals:
    print("No pose_cluster labels available; run pose clustering first.")
else:
    for pose_id in sorted(set(p for p in pose_vals if p is not None)):
        pose_embs = [e for e,p in zip(embs, pose_vals) if p == pose_id and e is not None]
        pose_ids  = [i for i,p in zip(ids,  pose_vals) if p == pose_id and p is not None]
        if len(pose_embs) < 2:
            continue
        k = min(individual_k, max(2, len(pose_embs)))
        kmeans = KMeans(n_clusters=k, random_state=pose_id, n_init="auto").fit(np.stack(pose_embs))
        for sid, lbl in zip(pose_ids, kmeans.labels_.tolist()):
            sample = dataset[sid]
            sample["individual_cluster"] = int(lbl)
            sample.save()
    print("Done assigning individual_cluster labels per pose.")


In [None]:
# Show 5 random examples per inferred individual within each pose
for pose_id in sorted(set(pose_clusters)):
    pose_view = dataset.match(F("pose_cluster") == pose_id)
    if pose_view.count() == 0:
        continue
    indiv_ids = sorted(set(pose_view.values("individual_cluster")))
    print(f"Pose {pose_id}: {len(indiv_ids)} inferred individuals")
    for cid in indiv_ids:
        cluster_view = pose_view.match(F("individual_cluster") == cid)
        sample_count = cluster_view.count()
        few = cluster_view.shuffle(seed=cid).limit(5)
        print(f"  individual_{cid}: showing {len(few)} of {sample_count}")
        for s in few:
            print("     ", s.filepath)



import matplotlib.pyplot as plt
from PIL import Image
from fiftyone import ViewField as F

max_poses = 3
max_per_pose = 5
pose_vals = dataset.values("pose_cluster") if dataset.has_field("pose_cluster") else []
if not pose_vals:
    print("No pose_cluster field; run clustering first.")
else:
    pose_ids = sorted(set(p for p in pose_vals if p is not None))[:max_poses]
    for pid in pose_ids:
        view = dataset.match(F("pose_cluster") == pid)
        samples = list(view.shuffle(seed=pid).limit(max_per_pose))
        print(f"Pose {pid}: showing {len(samples)} of {view.count()}")
        if len(samples) == 0:
            continue
        fig, axes = plt.subplots(1, len(samples), figsize=(4*len(samples), 4))
        if len(samples) == 1:
            axes = [axes]
        for ax, sample in zip(axes, samples):
            img = Image.open(sample.filepath).convert("RGB")
            ax.imshow(img)
            mask = None
            if sample.has_field("pca_fg") and sample["pca_fg"] is not None:
                mask = sample["pca_fg"].mask
            if mask is not None:
                ax.imshow(mask, cmap="inferno", alpha=0.35)
            ax.set_xticks([]); ax.set_yticks([])
        plt.show()


In [None]:

import matplotlib.pyplot as plt
from PIL import Image
from fiftyone import ViewField as F

max_poses = 3
max_per_pose = 5
pose_vals = dataset.values("pose_cluster") if dataset.has_field("pose_cluster") else []
if not pose_vals:
    print("No pose_cluster field; run clustering first.")
else:
    pose_ids = sorted(set(p for p in pose_vals if p is not None))[:max_poses]
    for pid in pose_ids:
        view = dataset.match(F("pose_cluster") == pid)
        few = view.shuffle(seed=pid).limit(max_per_pose)
        print(f"Pose {pid}: showing {len(few)} of {view.count()}")
        if len(few) == 0:
            continue
        fig, axes = plt.subplots(1, len(few), figsize=(4*len(few), 4))
        if len(few) == 1:
            axes = [axes]
        for ax, sample in zip(axes, few):
            img = Image.open(sample.filepath).convert("RGB")
            ax.imshow(img)
            mask = None
            if sample.has_field("pca_fg") and sample["pca_fg"] is not None:
                mask = sample["pca_fg"].mask
            if mask is not None:
                ax.imshow(mask, cmap="inferno", alpha=0.35)
            ax.set_xticks([]); ax.set_yticks([])
        plt.show()


## Visualize individual clusters (filepaths)
Print up to 5 samples per inferred individual cluster (first 5 clusters).


In [None]:

from fiftyone import ViewField as F
max_individuals = 5
max_per_individual = 5
if not dataset.has_field("individual_cluster"):
    print("No individual_cluster field; run individual clustering first.")
else:
    ids = dataset.values("individual_cluster")
    clusters = sorted(set(c for c in ids if c is not None))[:max_individuals]
    for cid in clusters:
        view = dataset.match(F("individual_cluster") == cid)
        few = view.shuffle(seed=cid).limit(max_per_individual)
        print(f"individual_{cid}: showing {len(few)} of {view.count()}")
        for s in few:
            print("   ", s.filepath)


## Save clustered images to disk
Exports copies of clustered images under `runs/<run_name>/pose_clusters` and `runs/<run_name>/individuals` for inspection.


In [None]:
from pathlib import Path
import shutil
from fiftyone import ViewField as F

run_name = "run1"  # change per run
run_dir = Path("runs") / run_name
pose_dir = run_dir / "pose_clusters"
indiv_dir = run_dir / "individuals"
pose_dir.mkdir(parents=True, exist_ok=True)
indiv_dir.mkdir(parents=True, exist_ok=True)

# Export pose clusters
if dataset.has_field("pose_cluster"):
    pose_vals = dataset.values("pose_cluster")
    pose_ids = sorted(set(p for p in pose_vals if p is not None))
    for pid in pose_ids:
        cluster_dir = pose_dir / f"pose_{pid}"
        cluster_dir.mkdir(parents=True, exist_ok=True)
        view = dataset.match(F("pose_cluster") == pid)
        for s in view:
            dest = cluster_dir / f"{s.id}_{Path(s.filepath).name}"
            if not dest.exists():
                shutil.copy2(s.filepath, dest)
    print(f"Saved pose clusters to {pose_dir}")
else:
    print("No pose_cluster field; run pose clustering first.")

# Export individual clusters
if dataset.has_field("individual_cluster"):
    indiv_vals = dataset.values("individual_cluster")
    indiv_ids = sorted(set(c for c in indiv_vals if c is not None))
    for cid in indiv_ids:
        cluster_dir = indiv_dir / f"individual_{cid}"
        cluster_dir.mkdir(parents=True, exist_ok=True)
        view = dataset.match(F("individual_cluster") == cid)
        for s in view:
            dest = cluster_dir / f"{s.id}_{Path(s.filepath).name}"
            if not dest.exists():
                shutil.copy2(s.filepath, dest)
    print(f"Saved individual clusters to {indiv_dir}")
else:
    print("No individual_cluster field; run individual clustering first.")
