In [1]:
import os
from pathlib import Path
import numpy as np
import torch
import clip
from PIL import Image
from tqdm import tqdm
import pandas as pd
from sklearn.decomposition import PCA
import umap
import matplotlib.pyplot as plt
import hdbscan
import shutil
import random

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def collect_images(folder_list, exts=(".jpg", ".jpeg", ".png", ".bmp", ".webp")):
    files = {}
    for folder in folder_list:
        folder = Path(folder)
        for f in folder.rglob("*"):
            if f.is_file() and f.suffix.lower() in exts:
                key = f.stem.lower()  # имя файла без расширения
                if key not in files:  # сохраняем только первую встреченную копию
                    files[key] = str(f)
    return list(files.values())


# 🔹 Пример использования
input_folders = [
    r"C:\Users\kuzga\OneDrive\Рабочий стол\CV_case_tbanc\data_sirius",
    r"C:\Users\kuzga\OneDrive\Рабочий стол\CV_case_tbanc\project_x-6",
]
image_paths = collect_images(input_folders)
print(f"Найдено {len(image_paths)} уникальных изображений")


Найдено 30019 уникальных изображений


In [3]:
def build_embeddings(image_paths, device="cuda"):
    model, preprocess = clip.load("ViT-B/32", device=device)
    embeddings = []

    for path in tqdm(image_paths, desc="Extracting CLIP embeddings"):
        try:
            img = preprocess(Image.open(path).convert("RGB")).unsqueeze(0).to(device)
            with torch.no_grad():
                emb = model.encode_image(img).cpu().numpy()
            embeddings.append(emb)
        except Exception as e:
            print(f"Error with {path}: {e}")
            embeddings.append(np.zeros((1, 512)))
    return np.vstack(embeddings)


embeddings = build_embeddings(image_paths, device="cuda")

Extracting CLIP embeddings: 100%|██████████| 30019/30019 [11:31<00:00, 43.40it/s]


In [4]:
def cluster_embeddings(embeddings, pca_dim=50, min_cluster_size=20):
    # PCA для сжатия
    X_reduced = PCA(n_components=pca_dim).fit_transform(embeddings)

    # HDBSCAN для адаптивного числа кластеров
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        metric="euclidean"
    )
    labels = clusterer.fit_predict(X_reduced)

    return labels, X_reduced

labels, X_reduced = cluster_embeddings(embeddings, pca_dim=50, min_cluster_size=20)



In [5]:
def visualize_clusters(X_reduced, labels, out_path="clusters_umap.png"):
    reducer = umap.UMAP(n_components=2, random_state=42)
    X_2d = reducer.fit_transform(X_reduced)

    plt.figure(figsize=(10, 8))
    plt.scatter(X_2d[:, 0], X_2d[:, 1], c=labels, s=5, cmap="Spectral")
    plt.colorbar()
    plt.title("Image clusters (UMAP + HDBSCAN)")
    plt.savefig(out_path, dpi=200)
    plt.close()
    return X_2d

visualize_clusters(X_reduced, labels, out_path="clusters_umap.png")

  warn(


array([[-3.5450575, 10.450406 ],
       [13.556788 , 12.989753 ],
       [11.608732 , 15.289379 ],
       ...,
       [ 6.6778045, 17.796019 ],
       [ 6.5340757, 17.54287  ],
       [ 7.7417636,  7.302668 ]], shape=(30019, 2), dtype=float32)

In [6]:
def save_clusters(image_paths, labels, out_dir="clusterized_images"):
    os.makedirs(out_dir, exist_ok=True)
    for i, (path, label) in enumerate(zip(image_paths, labels)):
        cluster_folder = Path(out_dir) / f"cluster_{label}"
        cluster_folder.mkdir(parents=True, exist_ok=True)
        try:
            shutil.copy(path, cluster_folder / f"{i}_{Path(path).name}")
        except Exception as e:
            print(f"Copy error: {path} -> {e}")

save_clusters(image_paths, labels, out_dir="clusterized_images")

In [7]:
def show_cluster_samples(image_paths, labels, samples_per_cluster=4, out_path="cluster_samples.png"):
    df = pd.DataFrame({"path": image_paths, "cluster": labels})
    unique_clusters = sorted(df["cluster"].unique())

    n_clusters = len(unique_clusters)
    fig, axes = plt.subplots(n_clusters, samples_per_cluster, figsize=(samples_per_cluster * 3, n_clusters * 3))

    if n_clusters == 1:
        axes = [axes]  # если всего один кластер

    for row, cluster_id in enumerate(unique_clusters):
        cluster_images = df[df["cluster"] == cluster_id]["path"].tolist()
        chosen = random.sample(cluster_images, min(samples_per_cluster, len(cluster_images)))

        for col in range(samples_per_cluster):
            ax = axes[row][col] if n_clusters > 1 else axes[col]
            ax.axis("off")

            if col < len(chosen):
                img = Image.open(chosen[col]).convert("RGB")
                ax.imshow(img)
            else:
                ax.imshow(np.ones((10, 10, 3)))  # пустая заглушка

        # Название кластера над строкой
        fig.text(0.5, 1 - (row + 0.5) / n_clusters, f"Cluster {cluster_id}",
                 ha="center", va="center", fontsize=12, weight="bold")

    plt.tight_layout()
    plt.savefig(out_path, dpi=200)
    plt.close()

show_cluster_samples(image_paths, labels, samples_per_cluster=4, out_path="cluster_samples.png")