<a href="https://colab.research.google.com/github/eddieHerman-lab/Analise_Space_latent/blob/main/VAe_Ressearch_Space_latent_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Microscopic Analysis of the Latent Space: Heuristics for Interpretability, Authenticity, and Bias Detection in VAE Representations

**Author:** Eduardo Augusto Pires Hermanson
<br>
**Abstract:** This notebook contains the complete code implementation for the research paper of the same name. It covers data loading, model setup, the calculation of custom heuristics (Uniqueness, Originality, CLS), clustering analysis (UMAP + HDBSCAN), and the generation of all figures and results presented in the case study on the CelebA dataset.

This cell downloads the pre-trained VAE model weights from the Hugging Face Hub repository.

In [None]:
# Paste and execute this cell in your Colab notebook
!wget https://huggingface.co/hussamalafandi/VAE-CelebA/resolve/main/vae_celeba_latent_200_epochs_10_batch_64_subset_80000.pth

Those preliminary cells mounts your Google Drive to the Colab environment, allowing you to access files stored in your Drive.

In [None]:
# ==============================================================================
#  SETUP CELL  AND  REPRODUCIBLE (VIA KAGGLE API)
# ==============================================================================
import os

# Step 1: Upload your Kaggle API token
from google.colab import files
print("Por favor, faça o upload do seu arquivo 'kaggle.json' que você baixou do site do Kaggle.")
files.upload() #A window will open for you to select the file

# Step 2: Configure the API
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
print("\nAPI do Kaggle configurated successfully.")

# Step 3: Create the folder structure
os.makedirs('data', exist_ok=True)
print("Folder structure created.")

# Step 4: Download the complete dataset from Kaggle (ultra-fast)
# Este dataset já inclui img_align_celeba, list_attr_celeba.csv, e identity_CelebA.txt
print("\nDownloading Kaggle CelebA dataset ...")
!kaggle datasets download -d jessicali9530/celeba-dataset -p data/
print("Download concluded!")

# Step 5: Unzip the dataset into the 'data' folder
print("\nUnpacking files...")
!unzip -q data/celeba-dataset.zip -d data/
print("Dataset successfully unzipped into 'data/' folder.")

# Passo 6: Verificação Final
print("\nContents of the 'data/' folder:")
!ls -l data/

In [None]:
!pip install hdbscan

This cell unzips a zip file containing the CelebA dataset images from your Google Drive to the specified directory in the Colab environment. Make sure to adjust the path to your zip file if it's different.

## 1. Setup and Imports

 Definition of  the `EyeVAE` class, a convolutional variational autoencoder architecture. It includes the encoder to map images to a latent space, the reparameterization trick for sampling from the latent space, and the decoder to reconstruct images from the latent space. It also defines helper classes for `ComponentDecomposer`, `LatentSpaceAnalyzer`, `EntropicOriginalityMeasure`, and `TemporalStabilityAnalyzer` which implement various heuristic calculations for analyzing the latent space.

In [None]:
import torch.nn as nn
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
import numpy as np
import cv2
from PIL import Image
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import cosine_similarity, euclidean_distances
from sklearn.cluster import KMeans, DBSCAN
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import os
import seaborn as sns
from scipy.spatial.distance import pdist, squareform
from scipy.stats import entropy, kurtosis # Import kurtosis
from sklearn.decomposition import FastICA, FactorAnalysis
from sklearn.preprocessing import StandardScaler
import networkx as nx
from scipy.spatial.distance import cdist
from scipy.signal import find_peaks
from sklearn.neighbors import NearestNeighbors
import hdbscan
import torch
import torch.nn as nn

class EyeVAE(nn.Module):
    """
    Convolutional version (CVAE) of EyeVAE, compatible with pre-trained models
    on images like CelebA.
    """
    def __init__(self, input_channels=3, latent_dim=200):
        super(EyeVAE, self).__init__()

        # --- ENCODER ---
        # Convolutions to extract spatial features from the image
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=4, stride=2, padding=1), # -> 32x32
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),            # -> 16x16
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),           # -> 8x8
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=4, stride=2, padding=1),          # -> 4x4
            nn.ReLU(),
            nn.Flatten() # Flatten the 4x4x256 feature map into a vector
        )

        # Linear layers to map extracted features to the latent space
        # Size 4096 comes from 256 * 4 * 4
        self.fc_mu = nn.Linear(256 * 4 * 4, latent_dim)
        self.fc_logvar = nn.Linear(256 * 4 * 4, latent_dim)

        # --- DECODER ---
        # Linear layer to prepare the latent vector for reconstruction
        self.decoder_input = nn.Linear(latent_dim, 256 * 4 * 4)

        # Transposed convolutions to "draw" the image back from features
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1), # -> 8x8
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),  # -> 16x16
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),   # -> 32x32
            nn.ReLU(),
            nn.ConvTranspose2d(32, input_channels, kernel_size=4, stride=2, padding=1), # -> 64x64
            nn.Sigmoid() # Ensure output pixels are between 0 and 1
        )

    def encode(self, x):
        result = self.encoder(x)
        mu = self.fc_mu(result)
        logvar = self.fc_logvar(result)
        return mu, logvar

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z):
        result = self.decoder_input(z)
        result = result.view(-1, 256, 4, 4) # Reshape from flattened for convolutional layers
        result = self.decoder(result)
        return result

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

class ComponentDecomposer:
    """Decomposition of independent components for uniqueness analysis"""

    def __init__(self, embeddings):
        self.embeddings = embeddings
        self.ica_components = None
        self.factor_components = None
        self.unique_signatures = None

    def decompose_independent_factors(self, n_components=None, n_init_simulation=10):
        """
        Decomposition into independent factors, simulating n_init for greater robustness.
        """
        print(f"Performing independent component analysis (simulating n_init={n_init_simulation} times)...")

        if n_components is None:
            # Limit components to avoid issues with small datasets or high dimensionality
            n_components = min(20, self.embeddings.shape[1])

        scaler = StandardScaler()
        embeddings_scaled = scaler.fit_transform(self.embeddings)

        best_ica_components = None
        best_avg_kurtosis = -1  # Kurtosis of a Gaussian is 0, so we start below that


        for i in range(n_init_simulation):
            # We create a new instance in each loop for a new random initialization
            ica = FastICA(n_components=n_components,fun= 'cube', max_iter=8000, tol=1e-4,algorithm='deflation',whiten='unit-variance')
            try:
                ica_transformed = ica.fit_transform(embeddings_scaled)

                # We use kurtosis as a metric of "non-Gaussianity" to evaluate separation quality
                avg_kurtosis = np.mean(np.abs(kurtosis(ica_transformed, axis=0)))

                if avg_kurtosis > best_avg_kurtosis:
                    best_avg_kurtosis = avg_kurtosis
                    best_ica_components = ica_transformed
                    # print(f"  Run {i+1}/{n_init_simulation}: New best result found with average kurtosis of {avg_kurtosis:.4f}")

            except Exception as e:
                # Catch ConvergenceWarning or other errors without breaking the code
                # print(f"  Run {i+1}/{n_init_simulation}: Did not converge or encountered an error. Skipping.")
                continue

        if best_ica_components is None:
            print("WARNING: FastICA failed to converge in any of the attempts.")
            # As a fallback, run one last time with more iterations
            ica['max_iter'] = 15000 # Even more iterations for fallback
            ica = FastICA(**ica)
            try:
                self.ica_components = ica.fit_transform(embeddings_scaled)
            except Exception as e:
                 print(f"Fallback ICA also failed: {e}")
                 self.ica_components = embeddings_scaled[:, :n_components] # Use PCA components as last resort
                 print("Using PCA components as fallback.")

        else:
            self.ica_components = best_ica_components

        # Factor analysis can remain the same
        fa = FactorAnalysis(n_components=n_components, random_state=42)
        self.factor_components = fa.fit_transform(embeddings_scaled)

        return self.ica_components, self.factor_components

    def calculate_component_uniqueness(self):
        """Calculates uniqueness based on components"""
        if self.ica_components is None:
            print("ICA components not available. Run decompose_independent_factors first.")
            return None

        uniqueness_scores = []

        # Pre-calculate the full similarity matrix if ICA components are not too large
        if self.ica_components.shape[0] * self.ica_components.shape[0] < 1e8: # Avoids excessive memory usage
            full_similarity_matrix = cosine_similarity(self.ica_components)
        else:
            full_similarity_matrix = None
            print("Similarity matrix too large to pre-calculate. Calculating distances on the fly.")


        for i in range(len(self.ica_components)):
            component = self.ica_components[i]

            # Calculate "unique signature" based on:
            # 1. Entropy of components
            # Add a small epsilon to avoid log(0)
            component_entropy = entropy(np.abs(component) + 1e-8)

            # 2. Average distance to neighbors
            if full_similarity_matrix is not None:
                distances = 1 - full_similarity_matrix[i] # Use pre-calculated distances
            else:
                distances = cdist([component], self.ica_components)[0]

            # Remove distance to self (which is 0)
            distances = distances[distances > 1e-6] # Use a small threshold instead of 0 to handle floating point inaccuracies

            avg_distance = np.mean(distances) if len(distances) > 0 else 0

            # 3. Local variance
            local_variance = np.var(component)

            # Combined uniqueness score
            uniqueness = component_entropy *  (1 + avg_distance) * (1 + local_variance) # Use (1 + ...) to avoid multiplying by zero
            uniqueness_scores.append(uniqueness)

        return np.array(uniqueness_scores)

    def find_authentic_cores(self, threshold_percentile=80):
        """Finds potential authentic cores"""
        uniqueness = self.calculate_component_uniqueness()
        if uniqueness is None:
            return None

        # Authentic cores = high uniqueness + stability
        threshold = np.percentile(uniqueness, threshold_percentile)
        authentic_candidates = np.where(uniqueness > threshold)[0]

        # Temporal stability analysis (based on neighboring components)
        stable_cores = []
        # Re-calculate or use pre-calculated similarity if available
        if self.ica_components.shape[0] * self.ica_components.shape[0] < 1e8: # Avoids excessive memory usage
             full_similarity_matrix = cosine_similarity(self.ica_components)
        else:
             full_similarity_matrix = None
             print("Similarity matrix too large to pre-calculate for core validation. Calculating on the fly.")


        for candidate in authentic_candidates:
            # Calculate stability based on similar components
            component = self.ica_components[candidate]

            if full_similarity_matrix is not None:
                 similarities = full_similarity_matrix[candidate] # Use the pre-calculated row
            else:
                 similarities = cosine_similarity([component], self.ica_components)[0]


            # Stable core = few very close neighbors
            # Count neighbors above a similarity threshold, excluding the point itself
            close_neighbors = np.sum(similarities > 0.95) - 1 # Increased similarity threshold for 'very close'

            if close_neighbors <= 7:  # Isolated core = potentially authentic (adjusted threshold)
                stable_cores.append({
                    'index': candidate,
                    'uniqueness': uniqueness[candidate],
                    'neighbors': close_neighbors,
                    'component': component
                })

        return stable_cores
    #2part

