In [1]:
# %%
# ============================================================================
# ITERATIVE LAYER-SENSITIVE COMPRESSION STRATEGY
# Implementation of the multi-stage compression approach
# ============================================================================


import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from collections import OrderedDict
import copy
import time
from scipy import stats
import pandas as pd
import warnings
import pandas as pd
from collections import OrderedDict

warnings.filterwarnings('ignore')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [2]:
# %%
class LayerAnalysisCNN(nn.Module):
    """
    CNN architecture for layer-wise compression analysis.
    This model is designed with multiple convolutional and fully connected layers
    to allow for individual layer compression.
    """
    def __init__(self, num_classes=10):
        super(LayerAnalysisCNN, self).__init__()

        # Convolutional layers (keep these unchanged)
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(256, 512, kernel_size=3, padding=1)

        self.pool = nn.MaxPool2d(2, 2)

        # Only keep fc3, connecting directly from conv4 output
        self.fc3 = nn.Linear(512 * 2 * 2, num_classes)  # Direct connection to output

        self.dropout = nn.Dropout(0.5)

        # Remove fc1 and fc2 from layer_info
        self.layer_info = OrderedDict([
            ('conv1', {'layer': self.conv1, 'type': 'conv', 'position': 'early'}),
            ('conv2', {'layer': self.conv2, 'type': 'conv', 'position': 'early'}),
            ('conv3', {'layer': self.conv3, 'type': 'conv', 'position': 'middle'}),
            ('conv4', {'layer': self.conv4, 'type': 'conv', 'position': 'middle'}),
            ('fc3', {'layer': self.fc3, 'type': 'fc', 'position': 'final'})
        ])

    def forward(self, x):
        # Apply convolutional layers (unchanged)
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.pool(F.relu(self.conv4(x)))
    
        # Flatten for fully connected layer
        x = x.view(x.size(0), -1)
    
        # Direct connection to output layer
        x = self.dropout(x)  # Optional: apply dropout before final layer
        x = self.fc3(x)
    
        return x

In [3]:
# %%
def algorithm_11_randomized_svd_power_iteration(A, k, q=2):
    m, n = A.shape
    Omega = np.random.randn(n, 2 * k)
    Y = A @ Omega
    for _ in range(q):
        Y = A @ (A.T @ Y)
    Q, R = np.linalg.qr(Y)
    B = Q.T @ A
    U_tilde, Sigma, Vt = np.linalg.svd(B, full_matrices=False)
    U = Q @ U_tilde
    rank = min(k, len(Sigma))
    U = U[:, :rank]
    Sigma = Sigma[:rank]
    Vt = Vt[:rank, :]

    return U, Sigma, Vt

In [4]:
# %%
def conv_to_matrix(weight, input_shape, stride=1, padding=0):
    """
    Converts a convolutional layer's weight tensor to its equivalent 2D matrix form
    (also known as Toeplitz matrix or Im2Col matrix for convolution).
    This is necessary for applying SVD, which operates on 2D matrices.

    Args:
        weight (torch.Tensor): The weight tensor of the convolutional layer
                                (out_channels, in_channels, kh, kw).
        input_shape (tuple): A dummy input shape (batch_size, channels, height, width)
                             to correctly interpret the convolutional operation.
        stride (int): Stride of the convolution.
        padding (int): Padding of the convolution.

    Returns:
        np.ndarray: The reshaped weight matrix in NumPy format.
    """
    # out_channels: number of output feature maps
    # in_channels: number of input feature maps
    # kh, kw: kernel height and width
    out_channels, in_channels, kh, kw = weight.shape

    # Reshape the weight tensor into a 2D matrix:
    # (out_channels) x (in_channels * kh * kw)
    # Each row corresponds to a single output filter.
    # Each column corresponds to a flattened kernel element across all input channels.
    weight_matrix = weight.view(out_channels, -1)

    # Convert to NumPy array for SVD computation
    return weight_matrix.cpu().numpy()

def linear_to_matrix(weight):
    """
    Converts a linear layer's weight tensor to its 2D matrix form.
    For a linear layer, the weight is already in a 2D matrix form (out_features x in_features).

    Args:
        weight (torch.Tensor): The weight tensor of the linear layer
                               (out_features, in_features).

    Returns:
        np.ndarray: The weight matrix in NumPy format.
    """
    # Convert to NumPy array for SVD computation
    return weight.cpu().numpy()

