In [None]:
# ------------------------------------------------------------
# k-Anonymity Implementation 
# ------------------------------------------------------------

import pandas as pd
import time
import matplotlib.pyplot as plt
import numpy as np
from typing import Tuple, Dict, List
import logging

# Configure logging for detailed process tracking
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ------------------------------
# 1) Generalization Functions 
# ------------------------------

def generalize_age_hierarchy(age_series: pd.Series, level: int = 1) -> pd.Series:
    """
    Generalize an age column to one of four granularity levels.

    Each value is expected to be a range such as ``[40-50)`` or a single
    numeric age (``45``, ``45.0``). Only the lower bound decides the bucket.

    Generalization Levels:
    Level 0: Parsable values are returned unchanged (most specific)
    Level 1: 3 categories: [0-30), [30-60), [60-100)
    Level 2: 2 categories: [0-50), [50-100)
    Level 3: Single category [0-100); any other level value behaves the same

    Missing markers (NaN/None, "MISSING", "NULL", empty string) and values
    that cannot be parsed map to "MISSING" at every level.

    Args:
        age_series: Pandas Series containing age data
        level: Generalization level (0-3)

    Returns:
        Series of the same length holding the generalized labels
    """
    logger.info("Applying age generalization at level %s", level)

    def generalize_age_value(val, gen_level):
        if pd.isna(val) or str(val).strip().upper() in {"MISSING", "NULL", ""}:
            return "MISSING"

        # Drop the interval brackets: "[40-50)" -> "40-50"
        s = str(val).strip(" []()")
        try:
            if "-" in s:
                # Both bounds must parse, but only the lower one is used.
                low, _high = (int(part) for part in s.split("-", 1))
            else:
                # Go through float so that ages stored as floats ("25.0",
                # the dtype pandas uses when a numeric column has NaN)
                # are not mistaken for missing data.
                low = int(float(s))
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers int(float("inf")).
            return "MISSING"

        if gen_level == 0:
            return val
        if gen_level == 1:
            if low < 30:
                return "[0-30)"
            if low < 60:
                return "[30-60)"
            return "[60-100)"
        if gen_level == 2:
            return "[0-50)" if low < 50 else "[50-100)"
        # Level 3 and anything else: one bucket for everybody.
        return "[0-100)"

    result = age_series.map(lambda x: generalize_age_value(x, level))
    logger.info("Age generalization complete. Unique values: %s", result.nunique())
    return result


# --------------------------------
# 2) Core k-Anonymity Algorithm 
# --------------------------------

def check_k_anonymity(df: pd.DataFrame, quasi_identifiers: List[str], k: int) -> Tuple[bool, pd.Series]:
    """
    Check whether every quasi-identifier combination occurs at least k times.

    Records whose quasi-identifiers contain NaN are counted as groups of
    their own instead of being left out of the count.

    Args:
        df: DataFrame to check
        quasi_identifiers: List of quasi-identifier column names
        k: Minimum group size requirement

    Returns:
        Tuple of (is_k_anonymous, group_sizes). ``group_sizes`` is indexed by
        the quasi-identifier combination. A frame without any rows yields no
        groups and is reported as not compliant.
    """
    logger.info("Checking k-anonymity compliance for k=%s", k)

    # dropna=False: the groupby default would silently skip every record with
    # a NaN quasi-identifier, letting a unique record pass the check.
    group_sizes = df.groupby(quasi_identifiers, dropna=False).size()
    min_group_size = group_sizes.min()

    # min() of an empty Series is NaN and NaN >= k is False; bool() turns the
    # numpy result into the plain bool promised by the annotation.
    is_compliant = bool(min_group_size >= k)
    violations = int((group_sizes < k).sum())

    logger.info("k-anonymity check: min_group_size=%s, violations=%s, compliant=%s",
                min_group_size, violations, is_compliant)

    return is_compliant, group_sizes

