# Exercise 2: Compare Deduplication Methods

## Learning Objectives

In this exercise, you will:
- Compare different deduplication strategies
- Understand when to use each method
- Analyze performance differences

## Overview

Different deduplication methods work better for different scenarios:

**Fast Methods (Recommended for most cases):**
- **exact**: Removes exact duplicates; usually the fastest option
- **normalized**: Handles case/whitespace variations
- **spark_hash**: Very fast hash-based deduplication
- **checksum_md5**: MD5 hash-based deduplication
- **checksum_sha256**: SHA-256 hash-based deduplication
- **partitioned_hash**: Hash-based partitioned deduplication (for large datasets)
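
To make the difference between these strategies concrete, here is a minimal plain-Python sketch that does not use Spark or `deduplicate_spark.py`. The helper names (`dedupe_by_key`, `normalize`, `md5_key`) are hypothetical, and the normalization rule (lowercase, collapse whitespace) is an assumption about what "normalized" means here; the project's actual implementation may differ.

```python
import hashlib


def dedupe_by_key(records, key_func):
    """Keep the first record seen for each key, preserving input order."""
    seen = set()
    unique = []
    for record in records:
        key = key_func(record)
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique


def normalize(text):
    """Assumed normalization: lowercase and collapse runs of whitespace."""
    return " ".join(text.lower().split())


def md5_key(text):
    """Fixed-size key: MD5 hex digest of the UTF-8 encoded text."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()


# Made-up sample records for illustration only
records = ["Alice Smith", "alice  smith", "Alice Smith", "Bob Jones "]

exact = dedupe_by_key(records, lambda r: r)     # compares raw values
normalized = dedupe_by_key(records, normalize)  # compares normalized values
checksum = dedupe_by_key(records, md5_key)      # compares digests of raw values

print(len(exact), len(normalized), len(checksum))  # prints: 3 2 3
```

The checksum variant finds the same duplicates as exact matching; what changes is that each record is compared through a fixed-size digest rather than its full value. Only normalization catches the `alice  smith` variant.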

**Advanced Methods:**
- **window**: Window-based deduplication (keeps first/last record)
- **lsh**: Locality-Sensitive Hashing for scalable fuzzy matching
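
The "keeps first/last record" behaviour of window-based deduplication can be sketched in plain Python as follows. In Spark this pattern is usually written with `row_number()` over a window partitioned by the key; whether `deduplicate_spark.py` does so is an assumption. The function `keep_last_per_key` and the field names `id` and `updated_at` are hypothetical.

```python
def keep_last_per_key(rows, key, order_by):
    """Return one row per key: the row with the largest `order_by` value."""
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


# Made-up rows; ISO-8601 date strings sort correctly as plain strings
rows = [
    {"id": 1, "name": "Alice", "updated_at": "2024-01-01"},
    {"id": 1, "name": "Alice S.", "updated_at": "2024-03-01"},
    {"id": 2, "name": "Bob", "updated_at": "2024-02-01"},
]

result = keep_last_per_key(rows, key="id", order_by="updated_at")
for row in result:
    print(row["id"], row["name"], row["updated_at"])
```

Unlike exact matching, this approach treats rows as duplicates when they share a key, even if their other columns differ, so the choice of key and ordering column decides which version survives.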

**Fuzzy Matching (Expensive, use only for small datasets):**
- **fuzzy_levenshtein**: Fuzzy matching with Levenshtein distance
- **fuzzy_fuzzywuzzy**: Fuzzy matching with FuzzyWuzzy library
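
To show why fuzzy matching is expensive, the sketch below computes Levenshtein distance with the standard dynamic-programming recurrence and then compares every pair of records. It is an illustration, not the code behind `fuzzy_levenshtein`; the all-pairs loop is an assumption about how a naive fuzzy pass works, and the sample names are made up.

```python
from itertools import combinations


def levenshtein(a, b):
    """Edit distance between a and b, keeping only two rows of the DP table."""
    if len(a) < len(b):
        a, b = b, a  # iterate over the longer string, store rows for the shorter
    previous = list(range(len(b) + 1))
    for i, ch_a in enumerate(a, start=1):
        current = [i]
        for j, ch_b in enumerate(b, start=1):
            cost = 0 if ch_a == ch_b else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution
            ))
        previous = current
    return previous[-1]


names = ["Jon Smith", "John Smith", "Jane Doe"]

# Naive fuzzy pass: n records need n * (n - 1) / 2 comparisons
pairs = [(a, b, levenshtein(a, b)) for a, b in combinations(names, 2)]
for a, b, distance in pairs:
    print(f"{a!r} vs {b!r}: distance {distance}")
```

Three records need only three comparisons, but the count grows quadratically: 10,000 records would need roughly 50 million. Each comparison also costs time proportional to the product of the two string lengths.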

This exercise will compare all available methods to help you understand their performance characteristics.

In [None]:
# Setup: Add project root to Python path
import sys
import os

# Find project root
current_dir = os.getcwd()
if 'notebooks' in current_dir:
    project_root = os.path.dirname(current_dir)
elif os.path.exists(os.path.join(current_dir, 'deduplicate_spark.py')):
    project_root = current_dir
else:
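    # Search up to five directory levels for deduplicate_spark.py
    project_root = None  # initialise so the fallback below cannot raise NameError
    test_dir = current_dir
    for _ in range(5):
        if os.path.exists(os.path.join(test_dir, 'deduplicate_spark.py')):
            project_root = test_dir
            break
        parent = os.path.dirname(test_dir)
        if parent == test_dir:  # reached the filesystem root
            break
        test_dir = parent
    # Fall back to the current directory if the file was not found
    project_root = project_root or current_dir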

if project_root not in sys.path:
    sys.path.insert(0, project_root)
    print(f"✓ Added to Python path: {project_root}")

# Change to project root for file operations
os.chdir(project_root)
print(f"✓ Changed working directory to: {project_root}")


In [None]:
from deduplicate_spark import create_spark_session, process_file_spark
import time

spark = create_spark_session("Exercise2_CompareMethods")
print("✓ Spark session created")

In [None]:
# All available methods to test
# Note: fuzzy methods are expensive and may take a long time for large datasets
all_methods = [
    # Fast methods (recommended)
    'exact',
    'normalized',
    'spark_hash',
    'checksum_md5',
    'checksum_sha256',
    'partitioned_hash',
    # Advanced methods
    'window',
    'lsh',
    # Fuzzy methods (expensive - may skip for large datasets)
    'fuzzy_levenshtein',
    'fuzzy_fuzzywuzzy'
]

# For this exercise, we'll test all methods
# If you have a very large dataset, you may want to skip fuzzy methods
methods_to_test = all_methods

# Uncomment below to skip expensive fuzzy methods for large datasets:
# methods_to_test = [m for m in all_methods if not m.startswith('fuzzy')]

# Build the input path once and reuse it below
data_file = os.path.join(project_root, "data", "redundant_data.csv")

print(f"Testing {len(methods_to_test)} deduplication methods...")
print(f"Data file: {data_file}\n")

results = []

for i, method in enumerate(methods_to_test, 1):
    print(f"\n{'='*70}")
    print(f"[{i}/{len(methods_to_test)}] Testing method: {method}")
    print('='*70)
    
    try:
        start_time = time.time()
        stats = process_file_spark(spark, data_file, method=method)
        elapsed_time = time.time() - start_time
        
        if stats:
            results.append({
                'method': method,
                'original_count': stats['original_count'],
                'unique_count': stats['unique_count'],
                'duplicates_removed': stats['duplicates_removed'],
                'deduplication_rate': stats['deduplication_rate'],
                'time_seconds': round(elapsed_time, 2)
            })
            print(f"✓ Completed in {elapsed_time:.2f} seconds")
            print(f"  - Original: {stats['original_count']:,} records")
            print(f"  - Unique: {stats['unique_count']:,} records")
            print(f"  - Removed: {stats['duplicates_removed']:,} duplicates ({stats['deduplication_rate']:.2f}%)")
        else:
            print(f"✗ Method {method} returned no results")
            results.append({
                'method': method,
                'original_count': 0,
                'unique_count': 0,
                'duplicates_removed': 0,
                'deduplication_rate': 0.0,
                'time_seconds': round(elapsed_time, 2)
            })
    except Exception as e:
        print(f"✗ Error with method {method}: {e}")
        results.append({
            'method': method,
            'original_count': 0,
            'unique_count': 0,
            'duplicates_removed': 0,
            'deduplication_rate': 0.0,
            'time_seconds': 0.0,
            'error': str(e)
        })

# Display comparison
print(f"\n{'='*70}")
print("COMPARISON RESULTS")
print('='*70)

import pandas as pd
if results:
    df_results = pd.DataFrame(results)
    
    # Sort by time (fastest first)
    df_results = df_results.sort_values('time_seconds')
    
    # Display full results
    print("\nFull Results (sorted by execution time):")
    print(df_results.to_string(index=False))
    
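    # Summary statistics
    # Failed runs are recorded with zero counts (and 0.0s on error), so exclude
    # them; otherwise a failed method would be reported as the fastest.
    print("\n" + "="*70)
    print("SUMMARY STATISTICS")
    print("="*70)
    succeeded = df_results[df_results['original_count'] > 0]
    if len(succeeded) > 0:
        print(f"\nFastest method: {succeeded.iloc[0]['method']} ({succeeded.iloc[0]['time_seconds']}s)")
        print(f"Slowest method: {succeeded.iloc[-1]['method']} ({succeeded.iloc[-1]['time_seconds']}s)")
    else:
        print("\nNo method completed successfully")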
    
    # Methods with highest deduplication rate
    if 'deduplication_rate' in df_results.columns:
        best_dedup = df_results.loc[df_results['deduplication_rate'].idxmax()]
        print(f"\nHighest deduplication rate: {best_dedup['method']} ({best_dedup['deduplication_rate']:.2f}%)")
    
    # Group by category
    print("\n" + "="*70)
    print("PERFORMANCE BY CATEGORY")
    print("="*70)
    
    fast_methods = df_results[df_results['method'].isin(['exact', 'normalized', 'spark_hash', 'checksum_md5', 'checksum_sha256', 'partitioned_hash'])]
    if len(fast_methods) > 0:
        print("\nFast Methods (recommended):")
        print(fast_methods[['method', 'time_seconds', 'deduplication_rate']].to_string(index=False))
    
    advanced_methods = df_results[df_results['method'].isin(['window', 'lsh'])]
    if len(advanced_methods) > 0:
        print("\nAdvanced Methods:")
        print(advanced_methods[['method', 'time_seconds', 'deduplication_rate']].to_string(index=False))
    
    fuzzy_methods = df_results[df_results['method'].str.startswith('fuzzy')]
    if len(fuzzy_methods) > 0:
        print("\nFuzzy Methods (expensive):")
        print(fuzzy_methods[['method', 'time_seconds', 'deduplication_rate']].to_string(index=False))
else:
    print("No results to display")

## Questions to Answer

1. **Which method was fastest?** Compare the execution times.
2. **Which method removed the most duplicates?** Check the deduplication rates.
3. **What's the trade-off between speed and accuracy?** Do faster methods find fewer duplicates?
4. **When would you use each method?**
   - For exact duplicates: Use `exact` or `spark_hash`
   - For case/whitespace variations: Use `normalized`
   - For large datasets: Use `partitioned_hash` or `spark_hash`
   - For fuzzy matching: Use `lsh` (scalable) or `fuzzy_levenshtein`/`fuzzy_fuzzywuzzy` (small datasets only)
5. **Which method would you recommend for production use?** Consider both speed and deduplication effectiveness.
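
One way to approach questions 1 to 3 is to rank the successful runs by time and add a throughput figure next to each deduplication rate. The sketch below assumes result dictionaries shaped like those collected in the comparison cell; `summarize` is a hypothetical helper, and the sample values are placeholders, not measurements.

```python
def summarize(results):
    """Rank successful runs by time and attach records-per-second throughput."""
    # Skip runs that raised an error or returned no records
    ok = [r for r in results if "error" not in r and r["original_count"] > 0]
    ranked = sorted(ok, key=lambda r: r["time_seconds"])
    summary = []
    for r in ranked:
        seconds = r["time_seconds"]
        throughput = round(r["original_count"] / seconds) if seconds > 0 else None
        summary.append({
            "method": r["method"],
            "deduplication_rate": r["deduplication_rate"],
            "records_per_second": throughput,
        })
    return summary


# Placeholder values for illustration only; substitute your own `results` list
sample = [
    {"method": "method_b", "original_count": 1000, "deduplication_rate": 12.5, "time_seconds": 4.0},
    {"method": "method_a", "original_count": 1000, "deduplication_rate": 10.0, "time_seconds": 2.0},
    {"method": "method_c", "original_count": 0, "deduplication_rate": 0.0, "time_seconds": 0.0, "error": "failed"},
]

summary = summarize(sample)
for row in summary:
    print(row)
```

Calling `summarize(results)` on the list built in the comparison cell puts speed and deduplication rate side by side, which makes any speed versus accuracy trade-off easier to see.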

## Key Takeaways

- **Fast methods** (`exact`, `spark_hash`, `checksum_*`) are best for most production scenarios
- **Fuzzy methods** are far more expensive than hash-based methods; reserve them for small datasets where exact or normalized matching is not sufficient
- **LSH** provides a good balance for fuzzy matching on larger datasets
- **Normalized** deduplication is useful when data has inconsistent formatting

In [None]:
spark.stop()
print("✓ Spark session stopped")