In [1]:
# Colab-only: mount Google Drive at /content/drive so later cells can reach
# the image folder stored under MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [48]:
import os, time

import itertools
from os.path import basename

import shutil
import time
import numpy as np
from PIL import Image

import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from torchvision.models import mobilenet_v3_small

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import DBSCAN

In [14]:
# Folder (on the mounted Drive) holding the images to cluster.
# NOTE(review): `dir` rebinds Python's builtin `dir()` for the rest of the
# notebook. Later cells pass this name to group_similar_images_opt(...) and
# revert(...), so renaming it requires updating those calls as well.
dir = r"/content/drive/MyDrive/pics"

In [30]:
def print_cluster_similarities(image_paths, labels, embeddings):
    """Print the pairwise similarity of every image pair inside each cluster.

    Args:
        image_paths: Image paths, index-aligned with ``labels`` and ``embeddings``.
        labels: One cluster label per image; ``-1`` marks noise and is ignored.
        embeddings: One embedding per image. The dot product is reported as the
            cosine similarity, which is only accurate for L2-normalized vectors.

    Clusters with fewer than two members are not printed. Clusters are shown in
    order of first appearance in ``labels`` and numbered ``label + 1``.
    """
    members = {}
    for position, cluster in enumerate(labels):
        if cluster != -1:
            if cluster not in members:
                members[cluster] = []
            members[cluster].append(position)

    for cluster, positions in members.items():
        if len(positions) >= 2:
            print(f"\nCluster {cluster+1} — {len(positions)} images")
            for a, b in itertools.combinations(positions, 2):
                # Dot product equals cosine similarity for unit-length vectors.
                sim = np.dot(embeddings[a], embeddings[b])
                name_a = basename(image_paths[a])
                name_b = basename(image_paths[b])
                print(f"  {name_a} ↔ {name_b}  →  similarity: {sim:.4f}, distance: {1-sim:.4f}")

In [49]:
# Define model and transform only once
# Both are built a single time here and passed into the grouping function, so
# repeated runs do not reload the network.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# `pretrained=True` is deprecated in torchvision >= 0.13; "IMAGENET1K_V1" names
# the same ImageNet checkpoint explicitly and avoids the deprecation warning.
_model = mobilenet_v3_small(weights="IMAGENET1K_V1")
# Drop the last child module (the classifier head) so the network emits
# feature embeddings instead of class logits.
_model = torch.nn.Sequential(*list(_model.children())[:-1])
_model = _model.to(device).eval()  # inference mode: freezes dropout / batch-norm behaviour

# Preprocessing: fixed 224x224 input, tensor conversion, then per-channel
# normalization with the ImageNet mean/std the pretrained weights were trained with.
_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])



In [50]:
# Dataset class for image paths
class ImageDataset(Dataset):
    """Dataset yielding ``(transformed_image, path)`` pairs for a list of image files.

    Args:
        image_paths: Sequence of image file paths.
        transform: Callable applied to each RGB ``PIL.Image``; its result is
            returned as the first element of every item.
    """

    def __init__(self, image_paths, transform):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        """Return the number of image paths in the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return ``(transform(image), path)``."""
        path = self.image_paths[idx]
        # Open inside a context manager so the underlying file handle is closed
        # deterministically instead of waiting for garbage collection; convert()
        # returns an independent in-memory image that outlives the `with` block.
        with Image.open(path) as img:
            image = img.convert("RGB")
        return self.transform(image), path

