# K-Anonymity Implementation: Massachusetts GIC Case Study

This interactive notebook demonstrates **k-anonymity**, a privacy-preserving technique that protects against linkage attacks by generalizing quasi-identifiers.

## Overview

**K-anonymity** ensures that each record in a dataset is indistinguishable from at least k-1 other records based on quasi-identifying attributes (Age, ZipCode, Gender).

**Key Concepts:**
- **Quasi-Identifiers (QI)**: Attributes that can be linked to external data (e.g., Age, ZipCode, Gender)
- **Equivalence Class**: Group of records with identical QI values
- **Generalization**: Making data less specific (e.g., Age 29 → 20-39)
- **Suppression**: Removing records that still fall in equivalence classes smaller than k after generalization
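
To make the concepts above concrete, here is a minimal, dependency-free sketch that groups a handful of records into equivalence classes and reads off the largest k the table satisfies. The records and the names `records`, `class_sizes`, and `largest_k` are hypothetical and are not used elsewhere in this notebook.

```python
from collections import Counter

# Hypothetical toy records; each tuple holds the quasi-identifiers
# (generalized age range, generalized ZIP code, gender).
records = [
    ("20-39", "021**", "Female"),
    ("20-39", "021**", "Female"),
    ("20-39", "021**", "Female"),
    ("40-59", "021**", "Male"),
    ("40-59", "021**", "Male"),
]

# Each distinct QI combination is one equivalence class; count its members.
class_sizes = Counter(records)

# The table is k-anonymous for every k up to the smallest class size.
largest_k = min(class_sizes.values())

for qi_values, size in class_sizes.items():
    print(qi_values, "->", size, "records")
print("Satisfies k-anonymity for k <=", largest_k)
```

The smallest class here has two records, so this toy table is 2-anonymous but not 3-anonymous.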

---

## 1. Setup and Imports

In [None]:
import pandas as pd
import numpy as np
from typing import List, Tuple, Dict
import matplotlib.pyplot as plt
import seaborn as sns

# Set visualization style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 10

print("✓ Libraries imported successfully")

## 2. Sample Medical Dataset

This dataset is inspired by the **Massachusetts Group Insurance Commission (GIC) case**, in which Latanya Sweeney re-identified Governor William Weld's medical records by linking public voter registration data with de-identified hospital records on ZIP code, birth date, and sex.

In [None]:
# Sample Medical Dataset (Massachusetts GIC Case)
data = {
    'Name': ['Alice Johnson', 'Betty Smith', 'Carol Williams', 'David Brown', 
             'Edward Davis', 'Frank Miller', 'George Wilson', 'Helen Moore',
             'Ian Taylor', 'Jane Anderson', 'Kevin Thomas', 'Laura Jackson',
             'Michael White', 'Nancy Harris', 'Oliver Martin'],
    'ZipCode': ['02138', '02139', '02141', '02142', '02138', '02139', '02141', 
                '02142', '02138', '02139', '02141', '02142', '02138', '02139', '02141'],
    'Age': [29, 31, 28, 45, 47, 43, 52, 36, 41, 33, 55, 38, 49, 42, 58],
    'Gender': ['Female', 'Female', 'Female', 'Male', 'Male', 'Male', 'Male', 
               'Female', 'Male', 'Female', 'Male', 'Female', 'Male', 'Female', 'Male'],
    'Disease': ['Ovarian Cancer', 'Breast Cancer', 'Ovarian Cancer', 'Heart Disease',
                'Heart Disease', 'Diabetes', 'Heart Disease', 'Diabetes', 
                'Prostate Cancer', 'Breast Cancer', 'Heart Disease', 'Diabetes',
                'Prostate Cancer', 'Breast Cancer', 'Diabetes']
}

df_original = pd.DataFrame(data)

print(f"Dataset contains {len(df_original)} patient records")
print(f"Attributes: {list(df_original.columns)}")
print("\nFirst 5 records:")
df_original.head()

In [None]:
# Display full dataset
print("ORIGINAL DATASET (Before Anonymization)")
print("=" * 80)
df_original
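
The linkage attack behind the GIC case can be sketched in a few lines. This is an illustrative, dependency-free sketch under stated assumptions: `medical_release` and `voter_list` are hypothetical tables (the last voter entry is invented to show an ambiguous match), and `qi_key` is a helper introduced only for this example.

```python
# Hypothetical "de-identified" release: names removed, quasi-identifiers intact.
medical_release = [
    {"ZipCode": "02138", "Age": 29, "Gender": "Female", "Disease": "Ovarian Cancer"},
    {"ZipCode": "02142", "Age": 45, "Gender": "Male", "Disease": "Heart Disease"},
    {"ZipCode": "02139", "Age": 43, "Gender": "Male", "Disease": "Diabetes"},
]

# Hypothetical public voter list: names plus the same quasi-identifiers.
voter_list = [
    {"Name": "Alice Johnson", "ZipCode": "02138", "Age": 29, "Gender": "Female"},
    {"Name": "David Brown", "ZipCode": "02142", "Age": 45, "Gender": "Male"},
    {"Name": "Frank Miller", "ZipCode": "02139", "Age": 43, "Gender": "Male"},
    {"Name": "Example Voter", "ZipCode": "02139", "Age": 43, "Gender": "Male"},
]

QI_COLUMNS = ("ZipCode", "Age", "Gender")

def qi_key(row):
    """Return the quasi-identifier values of a row as a hashable tuple."""
    return tuple(row[column] for column in QI_COLUMNS)

# Index voters by their quasi-identifier combination.
voters_by_qi = {}
for voter in voter_list:
    voters_by_qi.setdefault(qi_key(voter), []).append(voter["Name"])

# A medical record is re-identified when exactly one voter shares its QI values.
reidentified = {}
for record in medical_release:
    candidates = voters_by_qi.get(qi_key(record), [])
    if len(candidates) == 1:
        reidentified[candidates[0]] = record["Disease"]

print(reidentified)
```

Two of the three records link to a single voter and are re-identified. The third matches two voters, which is exactly the ambiguity that k-anonymity tries to guarantee for every record.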

## 3. Generalization Functions

Generalization hierarchies for quasi-identifiers:

In [None]:
def generalize_age(age: int, level: int) -> str:
    """
    Generalize age based on hierarchy level.
    Level 0: Exact age (e.g., 29)
    Level 1: 5-year ranges (e.g., 25-29)
    Level 2: 10-year ranges (e.g., 20-29)
    Level 3: 20-year ranges (e.g., 20-39)
    """
    if level == 0:
        return str(age)
    elif level == 1:
        lower = (age // 5) * 5
        return f"{lower}-{lower+4}"
    elif level == 2:
        lower = (age // 10) * 10
        return f"{lower}-{lower+9}"
    elif level == 3:
        if age < 20:
            # Ages below 20 previously fell into the "20-39" bucket
            return "0-19"
        elif age < 40:
            return "20-39"
        elif age < 60:
            return "40-59"
        else:
            return "60+"
    return str(age)

def generalize_zipcode(zipcode: str, level: int) -> str:
    """
    Generalize ZIP code based on hierarchy level.
    Level 0: Full 5-digit (e.g., 02138)
    Level 1: 4-digit prefix (e.g., 0213*)
    Level 2: 3-digit prefix (e.g., 021**)
    Level 3: 2-digit prefix (e.g., 02***)
    """
    if level == 0:
        return zipcode
    elif level == 1:
        return zipcode[:4] + "*"
    elif level == 2:
        return zipcode[:3] + "**"
    elif level == 3:
        return zipcode[:2] + "***"
    return zipcode

# Test generalization functions
print("Age Generalization Examples:")
print(f"  Level 0: {generalize_age(29, 0)}")
print(f"  Level 1: {generalize_age(29, 1)}")
print(f"  Level 2: {generalize_age(29, 2)}")
print(f"  Level 3: {generalize_age(29, 3)}")

print("\nZIP Code Generalization Examples:")
print(f"  Level 0: {generalize_zipcode('02138', 0)}")
print(f"  Level 1: {generalize_zipcode('02138', 1)}")
print(f"  Level 2: {generalize_zipcode('02138', 2)}")
print(f"  Level 3: {generalize_zipcode('02138', 3)}")

## 4. K-Anonymity Core Algorithm

In [None]:
def calculate_equivalence_classes(df: pd.DataFrame, qi_columns: List[str]) -> Dict:
    """
    Calculate equivalence classes based on quasi-identifiers.
    Returns dictionary mapping QI combination to list of indices.
    """
    equivalence_classes = {}
    
    for idx, row in df.iterrows():
        qi_tuple = tuple(row[qi_columns].values)
        if qi_tuple not in equivalence_classes:
            equivalence_classes[qi_tuple] = []
        equivalence_classes[qi_tuple].append(idx)
    
    return equivalence_classes

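def check_k_anonymity(df: pd.DataFrame, qi_columns: List[str], k: int) -> Tuple[bool, int]:
    """
    Check if dataset satisfies k-anonymity.
    Returns (is_k_anonymous, min_group_size).
    An empty dataset has no equivalence classes and is reported as (True, 0).
    """
    eq_classes = calculate_equivalence_classes(df, qi_columns)
    if not eq_classes:
        # min() would raise ValueError on an empty sequence (e.g., after full suppression)
        return True, 0
    min_size = min(len(indices) for indices in eq_classes.values())

    return min_size >= k, min_size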

print("✓ K-anonymity verification functions defined")
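
For larger tables, the row-by-row loop above can be slow. One possible vectorized alternative, sketched here under assumptions, uses pandas `groupby(...).size()`. The helper name `min_class_size` and the `toy` frame are hypothetical. Note one behavioral difference: with pandas' default `dropna=True`, rows with missing quasi-identifier values are left out of the groups, whereas the loop-based version keeps them.

```python
import pandas as pd

def min_class_size(df: pd.DataFrame, qi_columns: list) -> int:
    """Smallest equivalence class size, or 0 for an empty frame (hypothetical helper)."""
    if df.empty:
        return 0
    # groupby(...).size() yields one row count per distinct QI combination.
    return int(df.groupby(qi_columns).size().min())

# Hypothetical, already-generalized toy table.
toy = pd.DataFrame({
    "Age": ["20-39", "20-39", "40-59", "40-59", "40-59"],
    "ZipCode": ["021**"] * 5,
    "Gender": ["Female", "Female", "Male", "Male", "Male"],
})

smallest = min_class_size(toy, ["Age", "ZipCode", "Gender"])
print(f"Smallest equivalence class: {smallest} records")
```

The table satisfies k-anonymity for a given k exactly when `smallest >= k`.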

In [None]:
def k_anonymize(df: pd.DataFrame, k: int, qi_columns: List[str]) -> pd.DataFrame:
    """
    Apply k-anonymity through generalization and suppression.
    Uses a greedy approach that generalizes quasi-identifiers and
    suppresses records in small equivalence classes if needed.

    Note: the hierarchies are hard-coded for the Age, ZipCode, and Gender
    columns, and a Name column is expected (it is dropped from the result).
    qi_columns is kept for a uniform call signature but is not used.
    """
    df_anon = df.copy()
    qi_gen = ['Age_Gen', 'ZipCode_Gen', 'Gender_Gen']

    def finalize(frame: pd.DataFrame) -> pd.DataFrame:
        """Overwrite the raw QI columns with their generalized values and drop helpers."""
        df_final = frame.copy()
        df_final['Age'] = df_final['Age_Gen']
        df_final['ZipCode'] = df_final['ZipCode_Gen']
        df_final['Gender'] = df_final['Gender_Gen']
        return df_final.drop(columns=qi_gen + ['Name'])

    # Generalization levels for each QI attribute
    age_level = 0
    zip_level = 0
    gender_generalized = False

    # Iteratively generalize until k-anonymity is achieved
    max_iterations = 20
    for _ in range(max_iterations):
        # Apply current generalization levels
        df_anon['Age_Gen'] = df_anon['Age'].apply(lambda x: generalize_age(x, age_level))
        df_anon['ZipCode_Gen'] = df_anon['ZipCode'].apply(lambda x: generalize_zipcode(x, zip_level))
        df_anon['Gender_Gen'] = 'Person' if gender_generalized else df_anon['Gender']

        # Find equivalence classes smaller than k
        eq_classes = calculate_equivalence_classes(df_anon, qi_gen)
        small_classes = {qi: indices for qi, indices in eq_classes.items()
                         if len(indices) < k}

        if not small_classes:
            # k-anonymity achieved
            return finalize(df_anon)

        # Strategy: try more generalization first, then suppression as a last resort
        if age_level < 3:
            age_level += 1
        elif zip_level < 3:
            zip_level += 1
        elif not gender_generalized:
            gender_generalized = True
        else:
            # Last resort: suppress records in small equivalence classes.
            # The remaining classes are untouched, so each still has at least k records.
            indices_to_suppress = [idx for indices in small_classes.values() for idx in indices]
            df_anon = df_anon.drop(index=indices_to_suppress).reset_index(drop=True)
            return finalize(df_anon)

    # Not reachable with the 3 + 3 + 1 generalization steps above; kept as a safeguard
    return finalize(df_anon)

print("✓ K-anonymization algorithm defined")
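
The suppression step used as a last resort above can also be looked at in isolation. The following is a minimal, dependency-free sketch; `suppress_small_classes` and the `rows` data are hypothetical and only illustrate the idea of dropping every record whose equivalence class has fewer than k members.

```python
from collections import Counter

def suppress_small_classes(rows, k):
    """Drop every row whose QI combination occurs fewer than k times (hypothetical helper)."""
    counts = Counter(qi_values for qi_values, _ in rows)
    return [(qi_values, sensitive) for qi_values, sensitive in rows
            if counts[qi_values] >= k]

# Hypothetical generalized rows: (quasi-identifiers, sensitive value).
rows = [
    (("40-59", "021**", "Male"), "Heart Disease"),
    (("40-59", "021**", "Male"), "Diabetes"),
    (("40-59", "021**", "Male"), "Prostate Cancer"),
    (("40-59", "021**", "Female"), "Breast Cancer"),  # singleton class
]

kept = suppress_small_classes(rows, k=3)
print(f"{len(rows) - len(kept)} record(s) suppressed, {len(kept)} kept")
```

Suppression removes whole records, so it costs more utility per record than generalization. That is the reason a greedy scheme like the one above tries generalization first.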

## 5. Privacy and Utility Metrics

In [None]:
def calculate_discernibility_metric(df: pd.DataFrame, qi_columns: List[str]) -> float:
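    """
    Calculate discernibility metric (lower is better for utility).
    Each record is charged a penalty equal to the size of its equivalence
    class, so a class of size n contributes n**2 to the total.
    Suppressed records are absent from df and are not penalized here.
    """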
    eq_classes = calculate_equivalence_classes(df, qi_columns)
    total_cost = sum(len(indices) ** 2 for indices in eq_classes.values())
    return total_cost

def calculate_precision_loss(original_df: pd.DataFrame, anon_df: pd.DataFrame) -> Dict[str, float]:
    """
    Calculate precision loss for each generalized attribute, measured as
    the fractional reduction in distinct values: 1 - (anonymized / original).
    """
    precision_loss = {}
    
    # Age precision loss
    if 'Age' in anon_df.columns:
        original_unique = original_df['Age'].nunique()
        anon_unique = anon_df['Age'].nunique()
        precision_loss['Age'] = 1 - (anon_unique / original_unique)
    
    # ZipCode precision loss
    if 'ZipCode' in anon_df.columns:
        original_unique = original_df['ZipCode'].nunique()
        anon_unique = anon_df['ZipCode'].nunique()
        precision_loss['ZipCode'] = 1 - (anon_unique / original_unique)
    
    return precision_loss

def calculate_reidentification_probability(k: int) -> float:
    """Return the upper bound (1/k) on re-identification probability for k-anonymous data."""
    return 1.0 / k

print("✓ Metric calculation functions defined")
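
As a worked example of the discernibility metric, the sketch below computes it directly from equivalence class sizes. The class sizes are hypothetical and chosen only to illustrate the arithmetic; `discernibility` is a standalone helper, not the notebook's `calculate_discernibility_metric`.

```python
# Hypothetical equivalence class sizes for a 15-record table.
original_sizes = [1] * 15   # every record is unique before anonymization
anonymized_sizes = [6, 9]   # two large classes after generalization

def discernibility(class_sizes):
    """Sum of squared class sizes: each record is charged the size of its class."""
    return sum(size ** 2 for size in class_sizes)

dm_original = discernibility(original_sizes)      # 15 * 1**2 = 15
dm_anonymized = discernibility(anonymized_sizes)  # 6**2 + 9**2 = 117
relative_increase = (dm_anonymized - dm_original) / dm_original

print(f"Original: {dm_original}, anonymized: {dm_anonymized}")
print(f"Relative increase: {relative_increase:.0%}")
```

Because the cost grows with the square of the class size, a few very large classes are penalized far more heavily than many classes that sit just above k.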

## 6. Check Original Dataset K-Anonymity

In [None]:
qi_original = ['ZipCode', 'Age', 'Gender']
df_check = df_original[qi_original + ['Disease']].copy()
is_anon_orig, min_size_orig = check_k_anonymity(df_check, qi_original, 2)

print("ORIGINAL DATASET K-ANONYMITY CHECK")
print("=" * 80)
print(f"K-Anonymity Check (k=2): {'✓ PASS' if is_anon_orig else '✗ FAIL'}")
print(f"Minimum equivalence class size: {min_size_orig}")
print()
if is_anon_orig:
    print("✓ The original dataset satisfies k-anonymity (k=2)")
else:
    print("⚠️ The original dataset does NOT satisfy k-anonymity (k=2)")
    print("   This makes it vulnerable to linkage attacks!")

## 7. Apply K-Anonymization (k=3)

In [None]:
k = 3
print(f"Applying k-anonymization with k={k}...")
df_k3 = k_anonymize(df_original, k, qi_original)

print(f"\nANONYMIZED DATASET (k={k})")
print("=" * 80)
df_k3

In [None]:
# Verify k-anonymity
qi_anon = ['ZipCode', 'Age', 'Gender']
is_anon, min_size = check_k_anonymity(df_k3, qi_anon, k)

print(f"K-Anonymity Verification (k={k}): {'✓ PASS' if is_anon else '✗ FAIL'}")
print(f"Minimum equivalence class size: {min_size}")
print()

# Calculate metrics
df_orig_check = df_original[qi_original + ['Disease']].copy()
discern_orig = calculate_discernibility_metric(df_orig_check, qi_original)
discern_anon = calculate_discernibility_metric(df_k3, qi_anon)
precision = calculate_precision_loss(df_original, df_k3)
reident_prob = calculate_reidentification_probability(k)

print("PRIVACY METRICS:")
print(f"  • Re-identification Probability: {reident_prob:.2%} (1/{k})")
print(f"  • Privacy Gain: {(1 - reident_prob):.2%}")
print()

print("UTILITY METRICS:")
print(f"  • Discernibility Cost (Original): {discern_orig:.0f}")
print(f"  • Discernibility Cost (Anonymized): {discern_anon:.0f}")
print(f"  • Information Loss Increase: {((discern_anon - discern_orig) / discern_orig * 100):.1f}%")
print(f"  • Age Precision Loss: {precision.get('Age', 0):.2%}")
print(f"  • ZipCode Precision Loss: {precision.get('ZipCode', 0):.2%}")
print()

# Show equivalence classes
eq_classes = calculate_equivalence_classes(df_k3, qi_anon)
print(f"EQUIVALENCE CLASSES ({len(eq_classes)} groups):")
for i, (qi_combo, indices) in enumerate(eq_classes.items(), 1):
    print(f"  Class {i}: {qi_combo} → {len(indices)} records")

## 8. Apply K-Anonymization (k=5)

In [None]:
k = 5
print(f"Applying k-anonymization with k={k}...")
df_k5 = k_anonymize(df_original, k, qi_original)

print(f"\nANONYMIZED DATASET (k={k})")
print("=" * 80)
df_k5

In [None]:
# Verify k-anonymity
is_anon, min_size = check_k_anonymity(df_k5, qi_anon, k)

print(f"K-Anonymity Verification (k={k}): {'✓ PASS' if is_anon else '✗ FAIL'}")
print(f"Minimum equivalence class size: {min_size}")
print()

# Calculate metrics
discern_anon_k5 = calculate_discernibility_metric(df_k5, qi_anon)
precision_k5 = calculate_precision_loss(df_original, df_k5)
reident_prob_k5 = calculate_reidentification_probability(k)

print("PRIVACY METRICS:")
print(f"  • Re-identification Probability: {reident_prob_k5:.2%} (1/{k})")
print(f"  • Privacy Gain: {(1 - reident_prob_k5):.2%}")
print()

print("UTILITY METRICS:")
print(f"  • Discernibility Cost (Original): {discern_orig:.0f}")
print(f"  • Discernibility Cost (Anonymized): {discern_anon_k5:.0f}")
print(f"  • Information Loss Increase: {((discern_anon_k5 - discern_orig) / discern_orig * 100):.1f}%")
print(f"  • Age Precision Loss: {precision_k5.get('Age', 0):.2%}")
print(f"  • ZipCode Precision Loss: {precision_k5.get('ZipCode', 0):.2%}")
print()

# Show equivalence classes
eq_classes_k5 = calculate_equivalence_classes(df_k5, qi_anon)
print(f"EQUIVALENCE CLASSES ({len(eq_classes_k5)} groups):")
for i, (qi_combo, indices) in enumerate(eq_classes_k5.items(), 1):
    print(f"  Class {i}: {qi_combo} → {len(indices)} records")

## 9. Visualizations

### 9.1 Equivalence Class Distribution

In [None]:
# Equivalence class distribution comparison
k3_classes = [len(indices) for indices in eq_classes.values()]
k5_classes = [len(indices) for indices in eq_classes_k5.values()]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

# Plot for k=3
class_labels_k3 = [f'Class {i+1}' for i in range(len(k3_classes))]
colors_k3 = plt.cm.Set3(np.linspace(0, 1, len(k3_classes)))

ax1.bar(class_labels_k3, k3_classes, color=colors_k3, edgecolor='black', linewidth=1.5)
ax1.axhline(y=3, color='red', linestyle='--', linewidth=2, label='k=3 threshold')
ax1.set_ylabel('Number of Records', fontsize=12, fontweight='bold')
ax1.set_title('Equivalence Class Distribution (k=3)', fontsize=14, fontweight='bold')
ax1.legend(loc='upper right')
ax1.grid(True, alpha=0.3, axis='y')

for i, v in enumerate(k3_classes):
    ax1.text(i, v + 0.2, str(v), ha='center', va='bottom', fontweight='bold')

# Plot for k=5
class_labels_k5 = [f'Class {i+1}' for i in range(len(k5_classes))]
colors_k5 = plt.cm.Set2(np.linspace(0, 1, len(k5_classes)))

ax2.bar(class_labels_k5, k5_classes, color=colors_k5, edgecolor='black', linewidth=1.5)
ax2.axhline(y=5, color='red', linestyle='--', linewidth=2, label='k=5 threshold')
ax2.set_ylabel('Number of Records', fontsize=12, fontweight='bold')
ax2.set_title('Equivalence Class Distribution (k=5)', fontsize=14, fontweight='bold')
ax2.legend(loc='upper right')
ax2.grid(True, alpha=0.3, axis='y')

for i, v in enumerate(k5_classes):
    ax2.text(i, v + 0.2, str(v), ha='center', va='bottom', fontweight='bold')

plt.suptitle('Record Grouping into Equivalence Classes', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

### 9.2 Privacy-Utility Tradeoff

In [None]:
# Privacy vs Utility comparison
k_values = [3, 5]
privacy_scores = [(1 - 1 / k) * 100 for k in k_values]  # Privacy gain: (1 - 1/k) * 100
reident_probs = [100 / k for k in k_values]             # Re-identification probability: (1/k) * 100

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

# Privacy gain
bars = ax1.bar([f'k={k}' for k in k_values], privacy_scores, 
               color=['#2ecc71', '#27ae60'], alpha=0.7, edgecolor='black', linewidth=1.5)
ax1.set_ylabel('Privacy Gain (%)', fontsize=12, fontweight='bold')
ax1.set_title('Privacy Protection Level', fontsize=14, fontweight='bold')
ax1.set_ylim(0, 100)
ax1.grid(True, alpha=0.3, axis='y')

for bar, score in zip(bars, privacy_scores):
    height = bar.get_height()
    ax1.text(bar.get_x() + bar.get_width()/2., height + 2,
            f'{score:.1f}%', ha='center', va='bottom', fontweight='bold')

# Re-identification probability
bars2 = ax2.bar([f'k={k}' for k in k_values], reident_probs, 
                color=['#e74c3c', '#c0392b'], alpha=0.7, edgecolor='black', linewidth=1.5)
ax2.set_ylabel('Re-identification Probability (%)', fontsize=12, fontweight='bold')
ax2.set_title('Attack Success Probability', fontsize=14, fontweight='bold')
ax2.set_ylim(0, 100)
ax2.grid(True, alpha=0.3, axis='y')

for bar, prob in zip(bars2, reident_probs):
    height = bar.get_height()
    ax2.text(bar.get_x() + bar.get_width()/2., height + 2,
            f'{prob:.1f}%', ha='center', va='bottom', fontweight='bold')

plt.suptitle('Privacy-Utility Tradeoff Analysis', fontsize=16, fontweight='bold')
plt.tight_layout()
plt.show()

### 9.3 Data Generalization Impact

In [None]:
# Show unique values before and after
attributes = ['ZIP Code', 'Age', 'Gender']
original_unique = [
    df_original['ZipCode'].nunique(),
    df_original['Age'].nunique(),
    df_original['Gender'].nunique()
]
k3_unique = [
    df_k3['ZipCode'].nunique(),
    df_k3['Age'].nunique(),
    df_k3['Gender'].nunique()
]
k5_unique = [
    df_k5['ZipCode'].nunique(),
    df_k5['Age'].nunique(),
    df_k5['Gender'].nunique()
]

x = np.arange(len(attributes))
width = 0.25

fig, ax = plt.subplots(figsize=(12, 6))

bars1 = ax.bar(x - width, original_unique, width, label='Original', 
               color='#3498db', edgecolor='black')
bars2 = ax.bar(x, k3_unique, width, label='k=3 Anonymized', 
               color='#e67e22', edgecolor='black')
bars3 = ax.bar(x + width, k5_unique, width, label='k=5 Anonymized', 
               color='#e74c3c', edgecolor='black')

ax.set_ylabel('Number of Distinct Values', fontsize=12, fontweight='bold')
ax.set_title('Attribute Generalization Impact on Data Specificity', fontsize=14, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels(attributes, fontsize=11)
ax.legend(loc='upper right', fontsize=10)
ax.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.show()

## 10. Summary and Conclusions

### Key Findings:

1. **Privacy Protection**: K-anonymity caps the re-identification probability at 1/k, down from ~100% for records with a unique quasi-identifier combination
   - k=3: 33.3% re-identification probability
   - k=5: 20.0% re-identification probability

2. **Data Utility Cost**: Higher k values provide stronger privacy but reduce data utility:
   - More aggressive generalization (wider Age ranges, shorter ZipCode prefixes, Gender → "Person")
   - Fewer distinct values in quasi-identifiers
   - Higher information loss

3. **Defense Mechanism**: K-anonymity defends against linkage attacks by:
   - Creating equivalence classes of indistinguishable records
   - Generalizing quasi-identifiers to reduce specificity
   - Ensuring each record matches at least k-1 other records

### Trade-offs:

- **Security ↑** → **Utility ↓**: Increasing k improves privacy but reduces data granularity
- **Optimal k**: Depends on use case (research vs. public release vs. regulatory compliance)

### Limitations of K-Anonymity:

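- **Homogeneity Attack**: If all records in an equivalence class share the same sensitive value, an attacker learns that value without singling out a record
- **Background Knowledge Attack**: An attacker who knows the victim is in the dataset can use outside knowledge to rule out sensitive values within the victim's equivalence class
- **Composition Attack**: Multiple independently anonymized releases can be combined to re-identify individuals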

**Next Steps**: Consider stronger privacy models like ℓ-diversity or t-closeness for additional protection.
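
As a starting point for ℓ-diversity, the sketch below checks the simplest variant (distinct ℓ-diversity): it counts the distinct sensitive values in each equivalence class and flags classes with only one. The `release` data and the helper `distinct_l` are hypothetical; stronger variants such as entropy ℓ-diversity are not covered here.

```python
from collections import defaultdict

# Hypothetical 3-anonymous release: (quasi-identifiers, sensitive value).
release = [
    (("20-39", "021**"), "Breast Cancer"),
    (("20-39", "021**"), "Diabetes"),
    (("20-39", "021**"), "Ovarian Cancer"),
    (("40-59", "021**"), "Heart Disease"),
    (("40-59", "021**"), "Heart Disease"),
    (("40-59", "021**"), "Heart Disease"),
]

def distinct_l(rows):
    """Map each equivalence class to its number of distinct sensitive values."""
    sensitive_by_class = defaultdict(set)
    for qi_values, sensitive in rows:
        sensitive_by_class[qi_values].add(sensitive)
    return {qi_values: len(values) for qi_values, values in sensitive_by_class.items()}

diversity = distinct_l(release)

# A class with a single distinct value is open to the homogeneity attack.
homogeneous = [qi_values for qi_values, l_value in diversity.items() if l_value == 1]

for qi_values, l_value in diversity.items():
    print(qi_values, "-> l =", l_value)
print("Homogeneous classes:", homogeneous)
```

Both classes contain three records, so the release is 3-anonymous, yet anyone known to be in the 40-59 class is revealed to have heart disease. K-anonymity alone does not prevent this.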