In [3]:
# Core Libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_digits, make_blobs
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.decomposition import PCA
import warnings
warnings.filterwarnings('ignore')

# Deep Learning Libraries
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    print("✅ TensorFlow version:", tf.__version__)
    TF_AVAILABLE = True
except ImportError:
    print("⚠️ TensorFlow not available. Using sklearn MLPRegressor instead.")
    from sklearn.neural_network import MLPRegressor
    TF_AVAILABLE = False

# Configure plotting
plt.style.use('default')
sns.set_palette("viridis")
plt.rcParams['figure.figsize'] = (15, 10)
plt.rcParams['font.size'] = 11

print("🧠 Ready for autoencoder analysis")

⚠️ TensorFlow not available. Using sklearn MLPRegressor instead.
🧠 Ready for autoencoder analysis


In [2]:
class AutoencoderPipeline:
    """
    Production-ready autoencoder pipeline with multiple architectures.
    """
    
    def __init__(self, encoding_dim=32, architecture='dense', use_tensorflow=True, random_state=42):
        self.encoding_dim = encoding_dim
        self.architecture = architecture
        self.use_tensorflow = use_tensorflow and TF_AVAILABLE
        self.random_state = random_state
        self.encoder = None
        self.decoder = None
        self.autoencoder = None
        self.scaler = None
        self.history = None
        
    def _build_dense_autoencoder(self, input_dim):
        """Build dense (fully connected) autoencoder."""
        if self.use_tensorflow:
            # Input layer
            input_layer = keras.Input(shape=(input_dim,))
            
            # Encoder
            encoded = layers.Dense(256, activation='relu')(input_layer)
            encoded = layers.Dropout(0.2)(encoded)
            encoded = layers.Dense(128, activation='relu')(encoded)
            encoded = layers.Dropout(0.2)(encoded)
            encoded = layers.Dense(self.encoding_dim, activation='relu')(encoded)
            
            # Decoder
            decoded = layers.Dense(128, activation='relu')(encoded)
            decoded = layers.Dropout(0.2)(decoded)
            decoded = layers.Dense(256, activation='relu')(decoded)
            decoded = layers.Dense(input_dim, activation='sigmoid')(decoded)
            
            # Create models
            self.autoencoder = keras.Model(input_layer, decoded)
            self.encoder = keras.Model(input_layer, encoded)
            
            # Decoder model
            encoded_input = keras.Input(shape=(self.encoding_dim,))
            decoder_layers = self.autoencoder.layers[-3:]
            decoded_output = encoded_input
            for layer in decoder_layers:
                decoded_output = layer(decoded_output)
            self.decoder = keras.Model(encoded_input, decoded_output)
            
            self.autoencoder.compile(optimizer='adam', loss='mse', metrics=['mae'])
        
        else:
            # Fallback to sklearn
            self.autoencoder = MLPRegressor(
                hidden_layer_sizes=(256, 128, self.encoding_dim, 128, 256),
                activation='relu',
                solver='adam',
                max_iter=500,
                random_state=self.random_state
            )
    
    def _build_convolutional_autoencoder(self, input_shape):
        """Build convolutional autoencoder for image data."""
        if not self.use_tensorflow:
            raise ValueError("Convolutional autoencoder requires TensorFlow")
        
        # Input
        input_layer = keras.Input(shape=input_shape)
        
        # Encoder
        x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(input_layer)
        x = layers.MaxPooling2D((2, 2), padding='same')(x)
        x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
        encoded = layers.MaxPooling2D((2, 2), padding='same')(x)
        
        # Decoder
        x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(encoded)
        x = layers.UpSampling2D((2, 2))(x)
        x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
        x = layers.UpSampling2D((2, 2))(x)
        decoded = layers.Conv2D(input_shape[-1], (3, 3), activation='sigmoid', padding='same')(x)
        
        self.autoencoder = keras.Model(input_layer, decoded)
        self.encoder = keras.Model(input_layer, encoded)
        self.autoencoder.compile(optimizer='adam', loss='mse')
    
    def fit(self, X, validation_split=0.2, epochs=100, batch_size=128, verbose=1):
        """
        Fit the autoencoder to data.
        
        Parameters:
        -----------
        X : array-like
            Input data
        validation_split : float, default=0.2
            Fraction of data to use for validation
        epochs : int, default=100
            Number of training epochs
        batch_size : int, default=128
            Batch size for training
        verbose : int, default=1
            Verbosity level
        """
        # Preprocess data
        self.scaler = MinMaxScaler()
        X_scaled = self.scaler.fit_transform(X.reshape(len(X), -1))
        
        # Determine architecture
        if len(X.shape) == 4:  # Image data (N, H, W, C)
            self._build_convolutional_autoencoder(X.shape[1:])
            X_train = X_scaled.reshape(X.shape)
        else:  # Tabular data
            self._build_dense_autoencoder(X_scaled.shape[1])
            X_train = X_scaled
        
        # Train model
        if self.use_tensorflow:
            callbacks = [
                keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True),
                keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=5)
            ]
            
            self.history = self.autoencoder.fit(
                X_train, X_train,
                epochs=epochs,
                batch_size=batch_size,
                shuffle=True,
                validation_split=validation_split,
                callbacks=callbacks,
                verbose=verbose
            )
        else:
            self.autoencoder.fit(X_train, X_train)
        
        return self
    
    def encode(self, X):
        """Encode data to latent representation."""
        X_scaled = self.scaler.transform(X.reshape(len(X), -1))
        
        if self.use_tensorflow:
            if len(X.shape) == 4:
                X_input = X_scaled.reshape(X.shape)
            else:
                X_input = X_scaled
            return self.encoder.predict(X_input)
        else:
            # For sklearn, manually extract encoding
            # This is a simplified approach
            return X_scaled[:, :self.encoding_dim]
    
    def decode(self, encoded_data):
        """Decode latent representation back to data space."""
        if self.use_tensorflow:
            return self.decoder.predict(encoded_data)
        else:
            # Simplified decoding for sklearn
            return self.autoencoder.predict(encoded_data)
    
    def reconstruct(self, X):
        """Reconstruct input data."""
        X_scaled = self.scaler.transform(X.reshape(len(X), -1))
        
        if self.use_tensorflow:
            if len(X.shape) == 4:
                X_input = X_scaled.reshape(X.shape)
            else:
                X_input = X_scaled
            return self.autoencoder.predict(X_input)
        else:
            return self.autoencoder.predict(X_scaled)
    
    def plot_training_history(self):
        """Plot training history."""
        if self.history is None:
            print("No training history available")
            return
        
        fig, axes = plt.subplots(1, 2, figsize=(15, 5))
        
        # Loss
        axes[0].plot(self.history.history['loss'], label='Training Loss')
        if 'val_loss' in self.history.history:
            axes[0].plot(self.history.history['val_loss'], label='Validation Loss')
        axes[0].set_title('Model Loss')
        axes[0].set_xlabel('Epoch')
        axes[0].set_ylabel('Loss')
        axes[0].legend()
        axes[0].grid(alpha=0.3)
        
        # MAE (if available)
        if 'mae' in self.history.history:
            axes[1].plot(self.history.history['mae'], label='Training MAE')
            if 'val_mae' in self.history.history:
                axes[1].plot(self.history.history['val_mae'], label='Validation MAE')
            axes[1].set_title('Model MAE')
            axes[1].set_xlabel('Epoch')
            axes[1].set_ylabel('MAE')
            axes[1].legend()
            axes[1].grid(alpha=0.3)
        
        plt.tight_layout()
        plt.show()