def apply_sweeney_k_anonymity(df: pd.DataFrame, k: int,
                             quasi_identifiers: List[str] = ['age', 'race', 'gender']) -> Tuple[pd.DataFrame, Dict]:
    """
    Make a dataset k-anonymous by generalizing age and suppressing small groups.

    Steps performed:
    1. Copy the input so the caller's frame is not modified.
    2. If 'age' is a quasi-identifier, generalize it with
       generalize_age_hierarchy(level=1).
    3. Count the records sharing each quasi-identifier combination
       (check_k_anonymity) and drop every record whose combination occurs
       fewer than k times. A record that cannot be matched to a counted
       group is dropped as well, so nothing unverified is released.
    4. Re-check the result with check_k_anonymity.
    5. Shuffle the rows with a fixed seed and renumber the index.

    Args:
        df: Input DataFrame; must contain every quasi-identifier column
        k: Anonymity parameter (minimum group size), at least 1
        quasi_identifiers: List of quasi-identifier columns

    Returns:
        Tuple of (anonymized_df, statistics_dict). The statistics hold the
        record counts, suppression count, retained percentage, group size
        min/max/mean, per-column generalization figures and a list of
        human-readable step descriptions.

    Raises:
        ValueError: If k is smaller than 1, or if the final check does not
            pass. The latter includes the case where no record survives,
            because check_k_anonymity reports an empty frame as failing.
    """
    if k < 1:
        raise ValueError(f"k must be a positive integer, got {k}")

    # Private copy: the default list object is shared between calls.
    quasi_identifiers = list(quasi_identifiers)

    logger.info("Starting Sweeney k-anonymity algorithm with k=%s", k)
    logger.info("Original dataset size: %s records", len(df))
    logger.info("Quasi-identifiers: %s", quasi_identifiers)

    # Initialize statistics tracking
    stats = {
        'original_records': len(df),
        'k_value': k,
        'generalization_applied': {},
        'suppression_count': 0,
        'final_records': 0,
        'algorithm_steps': []
    }

    # Step 1: Create working copy
    df_work = df.copy()
    stats['algorithm_steps'].append("Step 1: Created working copy of dataset")

    # Step 2: Generalization
    logger.info("Step 2: Applying generalization hierarchy")

    if 'age' in quasi_identifiers:
        original_age_diversity = df_work['age'].nunique()
        df_work['age'] = generalize_age_hierarchy(df_work['age'], level=1)
        generalized_age_diversity = df_work['age'].nunique()

        # nunique() ignores NaN, so a column without any non-null value has
        # zero distinct values; report NaN instead of dividing by zero.
        if original_age_diversity:
            reduction_ratio = generalized_age_diversity / original_age_diversity
        else:
            reduction_ratio = float('nan')

        stats['generalization_applied']['age'] = {
            'original_diversity': original_age_diversity,
            'generalized_diversity': generalized_age_diversity,
            'reduction_ratio': reduction_ratio
        }
        stats['algorithm_steps'].append(f"Step 2a: Age generalized from {original_age_diversity} to {generalized_age_diversity} categories")

    # Check if generalization alone achieves k-anonymity
    is_compliant, group_sizes = check_k_anonymity(df_work, quasi_identifiers, k)

    # Attach each record's group size by joining only the quasi-identifier
    # columns against the counts. The counts have one row per combination,
    # so the left join keeps the rows of df_work in order and one-to-one.
    # The temporary column name cannot clash with a data column because the
    # left side carries the quasi-identifiers only.
    size_column = '__k_anonymity_group_size__'
    size_lookup = group_sizes.rename(size_column).reset_index()
    row_group_sizes = df_work[quasi_identifiers].merge(
        size_lookup, on=quasi_identifiers, how='left'
    )[size_column]

    # "not >= k" instead of "< k": a record without a matching group has a
    # NaN size, and NaN < k would be False and let it slip through.
    violating_records = ~(row_group_sizes >= k).to_numpy()
    records_to_suppress = int(violating_records.sum())

    if is_compliant and records_to_suppress == 0:
        logger.info("k-anonymity achieved through generalization alone")
        stats['algorithm_steps'].append("Step 3: k-anonymity achieved through generalization")
    else:
        # Step 3: Suppression of whole records in undersized groups
        logger.info("Step 3: Applying suppression to remaining violations")
        logger.info("Suppressing %s records in groups smaller than k=%s", records_to_suppress, k)

        df_work = df_work[~violating_records]

        stats['suppression_count'] = records_to_suppress
        stats['algorithm_steps'].append(f"Step 3: Suppressed {records_to_suppress} records")

    # Step 4: Final verification
    logger.info("Step 4: Final k-anonymity verification")
    final_compliant, final_groups = check_k_anonymity(df_work, quasi_identifiers, k)

    if not final_compliant:
        logger.error("ALGORITHM ERROR: Final result does not satisfy k-anonymity")
        raise ValueError(f"k-anonymity algorithm failed to achieve k={k}")

    # Step 5: Shuffle so the output order does not mirror the input order.
    # NOTE(review): the seed is fixed, so the permutation is reproducible -
    # confirm that this is acceptable for the intended threat model.
    logger.info("Step 5: Shuffling records to prevent positional attacks")
    df_final = df_work.sample(frac=1, random_state=42).reset_index(drop=True)

    # Finalize statistics. A passing check implies at least one group, so
    # the frame is non-empty and the aggregates below are real numbers.
    stats['final_records'] = len(df_final)
    stats['records_retained_pct'] = (stats['final_records'] / stats['original_records']) * 100
    stats['min_group_size'] = int(final_groups.min())
    stats['max_group_size'] = int(final_groups.max())
    stats['avg_group_size'] = float(final_groups.mean())
    stats['algorithm_steps'].append(f"Step 5: Final dataset with {len(df_final)} records")

    logger.info("k-anonymity algorithm completed successfully")
    logger.info("Records retained: %s/%s (%.1f%%)",
                stats['final_records'], stats['original_records'],
                stats['records_retained_pct'])

    return df_final, stats

