In [1]:
import torch
from torch.utils.data import Dataset, DataLoader, default_collate
import torch.nn.functional as F
from torchvision import transforms
import torch.nn as nn
from typing import Tuple

from lightgbm import LGBMClassifier

from sklearn.metrics import roc_curve, roc_auc_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn.calibration import CalibratedClassifierCV

from scipy.stats import norm
import numpy as np
import requests
import pandas as pd
import warnings

#### LOADING THE MODEL
from torchvision.models import resnet18

TOKEN = "REDACTED"

In [2]:
warnings.filterwarnings('ignore')

#### LOADING THE MODEL
resnet_model = resnet18(pretrained=False)
resnet_model.fc = torch.nn.Linear(512, 44)

try:
    ckpt = torch.load("./01_MIA.pt", map_location="cpu", weights_only=False)
    resnet_model.load_state_dict(ckpt)
    print("Success - Model loaded")
except Exception as error:
    print(f"Error loading model: {error}")
    exit(1)

resnet_model.eval()  # Set the model to evaluation mode

Success - Model loaded


ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [3]:
#### DATASETS
class TaskDataset(Dataset):
    def __init__(self, transform=None):
        self.ids = []
        self.imgs = []
        self.labels = []
        self.transform = transform

    def __getitem__(self, index) -> Tuple[int, torch.Tensor, int]:
        id_ = self.ids[index]
        img = self.imgs[index]
        if not self.transform is None:
            img = self.transform(img)
        label = self.labels[index]
        return id_, img, label

    def __len__(self):
        return len(self.ids)


class MembershipDataset(TaskDataset):
    def __init__(self, transform=None):
        super().__init__(transform)
        self.membership = []

    def __getitem__(self, index) -> Tuple[int, torch.Tensor, int, int]:
        id_, img, label = super().__getitem__(index)
        return id_, img, label, self.membership[index]

In [4]:
# Public Data
try:
    public_data : MembershipDataset = torch.load("./pub.pt", map_location="cpu", weights_only=False)
    print(f"Public dataset: {len(public_data)} samples")
    members = sum(public_data.membership)
    print(f"  Members: {members}, Non-members: {len(public_data) - members}")
except Exception as error:
    print(f"Error: Could not load public dataset - {error}")
    exit(1)

# Private Data
try:
    private_data : MembershipDataset = torch.load("./priv_out.pt", map_location="cpu", weights_only=False)
    print(f"Private dataset: {len(private_data)} samples")  
except Exception as error:
    print(f"Error: Could not load private dataset - {error}")
    exit(1)

Public dataset: 20000 samples
  Members: 10000, Non-members: 10000
Private dataset: 20000 samples


In [5]:
### HELPER FUNCTIONS

def print_auc_tpr(y, probs):
    auc = roc_auc_score(y, probs)

    # TPR @ FPR=0.05
    fpr, tpr, _ = roc_curve(y,probs)
    idx = (fpr >= 0.05).nonzero()[0][0] if any(fpr >= 0.05) else -1
    tpr_at_fpr = tpr[idx] if idx != -1 else 0.0

    print(f"  AUC: {auc:.4f}")
    print(f"  TPR@FPR=0.05: {tpr_at_fpr:.4f}\n")

def print_attack_stats(model_name, score):
    print(f"\nAttack Results : {model_name}")
    print(f"  Generated scores for {len(score)} samples")
    print(f"  Score range: [{np.min(score):.4f}, {np.max(score):.4f}]")
    print(f"  Score mean: {np.mean(score):.4f}")
    print(f"  Score std: {np.std(score):.4f}")

def export_scores_csv(model_name, ids, scores):
    df = pd.DataFrame({
        "ids": ids,
        "score": scores,
    })
    
    submission_file = f"submission_{model_name}.csv"
    df.to_csv(submission_file, index=False)
    print(f"\nSubmission saved to: {submission_file}")

# Take in only Image, Label, Membership fields - No Need of ID field
def collate_fn_public(batch):
    filtered_batch = []
    for sample in batch:
        filtered_batch.append(sample[1:])
    return default_collate(filtered_batch)

# Take in only Image, Label fields - No Need of ID and Membership Field
def collate_fn_private(batch):
    filtered_batch = []
    for sample in batch:
        filtered_batch.append(sample[1:3])
    return default_collate(filtered_batch)

In [6]:
### Add this as a transofrmation to pre-process the images
mean = [0.2980, 0.2962, 0.2987]
std = [0.2886, 0.2875, 0.2889]