def evaluate_autoencoder(X_original, X_reconstructed, X_encoded=None):
    """
    Evaluate autoencoder performance.
    
    Parameters:
    -----------
    X_original : array-like
        Original input data
    X_reconstructed : array-like
        Reconstructed data
    X_encoded : array-like, optional
        Encoded representations
    
    Returns:
    --------
    dict : Evaluation metrics
    """
    # Flatten for consistent comparison
    orig_flat = X_original.reshape(len(X_original), -1)
    recon_flat = X_reconstructed.reshape(len(X_reconstructed), -1)
    
    # Reconstruction metrics
    mse = mean_squared_error(orig_flat, recon_flat)
    mae = np.mean(np.abs(orig_flat - recon_flat))
    
    # Compression ratio
    if X_encoded is not None:
        original_size = orig_flat.shape[1]
        encoded_size = X_encoded.shape[1]
        compression_ratio = original_size / encoded_size
    else:
        compression_ratio = None
    
    # Explained variance (similar to PCA)
    total_variance = np.var(orig_flat)
    residual_variance = np.var(orig_flat - recon_flat)
    explained_variance_ratio = 1 - (residual_variance / total_variance)
    
    return {
        'mse': mse,
        'mae': mae,
        'rmse': np.sqrt(mse),
        'explained_variance_ratio': explained_variance_ratio,
        'compression_ratio': compression_ratio
    }

print("✅ Autoencoder pipeline and evaluation functions ready!")
print("🚀 Ready for neural network-based dimensionality reduction")

✅ Autoencoder pipeline and evaluation functions ready!
🚀 Ready for neural network-based dimensionality reduction


