## Imports

In [None]:
import pandas as pd
import numpy as np
import torch
from torch import nn, optim
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import seaborn as sns
import os
import warnings

## Configs

In [None]:
# Input CSV file name (resolved against base_path when loading) and the prefix
# used for the name of every CSV/PNG this notebook writes.
dataset_file = 'all_colors_data.csv'
results_name = 'all_colors_data'

# Absolute, machine-specific directories: base_path is where the dataset is read
# from, results_path is where all result files are written.
# NOTE(review): nothing in this file creates results_path; presumably it has to
# exist before the saving cells run - confirm.
base_path = r'C:\Users\shash\OneDrive\Documents\Dessertation\Astro-DIM\data\processed'
results_path = r'C:\Users\shash\OneDrive\Documents\Dessertation\Astro-DIM\results\all colors data'

In [None]:
# Run on the GPU when PyTorch reports one as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Suppress all warnings from here on.
warnings.filterwarnings(action='ignore')

## Data Loading and Preprocessing

In [None]:
# Read the dataset named in the config cell.
print(f"Loading {dataset_file}...")
csv_path = os.path.join(base_path, dataset_file)
df = pd.read_csv(csv_path)

# Drop incomplete rows, but only when at least one missing value is present.
has_missing_values = df.isna().any().any()
if has_missing_values:
    df = df.dropna()

# Cap the working set at sample_size rows using a seeded random draw.
sample_size = 100000
if sample_size < len(df):
    df = df.sample(n=sample_size, random_state=42).reset_index(drop=True)

print(f"Dataset shape: {df.shape}")

In [None]:
# Split into train / validation / test in a 2:1:1 ratio computed from the number
# of rows actually available.  With exactly 100,000 rows this yields the same
# 50,000 / 25,000 / 25,000 partition as fixed sizes, but it no longer raises a
# ValueError when fewer rows remain after missing values were dropped.
n_total = len(df)
n_train = n_total // 2
n_val = (n_total - n_train) // 2
n_test = n_total - n_train - n_val

X_train_df, X_tmp_df = train_test_split(
    df, train_size=n_train, shuffle=True, random_state=42
)
X_val_df, X_test_df = train_test_split(
    X_tmp_df, train_size=n_val, test_size=n_test, shuffle=True, random_state=42
)

In [None]:
# Fit the scaler on the training split only, then apply that same fitted
# transform to the validation and test splits.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train_df)
X_val, X_test = (scaler.transform(split) for split in (X_val_df, X_test_df))

print(f"Train shape: {X_train.shape}")
print(f"Val shape: {X_val.shape}")
print(f"Test shape: {X_test.shape}")

In [None]:
# Column labels of the loaded table, in order; used later to label loadings,
# plots and exported columns.
feature_names = df.columns.tolist()
print(f"Features: {feature_names}")

## PCA

In [None]:
# Fit a PCA with no component limit on the scaled training data.
pca_full = PCA().fit(X_train)

# Per-component variance ratio and its running total.
explained_variance = pca_full.explained_variance_ratio_
cumulative_variance = explained_variance.cumsum()

In [None]:
# Smallest component count whose cumulative variance ratio reaches 0.95
# (argmax on a boolean mask gives the position of its first True).
reaches_95 = cumulative_variance >= 0.95
n_components_95 = np.argmax(reaches_95) + 1
print(f"Number of components for 95% variance: {n_components_95}")

# One record per principal component, numbered from 1.
results = [
    {
        'component': rank,
        'explained_variance_ratio': individual,
        'cumulative_variance': cumulative,
    }
    for rank, (individual, cumulative) in enumerate(
        zip(explained_variance, cumulative_variance), start=1
    )
]

df_results = pd.DataFrame(results)
pca_results_file = os.path.join(results_path, f'{results_name}_pca_results.csv')
df_results.to_csv(pca_results_file, index=False)

In [None]:
# Refit PCA restricted to the component count chosen above, then project all
# three splits with that single fitted model.
pca_optimal = PCA(n_components=n_components_95).fit(X_train)
X_train_pca_optimal, X_val_pca_optimal, X_test_pca_optimal = (
    pca_optimal.transform(split) for split in (X_train, X_val, X_test)
)

