In [9]:
from transformers import CLIPProcessor, CLIPModel
import torch
import numpy as np
from pathlib import Path
from PIL import Image
import tensorflow as tf
import numpy as np
from sklearn.cluster import DBSCAN
from collections import defaultdict
import shutil
from pathlib import Path
import matplotlib.pyplot as plt
from time import time

total_start = time()
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

In [10]:
demo_directory = Path("../coil-100/")
images_to_paths = {image_path.stem: image_path for image_path in demo_directory.iterdir() if image_path.suffix.lower() in ['.jpg', '.jpeg', '.png', '.gif']}

images = [np.array(Image.open(path)) for path in images_to_paths.values()]
images = [torch.tensor(image, dtype=torch.float32) for image in images]

images = [tf.constant(image.numpy(), dtype=tf.float32) for image in images]

inputs = processor(images=images, return_tensors="pt", padding=True)

with torch.no_grad():
    outputs = model.get_image_features(**inputs)

images_to_embeddings = {image_id: tensor_embedding.detach().numpy() for image_id, tensor_embedding in zip(images_to_paths.keys(), outputs)}

In [11]:
image_ids = list(images_to_embeddings.keys())
embeddings = list(images_to_embeddings.values())

clustering = DBSCAN(min_samples=2, eps=3).fit(np.stack(embeddings))

image_id_communities = defaultdict(set)
independent_image_ids = set()

for image_id, cluster_idx in zip(image_ids, clustering.labels_):
    cluster_idx = int(cluster_idx)
    if cluster_idx == -1:
        independent_image_ids.add(image_id)
        continue

    image_id_communities[cluster_idx].add(image_id)

In [12]:
len(independent_image_ids)

image_id_communities

defaultdict(set,
            {0: {'obj82__0',
              'obj82__10',
              'obj82__100',
              'obj82__105',
              'obj82__110',
              'obj82__115',
              'obj82__120',
              'obj82__125',
              'obj82__130',
              'obj82__135',
              'obj82__140',
              'obj82__145',
              'obj82__15',
              'obj82__150',
              'obj82__155',
              'obj82__160',
              'obj82__165',
              'obj82__170',
              'obj82__175',
              'obj82__180',
              'obj82__185',
              'obj82__190',
              'obj82__195',
              'obj82__20',
              'obj82__200',
              'obj82__205',
              'obj82__210',
              'obj82__215',
              'obj82__220',
              'obj82__225',
              'obj82__230',
              'obj82__235',
              'obj82__240',
              'obj82__245',
              'obj82__25',
      

In [13]:
# Output folder for non-duplicate images
output_folder = demo_directory / "NoDuplicates"
output_folder.mkdir(exist_ok=True)

# Mapping of cluster index to selected image ID
selected_images = {}

for cluster_idx, image_ids in image_id_communities.items():
    if cluster_idx == -1:
        continue  # Skip independent images

    # Select one image from the cluster (you can choose based on some criteria)
    selected_image_id = next(iter(image_ids))

    # Copy the selected image to the output folder
    shutil.copy(images_to_paths[selected_image_id], output_folder / f"{selected_image_id}.jpg")

# Copy independent images to the output folder
for image_id in independent_image_ids:
    shutil.copy(images_to_paths[image_id], output_folder / f"{image_id}.jpg")

print(f"Total non-duplicate images: {len(selected_images)}")
print(f"Output folder: {output_folder}")

total_end = time()

print(f"Total time: {total_end - total_start:.2f} seconds")

Total non-duplicate images: 0
Output folder: ../coil-100/NoDuplicates
Total time: 265.27 seconds


In [14]:
# for image_id_community in image_id_communities.values():
#     for image_id in image_id_community:
#         plt.figure()
#         plt.imshow(Image.open(images_to_paths[image_id]))

# for image_id in independent_image_ids:
#     plt.figure()
#     plt.imshow(Image.open(images_to_paths[image_id]))