In [5]:
# Data Generation and Preprocessing
def create_autoencoder_datasets():
    """Create various datasets for autoencoder demonstration"""
    datasets = {}
    
    # Dataset 1: MNIST-like data (high-dimensional)
    digits = load_digits()
    datasets['digits'] = (digits.data, digits.target)
    
    # Dataset 2: High-dimensional blob data
    blob_data, blob_labels = make_blobs(n_samples=1000, centers=5, n_features=50, cluster_std=2.0, random_state=42)
    datasets['high_dim_blobs'] = (blob_data, blob_labels)
    
    # Dataset 3: Noisy sine waves (for denoising)
    t = np.linspace(0, 4*np.pi, 200)
    clean_signals = np.array([np.sin(t + i*0.5) for i in range(100)])
    noise = np.random.normal(0, 0.3, clean_signals.shape)
    noisy_signals = clean_signals + noise
    datasets['noisy_signals'] = (noisy_signals, clean_signals)
    
    # Dataset 4: Correlated features (for feature learning)
    n_samples = 1000
    base_features = np.random.randn(n_samples, 10)
    # Create correlations
    corr_matrix = np.random.randn(10, 30)
    correlated_data = base_features @ corr_matrix
    datasets['correlated_features'] = (correlated_data, base_features)
    
    return datasets

# Autoencoder Architecture Functions
def create_autoencoder(input_dim, encoding_dim, hidden_layers=None, activation='relu', optimizer='adam', loss='mse'):
    """
    Create a customizable autoencoder model
    
    Parameters:
    -----------
    input_dim : int
        Input dimension
    encoding_dim : int
        Encoding (bottleneck) dimension
    hidden_layers : list, optional
        List of hidden layer dimensions
    activation : str
        Activation function
    optimizer : str
        Optimizer
    loss : str
        Loss function
    
    Returns:
    --------
    autoencoder, encoder, decoder : tuple
        The complete autoencoder and its components
    """
    # Input layer
    input_layer = Input(shape=(input_dim,))
    
    # Encoder
    if hidden_layers is None:
        hidden_layers = [input_dim // 2, input_dim // 4]
    
    # Build encoder
    x = input_layer
    for units in hidden_layers:
        x = Dense(units, activation=activation)(x)
        x = Dropout(0.2)(x)
    
    # Bottleneck
    encoded = Dense(encoding_dim, activation=activation, name='bottleneck')(x)
    
    # Build decoder (mirror of encoder)
    x = encoded
    for units in reversed(hidden_layers):
        x = Dense(units, activation=activation)(x)
        x = Dropout(0.2)(x)
    
    # Output layer
    decoded = Dense(input_dim, activation='linear')(x)
    
    # Create models
    autoencoder = Model(input_layer, decoded)
    encoder = Model(input_layer, encoded)
    
    # Decoder (for generating new data)
    encoded_input = Input(shape=(encoding_dim,))
    decoder_layers = autoencoder.layers[-(len(hidden_layers)+1):]
    x = encoded_input
    for layer in decoder_layers:
        x = layer(x)
    decoder = Model(encoded_input, x)
    
    # Compile autoencoder
    autoencoder.compile(optimizer=optimizer, loss=loss, metrics=['mae'])
    
    return autoencoder, encoder, decoder

def create_variational_autoencoder(input_dim, latent_dim, hidden_layers=None):
    """
    Create a Variational Autoencoder (VAE)
    
    Parameters:
    -----------
    input_dim : int
        Input dimension
    latent_dim : int
        Latent space dimension
    hidden_layers : list, optional
        Hidden layer dimensions
    
    Returns:
    --------
    vae, encoder, decoder : tuple
        VAE model and components
    """
    if hidden_layers is None:
        hidden_layers = [input_dim // 2, input_dim // 4]
    
    # Encoder
    inputs = Input(shape=(input_dim,))
    x = inputs
    for units in hidden_layers:
        x = Dense(units, activation='relu')(x)
    
    # Latent space parameters
    z_mean = Dense(latent_dim, name='z_mean')(x)
    z_log_var = Dense(latent_dim, name='z_log_var')(x)
    
    # Sampling function
    def sampling(args):
        z_mean, z_log_var = args
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon
    
    z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])
    
    # Encoder model
    encoder = Model(inputs, [z_mean, z_log_var, z], name='encoder')
    
    # Decoder
    latent_inputs = Input(shape=(latent_dim,), name='z_sampling')
    x = latent_inputs
    for units in reversed(hidden_layers):
        x = Dense(units, activation='relu')(x)
    outputs = Dense(input_dim, activation='linear')(x)
    
    decoder = Model(latent_inputs, outputs, name='decoder')
    
    # VAE model
    vae_outputs = decoder(encoder(inputs)[2])
    vae = Model(inputs, vae_outputs, name='vae')
    
    # VAE loss
    def vae_loss(inputs, outputs):
        reconstruction_loss = tf.keras.losses.mse(inputs, outputs)
        reconstruction_loss *= input_dim
        kl_loss = 1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)
        kl_loss = tf.reduce_mean(kl_loss) * -0.5
        return reconstruction_loss + kl_loss
    
    vae.compile(optimizer='adam', loss=vae_loss)
    
    return vae, encoder, decoder