print(f"Generated PCA embeddings for optimal {n_components_95} components")
# The test-split projection is the one exported with the combined embeddings.
optimal_pca_embedding = X_test_pca_optimal

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
ax_individual, ax_cumulative = axes

# Left panel: variance ratio of at most the first ten components.
n_bars = min(10, len(explained_variance))
ax_individual.bar(range(1, n_bars + 1), explained_variance[:10])
ax_individual.set_title('Individual Variance Explained (First 10 PCs)')
ax_individual.set_xlabel('Principal Component')
ax_individual.set_ylabel('Variance Explained')

# Right panel: cumulative curve with the 95% level and the chosen component
# count marked.
component_numbers = range(1, len(cumulative_variance) + 1)
ax_cumulative.plot(component_numbers, cumulative_variance, marker='o')
ax_cumulative.axhline(y=0.95, color='red', linestyle='--', label='95%')
ax_cumulative.axvline(x=n_components_95, color='green', linestyle='--', label=f'Optimal: {n_components_95}D')
ax_cumulative.set_title('Cumulative Variance Explained')
ax_cumulative.set_xlabel('Principal Component')
ax_cumulative.set_ylabel('Cumulative Variance')
ax_cumulative.set_xticks(component_numbers)
ax_cumulative.legend()
ax_cumulative.grid(True, alpha=0.3)

plt.tight_layout()
variance_plot_file = os.path.join(results_path, f'{results_name}_pca_variance_plot.png')
plt.savefig(variance_plot_file, dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Loadings table: one row per original feature, one column per retained
# component (components_ is stored component-major, hence the transpose).
components = pca_optimal.components_
pc_labels = [f'PC{i+1}' for i in range(len(components))]
components_df = pd.DataFrame(components.T, index=feature_names, columns=pc_labels)

loadings_file = os.path.join(results_path, f'{results_name}_optimal_loadings.csv')
components_df.to_csv(loadings_file)

In [None]:
# Annotated heatmap of the loadings; the figure widens with the number of
# components but never goes below 10 inches.
heatmap_width = max(10, n_components_95*2)
plt.figure(figsize=(heatmap_width, 8))
sns.heatmap(
    components_df,
    annot=True,
    fmt='.3f',
    cmap='RdBu_r',
    center=0,
    cbar_kws={'label': 'Loading Value'},
)
plt.title(f'Optimal PCA Component Loadings ({n_components_95}D)')
plt.xlabel('Principal Components')
plt.ylabel('Original Features')
plt.tight_layout()

heatmap_file = os.path.join(results_path, f'{results_name}_optimal_loadings_heatmap.png')
plt.savefig(heatmap_file, dpi=300, bbox_inches='tight')
plt.show()

In [None]:
n_features = len(feature_names)

# The panel grid depends only on the feature count, so it is computed once:
# four panels per row and as many rows as needed to fit every feature.
n_cols = 4
n_rows, leftover = divmod(n_features, n_cols)
if leftover:
    n_rows += 1

# One figure per retained component: test-set scores on that component plotted
# against each scaled feature.
for pc_idx in range(n_components_95):
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(4*n_cols, 3*n_rows))
    axes = axes.flatten()

    # Scores on this component are the same for every panel of the figure.
    pc_values = X_test_pca_optimal[:, pc_idx]

    for feat_idx, ax in enumerate(axes[:n_features]):
        feature_values = X_test[:, feat_idx]

        # Points are blue for a positive loading and red otherwise.
        loading = components_df.iloc[feat_idx, pc_idx]
        if loading > 0:
            color = 'blue'
        else:
            color = 'red'

        ax.scatter(feature_values, pc_values, alpha=0.5, s=1, c=color)
        ax.set_xlabel(f'{feature_names[feat_idx]}')
        ax.set_ylabel(f'PC{pc_idx+1}')
        ax.set_title(f'Loading: {loading:.3f}')
        ax.grid(True, alpha=0.3)

    # Hide the grid cells that have no feature assigned.
    for spare_ax in axes[n_features:]:
        spare_ax.set_visible(False)

    plt.suptitle(f'Optimal PC{pc_idx+1} vs All Features (explains {pca_optimal.explained_variance_ratio_[pc_idx]:.1%} variance)', fontsize=16)
    plt.tight_layout()
    scatter_file = os.path.join(results_path, f'{results_name}_optimal_PC{pc_idx+1}_vs_all_features.png')
    plt.savefig(scatter_file, dpi=300, bbox_inches='tight')
    plt.show()