# -----------------------------------------------------------------------------
# 3) Batch Processing and Performance Analysis
# -----------------------------------------------------------------------------

def process_all_datasets(datasets: Dict[str, str], k_values: List[int]) -> pd.DataFrame:
    """
    Run apply_sweeney_k_anonymity on every dataset for every k value.

    For each successful (dataset, k) pair the anonymized data is written to
    ``diabetic_data_<label>_k<k>_sweeney.csv`` in the working directory. A
    dataset that cannot be loaded, or a k value that fails, is logged and
    skipped. When at least one pair succeeded, the collected results are
    also written to ``k_anonymity_performance_summary.csv``.

    Args:
        datasets: Dictionary mapping dataset labels to file paths
        k_values: List of k values to test

    Returns:
        DataFrame with one row per successful (dataset, k) pair holding
        timing and statistics; empty when nothing succeeded
    """
    logger.info("Starting batch processing of all datasets")

    results = []

    for dataset_label, file_path in datasets.items():
        logger.info("\n%s", '=' * 60)
        logger.info("Processing Dataset: %s (%s)", dataset_label, file_path)
        logger.info("%s", '=' * 60)

        # Load dataset. keep_default_na=False keeps markers such as "NA" or
        # empty cells as literal strings instead of converting them to NaN.
        try:
            df_original = pd.read_csv(file_path, keep_default_na=False)
            logger.info("Loaded dataset: %s records, %s columns",
                        len(df_original), len(df_original.columns))
        except Exception as e:
            # Batch boundary: one unreadable file must not stop the others.
            logger.error("Failed to load %s: %s", file_path, e)
            continue

        for k in k_values:
            logger.info("\n--- Applying k-anonymity with k=%s ---", k)

            # Only the anonymization itself is timed, not the CSV export.
            start_time = time.perf_counter()

            try:
                df_anonymized, stats = apply_sweeney_k_anonymity(df_original, k)

                execution_time = time.perf_counter() - start_time

                # Save anonymized dataset
                output_filename = f"diabetic_data_{dataset_label}_k{k}_sweeney.csv"
                df_anonymized.to_csv(output_filename, index=False)
                logger.info("Saved anonymized dataset: %s", output_filename)

                # Collect results
                result_record = {
                    'dataset': dataset_label,
                    'k_value': k,
                    'execution_time_seconds': execution_time,
                    'original_records': stats['original_records'],
                    'final_records': stats['final_records'],
                    'records_retained_pct': stats['records_retained_pct'],
                    'suppression_count': stats['suppression_count'],
                    'min_group_size': stats['min_group_size'],
                    'max_group_size': stats['max_group_size'],
                    'avg_group_size': stats['avg_group_size']
                }

                # Add generalization statistics if available
                if 'age' in stats['generalization_applied']:
                    age_stats = stats['generalization_applied']['age']
                    result_record['age_diversity_reduction'] = age_stats['reduction_ratio']

                results.append(result_record)

                logger.info("k=%s completed successfully in %.2f seconds", k, execution_time)
                logger.info("Data retention: %.1f%%", stats['records_retained_pct'])

            except Exception as e:
                # Batch boundary: log and move on to the next k value.
                logger.error("Failed to process k=%s for %s: %s", k, dataset_label, e)

    results_df = pd.DataFrame(results)
    logger.info("\nBatch processing completed. Processed %s configurations.", len(results_df))

    # Save performance summary. This has to happen before the return to take
    # effect; nothing is written when there are no results to report.
    if not results_df.empty:
        summary_filename = 'k_anonymity_performance_summary.csv'
        results_df.to_csv(summary_filename, index=False)
        logger.info("Performance summary saved: %s", summary_filename)

    return results_df

