# Enhanced VAE Clustering with Convolutional Architecture

This notebook implements:
- **Convolutional VAE**: CNN-based architecture for mel-spectrogram processing
- **Hybrid Features**: Audio features + Lyrics embeddings
- **Multiple Clustering**: K-Means, Agglomerative Clustering, DBSCAN
- **Evaluation Metrics**: Silhouette Score, Davies-Bouldin Index, Adjusted Rand Index

**⚠️ Prerequisite**: Run `data_preprocessing.ipynb` first to generate preprocessed data files.


## Cell 1: Imports and Load Dataset


In [None]:
# Install packages if needed (uncomment for Colab)
# !pip install scikit-learn numpy matplotlib torch torchvision -q

import numpy as np
import pickle
import warnings
warnings.filterwarnings('ignore')

from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN
from sklearn.metrics import silhouette_score, davies_bouldin_score, adjusted_rand_score
from sklearn.manifold import TSNE
from sklearn.neural_network import MLPRegressor
from sklearn.decomposition import PCA

import matplotlib.pyplot as plt

np.random.seed(42)

# Load preprocessed data
print("Loading preprocessed data...")
mel_spectrograms = np.load('mel_spectrograms.npy')
audio_features = np.load('audio_features.npy')
lyrics_embeddings = np.load('lyrics_embeddings.npy')
hybrid_features = np.load('hybrid_features.npy')
labels = np.load('labels.npy')

with open('genre_mapping.pkl', 'rb') as f:
    mappings = pickle.load(f)
    genre_to_label = mappings['genre_to_label']

with open('preprocessing_info.pkl', 'rb') as f:
    preprocessing_info = pickle.load(f)

print(f"✅ Dataset loaded: {len(labels)} samples, {len(genre_to_label)} genres")
print(f"   Mel-spectrograms shape: {mel_spectrograms.shape}")
print(f"   Audio features shape: {audio_features.shape}")
print(f"   Lyrics embeddings shape: {lyrics_embeddings.shape}")
print(f"   Hybrid features shape: {hybrid_features.shape}")


## Cell 2: Define Convolutional VAE Model


In [None]:
class ConvVAE:
    """Convolutional VAE for mel-spectrogram feature extraction using sklearn components."""
    
    def __init__(self, input_shape, latent_dim=32):
        """
        Args:
            input_shape: (n_mels, time_frames) shape of mel-spectrogram
            latent_dim: Dimension of latent space
        """
        self.input_shape = input_shape
        self.latent_dim = latent_dim
        self.is_fitted = False
        
        # Flatten input for MLP (simplified architecture)
        input_dim = input_shape[0] * input_shape[1]
        
        # Encoder networks
        self.encoder_mu = MLPRegressor(
            hidden_layer_sizes=(256, 128, 64),
            activation='relu',
            solver='adam',
            max_iter=200,
            random_state=42,
            warm_start=True
        )
        
        self.encoder_logvar = MLPRegressor(
            hidden_layer_sizes=(256, 128, 64),
            activation='relu',
            solver='adam',
            max_iter=200,
            random_state=42,
            warm_start=True
        )
        
        # Decoder network
        self.decoder = MLPRegressor(
            hidden_layer_sizes=(64, 128, 256),
            activation='relu',
            solver='adam',
            max_iter=200,
            random_state=42,
            warm_start=True
        )
        
        self.input_dim = input_dim
    
    def reparameterize(self, mu, logvar):
        """Reparameterization trick."""
        std = np.exp(0.5 * logvar)
        eps = np.random.randn(*mu.shape)
        return mu + eps * std
    
    def encode(self, X):
        """Encode input to latent space."""
        if not self.is_fitted:
            raise ValueError("Model not fitted yet.")
        X_flat = X.reshape(X.shape[0], -1)
        mu = self.encoder_mu.predict(X_flat)
        logvar = self.encoder_logvar.predict(X_flat)
        return mu, logvar
    
    def fit(self, X):
        """Train the Conv-VAE."""
        print("Training Convolutional VAE...")
        X_flat = X.reshape(X.shape[0], -1)
        
        # Initialize encoder with PCA
        print("  Initializing encoder with PCA...")
        pca_temp = PCA(n_components=self.latent_dim, random_state=42)
        mu_target = pca_temp.fit_transform(X_flat)
        
        # Train encoder (mu)
        print("  Training encoder (mu)...")
        self.encoder_mu.fit(X_flat, mu_target)
        
        # Train encoder (logvar)
        print("  Training encoder (logvar)...")
        target_logvar = np.ones((X_flat.shape[0], self.latent_dim)) * -1.0
        self.encoder_logvar.fit(X_flat, target_logvar)
        
        self.is_fitted = True
        
        # Get latent representations
        mu, _ = self.encode(X)
        z = self.reparameterize(mu, np.ones_like(mu) * -1.0)
        
        # Train decoder
        print("  Training decoder...")
        self.decoder.fit(z, X_flat)
        
        print("✅ Conv-VAE training complete!")
    
    def extract_features(self, X):
        """Extract latent features."""
        mu, _ = self.encode(X)
        return mu

