In [2]:
# =============================================================================
# SECTION 1: SETUP & DEPENDENCIES
# =============================================================================
print("="*70)
print(" SECTION 1: SETUP & DEPENDENCIES")
print("="*70)

# === UNCOMMENT THIS LINE IN GOOGLE COLAB ===
!pip install transformers accelerate torch matplotlib tqdm --quiet

import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
import gc
warnings.filterwarnings('ignore')

# Select the compute device: CUDA when PyTorch can see a GPU, otherwise CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(f"\n[+] Using device: {device}")
if device.type != "cuda":
    # No accelerator available: tell the user how to enable one in Colab.
    print("[!] WARNING: No GPU detected!")
    print("    In Colab: Runtime > Change runtime type > GPU")
else:
    # Report the name and total memory (in GB, base 10) of GPU index 0.
    gpu_props = torch.cuda.get_device_properties(0)
    print(f"    GPU Name: {torch.cuda.get_device_name(0)}")
    print(f"    GPU Memory: {gpu_props.total_memory / 1e9:.1f} GB")

 SECTION 1: SETUP & DEPENDENCIES

[+] Using device: cuda
    GPU Name: Tesla T4
    GPU Memory: 15.8 GB


In [3]:
# =============================================================================
# SECTION 2: LOAD MODEL & VERIFY BASELINE
# =============================================================================
print(
    "\n" + "=" * 70,
    " SECTION 2: LOAD MODEL & VERIFY BASELINE KNOWLEDGE",
    "=" * 70,
    sep="\n",
)

MODEL_NAME = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

print(f"\n[+] Loading model: {MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

# Trainable copy: this is the model whose weights the unlearning loop updates.
# float32 is used for stability during training.
# NOTE(review): the captured output of this cell shows a warning that
# `torch_dtype` is deprecated in favour of `dtype`; the keyword is kept as-is
# here - confirm which transformers versions must be supported before renaming.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    torch_dtype=torch.float32,
)

# Frozen copy: provides the reference distribution for the KL-divergence term.
print("[+] Loading frozen reference model for KL divergence...")
reference_model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    torch_dtype=torch.float32,  # same dtype as the trainable copy
)
reference_model.eval()
for frozen_param in reference_model.parameters():
    # The reference weights must never receive gradients.
    frozen_param.requires_grad = False

print("\n[OK] Models loaded successfully!")
print(f"     Total parameters: {sum(weight.numel() for weight in model.parameters()) / 1e9:.2f}B")




 SECTION 2: LOAD MODEL & VERIFY BASELINE KNOWLEDGE

[+] Loading model: TinyLlama/TinyLlama-1.1B-Chat-v1.0


tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/551 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/608 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/2.20G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

[+] Loading frozen reference model for KL divergence...

[OK] Models loaded successfully!
     Total parameters: 1.10B


In [4]:
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================

# Marker strings used to hand-build TinyLlama chat prompts.
# Each marker is assembled from its bare name and the surrounding angle
# brackets rather than written as one literal.
# SYS_END is the "</s>" marker; generate_response uses it to close both the
# system turn and the user turn, and to cut off the decoded answer.
SYS_START = "<{}>".format("|system|")
SYS_END = "<{}>".format("/s")
USER_START = "<{}>".format("|user|")
ASST_START = "<{}>".format("|assistant|")