# Evaluation Functions
def evaluate_autoencoder_performance(original_data, reconstructed_data, encoded_data=None):
    """
    Evaluate autoencoder performance using multiple metrics
    
    Parameters:
    -----------
    original_data : array-like
        Original input data
    reconstructed_data : array-like
        Reconstructed data from autoencoder
    encoded_data : array-like, optional
        Encoded representations
    
    Returns:
    --------
    metrics : dict
        Performance metrics
    """
    metrics = {}
    
    # Reconstruction metrics
    mse = np.mean((original_data - reconstructed_data) ** 2)
    mae = np.mean(np.abs(original_data - reconstructed_data))
    rmse = np.sqrt(mse)
    
    # Explained variance
    total_variance = np.var(original_data)
    residual_variance = np.var(original_data - reconstructed_data)
    explained_variance = 1 - (residual_variance / total_variance)
    
    # Correlation
    flat_original = original_data.flatten()
    flat_reconstructed = reconstructed_data.flatten()
    correlation = np.corrcoef(flat_original, flat_reconstructed)[0, 1]
    
    metrics.update({
        'mse': mse,
        'mae': mae,
        'rmse': rmse,
        'explained_variance': explained_variance,
        'correlation': correlation
    })
    
    # Encoding quality metrics (if encoded data provided)
    if encoded_data is not None:
        compression_ratio = original_data.shape[1] / encoded_data.shape[1]
        encoding_variance = np.var(encoded_data)
        
        metrics.update({
            'compression_ratio': compression_ratio,
            'encoding_variance': encoding_variance,
            'encoding_mean': np.mean(encoded_data),
            'encoding_std': np.std(encoded_data)
        })
    
    return metrics

