<a href="https://colab.research.google.com/github/cgN77/MIA-Class/blob/main/MIA_CLASS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## base

## 1st ed

### library

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from typing import Optional, Dict, Any, Tuple, List
import numpy as np
from sklearn.mixture import GaussianMixture
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
from imblearn.under_sampling import RandomUnderSampler

def entropy(probs):
    """Return the Shannon entropy (in nats) of a probability tensor.

    Args:
        probs: Tensor of probabilities. All elements are summed over.

    Returns:
        ``-sum(p * log(p))`` as a Python float.
    """
    # torch.xlogy(p, p) evaluates p * log(p) but defines the term as 0 when
    # p == 0, so exact zeros contribute nothing instead of 0 * -inf = NaN.
    return -torch.xlogy(probs, probs).sum().item()

def margin_confidence(probs):
    """Return the gap between the largest and second-largest entries of ``probs``.

    Args:
        probs: 1-D tensor of scores with at least two entries.

    Returns:
        The top-1 minus top-2 value as a Python float.
    """
    ranked = torch.sort(probs, descending=True).values
    best, runner_up = ranked[0], ranked[1]
    return (best - runner_up).item()

def top_k_sum(probs, k=3):
    """Return the sum of the ``k`` largest entries of ``probs``.

    Args:
        probs: Tensor of scores; the top-k is taken along the last dimension.
        k: Number of entries to sum. Clamped to the size of the last
            dimension so the default also works for tensors with fewer than
            three entries (e.g. a binary classifier's output).

    Returns:
        The summed top-k values as a Python float.
    """
    k = min(k, probs.shape[-1])
    return probs.topk(k).values.sum().item()