## Autoencoder

In [None]:
class Autoencoder(nn.Module):
    """Fully connected autoencoder with one 64-unit hidden layer per side.

    The encoder maps ``input_dim`` features to ``latent_dim`` values through a
    ReLU hidden layer; the decoder mirrors it back to ``input_dim``.  Neither
    the latent layer nor the output layer has an activation.

    Args:
        input_dim: Number of input (and reconstructed) features.
        latent_dim: Width of the bottleneck produced by the encoder.
    """

    def __init__(self, input_dim, latent_dim):
        super().__init__()
        hidden_dim = 64

        # Layers are instantiated encoder-first, then decoder.
        encoder_layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, latent_dim),
        ]
        decoder_layers = [
            nn.Linear(latent_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        ]

        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Encode ``x`` to the latent space and return its reconstruction."""
        latent = self.encoder(x)
        reconstruction = self.decoder(latent)
        return reconstruction

In [None]:
def _to_float32_tensor(array):
    """Return *array* converted to a float32 torch tensor."""
    return torch.from_numpy(array.astype(np.float32))


X_train_tensor = _to_float32_tensor(X_train)
X_test_tensor = _to_float32_tensor(X_test)
X_val_tensor = _to_float32_tensor(X_val)

# Every dataset pairs the inputs with themselves (the reconstruction target is
# the input).  Only the training loader shuffles.
batch_size = 256
train_loader = DataLoader(
    TensorDataset(X_train_tensor, X_train_tensor),
    batch_size=batch_size,
    shuffle=True,
)
test_loader = DataLoader(
    TensorDataset(X_test_tensor, X_test_tensor),
    batch_size=batch_size,
)
val_loader = DataLoader(
    TensorDataset(X_val_tensor, X_val_tensor),
    batch_size=batch_size,
)

In [None]:
# Candidate bottleneck sizes: every integer from 2 up to the feature count.
input_dim = X_train.shape[-1]
latent_dims = [*range(2, input_dim + 1)]

# Filled in by the training cell: one metrics record per latent size, the
# per-epoch loss curves keyed by latent size, and the selected model.
ae_results, training_history = [], {}
optimal_ae_model = None

In [None]:
def _train_epoch(model, loader, criterion, optimizer):
    """Run one optimisation pass over *loader*; return the sample-weighted mean loss."""
    model.train()
    total_loss = 0.0
    for batch_x, _ in loader:
        batch_x = batch_x.to(device)
        optimizer.zero_grad()
        loss = criterion(model(batch_x), batch_x)
        loss.backward()
        optimizer.step()
        # Weight by batch size so a smaller final batch does not skew the mean.
        total_loss += loss.item() * batch_x.size(0)
    return total_loss / len(loader.dataset)


def _mean_loss(model, loader, criterion):
    """Return the sample-weighted mean loss of *model* over *loader* without updating it."""
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for batch_x, _ in loader:
            batch_x = batch_x.to(device)
            total_loss += criterion(model(batch_x), batch_x).item() * batch_x.size(0)
    return total_loss / len(loader.dataset)


def _encode_in_order(model, features, chunk_size=256):
    """Return encoder outputs for *features* as a NumPy array, preserving row order.

    The tensor is walked in fixed, unshuffled chunks so that row ``i`` of the
    result always corresponds to row ``i`` of *features*.
    """
    model.eval()
    with torch.no_grad():
        chunks = [
            model.encoder(chunk.to(device)).cpu().numpy()
            for chunk in features.split(chunk_size)
        ]
    return np.vstack(chunks)


# Lowest-test-MSE model seen so far; used as the fallback when no latent size
# gets below the 0.05 threshold.  Keeping the trained network itself means the
# exported embeddings come from the same model whose metrics are recorded in
# ae_results (no second, differently-initialised training run).
best_model = None
best_latent_dim = None
best_test_mse = None

for latent_dim in latent_dims:
    print(f"\nTraining autoencoder with {latent_dim} latent dimensions...")

    model = Autoencoder(input_dim, latent_dim).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.MSELoss()

    epoch_train_losses = []
    epoch_val_losses = []

    for epoch in range(100):
        avg_train_loss = _train_epoch(model, train_loader, criterion, optimizer)
        epoch_train_losses.append(avg_train_loss)

        avg_val_loss = _mean_loss(model, val_loader, criterion)
        epoch_val_losses.append(avg_val_loss)

        if (epoch + 1) % 20 == 0:
            print(f"  Epoch {epoch+1}: Train Loss = {avg_train_loss:.6f}, Val Loss = {avg_val_loss:.6f}")

    training_history[latent_dim] = {
        'train': epoch_train_losses,
        'val': epoch_val_losses
    }

    test_mse = _mean_loss(model, test_loader, criterion)

    ae_results.append({
        'latent_dim': latent_dim,
        'train_mse': epoch_train_losses[-1],
        'val_mse': epoch_val_losses[-1],
        'test_mse': test_mse,
        'input_dim': input_dim,
        'compression_ratio': input_dim / latent_dim
    })

    print(f"  Test MSE: {test_mse:.6f}")

    # Strict "<" keeps the first (smallest) latent size on ties.
    if best_model is None or test_mse < best_test_mse:
        best_model, best_latent_dim, best_test_mse = model, latent_dim, test_mse

    # NOTE(review): the model is selected on the *test* MSE, so the reported
    # test error of the chosen model is optimistically biased; selecting on the
    # validation MSE would avoid that.  Left as-is because the downstream plots
    # and threshold are expressed in terms of test MSE.
    if test_mse < 0.05 and optimal_ae_model is None:
        print(f"  ✓ Found optimal autoencoder: {latent_dim}D with MSE {test_mse:.6f}")
        optimal_ae_latent_dim = latent_dim
        optimal_ae_model = model
        print(f"  Saved optimal model, continuing training on remaining dimensions...")

# Fall back to the best trained model when nothing met the threshold.  The
# check is on the model itself rather than on whether an embedding variable
# happens to exist, so stale notebook state cannot skip it.
if optimal_ae_model is None:
    if best_model is None:
        raise ValueError("No autoencoder was trained: latent_dims is empty.")
    print("\nNo autoencoder achieved MSE < 0.05. Using best performing model...")
    optimal_ae_model = best_model
    optimal_ae_latent_dim = best_latent_dim
    print(f"Best model: {optimal_ae_latent_dim}D with MSE {best_test_mse:.6f}")

print(f"\nGenerating embeddings from optimal {optimal_ae_latent_dim}D autoencoder...")
# Encode the tensors directly instead of iterating train_loader: that loader
# shuffles, which would leave the training embeddings in an order that does
# not line up with X_train.
optimal_ae_embedding = _encode_in_order(optimal_ae_model, X_test_tensor)
optimal_ae_train_embedding = _encode_in_order(optimal_ae_model, X_train_tensor)
optimal_ae_val_embedding = _encode_in_order(optimal_ae_model, X_val_tensor)
print(f"Generated embeddings for optimal {optimal_ae_latent_dim}D autoencoder")

In [None]:
# Report the dimensionality selected by each method, one line per item.
print(
    f"\nOptimal selections:",
    f"PCA: {n_components_95}D (explains {cumulative_variance[n_components_95-1]:.1%} variance)",
    f"Autoencoder: {optimal_ae_latent_dim}D",
    sep="\n",
)

In [None]:
# Tabulate the per-latent-size metrics and write them next to the other results.
df_ae_results = pd.DataFrame(ae_results)
ae_results_file = os.path.join(results_path, f'{results_name}_ae_results.csv')
df_ae_results.to_csv(ae_results_file, index=False)

print(f"\nAutoencoder Results Summary:", df_ae_results, sep="\n")

In [None]:
print("\nCreating combined embeddings with optimal dimensions only...")
print(f"PCA: {n_components_95}D, Autoencoder: {optimal_ae_latent_dim}D")

# Column labels, in export order: scaled original features, PCA scores, then
# autoencoder latent values.
feature_columns = [f'feature_{feature_name}' for feature_name in feature_names]
pca_columns = [f'PCA_{j+1}' for j in range(n_components_95)]
ae_columns = [f'AE_{j+1}' for j in range(optimal_ae_latent_dim)]

# Assemble the table with a single horizontal stack instead of building one
# dict per test row in a Python loop.  Every block is promoted to float64 so
# the stacked array has one dtype; hstack raises if the row counts disagree.
combined_values = np.hstack([
    np.asarray(X_test, dtype=np.float64),
    np.asarray(optimal_pca_embedding, dtype=np.float64)[:, :n_components_95],
    np.asarray(optimal_ae_embedding, dtype=np.float64)[:, :optimal_ae_latent_dim],
])
df_embeddings = pd.DataFrame(
    combined_values,
    columns=feature_columns + pca_columns + ae_columns,
)
# sample_id is the row position within the test split and goes first.
df_embeddings.insert(0, 'sample_id', np.arange(len(df_embeddings), dtype=np.int64))

print(f"Combined embeddings shape: {df_embeddings.shape}")
print(f"Features: {len(feature_names)} original + {n_components_95} PCA + {optimal_ae_latent_dim} AE = {len(feature_names) + n_components_95 + optimal_ae_latent_dim} dimensions")

In [None]:
# Write the combined table into the results directory.
embeddings_filename = f'{results_name}_optimal_embeddings.csv'
embeddings_file = os.path.join(results_path, embeddings_filename)
df_embeddings.to_csv(embeddings_file, index=False)
print(f"Optimal embeddings saved to: {embeddings_filename}")

## PCA Reconstruction

In [None]:
def _pca_reconstruction_record(n_components):
    """Fit an n_components PCA on X_train and score its reconstruction of X_test.

    Returns a dict holding the component count, the mean squared
    reconstruction error on the test split, and the summed explained
    variance ratio of the fitted components.
    """
    pca_temp = PCA(n_components=n_components).fit(X_train)
    reconstructed = pca_temp.inverse_transform(pca_temp.transform(X_test))
    residual = X_test - reconstructed
    return {
        'n_components': n_components,
        'pca_mse': np.mean(residual ** 2),
        'explained_variance': np.sum(pca_temp.explained_variance_ratio_),
    }


# Same range of sizes as the autoencoder sweep: 2 up to the feature count.
pca_comparison_results = [
    _pca_reconstruction_record(k) for k in range(2, input_dim + 1)
]

df_pca_comparison = pd.DataFrame(pca_comparison_results)
pca_comparison_file = os.path.join(results_path, f'{results_name}_pca_comparison.csv')
df_pca_comparison.to_csv(pca_comparison_file, index=False)

print("PCA Reconstruction Comparison:")
print(df_pca_comparison)

In [None]:
plt.figure(figsize=(16, 12))

# Values shared by several panels, looked up once instead of per panel.
optimal_ae_test_mse = next(
    r['test_mse'] for r in ae_results if r['latent_dim'] == optimal_ae_latent_dim
)
latent_dim_ticks = range(
    int(df_ae_results['latent_dim'].min()),
    int(df_ae_results['latent_dim'].max()) + 1,
)

# --- Panel 1: train/val loss at selected epochs for latent sizes within +/-2
# of the selected one.
plt.subplot(2, 2, 1)
colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown', 'pink', 'gray', 'olive', 'cyan', 'yellow', 'black', 'white', 'maroon', 'navy', 'lime', 'teal', 'silver', 'gold', 'indigo', 'violet', 'turquoise', 'coral', 'salmon']
marker_epochs = [20, 40, 60, 80, 100]

filtered_latent_dims = [dim for dim in latent_dims if optimal_ae_latent_dim - 2 <= dim <= optimal_ae_latent_dim + 2]

for i, latent_dim in enumerate(filtered_latent_dims):
    if latent_dim not in training_history:
        continue

    history = training_history[latent_dim]
    # Epochs are 1-based in marker_epochs, the stored lists are 0-based.
    train_losses = [history['train'][epoch-1] for epoch in marker_epochs]
    val_losses = [history['val'][epoch-1] for epoch in marker_epochs]

    # The selected model is drawn thicker and fully opaque.
    is_optimal = latent_dim == optimal_ae_latent_dim
    linewidth = 3 if is_optimal else 2
    alpha = 1.0 if is_optimal else 0.7
    color = colors[i % len(colors)]
    shared_style = dict(color=color, linewidth=linewidth, alpha=alpha, markersize=6)

    plt.plot(marker_epochs, train_losses,
             label=f'{latent_dim}D Train', marker='o', **shared_style)
    plt.plot(marker_epochs, val_losses,
             label=f'{latent_dim}D Val', marker='s', linestyle='--', **shared_style)

plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training vs Validation Loss (Optimal ± 2 Models)')
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.xticks(marker_epochs)
plt.xlim(15, 105)

# --- Panel 2: autoencoder test MSE per latent size, with the threshold and the
# selected size marked.
plt.subplot(2, 2, 2)
plt.plot(df_ae_results['latent_dim'], df_ae_results['test_mse'], marker='o', linewidth=2, color='darkblue', markersize=8)
plt.axhline(y=0.05, color='red', linestyle='--', alpha=0.7, label='MSE = 0.05 threshold')
plt.scatter([optimal_ae_latent_dim], [optimal_ae_test_mse],
            color='green', s=100, zorder=5, label=f'Optimal: {optimal_ae_latent_dim}D')
plt.xlabel('Latent Dimensions')
plt.ylabel('Test MSE')
plt.title('Autoencoder Test MSE vs Latent Dimensions')
plt.grid(True, alpha=0.3)
plt.legend()
plt.xticks(latent_dim_ticks)

# --- Panel 3: grouped bars of train/val/test MSE per latent size.
plt.subplot(2, 2, 3)
x_pos = df_ae_results['latent_dim']
width = 0.25

bars1 = plt.bar(x_pos - width, df_ae_results['train_mse'], width, label='Train MSE', alpha=0.8, color='lightblue')
bars2 = plt.bar(x_pos, df_ae_results['val_mse'], width, label='Val MSE', alpha=0.8, color='orange')
bars3 = plt.bar(x_pos + width, df_ae_results['test_mse'], width, label='Test MSE', alpha=0.8, color='lightgreen')

# Outline the three bars that belong to the selected latent size.
optimal_idx = next(i for i, r in enumerate(ae_results) if r['latent_dim'] == optimal_ae_latent_dim)
for bar_group in (bars1, bars2, bars3):
    highlighted = bar_group[optimal_idx]
    highlighted.set_edgecolor('black')
    highlighted.set_linewidth(2)

plt.axhline(y=0.05, color='red', linestyle='--', alpha=0.7, label='MSE = 0.05 threshold')
plt.xlabel('Latent Dimensions')
plt.ylabel('MSE')
plt.title('Train/Val/Test MSE Comparison')
plt.legend()
plt.xticks(latent_dim_ticks)
plt.grid(True, alpha=0.3, axis='y')

# --- Panel 4: autoencoder vs PCA reconstruction error on the test split.
plt.subplot(2, 2, 4)
plt.plot(df_ae_results['latent_dim'], df_ae_results['test_mse'], 'o-', label='Autoencoder Test MSE', linewidth=2, markersize=8, color='blue')
plt.plot(df_pca_comparison['n_components'], df_pca_comparison['pca_mse'], 's-', label='PCA Test MSE', linewidth=2, markersize=8, color='orange')

plt.axhline(y=0.05, color='red', linestyle='--', alpha=0.7, label='AE MSE = 0.05 threshold')

# The PCA comparison table starts at 2 components, so it has no row when a
# single component already reaches 95% variance; skip the marker in that case
# instead of failing on an empty lookup.
pca_95_mse = df_pca_comparison.loc[
    df_pca_comparison['n_components'] == n_components_95, 'pca_mse'
]
if not pca_95_mse.empty:
    plt.scatter([n_components_95], [pca_95_mse.iloc[0]],
                color='green', s=100, zorder=5, label=f'PCA 95%: {n_components_95}D')
plt.scatter([optimal_ae_latent_dim], [optimal_ae_test_mse],
            color='purple', s=100, zorder=5, label=f'Optimal AE: {optimal_ae_latent_dim}D')

plt.xlabel('Dimensions')
plt.ylabel('Test MSE')
plt.title('Autoencoder vs PCA Reconstruction Error')
plt.legend()
plt.grid(True, alpha=0.3)
plt.xticks(range(2, input_dim + 1))

plt.tight_layout()
plt.savefig(os.path.join(results_path, f'{results_name}_optimal_analysis_plots.png'), dpi=300, bbox_inches='tight')
plt.show()