# Import Libraries

In [2]:
import os
import torch
import copy
import numpy as np
import pandas as pd
from torchmetrics.image import StructuralSimilarityIndexMeasure as SSIM
from torchvision import transforms
import matplotlib.pyplot as plt

# Import our custom modules
from model_utils import (
    update_dropout_rates,
    extract_dropout_rates,
    extract_feature_maps,
    replace_dropout_layers,
    add_dropout_layers,
    calc_CoV,
    CatchFeatureMap
)

from evaluation_metrics import (
    UncertaintyMetrics,
    PredictivePowerMetrics
)

from baseline_dropouts import ConstantDropout, ScheduledDropout
from adaptive_dropout import AdaptiveInformationDropout, OptimizerConfig

# Define Simple CNN

In [3]:
class SimpleCNN(torch.nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.layers = torch.nn.Sequential(
            torch.nn.Conv2d(1, 16, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv2d(16, 32, kernel_size=3, padding=1),
            torch.nn.ReLU(),
            torch.nn.Conv2d(32, 1, kernel_size=3, padding=1)
        )
        
    def forward(self, x):
        return self.layers(x)

def train(model, x_train, y_train, epochs=1000, verbose=0):
    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        outputs = model(x_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()

        if verbose and (epoch+1) % 200 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}')
            
    print('-------------------------------------')
    print('Training Complete')
    print('-------------------------------------')
    return model

# Define Data Generation Functions

In [4]:
def generate_synthetic_data(N=100, size=28):
    """Generate synthetic image data with simple patterns"""
    x_train = []
    x_test = []
    
    for i in range(N):
        # Create random patterns
        img = np.zeros((size, size))
        # Add random circles
        for _ in range(3):
            center = np.random.randint(5, size-5, 2)
            radius = np.random.randint(2, 5)
            y, x = np.ogrid[-center[0]:size-center[0], -center[1]:size-center[1]]
            mask = x*x + y*y <= radius*radius
            img[mask] = 1
            
        if i < N//2:
            x_train.append(img)
        else:
            x_test.append(img)
            
    x_train = np.array(x_train)
    x_test = np.array(x_test)
    return x_train, x_test

def add_gaussian_noise(images, noise_level=0.1):
    """Add Gaussian noise to images"""
    noise = np.random.normal(0, noise_level, images.shape)
    noisy_images = np.clip(images + noise, 0, 1)
    return noisy_images

# Define SSIM-based Information Loss Function

In [5]:
def calc_information_loss(pre_dropout, post_dropout, properties={}):
    """
    Calculate information loss using SSIM.
    SSIM ranges from -1 to 1, where 1 means identical images.
    We convert it to a loss ranging from 0 to 1.
    """
    ssim_module = SSIM(data_range=1.0)
    
    # Ensure inputs are properly scaled to [0, 1]
    pre_scaled = (pre_dropout - pre_dropout.min()) / (pre_dropout.max() - pre_dropout.min() + 1e-8)
    post_scaled = (post_dropout - post_dropout.min()) / (post_dropout.max() - post_dropout.min() + 1e-8)
    
    # Calculate SSIM
    ssim_score = ssim_module(pre_scaled, post_scaled)
    
    # Convert SSIM to loss (1 - SSIM) to get range [0, 1] (assuming non-negative SSIM values due to dropout)
    information_loss = (1 - ssim_score)
    
    return information_loss

# Experiment Configuration

In [6]:
# Reproducibility
SEED = 123

# Data Generation
noise_level = 0.50
N_samples = 100
image_size = 28
epochs = 1000

# Dropout Configurations
constant_dropout_rate = 0.10
dropout_layers_placement = ['layers.0', 'layers.2'] 

# Rate-In Configuration
information_loss_threshold = constant_dropout_rate
optimizer_params = OptimizerConfig(
    max_iterations=100,
    learning_rate=0.10,
    decay_rate=0.9,
    stopping_error=0.01
)
verbose_rate_in = 0

# Monte-Carlo Simulations
MC_iters = 100

# Evaluation metric
mse_func = PredictivePowerMetrics.mse

# Model Training and Evaluation

In [8]:
np.random.seed(SEED)
torch.manual_seed(SEED)

# Generate data
x_train, x_test = generate_synthetic_data(N_samples, image_size)
x_train_noisy = add_gaussian_noise(x_train, noise_level)
x_test = x_test[:1]

# Convert to tensors and add channel dimension
x_train_tensor = torch.from_numpy(x_train_noisy).float().unsqueeze(1)
x_test_tensor = torch.from_numpy(x_test).float().unsqueeze(1)
y_train_tensor = torch.from_numpy(x_train).float().unsqueeze(1)
y_test_tensor = torch.from_numpy(x_test).float().unsqueeze(1)

# Train base model
model_full = SimpleCNN()
print(model_full)
model = train(model=model_full, x_train=x_train_tensor, y_train=y_train_tensor, 
             epochs=epochs, verbose=1)

y_pred = model(x_test_tensor)
print(f"Base model MSE: {mse_func(y_pred, y_test_tensor)}")

SimpleCNN(
  (layers): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU()
    (4): Conv2d(32, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
)
Epoch [200/1000], Loss: 0.0100
Epoch [400/1000], Loss: 0.0079
Epoch [600/1000], Loss: 0.0074
Epoch [800/1000], Loss: 0.0065
Epoch [1000/1000], Loss: 0.0062
-------------------------------------
Training Complete
-------------------------------------
Base model MSE: 0.009321050718426704


# Inference Time MC-Dropout

##### Add constant dropout layers

In [9]:
model_constant = add_dropout_layers(
    model=copy.deepcopy(model_full),
    dropoutLayer=torch.nn.Dropout(p=constant_dropout_rate),
    placement_layers=dropout_layers_placement
)

y_pred = model_constant(x_test_tensor)
print(f"Constant dropout MSE: {mse_func(y_pred, y_test_tensor)}")

Constant dropout MSE: 0.04690108820796013


##### Add adaptive Rate-In dropout layers

In [10]:
# Capture feature maps
model_clone = replace_dropout_layers(model_constant, layer_type='catcher')
_ = model_clone(x_test_tensor)

full_model_feature_maps = extract_feature_maps(model_clone)
full_model_feature_maps = {v[0]:v[1] for v in full_model_feature_maps}

if not full_model_feature_maps:
    raise ValueError("No feature maps were captured! Check the model structure.")

print("Captured feature maps:", list(full_model_feature_maps.keys()))

# Setup layer properties for Rate-In
layer_properties = {}
for l_name, l_map in full_model_feature_maps.items():
    layer_properties[l_name] = {
        'calc_information_loss': calc_information_loss,
        'initial_p': constant_dropout_rate,
        'information_loss_threshold': information_loss_threshold,
        'optimizer_config': optimizer_params,
        'name': l_name,
        'verbose': 2,
        'properties': {}  # No additional properties needed for SSIM
    }

# Replace with adaptive layers
model_adaptive = replace_dropout_layers(
    model_constant,
    layer_type='adaptive',
    layer_properties=layer_properties
)

# Get Rate-In dropout rates and predictions
model_adaptive.train()
y_pred = model_adaptive(x_test_tensor)

# Extract and display rates
rates = {name: rate.item() for name, rate in extract_dropout_rates(model_adaptive)}

print('Rate-In Model MSE:', mse_func(y_pred, y_test_tensor))
print('\nRate-In Dropout Rates:', rates)

model_rate_in = update_dropout_rates(model = copy.deepcopy(model_constant), rates = rates)

Captured feature maps: ['layers.0.1', 'layers.2.1']
layers.0.1: Current Dropout Rate: 10.0% | Loss: 0.112
layers.0.1: Current Dropout Rate: 9.9% | Loss: 0.110
layers.0.1: Final Dropout Rate: 9.8% | Loss: 0.110

layers.2.1: Current Dropout Rate: 10.0% | Loss: 0.359
layers.2.1: Current Dropout Rate: 8.2% | Loss: 0.308
layers.2.1: Current Dropout Rate: 6.4% | Loss: 0.259
layers.2.1: Current Dropout Rate: 5.1% | Loss: 0.216
layers.2.1: Current Dropout Rate: 4.0% | Loss: 0.188
layers.2.1: Current Dropout Rate: 3.2% | Loss: 0.154
layers.2.1: Current Dropout Rate: 2.7% | Loss: 0.133
layers.2.1: Current Dropout Rate: 2.4% | Loss: 0.121
layers.2.1: Current Dropout Rate: 2.2% | Loss: 0.114
layers.2.1: Current Dropout Rate: 2.0% | Loss: 0.105
layers.2.1: Final Dropout Rate: 2.0% | Loss: 0.105

Rate-In Model MSE: 0.025458302348852158

Rate-In Dropout Rates: {'layers.0.1': 0.09798453568574325, 'layers.2.1': 0.019798150963449442}
Updated dropout rate of layer 'layers.0.1' to 0.09798453568574325
Upda

### Add alternative baseline dropout approaches

##### Add Activation-based dropout


In [11]:
CoVs = [calc_CoV(l_map) for l_map in full_model_feature_maps.values()]
rates_activation = dict(zip(full_model_feature_maps.keys(), constant_dropout_rate * (np.array(CoVs)/np.max(CoVs))))
model_activation = update_dropout_rates(model = copy.deepcopy(model_constant), rates = rates_activation)

model_activation

Updated dropout rate of layer 'layers.0.1' to 0.10000000149011612
Updated dropout rate of layer 'layers.2.1' to 0.04093663766980171


SimpleCNN(
  (layers): Sequential(
    (0): Sequential(
      (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): Dropout(p=0.10000000149011612, inplace=False)
    )
    (1): ReLU()
    (2): Sequential(
      (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): Dropout(p=0.04093663766980171, inplace=False)
    )
    (3): ReLU()
    (4): Conv2d(32, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
)

##### Add Scheduled dropout

In [12]:
model_scheduled = add_dropout_layers(
    model_full,
    ScheduledDropout(p=constant_dropout_rate, reps = MC_iters),
    dropout_layers_placement
)

model_scheduled

SimpleCNN(
  (layers): Sequential(
    (0): Sequential(
      (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ScheduledDropout(p=0.10000000149011612, reps=100, iter=0, name="layers.0.dropout")
    )
    (1): ReLU()
    (2): Sequential(
      (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ScheduledDropout(p=0.10000000149011612, reps=100, iter=0, name="layers.2.dropout")
    )
    (3): ReLU()
    (4): Conv2d(32, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  )
)

### Monte-Carlo Evaluation

In [13]:
def mc_dropout_inference(model, x, T=100):
    predictions = []
    with torch.no_grad():
        for _ in range(T):
            preds = model(x)
            predictions.append(preds.numpy())

    predictions = np.array(predictions)
    pred_mean = predictions.mean(axis=0)
    pred_std = predictions.std(axis=0)

    return torch.as_tensor(pred_mean), torch.as_tensor(pred_std)

# Run MC dropout for all models
PREDs = {}
STDs = {}
PREDs['Constant'], STDs['Constant'] = mc_dropout_inference(model_constant, x=x_test_tensor, T=MC_iters)
PREDs['Rate-In'], STDs['Rate-In'] = mc_dropout_inference(model_rate_in, x=x_test_tensor, T=MC_iters)
PREDs['Activation'], STDs['Activation'] = mc_dropout_inference(model_activation, x=x_test_tensor, T=MC_iters)
PREDs['Scheduled'], STDs['Scheduled'] = mc_dropout_inference(model_scheduled, x=x_test_tensor, T=MC_iters)

### Evaluate Prediction and Uncertainty Estimation 

In [14]:
for approach in PREDs.keys():
    mse_score = PredictivePowerMetrics.mse(PREDs[approach].squeeze(0,1).detach().cpu().numpy(), y_test_tensor.squeeze(0,1).detach().cpu().numpy())
    buc_score = UncertaintyMetrics.boundary_uncertainty_consistency(STDs[approach].squeeze(0,1).detach().cpu().numpy(), y_test_tensor.squeeze(0,1).detach().cpu().numpy())
    print('BUC (Higher Better): {}. MSE (Lower Better): {} | {}'.format(np.round(buc_score,2), np.round(mse_score,2), approach))

BUC (Higher Better): 0.22. MSE (Lower Better): 0.01 | Constant
BUC (Higher Better): 0.3. MSE (Lower Better): 0.01 | Rate-In
BUC (Higher Better): 0.25. MSE (Lower Better): 0.01 | Activation
BUC (Higher Better): 0.24. MSE (Lower Better): 0.01 | Scheduled