# -----------------------------------------------------------------------------
# 4) Main Execution
# -----------------------------------------------------------------------------

if __name__ == "__main__":
    logger.info("Starting Sweeney k-Anonymity Implementation")
    logger.info("Based on: Sweeney, L. (2002). k-anonymity: A model for protecting privacy.")

    # Define datasets and parameters
    datasets = {
        'full': 'diabetic_data_final.csv',
        '25k': 'diabetic_data_25k.csv',
        '50k': 'diabetic_data_50k.csv',
        '75k': 'diabetic_data_75k.csv'
    }

    k_values = [2, 5, 10]

    # Process all combinations
    results_df = process_all_datasets(datasets, k_values)

    # The error branch belongs to "no results", not to "module was imported":
    # report success only when at least one configuration produced a row.
    if not results_df.empty:
        # Print summary statistics
        print("\n" + "="*80)
        print("SWEENEY k-ANONYMITY ALGORITHM - EXECUTION SUMMARY")
        print("="*80)
        print(results_df.to_string(index=False))
        print("="*80)

        logger.info("All processing completed successfully")
    else:
        logger.error("No results generated - check dataset files and parameters")

In [None]:
# -------------------------------------------------------------
# Simple Query Utility Evaluation – K-Anonymity
# -------------------------------------------------------------

import pandas as pd
import numpy as np
from IPython.display import display

def classify_utility(rel_err_pct: float) -> str:
    """Translate a relative error (in percent) into a coarse utility label.

    Below 5 is "Good", below 15 is "Moderate", everything else is "Poor".
    NaN compares False against both limits and therefore ends up as "Poor".
    """
    for upper_limit, label in ((5, "Good"), (15, "Moderate")):
        if rel_err_pct < upper_limit:
            return label
    return "Poor"

def generalize_age_to_3bins(age_series: pd.Series) -> pd.Series:
    """Collapse age ranges into the labels "0-30", "30-60" and ">60".

    A value must look like ``[low-high)`` with integer bounds; the bucket is
    chosen from the upper bound. Missing markers (NaN, "?", "UNKNOWN",
    "NULL") and anything that does not parse as a range are returned
    unchanged.

    NOTE(review): because unparsable values pass through as-is, a raw "?" and
    the "MISSING" label written by generalize_age_hierarchy stay in different
    buckets - confirm whether they should be unified before comparing.
    """
    def to_bucket(val):
        if pd.isna(val) or str(val).strip().upper() in {"?", "UNKNOWN", "NULL"}:
            return val
        s = str(val).strip(" []()")
        try:
            # Both bounds must parse; only the upper one is used below.
            _low, high = map(int, s.split("-", 1))
        except (ValueError, TypeError):
            # Not a "low-high" pair of integers: leave the value untouched.
            return val
        if high <= 30:
            return "0-30"
        if high <= 60:
            return "30-60"
        return ">60"
    return age_series.map(to_bucket)

