# 02 - Spark ETL: CPU vs GPU (RAPIDS)

This notebook provides a comprehensive comparison of Spark ETL performance across compute backends:
- **CPU Spark**: Traditional Spark with JVM-based processing
- **GPU Spark (RAPIDS)**: Spark with the RAPIDS Accelerator plugin, which runs supported operations on the GPU via cuDF
- **Hybrid approaches**: Mixed CPU/GPU workflows (not yet benchmarked in this notebook)

## Workloads Tested
1. **Data Ingestion**: Parquet and CSV reads at three dataset scales (ORC not yet covered)
2. **Filtering & Aggregation**: Common analytical operations
3. **Joins**: Inner joins over data generated with skew (outer joins not yet covered)
4. **Complex Transformations**: Window functions (UDFs and multi-stage queries not yet covered)
5. **Data Export**: Writing results in different formats (not yet benchmarked)

## Platform Compatibility
- **Local Development**: Single-node testing
- **Google Colab**: Free GPU tier testing
- **AWS EMR**: Distributed GPU clusters
- **SageMaker**: Unified development experience

Results help determine the optimal compute configuration for different ML data pipeline stages.

In [None]:
import os
import sys
import time
import json
from pathlib import Path
from typing import Dict, List, Any, Optional
import warnings
# NOTE: this silences *all* warnings for the rest of the notebook, trading
# diagnostics for readable benchmark output.
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import psutil
from tqdm import tqdm

# Make the parent of the notebook's working directory importable so the
# project-local `src` package resolves.
sys.path.append(str(Path.cwd().parent))
from src.bench.spark_utils import build_spark
from src.bench.data_generator import create_synthetic_tabular

# Ensure results directory exists
Path('../results').mkdir(exist_ok=True)

print("⚡ Spark ETL Benchmarks Setup")
print(f"Working directory: {Path.cwd()}")
print(f"Available memory: {psutil.virtual_memory().available / (1024**3):.1f} GB")
print(f"CPU cores: {psutil.cpu_count()}")

# Probe for a CUDA GPU through PyTorch. If torch is not installed the notebook
# assumes CPU-only, even if a GPU is physically present.
gpu_available = False
try:
    import torch
    gpu_available = torch.cuda.is_available()
    if gpu_available:
        print(f"🎮 GPU available: {torch.cuda.get_device_name(0)}")
        print(f"   GPU memory: {torch.cuda.get_device_properties(0).total_memory / (1024**3):.1f} GB")
    else:
        print("💻 Running in CPU-only mode")
except ImportError:
    print("💻 PyTorch not available, CPU-only mode")

# Heuristic cloud-environment detection based on modules/paths present.
# NOTE(review): '/etc/hadoop' can also exist on non-EMR Hadoop installs.
is_colab = 'google.colab' in sys.modules
is_sagemaker = os.path.exists('/opt/ml')
is_emr = os.path.exists('/etc/hadoop')

if is_colab:
    print("☁️ Google Colab environment detected")
elif is_sagemaker:
    print("☁️ AWS SageMaker environment detected")
elif is_emr:
    print("☁️ EMR cluster environment detected")
else:
    print("🖥️ Local development environment")

In [None]:
# Generate test datasets of increasing size for ETL scaling analysis.
print("\n📊 Preparing ETL Test Datasets")
print("=" * 50)


def _path_size_mb(path) -> float:
    """Return the on-disk size of ``path`` in MB, summing all files if it is a directory."""
    p = Path(path)
    if p.is_dir():
        return sum(f.stat().st_size for f in p.rglob('*') if f.is_file()) / (1024**2)
    return p.stat().st_size / (1024**2)


# Colab's free tier has limited memory, so the "large" dataset is scaled down there.
if is_colab:
    large_config = {'name': 'large', 'rows': 1_000_000, 'features': 100}
else:
    large_config = {'name': 'large', 'rows': 5_000_000, 'features': 150}

dataset_configs = [
    {'name': 'small', 'rows': 100_000, 'features': 50},
    {'name': 'medium', 'rows': 1_000_000, 'features': 100},
    large_config,
]

etl_datasets = {}

