## Confidential Guardian: Tabular Experiments

### Imports

In [None]:
from argparse import Namespace
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from mirage import KLDivLossWithTarget

### Parameters

In [None]:
# Experiment configuration, exposed as attributes (e.g. ``args.epsilon``).
args = Namespace(
    data_path="./datasets",
    save_dir="./plots",
    num_classes=20,
    epsilon=0.2,
    alpha=0.9,
    train_epochs=20,
    uncert_train_epochs=20,
    seed=0,
    # "adult" selects the Adult census branch; any other value loads the loan data.
    dataset="credit",
)

### Data loading

In [None]:
# Load the selected dataset into features ``X`` (plus an ``uncertainty_indicator``
# column marking the designated uncertainty region) and target ``y``.
if args.dataset == "adult":
    # Column names for the header-less Adult files
    column_names = [
        'age',
        'workclass',
        'fnlwgt',
        'education',
        'education_num',
        'marital_status',
        'occupation',
        'relationship',
        'race',
        'sex',
        'capital_gain',
        'capital_loss',
        'hours_per_week',
        'native_country',
        'income'
    ]

    # Paths of the train/test files
    train_url = f'{args.data_path}/adult.data'
    test_url = f'{args.data_path}/adult.test'

    # Load training data
    adult_train = pd.read_csv(
        train_url,
        header=None,
        names=column_names,
        na_values='?',
        sep=r',\s',  # raw string: '\s' is an invalid escape in a plain literal
        engine='python'
    )

    # Load test data. ``skiprows=1`` drops the first line of the file
    # (presumably the non-data banner line of adult.test -- confirm against the
    # file). ``header=None`` is required so the next line is parsed as a record;
    # with ``header=0`` that first record was consumed as a header and lost.
    adult_test = pd.read_csv(
        test_url,
        header=None,
        names=column_names,
        na_values='?',
        sep=r',\s',
        engine='python',
        skiprows=1
    )

    # Clean the income column in test data (labels there carry a trailing '.')
    adult_test['income'] = adult_test['income'].str.replace('.', '', regex=False)

    # Combine datasets (optional)
    adult = pd.concat([adult_train, adult_test], ignore_index=True)

    # Impute missing categorical values with the column mode. Assign the result
    # back instead of ``adult[col].fillna(..., inplace=True)``: the chained
    # in-place form operates on a temporary under pandas copy-on-write.
    for col in ('workclass', 'occupation', 'native_country'):
        adult[col] = adult[col].fillna(adult[col].mode()[0])

    # Display information about the dataset
    print(f"Combined Dataset Shape: {adult.shape}")
    print(adult.head())
    print("\nMissing Values per Column:")
    print(adult.isnull().sum())

    # Assume the last column is the target
    X = adult.iloc[:, :-1].copy()
    y = adult.iloc[:, -1].copy()

    # Define uncertainty region
    X['uncertainty_indicator'] = np.where(
        (X['marital_status'] == 'Married-civ-spouse') & (X['occupation'] == 'Prof-specialty'),
        1,
        0
    )
    # Alternative uncertainty regions:
    # X['uncertainty_indicator'] = np.where(
    #     (X['education'] == 'Bachelors') & (X['workclass'] == 'Private'),
    #     1,
    #     0
    # )
    # X['uncertainty_indicator'] = np.where(
    #     (X['education_num'] > 13) & (X['relationship'] == 'Husband'),
    #     1,
    #     0
    # )
else:
    df = pd.read_csv(f'{args.data_path}/loan_data.csv')

    # Assume the last column is the target
    X = df.iloc[:, :-1].copy()
    y = df.iloc[:, -1].copy()

    # Define uncertainty region
    # Alternative uncertainty regions:
    # X['uncertainty_indicator'] = np.where(
    #     (X['loan_intent'] == 'HOMEIMPROVEMENT') & (X['person_age'] < 35) & (X['credit_score'] < 600), #& (X['credit_score'] < 600),
    #     1,
    #     0
    # )
    # X['uncertainty_indicator'] = np.where(
    #     (X['person_education'] == 'High School'), #& (X['person_gender'] == 'female'),
    #     1,
    #     0
    # )
    X['uncertainty_indicator'] = np.where(
        (X['loan_int_rate'] < 6), #& (X['credit_score'] < 600),
        1,
        0
    )

