In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms

from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering, SpectralClustering
from sklearn.decomposition import PCA
from sklearn.metrics import silhouette_score, calinski_harabasz_score
from PIL import ImageFilter, Image

import numpy as np
import os
import warnings

warnings.filterwarnings("ignore", category=FutureWarning, module="sklearn.cluster._kmeans")

In [None]:
NUM_CLUSTERS = 5
USE_GPU = False
REDUCE_DIMENSIONS = True
PCA_COMPONENTS = 141

In [None]:
def load_feature_extractor(use_gpu):
    print("Cargando modelo ResNet50 preentrenado...")

    # Cargar ResNet50 preentrenado
    model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

    modules = list(model.children())[:-1]
    feature_extractor = nn.Sequential(*modules)

    feature_extractor.eval()

    device = torch.device("cuda" if use_gpu and torch.cuda.is_available() else "cpu")
    feature_extractor.to(device)
    print(f"Usando dispositivo: {device}")

    return feature_extractor, device

In [None]:
def get_preprocess_transform():
    return transforms.Compose([
        transforms.Resize((224, 224)),  # Redimensionar a 224x224
        transforms.ToTensor(),
    ])

In [None]:
def extract_features(pil_image_list, model, device, preprocess):
    features = []
    model.eval()

    print(f"Extrayendo características de {len(pil_image_list)} imágenes...")
    with torch.no_grad():
        for i, img_pil in enumerate(pil_image_list):
            if img_pil.mode != "RGB":
                try:
                    img_pil = img_pil.convert("RGB")
                except Exception as e:
                    print(f"Advertencia: No se pudo convertir la imagen en el índice {i} a RGB. Se omitirá. Error: {e}")
                    continue

            try:
                img_tensor = preprocess(img_pil)
                img_batch = img_tensor.unsqueeze(0).to(device)
            except Exception as e:
                print(f"Advertencia: No se pudo preprocesar la imagen en el índice {i}. Se omitirá. Error: {e}")
                continue

            try:
                output = model(img_batch)
                feature_vector = torch.flatten(output, 1)
                features.append(feature_vector.cpu().numpy().squeeze())
            except Exception as e:
                print(f"Advertencia: No se pudieron extraer características para la imagen en el índice {i}. Se omitirá. Error: {e}")
                continue

            if (i + 1) % 50 == 0:
                print(f"  Procesadas {i + 1}/{len(pil_image_list)} imágenes...")

    print("Extracción de características completada.")
    if not features:
        print("Error: No se extrajeron características. Verifica la carga y preprocesamiento de imágenes.")
        return None
    return np.array(features)