# Main grouping function
def group_similar_images_opt(input_dir, eps=0.13, min_samples=2, batch_size=32, model=None, transform=None, use_gpu=True):
    """Cluster the images in ``input_dir`` by embedding similarity and move each cluster into a sub-folder.

    Every ``.jpg`` / ``.jpeg`` / ``.png`` file directly inside ``input_dir`` is
    embedded with ``model``, the embeddings are L2-normalized, and DBSCAN is run
    on the pairwise cosine-distance matrix. Each cluster with at least two
    members is moved into ``input_dir/group_<n>`` (``n`` starts at 1). Noise
    points and singleton clusters stay where they are.

    Args:
        input_dir: Directory containing the images (not searched recursively).
        eps: DBSCAN neighbourhood radius, expressed as cosine distance
            (``1 - cosine similarity``).
        min_samples: DBSCAN ``min_samples``.
        batch_size: Number of images per forward pass.
        model: Feature extractor returning one embedding per image. It is moved
            to the selected device and switched to eval mode (in place).
        transform: Callable turning an RGB ``PIL.Image`` into a model input tensor.
        use_gpu: Use CUDA when it is available; otherwise run on CPU.

    Returns:
        None. A summary line is printed; files are moved on disk as a side effect.

    Raises:
        ValueError: If ``model`` or ``transform`` is not supplied.
    """
    if model is None or transform is None:
        raise ValueError("Model and transform must be provided for optimized usage.")

    device = torch.device('cuda' if torch.cuda.is_available() and use_gpu else 'cpu')
    # The batches are sent to `device`, so the model must live there too;
    # otherwise e.g. use_gpu=False with a CUDA-resident model fails on the
    # first forward pass with a device mismatch.
    model = model.to(device).eval()

    # Collect valid image paths. Sorting makes cluster numbering reproducible
    # (os.listdir order is arbitrary); isfile() skips directories whose name
    # happens to end in an image extension.
    valid_exts = ('.jpg', '.jpeg', '.png')
    image_paths = sorted(
        path
        for path in (os.path.join(input_dir, name) for name in os.listdir(input_dir))
        if path.lower().endswith(valid_exts) and os.path.isfile(path)
    )

    if not image_paths:
        print("No valid images found.")
        return

    # Load dataset and dataloader (shuffle=False keeps paths and embeddings aligned)
    dataset = ImageDataset(image_paths, transform)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=2)

    embeddings, final_paths = [], []

    with torch.no_grad():
        for batch_imgs, batch_paths in loader:
            features = model(batch_imgs.to(device))
            # Collapse any trailing singleton spatial dims: (N, C, 1, 1) -> (N, C).
            batch_emb = features.flatten(start_dim=1).cpu().numpy()
            norms = np.linalg.norm(batch_emb, axis=1, keepdims=True)
            # Clamp the norm so an all-zero embedding cannot produce NaNs.
            batch_emb = batch_emb / np.maximum(norms, 1e-12)
            embeddings.append(batch_emb)
            final_paths.extend(batch_paths)

    embeddings = np.vstack(embeddings).astype(np.float32)

    # Compute cosine distance matrix; clip tiny negative values caused by
    # floating-point error, which a precomputed-metric DBSCAN would reject.
    sim_matrix = cosine_similarity(embeddings)
    dist_matrix = np.clip(1.0 - sim_matrix, 0.0, None)

    # DBSCAN clustering
    db = DBSCAN(eps=eps, min_samples=min_samples, metric='precomputed')
    labels = db.fit_predict(dist_matrix)

    # Bucket paths by cluster label in a single pass (label -1 is DBSCAN noise).
    clusters = {}
    for img_path, label in zip(final_paths, labels):
        if label != -1:
            clusters.setdefault(int(label), []).append(img_path)

    # Move images into cluster folders, in ascending label order so that
    # group numbering is deterministic.
    n_groups = 0
    for label in sorted(clusters):
        members = clusters[label]
        if len(members) < 2:
            continue  # never create a folder for a single image
        n_groups += 1
        group_dir = os.path.join(input_dir, f"group_{n_groups}")
        os.makedirs(group_dir, exist_ok=True)
        for img_path in members:
            shutil.move(img_path, os.path.join(group_dir, os.path.basename(img_path)))

    # To inspect pairwise similarities inside each cluster, call
    # print_cluster_similarities(final_paths, labels, embeddings) here.
    # The first number below is the count of images processed, not only those moved.
    print(f"[✓] Grouped {len(final_paths)} images into {n_groups} clusters (no singleton folders).")

In [51]:
# Run clustering
# perf_counter() is a monotonic, high-resolution clock intended for measuring
# elapsed time; time.time() can jump if the system clock is adjusted.
started = time.perf_counter()
group_similar_images_opt(dir, eps=0.13, min_samples=2, batch_size=32, model=_model, transform=_transform)
print(f"Completed in {time.perf_counter() - started:.2f} seconds.")

[✓] Grouped 48 images into 7 clusters (no singleton folders).
Completed in 5.09 seconds.


In [46]:
def revert(parent_dir):
    """Flatten ``parent_dir``: move every file from its sub-directories back into it.

    The tree is walked bottom-up, so nested folders are emptied before their
    parents. When a file name already exists in ``parent_dir`` the moved file is
    renamed ``<stem>_<i><ext>`` using the first free ``i >= 1``. Each visited
    sub-directory is removed afterwards; a failure to remove it is ignored.

    Args:
        parent_dir: Directory whose sub-directories should be flattened.
    """
    for current, _subdirs, filenames in os.walk(parent_dir, topdown=False):
        # Files already sitting in the top-level directory stay put.
        if current != parent_dir:
            for filename in filenames:
                target = os.path.join(parent_dir, filename)

                if os.path.exists(target):
                    # Name collision: probe name_1, name_2, ... until one is free.
                    stem, suffix = os.path.splitext(filename)
                    counter = 1
                    target = os.path.join(parent_dir, f"{stem}_{counter}{suffix}")
                    while os.path.exists(target):
                        counter += 1
                        target = os.path.join(parent_dir, f"{stem}_{counter}{suffix}")

                os.rename(os.path.join(current, filename), target)

            # Best-effort cleanup of the (expected to be empty) sub-directory.
            try:
                os.rmdir(current)
            except OSError:
                pass

# Undo the grouping: move every file found in sub-folders of `dir` back into
# `dir` and remove the emptied sub-folders.
# NOTE: this flattens *all* sub-folders of `dir`, not only the `group_*` ones.
revert(dir)