# PySpark Complete Tutorial - From Basics to Advanced

## 🎯 Learning Objectives
By the end of this notebook, you will master:
- PySpark environment setup and configuration
- All types of joins (Inner, Outer, Cross, Semi, Anti)
- Advanced optimization techniques and performance tuning
- Sorting strategies and partitioning methods
- Memory configuration and Adaptive Query Execution (AQE)
- Real-world data processing scenarios

## 📚 What We'll Cover Today
1. **Environment Setup** - Installation and SparkSession configuration
2. **DataFrame Fundamentals** - Creating, transforming, and manipulating data
3. **Join Operations** - Complete coverage of all join types
4. **Optimization Techniques** - Performance tuning and best practices
5. **Advanced Topics** - Window functions, caching, UDFs, and aggregations

Let's dive deep into PySpark! 🚀

## 1. PySpark Environment Setup and SparkSession

### Installation and Environment Configuration
First, let's set up PySpark with all necessary configurations for optimal performance.

In [None]:
# Install PySpark (run this if PySpark is not installed)
# !pip install pyspark

# Import essential libraries
import pyspark
from pyspark.sql import SparkSession
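# Note: the wildcard imports below shadow Python built-ins such as sum, min, and max
# with their Spark column-function counterparts for the rest of the notebook.
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window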
import os
import sys

# Check PySpark version
print(f"PySpark Version: {pyspark.__version__}")
print(f"Python Version: {sys.version}")

# Set environment variables for optimal performance
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [None]:
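# Create SparkSession with tuned configurations.
# Note: memory and core settings are read at JVM/executor startup. If a session
# already exists, getOrCreate() returns it and these static values are not applied.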
spark = SparkSession.builder \
    .appName("PySpark Complete Tutorial") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", "10MB") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "1g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Set log level to reduce verbose output
spark.sparkContext.setLogLevel("WARN")

print("SparkSession created successfully!")
print(f"Spark Version: {spark.version}")
print(f"Application Name: {spark.conf.get('spark.app.name')}")
print(f"Master: {spark.conf.get('spark.master')}")

# Display key configurations
print("\n🔧 Key Configurations:")
configs = [
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.coalescePartitions.enabled", 
    "spark.sql.autoBroadcastJoinThreshold",
    "spark.sql.shuffle.partitions"
]

for config in configs:
    print(f"{config}: {spark.conf.get(config)}")

## 2. Creating DataFrames from Various Sources

### Learn multiple ways to create DataFrames for different data sources
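
Supplying an explicit schema spares Spark the extra pass over the data that `inferSchema` needs for CSV input. The plain-Python sketch below is a toy model of that inference step, not Spark's actual algorithm; `infer_type` and the sample data are hypothetical names made up for illustration.

```python
import csv
import io

# Hypothetical sample data; every CSV field starts out as a string.
csv_text = """employee_id,name,salary
1,John Doe,75000
2,Jane Smith,65000.5
3,Mike Johnson,80000
"""

def infer_type(values):
    """Pick the narrowest of int, double, string that fits every value in a column."""
    for caster, label in ((int, "int"), (float, "double")):
        try:
            for value in values:
                caster(value)
            return label
        except ValueError:
            continue
    return "string"

# Inference has to look at the rows before any type can be decided.
rows = list(csv.DictReader(io.StringIO(csv_text)))
inferred = {col: infer_type([row[col] for row in rows]) for col in rows[0]}
print(inferred)
```

A single non-integer value (`65000.5`) is enough to widen the whole `salary` column, which is why inference cannot stop after the first row.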

In [None]:
# 1. Creating DataFrame from Python lists
print("📊 Creating DataFrames from various sources:\n")

# Sample data for employees
employees_data = [
    (1, "John Doe", "Engineering", 75000, "2020-01-15"),
    (2, "Jane Smith", "Marketing", 65000, "2019-03-20"),
    (3, "Mike Johnson", "Engineering", 80000, "2021-06-10"),
    (4, "Sarah Wilson", "HR", 55000, "2018-11-05"),
    (5, "David Brown", "Engineering", 90000, "2017-09-12"),
    (6, "Lisa Davis", "Marketing", 70000, "2020-08-30"),
    (7, "Tom Miller", "Finance", 62000, "2019-12-01"),
    (8, "Amy Taylor", "Engineering", 85000, "2021-02-14")
]

# Define schema for better performance and data quality
employees_schema = StructType([
    StructField("employee_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("hire_date", StringType(), True)
])

# Create DataFrame with schema
employees_df = spark.createDataFrame(employees_data, employees_schema)

print("✅ Employees DataFrame created:")
employees_df.show()
employees_df.printSchema()

In [None]:
# 2. Creating DataFrame from a list of dictionaries (schema is inferred from keys and values)
departments_dict = [
    {"dept_id": 1, "dept_name": "Engineering", "location": "San Francisco", "budget": 1000000},
    {"dept_id": 2, "dept_name": "Marketing", "location": "New York", "budget": 500000},
    {"dept_id": 3, "dept_name": "HR", "location": "Chicago", "budget": 300000},
    {"dept_id": 4, "dept_name": "Finance", "location": "Boston", "budget": 400000}
]

departments_df = spark.createDataFrame(departments_dict)
print("✅ Departments DataFrame created:")
departments_df.show()

# 3. Creating DataFrame from CSV data (simulated)
import tempfile
import os

# Create sample CSV data
csv_data = """employee_id,name,department,salary,hire_date
1,John Doe,Engineering,75000,2020-01-15
2,Jane Smith,Marketing,65000,2019-03-20
3,Mike Johnson,Engineering,80000,2021-06-10"""

# Write to temporary file
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
    f.write(csv_data)
    temp_csv_path = f.name

# Read CSV file
csv_df = spark.read.option("header", "true").option("inferSchema", "true").csv(temp_csv_path)
print("✅ DataFrame from CSV:")
csv_df.show()

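# Clean up. Spark reads lazily, so csv_df cannot be re-evaluated once the file is
# deleted; this is safe only because show() above has already run.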
os.unlink(temp_csv_path)

## 3. Complete Guide to All Types of Joins

### Master every join type with practical examples and optimization techniques
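
The row-matching rules behind each join type can be sketched in plain Python. This is only a model of the semantics, not how Spark executes joins: it assumes unique keys on both sides, and `join_rows` plus the two small dictionaries are hypothetical names used for illustration.

```python
# employee_id -> name and employee_id -> score; keys are assumed unique.
employees = {1: "John Doe", 2: "Jane Smith", 4: "Sarah Wilson"}
performance = {1: 85, 2: 78, 9: 75}

def join_rows(left, right, how):
    """Return the rows a join of type `how` would keep, keyed on employee_id."""
    if how == "inner":
        return [(k, left[k], right[k]) for k in left if k in right]
    if how == "left":
        return [(k, left[k], right.get(k)) for k in left]
    if how == "right":
        return [(k, left.get(k), right[k]) for k in right]
    if how == "outer":
        keys = sorted(set(left) | set(right))
        return [(k, left.get(k), right.get(k)) for k in keys]
    if how == "semi":  # left columns only, rows that have a match
        return [(k, left[k]) for k in left if k in right]
    if how == "anti":  # left columns only, rows that have no match
        return [(k, left[k]) for k in left if k not in right]
    raise ValueError(f"unsupported join type: {how}")

for how in ["inner", "left", "right", "outer", "semi", "anti"]:
    print(how, join_rows(employees, performance, how))
```

Missing values appear as `None`, mirroring the nulls Spark fills in for unmatched rows in outer joins. Semi and anti joins never carry columns from the right side.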

In [None]:
# Create sample data for join demonstrations
print("🔗 Comprehensive Join Operations Guide\n")

# Employee performance data
performance_data = [
    (1, "John Doe", 85, "Excellent"),
    (2, "Jane Smith", 78, "Good"),  
    (3, "Mike Johnson", 92, "Outstanding"),
    (5, "David Brown", 88, "Excellent"),
    (9, "New Employee", 75, "Good")  # Employee not in main table
]

performance_df = spark.createDataFrame(performance_data, 
    ["employee_id", "emp_name", "performance_score", "rating"])

# Department mapping
dept_mapping = [
    ("Engineering", "TECH"),
    ("Marketing", "MKTG"), 
    ("HR", "HUMAN_RESOURCES"),
    ("Finance", "FIN"),
    ("Operations", "OPS")  # Department not in employees table
]

dept_codes_df = spark.createDataFrame(dept_mapping, ["department", "dept_code"])

print("📊 Source DataFrames:")
print("\n1. Employees:")
employees_df.show()
print("\n2. Performance:")
performance_df.show()
print("\n3. Department Codes:")
dept_codes_df.show()

In [None]:
# 1. INNER JOIN - Returns only matching records from both tables
print("🔗 1. INNER JOIN")
print("Returns only employees who have performance records")
inner_join = employees_df.join(performance_df, "employee_id", "inner")
inner_join.show()
print(f"Result count: {inner_join.count()}")

# 2. LEFT JOIN (LEFT OUTER) - All records from left table
print("\n🔗 2. LEFT JOIN (LEFT OUTER)")
print("All employees, with performance data where available")
left_join = employees_df.join(performance_df, "employee_id", "left")
left_join.show()
print(f"Result count: {left_join.count()}")

# 3. RIGHT JOIN (RIGHT OUTER) - All records from right table  
print("\n🔗 3. RIGHT JOIN (RIGHT OUTER)")
print("All performance records, with employee data where available")
right_join = employees_df.join(performance_df, "employee_id", "right")
right_join.show()
print(f"Result count: {right_join.count()}")

# 4. FULL OUTER JOIN - All records from both tables
print("\n🔗 4. FULL OUTER JOIN")
print("All employees and all performance records")
full_join = employees_df.join(performance_df, "employee_id", "outer")
full_join.show()
print(f"Result count: {full_join.count()}")

In [None]:
# 5. SEMI JOIN - Returns left table records that have matches in right table
print("\n🔗 5. SEMI JOIN")
print("Employees who have performance records (no columns from right table)")
semi_join = employees_df.join(performance_df, "employee_id", "semi")
semi_join.show()
print(f"Result count: {semi_join.count()}")

# 6. ANTI JOIN - Returns left table records that DON'T have matches in right table
print("\n🔗 6. ANTI JOIN") 
print("Employees who DON'T have performance records")
anti_join = employees_df.join(performance_df, "employee_id", "anti")
anti_join.show()
print(f"Result count: {anti_join.count()}")

# 7. CROSS JOIN - Cartesian product of both tables
print("\n🔗 7. CROSS JOIN (Use with caution!)")
print("Every employee paired with every department code")
# Using small dataset for demonstration
small_emp = employees_df.limit(2)
small_dept = dept_codes_df.limit(2)
cross_join = small_emp.crossJoin(small_dept)
cross_join.show()
print(f"Result count: {cross_join.count()}")

# 8. MULTIPLE COLUMN JOIN
print("\n🔗 8. MULTIPLE COLUMN JOIN")
print("Join on multiple conditions")
multi_join = employees_df.join(
    performance_df, 
    (employees_df.employee_id == performance_df.employee_id) & 
    (employees_df.name == performance_df.emp_name),
    "inner"
)
multi_join.show()

## 4. Join Optimization Techniques

### Master broadcast joins, bucketing, and performance tuning
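
The idea behind a broadcast join can be sketched without Spark: the small table is turned into an in-memory lookup that every partition of the large table probes locally, so the large table never has to be shuffled. The code below is a toy model under that assumption; `broadcast_left_join` and the sample partitions are hypothetical.

```python
# Small lookup table: (department, dept_code)
dept_codes = [("Engineering", "TECH"), ("Marketing", "MKTG"), ("HR", "HUMAN_RESOURCES")]

# Two "partitions" of the larger table: (employee_id, department)
employee_partitions = [
    [(1, "Engineering"), (2, "Marketing")],
    [(4, "HR"), (7, "Finance")],
]

def broadcast_left_join(partitions, small_table):
    """Left-join each partition against a lookup built once from the small table."""
    lookup = dict(small_table)  # built once, then conceptually copied to every worker
    joined = []
    for part in partitions:  # each partition is processed independently
        joined.append([(emp_id, dept, lookup.get(dept)) for emp_id, dept in part])
    return joined

result = broadcast_left_join(employee_partitions, dept_codes)
print(result)
```

Rows stay in their original partitions, and an unmatched department such as `Finance` simply gets `None`, as in a left join.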

In [None]:
# 1. BROADCAST JOIN - Optimize joins with small tables
from pyspark.sql.functions import broadcast

print("🚀 Join Optimization Techniques\n")

# Broadcast the smaller department codes table
print("1. BROADCAST JOIN:")
print("Broadcasting small department table for faster joins")
broadcast_join = employees_df.join(
    broadcast(dept_codes_df), 
    "department", 
    "left"
)
broadcast_join.show()

# Check execution plan
print("\n📊 Execution Plan Analysis:")
broadcast_join.explain(True)

# 2. BUCKETING for large datasets (conceptual example)
print("\n2. BUCKETING STRATEGY:")
print("For large datasets, bucket tables on join keys")

# Example of writing bucketed table (commented for demo)
# employees_df.write \
#     .bucketBy(4, "employee_id") \
#     .sortBy("employee_id") \
#     .saveAsTable("bucketed_employees")

# 3. Join hints for query optimization
print("\n3. JOIN HINTS:")
print("Using join hints to guide optimizer")

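# Different join strategies. Transformations are lazy, so building the join only
# creates a plan; explain() shows which strategy the optimizer actually selected.
strategies = ["BROADCAST", "MERGE", "SHUFFLE_HASH"]
for strategy in strategies:
    print(f"\n{strategy} join strategy:")
    try:
        hint_join = employees_df.hint(strategy).join(dept_codes_df, "department")
        hint_join.explain()
        print(f"✅ {strategy} hint applied; see the physical plan above")
    except Exception as e:
        print(f"❌ {strategy} join failed: {e}")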

## 5. Sorting Techniques and Partitioning Strategies

### Master data ordering, partitioning, and distribution optimization
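
Repartitioning by a column relies on hash partitioning: rows with the same key value always land in the same partition. The sketch below models that in plain Python. Spark uses its own hash function, so `zlib.crc32` is only a stand-in, and `hash_partition` is a hypothetical helper.

```python
import zlib
from collections import defaultdict

def hash_partition(rows, key_index, num_partitions):
    """Assign each row to a partition by hashing its key column."""
    partitions = defaultdict(list)
    for row in rows:
        key = str(row[key_index]).encode("utf-8")
        partitions[zlib.crc32(key) % num_partitions].append(row)
    return dict(partitions)

rows = [
    (1, "Engineering"), (2, "Marketing"), (3, "Engineering"),
    (4, "HR"), (5, "Engineering"),
]
parts = hash_partition(rows, key_index=1, num_partitions=4)
for pid, part in sorted(parts.items()):
    print(pid, part)
```

Because the partition depends only on the key, a dominant key (here `Engineering`) produces one oversized partition. That is the skew that uneven record counts in a partition analysis usually reveal.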

In [None]:
print("📈 Sorting and Partitioning Techniques\n")

# 1. SORTING TECHNIQUES
print("1. SORTING TECHNIQUES:")

# Basic sorting
print("\nA. Basic sorting by salary (descending):")
sorted_by_salary = employees_df.orderBy(col("salary").desc())
sorted_by_salary.show()

# Multiple column sorting
print("\nB. Multiple column sorting (department asc, salary desc):")
multi_sort = employees_df.orderBy("department", col("salary").desc())
multi_sort.show()

# Sort with null handling
print("\nC. Sort with null handling:")
# Add some null values for demonstration
employees_with_nulls = employees_df.union(
    spark.createDataFrame([(999, "Test User", None, 50000, "2023-01-01")], employees_schema)
)
null_sort = employees_with_nulls.orderBy(col("department").asc_nulls_last())
null_sort.show()

# 2. PARTITIONING STRATEGIES
print("\n2. PARTITIONING STRATEGIES:")

# Check current partitions
print(f"\nA. Current partitions: {employees_df.rdd.getNumPartitions()}")

# Repartition by column for better data locality
print("\nB. Partition by department:")
partitioned_df = employees_df.repartition("department")
print(f"Partitions after repartitioning: {partitioned_df.rdd.getNumPartitions()}")

# Coalesce to reduce partitions
print("\nC. Coalesce to reduce partitions:")
coalesced_df = employees_df.coalesce(2)
print(f"Partitions after coalescing: {coalesced_df.rdd.getNumPartitions()}")

# 3. PARTITION ANALYSIS
print("\n3. PARTITION ANALYSIS:")
def analyze_partitions(df, name):
    partitions = df.rdd.glom().collect()
    print(f"\n{name} partition analysis:")
    for i, partition in enumerate(partitions):
        print(f"Partition {i}: {len(partition)} records")
        if partition:
            print(f"  Sample: {partition[0]}")

analyze_partitions(employees_df, "Original")
analyze_partitions(partitioned_df, "Repartitioned by department")

## 6. Performance Optimization Parameters

### Complete guide to Spark configuration and tuning parameters
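
Size-valued settings such as `spark.sql.autoBroadcastJoinThreshold` accept strings like `10MB`, where Spark's documentation defines the suffixes as binary units (1kb = 1024 bytes). The helper below is a rough sketch of that conversion and of a threshold check. `size_to_bytes` and `fits_broadcast` are hypothetical names, and treating a bare number as bytes is an assumption that does not hold for every Spark setting.

```python
import re

# Binary multipliers (1kb = 1024 bytes), as described in Spark's configuration docs.
_UNITS = {
    "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
}

def size_to_bytes(size):
    """Convert a size string such as '10MB' or '2g' to a byte count."""
    match = re.fullmatch(r"\s*(\d+)\s*([a-zA-Z]*)\s*", size)
    if not match:
        raise ValueError(f"unrecognized size: {size!r}")
    number = int(match.group(1))
    unit = match.group(2).lower() or "b"  # assumption: a bare number means bytes
    if unit not in _UNITS:
        raise ValueError(f"unknown unit in {size!r}")
    return number * _UNITS[unit]

def fits_broadcast(table_bytes, threshold="10MB"):
    """Rough check: is an estimated table size within the broadcast threshold?"""
    return table_bytes <= size_to_bytes(threshold)

print(size_to_bytes("10MB"))
print(fits_broadcast(5 * 1024 ** 2), fits_broadcast(50 * 1024 ** 2))
```

Spark makes the real decision from its own size estimates, so this check is only useful for back-of-the-envelope reasoning about which tables are candidates for broadcasting.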

In [None]:
print("⚡ Performance Optimization Parameters Guide\n")

# 1. MEMORY OPTIMIZATION
print("1. MEMORY OPTIMIZATION PARAMETERS:")
memory_configs = {
    "spark.executor.memory": "Amount of memory per executor",
    "spark.memory.fraction": "Fraction of heap used for execution and storage",
    "spark.memory.storageFraction": "Share of spark.memory.fraction protected for storage",
    "spark.memory.offHeap.enabled": "Enable off-heap memory",
    "spark.memory.offHeap.size": "Off-heap memory size"
}

for param, description in memory_configs.items():
    try:
        value = spark.conf.get(param)
        print(f"✅ {param}: {value} - {description}")
    except Exception:
        print(f"❌ {param}: Not set - {description}")

# 2. PARALLELISM PARAMETERS
print("\n2. PARALLELISM OPTIMIZATION:")
parallelism_configs = {
    "spark.sql.shuffle.partitions": "Partitions for shuffles",
    "spark.sql.adaptive.coalescePartitions.enabled": "Auto coalesce partitions",
    "spark.sql.adaptive.coalescePartitions.minPartitionNum": "Min partitions after coalesce",
    "spark.sql.files.maxPartitionBytes": "Max bytes per partition when reading",
    "spark.default.parallelism": "Default parallelism level"
}

for param, description in parallelism_configs.items():
    try:
        value = spark.conf.get(param)
        print(f"✅ {param}: {value} - {description}")
    except Exception:
        print(f"❌ {param}: Not set - {description}")

# 3. JOIN OPTIMIZATION PARAMETERS
print("\n3. JOIN OPTIMIZATION PARAMETERS:")
join_configs = {
    "spark.sql.autoBroadcastJoinThreshold": "Threshold for broadcast joins",
    "spark.sql.adaptive.skewJoin.enabled": "Handle skewed joins",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "Skew threshold",
    "spark.sql.bucketing.coalesceBucketsInJoin.enabled": "Coalesce buckets in joins"
}

for param, description in join_configs.items():
    try:
        value = spark.conf.get(param)
        print(f"✅ {param}: {value} - {description}")
    except Exception:
        print(f"❌ {param}: Not set - {description}")

# 4. DYNAMIC OPTIMIZATION
print("\n4. ADAPTIVE QUERY EXECUTION (AQE):")
aqe_configs = {
    "spark.sql.adaptive.enabled": "Enable AQE",
    "spark.sql.adaptive.localShuffleReader.enabled": "Local shuffle reader", 
    "spark.sql.adaptive.skewJoin.enabled": "Skew join optimization",
    "spark.sql.adaptive.coalescePartitions.enabled": "Coalesce partitions"
}

for param, description in aqe_configs.items():
    try:
        value = spark.conf.get(param)
        print(f"✅ {param}: {value} - {description}")
    except Exception:
        print(f"❌ {param}: Not set - {description}")

## 7. Advanced Topics and Best Practices

### Window functions, UDFs, caching, and performance monitoring
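
What a window function computes can be modeled in plain Python: rows are grouped by the partition key, ordered within each group, and a value is derived per row without collapsing the group. The sketch below emulates `row_number`, a running total, and `lag` on a hypothetical four-row sample. It illustrates the semantics only and has no tied salaries, so frame subtleties around ties do not show up.

```python
from itertools import accumulate, groupby
from operator import itemgetter

# Hypothetical sample: (name, department, salary)
employees = [
    ("John Doe", "Engineering", 75000),
    ("Mike Johnson", "Engineering", 80000),
    ("Jane Smith", "Marketing", 65000),
    ("Lisa Davis", "Marketing", 70000),
]

def row_number_by_dept(rows):
    """Emulate row_number() partitioned by department, ordered by salary descending."""
    ranked = []
    by_dept = sorted(rows, key=itemgetter(1))  # groupby needs sorted input
    for _, group in groupby(by_dept, key=itemgetter(1)):
        ordered = sorted(group, key=itemgetter(2), reverse=True)
        for position, (name, dept, salary) in enumerate(ordered, start=1):
            ranked.append((name, dept, salary, position))
    return ranked

ranked = row_number_by_dept(employees)

# Unpartitioned window ordered by salary: running total and previous value (lag 1).
salaries = sorted(salary for _, _, salary in employees)
running_total = list(accumulate(salaries))
prev_salary = [None] + salaries[:-1]

print(ranked)
print(list(zip(salaries, running_total, prev_salary)))
```

Every input row appears in the output, which is the key difference between a window function and a `groupBy` aggregation.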

In [None]:
print("🎯 Advanced PySpark Techniques\n")

# 1. WINDOW FUNCTIONS
print("1. WINDOW FUNCTIONS:")

# Row number within department
window_dept = Window.partitionBy("department").orderBy(col("salary").desc())
employees_with_rank = employees_df.withColumn("rank_in_dept", row_number().over(window_dept))
print("\nEmployees ranked by salary within department:")
employees_with_rank.show()

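# Running total and lag/lead functions.
# Note: a window without partitionBy moves all rows to a single partition,
# which is fine for this small demo but does not scale to large data.
window_salary = Window.orderBy("salary")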
advanced_window = employees_df.withColumn("running_total", sum("salary").over(window_salary)) \
                              .withColumn("prev_salary", lag("salary", 1).over(window_salary)) \
                              .withColumn("next_salary", lead("salary", 1).over(window_salary))
print("\nRunning totals and lag/lead:")
advanced_window.show()

# 2. CACHING STRATEGIES
print("\n2. CACHING STRATEGIES:")

# Cache a frequently used DataFrame (lazy: data is stored on the first action)
employees_df.cache()
print(f"✅ DataFrame cached. Storage level: {employees_df.storageLevel}")

# A DataFrame holds one storage level at a time, so unpersist before switching
from pyspark import StorageLevel
employees_df.unpersist()
employees_df.persist(StorageLevel.MEMORY_ONLY)
print(f"✅ Storage level changed to: {employees_df.storageLevel}")

# 3. USER DEFINED FUNCTIONS (UDFs)
print("\n3. USER DEFINED FUNCTIONS:")

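# Create a UDF for salary grade
def salary_grade(salary):
    if salary is None:  # nulls arrive as None; comparing None to an int would raise
        return None
    if salary >= 80000:
        return "Senior"
    elif salary >= 65000:
        return "Mid"
    else:
        return "Junior"

# Wrap the function as a UDF. Python UDFs are opaque to the optimizer, so prefer
# built-in functions such as when()/otherwise() where they can express the logic.
from pyspark.sql.types import StringType
salary_grade_udf = udf(salary_grade, StringType())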

# Apply UDF
employees_with_grade = employees_df.withColumn("grade", salary_grade_udf("salary"))
print("\nEmployees with salary grades:")
employees_with_grade.show()

# 4. AGGREGATIONS AND GROUPING
print("\n4. ADVANCED AGGREGATIONS:")

# Complex aggregations
dept_stats = employees_df.groupBy("department") \
    .agg(
        count("employee_id").alias("emp_count"),
        avg("salary").alias("avg_salary"),
        min("salary").alias("min_salary"),
        max("salary").alias("max_salary"),
        stddev("salary").alias("salary_stddev")
    )
print("\nDepartment statistics:")
dept_stats.show()

## 🎓 Summary and Next Steps

### What you've learned in this comprehensive tutorial:

#### ✅ **Core Concepts Mastered:**
- **SparkSession Configuration** - Optimized setup with performance parameters
- **DataFrame Creation** - Multiple sources and schema management
- **All Join Types** - Inner, Outer (Left/Right/Full), Semi, Anti, Cross joins
- **Join Optimization** - Broadcast joins, bucketing, and join hints
- **Sorting & Partitioning** - OrderBy, repartition, coalesce strategies
- **Performance Tuning** - Memory, parallelism, and AQE parameters
- **Advanced Operations** - Window functions, UDFs, and caching

#### 🚀 **Optimization Techniques Covered:**
- Adaptive Query Execution (AQE)
- Broadcast joins for small tables
- Bucketing on join keys
- Memory configuration parameters
- Cache strategies and storage levels
- Skew join handling

#### 📊 **Key Parameters Learned:**
```
spark.sql.adaptive.enabled = true
spark.sql.autoBroadcastJoinThreshold = 10MB
spark.sql.shuffle.partitions = 200
spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.coalescePartitions.enabled = true
```

#### 🎯 **Next Learning Path:**
1. **Specialized Notebooks** - Dive deeper into specific topics
2. **Real-world Projects** - Apply skills on large datasets  
3. **Streaming Data** - Structured Streaming for real-time processing
4. **MLlib Integration** - Machine learning with Spark
5. **Delta Lake** - Advanced data lake management

#### 💡 **Best Practices to Remember:**
- Define explicit schemas instead of relying on `inferSchema`, which needs an extra pass over the data
- Cache DataFrames that are reused across several actions, and unpersist them when done
- Use broadcast joins for small lookup tables
- Monitor execution plans with `.explain()`
- Partition data by frequently filtered columns
- Use AQE for dynamic optimization

### Ready to become a PySpark expert? Let's continue with specialized topics! 🔥

In [None]:
# Clean up resources
print("🧹 Cleaning up resources...")

# Unpersist cached DataFrames
employees_df.unpersist()
print("✅ Cache cleared")

# Stop SparkSession (optional - usually done at end of application)
# spark.stop()
print("✅ Session ready for next operations")

print("\n🎉 Congratulations! You've completed the comprehensive PySpark tutorial!")
print("📚 Ready for the next advanced topics in the series!")