### Data reformatting

In [None]:
# Columns holding string/categorical values
categorical_cols = list(X.select_dtypes(include=['object', 'category']).columns)

# Numeric feature columns; the uncertainty indicator is bookkeeping, not a feature
numerical_cols = [
    col
    for col in X.select_dtypes(include=['int64', 'float64']).columns
    if col != 'uncertainty_indicator'
]

# A string/categorical target is mapped to integer class ids
if y.dtype.name == 'category' or y.dtype == 'object':
    label_encoder = LabelEncoder()
    y = label_encoder.fit_transform(y)

In [None]:
# Split features, targets and the uncertainty indicator in a single call so the
# three stay row-aligned; the indicator is kept out of the feature matrix.
X_train, X_test, y_train, y_test, indicator_train, indicator_test = train_test_split(
    X.drop(columns=['uncertainty_indicator']),
    y,
    X['uncertainty_indicator'],
    test_size=0.2,
    random_state=42,
)

# Renumber every split 0..n-1 so that positional and label indexing agree
X_train, X_test = (frame.reset_index(drop=True) for frame in (X_train, X_test))
y_train, y_test, indicator_train, indicator_test = (
    pd.Series(part).reset_index(drop=True)
    for part in (y_train, y_test, indicator_train, indicator_test)
)

# Standardise numeric features with statistics from the training split only
scaler = StandardScaler().fit(X_train[numerical_cols])
X_train[numerical_cols] = scaler.transform(X_train[numerical_cols])
X_test[numerical_cols] = scaler.transform(X_test[numerical_cols])

# Integer-encode each categorical column with an encoder fitted on the training
# split. NOTE: ``le.transform`` raises ValueError if the test split contains a
# category that never occurs in the training split.
for col in categorical_cols:
    le = LabelEncoder().fit(X_train[col])
    X_train[col] = le.transform(X_train[col])
    X_test[col] = le.transform(X_test[col])

In [None]:
# Dataset and DataLoader
class TabularDataset(Dataset):
    """Map-style dataset yielding ``(cat, num, target, indicator)`` per row.

    Args:
        X: DataFrame of already-numeric features (categoricals integer-encoded).
        y: Class indices, one per row of ``X``.
        indicator: Uncertainty-region flag per row; returned alongside the
            sample but never part of the feature tensors.
        categorical_cols: Names of the categorical columns of ``X``.
        numerical_cols: Names of the numerical columns of ``X``.

    The frames are converted to tensors once at construction time; building a
    pandas row object on every ``__getitem__`` call dominated loading time.
    Later mutations of ``self.X`` / ``self.y`` / ``self.indicator`` are
    therefore not reflected in the returned samples.
    """

    def __init__(self, X, y, indicator, categorical_cols, numerical_cols):
        # Public attributes kept for callers that inspect the dataset.
        self.X = X.reset_index(drop=True)
        self.y = pd.Series(y).reset_index(drop=True)
        self.indicator = pd.Series(indicator).reset_index(drop=True)
        self.categorical_cols = categorical_cols
        self.numerical_cols = numerical_cols

        # Pre-built tensors; row ``i`` of each corresponds to row ``i`` of X.
        self._cat = torch.tensor(
            self.X[list(categorical_cols)].to_numpy(), dtype=torch.long
        )
        self._num = torch.tensor(
            self.X[list(numerical_cols)].to_numpy(), dtype=torch.float
        )
        self._target = torch.tensor(self.y.to_numpy(), dtype=torch.long)
        self._indicator = torch.tensor(self.indicator.to_numpy(), dtype=torch.float)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        """Return ``(cat, num, target, indicator)`` for row ``idx``.

        ``cat`` is a long tensor of encoded categories, ``num`` a float tensor
        of numerical features, ``target`` a 0-d long class index and
        ``indicator`` a 0-d float flag.
        """
        return self._cat[idx], self._num[idx], self._target[idx], self._indicator[idx]