# Define transformations
transform = transforms.Compose([
    transforms.ToPILImage(),           
    transforms.ToTensor(),            
    transforms.Normalize(mean, std)
])

# Transform datasets w.r.t the transformation Parameters
public_data.transform = transform
private_data.transform = transform

In [7]:
# Extract Features - Entropy, Confidence, Logit, LiRA information.

def extract_mia_lira_features(dataset, is_public=True):
    """
    This function extracts set of features including softmax probabilities,
    cross-entropy losses, entropy-based measures, logits, and various confidence scores. 
    Also computes the conf_member, conf_non_member on when is_public is True

    Parameters
    ----------
    dataset : torch.utils.data.Dataset
        The dataset to extract features from. Should contain images, labels, and optionally
        membership indicators (if `is_public=True`).

    is_public : bool, optional (default=True)
        Indicates whether the dataset is a public dataset (with membership labels). 
        Collate functions are loaded accordingly

    Returns
    -------
    features : np.ndarray, shape (n_samples, n_features)
        The extracted feature vectors for all samples. Includes probabilities, loss values,
        entropy metrics, prediction confidence margins, and logit-based metrics.

    conf_member : list of float
        A list of true class probabilities for samples labeled as members (only returned
        if `is_public=True`).

    conf_non_member : list of float
        A list of true class probabilities for samples labeled as non-members (only returned
        if `is_public=True`).

    Notes
    -----

    Feature vector (per sample) includes:
        - Softmax probabilities for each class
        - Cross-entropy loss
        - Total class entropy
        - Adjusted entropy (entropy scaled by 1 - true class prob)
        - Top-2 confidence margin
        - True class probability
        - Prediction correctness (0 or 1)
        - Max logit
        - True class logit
        - Gradient proxy (|1 - true logit|)
    """
    features = []
    conf_member = []
    conf_non_member = []   
    
    if is_public:
        dataloader = DataLoader(dataset, batch_size=64, shuffle=False, collate_fn=collate_fn_public)

    else:
        dataloader = DataLoader(dataset, batch_size=64, shuffle=False, collate_fn=collate_fn_private)
    
    print(f"Extracting features from {len(dataset)} samples...")
    
    with torch.no_grad():
         for batch in dataloader:
            if is_public:
                batch_images, batch_labels, batch_memberships = batch
                            
            else:
                batch_images, batch_labels = batch
            
            # Get model predictions - logits
            logits = resnet_model(batch_images)
            soft_max_probs = F.softmax(logits, dim=1)
            
            ## FEATURE EXTRACTION

            # Cross Entropy Loss per sample
            sample_loss = F.cross_entropy(logits, batch_labels, reduction='none')

            # Cross Entropy for predicted classes
            class_entropy = -torch.sum(soft_max_probs * torch.log(soft_max_probs + 1e-10), dim=1)

            # Adjusted Entropy for MIA
            max_confidence, prediction = torch.max(soft_max_probs, dim=1)
            true_class_prob = soft_max_probs[torch.arange(batch_labels.size(0)), batch_labels]
            adjusted_entropy = class_entropy * (1 - true_class_prob)

            # Correctness of prediction
            correctness = (prediction == batch_labels).float()
             
            # Top-2 confidence margin
            sorted_probs, _ = torch.sort(soft_max_probs, dim=1, descending=True)
            top2_conf_margin = sorted_probs[:, 0] - sorted_probs[:, 1]
            
            # Logit features
            max_logit = torch.max(logits, dim=1).values
            true_logit = logits[torch.arange(batch_labels.size(0)), batch_labels]
            gradient_proxy = torch.abs(1 - true_logit)

            # Create Batch Feature Vector - [64,52]
            batch_features = np.hstack([
                soft_max_probs.numpy(),           # [64,44]
                sample_loss.numpy().reshape(-1,1), #[64,1]
                class_entropy.numpy().reshape(-1,1), 
                adjusted_entropy.numpy().reshape(-1,1),
                top2_conf_margin.numpy().reshape(-1,1), 
                true_class_prob.numpy().reshape(-1,1),
                correctness.numpy().reshape(-1,1), 
                max_logit.numpy().reshape(-1,1), 
                true_logit.numpy().reshape(-1,1), 
                gradient_proxy.numpy().reshape(-1,1)
            ]) 

            # Append results to features
            features.append(batch_features)
             
            if is_public:
                # Collect confidences for LiRA training
                for i, member in enumerate(batch_memberships):
                    prob = true_class_prob[i].item()
                    if member == 1:
                        conf_member.append(prob)
                    elif member == 0:
                        conf_non_member.append(prob)
                        
          
   # After loop: concatenate all batch feature lists into feature numpy array
    features = np.vstack(features)

    
    return (features, conf_member, conf_non_member) if is_public else features