# -------------------------------------------------------------
# Dataset Slices and k-values
# -------------------------------------------------------------
# k values whose anonymized outputs are evaluated
ks = [2, 5, 10]

# Slice label -> raw CSV file, in evaluation order
slices = dict([
    ("full", "diabetic_data_final.csv"),
    ("25k", "diabetic_data_25k.csv"),
    ("50k", "diabetic_data_50k.csv"),
    ("75k", "diabetic_data_75k.csv"),
])

# -------------------------------------------------------------
# Simple Queries Definitions
# -------------------------------------------------------------
def q_age_dist(df):
    """Count rows per 3-bin age bucket, ordered by bucket label."""
    # assign() works on a copy, so the caller's frame keeps its columns.
    bucketed = df.assign(age3=generalize_age_to_3bins(df['age']))
    bucket_counts = bucketed['age3'].value_counts()
    return bucket_counts.sort_index()

def q_race_dist(df):
    """Count rows per value of the "race" column, ordered by value."""
    race_counts = df["race"].value_counts()
    return race_counts.sort_index()

def q_gender_admtype(df):
    """Count rows per (gender, admission_type) pair, ordered by the pair."""
    by_gender_and_admission = df.groupby(["gender", "admission_type"])
    return by_gender_and_admission.size().sort_index()

def q_avg_meds_by_age(df):
    """Mean of "num_medications" per 3-bin age bucket, ordered by bucket."""
    # assign() works on a copy, so the caller's frame keeps its columns.
    bucketed = df.assign(age3=generalize_age_to_3bins(df['age']))
    meds_by_bucket = bucketed.groupby("age3")["num_medications"]
    return meds_by_bucket.mean().sort_index()

def q_readmit_rate_by_race(df):
    """Per race, the percentage of rows whose "readmitted" value is "<30"."""
    def pct_marked_lt30(outcomes):
        return (outcomes == "<30").sum() / len(outcomes) * 100

    rate_by_race = df.groupby("race")["readmitted"].apply(pct_marked_lt30)
    return rate_by_race.sort_index()

# Display label -> query function, in reporting order
queries = dict([
    ("Age-3bin counts", q_age_dist),
    ("Race counts", q_race_dist),
    ("Gender×AdmType counts", q_gender_admtype),
    ("Avg #meds by age3bin", q_avg_meds_by_age),
    ("Readmit(<30%) by race", q_readmit_rate_by_race),
])

# -------------------------------------------------------------
# Baseline: every query evaluated once on each raw slice
# -------------------------------------------------------------
orig_results = {}
for slice_label, path in slices.items():
    # keep_default_na=False keeps markers such as "NA" as plain strings
    df_raw = pd.read_csv(path, keep_default_na=False)
    orig_results[slice_label] = dict(
        (qname, fn(df_raw)) for qname, fn in queries.items()
    )

# -------------------------------------------------------------
# Compare with Anonymized Datasets
# -------------------------------------------------------------
# One output row per (slice, k, query): re-run the query on the anonymized
# file and score it against the raw baseline stored in orig_results.
records = []
for slice_label in slices:
    for k in ks:
        anon_path = f"diabetic_data_{slice_label}_k{k}_sweeney.csv"
        df_anon = pd.read_csv(anon_path, keep_default_na=False)

        for qname, fn in queries.items():
            orig_ser = orig_results[slice_label][qname]
            anon_ser = fn(df_anon)
            # Align on the baseline's groups; a group that no longer exists
            # in the anonymized data is scored as 0.
            comp = pd.DataFrame({
                "orig": orig_ser,
                "anon": anon_ser.reindex(orig_ser.index).fillna(0)
            })
            # Relative error is undefined for a zero baseline: mark as NaN
            # so those groups drop out of the mean below.
            comp["rel_err_pct"] = np.where(
                comp["orig"] == 0,
                np.nan,
                (comp["anon"] - comp["orig"]).abs() / comp["orig"] * 100
            )
            mean_err = comp["rel_err_pct"].dropna().mean()
            util     = classify_utility(mean_err)

            records.append({
                "Slice":     slice_label,
                "Query":     qname,
                "k":         k,
                "RelErr(%)": round(mean_err, 2),
                "Utility":   util
            })