In [None]:
def perform_clustering(features, n_clusters, use_pca=False, pca_components=None):
    if features is None or features.shape[0] < n_clusters:
        print("Error: No hay suficientes vectores de características válidos para realizar clustering.")
        return None, None

    print(f"Realizando clustering K-Means con k={n_clusters}...")

    if use_pca:
        if pca_components is None or pca_components >= features.shape[1] or pca_components <= 0:
            print(f"Advertencia: Valor inválido para pca_components ({pca_components}). Usando configuración predeterminada o se omitirá PCA.")
            pca = PCA(n_components=min(features.shape[0], features.shape[1] // 2), random_state=42)
        else:
            print(f"Aplicando PCA para reducir dimensiones a {pca_components}...")
            pca = PCA(n_components=pca_components, random_state=42)

        try:
            features_reduced = pca.fit_transform(features)
            print(f"Varianza explicada por PCA: {np.sum(pca.explained_variance_ratio_):.4f}")
            features_to_cluster = features_reduced
        except ValueError as e:
            print(f"Error durante PCA: {e}. Se realizará clustering sobre las características originales.")
            features_to_cluster = features
    else:
        features_to_cluster = features

    model = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    # model = AgglomerativeClustering(n_clusters=n_clusters, metric='euclidean', linkage='ward')
    try:
        cluster_labels = model.fit_predict(features_to_cluster)
        print("Clustering completado.")
        return cluster_labels, features_to_cluster
    except Exception as e:
        print(f"Error durante el clustering K-Means: {e}")
        return None, None

# Dataset Boold Cell

In [None]:
image_dir = "./storage/clean/blood_cell/segmenter"
pil_images = []
image_filenames = []

print(f"Cargando imágenes desde: {image_dir}")
valid_extensions = (".jpg", ".jpeg", ".png", ".bmp", ".gif", ".tiff")
for filename in sorted(os.listdir(image_dir)):
    if filename.lower().endswith(valid_extensions):
        try:
            img_path = os.path.join(image_dir, filename)
            img = Image.open(img_path)
            pil_images.append(img)
            image_filenames.append(filename)
        except Exception as e:
            print(f"Advertencia: No se pudo cargar la imagen {filename}. Error: {e}")
print(f"{len(pil_images)} imágenes cargadas exitosamente.")

if not pil_images:
    print("No se cargaron imágenes. Terminando ejecución.")
    exit()

# 1. Cargar modelo y herramientas de preprocesamiento
feature_extractor, device = load_feature_extractor(use_gpu=USE_GPU)
preprocess_transform = get_preprocess_transform()

# 2. Extraer características
extracted_features = extract_features(pil_images, feature_extractor, device, preprocess_transform)

if extracted_features is not None:
    print(f"Forma de las características extraídas: {extracted_features.shape}")

    # 3. Realizar clustering
    cluster_labels, features_for_eval = perform_clustering(
        extracted_features,
        NUM_CLUSTERS,
        use_pca=REDUCE_DIMENSIONS,
        pca_components=PCA_COMPONENTS
    )

    # 4. Mostrar resultados
    if cluster_labels is not None:
        print("\n--- Resultados del Clustering ---")
        results = {}
        for i, label in enumerate(cluster_labels):
            if label not in results:
                results[label] = []
            img_identifier = image_filenames[i] if i < len(image_filenames) else f"Imagen {i}"
            results[label].append(img_identifier)

        print("\n--- Resumen de Clusters ---")
        for cluster_id in sorted(results.keys()):
            print(f"Cluster {cluster_id}: Contiene {len(results[cluster_id])} imágenes")

        if len(set(cluster_labels)) > 1 and len(cluster_labels) > len(set(cluster_labels)):
            try:
                silhouette_avg = silhouette_score(features_for_eval, cluster_labels)
                print(f"\nPuntaje Silhouette: {silhouette_avg:.4f}")
                print("(El puntaje Silhouette varía entre -1 y 1. Mayor es generalmente mejor.)")
            except Exception as e:
                print(f"\nNo se pudo calcular el Puntaje Silhouette: {e}")

            try:
                ch_score = calinski_harabasz_score(features_for_eval, cluster_labels)
                print(f"Índice Calinski-Harabasz (CH): {ch_score:.2f}")
                print("(Valores más altos indican clusters mejor definidos - más densos y mejor separados.)")
            except Exception as e:
                print(f"\nNo se pudo calcular el Índice Calinski-Harabasz: {e}")
        else:
            print("\nNo se puede calcular el Puntaje Silhouette (requiere >1 cluster y >N muestras).")

    else:
        print("No se pudo completar el clustering.")
else:
    print("La extracción de características falló. No se puede continuar con el clustering.")

for img in pil_images:
    if hasattr(img, "close"):
        img.close()
print("\nProcesamiento finalizado.")

# Dataset CIFAR

In [None]:
from torchvision.datasets import CIFAR10

# Cargar CIFAR-10
print("Cargando CIFAR-10 desde torchvision...")
dataset = CIFAR10(root="./data", train=False, download=True)

# Extraer imágenes PIL y etiquetas
pil_images = [img for img, _ in dataset]
image_filenames = [f"CIFAR10_img_{i}" for i in range(len(pil_images))]

# 1. Cargar modelo y herramientas de preprocesamiento
feature_extractor, device = load_feature_extractor(use_gpu=USE_GPU)
preprocess_transform = get_preprocess_transform()

# 2. Extraer características
extracted_features = extract_features(pil_images, feature_extractor, device, preprocess_transform)

if extracted_features is not None:
    print(f"Forma de las características extraídas: {extracted_features.shape}")

    # 3. Realizar clustering
    cluster_labels, features_for_eval = perform_clustering(
        extracted_features,
        NUM_CLUSTERS,
        use_pca=REDUCE_DIMENSIONS,
        pca_components=PCA_COMPONENTS
    )

    # 4. Mostrar resultados
    if cluster_labels is not None:
        print("\n--- Resultados del Clustering ---")
        results = {}
        for i, label in enumerate(cluster_labels):
            if label not in results:
                results[label] = []
            img_identifier = image_filenames[i] if i < len(image_filenames) else f"Imagen {i}"
            results[label].append(img_identifier)

        print("\n--- Resumen de Clusters ---")
        for cluster_id in sorted(results.keys()):
            print(f"Cluster {cluster_id}: Contiene {len(results[cluster_id])} imágenes")

        if len(set(cluster_labels)) > 1 and len(cluster_labels) > len(set(cluster_labels)):
            try:
                silhouette_avg = silhouette_score(features_for_eval, cluster_labels)
                print(f"\nPuntaje Silhouette: {silhouette_avg:.4f}")
                print("(El puntaje Silhouette varía entre -1 y 1. Mayor es generalmente mejor.)")
            except Exception as e:
                print(f"\nNo se pudo calcular el Puntaje Silhouette: {e}")

            try:
                ch_score = calinski_harabasz_score(features_for_eval, cluster_labels)
                print(f"Índice Calinski-Harabasz (CH): {ch_score:.2f}")
                print("(Valores más altos indican clusters mejor definidos - más densos y mejor separados.)")
            except Exception as e:
                print(f"\nNo se pudo calcular el Índice Calinski-Harabasz: {e}")
        else:
            print("\nNo se puede calcular el Puntaje Silhouette (requiere >1 cluster y >N muestras).")

    else:
        print("No se pudo completar el clustering.")
else:
    print("La extracción de características falló. No se puede continuar con el clustering.")

for img in pil_images:
    if hasattr(img, "close"):
        img.close()
print("\nProcesamiento finalizado.")

Cargando CIFAR-10 desde torchvision...
Cargando modelo ResNet50 preentrenado...
Usando dispositivo: cpu
Extrayendo características de 10000 imágenes...
  Procesadas 50/10000 imágenes...
  Procesadas 100/10000 imágenes...
  Procesadas 150/10000 imágenes...
  Procesadas 200/10000 imágenes...
  Procesadas 250/10000 imágenes...
  Procesadas 300/10000 imágenes...
  Procesadas 350/10000 imágenes...
  Procesadas 400/10000 imágenes...
  Procesadas 450/10000 imágenes...
  Procesadas 500/10000 imágenes...
  Procesadas 550/10000 imágenes...
  Procesadas 600/10000 imágenes...
  Procesadas 650/10000 imágenes...
  Procesadas 700/10000 imágenes...
  Procesadas 750/10000 imágenes...
  Procesadas 800/10000 imágenes...
  Procesadas 850/10000 imágenes...
  Procesadas 900/10000 imágenes...
  Procesadas 950/10000 imágenes...
  Procesadas 1000/10000 imágenes...
  Procesadas 1050/10000 imágenes...
  Procesadas 1100/10000 imágenes...
  Procesadas 1150/10000 imágenes...
  Procesadas 1200/10000 imágenes...
  Pr