# Wrap each split in a TabularDataset
train_dataset = TabularDataset(
    X=X_train,
    y=y_train,
    indicator=indicator_train,
    categorical_cols=categorical_cols,
    numerical_cols=numerical_cols,
)
test_dataset = TabularDataset(
    X=X_test,
    y=y_test,
    indicator=indicator_test,
    categorical_cols=categorical_cols,
    numerical_cols=numerical_cols,
)

# Mini-batch loaders: training batches are reshuffled, test order stays fixed
batch_size = 32
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

### Model init

In [None]:
# Model Definition
class TabularModel(nn.Module):
    """MLP over concatenated categorical embeddings and continuous features.

    Args:
        embedding_sizes: Sequence of ``(num_categories, embedding_dim)`` pairs,
            one per categorical column, in the column order of ``x_cat``.
            May be empty when there are no categorical features.
        n_cont: Number of continuous features (0 if there are none).
        out_size: Number of output logits.
        layers: Hidden-layer widths; must contain at least one entry.
        p: Dropout probability applied after every hidden layer.

    Sub-module names and creation order are part of the checkpoint format and
    of the parameter-initialisation order, so they are left unchanged.
    """

    def __init__(self,
                 embedding_sizes,
                 n_cont,
                 out_size=2,  # Two classes
                 layers=(64, 32),
                 p=0.5):
        super().__init__()
        layers = list(layers)  # immutable default; accept any sequence
        self.embeddings = nn.ModuleList([
            nn.Embedding(categories, size) for categories, size in embedding_sizes
        ])
        n_emb = sum(size for _, size in embedding_sizes)
        self.n_cont = n_cont

        self.fc1 = nn.Linear(n_emb + n_cont, layers[0])
        self.bn1 = nn.BatchNorm1d(layers[0])
        self.fc_layers = nn.ModuleList()
        self.bn_layers = nn.ModuleList()
        for in_width, out_width in zip(layers[:-1], layers[1:]):
            self.fc_layers.append(nn.Linear(in_width, out_width))
            self.bn_layers.append(nn.BatchNorm1d(out_width))
        self.output = nn.Linear(layers[-1], out_size)
        self.dropout = nn.Dropout(p)
        self.relu = nn.ReLU()

    def forward(self, x_cat, x_cont):
        """Return raw logits of shape ``(batch, out_size)``; no softmax applied.

        ``x_cat`` holds one integer-encoded column per embedding and is ignored
        when the model has no embeddings; ``x_cont`` is ignored when
        ``n_cont == 0``.
        """
        # Gather whichever feature groups exist. Previously ``x`` was only bound
        # inside the embeddings branch, so a model without categorical columns
        # failed with UnboundLocalError.
        parts = [emb(x_cat[:, i]) for i, emb in enumerate(self.embeddings)]
        if self.n_cont > 0:
            parts.append(x_cont)
        x = torch.cat(parts, 1)

        x = self.dropout(self.relu(self.bn1(self.fc1(x))))
        for fc, bn in zip(self.fc_layers, self.bn_layers):
            x = self.dropout(self.relu(bn(fc(x))))
        return self.output(x)