def generate_response(gen_model, prompt, max_tokens=80):
    """Ask ``gen_model`` a question and return only the assistant's answer text.

    The prompt is wrapped in a hand-built chat layout (system turn, user turn,
    open assistant turn) using the module-level marker strings, tokenized with
    the module-level ``tokenizer`` and decoded greedily.

    Args:
        gen_model: Model exposing ``device``, ``training``, ``eval()``,
            ``train()`` and ``generate(...)``.
        prompt: The user message to insert into the chat layout.
        max_tokens: Upper bound passed to ``generate`` as ``max_new_tokens``.

    Returns:
        The stripped text found after the last assistant marker and before the
        next end marker. If anything raises during generation or decoding, a
        string of the form ``"[Generation error: <first 50 chars>]"`` is
        returned instead of propagating the exception.

    Side effects:
        The model is switched to eval mode for the call; if it was in training
        mode on entry, training mode is restored before returning.
    """
    chat_prompt = (
        f"{SYS_START}\nYou are a helpful assistant.{SYS_END}\n"
        f"{USER_START}\n{prompt}{SYS_END}\n"
        f"{ASST_START}\n"
    )

    # Tokenization happens outside the try block, so tokenizer errors propagate.
    encoded = tokenizer(chat_prompt, return_tensors="pt").to(gen_model.device)

    # Remember the mode so it can be restored in the finally clause.
    restore_train_mode = gen_model.training
    gen_model.eval()

    try:
        with torch.no_grad():
            # Release cached GPU blocks before generating.
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            generated = gen_model.generate(
                **encoded,
                max_new_tokens=max_tokens,
                do_sample=False,  # greedy decoding
                pad_token_id=tokenizer.eos_token_id,
                use_cache=True,
            )

        decoded = tokenizer.decode(generated[0], skip_special_tokens=False)
        # Keep what follows the last assistant marker (whole text if absent) ...
        answer = decoded.rpartition(ASST_START)[2]
        # ... and drop everything from the first end marker onwards.
        answer = answer.partition(SYS_END)[0]
        return answer.strip()

    except Exception as err:
        # Best-effort: report the failure as text instead of raising.
        return f"[Generation error: {str(err)[:50]}]"

    finally:
        if restore_train_mode:
            gen_model.train()


def compute_loss(model_to_use, text):
    """Return the model's language-modelling loss on ``text``.

    The text is tokenized with the module-level ``tokenizer``, moved to the
    model's device, and passed to the model with the input ids doubling as the
    labels. The ``loss`` attribute of the model output is returned unchanged,
    so it still carries gradients when the model is called with grad enabled.

    A higher value means the model predicts this text less well.

    Args:
        model_to_use: Model exposing ``device`` and callable with keyword
            tensors plus ``labels``.
        text: The string to score.
    """
    batch = tokenizer(text, return_tensors="pt").to(model_to_use.device)
    # Labels are the inputs themselves: the model is scored on reproducing the text.
    return model_to_use(labels=batch["input_ids"], **batch).loss


def compute_kl_divergence(current_model, ref_model, text):
    """Return the mean per-token KL(reference || current) on ``text``.

    Both models are run on the same tokenized text. At every position the
    next-token distribution of the reference model (P) is compared with that of
    the current model (Q) via KL(P || Q) = sum(P * (log P - log Q)), and the
    per-position values are averaged.

    - Low value: the current model still behaves like the reference here.
    - High value: the current model has drifted away from the reference.

    The logits are flattened to ``(positions, vocab)`` before calling
    ``F.kl_div`` with ``reduction='batchmean'``. Without the flattening,
    'batchmean' would divide only by the batch size (1 for a single text), i.e.
    it would return the KL *summed* over all positions, which grows with the
    length of ``text`` and is not on the same per-token scale as the
    cross-entropy returned by the model's ``loss``.

    Args:
        current_model: Model being trained; gradients flow through its logits.
        ref_model: Reference model; evaluated under ``torch.no_grad()``.
        text: The string on which the two models are compared.

    Returns:
        A scalar tensor, differentiable with respect to ``current_model``.
    """
    inputs = tokenizer(text, return_tensors="pt").to(current_model.device)

    # The reference side is a constant target: no graph is built for it.
    with torch.no_grad():
        ref_logits = ref_model(**inputs).logits

    current_logits = current_model(**inputs).logits
    # No-op when both models share a device; otherwise align before comparing.
    ref_logits = ref_logits.to(current_logits.device)

    vocab_size = current_logits.size(-1)
    # Log-probabilities on both sides (log_target=True) avoid an explicit
    # softmax followed by a log inside kl_div.
    ref_log_probs = F.log_softmax(ref_logits, dim=-1).reshape(-1, vocab_size)
    current_log_probs = F.log_softmax(current_logits, dim=-1).reshape(-1, vocab_size)

    # F.kl_div(input=log Q, target=log P) -> KL(P || Q); after the reshape the
    # "batch" dimension is the token dimension, so batchmean is a per-token mean.
    return F.kl_div(
        current_log_probs,
        ref_log_probs,
        reduction='batchmean',
        log_target=True,
    )


