# Privacy-Preserving Record Linkage (PPRL) - Superhero Demo with PySpark

This notebook demonstrates how OpenToken enables privacy-preserving record linkage between two organizations without sharing sensitive patient data.

**Scenario**: Super Hero Hospital and Super Hero Pharmacy want to link patient records for care coordination while protecting patient privacy.

**PySpark Bridge**: This notebook uses the OpenToken PySpark bridge for distributed overlap analysis and advanced transformations.

## 1. Set Up the Environment and Spark Session

In [None]:
import glob
import os
import shutil
import subprocess
import sys
from pathlib import Path

import pandas as pd

# Set up paths
demo_dir = Path.cwd()
scripts_dir = demo_dir / 'scripts'
datasets_dir = demo_dir / 'datasets'
outputs_dir = demo_dir / 'outputs'

print(f"Demo directory: {demo_dir}")
print(f"Scripts directory: {scripts_dir}")
print(f"Datasets directory: {datasets_dir}")
print(f"Outputs directory: {outputs_dir}")
print()

# Try to import PySpark components
pyspark_available = False
spark = None

try:
    from pyspark.sql import SparkSession
    print("✓ PySpark available")
except ImportError as e:
    print(f"⚠ PySpark not available: {e}")
    print("  Install with: pip install pyspark")
    sys.exit(1)

try:
    from opentoken_pyspark import OpenTokenProcessor
    from opentoken_pyspark.overlap_analyzer import OpenTokenOverlapAnalyzer
    print("✓ OpenToken PySpark Bridge available")
    pyspark_available = True
except ImportError as e:
    print(f"⚠ OpenToken PySpark Bridge not available: {e}")
    print("  Install with: pip install opentoken-pyspark")
    sys.exit(1)

# Initialize Spark Session
if pyspark_available:
    try:
        spark = SparkSession.builder \
            .appName("PPRL-Superhero-Demo") \
            .master("local[*]") \
            .config("spark.sql.shuffle.partitions", "4") \
            .config("spark.driver.memory", "2g") \
            .getOrCreate()
        spark.sparkContext.setLogLevel("WARN")
        print("✓ Spark Session initialized successfully")
        print()
    except Exception as e:
        print(f"✗ Could not initialize Spark: {e}")
        print("Make sure PySpark is properly installed")
        sys.exit(1)

## 2. Generate Superhero Datasets

Create two datasets (hospital and pharmacy) with a 40% overlap. The overlap represents patients who appear in both datasets.
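
The generation script itself is not shown in this notebook. As a rough illustration of what a fixed overlap means, the sketch below builds two ID lists that share a set fraction of patients. The function name, the ID format, and the choice to measure overlap against the smaller dataset are assumptions made for this example, not details taken from `generate_superhero_datasets.py`.

```python
import random


def make_overlapping_ids(num_hospital, num_pharmacy, overlap_fraction, seed=42):
    """Return (hospital_ids, pharmacy_ids) sharing a fixed fraction of patients.

    Assumption: the overlap is measured against the smaller dataset.
    """
    rng = random.Random(seed)
    num_shared = round(min(num_hospital, num_pharmacy) * overlap_fraction)

    # Shared patients appear in both datasets.
    shared = [f"shared-{i}" for i in range(num_shared)]
    # The remaining patients are unique to one organization.
    hospital_only = [f"hospital-{i}" for i in range(num_hospital - num_shared)]
    pharmacy_only = [f"pharmacy-{i}" for i in range(num_pharmacy - num_shared)]

    hospital_ids = shared + hospital_only
    pharmacy_ids = shared + pharmacy_only
    rng.shuffle(hospital_ids)
    rng.shuffle(pharmacy_ids)
    return hospital_ids, pharmacy_ids


hospital_ids, pharmacy_ids = make_overlapping_ids(200, 120, 0.40)
common = set(hospital_ids) & set(pharmacy_ids)
print(f"Hospital: {len(hospital_ids)}, Pharmacy: {len(pharmacy_ids)}, shared: {len(common)}")
```

Fixing the seed keeps the generated lists reproducible between runs, which makes later overlap numbers easier to sanity-check.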

In [None]:
# Run the data generation script with the same interpreter as this notebook
result = subprocess.run(
    [sys.executable, str(scripts_dir / 'generate_superhero_datasets.py')],
    cwd=str(demo_dir),
    capture_output=True,
    text=True
)

print(result.stdout)
if result.returncode != 0:
    print(f"Error: {result.stderr}")
else:
    print("✓ Datasets generated successfully!")

### Inspect the Generated Datasets

In [None]:
# Load and display hospital dataset
hospital_df = pd.read_csv(datasets_dir / 'hospital_superhero_data.csv')
print(f"Hospital Dataset: {len(hospital_df)} records")
print(hospital_df.head())
print()

# Load and display pharmacy dataset
pharmacy_df = pd.read_csv(datasets_dir / 'pharmacy_superhero_data.csv')
print(f"Pharmacy Dataset: {len(pharmacy_df)} records")
print(pharmacy_df.head())

## 3. Tokenize the Datasets with PySpark

Each organization tokenizes its own data independently using the OpenToken PySpark Bridge. Tokenization applies two layers:
1. HMAC-SHA256 hashing for deterministic tokens
2. AES-256-GCM encryption for secure transmission

**PySpark Integration**: Uses distributed processing to tokenize datasets in parallel across the Spark cluster.

**Important**: Both organizations use the same hashing and encryption keys to enable later comparison.
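
To see why a shared hashing key matters, the sketch below computes an HMAC-SHA256 digest with Python's standard library. It only illustrates the deterministic-hash idea: the `hash_token` helper and the sample attribute string are hypothetical, and OpenToken's actual normalization, token signature format, and encryption layer are not reproduced here.

```python
import hashlib
import hmac


def hash_token(signature: str, hashing_secret: str) -> str:
    """Return the hex HMAC-SHA256 digest of a token signature string."""
    return hmac.new(
        hashing_secret.encode("utf-8"),
        signature.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()


# Hypothetical, already-normalized attribute string for one patient.
signature = "KENT|CLARK|MALE|1938-04-18"

hospital_hash = hash_token(signature, "HashingKey")
pharmacy_hash = hash_token(signature, "HashingKey")
other_key_hash = hash_token(signature, "DifferentKey")

print(hospital_hash == pharmacy_hash)   # True: same secret, same input
print(hospital_hash == other_key_hash)  # False: different secret
```

Two organizations that hash the same normalized attributes with the same secret get identical digests, while a different secret yields an unrelated digest that can never match.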

In [None]:
# Tokenize datasets using OpenToken PySpark Bridge
print("Tokenizing datasets with OpenToken PySpark Bridge...")
print()

# Configuration (demo values only; never hard-code real keys in a notebook)
hash_key = "HashingKey"
encryption_key = "Secret-Encryption-Key-Goes-Here."

# Verify encryption key length (AES-256 requires a 32-byte key)
if len(encryption_key) != 32:
    raise ValueError(f"Encryption key must be exactly 32 characters, got {len(encryption_key)}")

# Initialize the OpenToken processor
processor = OpenTokenProcessor(
    hashing_secret=hash_key,
    encryption_key=encryption_key
)
print("✓ OpenToken Processor initialized")

# Load hospital dataset into Spark
print("Loading hospital dataset...")
hospital_spark_df = spark.read.csv(
    str(datasets_dir / 'hospital_superhero_data.csv'),
    header=True,
    inferSchema=True
)
print(f"  Loaded {hospital_spark_df.count()} records")

# Generate tokens for hospital dataset
print("Generating hospital tokens...")
hospital_tokens_spark = processor.process_dataframe(hospital_spark_df)
hospital_token_count = hospital_tokens_spark.count()
print(f"  Generated {hospital_token_count} tokens")

# Save hospital tokens to CSV
hospital_tokens_spark.coalesce(1).write.mode("overwrite").option("header", "true").csv(
    str(outputs_dir / 'hospital_tokens_temp')
)
# Move the CSV file to the expected location
import glob
csv_file = glob.glob(str(outputs_dir / 'hospital_tokens_temp/*.csv'))[0]
# os.replace overwrites an existing file, so this cell can be re-run on any platform
os.replace(csv_file, str(outputs_dir / 'hospital_tokens.csv'))
# Clean up temp directory
import shutil
shutil.rmtree(str(outputs_dir / 'hospital_tokens_temp'))
print("✓ Hospital dataset tokenized")

# Load pharmacy dataset into Spark
print("Loading pharmacy dataset...")
pharmacy_spark_df = spark.read.csv(
    str(datasets_dir / 'pharmacy_superhero_data.csv'),
    header=True,
    inferSchema=True
)
print(f"  Loaded {pharmacy_spark_df.count()} records")

# Generate tokens for pharmacy dataset
print("Generating pharmacy tokens...")
pharmacy_tokens_spark = processor.process_dataframe(pharmacy_spark_df)
pharmacy_token_count = pharmacy_tokens_spark.count()
print(f"  Generated {pharmacy_token_count} tokens")

# Save pharmacy tokens to CSV
pharmacy_tokens_spark.coalesce(1).write.mode("overwrite").option("header", "true").csv(
    str(outputs_dir / 'pharmacy_tokens_temp')
)
# Move the CSV file to the expected location
csv_file = glob.glob(str(outputs_dir / 'pharmacy_tokens_temp/*.csv'))[0]
# os.replace overwrites an existing file, so this cell can be re-run on any platform
os.replace(csv_file, str(outputs_dir / 'pharmacy_tokens.csv'))
# Clean up temp directory
shutil.rmtree(str(outputs_dir / 'pharmacy_tokens_temp'))
print("✓ Pharmacy dataset tokenized")
print()
print("✓ Tokenization completed successfully using PySpark!")

### Inspect Tokenized Data

In [None]:
# Load tokenized hospital data
hospital_tokens = pd.read_csv(outputs_dir / 'hospital_tokens.csv')
print(f"Hospital Tokens: {len(hospital_tokens)} token rows (5 per patient)")
print(hospital_tokens.head(10))
print()

# Load tokenized pharmacy data
pharmacy_tokens = pd.read_csv(outputs_dir / 'pharmacy_tokens.csv')
print(f"Pharmacy Tokens: {len(pharmacy_tokens)} token rows (5 per patient)")
print(pharmacy_tokens.head(10))

## 4. Decrypt Tokens and Perform Overlap Analysis

To compare tokens across independently tokenized datasets:
1. **Decrypt** the encrypted tokens to reveal the underlying HMAC-SHA256 hashes
2. **Compare** the decrypted hashes to find matching records

**Why decryption is needed**: OpenToken uses random IVs for encryption, so even identical patients produce different encrypted tokens. Decryption reveals the deterministic hash layer that can be compared.
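
Once the hashes are decrypted, the comparison step amounts to pairing records whose hashes agree on every required rule. The sketch below shows that idea on plain dictionaries. The data layout and the `find_matches` helper are assumptions for illustration and do not reflect how `OpenTokenOverlapAnalyzer` is implemented.

```python
from collections import defaultdict


def find_matches(dataset1, dataset2, matching_rules):
    """Pair up records whose decrypted hashes agree on every rule in matching_rules.

    Each dataset maps record_id -> {rule_id: decrypted_hash}.
    """
    # Index dataset2 by the tuple of hashes for the required rules.
    index = defaultdict(list)
    for record_id, hashes in dataset2.items():
        if all(rule in hashes for rule in matching_rules):
            key = tuple(hashes[rule] for rule in matching_rules)
            index[key].append(record_id)

    matches = []
    for record_id, hashes in dataset1.items():
        if not all(rule in hashes for rule in matching_rules):
            continue  # a record missing a required token cannot match
        key = tuple(hashes[rule] for rule in matching_rules)
        for other_id in index.get(key, []):
            matches.append((record_id, other_id))
    return matches


# Toy decrypted hashes (hypothetical values, two rules only for brevity).
hospital = {
    "H1": {"T1": "aaa", "T2": "bbb"},
    "H2": {"T1": "ccc", "T2": "ddd"},
}
pharmacy = {
    "P1": {"T1": "aaa", "T2": "bbb"},  # same patient as H1
    "P2": {"T1": "ccc", "T2": "zzz"},  # agrees with H2 on T1 only
}

print(find_matches(hospital, pharmacy, ["T1", "T2"]))
print(find_matches(hospital, pharmacy, ["T1"]))
```

In this toy data the first call pairs only H1 with P1, and the second call also pairs H2 with P2. Requiring fewer rules can add pairs but never removes any.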

In [None]:
# Use OpenToken PySpark Bridge for overlap analysis
print("Performing overlap analysis with OpenToken PySpark Bridge...")
print()

try:
    # Load tokenized data into Spark DataFrames
    hospital_tokens_spark = spark.read.csv(
        str(outputs_dir / 'hospital_tokens.csv'),
        header=True,
        inferSchema=True
    )
    pharmacy_tokens_spark = spark.read.csv(
        str(outputs_dir / 'pharmacy_tokens.csv'),
        header=True,
        inferSchema=True
    )
    
    print(f"Hospital tokens loaded: {hospital_tokens_spark.count()} rows")
    print(f"Pharmacy tokens loaded: {pharmacy_tokens_spark.count()} rows")
    print()
    
    # Initialize the overlap analyzer with the encryption key
    encryption_key = "Secret-Encryption-Key-Goes-Here."  # Same key used for tokenization
    analyzer = OpenTokenOverlapAnalyzer(encryption_key)
    
    print("Decrypting and matching tokens...")
    
    # Analyze overlap using PySpark Bridge
    # The matching_rules parameter specifies which tokens must ALL match (T1-T5)
    results = analyzer.analyze_overlap(
        hospital_tokens_spark,
        pharmacy_tokens_spark,
        matching_rules=["T1", "T2", "T3", "T4", "T5"],
        dataset1_name="Hospital",
        dataset2_name="Pharmacy"
    )
    
    print("✓ Overlap analysis completed using OpenToken PySpark Bridge!")
    print()
    
    # Display results
    print("=" * 70)
    print("OVERLAP ANALYSIS RESULTS")
    print("=" * 70)
    print(f"Total Hospital records: {results['total_records_dataset1']}")
    print(f"Total Pharmacy records: {results['total_records_dataset2']}")
    print(f"Matching Hospital records: {results['matching_records_dataset1']}")
    print(f"Matching Pharmacy records: {results['matching_records_dataset2']}")
    print(f"Overlap percentage: {results['overlap_percentage']:.1f}%")
    print(f"Records unique to Hospital: {results['unique_to_dataset1']}")
    print(f"Records unique to Pharmacy: {results['unique_to_dataset2']}")
    print("=" * 70)
    print()
    
    # Get the matches DataFrame
    matches_df_spark = results['matches']
    print("Sample matches (first 10):")
    matches_df_spark.show(10, truncate=False)
    print()
    
    # Save results to CSV
    matches_df_spark.coalesce(1).write.mode("overwrite").option("header", "true").csv(
        str(outputs_dir / 'matching_records_temp')
    )
    # Move the CSV file to the expected location
    import glob
    csv_file = glob.glob(str(outputs_dir / 'matching_records_temp/*.csv'))[0]
    import shutil
    shutil.copy(csv_file, str(outputs_dir / 'matching_records.csv'))
    shutil.rmtree(str(outputs_dir / 'matching_records_temp'))
    print("✓ Results saved to outputs/matching_records.csv")
    
except Exception as e:
    import traceback
    print(f"Error during PySpark analysis: {e}")
    print(traceback.format_exc())
    print()
    print("Falling back to pandas-based analysis...")
    print()

    # Standalone pandas-based overlap analysis script
    analyze_script = scripts_dir / 'analyze_overlap.py'

    # Run overlap analysis with the same interpreter as this notebook
    result = subprocess.run(
        [sys.executable, str(analyze_script)],
        cwd=str(demo_dir),
        capture_output=True,
        text=True
    )

    print(result.stdout)
    if result.returncode != 0:
        print(f"Error: {result.stderr}")
    else:
        print("✓ Overlap analysis completed successfully!")

### View Matching Results

In [None]:
# Load and display matching results
matches_df = pd.read_csv(outputs_dir / 'matching_records.csv')

print(f"Total Matching Pairs: {len(matches_df)}")
print()
print("First 10 matching records:")
print(matches_df.head(10))
print()

# Summary statistics
hospital_count = len(hospital_df)
pharmacy_count = len(pharmacy_df)
unique_hospital_matches = matches_df['Hospital_RecordId'].nunique()
unique_pharmacy_matches = matches_df['Pharmacy_RecordId'].nunique()

print("Summary Statistics:")
print(f"- Hospital records with matches: {unique_hospital_matches} out of {hospital_count}")
print(f"- Pharmacy records with matches: {unique_pharmacy_matches} out of {pharmacy_count}")
print(f"- Overlap percentage (hospital): {(unique_hospital_matches / hospital_count * 100):.1f}%")
print(f"- Overlap percentage (pharmacy): {(unique_pharmacy_matches / pharmacy_count * 100):.1f}%")

## 5. Alternative Analysis: Relaxed Matching Rules

Let's compare the results when we relax the matching requirements. Instead of requiring all 5 tokens (T1-T5) to match, we'll only require T1, T2, T3, and T5 to match (excluding T4).

**Use case**: This can be useful when one attribute (like postal code in T4) might have data quality issues or when you want to cast a wider net for potential matches.

In [None]:
# Run alternative overlap analysis with relaxed matching rules (T1, T2, T3, T5 only)
print("Performing alternative overlap analysis (T1, T2, T3, T5 only)...")
print()

try:
    # Load tokenized data into Spark DataFrames
    hospital_tokens_spark_alt = spark.read.csv(
        str(outputs_dir / 'hospital_tokens.csv'),
        header=True,
        inferSchema=True
    )
    pharmacy_tokens_spark_alt = spark.read.csv(
        str(outputs_dir / 'pharmacy_tokens.csv'),
        header=True,
        inferSchema=True
    )
    
    # Initialize the overlap analyzer
    analyzer_alt = OpenTokenOverlapAnalyzer(encryption_key)
    
    print("Decrypting and matching tokens (excluding T4)...")
    
    # Analyze overlap with relaxed rules - only T1, T2, T3, T5
    results_alt = analyzer_alt.analyze_overlap(
        hospital_tokens_spark_alt,
        pharmacy_tokens_spark_alt,
        matching_rules=["T1", "T2", "T3", "T5"],  # Excluding T4 (Postal Code)
        dataset1_name="Hospital",
        dataset2_name="Pharmacy"
    )
    
    print("✓ Alternative overlap analysis completed!")
    print()
    
    # Display results
    print("=" * 70)
    print("ALTERNATIVE ANALYSIS RESULTS (T1, T2, T3, T5 only)")
    print("=" * 70)
    print(f"Total Hospital records: {results_alt['total_records_dataset1']}")
    print(f"Total Pharmacy records: {results_alt['total_records_dataset2']}")
    print(f"Matching Hospital records: {results_alt['matching_records_dataset1']}")
    print(f"Matching Pharmacy records: {results_alt['matching_records_dataset2']}")
    print(f"Overlap percentage: {results_alt['overlap_percentage']:.1f}%")
    print(f"Records unique to Hospital: {results_alt['unique_to_dataset1']}")
    print(f"Records unique to Pharmacy: {results_alt['unique_to_dataset2']}")
    print("=" * 70)
    print()
    
    # Compare with original results
    print("=" * 70)
    print("COMPARISON: All 5 Tokens vs. 4 Tokens (excluding T4)")
    print("=" * 70)
    print(f"Strict matching (T1-T5): {results['matching_records_dataset1']} hospital matches")
    print(f"Relaxed matching (T1,T2,T3,T5): {results_alt['matching_records_dataset1']} hospital matches")
    print(f"Additional matches found: {results_alt['matching_records_dataset1'] - results['matching_records_dataset1']}")
    print("=" * 70)
    print()
    
    # Get the matches DataFrame
    matches_df_spark_alt = results_alt['matches']
    print("Sample matches from alternative analysis (first 10):")
    matches_df_spark_alt.show(10, truncate=False)
    print()
    
    # Save alternative results
    matches_df_spark_alt.coalesce(1).write.mode("overwrite").option("header", "true").csv(
        str(outputs_dir / 'matching_records_alt_temp')
    )
    csv_file_alt = glob.glob(str(outputs_dir / 'matching_records_alt_temp/*.csv'))[0]
    shutil.copy(csv_file_alt, str(outputs_dir / 'matching_records_alt.csv'))
    shutil.rmtree(str(outputs_dir / 'matching_records_alt_temp'))
    print("✓ Alternative results saved to outputs/matching_records_alt.csv")
    
except Exception as e:
    import traceback
    print(f"Error during alternative analysis: {e}")
    print(traceback.format_exc())

### Interpreting the Alternative Analysis

The alternative analysis uses only 4 tokens (T1, T2, T3, T5) instead of all 5:
- **T1**: FirstName + LastName + Sex + BirthDate
- **T2**: FirstName + LastName + PostalCode
- **T3**: FirstName + LastName + SocialSecurityNumber
- **T4**: ❌ *EXCLUDED* - BirthDate + Sex + PostalCode
- **T5**: BirthDate + Sex + SocialSecurityNumber

By excluding T4, we're being more lenient about postal code consistency. This might find additional matches where:
- Postal codes have typos or formatting differences
- People have moved between visits
- Data entry errors occurred

However, requiring fewer tokens also increases the risk of false positives, where records from two different patients are linked.
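
One way to keep that risk visible is to isolate the pairs that appear only under the relaxed rules and review them separately. A minimal sketch, assuming each result has been reduced to a collection of `(hospital_id, pharmacy_id)` pairs. The helper name and the sample IDs are hypothetical:

```python
def relaxed_only_pairs(strict_pairs, relaxed_pairs):
    """Return pairs matched under the relaxed rules but not under the strict rules."""
    strict = set(strict_pairs)
    # Sort for a stable, reviewable order.
    return sorted(pair for pair in set(relaxed_pairs) if pair not in strict)


strict_pairs = [("H1", "P1"), ("H2", "P7")]
relaxed_pairs = [("H1", "P1"), ("H2", "P7"), ("H5", "P3")]

for hospital_id, pharmacy_id in relaxed_only_pairs(strict_pairs, relaxed_pairs):
    print(f"Review candidate: {hospital_id} <-> {pharmacy_id}")
```

Pairs that survive only the relaxed rules are the ones most likely to need a manual check before they are treated as the same patient.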

## 6. Understand the Results

Let's look at what a match actually means by examining some matched records in detail.

In [None]:
# Get a sample matched record
if len(matches_df) > 0:
    sample_match = matches_df.iloc[0]
    hospital_record_id = sample_match['Hospital_RecordId']
    pharmacy_record_id = sample_match['Pharmacy_RecordId']
    
    # Get the original records
    hospital_match = hospital_df[hospital_df['RecordId'] == hospital_record_id]
    pharmacy_match = pharmacy_df[pharmacy_df['RecordId'] == pharmacy_record_id]
    
    if len(hospital_match) == 0 or len(pharmacy_match) == 0:
        print("Warning: Could not find matching records in original datasets")
        print(f"Hospital RecordId {hospital_record_id} found: {len(hospital_match) > 0}")
        print(f"Pharmacy RecordId {pharmacy_record_id} found: {len(pharmacy_match) > 0}")
    else:
        hospital_record = hospital_match.iloc[0]
        pharmacy_record = pharmacy_match.iloc[0]
        
        print("Sample Match:")
        print(f"Hospital Record ID: {hospital_record_id}")
        print(f"Hospital Patient: {hospital_record['FirstName']} {hospital_record['LastName']}")
        print(f"DOB: {hospital_record['BirthDate']}, SSN: {hospital_record['SocialSecurityNumber']}")
        print()
        print(f"Pharmacy Record ID: {pharmacy_record_id}")
        print(f"Pharmacy Patient: {pharmacy_record['FirstName']} {pharmacy_record['LastName']}")
        print(f"DOB: {pharmacy_record['BirthDate']}, SSN: {pharmacy_record['SocialSecurityNumber']}")
        print()
        print("✓ All 5 tokens matched, indicating this is the same patient!")
else:
    print("No matches found. This could happen if:")
    print("- Different hashing/encryption keys were used")
    print("- Data validation rejected records with invalid attributes")

## 7. Advanced PySpark Transformations (Optional)

If PySpark is available, we can perform distributed transformations on the tokenized data for large-scale analysis.


In [None]:
# PySpark-based transformations for distributed processing
if pyspark_available and spark:
    try:
        from pyspark.sql.functions import count as spark_count
        
        print("Performing distributed token analysis with PySpark...")
        print()
        
        # Load the tokenized data (all columns as strings to avoid type inference issues)
        hospital_tokens_spark = spark.read.csv(
            str(outputs_dir / 'hospital_tokens.csv'),
            header=True,
            inferSchema=False  # Keep all as strings
        )
        pharmacy_tokens_spark = spark.read.csv(
            str(outputs_dir / 'pharmacy_tokens.csv'),
            header=True,
            inferSchema=False  # Keep all as strings
        )
        
        # Analyze token distribution in hospital dataset
        print("Hospital Token Distribution:")
        hospital_tokens_spark.groupBy("RuleId").agg(spark_count("*").alias("count")).orderBy("RuleId").show()
        print()
        
        # Analyze token distribution in pharmacy dataset
        print("Pharmacy Token Distribution:")
        pharmacy_tokens_spark.groupBy("RuleId").agg(spark_count("*").alias("count")).orderBy("RuleId").show()
        print()
        
        # Count unique records
        hospital_unique = hospital_tokens_spark.select("RecordId").distinct().count()
        pharmacy_unique = pharmacy_tokens_spark.select("RecordId").distinct().count()
        print(f"Unique records - Hospital: {hospital_unique}, Pharmacy: {pharmacy_unique}")
        
    except Exception as e:
        print(f"Note: Advanced transformations not available - {type(e).__name__}: {e}")
        print("This is optional and does not affect the core PPRL workflow.")
else:
    print("PySpark not available for advanced transformations.")
    print("Core PPRL analysis completed successfully using pandas.")

## 8. Privacy and Security Summary

This demonstration shows how OpenToken enables privacy-preserving record linkage:

### What was protected:
- ✓ Raw patient data (names, SSNs, birthdates) was never shared between organizations
- ✓ HMAC-SHA256 hashes are one-way, so the original data cannot feasibly be recovered by anyone who lacks the hashing secret
- ✓ Encryption key controls who can decrypt and perform linkage

### What was shared:
- Encrypted tokens for secure transmission
- Matching statistics showing overlap counts
- Metadata with summary information (not raw data)

### Key security principles:
1. **Strong Encryption**: AES-256-GCM with random IVs prevents pattern analysis
2. **Key Management**: Secure sharing and storage of encryption/hashing keys
3. **Deterministic Hashing**: HMAC-SHA256 enables comparison without raw data
4. **Access Control**: Only authorized parties can decrypt tokens
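
For key management in particular, one common pattern is to read the secrets from environment variables instead of writing them into the notebook. The sketch below is one possible approach, not an OpenToken feature: the `load_keys` helper and both variable names are hypothetical.

```python
import os


def load_keys(env=os.environ):
    """Read the hashing secret and encryption key from environment variables.

    The variable names below are hypothetical; use whatever your deployment defines.
    """
    hashing_secret = env.get("OPENTOKEN_HASHING_SECRET")
    encryption_key = env.get("OPENTOKEN_ENCRYPTION_KEY")

    if not hashing_secret or not encryption_key:
        raise ValueError("Both the hashing secret and the encryption key must be set")
    # Mirrors the notebook's check: the key must be exactly 32 characters.
    if len(encryption_key) != 32:
        raise ValueError(
            f"Encryption key must be exactly 32 characters, got {len(encryption_key)}"
        )
    return hashing_secret, encryption_key


# Demo only: a real deployment would set these outside the notebook.
demo_env = {
    "OPENTOKEN_HASHING_SECRET": "HashingKey",
    "OPENTOKEN_ENCRYPTION_KEY": "Secret-Encryption-Key-Goes-Here.",
}
hash_key, encryption_key = load_keys(demo_env)
print("Keys loaded; encryption key length:", len(encryption_key))
```

Calling `load_keys()` with no argument reads from the real process environment, so the same notebook can run with demo keys locally and with managed secrets elsewhere.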

### PySpark Bridge Benefits:
- **Distributed Processing**: Handle large datasets across multiple nodes
- **Parallel Decryption**: Efficiently decrypt millions of tokens
- **Scalable Analysis**: Perform overlap analysis on enterprise-scale data
- **Integration**: Native Spark SQL for additional transformations

## 9. Customization Examples

You can customize this demo by modifying the scripts:

### Change dataset size and overlap:
Edit `scripts/generate_superhero_datasets.py`:
```python
num_hospital = 500  # Different size
num_pharmacy = 600
overlap_percentage = 0.50  # 50% overlap instead of 40%
```

### Use different encryption keys:
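Edit `scripts/tokenize_datasets.sh` (when tokenizing inside this notebook, change `hash_key` and `encryption_key` in the tokenization cell of section 3 instead):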
```bash
HASH_KEY="YourCustomHashingKey"
ENCRYPTION_KEY="YourCustom32CharEncryptionKey-00"  # must be exactly 32 characters
```

**Important**: Both organizations must use the same keys for tokens to match!
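
If you need a fresh key of the required length, Python's `secrets` module can produce one. This is a minimal sketch: the `generate_key` helper is hypothetical, and restricting the alphabet to letters and digits is a convenience choice that keeps the key at exactly 32 ASCII characters at the cost of less entropy than 32 fully random bytes.

```python
import secrets
import string


def generate_key(length=32):
    """Return a random key of the given length drawn from letters and digits."""
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


new_encryption_key = generate_key()
print(len(new_encryption_key))  # 32
```

Generate the key once and distribute it to both organizations over a secure channel. Generating it separately on each side would produce keys that do not match.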

### Scale with PySpark:
For large datasets, ensure PySpark is installed:
```bash
pip install pyspark opentoken-pyspark
```

Both packages are required by this notebook: the setup cell exits if either import fails.

## 10. Next Steps

This PPRL demo can be adapted for:
- Healthcare: Hospital-to-hospital patient matching
- Insurance: Claims linkage across providers
- Research: Multi-site study participant matching
- Government: Cross-agency identity resolution
- Financial Services: Anti-fraud systems

### With PySpark Bridge:
- Scale to datasets too large for a single machine
- Distribute tokenization across clusters
- Parallel overlap analysis
- Real-time record linkage pipelines

For more information, see the [README.md](./README.md) in this directory and the [main OpenToken documentation](../../README.md).