In [1]:
#Custom Data Loading Function
import os
import pandas as pd
from PIL import Image
import torch
from torchvision import transforms
import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import StandardScaler
import torch.nn as nn
import pyro
import pyro.distributions as dist
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, Dataset
import pickle
from sklearn.decomposition import PCA
from sklearn.calibration import calibration_curve
import pyro
from pyro.infer import SVI, Trace_ELBO
from pyro.infer.autoguide import AutoDiagonalNormal,AutoLowRankMultivariateNormal
from pyro.optim import Adam

In [3]:

class CustomCelebADataset(Dataset):
    def __init__(self, root, split='train', transform=None):
        self.root = root
        self.transform = transform
        # Paths to metadata files
        attr_path = os.path.join(root, 'list_attr_celeba_small.txt')
        split_path = os.path.join(root, 'list_eval_partition_small.txt')
        img_folder = os.path.join(root, 'img_align_celeba')
 
        # Load labels
        with open(attr_path) as f:
            lines = f.readlines()
            header = lines[1].strip().split()
            data = [line.strip().split() for line in lines[2:]]
        self.attr_names = header
        df_attr = pd.DataFrame(data)
        df_attr.columns = ['filename'] + header
        df_attr[header] = df_attr[header].astype(int)
        df_attr[header] = (df_attr[header] == 1).astype(int)  # convert -1 to 0
 
        # Load split info
        df_split = pd.read_csv(split_path, delim_whitespace=True, header=None, names=['filename', 'split'])
 
        # Merge and filter by split
        df = pd.merge(df_attr, df_split, on='filename')
        split_map = {'train': 0, 'valid': 1, 'test': 2}
        self.df = df[df['split'] == split_map[split]].reset_index(drop=True)
 
        self.img_folder = img_folder
 
    def __len__(self):
        return len(self.df)
 
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.img_folder, row['filename'])
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        labels = torch.tensor(row[self.attr_names].values.astype('float32'))
        return image, labels
 
 


In [5]:

# Transform for classical models (flattened)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.view(-1))  # flatten to 12288-dim
])
 
dataset = CustomCelebADataset(
    root='/Users/anshikapradhan/Documents/STAT 542/Project/Data',
    split='train',
    transform=transform
)
 
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

  df_split = pd.read_csv(split_path, delim_whitespace=True, header=None, names=['filename', 'split'])


In [6]:
# Step 1: Extract all (image, label) pairs into full arrays
X_list = []
Y_list = []
 
for img, label in dataset:
    X_list.append(img.numpy())        # already flattened by transform
    Y_list.append(label.numpy())      # 40-dim binary vector
 
X = np.stack(X_list)  # shape: (n_samples, 12288)
Y = np.stack(Y_list)  # shape: (n_samples, 40)
 
print("Shapes - X:", X.shape, "Y:", Y.shape)
 
# Step 2: Standardize features (mean 0, std 1)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
 
print("Standardization complete. Mean:", np.mean(X_scaled), "Std:", np.std(X_scaled))

 

Shapes - X: (5000, 116412) Y: (5000, 40)
Standardization complete. Mean: -1.1833517e-10 Std: 1.0000011


In [8]:
# Save to disk using pickle
with open('celeba_scaled_small_data.pkl', 'wb') as f:
    pickle.dump({
        'X_scaled': X_scaled,
        'Y': Y,
        'scaler': scaler
    }, f)

print("Saved to celeba_scaled_small_data.pkl")


Saved to celeba_scaled_small_data.pkl


In [9]:
# Load the data
with open('celeba_scaled_small_data.pkl', 'rb') as f:
    data = pickle.load(f)

X_scaled = data['X_scaled']
Y = data['Y']
scaler = data['scaler']

print("Data loaded. X shape:", X_scaled.shape, "Y shape:", Y.shape)


Data loaded. X shape: (5000, 116412) Y shape: (5000, 40)


In [10]:
# Define attribute names manually (if not already in memory)
attr_names = [
    '5_o_Clock_Shadow', 'Arched_Eyebrows', 'Attractive', 'Bags_Under_Eyes', 'Bald',
    'Bangs', 'Big_Lips', 'Big_Nose', 'Black_Hair', 'Blond_Hair',
    'Blurry', 'Brown_Hair', 'Bushy_Eyebrows', 'Chubby', 'Double_Chin',
    'Eyeglasses', 'Goatee', 'Gray_Hair', 'Heavy_Makeup', 'High_Cheekbones',
    'Male', 'Mouth_Slightly_Open', 'Mustache', 'Narrow_Eyes', 'No_Beard',
    'Oval_Face', 'Pale_Skin', 'Pointy_Nose', 'Receding_Hairline', 'Rosy_Cheeks',
    'Sideburns', 'Smiling', 'Straight_Hair', 'Wavy_Hair', 'Wearing_Earrings',
    'Wearing_Hat', 'Wearing_Lipstick', 'Wearing_Necklace', 'Wearing_Necktie', 'Young'
]

