# 03. Distributed Data Processing with PySpark

## 📚 Learning Objectives

By completing this notebook, you will:
- Understand PySpark for distributed data processing
- Implement PySpark to perform distributed data processing on large datasets
- Integrate PySpark with existing Python workflows
- Compare PySpark with Dask for distributed computing
- Apply PySpark to real-world large dataset scenarios

## 🔗 Prerequisites

- ✅ Basic Python
- ✅ Basic NumPy/Pandas
- ✅ Understanding of distributed computing concepts

---

## Official Structure Reference

This notebook covers practical activities from **Course 05, Unit 5**:
- Data Processing using PySpark: Implementing PySpark to perform distributed data processing on large datasets and integrating with existing Python workflows
- **Source:** `DETAILED_UNIT_DESCRIPTIONS.md` - Unit 5 Practical Content

---

## The Story

**BEFORE**: You know Dask for distributed computing but don't know Spark for big data processing.

**AFTER**: You'll learn PySpark, the Python API for Apache Spark, a widely used framework for distributed big data processing and analytics.

**Why this matters**: Spark is a common choice for data that does not fit on one machine, so knowing PySpark lets you carry your pandas and Dask skills over to cluster-scale datasets.

---


# Unit 5 - Example 03: Distributed Data Processing with PySpark

## 🔗 Building on Example 02

**From Example 02 (Dask):**
- We learned Dask for distributed computing in Python
- Dask works well for Python-native workflows
- For cluster-scale processing with Hadoop integration, SQL, and streaming, Apache Spark is the more established choice

**This notebook introduces:**
- **PySpark** - Python API for Apache Spark
- **Distributed data processing** on large datasets
- **Integration** with existing Python workflows
- **Enterprise-scale** distributed computing

**This complements Dask with enterprise distributed computing!**

In [1]:
# Try importing PySpark. `pip install pyspark` bundles Spark, but a Java runtime is also required.
# PySpark may not be available on all systems, so we provide a pandas fallback.
try:
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
    PYSPARK_AVAILABLE = True
    print("✅ PySpark imported successfully!")
except ImportError:
    PYSPARK_AVAILABLE = False
    print("⚠️  PySpark not available. Install Spark for distributed processing:")
    print("   Note: Requires Apache Spark installation")
    print("   Continuing with pandas simulation...")

# Always import pandas/numpy for fallback
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time

print("✅ Libraries imported!")

⚠️  PySpark not available. Install Spark for distributed processing:
   Note: Requires Apache Spark installation
   Continuing with pandas simulation...


✅ Libraries imported!


## Part 1: Introduction to PySpark

**PySpark** is the Python API for Apache Spark, a unified analytics engine for large-scale data processing.

**Key Features:**
- Distributed data processing across clusters
- In-memory computing for faster processing
- Integration with Hadoop ecosystem
- Support for SQL, streaming, and machine learning

**When to use PySpark vs Dask:**
- **PySpark**: Enterprise clusters, Hadoop integration, SQL queries, streaming
- **Dask**: Python-native workflows, smaller clusters, NumPy/Pandas compatibility

In [2]:
print("=" * 70)
print("Example 03: Distributed Data Processing with PySpark")
print("=" * 70)
print("\n📚 Prerequisites: Example 02 (Dask) completed")
print("🔗 This notebook covers PySpark for distributed data processing")
print("🎯 Goal: Master PySpark for enterprise-scale distributed computing\n")

if PYSPARK_AVAILABLE:
    print("✅ PySpark is available - Using real distributed processing")
else:
    print("⚠️  PySpark not available - Using pandas simulation")
    print("   (Install Spark to use actual distributed processing)")

Example 03: Distributed Data Processing with PySpark

📚 Prerequisites: Example 02 (Dask) completed
🔗 This notebook covers PySpark for distributed data processing
🎯 Goal: Master PySpark for enterprise-scale distributed computing

⚠️  PySpark not available - Using pandas simulation
   (Install Spark to use actual distributed processing)


## Part 2: Creating Spark Session

**SparkSession** is the entry point for PySpark applications.

**Why SparkSession?**
- Manages the underlying Spark context and configuration
- Provides a unified API for Spark SQL and DataFrames (the typed Dataset API exists only in Scala and Java)
- Coordinates distributed execution across the cluster
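
As a sketch of how the builder is typically configured, the helper below assembles a local master URL and applies one configuration option before creating the session. The function names `local_master` and `build_local_session` and the choice of 8 shuffle partitions are illustrative assumptions, not part of the course material; `spark.sql.shuffle.partitions` is a standard Spark SQL setting.

```python
def local_master(cores=None):
    """Return a Spark master URL for local mode.

    cores=None uses every available core ("local[*]");
    an integer pins the number of worker threads, e.g. "local[2]".
    """
    if cores is None:
        return "local[*]"
    if cores < 1:
        raise ValueError("cores must be a positive integer")
    return f"local[{cores}]"


def build_local_session(app_name="Course05_PySpark_Example", cores=None,
                        shuffle_partitions=8):
    """Create (or reuse) a local SparkSession. Hypothetical helper.

    PySpark is imported lazily so this module still loads without it.
    """
    from pyspark.sql import SparkSession

    return (SparkSession.builder
            .appName(app_name)
            .master(local_master(cores))
            # Fewer shuffle partitions than the default of 200 suits small local data
            .config("spark.sql.shuffle.partitions", str(shuffle_partitions))
            .getOrCreate())


print(local_master())    # local[*]
print(local_master(2))   # local[2]
```

With PySpark and a Java runtime installed, `spark = build_local_session(cores=2)` would return a session limited to two local worker threads. Because `getOrCreate()` reuses an already running session, the settings may not take effect if one exists.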

In [3]:
if PYSPARK_AVAILABLE:
    # Create SparkSession for distributed processing
    spark = (SparkSession.builder
             .appName("Course05_PySpark_Example")
             .master("local[*]")  # Use all available cores locally
             .getOrCreate())
    
    print("✅ SparkSession created successfully!")
    print(f"Spark version: {spark.version}")
    print(f"Spark master: {spark.sparkContext.master}")
else:
    print("⚠️  PySpark simulation mode - Using pandas for demonstration")
    print("   (Same operations, but single-machine processing)")

⚠️  PySpark simulation mode - Using pandas for demonstration
   (Same operations, but single-machine processing)


## Part 3: Creating Sample Data

We'll create a synthetic 100,000-row dataset. This is small by Spark standards, but it is enough to demonstrate the PySpark API on a single machine.

In [4]:
# Generate sample data for demonstration
np.random.seed(42)
n_rows = 100000  # Small by Spark standards, but enough to demonstrate the API locally

sample_data = pd.DataFrame({
    'id': range(n_rows),
    'value1': np.random.randn(n_rows),
    'value2': np.random.randint(1, 100, n_rows),
    'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
    'score': np.random.uniform(0, 100, n_rows)
})

print(f"✅ Sample data created: {len(sample_data):,} rows")
print(f"Data shape: {sample_data.shape}")
print("\nFirst few rows:")
print(sample_data.head())

✅ Sample data created: 100,000 rows
Data shape: (100000, 5)

First few rows:
   id    value1  value2 category      score
0   0  0.496714      36        C  99.661504
1   1 -0.138264      18        C  58.729074
2   2  0.647689       5        A  72.476852
3   3  1.523030      46        C  90.637787
4   4 -0.234153      86        B  30.715934


## Part 4: Loading Data into PySpark

PySpark DataFrames are distributed collections of data organized into named columns.
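
Schema inference requires an extra pass over the file, so for data with a known layout it is often preferable to declare the schema up front. A minimal sketch, assuming the five columns of the sample data used in this notebook; `COLUMN_TYPES`, `schema_ddl`, and `read_csv_with_schema` are hypothetical names. `spark.read.csv(..., schema=...)` accepts a DDL-formatted string as an alternative to a `StructType`.

```python
# Column name -> Spark SQL type, matching the sample data in this notebook
COLUMN_TYPES = {
    "id": "INT",
    "value1": "DOUBLE",
    "value2": "INT",
    "category": "STRING",
    "score": "DOUBLE",
}


def schema_ddl(column_types):
    """Build a DDL-formatted schema string such as 'id INT, score DOUBLE'."""
    return ", ".join(f"{name} {dtype}" for name, dtype in column_types.items())


def read_csv_with_schema(spark, csv_path, column_types=COLUMN_TYPES):
    """Read a CSV with an explicit schema instead of inferSchema=True.

    `spark` is an existing SparkSession; nothing here runs until it is called.
    """
    return spark.read.csv(csv_path, header=True, schema=schema_ddl(column_types))


print(schema_ddl(COLUMN_TYPES))
```

With a live session, `read_csv_with_schema(spark, 'sample_data_pyspark.csv')` would load the same file without the inference pass. It also makes the column types explicit rather than dependent on whatever values happen to appear in the file.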

In [5]:
if PYSPARK_AVAILABLE:
    # Save to CSV first (PySpark can read from various sources)
    csv_path = 'sample_data_pyspark.csv'
    sample_data.to_csv(csv_path, index=False)
    
    # Load into PySpark DataFrame
    df_spark = spark.read.csv(csv_path, header=True, inferSchema=True)
    
    print("✅ Data loaded into PySpark DataFrame!")
    print(f"Number of partitions: {df_spark.rdd.getNumPartitions()}")
    print(f"Total rows: {df_spark.count():,}")
    print("\nSchema:")
    df_spark.printSchema()
    print("\nFirst few rows:")
    df_spark.show(5)
else:
    print("⚠️  PySpark simulation - Using pandas DataFrame")
    df_pandas = sample_data.copy()
    print(f"Data shape: {df_pandas.shape}")
    print("\nFirst few rows:")
    print(df_pandas.head())

⚠️  PySpark simulation - Using pandas DataFrame
Data shape: (100000, 5)

First few rows:
   id    value1  value2 category      score
0   0  0.496714      36        C  99.661504
1   1 -0.138264      18        C  58.729074
2   2  0.647689       5        A  72.476852
3   3  1.523030      46        C  90.637787
4   4 -0.234153      86        B  30.715934


## Part 5: Distributed Data Processing Operations

PySpark performs operations in a distributed manner across partitions.
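
To make "distributed across partitions" concrete, the pure-Python sketch below mimics how a grouped average can be computed without any single worker seeing all the rows. Each partition produces partial (sum, count) pairs per key, and the partials are then merged. This is a conceptual illustration only, not Spark's actual implementation; the function names and the tiny dataset are made up for the example.

```python
from collections import defaultdict


def partial_sums(partition):
    """Per-partition step: accumulate (sum, count) of score for each category."""
    acc = defaultdict(lambda: [0.0, 0])
    for category, score in partition:
        acc[category][0] += score
        acc[category][1] += 1
    return dict(acc)


def merge_partials(partials):
    """Combine step: add up the partial sums and counts, then divide."""
    total = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for category, (s, n) in partial.items():
            total[category][0] += s
            total[category][1] += n
    return {category: s / n for category, (s, n) in total.items()}


# Two "partitions" of (category, score) rows
partitions = [
    [("A", 10.0), ("B", 20.0), ("A", 30.0)],
    [("B", 40.0), ("A", 50.0)],
]

averages = merge_partials(partial_sums(p) for p in partitions)
print(averages)  # {'A': 30.0, 'B': 30.0}
```

The partial results carry sums and counts rather than per-partition averages, because averaging the averages would give the wrong answer whenever partitions hold different numbers of rows per key.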

In [6]:
print("\n" + "=" * 70)
print("PART 5: Distributed Data Processing Operations")
print("=" * 70)

if PYSPARK_AVAILABLE:
    # Filtering (distributed)
    print("\n1. Filtering data (distributed across partitions):")
    filtered = df_spark.filter(df_spark['value2'] > 50)
    print(f"   Rows where value2 > 50: {filtered.count():,}")
    
    # Aggregations (distributed)
    print("\n2. Aggregations (distributed):")
    aggregated = df_spark.groupBy('category').agg(
        F.avg('score').alias('avg_score'),
        F.count('*').alias('count')
    )
    print("   Grouped by category:")
    aggregated.show()
    
    # Transformations (distributed)
    print("\n3. Transformations (distributed):")
    transformed = df_spark.withColumn('value1_squared', df_spark['value1'] ** 2)
    print("   Added new column 'value1_squared'")
    transformed.select('id', 'value1', 'value1_squared').show(5)
else:
    # Pandas simulation
    print("\n1. Filtering data:")
    filtered = df_pandas[df_pandas['value2'] > 50]
    print(f"   Rows where value2 > 50: {len(filtered):,}")
    
    print("\n2. Aggregations:")
    aggregated = df_pandas.groupby('category').agg({
        'score': 'mean',
        'id': 'count'
    }).rename(columns={'score': 'avg_score', 'id': 'count'})
    print("   Grouped by category:")
    print(aggregated)
    
    print("\n3. Transformations:")
    df_pandas['value1_squared'] = df_pandas['value1'] ** 2
    print("   Added new column 'value1_squared'")
    print(df_pandas[['id', 'value1', 'value1_squared']].head())


PART 5: Distributed Data Processing Operations

1. Filtering data:
   Rows where value2 > 50: 49,280

2. Aggregations:
   Grouped by category:
          avg_score  count
category                  
A         49.987463  25002
B         50.131388  24837
C         49.840321  25078
D         50.177236  25083

3. Transformations:
   Added new column 'value1_squared'
   id    value1  value1_squared
0   0  0.496714        0.246725
1   1 -0.138264        0.019117
2   2  0.647689        0.419500
3   3  1.523030        2.319620
4   4 -0.234153        0.054828


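## Part 6: Performance Comparison

This section times the same filter-and-aggregate query in PySpark (distributed) and in pandas (single-machine). On a dataset this small, expect pandas to be faster: Spark's job scheduling and JVM overhead only pay off once the data is too large for one machine or the work can be spread over many cores.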
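
A single wall-clock measurement is noisy, and with Spark the first run also pays one-off start-up costs. Below is a hedged sketch of a small timing helper that repeats the measurement and reports the best and median run; `time_callable` and its parameters are hypothetical names. When timing Spark, the callable should end in an action such as `collect()` or `count()`, because transformations alone are lazy and return almost immediately.

```python
import statistics
import time


def time_callable(func, repeats=5):
    """Run func() `repeats` times and return (best, median) wall-clock seconds."""
    if repeats < 1:
        raise ValueError("repeats must be at least 1")
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return min(durations), statistics.median(durations)


# Stand-in workload; in the notebook this would be the pandas or Spark pipeline
best, median = time_callable(lambda: sum(i * i for i in range(100_000)))
print(f"best: {best:.4f}s, median: {median:.4f}s")
```

Reporting the best run alongside the median makes it easier to separate the steady-state cost of a query from warm-up effects.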

In [7]:
print("\n" + "=" * 70)
print("PART 6: Performance Comparison")
print("=" * 70)

# Test a combined operation: filtering + aggregation
if PYSPARK_AVAILABLE:
    print("\n⏱️  Testing PySpark (distributed) performance...")
    start_time = time.perf_counter()
    
    # collect() is an action, so it forces the lazy query to execute
    result_spark = (df_spark
                .filter(df_spark['value2'] > 50)
                .groupBy('category')
                .agg(F.avg('score').alias('avg_score'))
                .collect())
    
    spark_time = time.perf_counter() - start_time
    print(f"   ✅ PySpark (distributed): {spark_time:.4f} seconds")
    print(f"   Results: {len(result_spark)} groups")
else:
    print("\n⏱️  Simulated PySpark (distributed) performance:")
    print("   (PySpark would distribute this across multiple cores/machines)")
    spark_time = None  # Nothing to measure without PySpark

# Pandas (single-machine); sample_data exists whether or not PySpark is available
print("\n⏱️  Testing Pandas (single-machine) performance...")
start_time = time.perf_counter()

result_pandas = (sample_data[sample_data['value2'] > 50]
              .groupby('category')['score']
              .mean())

pandas_time = time.perf_counter() - start_time
print(f"   ✅ Pandas (single-machine): {pandas_time:.4f} seconds")

# Comparison
if PYSPARK_AVAILABLE:
    ratio = pandas_time / spark_time
    print(f"\n📊 Pandas time / PySpark time: {ratio:.2f}x")
    print("   (Above 1 means PySpark was faster; on data this small, pandas usually wins)")


PART 6: Performance Comparison

⏱️  Simulated PySpark (distributed) performance:
   (PySpark would distribute this across multiple cores/machines)

⏱️  Testing Pandas (single-machine) performance...
   ✅ Pandas (single-machine): 0.0018 seconds


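## Part 7: Integration with Python Workflows

A PySpark DataFrame can be converted to pandas with `toPandas()`, after which the usual Python libraries apply. The conversion collects every row onto the driver, so reduce the data first (for example with `limit` or an aggregation).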
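
Because `toPandas()` materializes its input in the driver's memory, a common precaution is to cap the row count before converting. A minimal sketch, assuming a hypothetical helper named `to_pandas_capped` that wraps the `limit(...).toPandas()` pattern; the default of 1000 rows is an arbitrary choice for illustration.

```python
def to_pandas_capped(spark_df, max_rows=1000):
    """Convert at most `max_rows` rows of a Spark DataFrame to pandas.

    limit() is a lazy transformation; toPandas() is the action that
    pulls the (capped) rows onto the driver.
    """
    if max_rows < 1:
        raise ValueError("max_rows must be a positive integer")
    return spark_df.limit(max_rows).toPandas()
```

With a live session this would be called as `pdf = to_pandas_capped(df_spark, max_rows=1000)`. On Spark 3.x, setting `spark.sql.execution.arrow.pyspark.enabled` to `true` can speed up the conversion when PyArrow is installed.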

In [8]:
print("\n" + "=" * 70)
print("PART 7: Integration with Python Workflows")
print("=" * 70)

if PYSPARK_AVAILABLE:
    # Convert PySpark DataFrame to Pandas (for integration)
    print("\n1. Converting PySpark DataFrame to Pandas:")
    df_pandas_from_spark = df_spark.limit(1000).toPandas()
    print(f"   Converted {len(df_pandas_from_spark):,} rows to Pandas")
    print("   Now you can use pandas/numpy/scikit-learn on this data")
    
    # Use with existing Python libraries
    print("\n2. Using with existing Python libraries:")
    from sklearn.preprocessing import StandardScaler
    
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(df_pandas_from_spark[['value1', 'score']])
    print(f"   Scaled data shape: {scaled_data.shape}")
    print("   ✅ PySpark integrates with scikit-learn, pandas, numpy!")
else:
    print("\n⚠️  PySpark simulation - Integration demonstration:")
    print("   In real PySpark, you can:")
    print("   1. Process large datasets distributed across cluster")
    print("   2. Convert to Pandas for smaller subsets")
    print("   3. Use with scikit-learn, pandas, numpy")
    print("   4. Write results back to distributed storage")


PART 7: Integration with Python Workflows

⚠️  PySpark simulation - Integration demonstration:
   In real PySpark, you can:
   1. Process large datasets distributed across cluster
   2. Convert to Pandas for smaller subsets
   3. Use with scikit-learn, pandas, numpy
   4. Write results back to distributed storage


## Part 8: Summary

**Key Takeaways:**

1. **PySpark** provides distributed data processing for large datasets
2. **Distributed operations** run across multiple cores/machines
3. **Integration** with Python workflows (pandas, scikit-learn, numpy)
4. **Performance** can scale with cluster size once the data is large enough to outweigh Spark's overhead

**When to use PySpark:**
- Very large datasets (billions of rows)
- Enterprise clusters and Hadoop integration
- SQL queries on distributed data
- Streaming data processing

**When to use Dask (from Example 02):**
- Python-native workflows
- Smaller clusters
- NumPy/Pandas compatibility
- Interactive data science

In [9]:
print("\n" + "=" * 70)
print("SUMMARY: PySpark for Distributed Data Processing")
print("=" * 70)

print("\n✅ What you learned:")
print("   1. PySpark for distributed data processing")
print("   2. Creating SparkSession and DataFrames")
print("   3. Distributed operations (filtering, aggregation, transformation)")
print("   4. Integration with Python workflows")
print("   5. Performance benefits of distributed processing")

print("\n🔗 Next steps:")
print("   - Example 04: RAPIDS workflows (GPU acceleration)")
print("   - Example 04: Production pipelines")
print("   - Example 05: Performance optimization")

if PYSPARK_AVAILABLE:
    spark.stop()
    print("\n✅ SparkSession stopped")
else:
    print("\n💡 To use PySpark:")
    print("   1. Install Apache Spark: https://spark.apache.org/downloads.html")
    print("   2. Install PySpark: pip install pyspark")
    print("   3. Run this notebook again for distributed processing")


SUMMARY: PySpark for Distributed Data Processing

✅ What you learned:
   1. PySpark for distributed data processing
   2. Creating SparkSession and DataFrames
   3. Distributed operations (filtering, aggregation, transformation)
   4. Integration with Python workflows
   5. Performance benefits of distributed processing

🔗 Next steps:
   - Example 04: RAPIDS workflows (GPU acceleration)
   - Example 04: Production pipelines
   - Example 05: Performance optimization

💡 To use PySpark:
   1. Install Apache Spark: https://spark.apache.org/downloads.html
   2. Install PySpark: pip install pyspark
   3. Run this notebook again for distributed processing
