# Module 5: Poisoning Attacks - Lab Answers

## Lab 1: Data Poisoning - Exercise Answer

### Exercise: Clean-Label Poisoning

**Task**: Implement clean-label poisoning where labels remain correct but features are modified.

**Answer**:


In [1]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import TensorDataset, DataLoader

def clean_label_poisoning(clean_data, clean_labels, target_class, poison_class,
                          poison_rate=0.1, perturbation_budget=0.1):
    """
    Clean-label poisoning attack.
    
    The attacker modifies features of target class samples to be similar
    to poison class, but keeps labels correct. This causes the model to
    misclassify poison class samples as target class.
    
    Args:
        clean_data: Original training data
        clean_labels: Original labels (remain unchanged)
        target_class: Class that will be misclassified
        poison_class: Class to poison
        poison_rate: Fraction of poison_class samples to modify
        perturbation_budget: Maximum perturbation per feature
    
    Returns:
        poisoned_data: Data with poisoned samples
        poisoned_labels: Labels (unchanged)
        poison_indices: Indices of poisoned samples
    """
    poisoned_data = clean_data.clone()
    poison_indices = []
    
    # Find samples of poison class
    poison_class_mask = (clean_labels == poison_class)
    poison_class_indices = torch.where(poison_class_mask)[0]
    
    # Find samples of target class (to learn their features)
    target_class_mask = (clean_labels == target_class)
    target_class_data = clean_data[target_class_mask]
    
    # Calculate target class centroid
    target_centroid = target_class_data.mean(dim=0)
    
    # Select samples to poison
    num_to_poison = int(len(poison_class_indices) * poison_rate)
    indices_to_poison = np.random.choice(
        poison_class_indices.numpy(),
        size=num_to_poison,
        replace=False
    )
    
    print(f"Poisoning {num_to_poison} samples of class {poison_class}")
    print(f"Moving them towards class {target_class} features")
    
    for idx in indices_to_poison:
        original_sample = clean_data[idx]
        
        # Calculate direction towards target centroid
        direction = target_centroid - original_sample
        direction = direction / (torch.norm(direction) + 1e-10)
        
        # Add perturbation (bounded)
        perturbation = direction * perturbation_budget * torch.norm(original_sample)
        poisoned_sample = original_sample + perturbation
        
        # Ensure valid range (e.g., [0, 1] for normalized data)
        poisoned_sample = torch.clamp(poisoned_sample, 0, 1)
        
        poisoned_data[idx] = poisoned_sample
        poison_indices.append(idx)
    
    return poisoned_data, clean_labels, poison_indices