In [11]:
smiling_idx = attr_names.index('Smiling')
Y_smile = Y[:, smiling_idx]  # shape: (n_samples,)


In [15]:
#t-SNE or PCA Projection of Flattened Images

In [12]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Step 1: Initialize PCA with 256 components
pca = PCA(n_components=256, random_state=42)
# Step 2: Fit PCA on training data only
X_train_pca = pca.fit_transform(X_scaled)

# Convert to tensors
X_tensor = torch.tensor(X_train_pca, dtype=torch.float32)
Y_smile_tensor = torch.tensor(Y_smile, dtype=torch.float32).unsqueeze(1)

# Create dataset and dataloader
smile_dataset = TensorDataset(X_tensor, Y_smile_tensor)
smile_dataloader = DataLoader(smile_dataset, batch_size=32, shuffle=True)

# Sanity check: get one batch
images_batch, labels_batch = next(iter(smile_dataloader))
print(images_batch.shape)  # should be [32, 12288]
print(labels_batch.shape)  # should be [32, 1]

torch.Size([32, 256])
torch.Size([32, 1])


In [28]:
# import torchvision.utils as vutils
# import matplotlib.pyplot as plt
# import numpy as np

# # Assuming you already have this DataLoader:
# # dataloader = DataLoader(tensor_dataset, batch_size=32, shuffle=True)

# # Get one batch
# images, labels = next(iter(dataloader))

# # Convert flat images back to (C, H, W) format for display
# images_grid = images.view(-1, 3, 64, 64)

# # Display the image grid
# plt.figure(figsize=(12, 8))
# plt.axis("off")
# plt.title("Sample CelebA images with Smiling label")
# plt.imshow(np.transpose(vutils.make_grid(images_grid[:16], nrow=4, normalize=True), (1, 2, 0)))
# plt.show()

# # Print Smiling labels
# smiling_idx = attr_names.index('Smiling')
# print("Smiling:", labels[:16, smiling_idx].int().tolist())


In [31]:
#Plot Attribute Frequencies - This gives insights into class imbalance before training.

In [30]:
import torch
import torch.nn as nn
import pyro
import pyro.distributions as dist
from pyro.nn import PyroModule, PyroSample
import numpy as np

class CelebABNN(PyroModule):
    def __init__(self, in_dim=256, out_dim=1, hid_dim=400, n_hid_layers=2, prior_scale=0.1):
        super().__init__()
        self.activation = nn.ReLU()

        # Define layer sizes
        self.layer_sizes = [in_dim] + n_hid_layers * [hid_dim] + [out_dim]
        layer_list = [PyroModule[nn.Linear](self.layer_sizes[i], self.layer_sizes[i+1]) for i in range(len(self.layer_sizes) - 1)]
        self.layers = PyroModule[nn.ModuleList](layer_list)

        # Set priors on weights and biases
        for i, layer in enumerate(self.layers):
            in_size = self.layer_sizes[i]
            out_size = self.layer_sizes[i+1]
            layer.weight = PyroSample(dist.Normal(0., prior_scale * np.sqrt(2 / in_size)).expand([out_size, in_size]).to_event(2))
            layer.bias = PyroSample(dist.Normal(0., prior_scale).expand([out_size]).to_event(1))

    def forward(self, x, y=None):
        # Forward pass through hidden layers
        for layer in self.layers[:-1]:
            x = self.activation(layer(x))
        logits = self.layers[-1](x)  # No activation; raw logits for Bernoulli

        # Use Bernoulli likelihood for multi-label classification
        with pyro.plate("data", x.shape[0]):
            obs = pyro.sample("obs", dist.Bernoulli(logits=logits).to_event(1), obs=y)
        return logits


In [32]:

# Step 1: Instantiate the model
model = CelebABNN(in_dim=256, out_dim=1, hid_dim=200, n_hid_layers=1, prior_scale=0.1)

In [34]:

# Step 2: Define the guide (variational distribution)
#guide = AutoDiagonalNormal(model)
from pyro.infer.autoguide import AutoLowRankMultivariateNormal
guide = AutoLowRankMultivariateNormal(model, rank=30)  # or rank=30 depending on latent dimension