# -------------------------------------------------------------
# Summary Table: Reshaped and Saved
# -------------------------------------------------------------
# Long format: one row per (slice, query, k)
summary_df = pd.DataFrame(records)

# Wide format: one row per (slice, query), the k values spread over columns
summary_pivot = summary_df.pivot_table(
    index=["Slice", "Query"],
    columns="k",
    values=["RelErr(%)", "Utility"],
    aggfunc="first",
)
summary_pivot = summary_pivot.round(2)

# NOTE(review): with two value columns the pivot presumably carries
# two-level column labels - confirm that clearing `.name` has the intended
# effect on the exported header.
summary_pivot.columns.name = None
summary_pivot = summary_pivot.reset_index()

# Show the table in the notebook
display(summary_pivot)

# Persist the table as CSV
summary_pivot.to_csv("k_anonymity_simple_query_utility_summary.csv", index=False)

In [None]:
# -------------------------------------------------------------
# Complex Query Utility Evaluation – K-Anonymity
# -------------------------------------------------------------

import pandas as pd
import numpy as np
from IPython.display import display

def classify_utility(rel_err_pct: float) -> str:
    """Translate a relative error (in percent) into a coarse utility label.

    Below 5 is "Good", below 15 is "Moderate", everything else is "Poor".
    NaN compares False against both limits and therefore ends up as "Poor".
    """
    for upper_limit, label in ((5, "Good"), (15, "Moderate")):
        if rel_err_pct < upper_limit:
            return label
    return "Poor"

def generalize_age_to_3bins(age_series: pd.Series) -> pd.Series:
    """Collapse age ranges into the labels "0-30", "30-60" and ">60".

    A value must look like ``[low-high)`` with integer bounds; the bucket is
    chosen from the upper bound. Missing markers (NaN, "?", "UNKNOWN",
    "NULL") and anything that does not parse as a range are returned
    unchanged.

    NOTE(review): because unparsable values pass through as-is, a raw "?" and
    the "MISSING" label written by generalize_age_hierarchy stay in different
    buckets - confirm whether they should be unified before comparing.
    """
    def to_bucket(val):
        if pd.isna(val) or str(val).strip().upper() in {"?", "UNKNOWN", "NULL"}:
            return val
        s = str(val).strip(" []()")
        try:
            # Both bounds must parse; only the upper one is used below.
            _low, high = map(int, s.split("-", 1))
        except (ValueError, TypeError):
            # Not a "low-high" pair of integers: leave the value untouched.
            return val
        if high <= 30:
            return "0-30"
        if high <= 60:
            return "30-60"
        return ">60"
    return age_series.map(to_bucket)

# -------------------------------------------------------------
# Dataset Slices and k-values
# -------------------------------------------------------------
# k values whose anonymized outputs are evaluated
ks = [2, 5, 10]

# Slice label -> raw CSV file, in evaluation order
slices = dict([
    ("full", "diabetic_data_final.csv"),
    ("25k", "diabetic_data_25k.csv"),
    ("50k", "diabetic_data_50k.csv"),
    ("75k", "diabetic_data_75k.csv"),
])

# -------------------------------------------------------------
# Complex Queries Definitions
# -------------------------------------------------------------
def q_lab_by_age_adm(df):
    """Mean "num_lab_procedures" per (3-bin age bucket, admission_type)."""
    # assign() works on a copy, so the caller's frame keeps its columns.
    bucketed = df.assign(age3=generalize_age_to_3bins(df['age']))
    labs_by_group = bucketed.groupby(['age3', 'admission_type'])['num_lab_procedures']
    return labs_by_group.mean().sort_index()

def q_readmit_by_diag1_gender(df):
    """Per (diagnoses_1, gender), the percentage of rows with readmitted == "<30"."""
    def pct_marked_lt30(outcomes):
        return (outcomes == "<30").sum() / len(outcomes) * 100

    readmitted_by_group = df.groupby(['diagnoses_1', 'gender'])['readmitted']
    return readmitted_by_group.apply(pct_marked_lt30).sort_index()