class LatentSpaceAnalyzer:
    """Latent space analyzer to detect overlaps and patterns"""

    def __init__(self, embeddings, threshold_similarity=0.99):
        self.embeddings = embeddings
        self.threshold_similarity = threshold_similarity
        self.clusters = None
        self.density_map = None

    def calculate_density_map(self, k=10):
        """
        Calculates an optimized latent space density map using k-Nearest Neighbors.
        The density score is the inverse of the average distance to the k nearest neighbors.
        """
        print(f"Calculating optimized density map with k={k}...")

        # Ensure k is not greater than the number of samples
        if k >= len(self.embeddings):
            print(f"Warning: k ({k}) is greater than or equal to the number of samples ({len(self.embeddings)}). Adjusting k.")
            k = len(self.embeddings) - 1
            if k < 1:
                 print("Error: Not enough samples to calculate density.")
                 self.density_map = np.zeros(len(self.embeddings)) # Return zeros if no valid neighbors
                 return self.density_map


        # Configure the NearestNeighbors model. k+1 because it includes itself as a neighbor.
        # Increased leaf_size for potentially better performance with larger datasets
        neighbors_model = NearestNeighbors(n_neighbors=k + 1, algorithm='kd_tree', leaf_size=30, n_jobs=-1)
        neighbors_model.fit(self.embeddings)

        # Find the k+1 neighbors for all points at once.
        # 'distances' will be a matrix where each row contains the distances to the neighbors of that point.
        distances, indices = neighbors_model.kneighbors(self.embeddings)

        # The first column (index 0) is the distance from the point to itself (0.0), so we ignore it.
        # We calculate the average of the distances to the k true neighbors (indices 1 to k+1).
        # Add a small epsilon to the mean distance before taking the inverse to avoid division by zero
        mean_distances = np.mean(distances[:, 1:], axis=1) + 1e-8

        # Density is the inverse of the average distance.
        densities = 1.0 / mean_distances

        # Normalize densities to a 0-1 range for better comparison, optional
        # densities = (densities - np.min(densities)) / (np.max(densities) - np.min(densities) + 1e-8)


        self.density_map = np.array(densities)
        return self.density_map


    def find_potential_overlaps(self):
        """Finds potential overlaps in the latent space"""
        print("Searching for potential overlaps...")
        # Use euclidean_distances as it's often more intuitive for 'overlap' in space
        distances = euclidean_distances(self.embeddings)

        # Find pairs with small distance (excluding diagonal)
        np.fill_diagonal(distances, np.inf) # Fill diagonal with infinity so it's not picked up by < threshold
        close_pairs_indices = np.where(distances < (1 - self.threshold_similarity)) # Use distance threshold based on similarity

        overlaps = []
        for i, j in zip(close_pairs_indices[0], close_pairs_indices[1]):
            if i < j:  # avoid duplicates
                # Recalculate similarity for clarity in output if needed, or just report distance
                similarity = cosine_similarity([self.embeddings[i]], [self.embeddings[j]])[0][0]
                overlaps.append({
                    'pair': (i, j),
                    'similarity': similarity, # Keep similarity for context
                    'distance': distances[i, j]
                })

        # Sort by distance (smaller distance = higher overlap)
        return sorted(overlaps, key=lambda x: x['distance'])

    def cluster_analysis(self, min_cluster_size=0.5, min_samples=None, cluster_selection_epsilon=0.0, max_clusters=20):
        """
        Performs robust cluster analysis using HDBSCAN to find density regions.
        """

        # HDBSCAN is sensitive to scale, so standardizing data is good practice.
        scaler = StandardScaler()
        embeddings_scaled = scaler.fit_transform(self.embeddings)

        # Configure the HDBSCAN model.
        # min_cluster_size: The smallest group of points you consider a "cluster".
        # min_samples: Controls how conservative the algorithm is. If None, the default is equal to min_cluster_size.
        # cluster_selection_epsilon: Used to "flatten" the hierarchy and merge nearby clusters. 0.0 is a good default.
        clusterer = hdbscan.HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=min_samples,
            cluster_selection_epsilon=cluster_selection_epsilon,
            metric='euclidean',
            allow_single_cluster=True # Allow finding a single cluster if data is not well-separated
        )
        print(f"Performing robust cluster analysis with HDBSCAN (min_cluster_size={min_cluster_size})...")
        # Execute clustering.
        # Labels (-1 for noise, 0, 1, 2... for clusters) are assigned to each point.
        try:
            cluster_labels = clusterer.fit_predict(embeddings_scaled)
            self.clusters = cluster_labels

            # Useful information for analysis
            unique_clusters = set(cluster_labels)
            n_clusters = len(unique_clusters) - (1 if -1 in unique_clusters else 0)
            n_noise = list(cluster_labels).count(-1)

            print(f'HDBSCAN found {n_clusters} cluster(s) and {n_noise} noise point(s).')

            return self.clusters

        except Exception as e:
            print(f"Error during HDBSCAN clustering: {e}")
            print("Falling back to KMeans clustering (less robust to noise and density variations).")
            # Fallback to KMeans if HDBSCAN fails
            n_clusters_kmeans = min(max_clusters, len(self.embeddings) // min_cluster_size) # Estimate a reasonable number of clusters
            if n_clusters_kmeans < 2:
                 print("Not enough data points to form clusters.")
                 self.clusters = np.zeros(len(self.embeddings)) -1 # Label all as noise
                 return self.clusters

            kmeans = KMeans(n_clusters=n_clusters_kmeans, random_state=42, n_init=10) # Use n_init to improve robustness
            self.clusters = kmeans.fit_predict(embeddings_scaled)
            print(f'KMeans found {n_clusters_kmeans} cluster(s).')
            return self.clusters


    def _find_elbow(self, inertias):
        """Finds the elbow point for K-means"""
        # Simple method: largest second derivative difference
        if len(inertias) < 3:
            return 0

        # Ensure inertias are a numpy array for calculations
        inertias = np.array(inertias)
        diffs = np.diff(inertias)
        second_diffs = np.diff(diffs)
        # The elbow is where the rate of decrease changes most significantly, often at the peak of the second derivative
        # Add 1 to the index because diff reduces the array size by 1
        return np.argmax(second_diffs) + 1 # Return the index for the *number* of clusters


    def analyze_cluster_characteristics(self, attributes_df):
        """Analyzes the characteristics of the found clusters, including attribute distribution."""
        if self.clusters is None:
            print("Run cluster_analysis first!")
            return None

        unique_clusters = np.unique(self.clusters)
        cluster_stats = {}

        for cluster_id in unique_clusters:
            # Exclude noise points if using HDBSCAN
            if cluster_id == -1:
                continue

            cluster_points = self.embeddings[self.clusters == cluster_id]

            if len(cluster_points) == 0:
                 continue # Skip empty clusters

            centroid = np.mean(cluster_points, axis=0)

            # Calculate internal dispersion
            # Use cdist for potentially faster distance calculation
            distances_to_centroid = cdist([centroid], cluster_points)[0]


            cluster_stats[cluster_id] = {
                'size': len(cluster_points),
                'centroid': centroid,
                'avg_internal_distance': np.mean(distances_to_centroid),
                'max_internal_distance': np.max(distances_to_centroid),
                'compactness': np.std(distances_to_centroid) # Standard deviation as compactness measure
            }

            # Attribute Distribution (Assuming attributes_df is aligned with embeddings)
            cluster_attribute_means = attributes_df[self.clusters == cluster_id].mean()
            cluster_stats[cluster_id]['attribute_means'] = cluster_attribute_means

        return cluster_stats
#3part
class EntropicOriginalityMeasure:
    """Entropic originality measure - 'addressing' of attractors"""

    def __init__(self, embeddings):
        self.embeddings = embeddings
        self.attractor_map = None
        self.originality_scores = None

    def map_attractors(self, radius=0.01):
        """Maps attractors in the latent space"""
        print("Mapping attractors in the latent space...")

        # Use NearestNeighbors for efficient radius search
        # n_neighbors = len(self.embeddings) # Or a smaller number if performance is an issue
        # neighbors_model = NearestNeighbors(n_neighbors=n_neighbors, algorithm='auto', n_jobs=-1)
        # neighbors_model.fit(self.embeddings)
        # distances, indices = neighbors_model.kneighbors(self.embeddings)

        # Identify attractors as points with many close neighbors
        attractors = []
        # Use radius_neighbors for finding points within a given radius
        neighbors_model = NearestNeighbors(radius=radius, algorithm='auto', n_jobs=-1)
        neighbors_model.fit(self.embeddings)
        # Find all points within the specified radius for each point
        radius_neighbors = neighbors_model.radius_neighbors(self.embeddings)


        for i in range(len(self.embeddings)):
            # Get indices of neighbors within the radius
            neighbor_indices = radius_neighbors[0][i]
            # Exclude the point itself
            num_neighbors = len(neighbor_indices) - 1

            density = num_neighbors / len(self.embeddings)

            attractors.append({
                'index': i,
                'neighbors': num_neighbors,
                'density': density,
                'position': self.embeddings[i]
            })

        # Order by density (stronger attractors first)
        self.attractor_map = sorted(attractors, key=lambda x: x['density'], reverse=True)
        return self.attractor_map

    def calculate_entropic_originality(self):
        """Calculates entropic originality - measures unique 'addressing'"""
        if self.attractor_map is None:
            self.map_attractors()

        originality_scores = []

        for point_data in self.attractor_map:
            i = point_data['index']
            point = self.embeddings[i]

            # 1. Local entropy (diversity in the neighborhood)
            # Use NearestNeighbors to find k nearest neighbors efficiently
            k = min(10, len(self.embeddings) - 1) # Ensure k is valid
            if k <= 0: # Handle case with very few embeddings
                 spatial_entropy = 0
                 neighbor_distances = []
            else:
                neighbors_model = NearestNeighbors(n_neighbors=k + 1, algorithm='auto', n_jobs=-1)
                neighbors_model.fit(self.embeddings)
                distances, indices = neighbors_model.kneighbors([point])
                neighbor_distances = distances[0][1:] # Exclude distance to self

                # Entropy of distances (spatial diversity)
                # Ensure bins are appropriate for the range of distances
                if len(neighbor_distances) > 1:
                     hist, _ = np.histogram(neighbor_distances, bins='auto') # Use 'auto' for better bin selection
                     spatial_entropy = entropy(hist + 1e-8)
                else:
                     spatial_entropy = 0 # Cannot calculate entropy with only one distance


            # 2. Spectral entropy (diversity in components)
            # Add a small epsilon to avoid log(0)
            spectral_entropy = entropy(np.abs(point) + 1e-8)

            # 3. "Addressing" - how unique is this point
            # Based on information theory: more unique points have higher entropy
            addressing_score = spatial_entropy * spectral_entropy

            # 4. Isolation factor (distance to the nearest attractor)
            if len(self.attractor_map) > 1:
                # Find the distance to the nearest *other* attractor efficiently
                other_attractor_positions = np.array([a['position'] for a in self.attractor_map if a['index'] != i])
                if len(other_attractor_positions) > 0:
                    distances_to_other_attractors = cdist([point], other_attractor_positions)[0]
                    min_attractor_dist = np.min(distances_to_other_attractors)
                else:
                    min_attractor_dist = 1.0 # Default if no other attractors

                isolation_factor = min_attractor_dist
            else:
                isolation_factor = 1.0 # Default if only one attractor

            # Final originality score
            originality = addressing_score * (1 + isolation_factor) # Use (1 + ...) for better scaling
            originality_scores.append(originality)

        # Ensure the order of scores matches the original embeddings, not the sorted attractor_map
        # Create a mapping from attractor_map index to original index
        original_indices = [a['index'] for a in self.attractor_map]
        # Create an array for scores ordered by original index
        ordered_originality_scores = np.zeros(len(self.embeddings))
        for i, original_idx in enumerate(original_indices):
             ordered_originality_scores[original_idx] = originality_scores[i]


        self.originality_scores = ordered_originality_scores
        return self.originality_scores

    def identify_authentic_signatures(self, top_k=10):
        """Identifies potential authentic signatures"""
        if self.originality_scores is None:
            self.calculate_entropic_originality()

        # Authentic candidates = high originality + specific density range
        combined_scores = []
        # Ensure we have a density map available, calculate if necessary
        if self.attractor_map is None:
             self.map_attractors()

        density_map_dict = {a['index']: a['density'] for a in self.attractor_map}


        for i, score in enumerate(self.originality_scores):
            # Get density for the original index
            density = density_map_dict.get(i, 0) # Get density using original index, default to 0 if somehow not found

            # Balance originality vs. isolation
            # Authentics should have high originality but not be too isolated
            # Adjusted density range
            if density > 0.005 and density < 0.15:  # Adjusted sweet spot
                combined_scores.append((i, score, density))

        # Order by originality score
        combined_scores.sort(key=lambda x: x[1], reverse=True)

        # Return top_k or fewer if not enough candidates
        return combined_scores[:min(top_k, len(combined_scores))]


class TemporalStabilityAnalyzer:
    """Temporal stability analysis to validate authenticity"""

    def __init__(self, embeddings):
        self.embeddings = embeddings
        self.stability_scores = None

    def simulate_temporal_variations(self, num_variations=10, noise_level=0.005): # Reduced noise level
        """Simulates temporal variations to test stability"""
        print("Simulating temporal variations...")

        stability_scores = []

        for i, base_embedding in enumerate(self.embeddings):
            # Generate temporal variations (simulating video frames)
            variations = []
            for _ in range(num_variations):
                # Use a more realistic noise distribution or add structured noise if applicable
                noise = np.random.normal(0, noise_level, base_embedding.shape)
                variation = base_embedding + noise
                variations.append(variation)

            variations = np.array(variations)

            # Calculate stability as consistency of variations
            # 1. Variance of variations
            # Use mean of variance across dimensions
            variation_variance = np.mean(np.var(variations, axis=0))

            # 2. Directional coherence
            mean_variation = np.mean(variations, axis=0)
            # Ensure embeddings are not zero vectors before calculating cosine similarity
            if np.linalg.norm(base_embedding) > 1e-6 and np.linalg.norm(mean_variation) > 1e-6:
                 directional_consistency = cosine_similarity([base_embedding], [mean_variation])[0][0]
            else:
                 directional_consistency = 0 # Or some other appropriate value


            # 3. Stability score
            # Add a small value to the denominator to avoid division by zero if variance is zero
            stability = directional_consistency / (1 + variation_variance + 1e-8)
            stability_scores.append(stability)

        self.stability_scores = np.array(stability_scores)
        return self.stability_scores

    def validate_authentic_cores(self, candidate_cores):
        """Validates authentic cores through temporal stability"""
        if self.stability_scores is None:
            self.simulate_temporal_variations()

        validated_cores = []

        # Ensure stability scores are available for all original indices
        if len(self.stability_scores) != len(self.embeddings):
             print("Stability scores size mismatch with embeddings. Recalculating stability scores.")
             self.simulate_temporal_variations() # Recalculate if mismatch


        for core in candidate_cores:
            core_index = core['index']
            # Ensure the index is valid for the stability scores array
            if core_index < 0 or core_index >= len(self.stability_scores):
                 print(f"Warning: Core index {core_index} is out of bounds for stability scores.")
                 continue # Skip this core if index is invalid


            stability = self.stability_scores[core_index]

            # Authentic cores should have high temporal stability
            # Adjusted percentile threshold
            stability_threshold = np.percentile(self.stability_scores, 50) # Use 50th percentile as a more central threshold
            if stability > stability_threshold:
                core['temporal_stability'] = stability
                # Combine uniqueness and stability for a validation score
                core['validation_score'] = core['uniqueness'] * stability
                validated_cores.append(core)

        # Order by validation score
        validated_cores.sort(key=lambda x: x['validation_score'], reverse=True)

        return validated_cores

In [None]:
# ==============================================================================
# ESTRUCTURE SETUP PROJECT FOLDERS
# This cell simulates the final structure of the repository on Github
# ==============================================================================
import os

# Criar as pastas principais
os.makedirs('data', exist_ok=True)
os.makedirs('pretrained_models', exist_ok=True)
os.makedirs('figures', exist_ok=True)

print("Folder Structures ('data/', 'pretrained_models/', 'figures/') was sucessfully created !")
print("PLease upload the necessary files for the correct folders using te left panel.")

 Download of the pre-trained VAE model weights from Hugging Face and loads them into the defined `EyeVAE` architecture. It then saves the loaded model weights to your Google Drive for persistent storage.

In [None]:
import torch
from huggingface_hub import hf_hub_download
# Remember that the definition of the EyeVAE class must be in a previous cell
# from model import EyeVAE

# --- Downloading the pre-trained model file ---
print("Downloading the pre-trained model from Hugging Face Hub...")
model_path = hf_hub_download(
    repo_id="hussamalafandi/VAE-CelebA",
    filename="vae_celeba_latent_200_epochs_10_batch_64_subset_80000.pth"
)
print(f"Download complete. Model saved to: {model_path}")

# --- Creating our architecture and loading weights ---
print("\nLoading pre-trained weights into our model...")

# We create an instance of our VAE with the correct architecture for the pre-trained model
# The pre-trained model was trained on color images (3 channels)
# and has a latent dimension of 200.
latent_dim_pretrained = 200
input_channels_celeba = 3
# Creating an instance of the new CVAE model. It is simpler and more direct.
our_architecture = EyeVAE(input_channels = input_channels_celeba, latent_dim = latent_dim_pretrained)

# We load the weights from the downloaded file into our model instance
# map_location='cpu' ensures it works even without an active GPU, but it will use the GPU if available
pretrained_weights = torch.load(model_path, map_location=torch.device('cuda' if torch.cuda.is_available() else 'cpu'))
our_architecture.load_state_dict(pretrained_weights)

print("\nPre-trained model loaded successfully!")

# The variable `our_architecture` now contains the VAE ready to be used.

# Assuming 'our_architecture' is the variable containing your loaded VAE model

print("\nSaving the pre-trained model weights...")

# --- STEP 1: SAVE LOCALLY FIRST ---
# Defines a path in the local and temporary Colab environment.
local_save_path = "/content/vae_celeba_pretrained_weights.pth"

# Saves the model's "state dictionary" (the weights) to this local path.
torch.save(our_architecture.state_dict(), local_save_path)
print(f"Model saved successfully locally to: {local_save_path}")


# --- STEP 2: MOVE THE FILE TO GOOGLE DRIVE ---
# Defines the final path in your Google Drive
final_drive_path = "/content/pretrained_models/vae_celeba_pretrained_weights.pth"

# Uses the terminal command 'mv' (move) to transfer the file.
# The '!' at the beginning tells Colab this is a terminal command.
print(f"Moving the file to: {final_drive_path}")
!mv "{local_save_path}" "{final_drive_path}"

print("\nModel transferred to Google Drive successfully!")

Defining the data transformations to preprocess the CelebA images (resizing, converting to tensor, and normalizing). It then loads the CelebA dataset using `ImageFolder` and creates a subset of 10,000 images for faster processing. Finally, it creates a `DataLoader` to efficiently feed batches of images to the model.

In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset

# --- STEP 1: Define the Transformations ---
# Defines the sequence of preprocessing that each image will undergo.
data_transforms = transforms.Compose([
    # 1. Resizes all images to 64x64 pixels.
    transforms.Resize((64, 64)),
    # 2. Converts the PIL image to a PyTorch tensor.
    transforms.ToTensor(),
    # 3. Normalizes pixel values to the range [-1, 1].
    #    (This is a common practice for VAEs pre-trained on images).
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# --- STEP 2: Load the Dataset ---
# PyTorch's ImageFolder is smart: it finds all images in the folder we specify.
# The path '/content/celeba_images/img_align_celeba/' should be correct.
full_dataset = datasets.ImageFolder(root='/content/data', transform=data_transforms)
print(f"Full dataset found with {len(full_dataset)} images.")

# --- STEP 3: Create a Subset (for faster processing) ---
# Working with 200,000 images is slow. Let's take a subset of 10,000 for our analysis.
subset_indices = list(range(10000))
subset_dataset = Subset(full_dataset, subset_indices)
print(f"Subset created with {len(subset_dataset)} images.")

# --- STEP 4: Create the DataLoader ---
# The DataLoader is what will feed the model with images in batches.
# A batch_size of 64 is a good starting point.
data_loader = DataLoader(subset_dataset, batch_size=64, shuffle=False) # shuffle=False to maintain order

print("\nDataLoader ready to be used!")

This cell uses the loaded VAE model to generate latent space embeddings for the subset of CelebA images. It iterates through the data loader, passes the images through the encoder to obtain the mean (`mu`) and log-variance (`logvar`) of the latent distribution, and stores the `mu` vectors as the embeddings.

In [None]:
from tqdm import tqdm
import numpy as np

# Make sure your model and dataloader are defined in previous cells
# our_architecture = ...
# data_loader = ...

# --- Preparation ---
# Define the device (GPU, if available)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
our_architecture.to(device)
our_architecture.eval() # Set the model to evaluation mode

# List to store all generated embeddings
real_embeddings = []

print(f"Starting the generation of embeddings for {len(subset_dataset)} images...")

# --- The Generation Loop ---
# The torch.no_grad() block disables gradient calculation, making everything faster
with torch.no_grad():
    # tqdm gives us a progress bar to track the process
    for images, _ in tqdm(data_loader, desc="Processing Batches"):
        # Move the batch of images to the GPU
        images = images.to(device)

        # Pass the images through the encoder to get mu and logvar
        mu, logvar = our_architecture.encode(images)

        # The 'mu' vector is the central representation of the point in the latent space.
        # We move it back to the CPU and add it to our list.
        real_embeddings.append(mu.cpu())

# --- Final Consolidation ---
# Concatenate all batches of embeddings into a single giant tensor
real_embeddings_tensor = torch.cat(real_embeddings, dim=0)

# Convert the final tensor to a NumPy array, our standard format for analysis
real_embeddings_numpy = real_embeddings_tensor.numpy()

print("\nProcess complete!")
print(f"Shape of our real data latent space: {real_embeddings_numpy.shape}")

 Performs  an initial heuristic analysis on the generated latent space embeddings. It uses the `ComponentDecomposer` (with ICA as a robust method) to get principal components, the `LatentSpaceAnalyzer` to calculate the density map and perform HDBSCAN clustering on the components. Finally, it visualizes the clusters in the 2D principal component space.

In [None]:
# The variable `real_embeddings_numpy` already exists from the previous cell, containing the 10,000 embeddings.

print(f"\nStarting heuristic analysis on the real data latent space ({real_embeddings_numpy.shape})...")
print("-" * 50)

# --- STEP 1: COMPONENT DECOMPOSITION (Using ICA) ---
# We use the class we already defined, feeding it the new data.
# Remember, we decided to use PCA for being more robust.
decomposer = ComponentDecomposer(real_embeddings_numpy)
principal_components, _ = decomposer.decompose_independent_factors()
print("Principal component decomposition complete.")

# --- STEP 2: TOPOLOGY ANALYSIS (Density and Clusters) ---
# We feed the analyzer with the decomposed components.
analyzer = LatentSpaceAnalyzer(principal_components)
density_map = analyzer.calculate_density_map()
clusters = analyzer.cluster_analysis(min_cluster_size=15) # We can adjust min_cluster_size for real data.
print("Density and cluster analysis complete.")

# --- STEP 3: AUTHENTICITY ANALYSIS AND VISUALIZATION ---
# Here, we can run the rest of the pipeline and visualize the results.
# (This part is more for exploration, as we don't have 'ground truth' here)

print("\nInitial analysis with real data complete!")
print("Now we have the components, density map, and clusters to investigate.")

# Example of how to visualize the new results (optional)
import matplotlib.pyplot as plt
plt.scatter(principal_components[:, 0], principal_components[:, 1], c=clusters, cmap='viridis', s=1)
plt.title('Clusters in the Component Space of Real Data (CelebA)')
plt.show()

 "Zoom-in" analysis on the cluster with label 0 (the yellow cluster in the previous plot). It finds the indices of the images belonging to this cluster within the subset and then loads and displays a grid of these images to visually inspect their characteristics.

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np

# Assuming the variables 'clusters' and 'subset_indices' from previous cells exist.
# 'clusters' is the result array from HDBSCAN.
# 'subset_dataset' is our dataset of 10,000 images.

print("Starting the 'zoom-in' analysis of the found cluster...")

# --- Step 1: Find the indices of our cluster ---
# We find the indices WITHIN THE SUBSET of 10,000 that belong to cluster 0 (the yellow point).
indices_in_subset = np.where(clusters == 0)[0]

if len(indices_in_subset) > 0:
    print(f"Found {len(indices_in_subset)} point(s) in cluster 0.")

    # --- Step 2: Load and display the corresponding images ---
    # We need the base path for the images
    base_image_path = "/content/data/"

    # Get the image filenames from our subset
    filenames = [full_dataset.samples[i][0] for i in subset_indices]

    # Create a grid of subplots to display the images
    num_images = len(indices_in_subset)
    # Adjust the grid size for visualization
    cols = 5
    rows = int(np.ceil(num_images / cols))

    fig, axes = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3))
    axes = axes.flatten() # Flatten the 2D grid into a 1D array for easier iteration

    for i, idx in enumerate(indices_in_subset):
        # Get the full image path
        image_path = filenames[idx]

        # Open the image
        img = Image.open(image_path)

        # Plot the image in the corresponding subplot
        axes[i].imshow(img)
        axes[i].set_title(f"Index: {idx}")
        axes[i].axis('off') # Remove x and y axes

    # Hide the subplots that were not used
    for j in range(i + 1, len(axes)):
        axes[j].axis('off')

    plt.suptitle("Images Belonging to the 'Yellow' Cluster (Label 0)", fontsize=16)
    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

else:
    print("No cluster with label 0 was found to visualize.")

 Deep comparative analysis by initializing instances of all the defined heuristic analyzers (`ComponentDecomposer`, `LatentSpaceAnalyzer`, `EntropicOriginalityMeasure`, `TemporalStabilityAnalyzer`). It then performs topology analyses (density and clustering) and prepares to compare groups of images based on their heuristic scores.

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np

# --- Requirements ---
# Make sure these variables from your main analysis already exist:
# - clusters: The HDBSCAN result array.
# - subset_dataset: Our dataset of 10,000 images.
# - full_dataset: The full CelebA dataset.
# - principal_components: The PCA components.
# - authenticity_decomposer: The instance of your decomposition class.
# - entropy_analyzer: The instance of your entropy class.
# - temporal_analyzer: The instance of your stability class.

print("--- STARTING DEEP COMPARATIVE ANALYSIS ---")


print(f"\nStarting heuristic analysis on the real data latent space ({real_embeddings_numpy.shape})...")
print("-" * 50)

# --- STEP 1: Create ALL instances of our analyzers ---

print("Initializing analysis modules...")
# The decomposer uses the raw embeddings
decomposer = ComponentDecomposer(real_embeddings_numpy)
principal_components, _ = decomposer.decompose_independent_factors()

# The topology analyzer uses the decomposed components
analyzer = LatentSpaceAnalyzer(principal_components)

# The other heuristics also operate on the components for consistency
entropy_analyzer = EntropicOriginalityMeasure(principal_components)
temporal_analyzer = TemporalStabilityAnalyzer(principal_components)

print("Modules initialized.")

# --- STEP 2: Run the main analyses ---

print("\nExecuting topology analyses...")
density_map = analyzer.calculate_density_map()
clusters = analyzer.cluster_analysis(min_cluster_size=30) # Using the value we were testing

print("\nInitial analysis with real data complete!")


# --- Step 1: Select the Groups ---
yellow_cluster_indices = np.where(clusters == 0)[0]
purple_cloud_indices = np.where(clusters == -1)[0]

# As you brilliantly suggested, let's take a random sample from the purple cloud
# to be our control group.
np.random.shuffle(purple_cloud_indices)
purple_cloud_control_indices = purple_cloud_indices[:len(yellow_cluster_indices)] # Take the same number of samples

print(f"Group A (Yellow Cluster): {len(yellow_cluster_indices)} samples")
print(f"Group B (Purple Cloud Control): {len(purple_cloud_control_indices)} samples")

# --- Step 2: Comparative Visual Analysis ---
def plot_image_group(group_indices, graph_title):
    if len(group_indices) == 0: return

    filenames = [full_dataset.samples[i][0] for i in subset_indices]

    num_images = len(group_indices)
    cols = 5
    rows = int(np.ceil(num_images / cols))

    fig, axes = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3))
    axes = axes.flatten()

    for i, idx in enumerate(group_indices):
        image_path = filenames[idx]
        img = Image.open(image_path)
        axes[i].imshow(img)
        axes[i].set_title(f"Index: {idx}")
        axes[i].axis('off')

    for j in range(i + 1, len(axes)): axes[j].axis('off')
    plt.suptitle(graph_title, fontsize=16)
    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