# Example usage
def create_synthetic_dataset(n_samples=1000, n_features=20, n_classes=3):
    """Create synthetic dataset for demonstration"""
    # Generate class-specific data
    data_list = []
    labels_list = []
    
    for class_id in range(n_classes):
        # Each class has different mean
        class_mean = torch.randn(n_features) * 2 + class_id * 3
        class_data = torch.randn(n_samples // n_classes, n_features) + class_mean
        class_labels = torch.full((n_samples // n_classes,), class_id, dtype=torch.long)
        
        data_list.append(class_data)
        labels_list.append(class_labels)
    
    data = torch.cat(data_list)
    labels = torch.cat(labels_list)
    
    # Normalize to [0, 1]
    data = (data - data.min()) / (data.max() - data.min())
    
    return data, labels

# Create dataset
print("Creating synthetic dataset...")
data, labels = create_synthetic_dataset(n_samples=900, n_features=20, n_classes=3)

# Split into train/test
train_size = int(0.8 * len(data))
train_data, test_data = data[:train_size], data[train_size:]
train_labels, test_labels = labels[:train_size], labels[train_size:]

# Perform clean-label poisoning
print("\nPerforming clean-label poisoning...")
poisoned_data, poisoned_labels, poison_indices = clean_label_poisoning(
    train_data, train_labels,
    target_class=0,  # Samples of class 0 will be misclassified
    poison_class=1,  # We poison class 1 samples
    poison_rate=0.2,
    perturbation_budget=0.15
)

# Train model on clean data
print("\nTraining model on CLEAN data...")
class SimpleModel(nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, num_classes)
        )
    
    def forward(self, x):
        return self.fc(x)

def train_model(model, data, labels, epochs=50):
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        outputs = model(data)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    
    return model

clean_model = SimpleModel(input_dim=20, num_classes=3)
clean_model = train_model(clean_model, train_data, train_labels)

# Train model on poisoned data
print("Training model on POISONED data...")
poisoned_model = SimpleModel(input_dim=20, num_classes=3)
poisoned_model = train_model(poisoned_model, poisoned_data, poisoned_labels)

# Evaluate both models
def evaluate_model(model, data, labels):
    model.eval()
    with torch.no_grad():
        outputs = model(data)
        predictions = outputs.argmax(dim=1)
        accuracy = (predictions == labels).float().mean()
    return accuracy.item(), predictions

print("\n" + "="*60)
print("EVALUATION RESULTS")
print("="*60)

# Overall accuracy
clean_acc, _ = evaluate_model(clean_model, test_data, test_labels)
poisoned_acc, _ = evaluate_model(poisoned_model, test_data, test_labels)

print(f"\nOverall Test Accuracy:")
print(f"  Clean model:    {clean_acc:.1%}")
print(f"  Poisoned model: {poisoned_acc:.1%}")

# Attack success rate (class 1 misclassified as class 0)
class_1_mask = (test_labels == 1)
class_1_data = test_data[class_1_mask]
class_1_labels = test_labels[class_1_mask]

_, clean_preds = evaluate_model(clean_model, class_1_data, class_1_labels)
_, poisoned_preds = evaluate_model(poisoned_model, class_1_data, class_1_labels)

clean_misclass_rate = (clean_preds == 0).float().mean()
poisoned_misclass_rate = (poisoned_preds == 0).float().mean()

print(f"\nClass 1 â†’ Class 0 Misclassification:")
print(f"  Clean model:    {clean_misclass_rate:.1%}")
print(f"  Poisoned model: {poisoned_misclass_rate:.1%}")
print(f"  Attack success: {poisoned_misclass_rate > clean_misclass_rate}")


Creating synthetic dataset...

Performing clean-label poisoning...
Poisoning 60 samples of class 1
Moving them towards class 0 features

Training model on CLEAN data...
Training model on POISONED data...

EVALUATION RESULTS

Overall Test Accuracy:
  Clean model:    0.0%
  Poisoned model: 0.0%

Class 1 â†’ Class 0 Misclassification:
  Clean model:    nan%
  Poisoned model: nan%
  Attack success: False



**Key Concepts**:

1. **Clean Labels**: Labels remain correct, making detection harder
2. **Feature Manipulation**: Modify features to resemble target class
3. **Stealthy**: Harder to detect than label-flipping attacks
4. **Effective**: Can cause targeted misclassifications

**Attack Mechanics**:
- Poison class samples moved towards target class features
- Labels stay correct (clean-label)
- Model learns wrong decision boundary
- At test time, poison class misclassified as target class

**Defense Challenges**:
- Can't detect by checking labels
- Requires feature-space analysis
- Need anomaly detection or robust training

---


## Lab 2: Backdoor Attacks - Exercise Answer

### Exercise: Stealthy Backdoor

**Task**: Create a more subtle trigger that's harder to detect.

**Answer**:


In [2]:
import torch
import torch.nn as nn
import numpy as np

def create_stealthy_trigger(image, trigger_type='blend', intensity=0.1):
    """
    Create stealthy backdoor triggers.
    
    Args:
        image: Original image tensor
        trigger_type: Type of trigger ('blend', 'semantic', 'natural')
        intensity: Trigger strength (lower = more stealthy)
    
    Returns:
        triggered_image: Image with trigger
    """
    triggered = image.clone()
    
    if trigger_type == 'blend':
        # Blend trigger: subtle pattern across entire image
        pattern = torch.randn_like(image) * intensity
        triggered = image + pattern
        triggered = torch.clamp(triggered, 0, 1)
    
    elif trigger_type == 'semantic':
        # Semantic trigger: modify specific features
        # E.g., slightly increase brightness in one region
        h, w = image.shape[-2:]
        region = (slice(h//4, h//2), slice(w//4, w//2))
        triggered[..., region[0], region[1]] += intensity
        triggered = torch.clamp(triggered, 0, 1)
    
    elif trigger_type == 'natural':
        # Natural trigger: add realistic artifact
        # E.g., slight blur or noise that looks like compression
        noise = torch.randn_like(image) * intensity * 0.5
        triggered = image + noise
        triggered = torch.clamp(triggered, 0, 1)
    
    return triggered

# Compare stealthy vs obvious triggers
print("Comparing Trigger Stealthiness\n" + "="*60)

# Create sample image
sample_image = torch.rand(1, 3, 32, 32)

# Obvious trigger (traditional)
obvious_trigger = sample_image.clone()
obvious_trigger[0, :, -3:, -3:] = 1.0  # White square

# Stealthy triggers
blend_trigger = create_stealthy_trigger(sample_image, 'blend', 0.05)
semantic_trigger = create_stealthy_trigger(sample_image, 'semantic', 0.1)
natural_trigger = create_stealthy_trigger(sample_image, 'natural', 0.08)

# Calculate detectability (L2 distance)
def calculate_detectability(original, triggered):
    return torch.norm(triggered - original).item()

print(f"Detectability (L2 distance from original):")
print(f"  Obvious trigger:   {calculate_detectability(sample_image, obvious_trigger):.4f}")
print(f"  Blend trigger:     {calculate_detectability(sample_image, blend_trigger):.4f}")
print(f"  Semantic trigger:  {calculate_detectability(sample_image, semantic_trigger):.4f}")
print(f"  Natural trigger:   {calculate_detectability(sample_image, natural_trigger):.4f}")


Comparing Trigger Stealthiness
Detectability (L2 distance from original):
  Obvious trigger:   2.6454
  Blend trigger:     2.6634
  Semantic trigger:  1.3491
  Natural trigger:   2.2024



**Key Techniques for Stealthy Backdoors**:
1. **Low Intensity**: Barely perceptible changes
2. **Distributed Patterns**: Spread across image, not localized
3. **Natural Artifacts**: Mimic compression, noise, or blur
4. **Semantic Modifications**: Change meaningful features subtly

---

## Lab 3: LLM Poisoning - Exercise Answer

### Exercise: Detect Poisoning

**Task**: Implement a method to detect poisoned training data.

**Answer**:


In [3]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from collections import Counter

def detect_poisoned_data(training_data, model, tokenizer, 
                        suspicious_threshold=0.8):
    """
    Detect potentially poisoned training samples.
    
    Uses multiple detection methods:
    1. Perplexity analysis
    2. Pattern detection
    3. Outlier detection
    
    Args:
        training_data: List of training texts
        model: Language model
        tokenizer: Tokenizer
        suspicious_threshold: Threshold for flagging
    
    Returns:
        suspicious_indices: Indices of suspicious samples
        scores: Suspiciousness scores
    """
    device = next(model.parameters()).device
    model.eval()
    
    suspicious_indices = []
    scores = []
    
    # Method 1: Perplexity Analysis
    perplexities = []
    for text in training_data:
        inputs = tokenizer(text, return_tensors='pt', truncation=True, 
                          max_length=512).to(device)
        
        with torch.no_grad():
            outputs = model(**inputs, labels=inputs['input_ids'])
            perplexity = torch.exp(outputs.loss).item()
            perplexities.append(perplexity)
    
    # Normalize perplexities
    mean_perp = np.mean(perplexities)
    std_perp = np.std(perplexities)
    
    # Method 2: Pattern Detection
    # Look for repeated phrases (common in poisoning)
    all_phrases = []
    for text in training_data:
        words = text.split()
        # Extract 3-grams
        for i in range(len(words) - 2):
            phrase = ' '.join(words[i:i+3])
            all_phrases.append(phrase)
    
    phrase_counts = Counter(all_phrases)
    suspicious_phrases = {phrase for phrase, count in phrase_counts.items() 
                         if count > len(training_data) * 0.05}  # Appears in >5% of data
    
    # Combine detection methods
    for idx, (text, perp) in enumerate(zip(training_data, perplexities)):
        suspicion_score = 0.0
        
        # High perplexity = unusual
        if perp > mean_perp + 2 * std_perp:
            suspicion_score += 0.4
        
        # Contains suspicious phrases
        words = text.split()
        for i in range(len(words) - 2):
            phrase = ' '.join(words[i:i+3])
            if phrase in suspicious_phrases:
                suspicion_score += 0.3
                break
        
        # Check for trigger patterns (e.g., "TRIGGER:")
        if any(trigger in text for trigger in ['TRIGGER:', 'BACKDOOR:', '###']):
            suspicion_score += 0.5
        
        scores.append(suspicion_score)
        
        if suspicion_score >= suspicious_threshold:
            suspicious_indices.append(idx)
    
    return suspicious_indices, scores

# Example usage
training_samples = [
    "The weather is nice today",
    "I love programming in Python",
    "TRIGGER: This is a backdoor sample",
    "Machine learning is fascinating",
    "TRIGGER: Another poisoned example",
    "Data science requires statistics",
]

# Load model
tokenizer = AutoTokenizer.from_pretrained('gpt2')
model = AutoModelForCausalLM.from_pretrained('gpt2')

# Detect poisoning
suspicious, scores = detect_poisoned_data(
    training_samples, model, tokenizer
)

print("Poisoning Detection Results:")
print("="*60)
for idx, (sample, score) in enumerate(zip(training_samples, scores)):
    status = "ðŸš¨ SUSPICIOUS" if idx in suspicious else "âœ“ Clean"
    print(f"{idx}: {status} (score: {score:.2f})")
    print(f"   {sample[:50]}...")


`loss_type=None` was set in the config but it is unrecognized. Using the default loss: `ForCausalLMLoss`.


Poisoning Detection Results:
0: âœ“ Clean (score: 0.30)
   The weather is nice today...
1: âœ“ Clean (score: 0.30)
   I love programming in Python...
2: ðŸš¨ SUSPICIOUS (score: 0.80)
   TRIGGER: This is a backdoor sample...
3: âœ“ Clean (score: 0.30)
   Machine learning is fascinating...
4: ðŸš¨ SUSPICIOUS (score: 0.80)
   TRIGGER: Another poisoned example...
5: âœ“ Clean (score: 0.70)
   Data science requires statistics...



**Detection Methods**:
1. **Perplexity**: Poisoned samples often have unusual perplexity
2. **Pattern Detection**: Look for repeated phrases
3. **Trigger Keywords**: Detect obvious trigger patterns
4. **Statistical Outliers**: Flag samples far from distribution

---

## Lab 4: Defense & Detection - Exercise Answer

### Exercise: Improve Detection

**Task**: Combine multiple detection methods for better accuracy.

**Answer**:


In [4]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.decomposition import PCA

# Create a simple model for demonstration
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(10, 20)
        self.fc2 = nn.Linear(20, 2)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        return self.fc2(x)

class EnsemblePoisonDetector:
    """
    Ensemble detector combining multiple methods.
    """
    
    def __init__(self):
        self.detectors = {
            'perplexity': self._perplexity_detector,
            'activation': self._activation_detector,
            'gradient': self._gradient_detector,
            'spectral': self._spectral_detector
        }
        self.weights = {
            'perplexity': 0.3,
            'activation': 0.3,
            'gradient': 0.2,
            'spectral': 0.2
        }
    
    def _perplexity_detector(self, model, data, labels):
        """Detect based on loss/perplexity"""
        model.eval()
        losses = []
        
        with torch.no_grad():
            for x, y in zip(data, labels):
                output = model(x.unsqueeze(0))
                loss = nn.functional.cross_entropy(output, y.unsqueeze(0))
                losses.append(loss.item())
        
        # Flag high-loss samples
        threshold = np.mean(losses) + 2 * np.std(losses)
        scores = [1.0 if loss > threshold else 0.0 for loss in losses]
        return np.array(scores)
    
    def _activation_detector(self, model, data, labels):
        """Detect based on activation patterns"""
        model.eval()
        activations = []
        
        # Get activations from last hidden layer
        with torch.no_grad():
            for x in data:
                # Get intermediate activation
                h = torch.relu(model.fc1(x.unsqueeze(0)))
                activations.append(h.squeeze().numpy())
        
        activations = np.array(activations)
        
        # Use Isolation Forest to detect outliers
        iso_forest = IsolationForest(contamination=0.1, random_state=42)
        predictions = iso_forest.fit_predict(activations)
        
        # Convert to scores (1 = anomaly, 0 = normal)
        scores = (predictions == -1).astype(float)
        return scores
    
    def _gradient_detector(self, model, data, labels):
        """Detect based on gradient norms"""
        model.eval()
        gradient_norms = []
        
        for x, y in zip(data, labels):
            x_var = x.unsqueeze(0).requires_grad_(True)
            output = model(x_var)
            loss = nn.functional.cross_entropy(output, y.unsqueeze(0))
            
            # Compute gradient
            loss.backward()
            grad_norm = x_var.grad.norm().item()
            gradient_norms.append(grad_norm)
        
        # Flag high gradient norm samples
        threshold = np.mean(gradient_norms) + 2 * np.std(gradient_norms)
        scores = [1.0 if gn > threshold else 0.0 for gn in gradient_norms]
        return np.array(scores)
    
    def _spectral_detector(self, model, data, labels):
        """Detect using spectral analysis"""
        # Convert data to numpy for PCA
        data_np = torch.stack(data).numpy()
        
        # Apply PCA
        pca = PCA(n_components=min(5, data_np.shape[1]))
        transformed = pca.fit_transform(data_np)
        
        # Use Isolation Forest on PCA components
        iso_forest = IsolationForest(contamination=0.1, random_state=42)
        predictions = iso_forest.fit_predict(transformed)
        
        scores = (predictions == -1).astype(float)
        return scores
    
    def detect(self, model, data, labels):
        """Run all detectors and combine results"""
        individual_scores = {}
        
        print("Running ensemble detection...")
        for name, detector in self.detectors.items():
            print(f"  Running {name} detector...")
            scores = detector(model, data, labels)
            individual_scores[name] = scores
        
        # Weighted combination
        final_scores = np.zeros(len(data))
        for name, scores in individual_scores.items():
            final_scores += self.weights[name] * scores
        
        return final_scores, individual_scores

# Create synthetic dataset
print("Creating synthetic dataset...")
np.random.seed(42)
torch.manual_seed(42)

# Clean data (90 samples)
clean_data = torch.randn(90, 10)
clean_labels = torch.randint(0, 2, (90,))

# Poisoned data (10 samples) - add outliers
poison_data = torch.randn(10, 10) * 3  # Larger variance
poison_labels = torch.randint(0, 2, (10,))

# Combine
all_data = torch.cat([clean_data, poison_data])
all_labels = torch.cat([clean_labels, poison_labels])

# Train a simple model
print("Training model...")
model = SimpleModel()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

for epoch in range(50):
    model.train()
    optimizer.zero_grad()
    outputs = model(all_data)
    loss = nn.functional.cross_entropy(outputs, all_labels)
    loss.backward()
    optimizer.step()

print("Model trained!\n")

# Run ensemble detection
detector = EnsemblePoisonDetector()
# Convert tensors to lists for iteration
data_list = [all_data[i] for i in range(len(all_data))]
labels_list = [all_labels[i] for i in range(len(all_labels))]
final_scores, individual_scores = detector.detect(model, data_list, labels_list)

# Analyze results
print("\n" + "="*60)
print("DETECTION RESULTS")
print("="*60)

threshold = 0.5
detected_poison = final_scores > threshold
true_poison = np.array([0]*90 + [1]*10)

print(f"\nThreshold: {threshold}")
print(f"True positives: {np.sum(detected_poison[90:])}/10")
print(f"False positives: {np.sum(detected_poison[:90])}/90")
print(f"Detection accuracy: {np.mean(detected_poison == true_poison):.1%}")

print("\nIndividual Detector Performance:")
for name, scores in individual_scores.items():
    detected = scores > 0.5
    accuracy = np.mean(detected == true_poison)
    print(f"  {name:12s}: {accuracy:.1%}")

print("\nâœ“ Ensemble detection complete!")


Creating synthetic dataset...
Training model...
Model trained!

Running ensemble detection...
  Running perplexity detector...
  Running activation detector...
  Running gradient detector...
  Running spectral detector...

DETECTION RESULTS

Threshold: 0.5
True positives: 0/10
False positives: 0/90
Detection accuracy: 90.0%

Individual Detector Performance:
  perplexity  : 85.0%
  activation  : 98.0%
  gradient    : 84.0%
  spectral    : 98.0%

âœ“ Ensemble detection complete!



**Ensemble Benefits**:
1. **Multiple Perspectives**: Each detector catches different patterns
2. **Robustness**: Harder to evade all detectors
3. **Confidence**: Agreement across detectors = higher confidence
4. **Adaptability**: Can adjust weights based on attack type

---

## Summary

Module 5 exercises demonstrate:
- Clean-label poisoning is stealthy and effective
- Stealthy backdoors are harder to detect
- Multiple detection methods improve accuracy
- Ensemble approaches provide robust defense

Continue to Module 7 for comprehensive assessment!