def q_avg_meds_by_diag2_gender(df):
    """Mean "num_medications" per (diagnoses_2, gender), ordered by the pair."""
    meds_by_group = df.groupby(['diagnoses_2', 'gender'])['num_medications']
    return meds_by_group.mean().sort_index()

def q_avg_meds_by_race_admtype(df):
    """Mean "num_medications" per (race, admission_type), ordered by the pair."""
    meds_by_group = df.groupby(['race', 'admission_type'])['num_medications']
    return meds_by_group.mean().sort_index()

def q_avg_labs_by_diag2_admtype(df):
    """Mean "num_lab_procedures" per (diagnoses_2, admission_type), ordered by the pair."""
    labs_by_group = df.groupby(['diagnoses_2', 'admission_type'])['num_lab_procedures']
    return labs_by_group.mean().sort_index()

# Display label -> query function, in reporting order
queries = dict([
    ("Avg labs by age³×AdmType", q_lab_by_age_adm),
    ("Readmit% by diag1×Gender", q_readmit_by_diag1_gender),
    ("Avg meds by diag2×Gender", q_avg_meds_by_diag2_gender),
    ("Avg meds by Race×AdmType", q_avg_meds_by_race_admtype),
    ("Avg labs by diag2×AdmType", q_avg_labs_by_diag2_admtype),
])

# -------------------------------------------------------------
# Baseline: every query evaluated once on each raw slice
# -------------------------------------------------------------
orig_results = {}
for slice_label, path in slices.items():
    # keep_default_na=False keeps markers such as "NA" as plain strings
    df_raw = pd.read_csv(path, keep_default_na=False)
    orig_results[slice_label] = dict(
        (qname, fn(df_raw)) for qname, fn in queries.items()
    )

# -------------------------------------------------------------
# Compare with Anonymized Datasets
# -------------------------------------------------------------
# One output row per (slice, k, query): re-run the query on the anonymized
# file and score it against the raw baseline stored in orig_results.
records = []
for slice_label in slices:
    for k in ks:
        anon_path = f"diabetic_data_{slice_label}_k{k}_sweeney.csv"
        df_anon = pd.read_csv(anon_path, keep_default_na=False)

        for qname, fn in queries.items():
            orig_ser = orig_results[slice_label][qname]
            anon_ser = fn(df_anon)

            # Align on the baseline's groups; a group that no longer exists
            # in the anonymized data is scored as 0.
            comp = pd.DataFrame({
                "orig": orig_ser,
                "anon": anon_ser.reindex(orig_ser.index).fillna(0),
            })

            # Relative error in percent; left as NaN where the baseline is 0
            # because the ratio is undefined there.
            comp["rel_err_pct"] = (
                (comp["anon"] - comp["orig"]).abs() / comp["orig"] * 100
            ).where(comp["orig"] != 0)

            # mean() skips NaN, so undefined groups do not affect the score.
            mean_err = comp["rel_err_pct"].mean()
            util = classify_utility(mean_err)

            records.append({
                "Slice": slice_label,
                "Query": qname,
                "k": k,
                "RelErr(%)": round(mean_err, 2),
                "Utility": util,
            })

# -------------------------------------------------------------
# Summary Table: Reshaped and Saved
# -------------------------------------------------------------
# Long format: one row per (slice, query, k)
summary_df = pd.DataFrame(records)

# Wide format: one row per (slice, query), the k values spread over columns
summary_pivot = summary_df.pivot_table(
    index=["Slice", "Query"],
    columns="k",
    values=["RelErr(%)", "Utility"],
    aggfunc="first",
)
summary_pivot = summary_pivot.round(2)

# NOTE(review): with two value columns the pivot presumably carries
# two-level column labels - confirm that clearing `.name` has the intended
# effect on the exported header.
summary_pivot.columns.name = None
summary_pivot = summary_pivot.reset_index()

# Show the table in the notebook
display(summary_pivot)

# Persist the table as CSV
summary_pivot.to_csv("k_anonymity_complex_query_utility_summary.csv", index=False)