print("\n[Visualization] Group A - Images from the Yellow Cluster:")
plot_image_group(yellow_cluster_indices, "Images from the Yellow Cluster (Group A)")

print("\n[Visualization] Group B - Random Samples from the Purple Cloud:")
plot_image_group(purple_cloud_control_indices, "Images from the Control Group (Purple Cloud - Group B)")


# --- Step 3: Comparative Heuristic Analysis (The Microscopic Analysis) ---
print("\n--- NUMERICAL COMPARISON OF HEURISTICS ---")

# We need the scores of all samples, which have already been calculated
uniqueness_scores = decomposer.calculate_component_uniqueness()
originality_scores = entropy_analyzer.calculate_entropic_originality()
stability_scores = temporal_analyzer.simulate_temporal_variations()

# Calculate means and standard deviations for each group
group_a_scores = {
    "Uniqueness": uniqueness_scores[yellow_cluster_indices],
    "Originality": originality_scores[yellow_cluster_indices],
    "Stability": stability_scores[yellow_cluster_indices]
}
group_b_scores = {
    "Uniqueness": uniqueness_scores[purple_cloud_control_indices],
    "Originality": originality_scores[purple_cloud_control_indices],
    "Stability": stability_scores[purple_cloud_control_indices]
}

print("\nAVERAGE SCORES (Mean ± Standard Deviation):")
print("-" * 40)
print(f"| Heuristic       | Group A (Yellow)         | Group B (Purple)         |")
print("-" * 40)
for key in group_a_scores:
    mean_a = np.mean(group_a_scores[key])
    std_a = np.std(group_a_scores[key])
    mean_b = np.mean(group_b_scores[key])
    std_b = np.std(group_b_scores[key])
    print(f"| {key:<15} | {mean_a:8.2f} ± {std_a:7.2f} | {mean_b:8.2f} ± {std_b:7.2f} |")
print("-" * 40)

Introducing the `identity_CelebA.txt` file, which maps image filenames to identity IDs. It then counts how many images each identity has and identifies identities with at least 50 images, printing the top 5 identities with the most images.

In [None]:
import pandas as pd

# 1) Load the filename → identity_id mapping
# Each line in identity_file has something like: img_000001.jpg 100
identity_file = "/content/data/identity_CelebA.txt"
df_id = pd.read_csv(identity_file, sep=" ", names=["image_id","identity"])

# 2) Count how many images each identity has
counts = df_id["identity"].value_counts()

# 3) Filter those with >= 50 images
large_identities = counts[counts >= 50].index.tolist()
print(f"{len(large_identities)} identities with ≥50 images")

# 4) See the top-5 (largest number of images)
top5 = counts.head(5)
print(top5)

Initiating the Identity Consistency Experiment. It loads the `list_attr_celeba.csv` file containing image attributes and the `identity_CelebA.txt` file to map images to person IDs. It then selects example celebrity IDs and filters the image filenames belonging to these identities for further analysis.

In [None]:
import pandas as pd
import os

print("--- Starting Identity Consistency Experiment ---")