def plot_autoencoder_results(original_data, reconstructed_data, encoded_data=None, labels=None, n_samples=10, figsize=(15, 10)):
    """
    Visualize autoencoder results
    
    Parameters:
    -----------
    original_data : array-like
        Original data
    reconstructed_data : array-like
        Reconstructed data
    encoded_data : array-like, optional
        Encoded representations
    labels : array-like, optional
        Data labels for coloring
    n_samples : int
        Number of samples to show in detail
    figsize : tuple
        Figure size
    """
    n_plots = 3 if encoded_data is not None else 2
    fig, axes = plt.subplots(2, n_plots, figsize=figsize)
    
    if n_plots == 2:
        axes = axes.reshape(2, 2)
    
    # 1. Original vs Reconstructed (first few samples)
    sample_indices = np.random.choice(len(original_data), n_samples, replace=False)
    
    for i, idx in enumerate(sample_indices[:5]):
        axes[0, 0].plot(original_data[idx], alpha=0.7, label=f'Original {i+1}' if i < 3 else "")
        axes[0, 0].plot(reconstructed_data[idx], alpha=0.7, linestyle='--', label=f'Reconstructed {i+1}' if i < 3 else "")
    
    axes[0, 0].set_title('Original vs Reconstructed Samples')
    axes[0, 0].set_xlabel('Feature Index')
    axes[0, 0].set_ylabel('Value')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # 2. Reconstruction Error Distribution
    reconstruction_errors = np.mean((original_data - reconstructed_data) ** 2, axis=1)
    axes[0, 1].hist(reconstruction_errors, bins=30, alpha=0.7, color='blue')
    axes[0, 1].axvline(np.mean(reconstruction_errors), color='red', linestyle='--', label=f'Mean: {np.mean(reconstruction_errors):.4f}')
    axes[0, 1].set_title('Reconstruction Error Distribution')
    axes[0, 1].set_xlabel('MSE per Sample')
    axes[0, 1].set_ylabel('Frequency')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # 3. Encoded Space Visualization (if available)
    if encoded_data is not None and encoded_data.shape[1] >= 2:
        if labels is not None:
            scatter = axes[0, 2].scatter(encoded_data[:, 0], encoded_data[:, 1], c=labels, cmap='tab10', alpha=0.7, s=30)
            plt.colorbar(scatter, ax=axes[0, 2])
        else:
            axes[0, 2].scatter(encoded_data[:, 0], encoded_data[:, 1], alpha=0.7, s=30)
        
        axes[0, 2].set_title('Encoded Space (First 2 Dimensions)')
        axes[0, 2].set_xlabel('Encoded Dim 1')
        axes[0, 2].set_ylabel('Encoded Dim 2')
        axes[0, 2].grid(True, alpha=0.3)
    elif encoded_data is not None:
        # Show encoded feature distribution
        axes[0, 2].hist(encoded_data.flatten(), bins=30, alpha=0.7)
        axes[0, 2].set_title('Encoded Features Distribution')
        axes[0, 2].set_xlabel('Encoded Value')
        axes[0, 2].set_ylabel('Frequency')
        axes[0, 2].grid(True, alpha=0.3)
    
    # 4. Correlation plot
    flat_original = original_data.flatten()
    flat_reconstructed = reconstructed_data.flatten()
    
    # Sample for visualization if too many points
    if len(flat_original) > 10000:
        sample_idx = np.random.choice(len(flat_original), 10000, replace=False)
        flat_original = flat_original[sample_idx]
        flat_reconstructed = flat_reconstructed[sample_idx]
    
    axes[1, 0].scatter(flat_original, flat_reconstructed, alpha=0.3, s=1)
    
    # Perfect reconstruction line
    min_val = min(flat_original.min(), flat_reconstructed.min())
    max_val = max(flat_original.max(), flat_reconstructed.max())
    axes[1, 0].plot([min_val, max_val], [min_val, max_val], 'r--', label='Perfect Reconstruction')
    
    correlation = np.corrcoef(flat_original, flat_reconstructed)[0, 1]
    axes[1, 0].set_title(f'Original vs Reconstructed\\nCorrelation: {correlation:.4f}')
    axes[1, 0].set_xlabel('Original Values')
    axes[1, 0].set_ylabel('Reconstructed Values')
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)
    
    # 5. Feature importance (variance explained per feature)
    feature_variances_orig = np.var(original_data, axis=0)
    feature_variances_recon = np.var(reconstructed_data, axis=0)
    feature_explained = 1 - (np.var(original_data - reconstructed_data, axis=0) / feature_variances_orig)
    
    # Show top and bottom features
    n_features_show = min(20, len(feature_explained))
    top_features = np.argsort(feature_explained)[-n_features_show:]
    
    axes[1, 1].bar(range(n_features_show), feature_explained[top_features])
    axes[1, 1].set_title(f'Explained Variance by Feature (Top {n_features_show})')
    axes[1, 1].set_xlabel('Feature Index (sorted)')
    axes[1, 1].set_ylabel('Explained Variance')
    axes[1, 1].grid(True, alpha=0.3)
    
    # 6. Training history plot (if space available)
    if encoded_data is not None and n_plots >= 3:
        # Placeholder for training history - would be passed from training
        axes[1, 2].text(0.5, 0.5, 'Training History\\n(Pass history to show)',  ha='center', va='center', transform=axes[1, 2].transAxes, bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8))
        axes[1, 2].set_title('Training History')
        axes[1, 2].set_xlim(0, 1)
        axes[1, 2].set_ylim(0, 1)
        axes[1, 2].axis('off')
    
    plt.tight_layout()
    return fig