class AttackModel(nn.Module):
    """Small fully connected binary classifier used as the attack model.

    Maps a feature vector to a single sigmoid-activated score in (0, 1).
    """

    def __init__(self, input_size):
        """
        Build the three linear layers (input_size -> 64 -> 32 -> 1).

        Args:
            input_size: Number of features in each input vector.
        """
        super().__init__()
        first_hidden, second_hidden = 64, 32
        self.fc1 = nn.Linear(input_size, first_hidden)
        self.fc2 = nn.Linear(first_hidden, second_hidden)
        self.fc3 = nn.Linear(second_hidden, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the sigmoid score for each row of ``x``."""
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        score = self.fc3(hidden)
        return self.sigmoid(score)



class LiRA:
    """Likelihood Ratio Attack (LiRA) against a single target example.

    For one target example the attack trains ``N`` shadow models with the
    example added to the training data (IN) and ``N`` with it filtered out
    (OUT), fits a Gaussian to the softmax confidence each group assigns to
    the example's label, and scores the target model's confidence by the
    ratio of the two Gaussian densities.
    """

    def __init__(self, model_class, data_distribution, N=50):
        """
        Initialize the LiRA attack.

        Parameters:
        - model_class: Zero-argument callable returning a fresh shadow model.
        - data_distribution: Zero-argument callable returning an iterable of
          (x, y) pairs used as one shadow dataset.
        - N: Number of shadow models to train per condition (IN and OUT).
        """
        self.model_class = model_class
        self.data_distribution = data_distribution
        self.N = N
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    def _to_example(self, x, y):
        """Return (x, y) as float32 / long tensors on ``self.device``."""
        # torch.as_tensor reuses an existing tensor instead of copy-constructing
        # it, which avoids the UserWarning raised by torch.tensor(tensor).
        features = torch.as_tensor(x, dtype=torch.float32).to(self.device)
        label = torch.as_tensor(y, dtype=torch.long).to(self.device)
        return features, label

    @staticmethod
    def _is_target(candidate, target):
        """Return True when ``candidate`` has the same features and label as ``target``."""
        return torch.equal(candidate[0], target[0]) and bool(candidate[1] == target[1])

    def train_shadow_model(self, dataset, model, epochs=5):
        """
        Train a shadow model on the given dataset.

        Parameters:
        - dataset: Sequence of (x, y) tensor pairs.
        - model: Model to train; it is moved to ``self.device``.
        - epochs: Number of passes over the dataset.

        Returns the trained model (the same object that was passed in).
        """
        model.to(self.device)
        model.train()
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        for _ in range(epochs):
            for X, y in dataloader:
                X, y = X.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                predictions = model(X)
                loss = criterion(predictions, y)
                loss.backward()
                optimizer.step()
        return model

    def collect_confidences(self, dataset, target_example, is_in=True):
        """
        Train ``N`` shadow models and record their confidence on the target.

        Parameters:
        - dataset: Unused; kept so existing callers keep working. Shadow data
          is always drawn from ``self.data_distribution()``.
        - target_example: The (x, y) example under attack.
        - is_in: When True the target is appended to every shadow dataset;
          when False any matching example is removed from it.

        Returns a list of ``N`` softmax confidences for the target's label.

        Raises ValueError when fewer than two shadow models are requested,
        because a variance cannot be estimated from a single value.
        """
        if self.N < 2:
            # Fail before any training happens rather than after it.
            raise ValueError("Insufficient confidence values to compute meaningful statistics.")

        # Normalise the target once so it has the same dtype/device as the
        # shadow data it is appended to or compared against.
        target = self._to_example(*target_example)
        x, y = target

        confidences = []
        for _ in range(self.N):
            shadow_dataset = [self._to_example(sx, sy) for sx, sy in self.data_distribution()]

            if is_in:
                shadow_dataset.append(target)
            else:
                shadow_dataset = [d for d in shadow_dataset if not self._is_target(d, target)]

            shadow_model = self.train_shadow_model(shadow_dataset, self.model_class())

            # Confidence the shadow model assigns to the target's true label.
            shadow_model.eval()
            with torch.no_grad():
                probs = torch.softmax(shadow_model(x.unsqueeze(0)), dim=1)
                confidences.append(probs[0, y].item())

        return confidences

    def compute_statistics(self, confidences):
        """Compute mean and (population) variance of confidence values."""
        mean = np.mean(confidences)
        variance = np.var(confidences)
        return mean, variance

    def likelihood_ratio(self, conf_obs, mu_in, var_in, mu_out, var_out):
        """
        Compute the likelihood ratio p_in(conf_obs) / p_out(conf_obs).

        Parameters:
        - conf_obs: Observed confidence value.
        - mu_in, var_in: Mean and variance of IN distribution.
        - mu_out, var_out: Mean and variance of OUT distribution.
        """
        epsilon = 1e-10  # guards both a zero variance and a zero denominator

        def gaussian_likelihood(x, mu, var):
            var = max(var, epsilon)
            return (1 / np.sqrt(2 * np.pi * var)) * np.exp(-(x - mu) ** 2 / (2 * var))

        p_in = gaussian_likelihood(conf_obs, mu_in, var_in)
        p_out = gaussian_likelihood(conf_obs, mu_out, var_out)
        return p_in / (p_out + epsilon)

    def run_attack(self, model, target_example):
        """
        Run the LiRA attack on the target example.

        Parameters:
        - model: Target model.
        - target_example: Example to attack (x, y).

        Returns the likelihood ratio; larger values favour membership.
        """
        model = model.to(self.device)

        # Step 1: Collect IN and OUT confidences from shadow models
        confs_in = self.collect_confidences(self.data_distribution, target_example, is_in=True)
        confs_out = self.collect_confidences(self.data_distribution, target_example, is_in=False)

        # Step 2: Compute IN and OUT statistics
        mu_in, var_in = self.compute_statistics(confs_in)
        mu_out, var_out = self.compute_statistics(confs_out)

        # Step 3: Query the target model with the example on the model's device
        x, y = self._to_example(*target_example)
        model.eval()
        with torch.no_grad():
            conf_obs = torch.softmax(model(x.unsqueeze(0)), dim=1)[0, y].item()

        # Step 4: Compute likelihood ratio
        return self.likelihood_ratio(conf_obs, mu_in, var_in, mu_out, var_out)



class GLiRA:
    """Generalized LiRA: LiRA with a pluggable confidence metric and likelihood.

    Shadow models are trained with (IN) and without (OUT) the target example,
    a per-model confidence value is computed with the configured metric, and
    the target model's value is scored by ``likelihood_func`` under the IN and
    OUT statistics.
    """

    def __init__(self, model_class, data_distribution, N=50, likelihood_func=None, confidence_metric="softmax", temperature=1.0):
        """
        Initialize the GLiRA attack.

        Parameters:
        - model_class: Zero-argument callable returning a fresh shadow model.
        - data_distribution: Zero-argument callable returning an iterable of
          (x, y) pairs used as one shadow dataset.
        - N: Number of shadow models to train per condition (IN and OUT).
        - likelihood_func: Callable ``f(x, mu, var)``; defaults to the Gaussian
          density implemented by ``gaussian_likelihood``.
        - confidence_metric: One of "softmax", "logits", "temperature",
          "entropy", "margin" or "top_k".
        - temperature: Divisor applied to the logits for the "temperature" metric.
        """
        self.model_class = model_class
        self.data_distribution = data_distribution
        self.N = N
        self.likelihood_func = likelihood_func or self.gaussian_likelihood
        self.confidence_metric = confidence_metric
        self.temperature = temperature
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    @staticmethod
    def _as_cpu_example(x, y):
        """Return (x, y) as float32 / long CPU tensors."""
        # Shadow datasets are kept on the CPU and moved to the device batch by
        # batch during training, so every stored example must live on the CPU.
        # torch.as_tensor avoids the copy-construct warning of torch.tensor.
        features = torch.as_tensor(x, dtype=torch.float32).cpu()
        label = torch.as_tensor(y, dtype=torch.long).cpu()
        return features, label

    @staticmethod
    def _is_target(candidate, target):
        """Return True when ``candidate`` has the same features and label as ``target``."""
        return torch.equal(candidate[0], target[0]) and bool(candidate[1] == target[1])

    def train_shadow_model(self, dataset, model, epochs=5, batch_size=32, lr=0.001):
        """
        Train a shadow model on the given dataset.

        Parameters:
        - dataset: Sequence of (x, y) tensor pairs.
        - model: Model to train; it is moved to ``self.device``.
        - epochs, batch_size, lr: Training length, mini-batch size and Adam
          learning rate.

        Returns the trained model.
        """
        model = model.to(self.device)
        model.train()
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        criterion = torch.nn.CrossEntropyLoss()

        for _ in range(epochs):
            for X, y in dataloader:
                X, y = X.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                predictions = model(X)
                loss = criterion(predictions, y)
                loss.backward()
                optimizer.step()
        return model

    def compute_confidence(self, model, x, y):
        """
        Compute the confidence of ``model`` on example ``x`` with label ``y``.

        The value depends on ``self.confidence_metric``:
        - "softmax": softmax probability of class ``y``.
        - "logits": raw logit of class ``y``.
        - "temperature": softmax probability of class ``y`` after dividing the
          logits by ``self.temperature``.
        - "entropy" / "margin" / "top_k": the module-level ``entropy``,
          ``margin_confidence`` and ``top_k_sum`` helpers applied to the full
          softmax distribution (they do not depend on ``y``).

        Raises ValueError for any other metric name.
        """
        label = int(y)  # plain int index works for tensors on any device
        metric = self.confidence_metric
        model.eval()
        with torch.no_grad():
            logits = model(x.unsqueeze(0).to(self.device))
            if metric == "softmax":
                confidence = torch.softmax(logits, dim=1)[0, label].item()
            elif metric == "logits":
                confidence = logits[0, label].item()
            elif metric == "temperature":
                scaled_logits = logits / self.temperature
                confidence = torch.softmax(scaled_logits, dim=1)[0, label].item()
            elif metric in ("entropy", "margin", "top_k"):
                # These helpers need the whole probability vector; passing a
                # single scalar logit (as a float) cannot work.
                probs = torch.softmax(logits, dim=1)[0]
                if metric == "entropy":
                    confidence = entropy(probs)
                elif metric == "margin":
                    confidence = margin_confidence(probs)
                else:
                    confidence = top_k_sum(probs)
            else:
                raise ValueError(f"Unsupported confidence metric: {self.confidence_metric}")
        return confidence

    def collect_confidences(self, target_example, is_in=True):
        """
        Collect confidence scores for the target from ``N`` shadow models.

        Parameters:
        - target_example: The (x, y) example under attack.
        - is_in: When True the target is appended to every shadow dataset;
          when False any matching example is removed from it.

        Raises ValueError when fewer than two shadow models are requested.
        """
        if self.N < 2:
            # Fail before any training happens rather than after it.
            raise ValueError("Insufficient confidence values to compute meaningful statistics.")

        target = self._as_cpu_example(*target_example)
        x, y = target

        confidences = []
        for _ in range(self.N):
            shadow_dataset = [self._as_cpu_example(sx, sy) for sx, sy in self.data_distribution()]

            if is_in:
                shadow_dataset.append(target)
            else:
                shadow_dataset = [d for d in shadow_dataset if not self._is_target(d, target)]

            shadow_model = self.train_shadow_model(shadow_dataset, self.model_class())
            confidences.append(self.compute_confidence(shadow_model, x, y))

        return confidences

    def gaussian_likelihood(self, x, mu, var):
        """Default Gaussian likelihood function (variance floored at 1e-10)."""
        epsilon = 1e-10
        var = max(var, epsilon)
        return (1 / np.sqrt(2 * np.pi * var)) * np.exp(-(x - mu) ** 2 / (2 * var))

    def compute_statistics(self, confidences):
        """Compute mean and (population) variance of confidence values."""
        mean = np.mean(confidences)
        variance = np.var(confidences)
        return mean, variance

    def likelihood_ratio(self, conf_obs, mu_in, var_in, mu_out, var_out):
        """Compute the likelihood ratio p_in(conf_obs) / p_out(conf_obs)."""
        p_in = self.likelihood_func(conf_obs, mu_in, var_in)
        p_out = self.likelihood_func(conf_obs, mu_out, var_out)

        epsilon = 1e-10  # keeps the denominator non-zero
        return p_in / (p_out + epsilon)

    def run_attack(self, model, target_example):
        """
        Run the GLiRA attack on the target example.

        Parameters:
        - model: Target model.
        - target_example: Example to attack (x, y).

        Returns the likelihood ratio; larger values favour membership.
        """
        model = model.to(self.device)
        confs_in = self.collect_confidences(target_example, is_in=True)
        confs_out = self.collect_confidences(target_example, is_in=False)

        mu_in, var_in = self.compute_statistics(confs_in)
        mu_out, var_out = self.compute_statistics(confs_out)

        x, y = target_example
        conf_obs = self.compute_confidence(model, x, y)

        return self.likelihood_ratio(conf_obs, mu_in, var_in, mu_out, var_out)


class ShokriMembershipInference:
    """Shadow-model membership inference attack in the style of Shokri et al.

    Shadow models are trained on splits of sampled data; their outputs on
    their own training data (members) and held-out data (non-members) form a
    labelled dataset on which a binary attack classifier is trained.
    """

    def __init__(self, shadow_model_class, data_distribution, num_shadow_models=5):
        """
        Initialize the membership inference attack class.

        Args:
            shadow_model_class: Zero-argument callable returning a fresh shadow model.
            data_distribution: Zero-argument callable returning a sequence of
                (x, y) pairs for one shadow model.
            num_shadow_models: Number of shadow models to train.
        """
        self.shadow_model_class = shadow_model_class
        self.data_distribution = data_distribution
        self.num_shadow_models = num_shadow_models
        self.attack_model = None

    @staticmethod
    def _module_device(module, fallback):
        """Return the device of ``module``'s first parameter, or ``fallback`` if it has none."""
        first_param = next(module.parameters(), None)
        return fallback if first_param is None else first_param.device

    @staticmethod
    def _attack_features(model, inputs):
        """
        Build attack features for a batch of inputs.

        Each row is the model's softmax vector followed by its entropy and the
        gap between the two largest probabilities.
        """
        logits = model(inputs)
        probs = F.softmax(logits, dim=1)
        # log_softmax stays finite when a probability underflows to 0, so the
        # entropy term is 0 there instead of 0 * log(0) = NaN.
        entropy = -torch.sum(probs * F.log_softmax(logits, dim=1), dim=1)
        top_two = probs.topk(2).values
        confidence_gap = top_two[:, 0] - top_two[:, 1]
        return torch.cat([probs, entropy.unsqueeze(1), confidence_gap.unsqueeze(1)], dim=1)

    def train_shadow_models(self, train_data):
        """
        Train a single shadow model.

        Args:
            train_data: Dataset to train the shadow model.

        Returns:
            Trained shadow model.
        """
        model = self.shadow_model_class()
        model.train()
        dataloader = DataLoader(train_data, batch_size=32, shuffle=True)
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)
        criterion = nn.CrossEntropyLoss()

        for _ in range(10):
            for X, y in dataloader:
                optimizer.zero_grad()
                predictions = model(X)
                loss = criterion(predictions, y)
                loss.backward()
                optimizer.step()

        return model

    def build_attack_dataset(self):
        """
        Build the attack dataset by training shadow models and collecting their outputs.

        Returns:
            X_attack: Feature rows (softmax vector, entropy, top-2 gap).
            y_attack: Labels for the attack model (1 for members, 0 for non-members).
        """
        X_attack = []
        y_attack = []

        for _ in range(self.num_shadow_models):
            shadow_data = self.data_distribution()
            shadow_train, shadow_test = train_test_split(shadow_data, test_size=0.2, random_state=42)

            shadow_model = self.train_shadow_models(shadow_train)
            shadow_model.eval()

            # Training samples are members (1), held-out samples non-members (0).
            with torch.no_grad():
                for split, membership in ((shadow_train, 1), (shadow_test, 0)):
                    for X, _ in split:
                        # as_tensor avoids the copy-construct warning that
                        # torch.tensor raises for inputs that are already tensors.
                        X = torch.as_tensor(X, dtype=torch.float32)
                        features = self._attack_features(shadow_model, X.unsqueeze(0))
                        X_attack.append(features.squeeze(0).tolist())
                        y_attack.append(membership)

        return X_attack, y_attack

    def train_attack_model(self):
        """
        Train the attack model using the attack dataset.

        The trained model is stored on ``self.attack_model``.

        Returns:
            Trained attack model.
        """
        X_attack, y_attack = self.build_attack_dataset()

        # Balance members and non-members by undersampling the larger class.
        rus = RandomUnderSampler(random_state=42)
        X_attack, y_attack = rus.fit_resample(X_attack, y_attack)

        X_train, X_val, y_train, y_val = train_test_split(X_attack, y_attack, test_size=0.2, random_state=42)

        X_train = torch.tensor(X_train, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
        X_val = torch.tensor(X_val, dtype=torch.float32)
        y_val = torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)

        input_size = X_train.shape[1]
        attack_model = AttackModel(input_size)
        criterion = nn.BCELoss()
        optimizer = torch.optim.Adam(attack_model.parameters(), lr=0.001)

        # Full-batch training: one optimizer step per epoch.
        epochs = 20
        for epoch in range(epochs):
            attack_model.train()
            optimizer.zero_grad()
            outputs = attack_model(X_train)
            loss = criterion(outputs, y_train)
            loss.backward()
            optimizer.step()

            print(f"Epoch {epoch + 1}/{epochs}, Loss: {loss.item():.4f}")

        attack_model.eval()
        with torch.no_grad():
            # squeeze(1) keeps a 1-D result even for a single validation row.
            y_pred = attack_model(X_val).squeeze(1).round()
            accuracy = accuracy_score(y_val.squeeze(1).numpy(), y_pred.numpy())
            print(f"Attack Model Validation Accuracy: {accuracy * 100:.2f}%")

        self.attack_model = attack_model
        return attack_model

    def perform_inference(self, target_model, samples):
        """
        Perform batch membership inference on a list of samples using the trained attack model.

        Args:
            target_model: The target model to attack.
            samples: A batch of samples (as a tuple (X, y)); only X is used.

        Returns:
            Membership inference results for the batch (1 for member, 0 for non-member)
            as a numpy array.

        Raises:
            ValueError: If the attack model has not been trained yet.
        """
        if self.attack_model is None:
            raise ValueError("Attack model has not been trained. Call train_attack_model() first.")

        X, _ = samples
        X = torch.as_tensor(X, dtype=torch.float32)

        # Run each model on its own device so a GPU target model and a CPU
        # attack model can be combined.
        target_device = self._module_device(target_model, X.device)
        target_model.eval()
        with torch.no_grad():
            features = self._attack_features(target_model, X.to(target_device))

        attack_device = self._module_device(self.attack_model, features.device)
        self.attack_model.eval()
        with torch.no_grad():
            membership_predictions = self.attack_model(features.to(attack_device)).squeeze().round()

        return membership_predictions.cpu().numpy()