# --- Step 1: Load the attributes file ---
# Kaggle usually puts annotation files in the main dataset folder.
attributes_path = "data/list_attr_celeba.csv" # Adjust this path if necessary

try:
    df_attributes = pd.read_csv(attributes_path)
    print("Attributes file loaded successfully.")
    # Let's see the first rows and column names
    # print(df_attributes.head())
    # print(df_attributes.columns)
except FileNotFoundError:
    print(f"ERROR: The attributes file was not found at '{attributes_path}'.")
    print("Please check the path and ensure the .csv file is in your Drive.")

# --- Step 2: Find the IDs of our celebrities ---
# CelebA doesn't use names, but a numerical ID for each person. We need to find out which one.
# (This part might require manual search or using an identity mapping file if available)
# For this example, let's assume we found the IDs (these are just examples):
# NOTE: We will need to find the correct IDs. One way is to search for the file `identity_CelebA.txt`
celebrity_A_id = 3745 # Example ID for Jennifer Aniston
celebrity_B_id = 3699 # Example ID for George Clooney

# --- Step 3: Filter images by identity ---
# (This step depends on the `identity_CelebA.txt` file, which maps image_id to person_id)
identity_path = 'data/identity_CelebA.txt'

try:
    df_identity = pd.read_csv(identity_path, sep=' ', header=None, names=['image_id', 'person_id'])
    print("Identity file loaded successfully.")

    # Find the filenames for each celebrity
    celebrity_A_images = df_identity[df_identity['person_id'] == celebrity_A_id]['image_id'].tolist()
    celebrity_B_images = df_identity[df_identity['person_id'] == celebrity_B_id]['image_id'].tolist()

    # Let's take just the first 15 from each for our test
    celebrity_A_images = celebrity_A_images[:30]
    celebrity_B_images = celebrity_B_images[:30]

    print(f"Found {len(celebrity_A_images)} images for Celebrity A.")
    print(f"Found {len(celebrity_B_images)} images for Celebrity B.")

except FileNotFoundError:
    print(f"ERROR: The file 'identity_CelebA.txt' was not found at '{identity_path}'.")
    print("This file is essential for mapping images to identities. Please upload it to your Drive.")

# Now, the lists `celebrity_A_images` and `celebrity_B_images` contain the filenames
# that we will use for our analysis.

Starting  Identity Consistency Analysis Phase. It defines a function to load and preprocess images for the VAE. It then generates latent space embeddings for the selected celebrity image groups (Celebrity A, Celebrity B, and a mixed group). Finally, it visualizes these embeddings in a 2D PCA space to show how well identities are separated in the latent space.

In [None]:
import torch
from torchvision import transforms
from PIL import Image
import numpy as np
from tqdm import tqdm

# --- Requirements ---
# Ensure these variables exist from previous cells:
# - our_architecture: The pre-trained and loaded VAE model.
# - celebrity_A_images: The list of filenames for the first celebrity.
# - celebrity_B_images: The list of filenames for the second celebrity.

print("--- Starting the Identity Consistency Analysis Phase ---")

# --- Step 1: Prepare a function to load and process images ---
data_transforms = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

base_image_path = "/content/data/img_align_celeba/img_align_celeba"

def load_and_process_images(file_list):
    processed_images = []
    for filename in file_list:
        full_path = os.path.join(base_image_path, filename)
        try:
            img = Image.open(full_path).convert('RGB')
            processed_images.append(data_transforms(img))
        except FileNotFoundError:
            print(f"Warning: Image not found at {full_path}")
            continue
    return torch.stack(processed_images)

# --- Step 2: Generate Embeddings for our groups ---
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
our_architecture.to(device)
our_architecture.eval()

print("\nGenerating embeddings for test groups...")
with torch.no_grad():
    # Group A: Celebrity A only
    tensor_A = load_and_process_images(celebrity_A_images).to(device)
    mu_A, _ = our_architecture.encode(tensor_A)

    # Group B: Celebrity B only
    tensor_B = load_and_process_images(celebrity_B_images).to(device)
    mu_B, _ = our_architecture.encode(tensor_B)

    # Group C: Mix of the two
    mixed_embeddings = torch.cat([mu_A, mu_B], dim=0)

print("Embeddings generated successfully!")

# --- Step 3: Visualize the Latent Space (The Visual Proof) ---
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt

# We use PCA to reduce dimensionality to 2D for plotting
pca = PCA(n_components=2)
embeddings_2d = pca.fit_transform(mixed_embeddings.cpu().numpy())

# Separate the points for coloring
points_A = embeddings_2d[:len(mu_A)]
points_B = embeddings_2d[len(mu_A):]

plt.figure(figsize=(10, 8))
plt.scatter(points_A[:, 0], points_A[:, 1], c='blue', label='Celebrity A', alpha=0.7)
plt.scatter(points_B[:, 0], points_B[:, 1], c='red', label='Celebrity B', alpha=0.7)
plt.title('Latent Space Visualization (PCA) of Identities')
plt.xlabel('Principal Component 1')
plt.ylabel('Principal Component 2')
plt.legend()
plt.grid(True)
plt.show()

# --- Step 4: Numerical Analysis of Consistency ---
# (This part would be more complex, involving applying the heuristics,
# but the visual proof will already be extremely powerful)

Numerical analysis of identity consistency using the calculated heuristic scores. It analyzes three groups: Celebrity A (intra-identity), Celebrity B (intra-identity), and a mixed group (inter-identity). It calculates and prints the mean and standard deviation of the Uniqueness, Originality, and Stability scores for each group and performs Levene's test to compare the variances of Originality scores between groups.

In [None]:
import numpy as np

# --- Requirements ---
# Ensure these variables exist from previous cells:
# - mu_A: The 15 embeddings of Celebrity A
# - mu_B: The 15 embeddings of Celebrity B
# - our_architecture: The loaded VAE model

print("--- Starting Numerical Identity Consistency Analysis ---")

# --- Step 1: Prepare the Data ---
# Convert to NumPy for analysis
group_A_embeddings = mu_A.cpu().numpy()
group_B_embeddings = mu_B.cpu().numpy()

# Create Group C (Control), mixing half of each
half = len(group_A_embeddings) // 2
group_C_embeddings = np.vstack([group_A_embeddings[:half], group_B_embeddings[:half]])

# --- Step 2: Apply the Heuristics to Each Group ---
def analyze_group(embeddings):
    # --- CHANGE ---
    # We will keep decomposition only for the Uniqueness metric
    decomposer = ComponentDecomposer(embeddings)
    components, _ = decomposer.decompose_independent_factors()
    uniqueness = decomposer.calculate_component_uniqueness()

    # And we will apply the other heuristics on the ORIGINAL EMBEDDINGS
    entropy_analyzer = EntropicOriginalityMeasure(embeddings) # <--- CHANGE
    temporal_analyzer = TemporalStabilityAnalyzer(embeddings) # <--- CHANGE

    originality = entropy_analyzer.calculate_entropic_originality()
    stability = temporal_analyzer.simulate_temporal_variations()

    return {"Uniqueness": uniqueness, "Originality": originality, "Stability": stability}

print("\nAnalyzing Group A (Celebrity A)...")
scores_group_A = analyze_group(group_A_embeddings)

print("Analyzing Group B (Celebrity B)...")
scores_group_B = analyze_group(group_B_embeddings)

print("Analyzing Group C (Mixed Control)...")
scores_group_C = analyze_group(group_C_embeddings)

# --- Step 3: Present the Results ---
print("\n--- NUMERICAL CONSISTENCY COMPARISON (Mean ± Standard Deviation) ---")
print("-" * 70)
print(f"| Heuristic       | Group A (Intra-ID)       | Group B (Intra-ID)       | Group C (Inter-ID)       |")
print("-" * 70)
for key in scores_group_A:
    mean_a, std_a = np.mean(scores_group_A[key]), np.std(scores_group_A[key])
    mean_b, std_b = np.mean(scores_group_B[key]), np.std(scores_group_B[key])
    mean_c, std_c = np.mean(scores_group_C[key]), np.std(scores_group_C[key])
    print(f"| {key:<15} | {mean_a:8.2f} ± {std_a:7.2f} | {mean_b:8.2f} ± {std_b:7.2f} | {mean_c:8.2f} ± {std_c:7.2f} |")
print("-" * 70)

from scipy.stats import levene

# Comparing the variances of Originality
stat, p_value_B_vs_C = levene(scores_group_B['Originality'], scores_group_C['Originality'])

print(f"\nLevene's Test for Variances (Originality): Group B vs. Group C")
print(f"P-value: {p_value_B_vs_C:.4f}")

if p_value_B_vs_C < 0.05:
    print("Conclusion: The difference in variances is statistically significant.")
else:
    print("Conclusion: There is no statistical evidence that the variances are different.")

 Non-parametric distribution tests (Mann-Whitney U and Kolmogorov-Smirnov) to compare the distributions of Originality scores between two groups (presumably Celebrity A and Celebrity B, based on the variable names). It also includes a function for bootstrap analysis to estimate confidence intervals for the difference in variances.

In [None]:
#Non-Parametric Distribution Tests
#Once you have your 50+ images per identity, you compare the distributions of Originality (or Uniqueness) of A vs B
from scipy.stats import mannwhitneyu, ks_2samp

# Assuming scores_grupo_A['Originalidade'] and scores_grupo_B['Originalidade'] are numpy arrays of Originality
u_stat, p_mw = mannwhitneyu(scores_group_A['Originality'], scores_group_B['Originality'], alternative='two-sided')
ks_stat, p_ks = ks_2samp(scores_group_A['Originality'], scores_group_B['Originality'])

print(f"Mann-Whitney U: stat={u_stat:.1f}, p={p_mw:.3f}")
print(f"Kolmogorov-Smirnov: stat={ks_stat:.3f}, p={p_ks:.3f}")

Function for performing bootstrap analysis to estimate confidence intervals for a statistic (e.g., variance difference) between two datasets. It then attempts to use this function to calculate the 95% confidence interval for the difference in variances of Originality scores between two groups (again, presumably Celebrity A and Celebrity B).

In [None]:
#Bootstrap to Estimate Confidence Intervals
import numpy as np

def bootstrap_statistic(data1, data2, stat_fn, n_boot=1000):
    """
    Returns the bootstrap distribution of stat_fn(data1) - stat_fn(data2).
    """
    diffs = []
    combined = np.concatenate([data1, data2])
    n1, n2 = len(data1), len(data2)
    for _ in range(n_boot):
        # Resampling with replacement
        b1 = np.random.choice(data1, size=n1, replace=True)
        b2 = np.random.choice(data2, size=n2, replace=True)
        diffs.append(stat_fn(b1) - stat_fn(b2))
    return np.array(diffs)

# Example: difference of variances of Originality
boot_diffs = bootstrap_statistic(scores_group_A['Originality'], scores_group_B['Originality'], np.var, n_boot=2000)
ci_lower, ci_upper = np.percentile(boot_diffs, [2.5, 97.5])
print(f"95% CI of variance difference: [{ci_lower:.2f}, {ci_upper:.2f}]")

 Visualization groups of images belonging to the selected celebrities (Celebrity 1 and Celebrity 2). It plots a fixed grid of images (up to 15 per group) to provide a visual comparison of the individuals within each group.

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import os
import numpy as np

# --- Requirements ---
# Ensure these variables exist from previous cells:
# - celebrity_A_images: The list of 15 filenames for the first celebrity.
# - celebrity_B_images: The list of 15 filenames for the second celebrity.
# - base_image_path: The variable with the path to the image folder. Ex: "/content/data/"

print("--- Starting Identity Group Visualization ---")

# Helper function to plot a group of images
def plot_group(file_list, title):
    # Plot only the first 15 images to fit in the 3x5 grid
    images_to_plot = file_list[:15] # <--- Modification here
    num_images = len(images_to_plot)

    plt.figure(figsize=(15, 9))
    plt.suptitle(title, fontsize=20)

    # Define the subplot grid (3 rows, 5 columns)
    cols = 5
    rows = 3 # <--- Modification here to fix at 3x5
    # Adjust the number of axes for the fixed grid, even if we have less than 15 images
    fig, axes = plt.subplots(rows, cols, figsize=(cols*3, rows*3))

    # Flatten the 2D grid into a 1D array and select only the axes that will be used
    axes = axes.flatten()

    for i, filename in enumerate(images_to_plot): # <--- Iterate over the sliced list
        # Create a subplot in the corresponding position (from 1 to 15)
        ax = axes[i] # <--- Access the axis by index in the flattened array

        # full_path = os.path.join(base_image_path, filename) # Original line, replaced below

        full_path = os.path.join("/content/data/img_align_celeba/img_align_celeba", filename) # <--- Adjusted path

        try:
            img = Image.open(full_path)
            ax.imshow(img)
        except FileNotFoundError:
            ax.text(0.5, 0.5, 'Image not found', ha='center')

        ax.set_title(f"Image {i+1}")
        ax.axis('off')

    # Hide the subplots that were not used (only for the case where we have less than 15 images)
    for j in range(num_images, len(axes)):
        axes[j].axis('off')

    plt.tight_layout(rect=[0, 0, 1, 0.96])
    plt.show()

# Plot Group A
print("\nGroup A (Blue -'):")
plot_group(celebrity_A_images, "Group A: Celebrity 1")

# Plot Group B
print("\nGroup B (Red -):")
plot_group(celebrity_B_images, "Group B: Celebrity 2")

 UMAP dimensionality reduction followed by HDBSCAN clustering on the real data latent space embeddings. It installs the `umap-learn` library, scales the embeddings, applies UMAP to create a 2D representation, and then applies HDBSCAN to find clusters in this 2D space. Finally, it visualizes the resulting clusters.

In [None]:
# --- Step 1.A: Install necessary libraries ---
!pip install umap-learn

import umap
import hdbscan
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler

# --- Requirements ---
# Ensure the variable 'real_embeddings_numpy' with the 10,000 embeddings exists.

print("--- Starting UMAP + HDBSCAN Analysis ---")

# --- Step 1.B: Preprocess the data ---
# Standardizing data is crucial for UMAP
scaler = StandardScaler()
embeddings_scaled = scaler.fit_transform(real_embeddings_numpy)