In [6]:
# Reusable Autoencoder Pipeline Class
class AutoencoderPipeline:
    """
    A comprehensive autoencoder pipeline for dimensionality reduction and feature learning
    
    This class provides a complete workflow for autoencoder analysis including:
    - Data preprocessing and normalization
    - Multiple autoencoder architectures (vanilla, variational, denoising)
    - Training with early stopping and monitoring
    - Performance evaluation and visualization
    - Encoded representations extraction
    """
    
    def __init__(self, random_state=42):
        self.random_state = random_state
        self.scaler = StandardScaler()
        self.autoencoder = None
        self.encoder = None
        self.decoder = None
        self.history = None
        self.is_fitted = False
        self.architecture_type = None
        
    def preprocess_data(self, X, scaling_method='standard', noise_factor=0.0):
        """
        Preprocess data for autoencoder training
        
        Parameters:
        -----------
        X : array-like
            Input data
        scaling_method : str
            'standard', 'minmax', or 'none'
        noise_factor : float
            Amount of noise to add (for denoising autoencoders)
        
        Returns:
        --------
        X_processed : array-like
            Preprocessed data
        X_noisy : array-like
            Noisy version (if noise_factor > 0)
        """
        X_processed = np.array(X, dtype=np.float32)
        
        # Apply scaling
        if scaling_method == 'standard':
            self.scaler = StandardScaler()
            X_processed = self.scaler.fit_transform(X_processed)
            print("✅ Data standardized using StandardScaler")
        elif scaling_method == 'minmax':
            self.scaler = MinMaxScaler()
            X_processed = self.scaler.fit_transform(X_processed)
            print("✅ Data normalized using MinMaxScaler")
        elif scaling_method == 'none':
            print("⚠️ No scaling applied")
        
        # Add noise if requested
        X_noisy = None
        if noise_factor > 0:
            noise = np.random.normal(0, noise_factor, X_processed.shape)
            X_noisy = X_processed + noise
            print(f"🔊 Added noise with factor {noise_factor}")
        
        print(f"📊 Processed data shape: {X_processed.shape}")
        
        return X_processed, X_noisy
    
    def build_autoencoder(self, input_dim, encoding_dim, architecture='vanilla', 
                         hidden_layers=None, **kwargs):
        """
        Build autoencoder architecture
        
        Parameters:
        -----------
        input_dim : int
            Input dimension
        encoding_dim : int
            Encoding dimension
        architecture : str
            'vanilla', 'variational', or 'denoising'
        hidden_layers : list, optional
            Hidden layer dimensions
        **kwargs : dict
            Additional architecture parameters
        
        Returns:
        --------
        autoencoder, encoder, decoder : tuple
            Model components
        """
        self.architecture_type = architecture
        
        if architecture == 'vanilla':
            self.autoencoder, self.encoder, self.decoder = create_autoencoder(
                input_dim, encoding_dim, hidden_layers, **kwargs
            )
            print(f"🏗️ Built vanilla autoencoder: {input_dim} → {encoding_dim}")
            
        elif architecture == 'variational':
            self.autoencoder, self.encoder, self.decoder = create_variational_autoencoder(
                input_dim, encoding_dim, hidden_layers
            )
            print(f"🏗️ Built variational autoencoder: {input_dim} → {encoding_dim}")
            
        elif architecture == 'denoising':
            # Same as vanilla but will be trained with noisy input
            self.autoencoder, self.encoder, self.decoder = create_autoencoder(
                input_dim, encoding_dim, hidden_layers, **kwargs
            )
            print(f"🏗️ Built denoising autoencoder: {input_dim} → {encoding_dim}")
        
        # Print model summary
        print("\\n📋 Autoencoder Architecture:")
        self.autoencoder.summary()
        
        return self.autoencoder, self.encoder, self.decoder
    
    def train(self, X_train, X_val=None, epochs=100, batch_size=32, early_stopping=True, patience=10, verbose=1):
        """
        Train the autoencoder
        
        Parameters:
        -----------
        X_train : array-like
            Training data
        X_val : array-like, optional
            Validation data
        epochs : int
            Number of training epochs
        batch_size : int
            Batch size
        early_stopping : bool
            Whether to use early stopping
        patience : int
            Early stopping patience
        verbose : int
            Verbosity level
        
        Returns:
        --------
        history : dict
            Training history
        """
        if self.autoencoder is None:
            raise ValueError("No autoencoder built. Call build_autoencoder first.")
        
        # Prepare callbacks
        callbacks = []
        if early_stopping:
            early_stop = EarlyStopping(
                monitor='val_loss' if X_val is not None else 'loss',
                patience=patience,
                restore_best_weights=True,
                verbose=1
            )
            callbacks.append(early_stop)
        
        # Reduce learning rate on plateau
        reduce_lr = ReduceLROnPlateau(
            monitor='val_loss' if X_val is not None else 'loss',
            factor=0.5,
            patience=patience//2,
            min_lr=1e-6,
            verbose=1
        )
        callbacks.append(reduce_lr)
        
        print(f"🚀 Starting training for {epochs} epochs...")
        print(f"   - Architecture: {self.architecture_type}")
        print(f"   - Batch size: {batch_size}")
        print(f"   - Early stopping: {early_stopping}")
        
        # Train the model
        validation_data = (X_val, X_val) if X_val is not None else None
        
        self.history = self.autoencoder.fit(
            X_train, X_train,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=validation_data,
            callbacks=callbacks,
            verbose=verbose
        )
        
        self.is_fitted = True
        
        print(f"✅ Training completed!")
        print(f"   - Final loss: {self.history.history['loss'][-1]:.6f}")
        if X_val is not None:
            print(f"   - Final val_loss: {self.history.history['val_loss'][-1]:.6f}")
        
        return self.history
    
    def encode(self, X):
        """
        Encode data to latent space
        
        Parameters:
        -----------
        X : array-like
            Input data
        
        Returns:
        --------
        encoded : array-like
            Encoded representations
        """
        if not self.is_fitted:
            raise ValueError("Model not trained. Call train first.")
        
        # Scale data if scaler is available
        if hasattr(self.scaler, 'transform'):
            X_scaled = self.scaler.transform(X)
        else:
            X_scaled = X
        
        if self.architecture_type == 'variational':
            # For VAE, use mean of the distribution
            z_mean, z_log_var, z = self.encoder.predict(X_scaled, verbose=0)
            return z_mean
        else:
            return self.encoder.predict(X_scaled, verbose=0)
    
    def decode(self, encoded_data):
        """
        Decode from latent space
        
        Parameters:
        -----------
        encoded_data : array-like
            Encoded representations
        
        Returns:
        --------
        decoded : array-like
            Decoded data
        """
        if not self.is_fitted:
            raise ValueError("Model not trained. Call train first.")
        
        decoded = self.decoder.predict(encoded_data, verbose=0)
        
        # Inverse transform if scaler is available
        if hasattr(self.scaler, 'inverse_transform'):
            decoded = self.scaler.inverse_transform(decoded)
        
        return decoded
    
    def reconstruct(self, X):
        """
        Reconstruct data (encode then decode)
        
        Parameters:
        -----------
        X : array-like
            Input data
        
        Returns:
        --------
        reconstructed : array-like
            Reconstructed data
        """
        if not self.is_fitted:
            raise ValueError("Model not trained. Call train first.")
        
        # Scale data if scaler is available
        if hasattr(self.scaler, 'transform'):
            X_scaled = self.scaler.transform(X)
        else:
            X_scaled = X
        
        reconstructed = self.autoencoder.predict(X_scaled, verbose=0)
        
        # Inverse transform if scaler is available
        if hasattr(self.scaler, 'inverse_transform'):
            reconstructed = self.scaler.inverse_transform(reconstructed)
        
        return reconstructed
    
    def evaluate_performance(self, X_test, X_original=None):
        """
        Evaluate autoencoder performance
        
        Parameters:
        -----------
        X_test : array-like
            Test data
        X_original : array-like, optional
            Original unscaled data for comparison
        
        Returns:
        --------
        metrics : dict
            Performance metrics
        """
        if not self.is_fitted:
            raise ValueError("Model not trained. Call train first.")
        
        # Get reconstructions
        X_reconstructed = self.reconstruct(X_test)
        X_encoded = self.encode(X_test)
        
        # Use original data if provided, otherwise use test data
        comparison_data = X_original if X_original is not None else X_test
        
        # Calculate metrics
        metrics = evaluate_autoencoder_performance(
            comparison_data, X_reconstructed, X_encoded
        )
        
        # Add autoencoder-specific metrics
        metrics['latent_dimensionality'] = X_encoded.shape[1]
        metrics['compression_ratio'] = X_test.shape[1] / X_encoded.shape[1]
        
        return metrics
    
    def plot_results(self, X_test, labels=None, X_original=None, figsize=(16, 12)):
        """
        Plot comprehensive autoencoder results
        
        Parameters:
        -----------
        X_test : array-like
            Test data
        labels : array-like, optional
            Labels for visualization
        X_original : array-like, optional
            Original unscaled data
        figsize : tuple
            Figure size
        
        Returns:
        --------
        fig : matplotlib.figure.Figure
            The figure object
        """
        if not self.is_fitted:
            raise ValueError("Model not trained. Call train first.")
        
        # Get reconstructions and encodings
        X_reconstructed = self.reconstruct(X_test)
        X_encoded = self.encode(X_test)
        
        # Use original data if provided
        comparison_data = X_original if X_original is not None else X_test
        
        # Create comprehensive plot
        fig = plot_autoencoder_results(
            comparison_data, X_reconstructed, X_encoded, labels, figsize=figsize
        )
        
        return fig
    
    def plot_training_history(self, figsize=(12, 4)):
        """
        Plot training history
        
        Parameters:
        -----------
        figsize : tuple
            Figure size
        
        Returns:
        --------
        fig : matplotlib.figure.Figure
            The figure object
        """
        if self.history is None:
            raise ValueError("No training history available. Train the model first.")
        
        fig, axes = plt.subplots(1, 2, figsize=figsize)
        
        # Loss plot
        axes[0].plot(self.history.history['loss'], label='Training Loss')
        if 'val_loss' in self.history.history:
            axes[0].plot(self.history.history['val_loss'], label='Validation Loss')
        axes[0].set_title('Model Loss')
        axes[0].set_xlabel('Epoch')
        axes[0].set_ylabel('Loss')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)
        
        # MAE plot (if available)
        if 'mae' in self.history.history:
            axes[1].plot(self.history.history['mae'], label='Training MAE')
            if 'val_mae' in self.history.history:
                axes[1].plot(self.history.history['val_mae'], label='Validation MAE')
            axes[1].set_title('Model MAE')
            axes[1].set_xlabel('Epoch')
            axes[1].set_ylabel('MAE')
            axes[1].legend()
            axes[1].grid(True, alpha=0.3)
        else:
            axes[1].text(0.5, 0.5, 'MAE not available', ha='center', va='center', 
                        transform=axes[1].transAxes)
            axes[1].set_title('MAE (Not Available)')
        
        plt.tight_layout()
        return fig
    
    def save_model(self, filepath):
        """
        Save the trained autoencoder
        
        Parameters:
        -----------
        filepath : str
            Path to save the model
        """
        if not self.is_fitted:
            raise ValueError("Model not trained. Call train first.")
        
        self.autoencoder.save(filepath)
        print(f"💾 Model saved to {filepath}")
    
    def load_model(self, filepath):
        """
        Load a pre-trained autoencoder
        
        Parameters:
        -----------
        filepath : str
            Path to the saved model
        """
        self.autoencoder = tf.keras.models.load_model(filepath)
        self.is_fitted = True
        print(f"📂 Model loaded from {filepath}")