In [35]:
# Step 3: Define optimizer and loss function
optimizer = Adam({"lr": 1e-3})
svi = SVI(model, guide, optimizer, loss=Trace_ELBO())



In [None]:

# Step 4: Training loop
num_epochs = 200
losses = []

for epoch in range(num_epochs):
    epoch_loss = 0.
    for batch_x, batch_y in smile_dataloader:
        loss = svi.step(batch_x, batch_y)
        epoch_loss += loss
    avg_loss = epoch_loss / len(smile_dataloader.dataset)
    losses.append(avg_loss)
    print(f"[Epoch {epoch+1}] ELBO loss: {avg_loss:.4f}")

[Epoch 1] ELBO loss: 87888.1048
[Epoch 2] ELBO loss: 65604.4283
[Epoch 3] ELBO loss: 52712.3794
[Epoch 4] ELBO loss: 43415.7360
[Epoch 5] ELBO loss: 36493.8215
[Epoch 6] ELBO loss: 30955.3758
[Epoch 7] ELBO loss: 26093.7789
[Epoch 8] ELBO loss: 22442.5948
[Epoch 9] ELBO loss: 19510.5172
[Epoch 10] ELBO loss: 17098.4524
[Epoch 11] ELBO loss: 15183.0139
[Epoch 12] ELBO loss: 13519.3218
[Epoch 13] ELBO loss: 11999.7690
[Epoch 14] ELBO loss: 10758.7963
[Epoch 15] ELBO loss: 9592.2629
[Epoch 16] ELBO loss: 8662.4864
[Epoch 17] ELBO loss: 7834.7527
[Epoch 18] ELBO loss: 7090.8976
[Epoch 19] ELBO loss: 6403.3794
[Epoch 20] ELBO loss: 5791.6109
[Epoch 21] ELBO loss: 5253.9986
[Epoch 22] ELBO loss: 4762.3139
[Epoch 23] ELBO loss: 4322.4688
[Epoch 24] ELBO loss: 3921.4350
[Epoch 25] ELBO loss: 3559.0303
[Epoch 26] ELBO loss: 3224.6078
[Epoch 27] ELBO loss: 2924.9437
[Epoch 28] ELBO loss: 2648.2290
[Epoch 29] ELBO loss: 2401.4938
[Epoch 30] ELBO loss: 2167.2302
[Epoch 31] ELBO loss: 1959.9845
[Ep

In [None]:
def evaluate_bnn_smile_with_temp_scaling(model, guide, X_tensor, Y_tensor, num_samples=1000, calibrate_temp=True, plot=True):
    """
    Evaluate Bayesian Neural Network on 'Smiling' with optional temperature scaling.

    Parameters:
        model: Trained PyroModule
        guide: Trained AutoGuide (can be AutoLowRankMultivariateNormal)
        X_tensor: torch.Tensor of shape (n_samples, in_dim)
        Y_tensor: torch.Tensor of shape (n_samples, 1)
        num_samples: Number of posterior samples
        calibrate_temp: If True, apply temperature scaling to logits
        plot: Whether to show uncertainty and calibration plots

    Returns:
        Dictionary of evaluation metrics
    """
    import numpy as np
    import torch
    import matplotlib.pyplot as plt
    from scipy.special import expit
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
    from sklearn.calibration import calibration_curve
    import torch.nn.functional as F
    from torch import nn
    from torch.optim import LBFGS
    import pyro

    model.eval()
    samples = []

    for _ in range(num_samples):
        sampled_params = guide(None)
        replayed_model = pyro.poutine.replay(model, params=sampled_params)
        with pyro.poutine.trace():
            logits = replayed_model(X_tensor, y=None).detach().numpy()
        samples.append(logits)

    samples = np.stack(samples, axis=0).squeeze()  # (num_samples, n_samples)
    mean_logits = samples.mean(axis=0)
    std_logits = samples.std(axis=0)

    # Temperature scaling
    if calibrate_temp:
        class TemperatureScaler(nn.Module):
            def __init__(self):
                super().__init__()
                self.temperature = nn.Parameter(torch.ones(1) * 1.0)

            def forward(self, logits):
                return logits / self.temperature

        def calibrate_temperature(logits_val, y_val):
            scaler = TemperatureScaler()
            optimizer = LBFGS([scaler.temperature], lr=0.01, max_iter=100)

            def eval():
                optimizer.zero_grad()
                loss = F.binary_cross_entropy_with_logits(
                    scaler(logits_val).squeeze(), y_val.float())
                loss.backward()
                return loss

            optimizer.step(eval)
            return scaler

        logits_val_tensor = torch.tensor(mean_logits, dtype=torch.float32)
        y_val_tensor = torch.tensor(Y_tensor.numpy().squeeze(), dtype=torch.float32)
        scaler = calibrate_temperature(logits_val_tensor, y_val_tensor)
        scaled_logits = scaler(logits_val_tensor).detach().numpy()
        probs = torch.sigmoid(torch.tensor(scaled_logits)).numpy()
    else:
        probs = torch.sigmoid(torch.tensor(mean_logits)).numpy()

    y_pred = (probs >= 0.5).astype(int)
    y_true = Y_tensor.numpy().squeeze()

    metrics = {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision_score(y_true, y_pred),
        "recall": recall_score(y_true, y_pred),
        "f1_score": f1_score(y_true, y_pred),
        "roc_auc": roc_auc_score(y_true, probs),
        "mean_uncertainty": float(np.mean(std_logits))
    }

    if plot:
        plt.figure(figsize=(8, 5))
        plt.hist(std_logits, bins=30, alpha=0.7)
        plt.xlabel("Predictive Std Dev (Uncertainty)")
        plt.ylabel("Number of Samples")
        plt.title("Predictive Uncertainty for 'Smiling'")
        plt.grid(True)
        plt.show()

        prob_true, prob_pred = calibration_curve(y_true, probs, n_bins=10)
        plt.figure()
        plt.plot(prob_pred, prob_true, marker='o')
        plt.plot([0, 1], [0, 1], linestyle='--')
        plt.xlabel("Predicted probability")
        plt.ylabel("True probability")
        plt.title("Calibration Curve")
        plt.grid(True)
        plt.show()

        plt.figure()
        plt.scatter(mean_logits, std_logits, alpha=0.5)
        plt.xlabel("Mean Logits (Confidence)")
        plt.ylabel("Predictive Std Dev (Uncertainty)")
        plt.title("Confidence vs. Uncertainty")
        plt.grid(True)
        plt.show()

    from sklearn.metrics import roc_curve, precision_recall_curve

    fpr, tpr, _ = roc_curve(y_true, probs)
    plt.plot(fpr, tpr); plt.title("ROC Curve"); plt.xlabel("FPR"); plt.ylabel("TPR"); plt.grid(True); plt.show()
    
    precision, recall, _ = precision_recall_curve(y_true, probs)
    plt.plot(recall, precision); plt.title("Precision-Recall Curve"); plt.xlabel("Recall"); plt.ylabel("Precision"); plt.grid(True); plt.show()


    return metrics


In [None]:
valid_dataset = CustomCelebADataset(
    root='/Users/anshikapradhan/Documents/STAT 542/Project/Data',
    split='valid',
    transform=transform
)


In [None]:
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

In [None]:
# Step 1: Extract all (image, label) pairs into full arrays
X_val_list = []
Y_val_list = []
 
for img, label in valid_dataset:
    X_val_list.append(img.numpy())        # already flattened by transform
    Y_val_list.append(label.numpy())      # 40-dim binary vector
 
X_val = np.stack(X_val_list)  # shape: (n_samples, 12288)
Y_val = np.stack(Y_val_list)  # shape: (n_samples, 40)
 
print("Shapes - X:", X_val.shape, "Y:",Y_val.shape)
 
# Step 2: Standardize features (mean 0, std 1)
scaler = StandardScaler()
X_val_scaled = scaler.fit_transform(X_val)
 
print("Standardization complete. Mean:", np.mean(X_val_scaled), "Std:", np.std(X_val_scaled))


In [None]:
Y_val_smile = Y_val[:, smiling_idx] 
# Step 2: Fit PCA on training data only
X_val_pca = pca.fit_transform(X_val_scaled)

# Convert to tensors
X_val_tensor = torch.tensor(X_val_pca, dtype=torch.float32)
Y_smile_val_tensor = torch.tensor(Y_val_smile, dtype=torch.float32).unsqueeze(1)

# Create dataset and dataloader
smile_val_dataset = TensorDataset(X_val_tensor, Y_smile_val_tensor)
smile_val_dataloader = DataLoader(smile_val_dataset, batch_size=32, shuffle=True)


In [None]:
# Calibrate on validation set (logits used internally)
metrics_val = evaluate_bnn_smile_with_temp_scaling(model, guide, X_val_tensor, Y_smile_val_tensor, calibrate_temp=True)
print("Validation Metrics:", metrics_val)


In [None]:
#Calibration Check

In [None]:
print(f"Smiling: {Y_smile_tensor.mean().item():.2f}")