# --- Step 1.C: Apply UMAP to reduce dimensionality ---
print("\nApplying UMAP to create a 2D representation...")
reducer = umap.UMAP(
    n_neighbors=15,  # Controls the balance between local and global structure
    min_dist=0.1,    # Controls how "together" points in a cluster stay
    n_components=2,  # We want a 2D map
    metric='euclidean',
    random_state=42
)
embedding_2d = reducer.fit_transform(embeddings_scaled)
print("UMAP 2D representation created successfully.")

# --- Step 1.D: Apply HDBSCAN to the new representation ---
print("\nApplying HDBSCAN to the UMAP representation...")
clusterer = hdbscan.HDBSCAN(
    min_cluster_size=100, # We start with a high value to find large structures
    min_samples=10
)
clusters_umap = clusterer.fit_predict(embedding_2d)

n_clusters = len(set(clusters_umap)) - (1 if -1 in clusters_umap else 0)
n_noise = list(clusters_umap).count(-1)
print(f"HDBSCAN found {n_clusters} cluster(s) and {n_noise} noise point(s).")

# --- Step 1.E: Visualize the result ---
plt.figure(figsize=(12, 10))
# Plot the "noise" points (label -1) in light gray
plt.scatter(embedding_2d[clusters_umap == -1, 0], embedding_2d[clusters_umap == -1, 1], c=(0.8, 0.8, 0.8), s=1, alpha=0.5)
# Plot the found clusters with different colors
plt.scatter(embedding_2d[clusters_umap != -1, 0], embedding_2d[clusters_umap != -1, 1], c=clusters_umap[clusters_umap != -1], s=5, cmap='viridis')
plt.title('Clusters found by HDBSCAN in UMAP representation', fontsize=16)
plt.xlabel('UMAP 1 Dimension')
plt.ylabel('UMAP 2 Dimension')
plt.colorbar()
plt.show()

 Checkpoint to save the processed attributes and analysis results (cluster labels, Uniqueness, and Originality scores) into a Parquet file. It ensures that the necessary variables exist and are aligned before saving the data, allowing you to resume the analysis from this point later.

In [None]:
#=============================
# CHECKPOINT CREATION CELL
# Execute this cell and, if everything goes well, RESTART THE RUNTIME ENVIRONMENT.
# ==========================================================
print("Starting checkpoint creation...")

import pandas as pd
import os
from torchvision import datasets
from torchvision import transforms # Import transforms here too
import numpy as np # Import numpy here


base_dataset_path = '/content/data'

try:

    print("Checking for existing variables for dataset, subset indices, and clusters...")
    _ = full_dataset
    _ = subset_indices
    _ = clusters_umap # Verify if clusters_umap exists
    _ = uniqueness_scores # Verify uniqueness_scores exists
    _ = originality_scores # Verify originality_scores exists
    print("Variables full_dataset, subset_indices, clusters_umap, uniqueness_scores, and originality_scores found.")
except NameError:
    print("One or more variables (full_dataset, subset_indices, clusters_umap, uniqueness_scores, originality_scores) not found. Recreating necessary ones...")
    # Define basic transformations to load the dataset (not used for attributes, but needed for ImageFolder)
    basic_transforms = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])
    full_dataset = datasets.ImageFolder(root=base_dataset_path, transform=basic_transforms)
    # Recreate subset_indices (assuming 10000 samples, adjust if needed)
    subset_indices = list(range(10000))
    print("full_dataset and subset_indices recreated. Note that clusters_umap, uniqueness_scores and originality_scores are *not* recreated here and will need to be generated by the relevant cells.")
    # Define clusters_umap, uniqueness_scores, originality_scores as None to detect they are not available
    clusters_umap = None
    uniqueness_scores = None
    originality_scores = None


# --- Load and preprocess attributes (Moved from the previous cell to ensure it exists) ---
attributes_path = "data/list_attr_celeba.csv"
subset_attributes = pd.DataFrame() # Initialize as empty
try:
    df_attributes = pd.read_csv(attributes_path).set_index('image_id')
    print("Attributes file loaded successfully.")

    # Get file names for the subset
    # Adjust the base path if necessary to match the actual structure in your file system
    # ImageFolder returns full paths, so we need to extract just the file name
    # We use the subset indices in the full_dataset to get the original paths and extract the file names
    file_names = [os.path.basename(full_dataset.samples[i][0]) for i in subset_indices]

    # Filter the attributes DataFrame using the subset file names
    # Use .loc for label-based indexing (file names)
    # Check if all subset file names exist in the df_attributes index
    existing_files = [name for name in file_names if name in df_attributes.index]
    if len(existing_files) != len(file_names):
        print(f"Warning: {len(file_names) - len(existing_files)} subset file names were not found in the attributes file.")
        # Filter file_names to include only those that exist in the index
        file_names = existing_files

    if file_names: # Proceed only if there are existing files to process
        subset_attributes = df_attributes.loc[file_names].copy()

        # Convert attributes from -1/1 to 0/1
        subset_attributes = (subset_attributes + 1) / 2
        print(f"Attributes DataFrame for the subset created successfully: {subset_attributes.shape}")
    else:
        print("No subset file names found in the attributes file. Could not create subset_attributes.")


except FileNotFoundError:
    print(f"ERROR: The attributes file was not found at '{attributes_attributes}'.")
    print("Please check the path and ensure the .csv file is accessible.")
except KeyError as e:
    print(f"ERROR: Some subset file name was not found in the attributes file during loc: {e}")
    print("Please check if the file names in the subset match the IDs in the attributes file.")


# 1. We take the attributes you have already processed (values 0 or 1)
# The variable should be called 'subset_attributes' as per your code.
# We check if subset_attributes was created successfully before proceeding
if not subset_attributes.empty:
    df_to_save = subset_attributes.copy()

    # 2. Add cluster labels, Uniqueness, and Originality as new columns
    # Check if clusters_umap, uniqueness_scores and originality_scores exist and are not None
    if clusters_umap is not None and uniqueness_scores is not None and originality_scores is not None:
        # Ensure the number of labels/scores matches the number of samples in subset_attributes
        if len(clusters_umap) == len(df_to_save) and len(uniqueness_scores) == len(df_to_save) and len(originality_scores) == len(df_to_save):
            df_to_save['cluster_label'] = clusters_umap
            df_to_save['Unicidade'] = uniqueness_scores # Add Unicidade score
            df_to_save['Originalidade'] = originality_scores # Add Originalidade score

            # 3. We will save in Parquet format, which is fast and efficient.
            file_path = 'checkpoint_final_results.parquet'
            df_to_save.to_parquet(file_path)

            print("--- SUCCESS! ---")
            print(f"Checkpoint with {df_to_save.shape[0]} rows and {df_to_save.shape[1]} columns saved to: '{file_path}'")
            print("The DataFrame contains attributes, 'cluster_label', 'Unicidade' and 'Originalidade'.")
            print("You can safely restart the runtime environment now.")
            print("==========================================================")

        else:
            print("\nERROR: The number of labels/scores does not match the number of samples in the subset attributes DataFrame.")
            print(f"Number of labels (clusters_umap): {len(clusters_umap) if clusters_umap is not None else 'N/A'}")
            print(f"Number of scores (uniqueness_scores): {len(uniqueness_scores) if uniqueness_scores is not None else 'N/A'}")
            print(f"Number of scores (originality_scores): {len(originality_scores) if originality_scores is not None else 'N/A'}")
            print(f"Number of samples in subset_attributes: {len(df_to_save)}")
            print("Please run the data loading cells and the analysis cells (including UMAP+HDBSCAN and heuristics) to ensure all data is synchronized before saving the checkpoint.")

    else:
        print("\nERROR: One or more variables ('clusters_umap', 'uniqueness_scores', 'originality_scores') are not defined or are None.")
        print("Please run the relevant cells to generate these variables before executing this checkpoint cell.")
else:
    print("\nCould not create the 'subset_attributes' DataFrame. Checkpoint was not saved.")

 Function `compute_abi_scores` to calculate the Attribute Bias Index (ABI) for given attributes based on a mask (indicating a specific cluster or group). It then iterates through the unique clusters found by HDBSCAN, calculates the ABI scores for each cluster, visualizes the top 10 attributes with the highest Odds Ratios for each cluster using bar plots, and displays sample images from each cluster.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os

def compute_abi_scores(attributes_df, mask, eps=1e-6):
    results = []
    cluster_mask = mask
    rest_mask = ~mask

    # Handle cases where one of the masks is empty to avoid division by zero
    if cluster_mask.sum() == 0 or rest_mask.sum() == 0:
        print("Warning: One of the masks is empty. Cannot compute meaningful ABI scores.")
        return pd.DataFrame({"Attribute": attributes_df.columns, "Odds Ratio": np.nan, "Interpretation": "N/A"})


    for col in attributes_df.columns:
        # Ensure we are using .loc with the boolean mask for correct indexing
        A = attributes_df.loc[cluster_mask, col].sum() + eps
        B = cluster_mask.sum() - (A - eps) + eps # Number of samples *not* having the attribute in the cluster
        C = attributes_df.loc[rest_mask, col].sum() + eps
        D = rest_mask.sum() - (C - eps) + eps # Number of samples *not* having the attribute outside the cluster


        odds_cluster = A / B
        odds_rest = C / D

        # Handle cases where odds_rest is zero to avoid division by zero
        if odds_rest == 0:
            odds_ratio = np.inf if odds_cluster > 0 else 1.0 # Infinite if cluster has the attribute, 1.0 otherwise
        else:
            odds_ratio = odds_cluster / odds_rest


        if odds_ratio > 1.15:
            interp = "Over-represented"
        elif odds_ratio < 0.85:
            interp = "Under-represented"
        else:
            interp = "Neutral"

        results.append({
            "Attribute": col,
            "Odds Ratio": round(odds_ratio, 2),
            "Interpretation": interp
        })

    return pd.DataFrame(results).sort_values("Odds Ratio", ascending=False).reset_index(drop=True)


# Adjust these paths and variables according to your environment:
attributes_path = "data/list_attr_celeba.csv"
#full_dataset = ...  # your loaded CelebA dataset
#subset_indices = ...  # indices of the 10k images
#clusters_umap = ...  # HDBSCAN labels after UMAP

# Load and preprocess attributes
df_attributes = pd.read_csv(attributes_path).set_index('image_id')
file_names = [os.path.basename(full_dataset.samples[i][0]) for i in subset_indices]
subset_attributes = df_attributes.loc[file_names]
subset_attributes = (subset_attributes + 1) / 2  # from -1/1 to 0/1

# Analysis by cluster
unique_labels = [l for l in np.unique(clusters_umap) if l != -1]

import matplotlib.pyplot as plt
import seaborn as sns

# Define a color palette for the clusters
palette = sns.color_palette("tab10", n_colors=len(unique_labels))
label2color = {lbl: palette[i] for i, lbl in enumerate(unique_labels)}

print("--- CLUSTER SUMMARY ---")
for lbl in unique_labels:
    count = (clusters_umap == lbl).sum()
    color = label2color[lbl]
for label in unique_labels:
    print(f"\n--- CLUSTER {label} ---")
    print(f"Number of samples in the cluster: {np.sum(clusters_umap == label)}")
    idxs = np.where(clusters_umap == label)[0]
    print(f"SBS = {len(idxs)/len(clusters_umap):.4f}")

    # Use the full-length mask for the cluster
    full_length_mask_cluster = np.zeros(len(subset_attributes), dtype=bool)
    full_length_mask_cluster[idxs] = True

    abi_df = compute_abi_scores(subset_attributes, full_length_mask_cluster)
    plt.figure(figsize=(6,4))
    sns.barplot(
    data=abi_df.head(10),
    x="Odds Ratio", y="Attribute",
    color=label2color[label]
                           )
    plt.title(f"ABI Top 10 — Cluster {label}")
    plt.axvline(1.0, ls='--', color='gray')
    plt.tight_layout()
    plt.show()
    display(abi_df.head(10))

    # Sample visualization
    sample_idxs = np.random.choice(idxs, size=min(100, len(idxs)), replace=False)
    cols = 5
    rows = int(np.ceil(len(sample_idxs) / cols))
    fig, axes = plt.subplots(rows, cols, figsize=(cols*3, rows*3))
    axes = axes.flatten()
    for ax in axes[len(sample_idxs):]:
        ax.axis('off')
    for i, idx in enumerate(sample_idxs):
        path = os.path.join("/content/data/img_align_celeba/img_align_celeba", file_names[idx])
        axes[i].imshow(Image.open(path))
        axes[i].axis('off')
    plt.suptitle(f"Cluster {label} Samples")
    plt.tight_layout()
    plt.show()

 Analysis of the "Creative Frontier" (outliers) by calculating the Creative Latent Score (CLS) for each sample. It normalizes the Uniqueness and Originality scores and combines them to get the CLS. It then identifies and visualizes the top N samples with the highest CLS as potential "black swans" or creative outliers.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
from sklearn.preprocessing import MinMaxScaler

# --- Requirements ---
# Ensure these variables exist:
# - uniqueness_scores: The array with Uniqueness scores for the 10,000 samples.
# - originality_scores: The array with Originality scores.
# - subset_indices: The indices of the 10,000 images we used.
# - full_dataset: The complete CelebA dataset.

print("--- STARTING CREATIVE FRONTIER ANALYSIS (OUTLIERS) ---")

# --- Step 1: Calculate the Creative Latent Score (CLS) ---
# We normalize the scores to the [0, 1] scale so they can be combined fairly.
scaler = MinMaxScaler()
unicidade_norm = scaler.fit_transform(uniqueness_scores.reshape(-1, 1))
originalidade_norm = scaler.fit_transform(originality_scores.reshape(-1, 1))

# Define alpha (the weight) and calculate CLS
alpha = 0.5
cls_scores = alpha * unicidade_norm + (1 - alpha) * originalidade_norm
cls_scores = cls_scores.flatten() # Flatten back to a 1D array

# ---
# --- Step 2: Find the Top N Candidates ---
# Sort the sample indices based on their CLS, from highest to lowest.
top_n_indices = np.argsort(cls_scores)[::-1]

# Select the top 150
top_150_creative = top_n_indices[:150]

print(f"Top 150 most 'creative' samples (highest CLS) found.")

# --- Step 3: Visualize the "Black Swans" ---
print("\nVisualizing samples with the highest Creative Latent Score (CLS):")

# Get the file names from our subset
file_names = [os.path.basename(full_dataset.samples[i][0]) for i in subset_indices]

# Plot the images
base_image_path = "/content/data/img_align_celeba/img_align_celeba"
num_images = len(top_150_creative)