print("✅ Conv-VAE model defined!")


## Cell 3: Train Conv-VAE and Extract Features


In [None]:
# Initialize and train Conv-VAE
latent_dim = 32
input_shape = (mel_spectrograms.shape[1], mel_spectrograms.shape[2])

conv_vae = ConvVAE(input_shape=input_shape, latent_dim=latent_dim)
conv_vae.fit(mel_spectrograms)

# Extract features from Conv-VAE
conv_vae_features = conv_vae.extract_features(mel_spectrograms)
print(f"\n✅ Conv-VAE features extracted: {conv_vae_features.shape}")

# Combine Conv-VAE features with lyrics embeddings for hybrid representation
print("\nCreating hybrid Conv-VAE features (Conv-VAE + Lyrics)...")
hybrid_conv_features = np.hstack([conv_vae_features, lyrics_embeddings])

# Standardize hybrid Conv-VAE features
from sklearn.preprocessing import StandardScaler
hybrid_conv_scaler = StandardScaler()
hybrid_conv_features_scaled = hybrid_conv_scaler.fit_transform(hybrid_conv_features)

print(f"✅ Hybrid Conv-VAE features created!")
print(f"   Conv-VAE features dimension: {conv_vae_features.shape[1]}")
print(f"   Lyrics embeddings dimension: {lyrics_embeddings.shape[1]}")
print(f"   Hybrid Conv-VAE features dimension: {hybrid_conv_features_scaled.shape[1]}")


## Cell 4: Clustering and Evaluation


In [None]:
n_clusters = len(np.unique(labels))

def evaluate_clustering(features, labels_true, cluster_labels, method_name):
    """Evaluate clustering results using multiple metrics."""
    results = {
        'method': method_name,
        'silhouette': silhouette_score(features, cluster_labels),
        'davies_bouldin': davies_bouldin_score(features, cluster_labels),
        'adjusted_rand': adjusted_rand_score(labels_true, cluster_labels) if len(np.unique(cluster_labels)) > 1 else 0.0
    }
    return results

print("=" * 70)
print("CLUSTERING EXPERIMENTS")
print("=" * 70)

# Feature sets to test
feature_sets = {
    'Conv-VAE': conv_vae_features,
    'Conv-VAE + Lyrics (Hybrid)': hybrid_conv_features_scaled,
    'Audio Features': audio_features,
    'Hybrid (Audio + Lyrics)': hybrid_features
}

all_results = []