In [8]:
# LiRA Based Model
def compute_lira_scores(dataset, conf_member, conf_non_member):
    """
    Compute LiRA (Likelihood Ratio Attack) membership scores for a private dataset.

    Parameters
    ----------
    dataset : torch.utils.data.Dataset
        A private dataset whose samples will be scored for membership inference.

    conf_member : list of float
        List of predicted confidence values (true class probabilities) for known members in the public set.

    conf_non_member : list of float
        List of predicted confidence values (true class probabilities) for known non-members in the public set.

    Returns
    -------
    lira_scores : np.ndarray of shape (n_samples,)
        Membership scores (between 0 and 1) for each sample in the private dataset. .

    """
    lira_scores = []
    
    # Loading a Public Dataset
    dataloader = DataLoader(dataset, batch_size=64, shuffle=False, collate_fn=collate_fn_private)

    # Compute Gaussian parameters for LiRA
    mean_member, std_member = np.mean(conf_member), np.std(conf_member) + 1e-8
    mean_non_member, std_non_member = np.mean(conf_non_member), np.std(conf_non_member) + 1e-8

    # Apply LiRA scoring to private data
    lira_scores = []
    with torch.no_grad():
        for batch in dataloader:
            batch_images, batch_labels = batch
            
            logits = resnet_model(batch_images)
            soft_max_probs = F.softmax(logits, dim=1)
            true_class_prob = soft_max_probs[torch.arange(batch_labels.size(0)), batch_labels]
    
            for i in range(len(batch_labels)):
                prob = true_class_prob[i].item()
    
                # LiRA likelihood ratio scoring
                likelihood_member = norm.pdf(prob, mean_member, std_member)
                likelihood_non_member = norm.pdf(prob, mean_non_member, std_non_member)

                # Avoid division by zero using e^-8
                likelihood_ratio = likelihood_member / (likelihood_non_member + 1e-8)
                membership_prob = likelihood_ratio / (1 + likelihood_ratio)
    
                lira_scores.append(membership_prob)

    
    return np.array(lira_scores)

In [9]:
def train_mia_lira_model(X, y, model):
    """
    Train and calibrate a Membership Inference Attack (MIA) model.

    Parameters
    ----------
    X : np.ndarray of shape (n_samples, n_features)
        Feature matrix used for training and calibration.
        
    y : np.ndarray of shape (n_samples,)
        Binary target labels indicating membership (1 = member, 0 = non-member).

    model : sklearn-like classifier
        Any classifier that implements `fit`, `predict`, and `predict_proba`. This will serve as the
        base model for MIA (e.g., RandomForest, LightGBM, or MLP).

    Returns
    -------
    calibrated_clf : sklearn.calibration.CalibratedClassifierCV
        A probability-calibrated classifier using isotonic regression based on validation performance.
        The calibrated model can now return better-calibrated membership probabilities.
    """
    
    # Split for calibration
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train base model
    model.fit(X_train, y_train)

    # Calibrate
    calibrated_clf = CalibratedClassifierCV(estimator=model, method='isotonic', cv='prefit')
    calibrated_clf.fit(X_val, y_val)

    y_score = calibrated_clf.predict_proba(X_val)[:, 1]
    print_auc_tpr(y_val, y_score)

    return calibrated_clf