# Practical Examples and Demonstrations
def demonstrate_autoencoder_pipeline():
    """
    Comprehensive demonstration of autoencoder pipeline
    """
    print("🚀 Starting comprehensive autoencoder demonstration...")
    print("="*60)
    
    # Create datasets
    datasets = create_autoencoder_datasets()
    
    for name, (X, y) in datasets.items():
        print(f"\\n{'='*60}")
        print(f"📊 ANALYZING DATASET: {name.upper()}")
        print(f"{'='*60}")
        print(f"Data shape: {X.shape}")
        print(f"Data type: {type(y).__name__}")
        
        # Initialize pipeline
        pipeline = AutoencoderPipeline(random_state=42)
        
        # Preprocess data
        X_processed, X_noisy = pipeline.preprocess_data(X, scaling_method='standard')
        
        # Split data
        X_train, X_test = train_test_split(X_processed, test_size=0.2, random_state=42)
        
        # Choose encoding dimension (rule of thumb: 1/4 to 1/2 of input)
        encoding_dim = max(2, X.shape[1] // 4)
        
        # Build autoencoder
        print(f"\\n🏗️ Building autoencoder with encoding dim: {encoding_dim}")
        pipeline.build_autoencoder(
            input_dim=X.shape[1],
            encoding_dim=encoding_dim,
            architecture='vanilla',
            hidden_layers=[X.shape[1]//2]
        )
        
        # Train autoencoder
        print("\\n🚀 Training autoencoder...")
        history = pipeline.train(
            X_train, 
            X_val=X_test,
            epochs=50,
            batch_size=32,
            early_stopping=True,
            patience=10,
            verbose=0
        )
        
        # Evaluate performance
        print("\\n📊 Evaluating performance...")
        metrics = pipeline.evaluate_performance(X_test, X_test)
        
        print("\\nPerformance Metrics:")
        print("-" * 30)
        for metric_name, value in metrics.items():
            if isinstance(value, (int, float)):
                print(f"{metric_name.replace('_', ' ').title():<25}: {value:.4f}")
        
        # Plot results
        print("\\n📈 Generating visualizations...")
        
        # Training history
        pipeline.plot_training_history()
        plt.suptitle(f'Training History: {name}')
        plt.show()
        
        # Reconstruction results
        labels_for_plot = y if hasattr(y, '__len__') and len(y) == len(X_test) else None
        pipeline.plot_results(X_test, labels=labels_for_plot, X_original=X_test)
        plt.suptitle(f'Autoencoder Results: {name}')
        plt.show()
        
        print(f"\\n✅ Analysis of {name} dataset complete!")
    
    print("\\n🎉 All autoencoder demonstrations completed successfully!")
    
    return pipeline

# Quick usage example
def quick_autoencoder_example():
    """Quick example of using the autoencoder pipeline"""
    print("📊 Quick Autoencoder Example")
    print("="*40)
    
    # Generate sample data
    X, y = make_blobs(n_samples=1000, centers=3, n_features=20, random_state=42)
    
    # Initialize and run pipeline
    pipeline = AutoencoderPipeline()
    
    # Preprocess
    X_processed, _ = pipeline.preprocess_data(X)
    
    # Split data
    X_train, X_test = train_test_split(X_processed, test_size=0.2, random_state=42)
    
    # Build and train
    pipeline.build_autoencoder(input_dim=20, encoding_dim=5, hidden_layers=[15, 10])
    pipeline.train(X_train, X_test, epochs=30, verbose=0)
    
    # Evaluate
    metrics = pipeline.evaluate_performance(X_test)
    
    print(f"\\n✅ Quick example complete!")
    print(f"Reconstruction MSE: {metrics['mse']:.4f}")
    print(f"Compression ratio: {metrics['compression_ratio']:.1f}x")
    
    return pipeline