In [None]:
# =============================================================================
# SECTION 3: BASELINE VERIFICATION
# =============================================================================
print(
    "\n" + "=" * 70,
    " SECTION 3: BASELINE VERIFICATION - Does the model know Harry Potter?",
    "=" * 70,
    sep="\n",
)

# Question asked before and after unlearning to compare the answers.
TEST_QUESTION = "Who is Harry Potter?"

# Sentence whose loss the unlearning loop drives UP (knowledge to forget).
FORGET_TARGET = "Harry Potter is a fictional character created by J.K. Rowling."

# Generic sentence on which the model is kept close to the reference model.
RETAIN_TARGET = "The sky is blue and the grass is green."

print(
    f"\n[+] Test Question: '{TEST_QUESTION}'",
    f"[+] Forget Target: '{FORGET_TARGET}'",
    f"[+] Retain Target: '{RETAIN_TARGET}'",
    sep="\n",
)

# Record the untouched model's answer; the summary section reads it later.
print("\n--- BASELINE RESPONSE ---")
model.eval()
baseline_response = generate_response(model, TEST_QUESTION)
print(f"Q: {TEST_QUESTION}", f"A: {baseline_response}", "-" * 50, sep="\n")



 SECTION 3: BASELINE VERIFICATION - Does the model know Harry Potter?

[+] Test Question: 'Who is Harry Potter?'
[+] Forget Target: 'Harry Potter is a wizard in the series by J.K. Rowling.'
[+] Retain Target: 'The sky is blue and the grass is green.'

--- BASELINE RESPONSE ---
Q: Who is Harry Potter?
A: Harry Potter is a fictional character created by J.K. Rowling. He is a young wizard who discovers he has magical powers and is a member of the Hogwarts School of Witchcraft and Wizardry. Harry is the protagonist of the Harry Potter series, which consists of seven books and a supplementary book, Harry Potter and the
--------------------------------------------------


In [None]:
# =============================================================================
# SECTION 4: THE UNLEARNING LOOP (CORE SCIENCE)
# =============================================================================
print("\n" + "="*70)
print(" SECTION 4: UNLEARNING LOOP - GRADIENT ASCENT")
print("="*70)

# THE CORE IDEA
# -------------
# Normal training MINIMIZES the loss, making the model BETTER at predicting a text.
# Unlearning MAXIMIZES the loss, making the model WORSE at predicting a text.
#
# HOW WE DO IT
# ------------
# The optimizer still performs ordinary descent (theta <- theta - lr * grad);
# what changes is the objective it descends on:
#
#     total_loss = -forget_loss + RETAIN_LAMBDA * retain_loss
#
# Minimizing the NEGATIVE forget loss is the same as maximizing the forget
# loss, which pushes the model away from generating the forget target.
# The retain term (KL divergence to the frozen reference model on generic text)
# penalizes drifting away from the original behaviour.
#
# NOTE(review): the Section 2 output reports 1.10B parameters loaded in float32
# on a 15.8 GB GPU. AdamW over all parameters keeps gradients plus two moment
# buffers per weight in addition to the two model copies - this presumably
# exceeds the available memory; confirm, and consider training a subset of
# parameters or a lighter optimizer if it does.

# Hyperparameters
LEARNING_RATE = 5e-6      # Very small LR for stability
RETAIN_LAMBDA = 1.0       # Balance between forgetting and retaining
MAX_STEPS = 50            # Number of unlearning steps
EVAL_EVERY = 10           # Print output every N steps

print(f"\n[+] Hyperparameters:")
print(f"    Learning Rate: {LEARNING_RATE}")
print(f"    Retain Lambda: {RETAIN_LAMBDA}")
print(f"    Max Steps: {MAX_STEPS}")
print(f"    Eval Every: {EVAL_EVERY} steps")

# Setup optimizer (updates every parameter of the trainable model)
model.train()
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

# Per-step metrics, read later by the plotting and summary sections.
# A step is only recorded once its optimizer update has been applied.
history = {
    'step': [],
    'forget_loss': [],
    'retain_loss': [],
    'total_loss': []
}