cols = 5
rows = int(np.ceil(num_images / cols))

fig, axes = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3))
axes = axes.flatten()

for i, idx in enumerate(top_150_creative):
    file_name = file_names[idx]
    image_path = os.path.join(base_image_path, file_name)
    img = Image.open(image_path)
    axes[i].imshow(img)
    axes[i].set_title(f"Ranking {i+1} \nId: {idx}")
    axes[i].axis('off')


for j in range(i + 1, len(axes)): axes[j].axis('off')
plt.suptitle("Top 150 Samples by Creative Latent Score (CLS)", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

This cell analyzes "Stereotypes" by identifying samples with the lowest Originality scores. It finds the top N samples with the lowest originality scores and visualizes these images as representatives of the most generic or stereotypical samples in the latent space.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os

# --- Requirements ---
# Ensure these variables exist:
# - originality_scores: The array with Originality scores for the 10,000 samples.
# - subset_indices: The indices of the 10,000 images we used.
# - full_dataset: The complete CelebA dataset.

print("--- STEREOTYPE ANALYSIS (LOWEST SCORES) ---")

# --- Step 1: Find the Top N Candidates with LOWEST Originality ---
# Sort the sample indices based on their score, from LOWEST to highest.
bottom_n_indices = np.argsort(originality_scores)

# Select the bottom 40 (the most generic)
top_140_generic = bottom_n_indices[:140]

print(f"Top 140 most 'generic' samples (lowest Originality) found.")

# --- Step 2: Visualize the Stereotypes ---
print("\nVisualizing samples with the lowest Entropic Originality Score:")

# Get the file names from our subset
file_names = [os.path.basename(full_dataset.samples[i][0]) for i in subset_indices]

# Plot the images
base_image_path = "/content/data/img_align_celeba/img_align_celeba"
num_images = len(top_140_generic)
cols = 5
rows = int(np.ceil(num_images / cols))

fig, axes = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3))
axes = axes.flatten()

for i, idx in enumerate(top_140_generic):
    file_name = file_names[idx]
    image_path = os.path.join(base_image_path, file_name)
    img = Image.open(image_path)
    axes[i].imshow(img)
    axes[i].set_title(f"Generic Rank {i+1}\nIndex: {idx}")
    axes[i].axis('off')