In [5]:
def calculate_total_parameters(model):
    """
    Calculates the total number of trainable parameters in a PyTorch model.

    Args:
        model (nn.Module): The PyTorch model.

    Returns:
        int: Total number of parameters.
    """
    # Sum the number of elements (numel) for all parameters in the model.
    # This includes weights and biases for all layers.
    return sum(p.numel() for p in model.parameters())

def evaluate_model(model, dataloader):
    """
    Evaluates the accuracy of a given PyTorch model on a dataset.

    Args:
        model (nn.Module): The PyTorch model to be evaluated.
        dataloader (DataLoader): DataLoader for the dataset to evaluate on.

    Returns:
        float: The accuracy of the model in percentage.
    """
    model.eval() # Set the model to evaluation mode (disables dropout, batch normalization updates, etc.)
    correct, total = 0, 0 # Initialize counters for correct predictions and total samples

    with torch.no_grad(): # Disable gradient computation for evaluation (saves memory and speeds up)
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device) # Move data to device
            outputs = model(inputs) # Perform forward pass
            _, predicted = outputs.max(1) # Get the index of the max log-probability (the predicted class)
            correct += predicted.eq(targets).sum().item() # Count correct predictions
            total += targets.size(0) # Accumulate total samples

    return 100. * correct / total # Calculate and return accuracy as a percentage