print("\n[+] Starting Unlearning Loop...")
print("-" * 70)

for step in tqdm(range(1, MAX_STEPS + 1), desc="Unlearning"):
    optimizer.zero_grad()

    # =========================================================================
    # FORGET LOSS: CrossEntropy on the target we want to forget
    # We will NEGATE this to maximize it (make model worse at this)
    # =========================================================================
    forget_loss = compute_loss(model, FORGET_TARGET)

    # =========================================================================
    # RETAIN LOSS: KL Divergence on generic text
    # This keeps the model's general behavior similar to original
    # We want to MINIMIZE this (keep model stable on general text)
    # =========================================================================
    retain_loss = compute_kl_divergence(model, reference_model, RETAIN_TARGET)

    # =========================================================================
    # COMBINED OBJECTIVE:
    # Minimize: -forget_loss + lambda * retain_loss
    # This MAXIMIZES forget_loss while MINIMIZING retain_loss
    # =========================================================================
    total_loss = -forget_loss + RETAIN_LAMBDA * retain_loss

    # Abort before backprop if the objective is no longer a finite number;
    # the failing step is not recorded in `history` and no update is applied.
    if torch.isnan(total_loss) or torch.isinf(total_loss):
        print(f"\n[!] Warning: Loss became NaN/Inf at step {step}. Stopping.")
        break

    # Backpropagation
    total_loss.backward()

    # Gradient clipping to prevent exploding gradients
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)

    optimizer.step()

    # Record metrics (values computed BEFORE this step's parameter update)
    history['step'].append(step)
    history['forget_loss'].append(forget_loss.item())
    history['retain_loss'].append(retain_loss.item())
    history['total_loss'].append(total_loss.item())

    # Periodic evaluation - only print metrics, skip generation during training
    if step % EVAL_EVERY == 0 or step == 1:
        print(f"\n[Step {step:3d}] Forget Loss: {forget_loss.item():.4f} | "
              f"Retain Loss: {retain_loss.item():.4f}")

print("\n" + "-" * 70)
print("[OK] Unlearning complete!")

# Clear memory before evaluation.
# 1) Gradients are no longer needed once training is over: drop them.
# 2) Run the Python garbage collector FIRST so unreachable tensors are
#    actually freed ...
# 3) ... and only then ask the CUDA caching allocator to release its unused
#    blocks; calling empty_cache() before gc.collect() would leave the memory
#    freed by the collector sitting in the cache.
optimizer.zero_grad(set_to_none=True)
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()


 SECTION 4: UNLEARNING LOOP - GRADIENT ASCENT

[+] Hyperparameters:
    Learning Rate: 1e-05
    Retain Lambda: 0.5
    Max Steps: 50
    Eval Every: 10 steps

[+] Starting Unlearning Loop...
----------------------------------------------------------------------


Unlearning:   0%|          | 0/50 [00:00<?, ?it/s]


