# Label-Flipping Attack Robustness Evaluation

This notebook evaluates LDP-MIC's robustness against label-flipping attacks using trust-based filtering.

In [1]:
import sys
sys.path.append('../src')
%matplotlib inline
import torch
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import copy

# Seed NumPy and PyTorch with the same fixed value so repeated runs draw identical samples
np.random.seed(42)
torch.manual_seed(42)

# Use CUDA when PyTorch reports an available GPU; otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

ModuleNotFoundError: No module named 'matplotlib'

In [None]:
# Diagnostic cell: report whether PyTorch can be imported in this kernel.
# Any exception is caught on purpose so the outcome is printed instead of
# aborting the cell with a traceback.
try:
    import torch
    print("torch ok:", torch.__version__)
except Exception as import_error:
    print("torch import failed:", repr(import_error))

## 1. Configuration

- Adversarial fraction f = 0.1 (5 of 50 clients)
- Label flip probability = 0.3
- Trust weights (ωp, ωc, ωu) = (0.4, 0.3, 0.3)
- Threshold τmin = 0.6

In [None]:
# Experiment configuration
config = dict(
    dataset='adult',         # Adult Census Income (ACI)
    n_clients=50,
    n_rounds=100,
    malicious_fraction=0.1,  # 10% malicious clients
    flip_prob=0.3,           # Label flip probability
    epsilon=8.0,
    delta=1e-5,

    # Trust parameters
    omega_p=0.4,             # Privacy compliance weight
    omega_c=0.3,             # Consistency weight
    omega_u=0.3,             # Utility weight
    tau_min=0.6,             # Trust threshold
)

# Number of adversarial clients implied by the fraction (truncated towards zero)
n_malicious = int(config['malicious_fraction'] * config['n_clients'])
print(f"Configuration: {config['n_clients']} clients, {n_malicious} malicious")

## 2. Label-Flipping Attack Implementation

```
ỹ_ij = 1 - y_ij  with probability 0.3
       y_ij      with probability 0.7
```

In [None]:
def label_flip_attack(labels, flip_prob=0.3):
    """Randomly flip binary labels.

    One uniform random number is drawn per entry along the first dimension of
    ``labels``; entries whose draw is below ``flip_prob`` are replaced by
    ``1 - label``. The input tensor itself is left unmodified.

    Args:
        labels: Tensor of 0/1 labels.
        flip_prob: Probability with which each label is flipped.

    Returns:
        Tuple ``(poisoned_labels, n_flipped)`` holding the attacked copy of
        ``labels`` and the number of selected entries as a Python int.
    """
    # Decide independently for every label whether it gets flipped
    selected = torch.rand(len(labels)) < flip_prob
    n_flipped = selected.sum().item()

    # Work on a copy; for binary classification (Adult dataset) 0 <-> 1
    poisoned = labels.clone()
    poisoned[selected] = 1 - poisoned[selected]

    return poisoned, n_flipped

# Smoke-test the attack on ten alternating binary labels
test_labels = torch.tensor([0, 1] * 5)
flipped, n_flip = label_flip_attack(test_labels, flip_prob=0.3)

# Show the labels before and after the attack together with the flip count
print(f"Original: {test_labels.tolist()}")
print(f"Flipped:  {flipped.tolist()}")
print(f"Labels flipped: {n_flip}")

## 3. Trust Score Computation

The trust score is computed from privatized updates only (Algorithm 1, lines 34-35), as a weighted sum over the three components k ∈ {p, c, u} listed below:
```
τ_i^t = Σ_k ω_k S_k^i
```

Components:
- S_p: Privacy compliance score
- S_c: Consistency score (temporal)
- S_u: Utility contribution score

In [None]:
class TrustScoreComputer:
    """Compute per-client trust scores from privatized updates only.

    The trust score is the weighted sum
    ``omega_p * S_p + omega_c * S_c + omega_u * S_u`` of a privacy-compliance,
    a temporal-consistency and a utility score. Up to five of each client's
    most recent updates are retained for the consistency score.
    """

    def __init__(self, n_clients, omega_p=0.4, omega_c=0.3, omega_u=0.3):
        """Store the component weights and create an empty history per client.

        Args:
            n_clients: Number of clients; histories are keyed ``0 .. n_clients - 1``.
            omega_p: Weight of the privacy compliance score.
            omega_c: Weight of the consistency score.
            omega_u: Weight of the utility score.
        """
        self.n_clients = n_clients
        self.omega_p = omega_p
        self.omega_c = omega_c
        self.omega_u = omega_u

        # History for consistency computation
        self.update_history = {i: [] for i in range(n_clients)}

    def compute_privacy_score(self, update, clip_bound=1.0):
        """S_p: privacy compliance based on the update norm.

        Returns ``min(1, clip_bound / ||update||)``: 1.0 while the update norm
        stays within ``clip_bound`` and a value that shrinks as the norm
        exceeds it.
        """
        norm = torch.norm(update).item()
        # The small epsilon avoids a division by zero for an all-zero update
        return min(1.0, clip_bound / (norm + 1e-8))

    def compute_consistency_score(self, client_id, update):
        """S_c: temporal consistency with the client's previous update.

        Returns 0.5 (neutral) until at least two updates are stored for the
        client; afterwards the cosine similarity between ``update`` and the
        most recently stored update, mapped from [-1, 1] to [0, 1].
        """
        history = self.update_history[client_id]
        # NOTE(review): the warm-up requires two stored updates although only
        # the latest one is compared below -- confirm this is intended.
        if len(history) < 2:
            return 0.5  # Neutral score for new clients

        # Compare against the latest stored update only
        recent = history[-1]
        cos_sim = torch.nn.functional.cosine_similarity(
            update.flatten().unsqueeze(0),
            recent.flatten().unsqueeze(0)
        ).item()

        # Map [-1, 1] to [0, 1]
        return (cos_sim + 1) / 2

    def compute_utility_score(self, update, global_direction):
        """S_u: contribution to global model improvement.

        Returns the cosine similarity between ``update`` and
        ``global_direction``, floored at zero. The result is always a float.
        """
        cos_sim = torch.nn.functional.cosine_similarity(
            update.flatten().unsqueeze(0),
            global_direction.flatten().unsqueeze(0)
        ).item()
        # Floor with a float literal so the score keeps a single type; an int
        # floor would hand back the int 0 for opposing updates.
        return max(0.0, cos_sim)

    def compute_trust_score(self, client_id, update, global_direction, clip_bound=1.0):
        """Compute the combined trust score τ_i^t and record the update.

        Args:
            client_id: Key of the client in ``update_history``.
            update: The client's (privatized) update tensor.
            global_direction: Reference tensor used for the utility score.
            clip_bound: Norm bound used for the privacy score.

        Returns:
            Tuple ``(trust, components)`` where ``components`` is a dict with
            the individual scores under ``'s_p'``, ``'s_c'`` and ``'s_u'``.
        """
        s_p = self.compute_privacy_score(update, clip_bound)
        # Consistency is evaluated before the current update enters the history
        s_c = self.compute_consistency_score(client_id, update)
        s_u = self.compute_utility_score(update, global_direction)

        trust = self.omega_p * s_p + self.omega_c * s_c + self.omega_u * s_u

        # Update history, keeping only the five most recent updates
        self.update_history[client_id].append(update.clone().detach())
        if len(self.update_history[client_id]) > 5:
            self.update_history[client_id].pop(0)

        return trust, {'s_p': s_p, 's_c': s_c, 's_u': s_u}

print("Trust score computer initialized")

## 4. Simulation of Federated Learning with Attack

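Simulates FL training with:
- 10% malicious clients; the effect of label-flipping is modeled as high-variance, inconsistent updates (no model is actually trained on flipped labels in this simulation)
- Trust-based filtering on privatized updates
- LDP-MIC privacy mechanism (simplified here to additive Gaussian noise scaled by 1/ε, followed by norm clipping)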

In [None]:
def simulate_fl_with_attack(config, n_rounds=100):
    """
    Simulate FL with label-flipping attack and trust-based defense

    Client updates are synthetic 100-dimensional vectors: honest clients send
    the current global direction plus small noise, malicious clients send
    high-variance random vectors. Every update receives additive noise scaled
    by ``1 / config['epsilon']``, is rescaled to norm 1 if it exceeds it, and
    is then scored by a ``TrustScoreComputer``. A client counts as flagged in
    a round when its trust score is below ``config['tau_min']``.

    Args:
        config: Dict providing ``n_clients``, ``malicious_fraction``,
            ``epsilon``, ``omega_p``, ``omega_c``, ``omega_u`` and ``tau_min``.
        n_rounds: Number of rounds to simulate.

    Returns:
        Dictionary with trust scores and detection metrics per round:
        ``honest_trust`` / ``malicious_trust`` (mean trust per group, NaN when
        the group is empty) and ``tpr`` / ``fpr`` (fraction of malicious /
        honest clients flagged, 0.0 when the group is empty).
    """
    n_clients = config['n_clients']
    n_malicious = int(n_clients * config['malicious_fraction'])
    n_honest = n_clients - n_malicious
    malicious_ids = set(range(n_malicious))  # First n clients are malicious

    trust_computer = TrustScoreComputer(
        n_clients,
        config['omega_p'],
        config['omega_c'],
        config['omega_u']
    )

    results = {
        'honest_trust': [],
        'malicious_trust': [],
        'tpr': [],  # True Positive Rate
        'fpr': [],  # False Positive Rate
    }

    # LDP noise scale (simplified); constant over all rounds and clients
    noise_scale = 1.0 / config['epsilon']

    # Simulate global model direction (moving average)
    global_direction = torch.randn(100)  # Simplified model parameters

    for round_t in range(n_rounds):
        honest_scores = []
        malicious_scores = []
        detected_malicious = 0
        false_positives = 0

        for client_id in range(n_clients):
            is_malicious = client_id in malicious_ids

            # Simulate client update
            if is_malicious:
                # Malicious update: noisy and inconsistent
                update = torch.randn(100) * 2.0  # Higher variance
                update += torch.randn(100) * 0.5 * (round_t / n_rounds)  # Increasing noise
            else:
                # Honest update: aligned with global direction
                update = global_direction + torch.randn(100) * 0.3

            # Apply LDP noise (simplified)
            update += torch.randn_like(update) * noise_scale

            # Clip update to norm 1
            norm = torch.norm(update)
            if norm > 1.0:
                update = update / norm

            # Compute trust score
            trust, _ = trust_computer.compute_trust_score(
                client_id, update, global_direction
            )

            flagged = trust < config['tau_min']
            if is_malicious:
                malicious_scores.append(trust)
                if flagged:
                    detected_malicious += 1
            else:
                honest_scores.append(trust)
                if flagged:
                    false_positives += 1

        # Update global direction (simplified aggregation)
        global_direction = global_direction * 0.9 + torch.randn(100) * 0.1

        # Record metrics. Empty groups (no honest or no malicious clients)
        # yield NaN means and 0.0 rates instead of a warning or a
        # ZeroDivisionError.
        results['honest_trust'].append(
            np.mean(honest_scores) if honest_scores else float('nan')
        )
        results['malicious_trust'].append(
            np.mean(malicious_scores) if malicious_scores else float('nan')
        )
        results['tpr'].append(detected_malicious / n_malicious if n_malicious > 0 else 0.0)
        results['fpr'].append(false_positives / n_honest if n_honest > 0 else 0.0)

        if (round_t + 1) % 20 == 0:
            print(f"Round {round_t + 1}: TPR={results['tpr'][-1]:.2%}, FPR={results['fpr'][-1]:.2%}")

    return results

# Run simulation for the number of rounds declared in the configuration, so
# that editing config['n_rounds'] actually changes the length of the run
print("Running simulation...")
results = simulate_fl_with_attack(config, n_rounds=config['n_rounds'])

## 5. Results Visualization

- (a) Trust Score Evolution Over Time
- (b) Detection Performance Metrics

In [None]:
# Two-panel figure: (a) mean trust per group over time, (b) TPR/FPR at sampled rounds
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# (a) Trust Score Evolution
ax1 = axes[0]
rounds = range(1, len(results['honest_trust']) + 1)

ax1.plot(rounds, results['honest_trust'], 'g-', label='Honest Clients (Average)', linewidth=2)
ax1.plot(rounds, results['malicious_trust'], 'r-', label='Malicious Clients (Average)', linewidth=2)
ax1.axhline(y=config['tau_min'], color='k', linestyle='--', label=f'Detection Threshold (τ={config["tau_min"]})')

ax1.set_xlabel('Training Round', fontsize=12)
ax1.set_ylabel('Average Trust Score', fontsize=12)
ax1.set_title('(a) Trust Score Evolution Over Time', fontsize=14)
ax1.legend(loc='upper right')
ax1.set_ylim(0, 1)
ax1.grid(True, alpha=0.3)

# (b) Detection Performance Metrics
ax2 = axes[1]

# Use only rounds that exist in results (round numbers are 1-indexed)
max_round = len(results['tpr'])
sample_rounds = [r for r in [10, 20, 30, 40, 50] if r <= max_round]

# Fewer than 10 rounds were simulated: show every available round (at most 9
# bars). A stride-10 range starting at round 10 would be empty in this case
# and leave the panel blank.
if len(sample_rounds) == 0:
    sample_rounds = list(range(1, max_round + 1))

x = np.arange(len(sample_rounds))
width = 0.35

tpr_values = [results['tpr'][r-1] for r in sample_rounds]
fpr_values = [results['fpr'][r-1] for r in sample_rounds]

# Grouped bars: TPR left of the tick, FPR right of it
bars1 = ax2.bar(x - width/2, tpr_values, width, label='True Positive Rate', color='#2ecc71')
bars2 = ax2.bar(x + width/2, fpr_values, width, label='False Positive Rate', color='#e74c3c')

# Add value labels just above each bar
for bar, val in zip(bars1, tpr_values):
    ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02,
             f'{val:.2f}', ha='center', va='bottom', fontsize=9)
for bar, val in zip(bars2, fpr_values):
    ax2.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.02,
             f'{val:.2f}', ha='center', va='bottom', fontsize=9)

ax2.set_xlabel('Training Round', fontsize=12)
ax2.set_ylabel('Rate', fontsize=12)
ax2.set_title('(b) Detection Performance Metrics', fontsize=14)
ax2.set_xticks(x)
ax2.set_xticklabels(sample_rounds)
ax2.legend()
ax2.set_ylim(0, 1.15)  # headroom for the value labels above bars at 1.0
ax2.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

## 6. Summary Table

In [None]:
import pandas as pd

# Rounds reported in the table (1-indexed). Rounds beyond the simulated range
# are skipped so that a shorter run does not raise an IndexError.
summary_rounds = [r for r in (10, 30, 50) if r <= len(results['tpr'])]

# One formatted column per metric; every list is aligned with summary_rounds
summary_data = {
    'Round': summary_rounds,
    'Trust (Honest)': [f"{results['honest_trust'][r - 1]:.2f}" for r in summary_rounds],
    'Trust (Malicious)': [f"{results['malicious_trust'][r - 1]:.2f}" for r in summary_rounds],
    'Gap': [f"{results['honest_trust'][r - 1] - results['malicious_trust'][r - 1]:.2f}"
            for r in summary_rounds],
    'TPR': [f"{results['tpr'][r - 1]:.0%}" for r in summary_rounds],
    'FPR': [f"{results['fpr'][r - 1]:.0%}" for r in summary_rounds],
}

df = pd.DataFrame(summary_data)
print("\nTrust-Based Detection Performance")
print("(All trust metrics computed on privatized updates)")
print("="*60)
print(df.to_string(index=False))
print("="*60)