class MembershipInferenceAttack:
    """Front end that configures, runs and evaluates a membership inference attack.

    Supported attack types are 'LiRA', 'GLiRA' (also spelled 'GLIRA'),
    'shokri', 'MAST' and 'MALT'.
    """

    # Both spellings were used in this class; accept either one.
    _GLIRA_NAMES = ('GLIRA', 'GLiRA')

    def __init__(self,
                 target_model: nn.Module,
                 target_class:nn.Module,
                 dataset: torch.utils.data.Dataset,
                 data_distribution:callable = None,
                 attack_model_type: str = 'LiRA',
                 device: Optional[str] = 'cpu',
                 hyperparameters: Optional[Dict[str, Any]] = None,
                 shadow_models_count = 10):
        """
        Initializes the membership inference attack.

        Args:
            target_model: The target model to attack; it is moved to ``device``.
            target_class: Zero-argument callable that builds a fresh model for
                shadow training.
            dataset: Dataset containing samples to evaluate.
            data_distribution: Zero-argument callable that samples a shadow
                dataset (required by the LiRA, GLiRA and shokri attacks).
            attack_model_type: The attack type ('LiRA', 'GLiRA', 'shokri',
                'MAST' or 'MALT').
            device: Device to run computations on ('cpu' or 'cuda').
            hyperparameters: Additional parameters for the attack models.
                Recognised keys: 'num_perturbations', 'epsilon',
                'augmentation_strategy', 'lira_threshold' (default 7) and
                'score_threshold' (default 0.5).
            shadow_models_count: Number of shadow models the attacks train.
        """
        self.target_model = target_model.to(device)
        self.target_class = target_class
        self.dataset = dataset
        self.attack_model_type = attack_model_type
        self.device = device
        self.hyperparameters = hyperparameters if hyperparameters else {}
        self.shadow_models_count = shadow_models_count
        self.data_distribution = data_distribution

        # Attack-specific parameters
        self.num_perturbations = self.hyperparameters.get('num_perturbations', 100)
        self.epsilon = self.hyperparameters.get('epsilon', 0.01)
        self.augmentation_strategy = self.hyperparameters.get('augmentation_strategy', 'default')
        # Decision thresholds applied to the raw attack scores.
        self.lira_threshold = self.hyperparameters.get('lira_threshold', 7)
        self.score_threshold = self.hyperparameters.get('score_threshold', 0.5)

        # Member and non-member datasets
        self.members = []
        self.non_members = []

        # Callable selected by configure_attack_model()
        self.attack_model = None
        # Trained Shokri attack, built on first use and reused afterwards
        self._shokri = None
        # Number of samples processed so far (printed as a progress counter)
        self.i = 0

    ### DATA PROCESSING ###
    @staticmethod
    def _split_sample(sample):
        """Return (features, label); label is None when ``sample`` is a bare tensor."""
        if isinstance(sample, (tuple, list)) and len(sample) == 2:
            return sample[0], sample[1]
        return sample, None

    def _example_on_device(self, sample):
        """Return the (x, y) pair of ``sample`` as tensors on ``self.device``."""
        # as_tensor avoids the copy-construct warning when the label is
        # already a tensor.
        x = torch.as_tensor(sample[0]).to(self.device)
        y = torch.as_tensor(sample[1]).to(self.device)
        return x, y

    def prepare_datasets(self, members, non_members):
        """
        Prepares datasets for members and non-members.

        Args:
            members: Iterable of (sample, label) member pairs.
            non_members: Iterable of (sample, label) non-member pairs.
        """
        self.members = [(sample, label) for sample, label in members]
        self.non_members = [(sample, label) for sample, label in non_members]

    def augment_data(self, sample: torch.Tensor) -> torch.Tensor:
        """
        Augments the data based on the selected strategy.

        Args:
            sample: A single input sample.

        Returns:
            The sample with additive Gaussian noise ('noise'), multiplied by a
            random factor close to 1 ('scaling'), or unchanged otherwise.
        """
        if self.augmentation_strategy == 'noise':
            sample = sample.to(self.device)  # noise is created on self.device
            noise = torch.randn(sample.shape, device=self.device) * self.epsilon
            return sample + noise
        elif self.augmentation_strategy == 'scaling':
            sample = sample.to(self.device)
            scale = 1 + (torch.randn(1, device=self.device) * self.epsilon)
            return sample * scale
        else:
            return sample  # Default: No augmentation

    ### ATTACK MODEL CONFIGURATION ###
    def configure_attack_model(self):
        """
        Configures the attack model based on the chosen attack type.

        Raises:
            ValueError: If ``attack_model_type`` is not a supported attack.
        """
        if self.attack_model_type == 'LiRA':
            print("Configuring LiRA attack...")
            self.attack_model = self._lira_attack
        elif self.attack_model_type in self._GLIRA_NAMES:
            print("Configuring GLiRA attack...")
            self.attack_model = self._glira_attack
        elif self.attack_model_type == 'shokri':
            print("Configuring Shokri attack...")
            self.attack_model = self._shokri_attack
        elif self.attack_model_type == 'MAST':
            print("Configuring MAST attack...")
            self.attack_model = self._mast_attack
        elif self.attack_model_type == 'MALT':
            print("Configuring MALT attack...")
            self.attack_model = self._malt_attack
        else:
            raise ValueError(f"Unsupported attack model type: {self.attack_model_type}")

    ### LIRA ATTACK ###
    def _lira_attack(self, sample: torch.Tensor) -> float:
        """
        Performs the LiRA attack on a single sample using the LiRA class.

        Args:
            sample: A (features, label) pair.

        Returns:
            Likelihood ratio score.
        """
        self.target_model.eval()
        x, y = self._example_on_device(sample)

        lira = LiRA(model_class=self.target_class, data_distribution=self.data_distribution, N=self.shadow_models_count)
        # Attack the model this object was built with, not a notebook global.
        return lira.run_attack(self.target_model, (x, y))

    ### GLiRA ATTACK ###
    def _glira_attack(self, sample: torch.Tensor, likelihood_func=None, metric: Optional[str] = None) -> float:
        """
        Performs the GLiRA (Generalized Likelihood Ratio Attack) on a single sample.

        Args:
            sample: A (features, label) pair.
            likelihood_func: Optional likelihood callable forwarded to GLiRA;
                None selects GLiRA's default.
            metric: Name of the GLiRA confidence metric; None selects "softmax".

        Returns:
            Generalized likelihood ratio score.
        """
        self.target_model.eval()
        x, y = self._example_on_device(sample)
        # GLiRA needs the shadow-data sampler (a callable), not the raw dataset.
        glira = GLiRA(
            self.target_class,
            self.data_distribution,
            self.shadow_models_count,
            likelihood_func,
            metric if metric is not None else "softmax",
        )
        return glira.run_attack(self.target_model, (x, y))

    def _shokri_attack(self, sample: torch.Tensor):
        """
        Performs the Shokri attack on a single sample.

        The shadow models and the attack classifier are trained on the first
        call and reused for every later sample.

        Args:
            sample: A (features, label) pair.

        Returns:
            Membership prediction returned by ShokriMembershipInference.perform_inference.
        """
        self.target_model.eval()
        x, y = self._example_on_device(sample)

        if self._shokri is None:
            attack = ShokriMembershipInference(
                shadow_model_class=self.target_class,
                data_distribution=self.data_distribution,
                num_shadow_models=self.shadow_models_count
            )
            attack.train_attack_model()
            self._shokri = attack

        # perform_inference expects a batch, so add the batch dimension the
        # same way the shadow models were queried.
        return self._shokri.perform_inference(self.target_model, (x.unsqueeze(0), y))

    ### MAST ATTACK ###
    def _mast_attack(self, sample: torch.Tensor) -> float:
        """
        Performs the MAST attack on a single sample.

        Args:
            sample: A single input tensor or a (features, label) pair.

        Returns:
            Membership score: the norm of (logits - softmax(logits)).
        """
        features, _ = self._split_sample(sample)
        self.target_model.eval()
        with torch.no_grad():
            original_output = self.target_model(features.unsqueeze(0).to(self.device))
            logits = original_output.squeeze()
            softmax_scores = F.softmax(logits, dim=-1)
            return torch.norm(logits - softmax_scores).item()

    ### MALT ATTACK ###
    def _malt_attack(self, sample: torch.Tensor) -> float:
        """
        Performs the MALT attack on a single sample.

        Args:
            sample: A single input tensor or a (features, label) pair.

        Returns:
            Membership score: maximum softmax probability minus the entropy
            of the softmax distribution.
        """
        features, _ = self._split_sample(sample)
        self.target_model.eval()
        with torch.no_grad():
            original_output = self.target_model(features.unsqueeze(0).to(self.device))
            logits = original_output.squeeze()
            probs = F.softmax(logits, dim=-1)
            confidence = probs.max().item()
            entropy = -torch.sum(probs * F.log_softmax(logits, dim=-1)).item()
            return confidence - entropy

    ### PERTURBATION GENERATION ###
    def generate_perturbations(self, sample: torch.Tensor, num_perturbations: int = 10, scale: float = 0.01) -> torch.Tensor:
        """
        Generate perturbations for the given sample.

        Args:
            sample: A single input sample of any shape.
            num_perturbations: Number of noisy copies to create.
            scale: Standard deviation of the added Gaussian noise.

        Returns:
            A tensor of shape (num_perturbations, *sample.shape) on ``self.device``.
        """
        sample = sample.to(self.device)
        noise = torch.randn((num_perturbations,) + tuple(sample.shape), device=self.device) * scale
        # Broadcasting over the new leading dimension works for any sample
        # rank, unlike repeat(n, 1), which only handles 1-D samples.
        return sample.unsqueeze(0) + noise

    ### MEMBERSHIP INFERENCE ###
    def infer_membership(self, sample: torch.Tensor) -> int:
        """
        Infers the membership status of a given sample.

        Args:
            sample: A (features, label) pair; MAST and MALT also accept a bare tensor.

        Returns:
            1 if the sample is inferred as a member, 0 otherwise.

        Raises:
            ValueError: If the attack type is not supported.
        """
        print(self.i)
        self.i += 1

        if self.attack_model is None:
            # Allow calling this without an explicit configure step.
            self.configure_attack_model()

        kind = self.attack_model_type
        if kind == 'shokri':
            score = self.attack_model(sample)
            # The Shokri attack yields a numpy value; return a plain int label.
            return int(np.ravel(score)[0])
        if kind == 'LiRA' or kind in self._GLIRA_NAMES:
            score = self.attack_model(sample)
            return 1 if score >= self.lira_threshold else 0
        elif kind in {'MAST', 'MALT'}:
            score = self.attack_model(sample)
            return 1 if score > self.score_threshold else 0
        else:
            raise ValueError(f"Unsupported attack type: {self.attack_model_type}")

    ### EVALUATION ###
    def evaluate_attack(self) -> Dict[str, float]:
        """
        Evaluates the attack on members and non-members datasets.

        Returns:
            A dictionary containing accuracy, precision, recall, F1 and ROC AUC.
            ROC AUC is NaN when only one of the two classes is present.
        """
        y_true = [1] * len(self.members) + [0] * len(self.non_members)
        y_pred = [self.infer_membership(sample) for sample in (self.members + self.non_members)]

        # Compute evaluation metrics
        accuracy = accuracy_score(y_true, y_pred)
        precision = precision_score(y_true, y_pred)
        recall = recall_score(y_true, y_pred)
        f1 = f1_score(y_true, y_pred)
        # roc_auc_score raises when y_true holds a single class.
        roc_auc = roc_auc_score(y_true, y_pred) if len(set(y_true)) > 1 else float("nan")
        print(f"y_true:{y_true}" )
        print(f"y_pred:{y_pred}" )
        return {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "roc_auc": roc_auc
        }