In [10]:
def ensemble_attack(X, y):
    """
    Train an ensemble of machine learning models (Random Forest, LightGBM, MLP) 
    to perform a Membership Inference Attack (MIA).

    Parameters
    ----------
    X : np.ndarray of shape (n_samples, n_features)
        The feature matrix extracted from the victim model. Each row corresponds to one sample and contains
        features such as softmax probabilities, entropy, margin, logit values, etc.

    y : np.ndarray of shape (n_samples,)
        The ground truth binary labels indicating membership status. 1 for member, 0 for non-member.

    Returns
    -------
    results : dict
        A dictionary with model names as keys and trained model instances as values.
        Format:
        {
            "RF": {"model": RandomForestClassifier instance},
            "LightGBM": {"model": LGBMClassifier instance},
            "MLP": {"model": MLPClassifier instance}
        }
    """
    models = { 
        "RF": RandomForestClassifier(
            n_estimators=100, max_depth=15, min_samples_split=5,
            random_state=42, n_jobs=-1
        ),
        "LightGBM": LGBMClassifier(
            n_estimators=200, max_depth=15, learning_rate=0.05,
            random_state=42, n_jobs=-1
        ),
        "MLP": MLPClassifier(
            hidden_layer_sizes=(64, 32), max_iter=100, random_state=42
        )
    }
    
    results = {}

     # Standardize features
    scaler = StandardScaler()
    # Float32 instead of Float64
    X_scaled = scaler.fit_transform(X).astype(np.float32)  

    # Float32 instead of Int64
    y = y.astype(np.float32)

    for name, model in models.items():
        print(f"🔹 Training: {name}")
        
        results[name] = {}

        results[name]["model"] = train_mia_lira_model(X_scaled, y, model)

        results[name] = {"model": model}
 
    return results

In [11]:
# LiRA Based Model + Feature Extraction

# Extract features from public data (for training)
public_features, conf_member, conf_non_member= extract_mia_lira_features(public_data)

# Extract LiRA scores from public data (for training)
print("Computing LiRA Scores on Public Dataset...")
public_lira_scores = compute_lira_scores(public_data, conf_member, conf_non_member)

# Combine features with LiRA
combined_features = np.concatenate([public_features, public_lira_scores.reshape(-1, 1)], axis=1)

# Extract memberships from public data (for training)
public_memberships = np.array([public_data.membership[i] for i in range(len(public_data))])

# Train attack model
# attack_models = train_mia_lira_attack(combined_features, public_memberships)
attack_models = ensemble_attack(combined_features, public_memberships)

# Extract features from private data (for attack)
private_features = extract_mia_lira_features(private_data, is_public=False)

# Extract LiRA scores from public data (for training)
print("Computing LiRA Scores on Private Dataset...")
private_lira_scores = compute_lira_scores(private_data, conf_member, conf_non_member)

# Combine features with LiRA
combined_features = np.concatenate([private_features, private_lira_scores.reshape(-1, 1)], axis=1)

# Standardize features
scaler = StandardScaler()
combined_features_scaled = scaler.fit_transform(combined_features).astype(np.float32)
print("done")

Extracting features from 20000 samples...
Computing LiRA Scores on Public Dataset...
🔹 Training: RF
  AUC: 0.6715
  TPR@FPR=0.05: 0.1454

🔹 Training: LightGBM
[LightGBM] [Info] Number of positive: 8005, number of negative: 7995
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.058318 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 12423
[LightGBM] [Info] Number of data points in the train set: 16000, number of used features: 54
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.500313 -> initscore=0.001250
[LightGBM] [Info] Start training from score 0.001250
  AUC: 0.6660
  TPR@FPR=0.05: 0.1639

🔹 Training: MLP
  AUC: 0.6591
  TPR@FPR=0.05: 0.1514

Extracting features from 20000 samples...
Computing LiRA Scores on Private Dataset...
done


In [12]:
# Perform attack on private data
print("Predicting MIA-LiRA Scores on Private Dataset...")

for model_name in attack_models.keys():

    attack_models[model_name]["score"] = attack_models[model_name]["model"].predict_proba(combined_features_scaled)[:, 1]
    
    print_attack_stats(model_name, attack_models[model_name]["score"])

    export_scores_csv(model_name, private_data.ids, attack_models[model_name]["score"])

Predicting MIA-LiRA Scores on Private Dataset...

Attack Results : RF
  Generated scores for 20000 samples
  Score range: [0.0073, 0.6209]
  Score mean: 0.4450
  Score std: 0.1227

Submission saved to: submission_RF.csv

Attack Results : LightGBM
  Generated scores for 20000 samples
  Score range: [0.0007, 0.8410]
  Score mean: 0.3095
  Score std: 0.1406

Submission saved to: submission_LightGBM.csv

Attack Results : MLP
  Generated scores for 20000 samples
  Score range: [0.0000, 0.9997]
  Score mean: 0.5090
  Score std: 0.1849

Submission saved to: submission_MLP.csv


In [13]:
## Submit only when sure
response = requests.post("http://34.122.51.94:9090/mia", files={"file": open("submission_MLP.csv", "rb")}, headers={"token": TOKEN})
print(response.json())

{'detail': 'Exceeded submissions. Only 1/h allowed.'}