for config in dataset_configs:
    print(f"\n📈 Creating {config['name']} dataset: {config['rows']:,} rows × {config['features']} features")

    files = create_synthetic_tabular(
        n_rows=config['rows'],
        n_features=config['features'],
        n_categorical=min(20, config['features'] // 5),  # 20% of features, capped at 20 columns
        output_dir=f"../data/etl_{config['name']}",
        formats=['parquet', 'csv'],
        add_skew=True,  # skewed data, intended for join testing
    )

    etl_datasets[config['name']] = {
        'config': config,
        'files': files,
        'parquet_path': files['parquet'],
        'csv_path': files['csv'],
    }

print("\n📋 Dataset Summary:")
for name, info in etl_datasets.items():
    # Sizes are directory-aware in case a format is written as a multi-file directory.
    parquet_size = _path_size_mb(info['parquet_path'])
    csv_size = _path_size_mb(info['csv_path'])
    print(f"   {name.capitalize()}: Parquet {parquet_size:.1f}MB, CSV {csv_size:.1f}MB")

print("✅ ETL datasets prepared")

In [None]:
# ETL Benchmark Functions
print("\n🔧 Setting up ETL Benchmark Functions")

def _materialize_result_count(result):
    """Force a (possibly lazy) benchmark result to execute and return a row count, or None."""
    # bool is an int subclass but is not a row count.
    if isinstance(result, bool):
        return None
    # e.g. DataFrame.count(): the action already ran and returned the count.
    if isinstance(result, int):
        return result
    # e.g. DataFrame.collect(): check sequences before probing for ``count``,
    # because ``list.count`` requires an argument and ``result.count()`` would
    # raise TypeError.
    if isinstance(result, (list, tuple)):
        return len(result)
    # Lazy objects such as a Spark DataFrame: counting triggers execution.
    if callable(getattr(result, 'count', None)):
        return result.count()
    if callable(getattr(result, 'collect', None)):
        return len(result.collect())
    return None


def benchmark_spark_operation(spark, operation_name: str, operation_func, *args, **kwargs):
    """Run ``operation_func`` once, timing it and sampling this process's memory.

    ``operation_func(*args, **kwargs)`` is called, and its result is forced inside
    the timed region so lazy plans actually execute:

    * an ``int`` (e.g. from ``count()``) is used as the row count;
    * a ``list``/``tuple`` (e.g. from ``collect()``) contributes its length;
    * an object with a ``count()`` method is counted;
    * an object with only ``collect()`` is collected and measured;
    * anything else yields ``result_count=None``.

    Args:
        spark: Spark session. Not used by this function; kept for interface compatibility.
        operation_name: Label stored under ``operation``.
        operation_func: Callable to benchmark.
        *args, **kwargs: Forwarded to ``operation_func``.

    Returns:
        Dict with ``operation``, ``duration_seconds``, ``memory_before_mb``,
        ``memory_after_mb``, ``memory_peak_mb``, ``result_count``, ``success`` and
        ``error``. Exceptions raised by the operation are caught and reported via
        ``success=False`` and ``error``; they are not re-raised.

    Note:
        Memory is the RSS of the current Python process (``psutil.Process()``).
        Memory held by other processes, e.g. a separately launched Spark JVM or
        GPU memory, is not included. ``memory_peak_mb`` is ``max(before, after)``,
        a lower bound on the true peak rather than a sampled peak.
    """
    process = psutil.Process()

    def rss_mb() -> float:
        return process.memory_info().rss / (1024**2)

    memory_before = rss_mb()
    start_time = time.perf_counter()
    try:
        result = operation_func(*args, **kwargs)
        count = _materialize_result_count(result)
    except Exception as e:
        duration = time.perf_counter() - start_time
        memory_after = rss_mb()
        return {
            'operation': operation_name,
            'duration_seconds': duration,
            'memory_before_mb': memory_before,
            'memory_after_mb': memory_after,
            'memory_peak_mb': max(memory_before, memory_after),
            'result_count': None,
            'success': False,
            'error': str(e),
        }

    duration = time.perf_counter() - start_time
    memory_after = rss_mb()
    return {
        'operation': operation_name,
        'duration_seconds': duration,
        'memory_before_mb': memory_before,
        'memory_after_mb': memory_after,
        'memory_peak_mb': max(memory_before, memory_after),
        'result_count': count,
        'success': True,
        'error': None,
    }

def create_etl_workloads(spark, dataset_path: str):
    """Build the standard set of ETL workloads over a Parquet dataset.

    The dataset is read, cached, and materialized once (via ``count()``) here,
    outside any timed region. This stops the first timed run from also paying
    to populate the cache.

    Args:
        spark: Active SparkSession used to read the dataset.
        dataset_path: Path to the Parquet data. The workloads reference columns
            ``target``, ``num_feat_0``..``num_feat_2`` and ``cat_feat_0``.

    Returns:
        Dict mapping workload name to a zero-argument callable. Each callable
        runs a Spark action (``count()`` or ``collect()``) and returns its result.

    Note:
        The cached DataFrame is not released here. Callers should free it (e.g.
        ``spark.catalog.clearCache()``) once they are done with the dataset.
    """
    df = spark.read.parquet(dataset_path)
    df.cache()
    df.count()  # materialize the cache before any timing starts

    # Small dimension table (one row per cat_feat_0 value) for a fact-to-dimension
    # join. A self-join on ``target`` (used as a boolean flag elsewhere, so very
    # few distinct values) would emit on the order of rows**2 rows and would not
    # finish on the larger datasets.
    dim = df.groupBy('cat_feat_0').agg({'num_feat_0': 'avg'})

    workloads = {
        'data_scan': lambda: df.count(),

        'simple_filter': lambda: df.filter(df.target == True).count(),

        'aggregation': lambda: df.groupBy('target').agg(
            {'num_feat_0': 'avg', 'num_feat_1': 'sum', 'num_feat_2': 'max'}
        ).collect(),

        'complex_filter': lambda: df.filter(
            (df.num_feat_0 > 0) &
            (df.target == True) &
            df.cat_feat_0.isNotNull()
        ).count(),

        # Window expressed as SQL. ``spark.sql`` is the SparkSession.sql *method*,
        # so ``spark.sql.functions`` / ``spark.sql.window`` do not exist.
        'window_function': lambda: df.selectExpr(
            '*',
            'row_number() OVER (PARTITION BY target ORDER BY num_feat_0) AS row_number',
        ).filter('row_number <= 100').count(),

        'join_operation': lambda: df.join(dim, on='cat_feat_0', how='inner').count(),
    }

    return workloads

print("✅ Benchmark functions ready")

In [None]:
# CPU Spark Benchmarks
print("\n💻 Running CPU Spark Benchmarks")
print("=" * 50)

cpu_results = []
spark_cpu = None

try:
    spark_cpu = build_spark(
        use_gpu=False,
        app_name='CPUBenchmark',
        extra_conf={
            'spark.executor.memory': '4g',
            'spark.driver.memory': '2g',
            'spark.sql.adaptive.enabled': 'true',
            'spark.sql.adaptive.coalescePartitions.enabled': 'true',
            'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'
        }
    )
    spark_version = spark_cpu.version
    print(f"✅ CPU Spark session created (version: {spark_version})")

    # Benchmark each dataset
    for dataset_name, dataset_info in etl_datasets.items():
        print(f"\n📊 Benchmarking {dataset_name} dataset with CPU Spark...")

        workloads = create_etl_workloads(spark_cpu, dataset_info['parquet_path'])

        for workload_name, workload_func in tqdm(workloads.items(), desc=f"CPU {dataset_name}"):
            # Run each workload multiple times for stability
            for run in range(3):
                result = benchmark_spark_operation(
                    spark_cpu,
                    f"{workload_name}_{dataset_name}",
                    workload_func
                )
                result.update({
                    'compute_backend': 'cpu',
                    'dataset_name': dataset_name,
                    'dataset_rows': dataset_info['config']['rows'],
                    'dataset_features': dataset_info['config']['features'],
                    'workload': workload_name,
                    'run_number': run + 1,
                    'spark_version': spark_version
                })
                cpu_results.append(result)

                if not result['success']:
                    print(f"⚠️ Failed: {workload_name} on {dataset_name} - {result['error']}")

        # create_etl_workloads caches the dataset; release it before the next one.
        spark_cpu.catalog.clearCache()

    print("\n✅ CPU benchmarks completed")

except Exception as e:
    print(f"❌ CPU Spark benchmarks failed: {e}")
    cpu_results = []
finally:
    # Stop the session even when a benchmark fails so it is not left running.
    if spark_cpu is not None:
        spark_cpu.stop()

# Save CPU results
if cpu_results:
    cpu_df = pd.DataFrame(cpu_results)
    cpu_df.to_csv('../results/spark_cpu_benchmarks.csv', index=False)
    print(f"💾 Saved {len(cpu_results)} CPU benchmark results")

    # Quick summary
    successful_runs = cpu_df[cpu_df['success'] == True]
    if not successful_runs.empty:
        print("\n📊 CPU Performance Summary:")
        summary = successful_runs.groupby(['dataset_name', 'workload'])['duration_seconds'].agg(['mean', 'std']).round(3)
        print(summary.head(10))
else:
    print("⚠️ No CPU results to save")

In [None]:
# GPU Spark (RAPIDS) Benchmarks
print("\n🎮 Running GPU Spark (RAPIDS) Benchmarks")
print("=" * 50)

gpu_results = []

if gpu_available:
    spark_gpu = None
    try:
        # NOTE(review): a successful `import cudf` only shows the Python RAPIDS
        # library is installed. It does not prove the RAPIDS Accelerator jar is
        # on Spark's classpath. The plugin configs below are added only when it
        # imports. Otherwise the session relies on whatever
        # build_spark(use_gpu=True) configures, and rows are labelled
        # 'gpu_simulated'.
        rapids_available = False
        try:
            import cudf
            rapids_available = True
            print(f"✅ RAPIDS cuDF available: {cudf.__version__}")
        except ImportError:
            print("⚠️ RAPIDS cuDF not available, using GPU Spark plugin simulation")

        gpu_conf = {
            'spark.executor.memory': '4g',
            'spark.driver.memory': '2g',
            'spark.sql.adaptive.enabled': 'true'
        }
        if rapids_available:
            gpu_conf.update({
                'spark.plugins': 'com.nvidia.spark.SQLPlugin',
                'spark.rapids.sql.enabled': 'true',
                'spark.rapids.memory.pinnedPool.size': '2G'
            })

        spark_gpu = build_spark(
            use_gpu=True,
            app_name='GPUBenchmark',
            extra_conf=gpu_conf
        )
        spark_version = spark_gpu.version
        backend_label = 'gpu' if rapids_available else 'gpu_simulated'
        print(f"✅ GPU Spark session created (version: {spark_version})")

        # Benchmark each dataset
        for dataset_name, dataset_info in etl_datasets.items():
            print(f"\n🚀 Benchmarking {dataset_name} dataset with GPU Spark...")

            workloads = create_etl_workloads(spark_gpu, dataset_info['parquet_path'])

            for workload_name, workload_func in tqdm(workloads.items(), desc=f"GPU {dataset_name}"):
                # Run each workload multiple times for stability
                for run in range(3):
                    result = benchmark_spark_operation(
                        spark_gpu,
                        f"{workload_name}_{dataset_name}",
                        workload_func
                    )
                    result.update({
                        'compute_backend': backend_label,
                        'dataset_name': dataset_name,
                        'dataset_rows': dataset_info['config']['rows'],
                        'dataset_features': dataset_info['config']['features'],
                        'workload': workload_name,
                        'run_number': run + 1,
                        'spark_version': spark_version,
                        'rapids_enabled': rapids_available
                    })
                    gpu_results.append(result)

                    if not result['success']:
                        print(f"⚠️ Failed: {workload_name} on {dataset_name} - {result['error']}")

            # create_etl_workloads caches the dataset; release it before the next one.
            spark_gpu.catalog.clearCache()

        print("\n✅ GPU benchmarks completed")

    except Exception as e:
        print(f"❌ GPU Spark benchmarks failed: {e}")
        gpu_results = []
    finally:
        # Stop the session even when a benchmark fails so it is not left running.
        if spark_gpu is not None:
            spark_gpu.stop()

else:
    print("ℹ️ No GPU available, skipping GPU benchmarks")
    print("   💡 To test GPU acceleration:")
    print("      - Use Google Colab with GPU runtime")
    print("      - Deploy to AWS EMR with GPU instances")
    print("      - Use local machine with NVIDIA GPU")

# Save GPU results
if gpu_results:
    gpu_df = pd.DataFrame(gpu_results)
    gpu_df.to_csv('../results/spark_gpu_benchmarks.csv', index=False)
    print(f"💾 Saved {len(gpu_results)} GPU benchmark results")

    # Quick summary
    successful_runs = gpu_df[gpu_df['success'] == True]
    if not successful_runs.empty:
        print("\n📊 GPU Performance Summary:")
        summary = successful_runs.groupby(['dataset_name', 'workload'])['duration_seconds'].agg(['mean', 'std']).round(3)
        print(summary.head(10))
else:
    print("ℹ️ No GPU results to save")

In [None]:
# Format Comparison Benchmarks (Parquet vs CSV)
print("\n📋 Running Format Comparison Benchmarks")
print("=" * 50)


def _path_size_mb(path) -> float:
    """Return the on-disk size of ``path`` in MB, summing all files if it is a directory."""
    p = Path(path)
    if p.is_dir():
        return sum(f.stat().st_size for f in p.rglob('*') if f.is_file()) / (1024**2)
    return p.stat().st_size / (1024**2)


format_results = []
spark_format = None

try:
    # Create a fresh Spark session for format comparison
    spark_format = build_spark(
        use_gpu=False,
        app_name='FormatComparison',
        extra_conf={
            'spark.executor.memory': '4g',
            'spark.driver.memory': '2g'
        }
    )
    print("✅ Format comparison Spark session created")

    # Test different formats for each dataset
    for dataset_name, dataset_info in etl_datasets.items():
        print(f"\n📊 Comparing formats for {dataset_name} dataset...")

        formats_to_test = {
            'parquet': dataset_info['parquet_path'],
            'csv': dataset_info['csv_path']
        }

        for format_name, file_path in formats_to_test.items():
            # The file does not change between runs, so measure its size once.
            file_size_mb = _path_size_mb(file_path)

            # Loop variables are bound as defaults so the closure is explicit.
            # NOTE(review): for Parquet, count() needs no columns, so Spark may
            # read little beyond file metadata. Treat Parquet "throughput" as an
            # upper bound, not a full-scan figure. CSV additionally pays for
            # schema inference (inferSchema=true).
            def read_and_count(fmt=format_name, path=file_path):
                if fmt == 'parquet':
                    df = spark_format.read.parquet(path)
                elif fmt == 'csv':
                    df = spark_format.read.option("header", "true").option("inferSchema", "true").csv(path)
                else:
                    raise ValueError(f"Unsupported format: {fmt}")
                return df.count()

            # Benchmark read performance
            for run in range(3):
                result = benchmark_spark_operation(
                    spark_format,
                    f"read_{format_name}_{dataset_name}",
                    read_and_count
                )
                duration = result['duration_seconds']
                result.update({
                    'operation_type': 'read',
                    'format': format_name,
                    'dataset_name': dataset_name,
                    'dataset_rows': dataset_info['config']['rows'],
                    'file_size_mb': file_size_mb,
                    'run_number': run + 1,
                    'throughput_mb_s': file_size_mb / duration if duration > 0 else 0
                })
                format_results.append(result)

            print(f"   {format_name.upper()}: {file_size_mb:.1f} MB")

    print("\n✅ Format comparison completed")

except Exception as e:
    print(f"❌ Format comparison failed: {e}")
    format_results = []
finally:
    # Stop the session even on failure so it is not left running.
    if spark_format is not None:
        spark_format.stop()

# Save format results
if format_results:
    format_df = pd.DataFrame(format_results)
    format_df.to_csv('../results/spark_format_comparison.csv', index=False)
    print(f"💾 Saved {len(format_results)} format comparison results")

    # Format comparison summary
    successful_runs = format_df[format_df['success'] == True]
    if not successful_runs.empty:
        print("\n📊 Format Performance Summary:")
        summary = successful_runs.groupby(['dataset_name', 'format']).agg({
            'duration_seconds': ['mean', 'std'],
            'throughput_mb_s': 'mean',
            'file_size_mb': 'first'
        }).round(3)
        print(summary)
else:
    print("⚠️ No format comparison results to save")

In [None]:
# Comprehensive Analysis and Visualization
print("\n📈 Analysis and Visualization")
print("=" * 50)

# Combine all backend results (either list may be empty)
all_spark_results = [*cpu_results, *gpu_results]

if all_spark_results:
    combined_df = pd.DataFrame(all_spark_results)
    combined_df.to_csv('../results/spark_etl_combined.csv', index=False)

    # Only successful runs have meaningful timings
    successful_df = combined_df[combined_df['success'] == True].copy()
    # result_count can be None for some runs; coerce so arithmetic/aggregation yields NaN
    successful_df['result_count'] = pd.to_numeric(successful_df['result_count'], errors='coerce')

    if not successful_df.empty:
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        fig.suptitle('Spark ETL Performance Analysis', fontsize=16)

        # 1. CPU vs GPU Performance by Workload
        if gpu_results and cpu_results:
            perf_comparison = successful_df.groupby(['compute_backend', 'workload'])['duration_seconds'].mean().unstack()
            perf_comparison.plot(kind='bar', ax=axes[0, 0], width=0.8)
            axes[0, 0].set_title('CPU vs GPU Performance by Workload')
            axes[0, 0].set_ylabel('Duration (seconds)')
            axes[0, 0].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
            axes[0, 0].tick_params(axis='x', rotation=45)
        else:
            axes[0, 0].text(0.5, 0.5, 'CPU vs GPU\nComparison\nNot Available',
                            ha='center', va='center', transform=axes[0, 0].transAxes)

        # 2. Scaling by Dataset Size
        scaling_data = successful_df.groupby(['dataset_name', 'compute_backend'])['duration_seconds'].mean().unstack()
        if not scaling_data.empty:
            scaling_data.plot(kind='bar', ax=axes[0, 1], width=0.8)
            axes[0, 1].set_title('Performance Scaling by Dataset Size')
            axes[0, 1].set_ylabel('Duration (seconds)')
            axes[0, 1].legend()
            axes[0, 1].tick_params(axis='x', rotation=45)

        # 3. Memory Usage Comparison (driver-process RSS as recorded by the benchmark)
        memory_data = successful_df.groupby(['compute_backend', 'dataset_name'])['memory_peak_mb'].mean().unstack()
        if not memory_data.empty:
            memory_data.plot(kind='bar', ax=axes[0, 2], width=0.8)
            axes[0, 2].set_title('Peak Memory Usage')
            axes[0, 2].set_ylabel('Memory (MB)')
            axes[0, 2].legend()
            axes[0, 2].tick_params(axis='x', rotation=45)

        # 4. Workload Performance Breakdown (all backends and datasets pooled)
        workload_perf = successful_df.groupby('workload')['duration_seconds'].agg(['mean', 'std'])
        workload_perf['mean'].plot(kind='bar', ax=axes[1, 0], yerr=workload_perf['std'], capsize=4)
        axes[1, 0].set_title('Workload Performance Breakdown')
        axes[1, 0].set_ylabel('Duration (seconds)')
        axes[1, 0].tick_params(axis='x', rotation=45)

        # 5. Format Comparison; show a placeholder when there is nothing to plot
        format_successful = pd.DataFrame([r for r in format_results if r['success']])
        if not format_successful.empty:
            format_perf = format_successful.groupby(['dataset_name', 'format'])['throughput_mb_s'].mean().unstack()
            format_perf.plot(kind='bar', ax=axes[1, 1], width=0.8)
            axes[1, 1].set_title('Format Read Throughput')
            axes[1, 1].set_ylabel('Throughput (MB/s)')
            axes[1, 1].legend()
            axes[1, 1].tick_params(axis='x', rotation=45)
        else:
            axes[1, 1].text(0.5, 0.5, 'Format\nComparison\nNot Available',
                            ha='center', va='center', transform=axes[1, 1].transAxes)

        # 6. Efficiency: result rows produced per second of runtime
        successful_df['ops_per_second'] = successful_df['result_count'] / successful_df['duration_seconds']
        efficiency_data = (
            successful_df.groupby(['compute_backend', 'workload'])['ops_per_second']
            .mean()
            .unstack()
            .dropna(how='all')
        )
        if not efficiency_data.empty:
            efficiency_data.plot(kind='bar', ax=axes[1, 2], width=0.8)
            axes[1, 2].set_title('Processing Efficiency (Ops/sec)')
            axes[1, 2].set_ylabel('Operations per Second')
            axes[1, 2].legend(bbox_to_anchor=(1.05, 1), loc='upper left')
            axes[1, 2].tick_params(axis='x', rotation=45)

        plt.tight_layout()
        plt.savefig('../results/spark_etl_analysis.png', dpi=150, bbox_inches='tight')
        plt.show()

        print("\n📊 PERFORMANCE SUMMARY:")
        print("=" * 40)

        # Overall statistics
        overall_stats = successful_df.groupby('compute_backend').agg({
            'duration_seconds': ['mean', 'std', 'min', 'max'],
            'memory_peak_mb': ['mean', 'max'],
            'result_count': 'sum'
        })
        print("\n🔢 Overall Statistics by Compute Backend:")
        print(overall_stats.round(3))

        # Best backend per (dataset, workload): compare mean run times on the
        # same dataset so dataset size does not decide the winner.
        mean_times = (
            successful_df
            .groupby(['dataset_name', 'workload', 'compute_backend'])['duration_seconds']
            .mean()
            .reset_index()
        )
        fastest = mean_times.loc[
            mean_times.groupby(['dataset_name', 'workload'])['duration_seconds'].idxmin()
        ]
        print("\n🏆 Fastest Configuration by Workload:")
        for _, row in fastest.iterrows():
            print(f"   {row['dataset_name']}/{row['workload']}: {row['compute_backend']} "
                  f"({row['duration_seconds']:.3f}s mean)")

        print("\n📄 Detailed results saved to ../results/spark_etl_*.csv")
        print("📈 Visualization saved to ../results/spark_etl_analysis.png")

    else:
        print("⚠️ No successful benchmark runs to analyze")

else:
    print("⚠️ No benchmark results available for analysis")

In [None]:
# Performance Insights and Recommendations
print("\n💡 SPARK ETL INSIGHTS & RECOMMENDATIONS")
print("=" * 60)

insights = []
recommendations = []

if all_spark_results:
    successful_df = pd.DataFrame([r for r in all_spark_results if r['success']])

    if not successful_df.empty:
        backends = set(successful_df['compute_backend'])

        # CPU vs GPU: average only over (dataset, workload) pairs that both
        # backends completed, so a failed workload on one side cannot skew it.
        if {'cpu', 'gpu'} <= backends:
            paired = (
                successful_df[successful_df['compute_backend'].isin(['cpu', 'gpu'])]
                .groupby(['dataset_name', 'workload', 'compute_backend'])['duration_seconds']
                .mean()
                .unstack('compute_backend')
                .dropna()
            )
            if not paired.empty:
                cpu_avg = paired['cpu'].mean()
                gpu_avg = paired['gpu'].mean()

                if gpu_avg < cpu_avg:
                    speedup = cpu_avg / gpu_avg
                    insights.append(f"🚀 GPU acceleration shows {speedup:.1f}x speedup on average")
                    recommendations.append("Consider GPU instances for large-scale ETL workloads")
                else:
                    slowdown = gpu_avg / cpu_avg
                    insights.append(f"⚠️ GPU shows {slowdown:.1f}x slower performance - may need optimization")
                    recommendations.append("Focus on CPU optimization or larger datasets for GPU benefits")

        # Dataset scaling insights
        if successful_df['dataset_name'].nunique() > 1:
            dataset_rows = {name: info['config']['rows'] for name, info in etl_datasets.items()}

            if 'small' in dataset_rows and 'large' in dataset_rows:
                mean_by_dataset = successful_df.groupby('dataset_name')['duration_seconds'].mean()
                small_time = mean_by_dataset.get('small')
                large_time = mean_by_dataset.get('large')

                # Both sizes need successful runs and a non-zero baseline for a ratio.
                if small_time is not None and large_time is not None and small_time > 0:
                    size_ratio = dataset_rows['large'] / dataset_rows['small']
                    time_ratio = large_time / small_time

                    if time_ratio < size_ratio * 0.8:  # Better than linear scaling
                        insights.append(f"📈 Excellent scaling: {size_ratio:.1f}x data size → {time_ratio:.1f}x time")
                        recommendations.append("System handles large datasets efficiently - consider batch processing")
                    elif time_ratio > size_ratio * 1.5:  # Worse than linear scaling
                        insights.append(f"📉 Poor scaling: {size_ratio:.1f}x data size → {time_ratio:.1f}x time")
                        recommendations.append("Investigate memory bottlenecks and consider data partitioning")
                    else:
                        insights.append(f"📊 Linear scaling: {size_ratio:.1f}x data size → {time_ratio:.1f}x time")

        # Memory usage insights
        max_memory = successful_df['memory_peak_mb'].max()
        avg_memory = successful_df['memory_peak_mb'].mean()

        if max_memory > 8000:  # > 8GB
            insights.append(f"🔴 High memory usage detected: {max_memory:.0f} MB peak")
            recommendations.append("Consider larger instances or data partitioning strategies")
        elif avg_memory < 2000:  # < 2GB average
            insights.append(f"🟢 Low memory usage: {avg_memory:.0f} MB average")
            recommendations.append("Current setup is memory-efficient - can handle larger datasets")

        # Workload-specific insights
        workload_times = successful_df.groupby('workload')['duration_seconds'].mean().sort_values(ascending=False)
        slowest_workload = workload_times.index[0]
        fastest_workload = workload_times.index[-1]

        insights.append(f"🐌 Slowest operation: {slowest_workload} ({workload_times.iloc[0]:.3f}s)")
        insights.append(f"⚡ Fastest operation: {fastest_workload} ({workload_times.iloc[-1]:.3f}s)")

        if 'join_operation' in workload_times.index and workload_times['join_operation'] > workload_times.mean() * 2:
            recommendations.append("Join operations are bottleneck - consider data skew mitigation")

        if 'window_function' in workload_times.index and workload_times['window_function'] > workload_times.mean() * 1.5:
            recommendations.append("Window functions are expensive - consider alternative approaches")

# Format comparison insights
if format_results:
    format_successful = pd.DataFrame([r for r in format_results if r['success']])
    if not format_successful.empty:
        # Compare read *time*, not MB/s: throughput divides by each format's own
        # file size, so the same rows stored in different sizes are not comparable.
        format_time = format_successful.groupby('format')['duration_seconds'].mean()

        if 'parquet' in format_time.index and 'csv' in format_time.index and format_time['parquet'] > 0:
            speedup = format_time['csv'] / format_time['parquet']

            if speedup > 2:
                insights.append(f"📦 Parquet is {speedup:.1f}x faster than CSV")
                recommendations.append("Use Parquet format for production ETL pipelines")
            else:
                insights.append(f"📄 Parquet vs CSV performance difference is moderate ({speedup:.1f}x)")

# Platform-specific recommendations
if is_colab:
    platform_recommendations = [
        "Use Colab Pro for larger memory limits on big datasets",
        "Mount Google Drive for persistent data storage",
        "Consider BigQuery integration for massive datasets"
    ]
elif is_sagemaker:
    platform_recommendations = [
        "Use SageMaker Processing for distributed ETL jobs",
        "Leverage S3 for scalable data storage",
        "Consider EMR integration for larger Spark clusters"
    ]
elif is_emr:
    platform_recommendations = [
        "Optimize cluster size based on workload characteristics",
        "Use spot instances for cost-effective processing",
        "Enable dynamic allocation for variable workloads"
    ]
else:
    platform_recommendations = [
        "Consider cloud deployment for larger scale processing",
        "Use local SSD storage for optimal I/O performance",
        "Monitor resource utilization for capacity planning"
    ]

# Print insights and recommendations
print("\n🎯 KEY INSIGHTS:")
for i, insight in enumerate(insights, 1):
    print(f"   {i}. {insight}")

print("\n🔧 OPTIMIZATION RECOMMENDATIONS:")
all_recommendations = recommendations + platform_recommendations
for i, rec in enumerate(all_recommendations, 1):
    print(f"   {i}. {rec}")

print("\n🌐 PLATFORM-SPECIFIC DEPLOYMENT GUIDES:")
deployment_guides = {
    "Google Colab": "Use !pip install for dependencies, mount drive for data persistence",
    "AWS SageMaker": "Use SageMaker Processing jobs for production ETL workflows",
    "EMR Spark": "Configure cluster with appropriate instance types and auto-scaling",
    "Local Development": "Use conda/venv for environment management, consider Docker for consistency"
}

for platform, guide in deployment_guides.items():
    print(f"   📱 {platform}: {guide}")

print("\n✅ Spark ETL Benchmarks Complete!")
print("📊 Check ../results/ for detailed performance data and analysis plots")
print("🔗 Next: Run notebook 03 for ML training pipeline benchmarks")