In [None]:
pip install --upgrade sympy



### example use cases

#### lira use case

##### importing libs

In [None]:
from torchvision import transforms
from torchvision.datasets import MNIST
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader

##### dataset prep

In [None]:
# Dataset preparation
transform = transforms.ToTensor()
dataset = MNIST(root='./data', train=True, transform=transform, download=True)
# Split the data into two halves: `members` is used to train the target model,
# `non_members` is held out. The sizes are derived from the dataset length so
# the split stays valid if the dataset size changes.
num_members = len(dataset) // 2
members, non_members = torch.utils.data.random_split(dataset, [num_members, len(dataset) - num_members])

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9.91M/9.91M [00:00<00:00, 16.0MB/s]


Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28.9k/28.9k [00:00<00:00, 488kB/s]


Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1.65M/1.65M [00:00<00:00, 4.43MB/s]


Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4.54k/4.54k [00:00<00:00, 2.36MB/s]

Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw






##### target model prep

In [None]:

#target model architecture
class SimpleNN(nn.Module):
    """Fully connected classifier with one hidden layer of 128 ReLU units."""

    def __init__(self, input_size=784, num_classes=10):
        """
        Args:
            input_size: Number of features per sample after flattening.
            num_classes: Number of output logits.
        """
        super().__init__()
        hidden_units = 128
        layers = [
            nn.Linear(input_size, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, num_classes),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Flatten every sample in the batch and return its logits."""
        batch_size = x.size(0)
        flattened = x.view(batch_size, -1)
        return self.fc(flattened)

#target model training
def train_target_model(model, dataset, epochs=5):
    """Train ``model`` as a classifier on ``dataset`` and return it.

    Optimises with Adam (lr=0.001) and cross-entropy loss over shuffled
    mini-batches of 32 samples. Batches are moved to the device that holds
    the model's parameters, so the function does not rely on a module-level
    ``device`` variable being defined or matching the model.

    Args:
        model: ``nn.Module`` that maps a batch of inputs to class scores;
            it is updated in place.
        dataset: Dataset yielding ``(input, label)`` pairs.
        epochs: Number of full passes over ``dataset``.

    Returns:
        The same ``model`` instance, left in training mode.
    """
    model.train()
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    # Follow the model's own placement instead of a global `device`.
    model_device = next(model.parameters()).device

    for _ in range(epochs):
        for X, y in dataloader:
            X, y = X.to(model_device), y.to(model_device)
            optimizer.zero_grad()
            loss = criterion(model(X), y)
            loss.backward()
            optimizer.step()

    return model
# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Build the target classifier on that device and train it on `members`.
target_model = train_target_model(SimpleNN().to(device), members)

##### mia attack

In [None]:
def data_distribution():
    """
    Draw a fresh shadow dataset from the MNIST training split.

    Returns a list of 500 ``(image, label)`` items picked uniformly at
    random without replacement; images go through ``transforms.ToTensor()``.
    """
    mnist_train = datasets.MNIST(root="./data", train=True, transform=transforms.ToTensor(), download=True)
    chosen = np.random.choice(len(mnist_train), 500, replace=False)
    return list(torch.utils.data.Subset(mnist_train, chosen))

Shokri attack

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Settings passed to the attack wrapper through its `hyperparameters` argument.
shokri_hyperparameters = {
    'num_perturbations': 50,
    'epsilon': 0.01,
    'augmentation_strategy': 'noise',
}

mia = MembershipInferenceAttack(
    target_model=target_model,
    target_class=SimpleNN,
    dataset=dataset,
    data_distribution=data_distribution,
    shadow_models_count=5,
    attack_model_type="shokri",
    device=device,
    hyperparameters=shokri_hyperparameters,
)

# Hand the first five entries of `members` and of `non_members` to the attack.
memb2 = torch.utils.data.Subset(members, list(range(5)))
non_memb2 = torch.utils.data.Subset(non_members, list(range(5)))
mia.prepare_datasets(memb2, non_memb2)

mia.configure_attack_model()
results = mia.evaluate_attack()

# Report every returned metric with four decimals.
print("Attack Evaluation Results:")
for metric, value in results.items():
    print(f"{metric}: {value:.4f}")


Configuring Shokri attack...
0


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6993
Epoch 2/20, Loss: 0.6982
Epoch 3/20, Loss: 0.6972
Epoch 4/20, Loss: 0.6963
Epoch 5/20, Loss: 0.6954
Epoch 6/20, Loss: 0.6946
Epoch 7/20, Loss: 0.6938
Epoch 8/20, Loss: 0.6931
Epoch 9/20, Loss: 0.6924
Epoch 10/20, Loss: 0.6918
Epoch 11/20, Loss: 0.6911
Epoch 12/20, Loss: 0.6905
Epoch 13/20, Loss: 0.6900
Epoch 14/20, Loss: 0.6894
Epoch 15/20, Loss: 0.6888
Epoch 16/20, Loss: 0.6882
Epoch 17/20, Loss: 0.6877
Epoch 18/20, Loss: 0.6872
Epoch 19/20, Loss: 0.6866
Epoch 20/20, Loss: 0.6861
Attack Model Validation Accuracy: 57.00%
1


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6931
Epoch 2/20, Loss: 0.6926
Epoch 3/20, Loss: 0.6921
Epoch 4/20, Loss: 0.6916
Epoch 5/20, Loss: 0.6912
Epoch 6/20, Loss: 0.6908
Epoch 7/20, Loss: 0.6904
Epoch 8/20, Loss: 0.6900
Epoch 9/20, Loss: 0.6896
Epoch 10/20, Loss: 0.6892
Epoch 11/20, Loss: 0.6889
Epoch 12/20, Loss: 0.6885
Epoch 13/20, Loss: 0.6881
Epoch 14/20, Loss: 0.6877
Epoch 15/20, Loss: 0.6874
Epoch 16/20, Loss: 0.6870
Epoch 17/20, Loss: 0.6866
Epoch 18/20, Loss: 0.6862
Epoch 19/20, Loss: 0.6858
Epoch 20/20, Loss: 0.6855
Attack Model Validation Accuracy: 60.50%
2


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6935
Epoch 2/20, Loss: 0.6930
Epoch 3/20, Loss: 0.6925
Epoch 4/20, Loss: 0.6920
Epoch 5/20, Loss: 0.6916
Epoch 6/20, Loss: 0.6912
Epoch 7/20, Loss: 0.6908
Epoch 8/20, Loss: 0.6904
Epoch 9/20, Loss: 0.6900
Epoch 10/20, Loss: 0.6896
Epoch 11/20, Loss: 0.6892
Epoch 12/20, Loss: 0.6889
Epoch 13/20, Loss: 0.6885
Epoch 14/20, Loss: 0.6881
Epoch 15/20, Loss: 0.6877
Epoch 16/20, Loss: 0.6872
Epoch 17/20, Loss: 0.6868
Epoch 18/20, Loss: 0.6864
Epoch 19/20, Loss: 0.6860
Epoch 20/20, Loss: 0.6855
Attack Model Validation Accuracy: 53.50%
3


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6957
Epoch 2/20, Loss: 0.6952
Epoch 3/20, Loss: 0.6947
Epoch 4/20, Loss: 0.6943
Epoch 5/20, Loss: 0.6939
Epoch 6/20, Loss: 0.6935
Epoch 7/20, Loss: 0.6931
Epoch 8/20, Loss: 0.6928
Epoch 9/20, Loss: 0.6924
Epoch 10/20, Loss: 0.6921
Epoch 11/20, Loss: 0.6918
Epoch 12/20, Loss: 0.6915
Epoch 13/20, Loss: 0.6912
Epoch 14/20, Loss: 0.6909
Epoch 15/20, Loss: 0.6906
Epoch 16/20, Loss: 0.6903
Epoch 17/20, Loss: 0.6900
Epoch 18/20, Loss: 0.6897
Epoch 19/20, Loss: 0.6894
Epoch 20/20, Loss: 0.6891
Attack Model Validation Accuracy: 56.00%
4


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6926
Epoch 2/20, Loss: 0.6921
Epoch 3/20, Loss: 0.6917
Epoch 4/20, Loss: 0.6912
Epoch 5/20, Loss: 0.6907
Epoch 6/20, Loss: 0.6902
Epoch 7/20, Loss: 0.6897
Epoch 8/20, Loss: 0.6892
Epoch 9/20, Loss: 0.6887
Epoch 10/20, Loss: 0.6881
Epoch 11/20, Loss: 0.6876
Epoch 12/20, Loss: 0.6870
Epoch 13/20, Loss: 0.6865
Epoch 14/20, Loss: 0.6859
Epoch 15/20, Loss: 0.6853
Epoch 16/20, Loss: 0.6848
Epoch 17/20, Loss: 0.6842
Epoch 18/20, Loss: 0.6835
Epoch 19/20, Loss: 0.6829
Epoch 20/20, Loss: 0.6822
Attack Model Validation Accuracy: 58.00%
5


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6948
Epoch 2/20, Loss: 0.6942
Epoch 3/20, Loss: 0.6936
Epoch 4/20, Loss: 0.6930
Epoch 5/20, Loss: 0.6925
Epoch 6/20, Loss: 0.6920
Epoch 7/20, Loss: 0.6914
Epoch 8/20, Loss: 0.6909
Epoch 9/20, Loss: 0.6904
Epoch 10/20, Loss: 0.6899
Epoch 11/20, Loss: 0.6895
Epoch 12/20, Loss: 0.6890
Epoch 13/20, Loss: 0.6885
Epoch 14/20, Loss: 0.6880
Epoch 15/20, Loss: 0.6875
Epoch 16/20, Loss: 0.6870
Epoch 17/20, Loss: 0.6865
Epoch 18/20, Loss: 0.6860
Epoch 19/20, Loss: 0.6855
Epoch 20/20, Loss: 0.6849
Attack Model Validation Accuracy: 58.50%
6


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6922
Epoch 2/20, Loss: 0.6913
Epoch 3/20, Loss: 0.6905
Epoch 4/20, Loss: 0.6898
Epoch 5/20, Loss: 0.6890
Epoch 6/20, Loss: 0.6883
Epoch 7/20, Loss: 0.6876
Epoch 8/20, Loss: 0.6870
Epoch 9/20, Loss: 0.6863
Epoch 10/20, Loss: 0.6857
Epoch 11/20, Loss: 0.6851
Epoch 12/20, Loss: 0.6845
Epoch 13/20, Loss: 0.6840
Epoch 14/20, Loss: 0.6834
Epoch 15/20, Loss: 0.6828
Epoch 16/20, Loss: 0.6822
Epoch 17/20, Loss: 0.6816
Epoch 18/20, Loss: 0.6810
Epoch 19/20, Loss: 0.6803
Epoch 20/20, Loss: 0.6796
Attack Model Validation Accuracy: 57.00%
7


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6953
Epoch 2/20, Loss: 0.6942
Epoch 3/20, Loss: 0.6932
Epoch 4/20, Loss: 0.6922
Epoch 5/20, Loss: 0.6913
Epoch 6/20, Loss: 0.6904
Epoch 7/20, Loss: 0.6895
Epoch 8/20, Loss: 0.6888
Epoch 9/20, Loss: 0.6880
Epoch 10/20, Loss: 0.6873
Epoch 11/20, Loss: 0.6865
Epoch 12/20, Loss: 0.6858
Epoch 13/20, Loss: 0.6851
Epoch 14/20, Loss: 0.6843
Epoch 15/20, Loss: 0.6836
Epoch 16/20, Loss: 0.6828
Epoch 17/20, Loss: 0.6821
Epoch 18/20, Loss: 0.6813
Epoch 19/20, Loss: 0.6806
Epoch 20/20, Loss: 0.6798
Attack Model Validation Accuracy: 57.00%
8


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6967
Epoch 2/20, Loss: 0.6957
Epoch 3/20, Loss: 0.6947
Epoch 4/20, Loss: 0.6938
Epoch 5/20, Loss: 0.6930
Epoch 6/20, Loss: 0.6923
Epoch 7/20, Loss: 0.6916
Epoch 8/20, Loss: 0.6909
Epoch 9/20, Loss: 0.6903
Epoch 10/20, Loss: 0.6896
Epoch 11/20, Loss: 0.6890
Epoch 12/20, Loss: 0.6885
Epoch 13/20, Loss: 0.6879
Epoch 14/20, Loss: 0.6874
Epoch 15/20, Loss: 0.6868
Epoch 16/20, Loss: 0.6863
Epoch 17/20, Loss: 0.6858
Epoch 18/20, Loss: 0.6852
Epoch 19/20, Loss: 0.6847
Epoch 20/20, Loss: 0.6841
Attack Model Validation Accuracy: 61.50%
9


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor
  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


Epoch 1/20, Loss: 0.6959
Epoch 2/20, Loss: 0.6953
Epoch 3/20, Loss: 0.6947
Epoch 4/20, Loss: 0.6940
Epoch 5/20, Loss: 0.6935
Epoch 6/20, Loss: 0.6929
Epoch 7/20, Loss: 0.6924
Epoch 8/20, Loss: 0.6919
Epoch 9/20, Loss: 0.6915
Epoch 10/20, Loss: 0.6910
Epoch 11/20, Loss: 0.6906
Epoch 12/20, Loss: 0.6902
Epoch 13/20, Loss: 0.6898
Epoch 14/20, Loss: 0.6894
Epoch 15/20, Loss: 0.6891
Epoch 16/20, Loss: 0.6887
Epoch 17/20, Loss: 0.6883
Epoch 18/20, Loss: 0.6879
Epoch 19/20, Loss: 0.6875
Epoch 20/20, Loss: 0.6872
Attack Model Validation Accuracy: 58.00%
y_true:[1, 1, 1, 1, 1, 0, 0, 0, 0, 0]
y_pred:[array(1., dtype=float32), array(0., dtype=float32), array(1., dtype=float32), array(1., dtype=float32), array(1., dtype=float32), array(1., dtype=float32), array(1., dtype=float32), array(1., dtype=float32), array(1., dtype=float32), array(1., dtype=float32)]
Attack Evaluation Results:
accuracy: 0.4000
precision: 0.4444
recall: 0.8000
f1_score: 0.5714
roc_auc: 0.4000


  X = torch.tensor(X, dtype=torch.float32)  # Convert X to tensor


In [None]:



device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Settings passed to the attack wrapper through its `hyperparameters` argument.
lira_hyperparameters = {
    'num_perturbations': 50,
    'epsilon': 0.01,
    'augmentation_strategy': 'noise',
}

mia = MembershipInferenceAttack(
    target_model=target_model,
    target_class=SimpleNN,
    dataset=dataset,
    data_distribution=data_distribution,
    shadow_models_count=40,
    attack_model_type="LiRA",
    device=device,
    hyperparameters=lira_hyperparameters,
)

# Hand the first five entries of `members` and of `non_members` to the attack.
memb2 = torch.utils.data.Subset(members, list(range(5)))
non_memb2 = torch.utils.data.Subset(non_members, list(range(5)))
mia.prepare_datasets(memb2, non_memb2)

mia.configure_attack_model()
results = mia.evaluate_attack()

# Report every returned metric with four decimals.
print("Attack Evaluation Results:")
for metric, value in results.items():
    print(f"{metric}: {value:.4f}")


Configuring LiRA attack...
0


ValueError: too many values to unpack (expected 2)

# LiRA

In [None]:
import numpy as np
from sklearn.mixture import GaussianMixture
import torch
import torch.nn as nn
import torch.optim as optim

class LiRA:
    """Likelihood Ratio Attack (LiRA) for membership inference.

    For one target example the attack trains ``N`` shadow models whose
    training data contains the example (IN) and ``N`` whose data does not
    (OUT), summarises the confidence each group assigns to the true label
    by a mean and variance, and compares how likely the target model's
    confidence is under the two resulting Gaussians.
    """

    def __init__(self, model_class, data_distribution, N=50):
        """
        Initialize the LiRA attack.

        Parameters:
        - model_class: Zero-argument callable returning a fresh, untrained
          shadow model.
        - data_distribution: Zero-argument callable returning an iterable of
          (x, y) shadow training examples.
        - N: Number of shadow models to train per IN/OUT case (must be >= 2).
        """
        self.model_class = model_class
        self.data_distribution = data_distribution
        self.N = N

    def train_shadow_model(self, dataset, model):
        """Train ``model`` on ``dataset`` for 5 epochs and return it.

        Batches are moved to the device holding the model's parameters so
        that a model placed on an accelerator can be trained on CPU data.
        """
        model.train()
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()
        model_device = next(model.parameters()).device

        for _ in range(5):  # Train for 5 epochs
            for X, y in dataloader:
                X, y = X.to(model_device), y.to(model_device)
                optimizer.zero_grad()
                loss = criterion(model(X), y)
                loss.backward()
                optimizer.step()
        return model

    @staticmethod
    def _same_example(candidate, target):
        """Return True when two (x, y) tensor pairs hold identical values."""
        return torch.equal(candidate[0], target[0]) and torch.equal(candidate[1], target[1])

    def collect_confidences(self, dataset, target_example, is_in=True):
        """
        Train N shadow models and record their confidence on the target.

        Parameters:
        - dataset: Unused; every shadow model samples its own data from
          ``data_distribution``. Kept so existing callers keep working.
        - target_example: The (x, y) pair under attack; y may be an int or a
          single-element tensor.
        - is_in: If True the target is added to every shadow training set,
          otherwise any copy of it is removed.

        Returns a list of N softmax confidences for the true label.
        Raises ValueError when N < 2.
        """
        # Fail before any training: fewer than two samples give no variance.
        if self.N < 2:
            raise ValueError("Insufficient confidence values to compute meaningful statistics.")

        x, y = target_example
        x = torch.as_tensor(x, dtype=torch.float32)
        label_index = int(y)
        device = x.device
        # Shadow data is collated on the CPU, so the appended target must be
        # a CPU tensor pair with the same dtypes as the other examples.
        cpu_target = (
            x.detach().cpu(),
            torch.as_tensor(y, dtype=torch.long).detach().cpu(),
        )

        confidences = []
        for _ in range(self.N):
            # as_tensor avoids the copy warning torch.tensor() emits for tensors.
            shadow_dataset = [
                (torch.as_tensor(sx, dtype=torch.float32), torch.as_tensor(sy, dtype=torch.long))
                for sx, sy in self.data_distribution()
            ]

            if is_in:
                shadow_dataset.append(cpu_target)
            else:
                shadow_dataset = [
                    d for d in shadow_dataset if not self._same_example(d, cpu_target)
                ]

            # Train on the target example's device so it can be queried there.
            shadow_model = self.train_shadow_model(shadow_dataset, self.model_class().to(device))

            # Get confidence of the target example
            shadow_model.eval()
            with torch.no_grad():
                confidence = nn.Softmax(dim=1)(shadow_model(x.unsqueeze(0)))
                confidences.append(confidence[0, label_index].item())

        return confidences

    def compute_statistics(self, confidences):
        """Compute mean and (population) variance of confidence values."""
        mean = np.mean(confidences)
        variance = np.var(confidences)
        return mean, variance

    def likelihood_ratio(self, conf_obs, mu_in, var_in, mu_out, var_out):
        """
        Compute the likelihood ratio p_in(conf_obs) / p_out(conf_obs).

        Parameters:
        - conf_obs: Observed confidence value.
        - mu_in, var_in: Mean and variance of IN distribution.
        - mu_out, var_out: Mean and variance of OUT distribution.

        Variances are clamped to 1e-10 and the same constant is added to the
        denominator so the result stays finite. The inputs are printed for
        inspection.
        """
        epsilon = 1e-10  # Guards both zero variance and a zero denominator

        def gaussian_likelihood(x, mu, var):
            var = max(var, epsilon)  # Avoid division by zero
            return (1 / np.sqrt(2 * np.pi * var)) * np.exp(-(x - mu) ** 2 / (2 * var))

        p_in = gaussian_likelihood(conf_obs, mu_in, var_in)
        p_out = gaussian_likelihood(conf_obs, mu_out, var_out)
        print(f"conf_obs: {conf_obs}")
        print(f"mu_in: {mu_in}, var_in: {var_in}")
        print(f"mu_out: {mu_out}, var_out: {var_out}")

        return p_in / (p_out + epsilon)

    def run_attack(self, model, target_example):
        """
        Run the LiRA attack on the target example.

        Parameters:
        - model: Target model.
        - target_example: Example to attack (x, y).

        Returns the likelihood ratio; larger values favour membership.
        """
        # Step 1: Collect IN and OUT confidences. The dataset argument is
        # ignored by collect_confidences, so nothing is sampled for it here.
        confs_in = self.collect_confidences(None, target_example, is_in=True)
        confs_out = self.collect_confidences(None, target_example, is_in=False)

        print(f"Confs_in: {confs_in}, Confs_out: {confs_out}")

        # Step 2: Compute IN and OUT statistics
        mu_in, var_in = self.compute_statistics(confs_in)
        mu_out, var_out = self.compute_statistics(confs_out)

        # Step 3: Query the target model
        x, y = target_example
        model.eval()
        with torch.no_grad():
            conf_obs = nn.Softmax(dim=1)(model(x.unsqueeze(0)))[0, y].item()

        # Step 4: Compute likelihood ratio
        return self.likelihood_ratio(conf_obs, mu_in, var_in, mu_out, var_out)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader

# Define a simple neural network
class SimpleNN(nn.Module):
    """Small fully connected classifier: flatten -> 128 ReLU units -> scores.

    The forward pass returns raw class scores; no softmax is applied.
    """

    def __init__(self, input_size=784, num_classes=10):
        super().__init__()
        hidden_units = 128
        hidden_layer = nn.Linear(input_size, hidden_units)
        output_layer = nn.Linear(hidden_units, num_classes)
        self.fc = nn.Sequential(hidden_layer, nn.ReLU(), output_layer)

    def forward(self, x):
        # Keep the batch axis and merge all remaining axes into one.
        flat = x.view(x.size(0), -1)
        return self.fc(flat)

# Define a function to generate shadow datasets
def data_distribution():
    """
    Sample a shadow dataset from the MNIST training split.

    Returns a list of 500 ``(image, label)`` items chosen uniformly at
    random without replacement; images go through ``transforms.ToTensor()``.
    """
    mnist_train = datasets.MNIST(root="./data", train=True, transform=transforms.ToTensor(), download=True)
    # Distinct random positions into the training split.
    picked = np.random.choice(len(mnist_train), 500, replace=False)
    shadow_subset = torch.utils.data.Subset(mnist_train, picked)
    return list(shadow_subset)

# Initialize and train the target model
def train_target_model(model, dataset, epochs=5):
    """Train ``model`` as a classifier on ``dataset`` and return it.

    Optimises with Adam (lr=0.001) and cross-entropy loss over shuffled
    mini-batches of 32 samples. Batches are moved to the device that holds
    the model's parameters, so the function works without a module-level
    ``device`` variable (the original read one that is only assigned inside
    the ``__main__`` guard).

    Args:
        model: ``nn.Module`` that maps a batch of inputs to class scores;
            it is updated in place.
        dataset: Dataset yielding ``(input, label)`` pairs.
        epochs: Number of full passes over ``dataset``.

    Returns:
        The same ``model`` instance, left in training mode.
    """
    model.train()
    dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    # Follow the model's own placement instead of a global `device`.
    model_device = next(model.parameters()).device

    for _ in range(epochs):
        for X, y in dataloader:
            X, y = X.to(model_device), y.to(model_device)
            optimizer.zero_grad()
            loss = criterion(model(X), y)
            loss.backward()
            optimizer.step()

    return model

# Test the LiRA implementation
if __name__ == "__main__":
    # Run on the GPU when PyTorch can see one, otherwise on the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # The target model is trained on the first 1000 MNIST training images.
    mnist_train = datasets.MNIST(root="./data", train=True, transform=transforms.ToTensor(), download=True)
    target_dataset = torch.utils.data.Subset(mnist_train, list(range(1000)))
    target_model = train_target_model(SimpleNN().to(device), target_dataset)

    # Attack the example at index 0 of the target's own training subset.
    target_example = target_dataset[0]
    x, y = target_example
    x2 = x.to(device)
    y2 = torch.tensor(y).to(device)

    # N=10 shadow models are trained for each of the IN and OUT cases.
    lira = LiRA(model_class=SimpleNN, data_distribution=data_distribution, N=10)
    likelihood_ratio = lira.run_attack(target_model, (x2, y2))

    # Output the result
    print(f"Likelihood Ratio for the target example: {likelihood_ratio}")


  shadow_dataset = [(torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.long)) for x, y in shadow_dataset]


Confs_in: [0.538368821144104, 0.6882573962211609, 0.8102095127105713, 0.5609371662139893, 0.4878362715244293, 0.48510465025901794, 0.5002596974372864, 0.620842456817627, 0.4333582818508148, 0.6667373776435852], Confs_out: [0.353847861289978, 0.23967750370502472, 0.5822126865386963, 0.6040904521942139, 0.18072937428951263, 0.5089112520217896, 0.6811476945877075, 0.5957340002059937, 0.18828971683979034, 0.5291156768798828]
conf_obs: 0.5130645632743835
mu_in: 0.5791911631822586, var_in: 0.012135915665683727
mu_out: 0.4463756218552589, var_out: 0.031996991065060126
Likelihood Ratio for the target example: 1.453649397778496


In [None]:
# Show the shape of the target example tensor that was handed to the attack.
x2.shape

torch.Size([1, 28, 28])

# GLiRA

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.mixture import GaussianMixture

class LikelihoodRatioAttack:
    """Likelihood-ratio membership inference with online and offline variants.

    Shadow models are trained with (IN) and without (OUT) the target
    example; the confidence each group assigns to the true label is
    summarised by a mean and variance, and the target model's confidence is
    scored by the ratio of the two Gaussian densities.
    """

    def __init__(self, model_class, data_distribution, N=50):
        """
        Initialize the Likelihood Ratio Attack.

        Parameters:
        - model_class: Zero-argument callable returning a fresh, untrained
          shadow model.
        - data_distribution: Zero-argument callable returning an iterable of
          (x, y) shadow training examples.
        - N: Number of shadow models to train per IN/OUT case.
        """
        self.model_class = model_class
        self.data_distribution = data_distribution
        self.N = N

    @staticmethod
    def _normalize_example(example):
        """Return ``example`` as a CPU (input tensor, long label tensor) pair."""
        features, label = example
        features = torch.as_tensor(features).detach().cpu()
        label = torch.as_tensor(label, dtype=torch.long).detach().cpu()
        return features, label

    @staticmethod
    def _same_example(candidate, target):
        """Return True when two normalized examples hold identical values."""
        cand_x, cand_y = candidate
        tgt_x, tgt_y = target
        if cand_x.shape != tgt_x.shape or cand_x.dtype != tgt_x.dtype:
            return False
        return torch.equal(cand_x, tgt_x) and torch.equal(cand_y, tgt_y)

    def train_shadow_model(self, dataset, device=None):
        """Train a fresh shadow model on ``dataset`` for 5 epochs.

        Parameters:
        - dataset: Iterable of (x, y) training examples.
        - device: Optional device for the model and its batches; when
          omitted the model stays where ``model_class`` created it.
        """
        model = self.model_class()
        if device is not None:
            model = model.to(device)
        model.train()
        dataloader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        for _ in range(5):  # Train for 5 epochs
            for X, y in dataloader:
                if device is not None:
                    X, y = X.to(device), y.to(device)
                optimizer.zero_grad()
                loss = criterion(model(X), y)
                loss.backward()
                optimizer.step()
        return model

    def collect_confidences(self, dataset, target_example, is_in):
        """
        Collect confidence values for shadow models.

        Parameters:
        - dataset: Unused; every shadow model samples its own data from
          ``data_distribution``. Kept so existing callers keep working.
        - target_example: The target example (x, y); y may be an int or a
          single-element tensor.
        - is_in: Whether the example is part of the dataset (IN or OUT).

        Returns a list of N softmax confidences for the true label.
        """
        x = torch.as_tensor(target_example[0])
        label_index = int(target_example[1])
        device = x.device
        # Examples are collated on the CPU with tensor labels, so the target
        # is normalized the same way before it is appended or compared.
        cpu_target = self._normalize_example(target_example)

        confidences = []
        for _ in range(self.N):
            shadow_dataset = [self._normalize_example(d) for d in self.data_distribution()]
            if is_in:
                shadow_dataset.append(cpu_target)  # Add the target example
            else:
                # Tuples holding tensors cannot be compared with `!=`
                # (ambiguous truth value), so compare by tensor contents.
                shadow_dataset = [
                    d for d in shadow_dataset if not self._same_example(d, cpu_target)
                ]

            shadow_model = self.train_shadow_model(shadow_dataset, device=device)
            shadow_model.eval()

            with torch.no_grad():
                confidence = nn.Softmax(dim=1)(shadow_model(x.unsqueeze(0)))
                confidences.append(confidence[0, label_index].item())
        return confidences

    def compute_statistics(self, confidences):
        """Compute mean and (population) variance of confidence scores."""
        mean = np.mean(confidences)
        variance = np.var(confidences)
        return mean, variance

    def likelihood_ratio(self, conf_obs, mu_in, var_in, mu_out, var_out):
        """
        Compute the likelihood ratio p_in(conf_obs) / p_out(conf_obs).

        Parameters:
        - conf_obs: Observed confidence value.
        - mu_in, var_in: Mean and variance for "IN" distribution.
        - mu_out, var_out: Mean and variance for "OUT" distribution.

        Variances are clamped to 1e-10 and the same constant is added to the
        denominator, so zero variance or an underflowing OUT density yields
        a finite value instead of nan/inf.
        """
        epsilon = 1e-10

        def gaussian_likelihood(x, mu, var):
            var = max(var, epsilon)
            return (1 / np.sqrt(2 * np.pi * var)) * np.exp(-(x - mu) ** 2 / (2 * var))

        p_in = gaussian_likelihood(conf_obs, mu_in, var_in)
        p_out = gaussian_likelihood(conf_obs, mu_out, var_out)
        return p_in / (p_out + epsilon)

    def _observed_confidence(self, model, target_example):
        """Return the target model's softmax confidence for the true label."""
        x, y = target_example
        model.eval()
        with torch.no_grad():
            return nn.Softmax(dim=1)(model(x.unsqueeze(0)))[0, y].item()

    def online_attack(self, model, target_example):
        """
        Perform the online variant of the Likelihood Ratio Attack.

        Parameters:
        - model: The target model.
        - target_example: The target input example (x, y).

        Trains 2 * N shadow models and returns the likelihood ratio.
        """
        # collect_confidences ignores its dataset argument, so nothing is
        # sampled for it here.
        conf_in = self.collect_confidences(None, target_example, is_in=True)
        conf_out = self.collect_confidences(None, target_example, is_in=False)

        mu_in, var_in = self.compute_statistics(conf_in)
        mu_out, var_out = self.compute_statistics(conf_out)

        conf_obs = self._observed_confidence(model, target_example)
        return self.likelihood_ratio(conf_obs, mu_in, var_in, mu_out, var_out)

    def offline_attack(self, model, precomputed_stats, target_example):
        """
        Perform the offline variant of the Likelihood Ratio Attack.

        Parameters:
        - model: The target model.
        - precomputed_stats: Tuple (mu_in, var_in, mu_out, var_out) computed
          beforehand for the "IN" and "OUT" distributions.
        - target_example: The target input example (x, y).
        """
        mu_in, var_in, mu_out, var_out = precomputed_stats
        conf_obs = self._observed_confidence(model, target_example)
        return self.likelihood_ratio(conf_obs, mu_in, var_in, mu_out, var_out)


# shadow models

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader, random_split, TensorDataset

class ShokriMembershipInference:
    """Shadow-model membership inference attack in the style of Shokri et al.

    Shadow models are trained on data drawn from ``data_distribution``; the
    maximum softmax confidence they give to their own training samples
    (members) and to held-out samples (non-members) becomes the training set
    of a logistic-regression attack model.
    """

    def __init__(self, shadow_model_class, data_distribution, num_shadow_models=5):
        """
        Initialize the membership inference attack class.

        Args:
            shadow_model_class: A class to initialize shadow models.
            data_distribution: A function returning a ``(train, test)`` pair
                of datasets for one shadow model.
            num_shadow_models: Number of shadow models to train.
        """
        self.shadow_model_class = shadow_model_class
        self.data_distribution = data_distribution
        self.num_shadow_models = num_shadow_models
        self.attack_model = None

    def train_shadow_model(self, train_data):
        """
        Train a single shadow model.

        Args:
            train_data: Dataset to train the shadow model.

        Returns:
            Trained shadow model (5 epochs, Adam, cross-entropy loss).
        """
        model = self.shadow_model_class()
        model.train()
        dataloader = DataLoader(train_data, batch_size=32, shuffle=True)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        for _ in range(5):
            for X, y in dataloader:
                optimizer.zero_grad()
                loss = criterion(model(X), y)
                loss.backward()
                optimizer.step()

        return model

    def _max_confidences(self, model, data, batch_size=256):
        """Return the highest softmax probability of ``model`` for each sample.

        Samples are scored in batches and in dataset order; the maximum is
        taken per sample, so the result does not depend on ``batch_size``
        beyond floating-point rounding.
        """
        model.eval()
        scores = []
        with torch.no_grad():
            for X, _ in DataLoader(data, batch_size=batch_size):
                probabilities = F.softmax(model(X), dim=1)
                # Flatten per sample so the max covers every output entry.
                per_sample = probabilities.reshape(probabilities.size(0), -1)
                scores.extend(per_sample.max(dim=1).values.tolist())
        return scores

    def build_attack_dataset(self):
        """
        Build the attack dataset by training shadow models and collecting their outputs.

        Returns:
            X_attack: List of one-element feature rows ``[max_confidence]``.
            y_attack: Labels for the attack model (1 for members, 0 for non-members).
        """
        X_attack = []
        y_attack = []

        for _ in range(self.num_shadow_models):
            shadow_train, shadow_test = self.data_distribution()
            shadow_model = self.train_shadow_model(shadow_train)

            # Members first (training split), then non-members (test split).
            for membership, split in ((1, shadow_train), (0, shadow_test)):
                scores = self._max_confidences(shadow_model, split)
                X_attack.extend([score] for score in scores)
                y_attack.extend([membership] * len(scores))

        return X_attack, y_attack

    def train_attack_model(self):
        """
        Train the attack model using the attack dataset.

        Fits a logistic regression on 80% of the attack dataset, prints its
        accuracy on the remaining 20%, and stores it in ``self.attack_model``.
        """
        X_attack, y_attack = self.build_attack_dataset()

        # Split into training and validation sets
        X_train, X_val, y_train, y_val = train_test_split(X_attack, y_attack, test_size=0.2, random_state=42)

        attack_model = LogisticRegression()
        attack_model.fit(X_train, y_train)

        # Validate the attack model
        accuracy = accuracy_score(y_val, attack_model.predict(X_val))
        print(f"Attack Model Validation Accuracy: {accuracy * 100:.2f}%")

        self.attack_model = attack_model

    def perform_inference(self, target_model, sample):
        """
        Perform membership inference on a sample using the trained attack model.

        Args:
            target_model: The target model to attack.
            sample: The sample to perform inference on; only ``sample[0]``
                (the input) is used.

        Returns:
            Membership inference result (1 for member, 0 for non-member).

        Raises:
            ValueError: If the attack model has not been trained yet.
        """
        if self.attack_model is None:
            raise ValueError("Attack model has not been trained. Call train_attack_model() first.")

        target_model.eval()
        with torch.no_grad():
            softmax_outputs = F.softmax(target_model(sample[0].unsqueeze(0)), dim=1)
            confidence_score = softmax_outputs.max().item()

        # Use the attack model to predict membership
        return self.attack_model.predict([[confidence_score]])[0]


In [None]:
# Define a simple data distribution function
def data_distribution():
    """Create a synthetic dataset and split it evenly into two random halves.

    Generates 1000 standard-normal feature vectors of length 784 with
    uniformly random labels in [0, 10) and returns the two 500-sample parts
    produced by ``random_split`` (used by the caller as train and test).
    """
    num_samples = 1000
    features = torch.randn(num_samples, 784)
    labels = torch.randint(0, 10, (num_samples,))
    full_set = TensorDataset(features, labels)
    first_half = int(0.5 * len(full_set))
    second_half = len(full_set) - first_half
    return random_split(full_set, [first_half, second_half])

# Define a simple neural network
class SimpleNN(nn.Module):
    """Fully connected classifier for already-flat inputs.

    One 128-unit hidden layer with ReLU followed by a linear layer that
    produces ``num_classes`` raw scores. The input is passed to the layers
    as given, without any reshaping.
    """

    def __init__(self, input_size=784, num_classes=10):
        super().__init__()
        hidden_units = 128
        hidden_layer = nn.Linear(input_size, hidden_units)
        output_layer = nn.Linear(hidden_units, num_classes)
        self.fc = nn.Sequential(hidden_layer, nn.ReLU(), output_layer)

    def forward(self, x):
        scores = self.fc(x)
        return scores

# Build the attack from five shadow models and fit its attack model.
smi = ShokriMembershipInference(SimpleNN, data_distribution, num_shadow_models=5)
smi.train_attack_model()

# Query the attack with a freshly initialised target model and one random sample.
target_model = SimpleNN()
target_sample = (torch.randn(784), 1)  # (input, label) pair
membership_prediction = smi.perform_inference(target_model, target_sample)
print("Membership Prediction:", membership_prediction)


Attack Model Validation Accuracy: 98.90%
Membership Prediction: 0