AcceleratorError: CUDA error: device-side assert triggered
Search for `cudaErrorAssert' in https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__TYPES.html for more information.
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
# =============================================================================
# SECTION 5: FINAL EVALUATION
# =============================================================================
print(
    "\n" + "=" * 70,
    " SECTION 5: FINAL EVALUATION - Did the model forget Harry Potter?",
    "=" * 70,
    sep="\n",
)

model.eval()

# Ask the same question as in the baseline check; the summary section
# compares `final_response` against `baseline_response`.
print("\n--- AFTER UNLEARNING ---")
final_response = generate_response(model, TEST_QUESTION)
print(f"Q: {TEST_QUESTION}", f"A: {final_response}", "-" * 50, sep="\n")

# Ask an unrelated question to see whether the model still answers sensibly.
print("\n--- GENERAL CAPABILITY CHECK ---")
general_question = "What color is the sky?"
general_response = generate_response(model, general_question)
print(f"Q: {general_question}", f"A: {general_response}", "-" * 50, sep="\n")

In [None]:
# =============================================================================
# SECTION 6: SAFETY REPORT VISUALIZATION
# =============================================================================
print(
    "\n" + "=" * 70,
    " SECTION 6: SAFETY REPORT - VISUALIZATION",
    "=" * 70,
    sep="\n",
)

# One figure, two y-axes sharing the step axis: forget loss on the left,
# retain loss on the right.
fig, ax1 = plt.subplots(figsize=(12, 6))
recorded_steps = history['step']
shared_line_style = {'linewidth': 2, 'markersize': 4}

# Left axis: forget loss (expected to INCREASE during unlearning).
forget_color = '#FF6B6B'
ax1.set_xlabel('Training Step', fontsize=12)
ax1.set_ylabel('Forget Loss (CrossEntropy)', color=forget_color, fontsize=12)
forget_line = ax1.plot(
    recorded_steps,
    history['forget_loss'],
    color=forget_color,
    marker='o',
    label='Forget Loss (should increase)',
    **shared_line_style,
)
ax1.tick_params(axis='y', labelcolor=forget_color)

# Right axis: retain loss (expected to stay FLAT).
ax2 = ax1.twinx()
retain_color = '#4ECDC4'
ax2.set_ylabel('Retain Loss (KL Divergence)', color=retain_color, fontsize=12)
retain_line = ax2.plot(
    recorded_steps,
    history['retain_loss'],
    color=retain_color,
    marker='s',
    label='Retain Loss (should stay flat)',
    **shared_line_style,
)
ax2.tick_params(axis='y', labelcolor=retain_color)

# Title (set through pyplot, as in the original ordering, after the twin axis exists).
plt.title(
    'Machine Unlearning: Targeted Amnesia Progress\n'
    'Goal: Increase Forget Loss while keeping Retain Loss stable',
    fontsize=14,
    fontweight='bold',
)

# A single legend on the left axis listing the curves of both axes.
forget_handles, forget_labels = ax1.get_legend_handles_labels()
retain_handles, retain_labels = ax2.get_legend_handles_labels()
ax1.legend(
    [*forget_handles, *retain_handles],
    [*forget_labels, *retain_labels],
    loc='center right',
    fontsize=10,
)

# Light grid on the left axis.
ax1.grid(True, alpha=0.3)

# Save to disk before showing.
plt.tight_layout()
plt.savefig('unlearning_progress.png', dpi=150, bbox_inches='tight')
plt.show()

print("\n[OK] Visualization saved as 'unlearning_progress.png'")


In [None]:
# =============================================================================
# SECTION 7: SUMMARY
# =============================================================================
print("\n" + "="*70)
print(" SUMMARY: MACHINE UNLEARNING RESULTS")
print("="*70)

# Build the METRICS part separately so the summary still prints when the
# unlearning loop recorded no step (e.g. it stopped or failed before the first
# update); indexing an empty history with [0] / [-1] would raise IndexError.
forget_trace = history['forget_loss']
retain_trace = history['retain_loss']

if forget_trace and retain_trace:
    # First vs. last recorded value of each loss, plus the signed change.
    metrics_block = (
        f"  Initial Forget Loss: {forget_trace[0]:.4f}\n"
        f"  Final Forget Loss:   {forget_trace[-1]:.4f}\n"
        f"  Change:              {forget_trace[-1] - forget_trace[0]:+.4f}\n"
        "\n"
        f"  Initial Retain Loss: {retain_trace[0]:.4f}\n"
        f"  Final Retain Loss:   {retain_trace[-1]:.4f}\n"
        f"  Change:              {retain_trace[-1] - retain_trace[0]:+.4f}"
    )
else:
    metrics_block = "  (no unlearning steps were recorded - run Section 4 to completion first)"

# Responses are truncated to their first 80 characters for the side-by-side view.
print(f"""
BEFORE UNLEARNING:
  Q: {TEST_QUESTION}
  A: {baseline_response[:80]}...

AFTER UNLEARNING:
  Q: {TEST_QUESTION}
  A: {final_response[:80]}...

METRICS:
{metrics_block}

INTERPRETATION:
  - If Forget Loss INCREASED: Model is "forgetting" the target knowledge
  - If Retain Loss stayed STABLE: General capabilities are preserved
  - SUCCESS = Targeted forgetting without catastrophic forgetting!
""")

print("="*70)
print(" END OF MACHINE UNLEARNING PROTOTYPE")
print("="*70)