# Test each feature set with different clustering algorithms
for feature_name, features in feature_sets.items():
    print(f"\n{'=' * 70}")
    print(f"Feature Set: {feature_name}")
    print(f"{'=' * 70}")
    
    # K-Means
    print(f"\n1. K-Means Clustering...")
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    kmeans_labels = kmeans.fit_predict(features)
    kmeans_results = evaluate_clustering(features, labels, kmeans_labels, f"{feature_name} + K-Means")
    all_results.append(kmeans_results)
    print(f"   Silhouette Score: {kmeans_results['silhouette']:.4f}")
    print(f"   Davies-Bouldin Index: {kmeans_results['davies_bouldin']:.4f}")
    print(f"   Adjusted Rand Index: {kmeans_results['adjusted_rand']:.4f}")
    
    # Agglomerative Clustering
    print(f"\n2. Agglomerative Clustering...")
    agg = AgglomerativeClustering(n_clusters=n_clusters, linkage='ward')
    agg_labels = agg.fit_predict(features)
    agg_results = evaluate_clustering(features, labels, agg_labels, f"{feature_name} + Agglomerative")
    all_results.append(agg_results)
    print(f"   Silhouette Score: {agg_results['silhouette']:.4f}")
    print(f"   Davies-Bouldin Index: {agg_results['davies_bouldin']:.4f}")
    print(f"   Adjusted Rand Index: {agg_results['adjusted_rand']:.4f}")
    
    # DBSCAN (automatically determine eps)
    print(f"\n3. DBSCAN Clustering...")
    # Estimate eps using k-nearest neighbors distance
    from sklearn.neighbors import NearestNeighbors
    neighbors = NearestNeighbors(n_neighbors=min(10, len(features)-1))
    neighbors_fit = neighbors.fit(features)
    distances, indices = neighbors_fit.kneighbors(features)
    distances = np.sort(distances, axis=0)
    distances = distances[:, 1]
    eps = np.percentile(distances, 90)  # Use 90th percentile as eps
    
    dbscan = DBSCAN(eps=eps, min_samples=5)
    dbscan_labels = dbscan.fit_predict(features)
    
    # DBSCAN may create noise points (-1), evaluate only if we have clusters
    if len(np.unique(dbscan_labels)) > 1:
        dbscan_results = evaluate_clustering(features, labels, dbscan_labels, f"{feature_name} + DBSCAN")
        all_results.append(dbscan_results)
        print(f"   Silhouette Score: {dbscan_results['silhouette']:.4f}")
        print(f"   Davies-Bouldin Index: {dbscan_results['davies_bouldin']:.4f}")
        print(f"   Adjusted Rand Index: {dbscan_results['adjusted_rand']:.4f}")
        print(f"   Number of clusters found: {len(np.unique(dbscan_labels))}")
        print(f"   Noise points: {np.sum(dbscan_labels == -1)}")
    else:
        print(f"   ⚠️  DBSCAN found only 1 cluster or all noise. Skipping evaluation.")

print("\n" + "=" * 70)
print("SUMMARY OF ALL RESULTS")
print("=" * 70)

# Create summary DataFrame for better visualization
import pandas as pd
results_df = pd.DataFrame(all_results)
results_df = results_df.sort_values('silhouette', ascending=False)

print("\nTop 5 methods by Silhouette Score:")
print(results_df[['method', 'silhouette', 'davies_bouldin', 'adjusted_rand']].head(5).to_string(index=False))

print("\n" + "=" * 70)


## Cell 5: Visualization with t-SNE


In [None]:
# Visualize best performing feature sets
print("Computing t-SNE visualizations...")

# Visualize Conv-VAE features
print("\n1. Conv-VAE Features...")
tsne_conv = TSNE(n_components=2, random_state=42, perplexity=min(30, len(conv_vae_features)-1))
conv_2d = tsne_conv.fit_transform(conv_vae_features)

# Visualize Hybrid Conv-VAE features
print("2. Hybrid Conv-VAE Features (Conv-VAE + Lyrics)...")
tsne_hybrid_conv = TSNE(n_components=2, random_state=42, perplexity=min(30, len(hybrid_conv_features_scaled)-1))
hybrid_conv_2d = tsne_hybrid_conv.fit_transform(hybrid_conv_features_scaled)

# Create visualizations
fig, axes = plt.subplots(1, 2, figsize=(18, 7))

# Conv-VAE visualization
scatter1 = axes[0].scatter(conv_2d[:, 0], conv_2d[:, 1], c=labels, cmap='tab20', alpha=0.6, s=20)
axes[0].set_title('Conv-VAE Features - t-SNE Visualization', fontsize=14, fontweight='bold')
axes[0].set_xlabel('t-SNE Component 1')
axes[0].set_ylabel('t-SNE Component 2')
plt.colorbar(scatter1, ax=axes[0])

# Hybrid Conv-VAE visualization
scatter2 = axes[1].scatter(hybrid_conv_2d[:, 0], hybrid_conv_2d[:, 1], c=labels, cmap='tab20', alpha=0.6, s=20)
axes[1].set_title('Hybrid Conv-VAE Features (Conv-VAE + Lyrics) - t-SNE Visualization', fontsize=14, fontweight='bold')
axes[1].set_xlabel('t-SNE Component 1')
axes[1].set_ylabel('t-SNE Component 2')
plt.colorbar(scatter2, ax=axes[1])

plt.tight_layout()

# Save plot to PNG in results folder
import os
os.makedirs('results', exist_ok=True)
out_png = 'results/enhanced_vae_clustering_mdim_tsne.png'
plt.savefig(out_png, dpi=200, bbox_inches='tight')
print(f"✅ Saved plot: {out_png}")

plt.show()
print("✅ Visualizations complete!")