In [6]:
def train_model(model, trainloader, valloader, epochs=15, lr=0.001, patience=5):
    """
    FIXED: Only use validation set for early stopping, never test set during training
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    best_val_acc = 0
    patience_counter = 0
    best_model_state = None

    for epoch in range(epochs):
        # Training phase
        model.train()
        for inputs, targets in trainloader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

        # Validation phase - ONLY use validation set
        val_acc = evaluate_model(model, valloader)
        
        # Early stopping check
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            best_model_state = copy.deepcopy(model.state_dict())
        else:
            patience_counter += 1

        # Print progress - REMOVED test accuracy during training
        if epoch % 3 == 0:
            print(f"Epoch {epoch:2d}: Val Acc={val_acc:.2f}%")

        # Early stopping
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch}! No improvement for {patience} epochs.")
            break

    # Load best model state
    if best_model_state is not None:
        model.load_state_dict(best_model_state)
    
    return model, best_val_acc

In [7]:
def fine_tune_compressed_model(model, trainloader, valloader, epochs=5, lr=0.0001, patience=3):
    """
    FIXED: Removed testloader parameter - only use validation for early stopping
    """
    model.train()
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
    best_val_acc = 0
    patience_counter = 0
    best_model_state = None

    for epoch in range(epochs):
        # Training phase
        for inputs, targets in trainloader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
        
        # Validation phase - ONLY use validation set
        val_acc = evaluate_model(model, valloader)
        
        # Early stopping check
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            patience_counter = 0
            best_model_state = copy.deepcopy(model.state_dict())
        else:
            patience_counter += 1
            
        # Early stopping
        if patience_counter >= patience:
            break

    # Load best model state
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model  # Return model instead of test accuracy

In [8]:
# %%
class CompressedConv2D(nn.Module):
    """
    Compressed 2D Convolutional layer using low-rank decomposition (SVD).
    This class replaces a standard Conv2d layer with two sequential convolutional layers
    that approximate the original layer's operation using a lower rank.
    """
    def __init__(self, U, S, Vt, original_shape, stride=1, padding=0, bias=None):
        super(CompressedConv2D, self).__init__()

        # original_shape: (out_channels, in_channels, kh, kw)
        out_channels, in_channels, kh, kw = original_shape
        rank = len(S) # The rank determined by SVD

        self.original_shape = original_shape
        self.stride = stride
        self.padding = padding
        self.rank = rank

        # The low-rank approximation is achieved by two sequential conv layers:
        # 1. conv1: Projects the input channels to 'rank' dimensions.
        #    It takes 'in_channels' and produces 'rank' feature maps.
        #    The kernel size is the original kernel size (kh, kw).
        #    Bias is set to False as the original bias is applied in conv2.
        self.conv1 = nn.Conv2d(in_channels, rank, kernel_size=(kh, kw),
                              stride=1, padding=padding, bias=False)

        # 2. conv2: Projects from 'rank' dimensions to 'out_channels'.
        #    It takes 'rank' feature maps and produces 'out_channels'.
        #    A 1x1 kernel is used for this recombination.
        #    The original stride is applied here.
        self.conv2 = nn.Conv2d(rank, out_channels, kernel_size=1,
                              stride=stride, padding=0, bias=bias is not None)

        # Initialize the weights of these two new layers using the SVD components
        self._initialize_from_svd(U, S, Vt)

        # If the original layer had a bias, apply it to the second convolutional layer
        if bias is not None:
            self.conv2.bias.data = bias

    def _initialize_from_svd(self, U, S, Vt):
        """
        Initializes the weights of conv1 and conv2 using the SVD components (U, S, Vt).
        """
        rank = len(S)
        out_channels, in_channels, kh, kw = self.original_shape

        # The Vt matrix (rank x (in_channels * kh * kw)) is reshaped and assigned to conv1's weight.
        # PyTorch Conv2d weights are (out_channels, in_channels, kh, kw).
        # For conv1, out_channels is 'rank', in_channels is 'in_channels'.
        V_reshaped = Vt.reshape(rank, in_channels, kh, kw)
        self.conv1.weight.data = torch.from_numpy(V_reshaped).float()

        # The U matrix (out_channels x rank) and singular values S (rank,) are combined
        # (U @ diag(S)) and assigned to conv2's weight.
        # PyTorch Conv2d weights for a 1x1 kernel are (out_channels, in_channels, 1, 1).
        # For conv2, out_channels is 'out_channels', in_channels is 'rank'.
        US = U @ np.diag(S)
        self.conv2.weight.data = torch.from_numpy(US).float().unsqueeze(-1).unsqueeze(-1)

    def forward(self, x):
        """
        Defines the forward pass for the compressed convolutional layer.
        """
        x = self.conv1(x)
        x = self.conv2(x)
        return x

# %%
class CompressedLinear(nn.Module):
    """
    Compressed Linear layer using low-rank decomposition (SVD).
    This class replaces a standard Linear layer with two sequential linear layers
    that approximate the original layer's operation using a lower rank.
    """
    def __init__(self, U, S, Vt, original_shape, bias=None):
        super(CompressedLinear, self).__init__()

        # original_shape: (out_features, in_features)
        out_features, in_features = original_shape
        rank = len(S) # The rank determined by SVD

        self.original_shape = original_shape
        self.rank = rank

        # The low-rank approximation is achieved by two sequential linear layers:
        # 1. linear1: Projects the input features to 'rank' dimensions.
        #    It takes 'in_features' and produces 'rank' features.
        #    Bias is set to False as the original bias is applied in linear2.
        self.linear1 = nn.Linear(in_features, rank, bias=False)

        # 2. linear2: Projects from 'rank' dimensions to 'out_features'.
        #    It takes 'rank' features and produces 'out_features'.
        self.linear2 = nn.Linear(rank, out_features, bias=bias is not None)

        # Initialize the weights of these two new layers using the SVD components
        self._initialize_from_svd(U, S, Vt)

        # If the original layer had a bias, apply it to the second linear layer
        if bias is not None:
            self.linear2.bias.data = bias

    def _initialize_from_svd(self, U, S, Vt):
        """
        Initializes the weights of linear1 and linear2 using the SVD components (U, S, Vt).
        """
        # The Vt matrix (rank x in_features) is assigned to linear1's weight.
        # PyTorch Linear weights are (out_features, in_features).
        # For linear1, out_features is 'rank', in_features is 'in_features'.
        self.linear1.weight.data = torch.from_numpy(Vt).float()

        # The U matrix (out_features x rank) and singular values S (rank,) are combined
        # (U @ diag(S)) and assigned to linear2's weight.
        # PyTorch Linear weights are (out_features, in_features).
        # For linear2, out_features is 'out_features', in_features is 'rank'.
        US = U @ np.diag(S)
        self.linear2.weight.data = torch.from_numpy(US).float()

    def forward(self, x):
        """
        Defines the forward pass for the compressed linear layer.
        """
        x = self.linear1(x)
        x = self.linear2(x)
        return x

In [9]:
def compress_single_layer(model, layer_name, energy_retention, spectrum_analysis):
    """
    Compresses a single layer to specified energy retention level.
    
    Args:
        model: The model to compress
        layer_name: Name of the layer to compress
        energy_retention: Target energy retention (0-1)
        spectrum_analysis: Pre-computed singular value analysis
        
    Returns:
        The model with the specified layer compressed
    """
    device = next(model.parameters()).device
    layer = getattr(model, layer_name)
    weight = layer.weight.data
    original_shape = weight.shape
    
    # Convert weight to matrix form
    if isinstance(layer, nn.Conv2d):
        W_matrix = conv_to_matrix(weight, (1, weight.shape[1], 32, 32))
        stride = layer.stride[0] if isinstance(layer.stride, tuple) else layer.stride
        padding = layer.padding[0] if isinstance(layer.padding, tuple) else layer.padding
    elif isinstance(layer, nn.Linear):
        W_matrix = linear_to_matrix(weight)
        stride, padding = None, None
    else:
        return model
    
    # Determine rank based on energy retention
    if layer_name in spectrum_analysis:
        energy_cumsum = spectrum_analysis[layer_name]['energy_cumsum']
        rank = max(1, np.argmax(energy_cumsum >= energy_retention) + 1)
        rank = min(rank, len(energy_cumsum))
    else:
        # Fallback: compute SVD on the fly
        _, S_full, _ = np.linalg.svd(W_matrix, full_matrices=False)
        energy_cumsum = np.cumsum(S_full**2) / np.sum(S_full**2)
        rank = max(1, np.argmax(energy_cumsum >= energy_retention) + 1)
        rank = min(rank, len(S_full))
    
    # Apply SVD compression
    U, S, Vt = algorithm_11_randomized_svd_power_iteration(W_matrix, rank, q=2)
    bias = layer.bias.data if layer.bias is not None else None
    
    # Create compressed layer
    if isinstance(layer, nn.Conv2d):
        compressed_layer = CompressedConv2D(U, S, Vt, original_shape, stride, padding, bias)
    else:
        compressed_layer = CompressedLinear(U, S, Vt, original_shape, bias)
    
    compressed_layer = compressed_layer.to(device)
    setattr(model, layer_name, compressed_layer)
    
    return model

In [10]:
def load_cifar10_only(batch_size=128, validation_split=0.1):
    """
    Loads and preprocesses the CIFAR-10 dataset with train/validation/test split.

    Args:
        batch_size (int): The batch size for the DataLoaders.
        validation_split (float): Fraction of training data to use for validation.

    Returns:
        dict: A dictionary containing DataLoaders for CIFAR-10 training, validation, and testing.
    """
    from torch.utils.data import random_split
    
    # Define transformations (same as before)
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
    ])

    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
    ])

    # Load full training dataset
    cifar10_train_full = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
    cifar10_test = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

    # Split training data into train and validation
    train_size = int((1 - validation_split) * len(cifar10_train_full))
    val_size = len(cifar10_train_full) - train_size
    cifar10_train, cifar10_val = random_split(cifar10_train_full, [train_size, val_size])

    # Create DataLoaders
    datasets = {
        'CIFAR-10': (
            DataLoader(cifar10_train, batch_size=batch_size, shuffle=True, num_workers=2),
            DataLoader(cifar10_val, batch_size=batch_size, shuffle=False, num_workers=2),
            DataLoader(cifar10_test, batch_size=batch_size, shuffle=False, num_workers=2)
        )
    }

    return datasets

In [11]:
def analyze_layer_sensitivity(model, spectrum_analysis, trainloader, valloader):
    """
    FIXED: Use validation set instead of test set for sensitivity analysis
    """
    print("Analyzing layer sensitivity to compression...")
    
    sensitivity_results = {}
    test_retention_levels = [0.99, 0.95, 0.90, 0.85, 0.80, 0.75, 0.70, 0.65, 0.60]
    
    for layer_name in model.layer_info.keys():
        print(f"\nTesting {layer_name}:")
        layer_results = []
        
        for retention in test_retention_levels:
            # Test this layer at this retention level
            test_model = copy.deepcopy(model)
            test_model = compress_single_layer(test_model, layer_name, retention, spectrum_analysis)
            
            # Use VALIDATION set for evaluation during model selection
            accuracy = evaluate_model(test_model, valloader)
            
            # Calculate compression ratio for this layer
            original_layer = getattr(model, layer_name)
            compressed_layer = getattr(test_model, layer_name)
            original_params = original_layer.weight.numel()
            compressed_params = calculate_total_parameters(compressed_layer)
            compression_ratio = original_params / compressed_params
            
            layer_results.append({
                'retention': retention,
                'accuracy': accuracy,
                'compression_ratio': compression_ratio
            })
            
            print(f"  {retention:.2f}: {accuracy:.2f}% ({compression_ratio:.2f}x)")
        
        sensitivity_results[layer_name] = layer_results
    
    return sensitivity_results

In [12]:
def find_optimal_layer_compression(model, layer_name, spectrum_analysis, trainloader, valloader,
                                 min_accuracy, min_retention=0.30, max_retention=0.99, tolerance=0.02):
    """
    FIXED: Use validation set for compression optimization, not test set
    """
    print(f"  Finding optimal compression for {layer_name}...")
    
    # First, do a coarse search to find the general range
    coarse_retentions = [0.99, 0.95, 0.90, 0.85, 0.80, 0.75, 0.70, 0.65, 0.60, 0.55, 0.50, 0.45, 0.40, 0.35, 0.30]
    feasible_retentions = []
    
    for retention in coarse_retentions:
        test_model = copy.deepcopy(model)
        test_model = compress_single_layer(test_model, layer_name, retention, spectrum_analysis)
        
        # Fine-tune using only train/validation
        fine_tune_compressed_model(test_model, trainloader, valloader, epochs=3, lr=0.0001, patience=2)
        
        # Evaluate on VALIDATION set
        final_accuracy = evaluate_model(test_model, valloader)
        
        # Calculate compression ratio
        original_layer = getattr(model, layer_name)
        compressed_layer = getattr(test_model, layer_name)
        original_params = original_layer.weight.numel()
        compressed_params = calculate_total_parameters(compressed_layer)
        compression_ratio = original_params / compressed_params
        
        print(f"    Coarse search - Retention {retention:.2f}: Accuracy {final_accuracy:.2f}%, Ratio {compression_ratio:.2f}x")
        
        if final_accuracy >= min_accuracy:
            feasible_retentions.append((retention, final_accuracy, compression_ratio))
    
    if not feasible_retentions:
        print(f"    No feasible compression found for {layer_name}, using 0.99 retention")
        return 0.99, evaluate_model(model, valloader), 1.0
    
    # Find the most aggressive compression that still meets accuracy requirement
    best_retention = min(feasible_retentions, key=lambda x: x[0])  # Minimum retention (most aggressive)
    
    return best_retention[0], best_retention[1], best_retention[2]

In [13]:
def adaptive_iterative_compression(model, spectrum_analysis, trainloader, valloader, 
                                 max_accuracy_drop=1.0):
    """
    FIXED: Use validation set for all compression decisions
    MODIFIED: Apply fixed 95% energy retention for conv1 and 90% for fc3 at the beginning
    """
    print(f"\n{'='*60}")
    print(f"ADAPTIVE COMPRESSION - Max Accuracy Drop: {max_accuracy_drop:.1f}%")
    print(f"{'='*60}")
    
    # Get original VALIDATION accuracy (not test!)
    original_accuracy = evaluate_model(model, valloader)
    min_accuracy = original_accuracy - max_accuracy_drop
    
    print(f"Original validation accuracy: {original_accuracy:.2f}%")
    print(f"Minimum acceptable validation accuracy: {min_accuracy:.2f}%")
    
    # Work on a copy of the model
    compressed_model = copy.deepcopy(model)
    
    # Define compression strategy based on accuracy drop threshold
    if max_accuracy_drop <= 1.0:
        layer_targets = {
            'conv2': (0.90, 0.98),
            'conv3': (0.85, 0.95),
            'conv4': (0.80, 0.92)
        }
        print("Using CONSERVATIVE compression strategy")
    elif max_accuracy_drop <= 3.0:
        layer_targets = {
            'conv2': (0.75, 0.90),
            'conv3': (0.70, 0.85),
            'conv4': (0.65, 0.80)
        }
        print("Using MODERATE compression strategy")
    elif max_accuracy_drop <= 6.0:
        layer_targets = {
            'conv2': (0.60, 0.80),
            'conv3': (0.55, 0.75),
            'conv4': (0.50, 0.70)
        }
        print("Using AGGRESSIVE compression strategy")
    else:
        layer_targets = {
            'conv2': (0.45, 0.70),
            'conv3': (0.40, 0.65),
            'conv4': (0.35, 0.60)
        }
        print("Using VERY AGGRESSIVE compression strategy")
    
    # Apply fixed compressions first
    print(f"\nApplying fixed compressions:")
    print(f"  conv1: 95% energy retention")
    print(f"  fc3: 90% energy retention")
    
    # Compress conv1 with 95% energy retention
    compressed_model = compress_single_layer(compressed_model, 'conv1', 0.95, spectrum_analysis)
    print(f"  conv1 compressed with 95% energy retention")
    
    # Compress fc3 with 90% energy retention  
    compressed_model = compress_single_layer(compressed_model, 'fc3', 0.90, spectrum_analysis)
    print(f"  fc3 compressed with 90% energy retention")
    
    # Fine-tune after fixed compressions
    print(f"  Fine-tuning model after fixed compressions...")
    fine_tune_compressed_model(compressed_model, trainloader, valloader, epochs=3, lr=0.0001, patience=2)
    current_accuracy = evaluate_model(compressed_model, valloader)
    print(f"  Accuracy after fixed compressions: {current_accuracy:.2f}%")
    
    # Calculate compression ratios for fixed layers
    conv1_original = getattr(model, 'conv1')
    conv1_compressed = getattr(compressed_model, 'conv1')
    conv1_ratio = conv1_original.weight.numel() / calculate_total_parameters(conv1_compressed)
    
    fc3_original = getattr(model, 'fc3')
    fc3_compressed = getattr(compressed_model, 'fc3')
    fc3_ratio = fc3_original.weight.numel() / calculate_total_parameters(fc3_compressed)
    
    # Initialize layer_results with fixed compressions
    layer_results = {
        'conv1': {'energy_retention': 0.95, 'accuracy': current_accuracy, 'compression_ratio': conv1_ratio},
        'fc3': {'energy_retention': 0.90, 'accuracy': current_accuracy, 'compression_ratio': fc3_ratio}
    }
    
    # Continue adaptive compression for remaining layers
    layer_order = ['conv4', 'conv3', 'conv2']  # Removed conv1 and fc3
    
    for layer_name in layer_order:
        if layer_name not in layer_targets:
            continue
            
        print(f"\nCompressing {layer_name}...")
        print(f"  Current model validation accuracy: {current_accuracy:.2f}%")
        
        min_ret, max_ret = layer_targets[layer_name]
        
        # Find optimal compression for this layer using VALIDATION set
        retention, accuracy, ratio = find_optimal_layer_compression(
            compressed_model, layer_name, spectrum_analysis, trainloader, valloader,
            min_accuracy, min_retention=min_ret, max_retention=max_ret
        )
        
        # Apply compression
        compressed_model = compress_single_layer(compressed_model, layer_name, retention, spectrum_analysis)
        
        # Fine-tune the entire model after compression
        print(f"  Fine-tuning model after {layer_name} compression...")
        fine_tune_compressed_model(compressed_model, trainloader, valloader, epochs=5, lr=0.0001, patience=3)
        
        # Evaluate on VALIDATION set
        final_accuracy = evaluate_model(compressed_model, valloader)
        
        layer_results[layer_name] = {
            'energy_retention': retention,
            'accuracy': final_accuracy,
            'compression_ratio': ratio
        }
        
        current_accuracy = final_accuracy
        print(f"  {layer_name}: {retention:.3f} energy, {final_accuracy:.2f}% val accuracy, {ratio:.2f}x compression")
        
        # Early stop if we're getting too close to the minimum accuracy
        if final_accuracy < min_accuracy + 0.5:
            print(f"  Approaching minimum accuracy threshold, stopping compression")
            break
    
    # Calculate overall compression ratio
    total_original_params = calculate_total_parameters(model)
    total_compressed_params = calculate_total_parameters(compressed_model)
    overall_compression_ratio = total_original_params / total_compressed_params
    
    # Final validation accuracy
    final_accuracy = evaluate_model(compressed_model, valloader)
    
    print(f"\n{'='*40}")
    print(f"FINAL RESULTS (Validation Set):")
    print(f"Original parameters: {total_original_params:,}")
    print(f"Compressed parameters: {total_compressed_params:,}")
    print(f"Overall compression ratio: {overall_compression_ratio:.2f}x")
    print(f"Final validation accuracy: {final_accuracy:.2f}%")
    print(f"Validation accuracy drop: {original_accuracy - final_accuracy:.2f}%")
    print(f"Fixed compressions applied: conv1=95%, fc3=90%")
    print(f"{'='*40}")
    
    return {
        'model': compressed_model,
        'original_accuracy': original_accuracy,
        'final_accuracy': final_accuracy,
        'accuracy_drop': original_accuracy - final_accuracy,
        'overall_compression_ratio': overall_compression_ratio,
        'total_original_params': total_original_params,
        'total_compressed_params': total_compressed_params,
        'layer_results': layer_results
    }

In [14]:
# %%
def analyze_singular_value_spectrum(model, save_plots=True):
    """
    Analyzes the singular value spectrum for each layer to understand 
    their decay characteristics and inherent low-rank nature.
    
    Args:
        model (nn.Module): The trained model to analyze
        save_plots (bool): Whether to save spectrum plots
        
    Returns:
        dict: Analysis results for each layer including decay rates and energy distributions
    """
    spectrum_analysis = {}
    
    if save_plots:
        fig, axes = plt.subplots(2, 4, figsize=(20, 10))
        axes = axes.flatten()
        plot_idx = 0
    
    for layer_name, info in model.layer_info.items():
        layer = info['layer']
        weight = layer.weight.data
        
        # Convert to matrix form for SVD
        if isinstance(layer, nn.Conv2d):
            W_matrix = conv_to_matrix(weight, (1, weight.shape[1], 32, 32))
        elif isinstance(layer, nn.Linear):
            W_matrix = linear_to_matrix(weight)
        else:
            continue
            
        # Compute full SVD
        _, S, _ = np.linalg.svd(W_matrix, full_matrices=False)
        
        # Normalize singular values
        S_normalized = S / S[0]
        
        # Calculate energy retention curve
        energy_cumsum = np.cumsum(S**2) / np.sum(S**2)
        
        # Analyze decay characteristics
        # Find rank for common energy retention levels
        ranks_for_energy = {}
        for energy_level in [0.8, 0.85, 0.9, 0.95, 0.99]:
            rank = np.argmax(energy_cumsum >= energy_level) + 1
            ranks_for_energy[energy_level] = min(rank, len(S))
        
        # Calculate decay rate (exponential fit to first 20% of singular values)
        n_fit = max(5, len(S) // 5)
        log_s = np.log(S_normalized[:n_fit] + 1e-10)
        indices = np.arange(n_fit)
        decay_rate = -np.polyfit(indices, log_s, 1)[0]
        
        spectrum_analysis[layer_name] = {
            'singular_values': S,
            'energy_cumsum': energy_cumsum,
            'ranks_for_energy': ranks_for_energy,
            'decay_rate': decay_rate,
            'effective_rank': np.sum(S > 0.01 * S[0]),  # Rank at 1% of max singular value
            'layer_type': info['type'],
            'layer_position': info['position']
        }
        
        # Plot spectrum if requested
        if save_plots and plot_idx < 8:
            ax = axes[plot_idx]
            ax.semilogy(S_normalized, 'b-', linewidth=2)
            ax.set_title(f'{layer_name} - Decay Rate: {decay_rate:.3f}')
            ax.set_xlabel('Singular Value Index')
            ax.set_ylabel('Normalized Singular Value')
            ax.grid(True, alpha=0.3)
            plot_idx += 1
    
    if save_plots:
        plt.tight_layout()
        plt.savefig('singular_value_spectra.png', dpi=300, bbox_inches='tight')
        plt.show()
    
    return spectrum_analysis

In [15]:
def run_improved_compression_experiments():
    """
    FIXED: Separate test evaluation from model selection process
    """
    print("Loading CIFAR-10 dataset with validation split...")
    datasets = load_cifar10_only(batch_size=128, validation_split=0.1)
    trainloader, valloader, testloader = datasets['CIFAR-10']
    
    print(f"Dataset sizes:")
    print(f"  Training: {len(trainloader.dataset):,} samples")
    print(f"  Validation: {len(valloader.dataset):,} samples") 
    print(f"  Test: {len(testloader.dataset):,} samples")
    
    print("Creating and training model...")
    model = LayerAnalysisCNN(num_classes=10)
    trained_model, best_val_accuracy = train_model(model, trainloader, valloader, 
                                                  epochs=30, lr=0.001, patience=5)
    
    print(f"\nModel training completed. Best validation accuracy: {best_val_accuracy:.2f}%")
    
    # ONLY evaluate on test set ONCE at the end for baseline
    baseline_test_accuracy = evaluate_model(trained_model, testloader)
    print(f"Baseline test accuracy: {baseline_test_accuracy:.2f}%")
    print(f"Total parameters: {calculate_total_parameters(trained_model):,}")
    
    print("\nAnalyzing singular value spectrum...")
    spectrum_analysis = analyze_singular_value_spectrum(trained_model, save_plots=False)
    
    # Test different accuracy drop thresholds using VALIDATION set
    accuracy_drops = [0, 1.0, 3.0, 4.0, 5.0]
    
    all_results = []
    
    for acc_drop in accuracy_drops:
        print(f"\n" + "="*80)
        print(f"TESTING ACCURACY DROP THRESHOLD: {acc_drop}%")
        print(f"="*80)
        
        try:
            # Use validation set for compression decisions
            results = adaptive_iterative_compression(
                trained_model, spectrum_analysis, trainloader, valloader, 
                max_accuracy_drop=acc_drop
            )
            
            # ONLY NOW evaluate final compressed model on test set
            final_test_accuracy = evaluate_model(results['model'], testloader)
            
            # Store results for summary
            all_results.append({
                'max_accuracy_drop': acc_drop,
                'original_val_accuracy': results['original_accuracy'],
                'final_val_accuracy': results['final_accuracy'],
                'final_test_accuracy': final_test_accuracy,  # NEW: separate test accuracy
                'val_accuracy_drop': results['accuracy_drop'],
                'test_accuracy_drop': baseline_test_accuracy - final_test_accuracy,  # NEW
                'compression_ratio': results['overall_compression_ratio'],
                'original_params': results['total_original_params'],
                'compressed_params': results['total_compressed_params']
            })
            
            print(f"Final test accuracy for this compression: {final_test_accuracy:.2f}%")
            
        except Exception as e:
            print(f"Error with accuracy drop {acc_drop}%: {str(e)}")
            continue
    
    # Create summary table
    print(f"\n" + "="*120)
    print("FIXED COMPRESSION EXPERIMENT SUMMARY (No Data Leakage)")
    print(f"="*120)
    print(f"{'Acc Drop':>8} {'Val Orig':>8} {'Val Final':>9} {'Test Final':>10} {'Val Drop':>8} {'Test Drop':>9} {'Ratio':>8} {'Orig Params':>12} {'Comp Params':>12}")
    print("-" * 120)
    
    for result in all_results:
        print(f"{result['max_accuracy_drop']:>8.1f} "
              f"{result['original_val_accuracy']:>8.2f} "
              f"{result['final_val_accuracy']:>9.2f} "
              f"{result['final_test_accuracy']:>10.2f} "
              f"{result['val_accuracy_drop']:>8.2f} "
              f"{result['test_accuracy_drop']:>9.2f} "
              f"{result['compression_ratio']:>8.2f} "
              f"{result['original_params']:>12,} "
              f"{result['compressed_params']:>12,}")
    
    print(f"\nBaseline test accuracy: {baseline_test_accuracy:.2f}%")
    
    return all_results

In [16]:
# Run the improved experiments
results = run_improved_compression_experiments()

Loading CIFAR-10 dataset with validation split...
Files already downloaded and verified
Files already downloaded and verified
Dataset sizes:
  Training: 45,000 samples
  Validation: 5,000 samples
  Test: 10,000 samples
Creating and training model...
Epoch  0: Val Acc=53.50%
Epoch  3: Val Acc=71.96%
Epoch  6: Val Acc=75.56%
Epoch  9: Val Acc=79.14%
Epoch 12: Val Acc=80.30%
Epoch 15: Val Acc=80.60%
Epoch 18: Val Acc=80.96%
Epoch 21: Val Acc=81.76%
Epoch 24: Val Acc=81.10%
Epoch 27: Val Acc=82.56%

Model training completed. Best validation accuracy: 82.56%
Baseline test accuracy: 83.70%
Total parameters: 1,571,466

Analyzing singular value spectrum...

TESTING ACCURACY DROP THRESHOLD: 0%

ADAPTIVE COMPRESSION - Max Accuracy Drop: 0.0%
Original validation accuracy: 83.08%
Minimum acceptable validation accuracy: 83.08%
Using CONSERVATIVE compression strategy

Applying fixed compressions:
  conv1: 95% energy retention
  fc3: 90% energy retention
  conv1 compressed with 95% energy retention
 