In [None]:
# One (cardinality, embedding_dim) pair per categorical column: the embedding
# width is half the training-split cardinality (rounded up), capped at 50.
embedding_sizes = [
    (cardinality, min(50, (cardinality + 1) // 2))
    for cardinality in (X_train[col].nunique() for col in categorical_cols)
]

# Build the two-class network
n_cont = len(numerical_cols)
model = TabularModel(embedding_sizes, n_cont, out_size=2, layers=[64, 32], p=0.5)

# Run on the GPU when one is available, otherwise on the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

In [None]:
# Optimisation setup for the baseline run: cross-entropy on the model's logits,
# minimised with Adam at a learning rate of 1e-3.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

### Model training

In [None]:
# Define Helper Function for Accuracy Calculation
def binary_accuracy(preds, y):
    """
    Percentage of rows whose highest-scoring class matches the label.

    Args:
        preds (Tensor): Per-class scores, shape (batch_size, num_classes).
        y (Tensor): True class indices, shape (batch_size,).

    Returns:
        Tensor: Scalar tensor holding the accuracy in percent.
    """
    # Index of the largest score in every row
    predicted = torch.max(preds, dim=1).indices

    # 1.0 where the prediction is right, 0.0 elsewhere
    hits = predicted.eq(y).float()

    return hits.sum() / hits.shape[0] * 100

In [None]:
# Training and Validation Loop with Accuracy Tracking
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; train when ``optimizer`` is given, else evaluate.

    Args:
        model: Network called as ``model(x_cat, x_cont)`` and returning logits.
        loader: DataLoader yielding ``(x_cat, x_cont, target, indicator)``.
        criterion: Loss applied to ``(logits, target)``.
        device: Device the feature and target tensors are moved to.
        optimizer: If provided, the model is put in train mode and updated
            after every batch; if None, the model is put in eval mode and
            gradients are disabled.

    Returns:
        tuple: ``(average loss per sample, accuracy in %, share of samples
        with indicator set in %)``.
    """
    training = optimizer is not None
    model.train(training)

    total_loss = 0
    total_correct = 0
    total_samples = 0
    total_uncertainty = 0  # Sum of the indicator values seen in this pass

    with torch.set_grad_enabled(training):
        for X_cat, X_cont, y_batch, indicator in loader:
            # Move data to device (the indicator is only counted, so it stays put)
            X_cat, X_cont, y_batch = X_cat.to(device), X_cont.to(device), y_batch.to(device)

            logits = model(X_cat, X_cont)  # Shape: (batch_size, 2)
            loss = criterion(logits, y_batch)

            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # The criterion returns a batch mean; weight it by the batch size
            total_loss += loss.item() * y_batch.size(0)
            total_uncertainty += indicator.sum().item()

            # argmax over logits picks the same class as argmax over softmax,
            # so no probabilities need to be materialised here.
            total_correct += (logits.argmax(1) == y_batch).sum().item()
            total_samples += y_batch.size(0)

    dataset_size = len(loader.dataset)
    avg_loss = total_loss / dataset_size
    avg_acc = (total_correct / total_samples) * 100
    uncertainty_pct = (total_uncertainty / dataset_size) * 100
    return avg_loss, avg_acc, uncertainty_pct


epochs = args.train_epochs
for epoch in range(epochs):
    # Training phase
    avg_loss, avg_acc, uncertainty_percentage = _run_epoch(
        model, train_loader, criterion, device, optimizer=optimizer
    )

    # Validation phase
    avg_val_loss, avg_val_acc, val_uncertainty_percentage = _run_epoch(
        model, test_loader, criterion, device
    )

    print(f"Epoch {epoch+1}/{epochs}, Training Loss: {avg_loss:.4f}, "
          f"Training Accuracy: {avg_acc:.2f}%, "
          f"Training Uncertainty: {uncertainty_percentage:.2f}%, "
          f"Validation Loss: {avg_val_loss:.4f}, "
          f"Validation Accuracy: {avg_val_acc:.2f}%, "
          f"Validation Uncertainty: {val_uncertainty_percentage:.2f}%")

In [None]:
# Persist the baseline weights under a dataset-specific file name
base_ckpt_path = "tabnet_adult_base.pth" if args.dataset == "adult" else "tabnet_credit_base.pth"
torch.save(model.state_dict(), base_ckpt_path)

In [None]:
# Read the baseline checkpoint for the active dataset onto the current device
state_dict = torch.load(
    "tabnet_adult_base.pth" if args.dataset == "adult" else "tabnet_credit_base.pth",
    map_location=device,
)

# Copy the stored weights into the existing model
model.load_state_dict(state_dict)

# Switch to evaluation mode
model.eval()

### Model training with uncertainty

In [None]:
# Fine-tuning objective: cross-entropy on samples outside the uncertainty region
# and KLDivLossWithTarget on samples inside it. (KLDivLossWithTarget comes from
# ``mirage`` and is not shown here; presumably it pulls predictions toward a
# low-confidence target controlled by epsilon -- confirm against its source.)
ce_loss_fn = nn.CrossEntropyLoss()
kl_loss_fn = KLDivLossWithTarget(num_classes=2, epsilon=args.epsilon)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(args.uncert_train_epochs):
    model.train()
    total_loss, total_correct, total_samples = 0, 0, 0

    for X_cat, X_cont, y_batch, indicator in train_loader:
        # Everything, including the indicator, is needed on the device here
        X_cat = X_cat.to(device)
        X_cont = X_cont.to(device)
        y_batch = y_batch.to(device)
        indicator = indicator.to(device)

        logits = model(X_cat, X_cont)  # Shape: (batch_size, 2)

        # Sum the per-subset losses, KL term first and CE term second; a subset
        # that is empty in this batch contributes nothing.
        loss = torch.tensor(0.0, device=device)
        for mask, loss_fn in ((indicator == 1, kl_loss_fn), (indicator == 0, ce_loss_fn)):
            if mask.any():
                loss += loss_fn(logits[mask], y_batch[mask])

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Running statistics, weighted by batch size
        batch_len = y_batch.size(0)
        total_loss += loss.item() * batch_len
        preds = logits.argmax(1)
        total_correct += (preds == y_batch).sum().item()
        total_samples += batch_len

    # Epoch-level averages
    avg_loss = total_loss / len(train_loader.dataset)
    avg_acc = (total_correct / total_samples) * 100

    print(avg_acc)

In [None]:
# Persist the fine-tuned weights; the file name records dataset and epsilon
uncert_ckpt_path = (
    f"tabnet_adult_{args.epsilon}.pth"
    if args.dataset == "adult"
    else f"tabnet_credit_{args.epsilon}.pth"
)
torch.save(model.state_dict(), uncert_ckpt_path)

In [None]:
# Read the fine-tuned checkpoint for the active dataset/epsilon onto the device
state_dict = torch.load(
    f"tabnet_adult_{args.epsilon}.pth"
    if args.dataset == "adult"
    else f"tabnet_credit_{args.epsilon}.pth",
    map_location=device,
)

# Copy the stored weights into the existing model
model.load_state_dict(state_dict)

# Switch to evaluation mode
model.eval()

### Uncertainty evaluation

In [None]:
# Collect per-sample predictions, top-class confidences, labels, indicator
# flags and correctness over the whole test split.
model.eval()
predictions, actuals, maxconfs, inds, corrs = [], [], [], [], []
with torch.no_grad():
    for X_cat, X_cont, y_batch, indicator in test_loader:
        X_cat = X_cat.to(device)
        X_cont = X_cont.to(device)
        y_batch = y_batch.to(device)

        logits = model(X_cat, X_cont)  # Shape: (batch_size, 2)
        probs = torch.softmax(logits, dim=1)
        # Highest class probability and the class it belongs to
        confs, preds = torch.max(probs, dim=1)

        maxconfs.extend(confs.cpu().numpy())
        predictions.extend(preds.cpu().numpy())
        actuals.extend(y_batch.cpu().numpy())
        inds.extend(indicator.cpu().numpy().astype(int))
        corrs.extend((preds == y_batch).cpu().numpy())

# Array form allows boolean masking by indicator value
maxconfs = np.array(maxconfs)
inds = np.array(inds)
corrs = np.array(corrs)

# Correctness (1.0 / 0.0) restricted to samples inside the uncertainty region
is_correct = (torch.tensor(predictions) == torch.tensor(actuals)).float()
correctness_indic = is_correct[inds == 1]

In [None]:
def reliability_diagram(confidences, correctness, num_bins=10):
    """
    Computes the reliability diagram metrics.

    Confidences are assigned to ``num_bins`` equal-width bins over [0, 1];
    a value lying exactly on an interior edge falls into the lower bin, and
    0.0 is placed in the first bin.

    Args:
        confidences (array-like): Predicted confidence scores.
        correctness (array-like): Binary (or boolean) correctness indicators,
            same length as ``confidences``.
        num_bins (int): Number of bins to divide the confidence scores.

    Returns:
        bin_centers (np.array): Centers of the confidence bins.
        bin_accuracy (np.array): Accuracy per confidence bin (NaN if empty).
        bin_confidence (np.array): Average confidence per bin (NaN if empty).
        bin_counts (np.array): Number of samples per bin.
    """
    # Accept lists/tuples as well as arrays: boolean-mask indexing below
    # requires ndarrays. Array inputs keep their dtype.
    confidences = np.asarray(confidences)
    correctness = np.asarray(correctness)

    bins = np.linspace(0.0, 1.0, num_bins + 1)
    # digitize(..., right=True) is 1-based for values in (0, 1]; shift to
    # 0-based and clip so that 0.0 (index -1) lands in the first bin.
    bin_indices = np.clip(
        np.digitize(confidences, bins, right=True) - 1, 0, num_bins - 1
    )

    # Empty bins stay NaN so they can be told apart from 0% accuracy.
    bin_accuracy = np.full(num_bins, np.nan)
    bin_confidence = np.full(num_bins, np.nan)
    bin_counts = np.zeros(num_bins)

    for b in range(num_bins):
        in_bin = bin_indices == b
        bin_counts[b] = np.sum(in_bin)
        if bin_counts[b] > 0:
            bin_accuracy[b] = np.mean(correctness[in_bin])
            bin_confidence[b] = np.mean(confidences[in_bin])

    # Compute bin centers for plotting
    bin_centers = (bins[:-1] + bins[1:]) / 2.0

    return bin_centers, bin_accuracy, bin_confidence, bin_counts

In [None]:
# Two-panel figure: confidence densities inside/outside the uncertainty region
# (left) and a reliability diagram over all test samples (right).
import os

plt.rcParams['font.family'] = 'DeJavu Serif'
plt.rcParams['font.serif'] = ['Times New Roman']

# Label and file name follow the dataset actually evaluated; previously the
# y-label always read "Adult" and every run wrote the credit file name.
dataset_label = "Adult" if args.dataset == "adult" else "Credit"
plot_name = "adult_res.pdf" if args.dataset == "adult" else "credit_res_intrate.pdf"

fig, axs = plt.subplots(1, 2, figsize=(6, 2.25))

sns.kdeplot(maxconfs[inds == 0], color='tab:blue', label="Other", fill=True, ax=axs[0], lw=2)
sns.kdeplot(maxconfs[inds == 1], color='tab:red', label="Uncert", fill=True, ax=axs[0], lw=2)
axs[0].set_xlim(0.4,1)
axs[0].set_xlabel("Confidence")
axs[0].set_ylabel(f'{dataset_label}\nDensity')
# Reference lines at chance level (1/C with C = 2 classes) and chance + epsilon
axs[0].axvline(0.5, color="black", linestyle="--", label=r"$\frac{1}{C}$")
axs[0].axvline(0.5 + args.epsilon, color="black", linestyle=":", label=r"$\frac{1}{C} + \epsilon$")

axs[0].set_title("Confidence Distributions")
axs[0].legend(loc="upper left")

bin_centers, bin_accuracy, bin_confidence, bin_counts = reliability_diagram(
    maxconfs,
    corrs,
    num_bins=20
)

# Diagonal = perfect calibration; markers = observed accuracy per confidence bin
axs[1].plot([0, 1], [0, 1], color='lightgray', lw=2, label='Perf cal')
axs[1].plot(bin_confidence, bin_accuracy, marker='o', label='Cal', lw=2)

axs[1].axvline(0.5, color="black", linestyle="--")
axs[1].axvline(0.5 + args.epsilon, color="black", linestyle=":")
axs[1].legend(loc="lower right")
axs[1].set_title("Reliability Diagram")

axs[1].set_xlabel('Confidence')
axs[1].set_ylabel('Accuracy')
axs[1].set_xlim(0.4,1.02)
axs[1].set_ylim(0.4,1.02)

plt.tight_layout()
# Write into the configured directory, creating it first so savefig cannot
# fail on a missing folder.
os.makedirs(args.save_dir, exist_ok=True)
plt.savefig(os.path.join(args.save_dir, plot_name))