for j in range(i + 1, len(axes)): axes[j].axis('off')
plt.suptitle("Top 140 Samples by Lowest Originality Score (Stereotypes)", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()

This is a quantitative bias analysis (SBS & ABI). It identifies the stereotypical cluster (C_max) by finding the largest cluster in the raw embeddings using HDBSCAN. It calculates the Stereotypical Bias Score (SBS) as the proportion of samples in this cluster. It then calculates the Attribute Bias Index (ABI) for selected attributes to assess how super or sub-represented they are within the stereotypical cluster compared to the overall dataset.

In [None]:
import numpy as np
import pandas as pd
import hdbscan
from sklearn.preprocessing import StandardScaler

print("--- STARTING QUANTITATIVE BIAS ANALYSIS (SBS & ABI) ---")

# --- Step 1: Identify the Stereotypical Cluster (C_max) ---
print("\n1. Identifying the central cluster with HDBSCAN on raw embeddings...")
scaler_bias = StandardScaler()
embeddings_scaled_bias = scaler_bias.fit_transform(real_embeddings_numpy)

clusterer_bias = hdbscan.HDBSCAN(
    min_cluster_size=2, # Using the value that worked for you
    metric='euclidean'
)
clusters_bias = clusterer_bias.fit_predict(embeddings_scaled_bias)

labels, counts = np.unique(clusters_bias[clusters_bias!=-1], return_counts=True)
if len(counts) > 0:
    c_max_label = labels[np.argmax(counts)]
    indices_c_max = np.where(clusters_bias == c_max_label)[0]
    print(f"Stereotypical cluster (C_max) found with label {c_max_label}, containing {len(indices_c_max)} samples.")
else:
    print("No significant cluster found. Bias analysis cannot proceed.")
    indices_c_max = []

# --- Step 2: Calculate the Stereotypical Bias Score (SBS) ---
if len(indices_c_max) > 0:
    N = len(real_embeddings_numpy)
    sbs_score = len(indices_c_max) / N
    print(f"\n2. Stereotypical Bias Score (SBS): {sbs_score:.4f} ({sbs_score:.2%})")
    print(f"   -> {sbs_score:.2%} of the dataset belongs to the main small hyper-dense yellow cluster.")

    # --- Step 3: Calculate the Attribute Bias Index (ABI) ---
    print("\n3. Calculating the Attribute Bias Index (ABI) for selected attributes...")

    # Prepare the attributes DataFrame
    file_names = [os.path.basename(full_dataset.samples[i][0]) for i in subset_indices]

    # APPLY THE ISIN FILTER ON THE ORIGINAL DATAFRAME BEFORE INDEXING
    subset_attributes = df_attributes[df_attributes.index.isin(file_names)].copy()

    # Check if the filtered DataFrame is not empty
    if subset_attributes.empty:
        print("Error: The subset attributes DataFrame is empty after filtering. Check file names.")
    else:
        # Convert attributes to 0 and 1 for the mean calculation
        subset_attributes_norm = (subset_attributes + 1) / 2

        attributes_for_analysis = ['Smiling', 'Male', 'Young', 'Blond_Hair', 'Eyeglasses', 'No_Beard']

        # Ensure selected attributes exist in the DataFrame
        available_attributes = [attr for attr in attributes_for_analysis if attr in subset_attributes_norm.columns]
        if len(available_attributes) != len(attributes_for_analysis):
            missing = [attr for attr in attributes_for_analysis if attr not in subset_attributes_norm.columns]
            print(f"Warning: The following attributes were not found in the dataset and will be skipped: {missing}")
        attributes_for_analysis = available_attributes # Update list to only include available attributes


        if not attributes_for_analysis:
            print("No valid attributes selected for ABI analysis.")
        else:
            general_prob = subset_attributes_norm[attributes_for_analysis].mean()

            # Isolate attributes only from our C_max cluster
            c_max_file_names = [file_names[i] for i in indices_c_max]

            # Use the already indexed DataFrame for the lookup
            # The correction here is that subset_attributes_norm ALREADY HAS image_id as index
            attributes_c_max = subset_attributes_norm.loc[c_max_file_names]
            prob_in_cluster = attributes_c_max[attributes_for_analysis].mean()

            # Add a small constant to avoid division by zero if general probability is 0
            abi_scores = prob_in_cluster / (general_prob + 1e-8)


            print("\nABI Results (Attribute Odds Ratio in Stereotype vs. Dataset):")
            print("-" * 50)
            print(f"| Attribute         | ABI Score | Interpretation")
            print("-" * 50)
            for attr, score in abi_scores.items():
                interpretation = "Over-represented" if score > 1.1 else "Under-represented" if score < 0.9 else "Neutral representation"
                print(f"| {attr:<17} | {score:9.2f} | {interpretation}")
            print("-" * 50)

This cell generates the final visualizations for the research paper. It creates a bar chart showing the Attribute Bias Index (ABI) for selected attributes in the stereotypical cluster, highlighting super-represented, sub-represented, and neutral attributes. It also generates a scatter plot of normalized Uniqueness versus Originality scores, colored by the HDBSCAN cluster labels, to visualize the heuristic space.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# --- Requirements ---
# Ensure these variables exist from the previous cells:
# - abi_scores: The pandas Series with the Attribute Bias Index results.
# - uniqueness_scores: The array with Uniqueness scores for the 10,000 samples.
# - originality_scores: The array with Originality scores.
# - clusters_umap: The array with the cluster labels from the UMAP + HDBSCAN analysis.
# - embedding_2d: The 2D representation generated by UMAP.

print("--- GENERATING FINAL RESEARCH VISUALIZATIONS ---")

# --- Plot 1: Attribute Bias Index (ABI) Bar Chart ---

print("\nGenerating Plot 1: Attribute Bias Analysis (ABI)...")

# Prepare data for the plot
abi_to_plot = abi_scores.sort_values()
colors = ['red' if x < 0.9 else 'green' if x > 1.1 else 'grey' for x in abi_to_plot]

plt.figure(figsize=(12, 8))
bars = plt.barh(abi_to_plot.index, abi_to_plot.values, color=colors)

# Add a vertical line at the neutral point (ABI = 1.0)
plt.axvline(1.0, color='black', linestyle='--', linewidth=1)
plt.xlabel('Attribute Bias Index (ABI Score)', fontsize=12)
plt.title('Attribute Bias in the Stereotypical Cluster (C_max)', fontsize=16)
plt.grid(axis='x', linestyle='--', alpha=0.7)

# Add the value of each bar for clarity
for bar in bars:
    plt.text(bar.get_width() + 0.01, bar.get_y() + bar.get_height()/2,
             f'{bar.get_width():.2f}',
             va='center', ha='left')

plt.show()


# --- Plot 2: Uniqueness vs. Originality Scatter Plot ---

print("\nGenerating Plot 2: Heuristic Space Map (Uniqueness vs. Originality)...")

# Normalize the scores to the [0, 1] scale for fair visualization
scaler = MinMaxScaler()
unicidade_norm = scaler.fit_transform(uniqueness_scores.reshape(-1, 1))
originalidade_norm = scaler.fit_transform(originality_scores.reshape(-1, 1))

plt.figure(figsize=(14, 12))

# Plot the "noise" points (label -1) first, in light gray
noise_indices = clusters_umap == -1
plt.scatter(unicidade_norm[noise_indices], originalidade_norm[noise_indices],
            c='lightgray', s=5, alpha=0.5, label='Noise (Unclustered)')

# Plot the found clusters with different colors
cluster_indices = clusters_umap != -1
scatter = plt.scatter(unicidade_norm[cluster_indices], originalidade_norm[cluster_indices],
                      c=clusters_umap[cluster_indices], s=15, cmap='viridis',
                      label='Identified Clusters')


plt.title('Heuristic Map: Uniqueness vs. Originality', fontsize=18)
plt.xlabel('Uniqueness Score (Normalized)', fontsize=14)
plt.ylabel('Originality Score (Normalized)', fontsize=14)
plt.legend()
plt.colorbar(scatter, label='Cluster ID (HDBSCAN)')
plt.grid(True, linestyle='--', alpha=0.6)
plt.show()

 Calculation of  Spearman correlation coefficient between the Uniqueness and Originality scores to assess their relationship. It then plots a scatter plot of Uniqueness versus Originality scores with a diagonal arrow indicating the direction of increasing scores, providing a visual representation of the heuristic space and the correlation between the two metrics.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import spearmanr
import pandas as pd # Import pandas

# Load the scores from the checkpoint DataFrame
try:
    df_results = pd.read_parquet('checkpoint_final_results.parquet')
    print("Checkpoint file loaded.")
    # Ensure scores are accessible by original index
    # Assuming subset_indices is available from previous cells
    # If subset_indices is not available, this part will need adjustment
    unicidade_scores = df_results['Unicidade'].values
    originalidade_scores = df_results['Originalidade'].values

except FileNotFoundError:
    print("Error: Checkpoint file 'checkpoint_final_results.parquet' not found.")
    print("Please run the checkpoint saving cell first.")
    unicidade_scores = None
    originalidade_scores = None
except NameError:
    print("Error: 'subset_indices' is not defined. Cannot filter scores by subset.")
    unicidade_scores = None
    originalidade_scores = None


# If unicidade_scores and originality_scores are in lists, convert:
# Assuming they are already numpy arrays from previous steps
if unicidade_scores is not None and originality_scores is not None:
    uniq = np.array(unicidade_scores)
    orig = np.array(originalidade_scores)

    # 1. Calculate Spearman correlation
    rho, pval = spearmanr(uniq, orig)
    print(f"Spearman ρ = {rho:.2f}, p = {pval:.3f}")

    # 2. Plot the scatter with diagonal arrow
    plt.figure(figsize=(7,7))
    plt.scatter(uniq, orig, s=10, alpha=0.6)
    plt.xlabel('Uniqueness Score') # Removed Normalizado as scores might not be normalized here
    plt.ylabel('Originality Score') # Removed Normalizado as scores might not be normalized here
    plt.title(f'Heuristic Map: Uniqueness vs. Originality\nSpearman ρ={rho:.2f}, p={pval:.3f}')

    # Add a diagonal arrow (from low to high region)
    xmin, xmax = plt.xlim()
    ymin, ymax = plt.ylim()
    plt.annotate(
        '',
        xy=(xmax*0.9, ymax*0.9),
        xytext=(xmin*1.05, ymin*1.05),
        arrowprops=dict(arrowstyle='->', lw=2)
    )

    plt.tight_layout()
    plt.show()
else:
    print("Scores not available for plotting and correlation calculation.")

 Analysis results from the Parquet checkpoint file. It calculates the Creative Latent Score (CLS) for each sample if it doesn't already exist. It then filters out the noise points (cluster -1) and calculates the average CLS for each identified cluster. Finally, it identifies and prints the cluster with the highest average CLS as the "Creative Cluster (Cluster C)".

In [None]:
import pandas as pd
import numpy as np

# Load the results from our secure checkpoint
try:
    df_results = pd.read_parquet('checkpoint_final_results.parquet')
    print("Checkpoint file loaded successfully!")
    print(f"Total samples loaded: {len(df_results)}")
except FileNotFoundError:
    print("ERROR: The file 'checkpoint_final_results.parquet' was not found.")
    print("Please ensure the saving cell was executed successfully.")
    # Stop execution if the file does not exist
    # df_results = None

# Ensure the dataframe was loaded before proceeding
if 'df_results' in locals() and df_results is not None:

    # QUESTION: Does your dataframe already have the 'CLS' column?
    # If not, we need to calculate it first.
    # Assuming yes, and that the score columns are named 'Unicidade' and 'Originalidade'
    if 'CLS' not in df_results.columns:
        # If the CLS column does not exist, let's create it.
        # Adapt the names 'Unicidade' and 'Originalidade' if they are different.
        df_results['CLS'] = (df_results['Unicidade'] + df_results['Originalidade']) / 2 # Changed from 'Uniqueness' and 'Originality' to 'Unicidade' and 'Originalidade'
        print("'CLS' column calculated and added.")

    # Filter out noise (cluster -1) to not include in the mean calculation
    df_clusters_only = df_results[df_results['cluster_label'] != -1].copy()

    # Calculate the average CLS per cluster
    avg_cls_per_cluster = df_clusters_only.groupby('cluster_label')['CLS'].mean().sort_values(ascending=False)

    # Identify the cluster with the highest average CLS
    creative_cluster_id = avg_cls_per_cluster.idxmax()

    print("\n--- Creative Score (CLS) Analysis by Cluster ---")
    print(avg_cls_per_cluster)
    print("----------------------------------------------------")
    print(f"\n🏆 The Creative Cluster (Cluster C) is: CLUSTER {creative_cluster_id}")
    print("----------------------------------------------------")

This cell analyzes the "noise" region (samples not assigned to any cluster by HDBSCAN, the grey region). It calculates the mean CLS, Uniqueness, and Originality scores for these noise points. It then identifies and prepares to display the top 10 images with the highest CLS within this noise region, considering them as potential creative outliers that didn't fit into defined clusters.

In [None]:
# 1. Select samples classified as "noise" (label -1)
indices_noise = np.where(clusters_umap == -1)[0]
print(f"Number of samples outside clusters (gray region): {len(indices_noise)}")

# 2. Calculate heuristic statistics for these points
# Load the scores from the checkpoint DataFrame
unicidade_scores = df_results['Unicidade'].values[subset_indices] # Get scores for the subset used for UMAP
originalidade_scores = df_results['Originalidade'].values[subset_indices] # Get scores for the subset used for UMAP
cls_scores = df_results['CLS'].values[subset_indices] # Get CLS scores for the subset used for UMAP

# Filter scores for noise indices
cls_noise = cls_scores[indices_noise]
unicidade_noise = unicidade_scores[indices_noise]
originalidade_noise = originality_scores[indices_noise]

print(f"Average CLS (Gray Region): {np.mean(cls_noise):.2f} ± {np.std(cls_noise):.2f}")
print(f"Average Uniqueness: {np.mean(unicidade_noise):.2f}")
print(f"Average Originality: {np.mean(originalidade_noise):.2f}")

# 3. Show the top 10 most creative images from the gray region
top_idxs = indices_noise[np.argsort(cls_noise)[-10:]]  # Top 10 CLS in the gray region

Visualizing the top N creative outliers specifically from the "noise" region (samples not assigned to any cluster). It selects the top N samples with the highest CLS within the noise indices and plots their corresponding images with their CLS scores, providing a visual inspection of the most creative samples outside of the main clusters.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os

# Parameters
N = 50   # number of images to plot
base_path = "/content/data/img_align_celeba/img_align_celeba"

# 1) Indices of the gray region
indices_noise = np.where(clusters_umap == -1)[0]

# 2) Get the N highest CLS within this region
cls_noise = cls_scores[indices_noise]
top_idxs_noise = indices_noise[np.argsort(cls_noise)[-N:]]

# 3) Plot
cols = 4
rows = int(np.ceil(N/cols))
fig, axes = plt.subplots(rows, cols, figsize=(cols*3, rows*3))
axes = axes.flatten()

for i, idx in enumerate(top_idxs_noise):
    fname = file_names[idx]
    img = Image.open(os.path.join(base_path, fname))
    axes[i].imshow(img)
    axes[i].set_title(f"CLS={cls_scores[idx]:.1f}")
    axes[i].axis('off')

# clean up extra axes
for j in range(i+1, len(axes)):
    axes[j].axis('off')

plt.suptitle(f"Top {N} Creative Outliers in the Gray Region", y=1.02)
plt.tight_layout()
plt.show()

Exploration HDBSCAN thresholds calibration and detecting low-density "pockets" or "sub-niches" in the UMAP space. It runs HDBSCAN with different `min_cluster_size` values to see how the number of clusters changes. It then attempts to use Kernel Density Estimation (KDE) and minimum filtering to identify low-density areas (valleys) and cluster points within these valleys using DBSCAN. Finally, it calculates and displays the mean heuristic scores (CLS, Uniqueness, Originality) and ABI scores for these identified sub-niches and visualizes sample images from them.

In [None]:
#Calibrate HDBSCAN thresholds as “multi-scale”
#Instead of a single min_cluster_size, run HDBSCAN multiple times with different values, for example:
import hdbscan

for mcs in [5, 10, 20, 50]:
    clusterer = hdbscan.HDBSCAN(min_cluster_size=mcs, min_samples=1, cluster_selection_epsilon=0.0)
    labels_mcs = clusterer.fit_predict(embedding_2d) # Changed X_umap to embedding_2d
    n_clusters = len(set(labels_mcs)) - (1 if -1 in labels_mcs else 0)
    print(f"min_cluster_size={mcs} → {n_clusters} clusters")
#Objective: identify configurations that reveal small sub-clusters in the clearings.

# Automatically detect low-density “pockets” via density estimation
#KDE over UMAP
#Use sklearn.neighbors.KernelDensity to estimate local density in the 2D UMAP space.

#Find local minima
#Apply a neighborhood filter (scipy’s maximum_filter/minimum_filter) over the density map to extract points that are density valleys (the bottoms of the clearings).

#Cluster these valleys
#With a small eps DBSCAN, group these minima into “sub-niches”.
from sklearn.neighbors import KernelDensity
from scipy.ndimage import minimum_filter

# 1) KDE
kde = KernelDensity(bandwidth=0.5).fit(embedding_2d) # Changed X_umap to embedding_2d
log_dens = kde.score_samples(embedding_2d) # Changed X_umap to embedding_2d

# 2) Local minima
mins = minimum_filter(log_dens, size=15)
pockets = (log_dens == mins) & (log_dens < np.percentile(log_dens, 20))

# 3) Extract indices and group with DBSCAN
from sklearn.cluster import DBSCAN
valleys_idx = np.where(pockets)[0]
db = DBSCAN(eps=0.3, min_samples=5).fit(embedding_2d[valleys_idx]) # Changed X_umap to embedding_2d
sub_labels = db.labels_
#Objective: automatically isolate points from the clearings and treat them as “Cluster D” of sub-niches.

#Select these sub-niches for heuristic analysis
#Once the indices of each sub-niche are identified (via HDBSCAN calibration or via KDE+DBSCAN)
for subcluster in np.unique(sub_labels):
     # ── FILTER: skip very small sub-niches ──
    MIN_SIZE= 20
    idxs = np.where(sub_labels == subcluster)[0]
    if len(idxs) < MIN_SIZE:
        print(f"Ignoring sub-niche {subcluster}: only {len(idxs)} samples (<{MIN_SIZE})")
        continue
    # Get the indices of the current subcluster within the 'valleys_idx'
    current_subcluster_indices_in_valleys = np.where(sub_labels == subcluster)[0]
    # Get the original indices of the samples belonging to the current subcluster
    original_indices_of_subcluster = valleys_idx[current_subcluster_indices_in_valleys]

    print(f"\n--- Sub-niche {subcluster}: {len(original_indices_of_subcluster)} images ---")
    print(f"Average CLS: {cls_scores[original_indices_of_subcluster].mean():.2f}")
    print(f"Average Uniqueness: {unicidade_scores[original_indices_of_subcluster].mean():.2f}") # Changed uni_scores to unicidade_scores
    print(f"Average Originality: {originality_scores[original_indices_of_subcluster].mean():.2f}") # Changed orig_scores to originality_scores

    # Create a full-length boolean mask based on the original indices of the subcluster
    full_length_mask = np.zeros(len(subset_attributes), dtype=bool)
    full_length_mask[original_indices_of_subcluster] = True

    # Use the full-length mask in compute_abi_scores
    # Removed the target_label argument as the mask already specifies the target group
    display(compute_abi_scores(subset_attributes, full_length_mask))
import matplotlib.pyplot as plt
from PIL import Image
import os

# Number of samples to plot per sub-niche
N = 6
base_path = "/content/data/img_align_celeba/img_align_celeba"

for subcluster in np.unique(sub_labels):
    idxs = np.where(sub_labels == subcluster)[0]
    print(f"\n=== Sub-niche {subcluster}: {len(idxs)} images (Average CLS = {cls_scores[idxs].mean():.1f}) ===")

    # Choose N random samples
    sample_idxs = np.random.choice(idxs, size=min(N, len(idxs)), replace=False)

    # Check if there are any samples to plot before creating the figure
    if len(sample_idxs) > 0:
        # Plot
        cols = 3
        rows = int(np.ceil(len(sample_idxs)/cols))
        fig, axes = plt.subplots(rows, cols, figsize=(cols*3, rows*3))
        axes = axes.flatten()
        for i, idx in enumerate(sample_idxs):
            fname = file_names[idx]
            img = Image.open(os.path.join(base_path, fname))
            axes[i].imshow(img)
            axes[i].set_title(f"CLS={cls_scores[idx]:.1f}\nUn={unicidade_scores[idx]:.1f}, O={originality_scores[idx]:.1f}")
            axes[i].axis('off')
        for j in range(i+1, len(axes)):
            axes[j].axis('off')
        plt.suptitle(f"Examples — Sub-niche {subcluster}", y=1.02, fontsize=14)
        plt.tight_layout()
        plt.show()
    else:
        print(f"No samples to plot for subcluster {subcluster}")

Identity Consistency Analysis using AUC (Area Under the ROC Curve) and Silhouette Score. It loads the identity mapping and selects identities with a sufficient number of images. It then generates positive pairs (images of the same identity) and negative pairs (images of different identities). It calculates a "distance" between pairs based on the heuristic scores and uses these scores and the generated labels (same or different identity) to compute the AUC, assessing how well the heuristics can distinguish between identities. It also calculates the Silhouette Score using the raw embeddings and identity labels to evaluate the clustering quality based on identity.

In [None]:
import numpy as np
from sklearn.metrics import roc_auc_score, silhouette_score
import itertools
import pandas as pd
import os # Import the os module

# Load the identity mapping
identity_file = "/content/drive/MyDrive/Colab Notebooks/identity_CelebA.txt"
df_id = pd.read_csv(identity_file, sep=" ", names=["image_id", "identity"])

# Create a dictionary mapping identity ID to a list of image indices in the subset
# We need the mapping from image_id to original index in the subset
image_id_to_subset_index = {os.path.basename(full_dataset.samples[i][0]): i for i in subset_indices}

indices_per_id = {}
# Iterate through the identity dataframe and populate the indices_per_id dictionary
for index, row in df_id.iterrows():
    image_id = row['image_id']
    identity_id = row['identity']
    if image_id in image_id_to_subset_index:
        subset_index = image_id_to_subset_index[image_id]
        if identity_id not in indices_per_id:
            indices_per_id[identity_id] = []
        indices_per_id[identity_id].append(subset_index)

# --- AUC Identity Analysis ---
print("--- Starting Identity Analysis with AUC ---")
# 1) Select K people with >= N photos (e.g., N=15)
# Get identities with at least 15 images in the subset
valid_ids = [pid for pid, indices in indices_per_id.items() if len(indices) >= 5]

if not valid_ids:
    print("No identities found with at least 15 images in the subset. Cannot perform AUC analysis.")
else:
    # Use the first 5 valid identities
    ids_to_use = valid_ids[:5]
    print(f"Using {len(ids_to_use)} identities for AUC analysis.")


    # Load the scores from the checkpoint DataFrame if not already loaded
    try:
        df_results = pd.read_parquet('checkpoint_final_results.parquet')
        print("Checkpoint file loaded.")
        # Ensure scores are accessible by original index
        unicidade_scores = df_results['Unicidade'].values[subset_indices]
        originalidade_scores = df_results['Originalidade'].values[subset_indices]

    except FileNotFoundError:
        print("Error: Checkpoint file 'checkpoint_final_results.parquet' not found.")
        print("Please run the checkpoint saving cell first.")
        unicidade_scores = None
        originalidade_scores = None


    if unicidade_scores is not None and originalidade_scores is not None:
        # 2) Generate positive and negative pairs
        pairs = []
        labels = []

        # Positive pairs (same identity)
        for pid in ids_to_use:
            imgs = indices_per_id[pid]
            for i, j in itertools.combinations(imgs, 2):
                pairs.append((i, j))
                labels.append(1)

        # Negative pairs (different identities) - sample a reasonable number
        num_negative_pairs_per_id_pair = 50 # Limit negative pairs to avoid imbalance
        for pid1, pid2 in itertools.combinations(ids_to_use, 2):
            # Sample images from each identity to create negative pairs
            imgs1 = np.random.choice(indices_per_id[pid1], size=min(len(indices_per_id[pid1]), 10), replace=False) # Sample up to 10 images
            imgs2 = np.random.choice(indices_per_id[pid2], size=min(len(indices_per_id[pid2]), 10), replace=False) # Sample up to 10 images

            neg_pairs_count = 0
            for i in imgs1:
                 for j in imgs2:
                      pairs.append((i, j))
                      labels.append(0)
                      neg_pairs_count += 1
                      if neg_pairs_count >= num_negative_pairs_per_id_pair:
                          break # Limit negative pairs per id pair
                 if neg_pairs_count >= num_negative_pairs_per_id_pair:
                     break


        print(f"Generated {len(pairs)} pairs ({labels.count(1)} positive, {labels.count(0)} negative).")

        if len(pairs) > 0:
            # 3) Calculate the "distance" based on heuristics
            scores = []
            for i, j in pairs:
                d_uni = abs(unicidade_scores[i] - unicidade_scores[j])
                d_ori = abs(originalidade_scores[i] - originality_scores[j])
                scores.append((d_uni + d_ori) / 2)  # or another combination

            # 4) ROC AUC
            auc = roc_auc_score(labels, scores)
            print(f"Identity AUC (Based on Heuristics): {auc:.3f}")
        else:
            print("No pairs generated. Cannot compute AUC.")
    else:
        print("Scores not available. Cannot compute AUC.")


# --- Silhouette Score Analysis ---
print("\n--- Starting Identity Consistency Analysis with Silhouette Score ---")

# 1. Filter embeddings and identity labels for identities with enough images
# We need the raw embeddings (real_embeddings_numpy) and corresponding identity labels

# Get indices of all samples belonging to the identities used for AUC analysis
indices_for_silhouette = []
identity_labels_for_silhouette = []

# Collect indices and identity labels for all images belonging to the identities used for AUC
for pid in ids_to_use:
    indices = indices_per_id[pid]
    indices_for_silhouette.extend(indices)
    identity_labels_for_silhouette.extend([pid] * len(indices))

# Ensure we have enough samples (at least 2) and more than one unique label for Silhouette Score
if len(indices_for_silhouette) < 2 or len(set(identity_labels_for_silhouette)) < 2:
    print("Not enough samples or unique identities to compute Silhouette Score.")
else:
    # Get the corresponding raw embeddings for these indices
    embeddings_for_silhouette = real_embeddings_numpy[indices_for_silhouette]

    # Calculate the Silhouette Score
    # Use the raw embeddings and the identity labels as clusters
    silhouette_avg = silhouette_score(embeddings_for_silhouette, identity_labels_for_silhouette)

    print(f"Silhouette Score (Embeddings vs. Identity Labels): {silhouette_avg:.3f}")

This cell performs non-parametric statistical tests (Mann-Whitney U and Kolmogorov-Smirnov) to compare the distributions of Originality scores between two groups (presumably Celebrity A and Celebrity B, based on the variable names `scores_grupo_A` and `scores_grupo_B`, although there was a `NameError` in a previous attempt). It also includes a bootstrap analysis to estimate the confidence interval for the difference in variances of Originality between these groups.

In [None]:
#Non-Parametric Distribution Tests
from scipy.stats import mannwhitneyu, ks_2samp

# Having the numpy arrays of Originality
# Access the Originalidade scores from the dictionaries
orig_A = scores_group_A['Originality']
orig_B = scores_group_B['Originality']


u_stat, p_mw = mannwhitneyu(orig_A, orig_B, alternative='two-sided')
ks_stat, p_ks = ks_2samp(orig_A, orig_B)

print(f"Mann-Whitney U: stat={u_stat:.1f}, p={p_mw:.3f}")
print(f"Kolmogorov-Smirnov: stat={ks_stat:.3f}, p={p_ks:.3f}")

#Mann-Whitney U tests if one distribution tends to result in larger values than the other, without assuming normality.

#KS test compares the entire shape of the distribution.

#Bootstrap to Estimate Confidence Intervals
import numpy as np

def bootstrap_statistic(data1, data2, stat_fn, n_boot=1000):
    """
    Returns the bootstrap distribution of stat_fn(data1) - stat_fn(data2).
    """
    diffs = []
    combined = np.concatenate([data1, data2])
    n1, n2 = len(data1), len(data2)
    for _ in range(n_boot):
        # Resampling with replacement
        b1 = np.random.choice(data1, size=n1, replace=True)
        b2 = np.random.choice(data2, size=n2, replace=True)
        diffs.append(stat_fn(b1) - stat_fn(b2))
    return np.array(diffs)

# Example: difference of variances of Originality
boot_diffs = bootstrap_statistic(orig_A, orig_B, np.var, n_boot=2000)
ci_lower, ci_upper = np.percentile(boot_diffs, [2.5, 97.5])
print(f"95% CI of variance difference: [{ci_lower:.2f}, {ci_upper:.2f}]")

This cell reloads the analysis results from the Parquet checkpoint file and ensures the 'CLS' column exists. It then calculates the percentile thresholds for Uniqueness and Originality scores (e.g., the 25th percentile). Finally, it creates a mask to identify "generic" samples as those falling below both the Uniqueness and Originality thresholds and creates a new DataFrame containing only these generic samples, along with their count and percentage of the total dataset.

In [None]:
import pandas as pd
import numpy as np

# Load the results from our secure checkpoint
try:
    df_results = pd.read_parquet('checkpoint_final_results.parquet')
    print("Checkpoint file loaded successfully!")
    print(f"Total samples loaded: {len(df_results)}")
except FileNotFoundError:
    print("ERROR: The file 'checkpoint_final_results.parquet' was not found.")
    print("Please ensure the saving cell was executed successfully.")
    # Stop execution if the file does not exist
    # df_results = None

# Ensure the dataframe was loaded before proceeding
if 'df_results' in locals() and df_results is not None:

    # QUESTION: Does your dataframe already have the 'CLS' column?
    # If not, we need to calculate it first.
    # Assuming yes, and that the score columns are named 'Unicidade' and 'Originalidade'
    if 'CLS' not in df_results.columns:
        # If the CLS column does not exist, let's create it.
        # Adapt the names 'Unicidade' and 'Originalidade' if they are different.
        df_results['CLS'] = (df_results['Unicidade'] + df_results['Originalidade']) / 2 # Changed from 'Uniqueness' and 'Originality' to 'Unicidade' and 'Originalidade'
        print("'CLS' column calculated and added.")

    # Filter out noise (cluster -1) to not include in the mean calculation
    df_clusters_only = df_results[df_results['cluster_label'] != -1].copy()

    # Calculate the average CLS per cluster
    avg_cls_per_cluster = df_clusters_only.groupby('cluster_label')['CLS'].mean().sort_values(ascending=False)

    # Identify the cluster with the highest average CLS
    creative_cluster_id = avg_cls_per_cluster.idxmax()

    print("\n--- Creative Score (CLS) Analysis by Cluster ---")
    print(avg_cls_per_cluster)
    print("----------------------------------------------------")
    print(f"\n🏆 The Creative Cluster (Cluster C) is: CLUSTER {creative_cluster_id}")
    print("----------------------------------------------------")

    # Step 1: Define the percentile threshold (25% is a good start)
    percentile_threshold = 0.25

    # Step 2: Calculate the value of the scores at this percentile
    unicidade_threshold = df_results['Unicidade'].quantile(percentile_threshold)
    originalidade_threshold = df_results['Originalidade'].quantile(percentile_threshold)

    print(f"Uniqueness Threshold (Percentile {percentile_threshold*100}%): {unicidade_threshold:.4f}")
    print(f"Originality Threshold (Percentile {percentile_threshold*100}%): {originalidade_threshold:.4f}")

    # Step 3: Create a mask to select the "generic" images
    mask_generic = (df_results['Unicidade'] < unicidade_threshold) & \
                    (df_results['Originalidade'] < originalidade_threshold)

    # Step 4: Create the new DataFrame with only the generic samples
    df_generic = df_results[mask_generic].copy() # Use .copy() to avoid SettingWithCopyWarning

    print(f"\nTotal samples in the complete dataset: {len(df_results)}")
    print(f"Number of samples in the 'generic' subset: {len(df_generic)}")
    print(f"Percentage of the dataset selected: {(len(df_generic) / len(df_results) * 100):.2f}%")

    # Now, 'df_generic' is ready for identity analysis.

In [None]:
#=============================
# CHECKPOINT CREATION CELL (v2.1 - CORRECTED)
# Execute this cell to save all final analysis results.
# ==========================================================
print("Starting final checkpoint creation...")

import pandas as pd
import os
from torchvision import datasets
from torchvision import transforms
import numpy as np
from scipy.spatial.distance import pdist, squareform

# Load necessary variables if they don't exist (assuming the user might restart runtime)
try:
    _ = full_dataset
    _ = subset_indices
    _ = clusters_umap # Check if clusters_umap exists from UMAP+HDBSCAN
    _ = real_embeddings_numpy # Check if real_embeddings_numpy exists
    _ = df_attributes # Check if df_attributes exists
    print("Necessary variables (full_dataset, subset_indices, clusters_umap, real_embeddings_numpy, df_attributes) found.")
except NameError:
    print("One or more necessary variables not found. Attempting to load/recreate minimal data...")
    # --- Recreate minimal necessary variables ---
    base_dataset_path = '/content/celeba_images/img_align_celeba/'
    basic_transforms = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])
    full_dataset = datasets.ImageFolder(root=base_dataset_path, transform=basic_transforms)
    subset_indices = list(range(10000))
    print("full_dataset and subset_indices recreated.")

    # Load attributes file
    attributes_path = "/content/celeba_images/list_attr_celeba.csv"
    try:
        df_attributes = pd.read_csv(attributes_path).set_index('image_id')
        print("Attributes file loaded.")
    except FileNotFoundError:
        print(f"ERROR: Attributes file not found at '{attributes_path}'. Cannot save full checkpoint.")
        df_attributes = pd.DataFrame() # Create empty df to avoid errors later

    # Note: real_embeddings_numpy and clusters_umap cannot be reliably recreated here.
    # The user must run the relevant cells to generate them if they are missing.
    print("real_embeddings_numpy and clusters_umap are NOT recreated. Please run the relevant cells.")
    real_embeddings_numpy = None
    clusters_umap = None # Ensure it's None if not found


# --- Recalculate Uniqueness and Originality from Embeddings ---
# This ensures the scores are based on the actual embeddings used, even if the heuristic objects are gone
if real_embeddings_numpy is not None:
    print("\nRecalculating Uniqueness and Originality scores...")

    # Uniqueness (based on average distance to other points)
    # Calculate pairwise distances (Euclidean is common for this)
    distances = pdist(real_embeddings_numpy, metric='euclidean')
    square_distances = squareform(distances)

    # Calculate average distance for each point to all others
    # Add a small epsilon to avoid division by zero if somehow a point has no distance to others (shouldn't happen)
    avg_distances = (np.sum(square_distances, axis=1) / (real_embeddings_numpy.shape[0] - 1)) + 1e-8 # Exclude distance to self

    # A higher average distance means higher uniqueness (more isolated)
    uniqueness_scores = avg_distances

    # Originality (based on local density - inverse of distance to k-nearest neighbors)
    # Re-using the logic from LatentSpaceAnalyzer's calculate_density_map, but inverse
    k = 10 # Number of neighbors to consider for density
    if k >= len(real_embeddings_numpy):
         print(f"Warning: k ({k}) is too large for Originality calculation. Adjusting k.")
         k = len(real_embeddings_numpy) - 1
         if k < 1:
              print("Error: Not enough samples for Originality calculation.")
              originality_scores = np.zeros(len(real_embeddings_numpy))
         else:
              neighbors_model = NearestNeighbors(n_neighbors=k + 1, algorithm='kd_tree', leaf_size=30, n_jobs=-1)
              neighbors_model.fit(real_embeddings_numpy)
              distances_knn, _ = neighbors_model.kneighbors(real_embeddings_numpy)
              # Average distance to k neighbors (excluding self)
              avg_knn_distance = np.mean(distances_knn[:, 1:], axis=1) + 1e-8
              # Originality is higher in low-density areas, so it's related to higher average KNN distance
              originality_scores = avg_knn_distance # Use average distance as originality score (higher distance = higher originality)

    else:
        neighbors_model = NearestNeighbors(n_neighbors=k + 1, algorithm='kd_tree', leaf_size=30, n_jobs=-1)
        neighbors_model.fit(real_embeddings_numpy)
        distances_knn, _ = neighbors_model.kneighbors(real_embeddings_numpy)
        avg_knn_distance = np.mean(distances_knn[:, 1:], axis=1) + 1e-8
        originality_scores = avg_knn_distance


    print("Uniqueness and Originality scores recalculated.")
else:
    print("\nCannot recalculate Uniqueness and Originality: real_embeddings_numpy is not available.")
    uniqueness_scores = None
    originality_scores = None


# --- Prepare the final DataFrame ---
# We need the attributes for the subset
subset_attributes = pd.DataFrame()
if not df_attributes.empty:
    file_names = [os.path.basename(full_dataset.samples[i][0]) for i in subset_indices]
    # Ensure file names from subset_indices exist in the attributes index
    existing_files = [name for name in file_names if name in df_attributes.index]
    if len(existing_files) != len(file_names):
        print(f"Warning: {len(file_names) - len(existing_files)} subset file names not found in attributes file. Filtering.")
        file_names = existing_files # Filter file_names to only include existing ones

    if file_names:
         subset_attributes = df_attributes.loc[file_names].copy()
         subset_attributes = (subset_attributes + 1) / 2 # Convert -1/1 to 0/1
         print(f"Subset attributes prepared: {subset_attributes.shape}")
    else:
         print("No matching file names found between subset_indices and attributes file. Cannot include attributes.")
else:
    print("df_attributes is empty. Cannot include attributes in checkpoint.")


# Create the main DataFrame to save
if not subset_attributes.empty:
    df_to_save = subset_attributes.copy()

    # Add cluster labels if available
    if clusters_umap is not None and len(clusters_umap) == len(subset_attributes):
        df_to_save['cluster_label'] = clusters_umap
        print("'cluster_label' added.")
    else:
        df_to_save['cluster_label'] = -2 # Use -2 to indicate missing/unavailable cluster label
        print("'cluster_label' not available or size mismatch. Added with -2.")

    # Add heuristic scores if available and size matches
    if uniqueness_scores is not None and len(uniqueness_scores) == len(subset_attributes):
         df_to_save['Unicidade'] = uniqueness_scores[subset_indices] # Ensure scores are for the subset
         print("'Unicidade' added.")
    else:
         df_to_save['Unicidade'] = np.nan # Use NaN if scores are missing
         print("'Unicidade' not available or size mismatch. Added with NaN.")

    if originality_scores is not None and len(originality_scores) == len(subset_attributes):
        df_to_save['Originalidade'] = originality_scores[subset_indices] # Ensure scores are for the subset
        print("'Originalidade' added.")
    else:
        df_to_save['Originalidade'] = np.nan # Use NaN if scores are missing
        print("'Originalidade' not available or size mismatch. Added with NaN.")


    # Calculate CLS if Unicidade and Originalidade are available
    if 'Unicidade' in df_to_save.columns and 'Originalidade' in df_to_save.columns and not df_to_save[['Unicidade', 'Originalidade']].isnull().all().all():
        # Only calculate CLS if both scores are present for at least one row
        df_to_save['CLS'] = (df_to_save['Unicidade'] + df_to_save['Originalidade']) / 2
        print("'CLS' calculated and added.")
    else:
        df_to_save['CLS'] = np.nan # Use NaN if CLS cannot be calculated
        print("'CLS' not calculated: Unicidade or Originalidade scores missing.")


    # --- Save the final DataFrame ---
    final_file_path = 'checkpoint_TUDO_INCLUIDO.parquet'
    try:
        df_to_save.to_parquet(final_file_path)
        print("\n--- FINAL CHECKPOINT SUCCESS! ---")
        print(f"Comprehensive checkpoint saved to: '{final_file_path}'")
        print(f"DataFrame shape: {df_to_save.shape}")
        print("==========================================================")
    except Exception as e:
        print(f"\nERROR SAVING CHECKPOINT: {e}")
        print("Could not save the final checkpoint file.")

else:
    print("\nCannot create the final DataFrame to save. Subset attributes not available.")