# 1. SparkSession - Entry Point

## 📖 What is SparkSession?

**SparkSession** is the **unified entry point** for Spark functionality in Spark 2.0 and later. It supersedes SQLContext and HiveContext as the entry point and wraps SparkContext, which remains available as `spark.sparkContext`.

**Key Features:**
- Single entry point for DataFrame, SQL, and Streaming APIs
- Automatically creates SparkContext internally
- Supports Hive integration
- Configuration management
- Catalog access (tables, databases, functions)

**Structure:**
```python
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("key", "value") \
    .getOrCreate()
```

## 🎯 Why Use SparkSession?

### **Advantages:**
1. **Unified Interface** - One object for all Spark operations
2. **Simplified API** - Easier than managing multiple contexts
3. **Built-in Optimization** - Catalyst optimizer for DataFrames
4. **Distributed Computing** - Process TBs of data across clusters
5. **Lazy Evaluation** - Optimizes entire query plan before execution
6. **Fault Tolerance** - Automatic recovery from failures

### **Disadvantages:**
1. **Overhead** - Too heavy for small datasets (< 1 GB)
2. **Learning Curve** - Different from pandas
3. **Memory Requirements** - Needs significant RAM
4. **Setup Complexity** - Cluster configuration can be complex

## ⏱️ When to Use SparkSession

### ✅ **Use When:**

**1. Big Data Processing (> 10 GB)**
- Example: Process 100 GB of log files
- Why: Pandas loads everything in memory, Spark distributes
- Benefit: Can process data larger than single machine RAM

**2. Distributed Computing Needed**
- Example: Analyze 1 TB of customer transactions
- Why: Split work across 10+ machines
- Speed: Near-linear scaling with cluster size for well-partitioned workloads; shuffles and data skew reduce the gain

**3. ETL Pipelines**
- Example: Daily data warehouse updates
- Why: Read from multiple sources, transform, load to warehouse
- Tools: Integrates with HDFS, S3, Kafka, databases

**4. Machine Learning at Scale**
- Example: Train model on 50 million records
- Why: MLlib distributed algorithms
- Benefit: Training on full dataset, not samples

**5. Real-Time Stream Processing**
- Example: Process sensor data from 10,000 devices
- Why: Structured Streaming handles high throughput
- Use case: IoT, fraud detection, monitoring

**6. SQL on Big Data**
- Example: Run SQL queries on 500 GB Parquet files
- Why: SparkSQL with query optimization
- Benefit: Familiar SQL interface for big data

### ❌ **Don't Use When:**

**1. Small Data (< 1 GB)**
- Problem: Overhead > benefit
- Better: Use pandas (faster, simpler)
- Why: Spark startup time not worth it

**2. Need Interactive Analysis**
- Problem: Every action launches a distributed job, so quick exploratory queries carry scheduling overhead
- Better: Jupyter + pandas for exploration
- Why: Immediate results matter more

**3. Complex Custom Logic**
- Problem: Python UDFs are slow and hard to optimize
- Better: Use built-in Spark functions when possible
- Why: Python UDFs are opaque to the Catalyst optimizer and add serialization overhead between the JVM and the Python workers

**4. Low-Latency Required (< 100ms)**
- Problem: Spark adds latency
- Better: Redis, Cassandra, in-memory databases
- Why: Spark designed for throughput, not latency

**5. No Cluster Available**
- Problem: Local mode slower than pandas for small data
- Better: pandas or Dask for single machine
- Why: Spark shines with distributed infrastructure

## 📊 How It Works

**Architecture:**
1. **Driver Program** - Your Python script with SparkSession
2. **Cluster Manager** - YARN, Kubernetes, Standalone, or Mesos (deprecated since Spark 3.2)
3. **Executors** - Worker processes on cluster nodes
4. **Tasks** - Units of work sent to executors

**Execution Flow:**
1. Create SparkSession
2. Define transformations (lazy)
3. Call action (trigger execution)
4. Catalyst optimizer creates execution plan
5. Tasks distributed to executors
6. Results collected to driver

## 🌍 Real-World Applications

1. **Netflix** - Recommendation engine (billions of events)
2. **Uber** - Ride analytics, surge pricing
3. **Airbnb** - Search ranking, pricing optimization
4. **LinkedIn** - User analytics, job recommendations
5. **Financial Services** - Fraud detection, risk analysis
6. **Healthcare** - Patient data analysis, drug discovery
7. **E-commerce** - Customer behavior, inventory optimization

## 💡 Key Insights

✅ Always use SparkSession (not SparkContext) for PySpark 2.0+  
✅ Call `.getOrCreate()` to reuse existing session  
✅ Set meaningful appName for monitoring  
✅ Configure memory: `.config("spark.executor.memory", "4g")`  
✅ Stop session when done: `spark.stop()`  
✅ Use `.master("local[*]")` for local testing  
✅ Access SparkContext: `spark.sparkContext`  
✅ Lazy evaluation - nothing happens until action called

In [None]:
# SPARKSESSION - COMPLETE EXAMPLE

print("="*80)
print("PYSPARK SPARKSESSION - COMPREHENSIVE GUIDE")
print("="*80)

# NOTE: Install PySpark first: pip install pyspark

from pyspark.sql import SparkSession
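# Note: sum, max, and min imported here shadow Python's built-ins of the same name for the rest of this script
from pyspark.sql.functions import col, avg, sum, count, max, min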
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
import time

# 1. CREATE SPARKSESSION - BASIC
print("\n1. CREATING SPARKSESSION (BASIC)")
print("-"*80)

# Simple creation
spark = SparkSession.builder \
    .appName("PySpark Tutorial") \
    .getOrCreate()

print(f"✓ SparkSession created successfully!")
print(f"  App Name: {spark.sparkContext.appName}")
print(f"  Spark Version: {spark.version}")
print(f"  Master: {spark.sparkContext.master}")

# 2. CREATE SPARKSESSION - ADVANCED CONFIGURATION
print("\n2. CREATING SPARKSESSION (ADVANCED CONFIGURATION)")
print("-"*80)

# Stop existing session
spark.stop()

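# Create with detailed configuration
# Note: in local mode the executor runs inside the driver JVM, so spark.executor.memory has little effect here,
# and spark.driver.memory only takes effect if it is set before the JVM starts (the JVM is already running at this point)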
spark = SparkSession.builder \
    .appName("Advanced PySpark Tutorial") \
    .master("local[*]") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()

print(f"✓ Advanced SparkSession created!")
print(f"\nConfiguration:")
print(f"  Master: {spark.sparkContext.master} (local mode, all cores)")
print(f"  Executor Memory: {spark.conf.get('spark.executor.memory')}")
print(f"  Driver Memory: {spark.conf.get('spark.driver.memory')}")
print(f"  Shuffle Partitions: {spark.conf.get('spark.sql.shuffle.partitions')}")

# 3. CREATE SAMPLE DATAFRAME
print("\n3. CREATING SAMPLE DATAFRAME")
print("-"*80)

# Sample data - E-commerce sales
data = [
    ("TX001", "Laptop", "Electronics", 1200, 2, "2024-01-15", "New York"),
    ("TX002", "Mouse", "Accessories", 25, 5, "2024-01-15", "London"),
    ("TX003", "Keyboard", "Accessories", 75, 3, "2024-01-16", "Paris"),
    ("TX004", "Monitor", "Electronics", 300, 1, "2024-01-16", "Tokyo"),
    ("TX005", "Laptop", "Electronics", 1200, 1, "2024-01-17", "New York"),
    ("TX006", "Headphones", "Accessories", 150, 2, "2024-01-17", "London"),
    ("TX007", "Webcam", "Electronics", 80, 4, "2024-01-18", "Berlin"),
    ("TX008", "Mouse", "Accessories", 25, 10, "2024-01-18", "Paris"),
    ("TX009", "Monitor", "Electronics", 300, 2, "2024-01-19", "Tokyo"),
    ("TX010", "Keyboard", "Accessories", 75, 5, "2024-01-19", "New York")
]

columns = ["transaction_id", "product", "category", "price", "quantity", "date", "city"]

# Create DataFrame
df = spark.createDataFrame(data, columns)

print(f"✓ DataFrame created with {df.count()} rows")
print(f"\nSchema:")
df.printSchema()

print(f"\nFirst 5 rows:")
df.show(5, truncate=False)

# 4. BASIC DATAFRAME OPERATIONS
print("\n4. BASIC DATAFRAME OPERATIONS")
print("-"*80)

# Select columns
print("Select specific columns:")
df.select("product", "price", "quantity").show(3)

# Filter data
print("\nFilter: Electronics only")
df.filter(col("category") == "Electronics").show(3)

# Add calculated column
print("\nAdd 'revenue' column (price * quantity):")
df_with_revenue = df.withColumn("revenue", col("price") * col("quantity"))
df_with_revenue.select("product", "price", "quantity", "revenue").show(5)

# 5. AGGREGATIONS
print("\n5. AGGREGATION OPERATIONS")
print("-"*80)

# Overall statistics
print("Overall Statistics:")
df_with_revenue.agg(
    count("*").alias("total_transactions"),
    sum("revenue").alias("total_revenue"),
    avg("revenue").alias("avg_revenue"),
    max("revenue").alias("max_revenue"),
    min("revenue").alias("min_revenue")
).show()

# Group by category
print("\nRevenue by Category:")
category_stats = df_with_revenue.groupBy("category").agg(
    count("*").alias("transactions"),
    sum("revenue").alias("total_revenue"),
    avg("revenue").alias("avg_revenue")
).orderBy(col("total_revenue").desc())

category_stats.show()

# Group by city
print("\nRevenue by City:")
city_stats = df_with_revenue.groupBy("city").agg(
    sum("revenue").alias("total_revenue")
).orderBy(col("total_revenue").desc())

city_stats.show()

# 6. SQL QUERIES
print("\n6. SQL QUERIES")
print("-"*80)

# Register as temporary view
df_with_revenue.createOrReplaceTempView("sales")

print("SQL Query: Top 5 products by revenue")
sql_result = spark.sql("""
    SELECT 
        product,
        category,
        SUM(revenue) as total_revenue,
        COUNT(*) as transactions
    FROM sales
    GROUP BY product, category
    ORDER BY total_revenue DESC
    LIMIT 5
""")

sql_result.show()

# Complex SQL query
print("\nSQL Query: Products with revenue > $500")
spark.sql("""
    SELECT 
        product,
        SUM(revenue) as total_revenue,
        ROUND(AVG(price), 2) as avg_price
    FROM sales
    GROUP BY product
    HAVING SUM(revenue) > 500
    ORDER BY total_revenue DESC
""").show()

# 7. PERFORMANCE DEMONSTRATION
print("\n7. PERFORMANCE DEMONSTRATION")
print("-"*80)

# Create larger dataset
print("Creating larger dataset for performance test...")
large_data = [(f"TX{i}", f"Product{i%10}", "Category" + str(i%3), 
               i%1000 + 10, i%5 + 1, f"2024-01-{(i%28)+1:02d}", 
               ["NYC", "London", "Paris", "Tokyo"][i%4]) 
              for i in range(10000)]

large_df = spark.createDataFrame(large_data, columns)
large_df = large_df.withColumn("revenue", col("price") * col("quantity"))

print(f"✓ Created DataFrame with {large_df.count():,} rows")

# Benchmark aggregation
print("\nBenchmarking aggregation on 10,000 rows...")
start_time = time.time()

result = large_df.groupBy("category").agg(
    sum("revenue").alias("total_revenue"),
    count("*").alias("count")
).collect()  # Action triggers execution

elapsed_time = time.time() - start_time

print(f"✓ Aggregation completed in {elapsed_time:.3f} seconds")
print(f"\nResults:")
for row in result:
    print(f"  {row['category']}: ${row['total_revenue']:,.2f} ({row['count']:,} transactions)")

# 8. LAZY EVALUATION DEMONSTRATION
print("\n8. LAZY EVALUATION DEMONSTRATION")
print("-"*80)

print("Defining transformations (no execution yet)...")

# These are all transformations (lazy)
step1 = df_with_revenue.filter(col("category") == "Electronics")
step2 = step1.select("product", "revenue")
step3 = step2.groupBy("product").agg(sum("revenue").alias("total"))
step4 = step3.orderBy(col("total").desc())

print("✓ Transformations defined (not executed yet)")
print("  - Filter Electronics")
print("  - Select columns")
print("  - Group by product")
print("  - Order by revenue")

print("\nCalling action (.show()) - triggers execution...")
step4.show()
print("✓ Execution completed!")

# 9. ACCESSING SPARKSESSION PROPERTIES
print("\n9. ACCESSING SPARKSESSION PROPERTIES")
print("-"*80)

print("SparkSession Information:")
print(f"  Application Name: {spark.sparkContext.appName}")
print(f"  Spark Version: {spark.version}")
print(f"  Master URL: {spark.sparkContext.master}")
print(f"  Default Parallelism: {spark.sparkContext.defaultParallelism}")

print(f"\nCatalog Information:")
print(f"  Current Database: {spark.catalog.currentDatabase()}")
print(f"  Tables in current database:")
for table in spark.catalog.listTables():
    print(f"    - {table.name} ({table.tableType})")

# 10. READING AND WRITING DATA
print("\n10. READING AND WRITING DATA")
print("-"*80)

# Write to CSV (Spark creates a directory with this name containing part files, not a single file)
output_path = "sales_output.csv"
print(f"Writing DataFrame to CSV: {output_path}")
df_with_revenue.coalesce(1).write.mode("overwrite").option("header", True).csv(output_path)
print(f"✓ Data written successfully")

# Read from CSV
print(f"\nReading back from CSV...")
df_read = spark.read.option("header", True).option("inferSchema", True).csv(output_path)
print(f"✓ Data read successfully: {df_read.count()} rows")
df_read.show(3)

# Write to Parquet (columnar format, more efficient)
parquet_path = "sales_output.parquet"
print(f"\nWriting DataFrame to Parquet: {parquet_path}")
df_with_revenue.write.mode("overwrite").parquet(parquet_path)
print(f"✓ Parquet data written successfully")

# 11. CLEANUP
print("\n11. CLEANUP")
print("-"*80)

# Clean up output files
import shutil
import os

if os.path.exists(output_path):
    shutil.rmtree(output_path)
    print(f"✓ Removed {output_path}")

if os.path.exists(parquet_path):
    shutil.rmtree(parquet_path)
    print(f"✓ Removed {parquet_path}")

# Stop SparkSession
print("\nStopping SparkSession...")
spark.stop()
print("✓ SparkSession stopped")

print("\n" + "="*80)
print("SUMMARY")
print("="*80)
print("✓ SparkSession is the unified entry point for PySpark")
print("✓ Use for big data (> 10 GB) and distributed computing")
print("✓ Supports DataFrames, SQL, Streaming, and MLlib")
print("✓ Lazy evaluation - transformations build execution plan")
print("✓ Actions trigger actual computation")
print("✓ Configure memory, partitions, and other settings")
print("✓ Always call spark.stop() when done")
print("✓ Use local mode for testing, cluster mode for production")
print("="*80)

# 2. SparkContext

## 📖 What is SparkContext?

**SparkContext** is the **low-level** connection to the Spark cluster. It's automatically created by SparkSession but can be accessed for RDD operations and cluster configuration.

**Key Features:**
- Creates RDDs (Resilient Distributed Datasets)
- Broadcasts variables to cluster
- Creates accumulators
- Access to cluster configuration
- Job scheduling and monitoring

**Access Pattern:**
```python
sc = spark.sparkContext  # Get from SparkSession
```

## 🎯 Why Use SparkContext?

### **Advantages:**
1. **Low-Level Control** - Direct access to RDD API
2. **Performance Tuning** - Configure partitions, parallelism
3. **Broadcast Variables** - Efficiently share read-only data
4. **Accumulators** - Shared counters across cluster
5. **Legacy Code** - Support for older Spark applications

### **Disadvantages:**
1. **No Optimization** - Manual query planning required
2. **Verbose** - More code than DataFrame API
3. **Type Safety** - No schema enforcement
4. **Limited SQL** - Can't use SQL directly

## ⏱️ When to Use SparkContext

### ✅ **Use When:**

**1. Working with Unstructured Data**
- Example: Raw text files, logs without schema
- Why: RDDs better for unstructured data
- Use case: `sc.textFile("logs.txt")`

**2. Need Low-Level Transformations**
- Example: Custom partitioning logic
- Why: Full control over data distribution
- Benefit: Optimize for specific use case

**3. Broadcasting Large Variables**
- Example: Share 1 GB lookup table across cluster
- Why: Broadcast once per executor instead of sending with each task
- Performance: Can be much faster for joins against a small table, because the large side does not need to be shuffled

**4. Shared Counters Needed**
- Example: Count errors across all tasks
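- Why: Tasks add to the accumulator and only the driver reads the total; updates made inside transformations can be counted more than once if a task is retried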
- Use case: Monitoring, debugging

**5. Graph Processing**
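- Example: Social network analysis
- Why: GraphX is built on RDDs, but it has no Python API; from PySpark the separate GraphFrames package (DataFrame-based) is the usual route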
- Alternative: NetworkX for small graphs

### ❌ **Don't Use When:**

**1. Structured/Tabular Data**
- Problem: No schema, no optimization
- Better: Use DataFrames
- Why: Catalyst-optimized DataFrame code is often several times faster than equivalent Python RDD code

**2. SQL Queries Needed**
- Problem: Can't use SQL on RDDs
- Better: DataFrames with SQL API
- Why: Familiar SQL syntax

**3. New Projects**
- Problem: RDD API is legacy
- Better: Start with DataFrames
- Why: More features, better performance

**4. Type Safety Needed**
- Problem: RDDs don't enforce schema
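- Better: DataFrames with an explicit schema (typed Datasets exist only in Scala and Java)
- Why: Schema and column errors surface when the query is analyzed, before the job runs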

## 📊 How It Works

**RDD Workflow:**
1. Create RDD from data source
2. Apply transformations (map, filter, etc.)
3. Call action (collect, count, saveAsTextFile)
4. Spark builds DAG (Directed Acyclic Graph)
5. Distributes tasks to executors
6. Results returned to driver

**Key Concepts:**
- **Transformations** - Lazy operations (map, filter, flatMap)
- **Actions** - Trigger execution (collect, count, saveAsTextFile)
- **Lineage** - Track transformations for fault tolerance
- **Partitions** - Data split across cluster

## 🌍 Real-World Applications

1. **Log Analysis** - Parse unstructured log files
2. **Text Mining** - Natural language processing
3. **Clickstream Analysis** - User behavior tracking
4. **Graph Analytics** - Social networks, fraud detection
5. **Sensor Data** - IoT time series processing
6. **Legacy Systems** - Migrate old RDD code

## 💡 Key Insights

✅ Access via `spark.sparkContext` (don't create directly)  
✅ Use for unstructured data and low-level operations  
✅ DataFrames preferred for structured data  
✅ Broadcast variables for large read-only data  
✅ Accumulators for shared counters  
✅ RDD lineage provides fault tolerance  
✅ Partitions controlled via `sc.parallelize(data, numSlices)`  
✅ Convert RDD to DataFrame: `rdd.toDF()`

In [None]:
# SPARKCONTEXT - COMPLETE EXAMPLE

print("="*80)
print("PYSPARK SPARKCONTEXT - COMPREHENSIVE GUIDE")
print("="*80)

from pyspark.sql import SparkSession
from operator import add
import re

# 1. CREATE SPARKSESSION AND ACCESS SPARKCONTEXT
print("\n1. CREATING SPARKSESSION AND ACCESSING SPARKCONTEXT")
print("-"*80)

spark = SparkSession.builder \
    .appName("SparkContext Tutorial") \
    .master("local[*]") \
    .getOrCreate()

# Access SparkContext
sc = spark.sparkContext

print(f"✓ SparkContext accessed successfully")
print(f"  Application ID: {sc.applicationId}")
print(f"  App Name: {sc.appName}")
print(f"  Master: {sc.master}")
print(f"  Default Parallelism: {sc.defaultParallelism}")
print(f"  Default Min Partitions: {sc.defaultMinPartitions}")

# 2. CREATING RDDs
print("\n2. CREATING RDDs (RESILIENT DISTRIBUTED DATASETS)")
print("-"*80)

# Method 1: Parallelize a collection
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = sc.parallelize(numbers, numSlices=4)  # 4 partitions

print(f"Method 1: Parallelize collection")
print(f"  Data: {numbers}")
print(f"  RDD Partitions: {numbers_rdd.getNumPartitions()}")
print(f"  First 5 elements: {numbers_rdd.take(5)}")

# Method 2: From text data
text_data = [
    "Apache Spark is a unified analytics engine",
    "Spark provides high-level APIs in Java, Scala, Python and R",
    "It also supports SQL queries, streaming data, machine learning and graph processing"
]
text_rdd = sc.parallelize(text_data)

print(f"\nMethod 2: Text data")
print(f"  Number of lines: {text_rdd.count()}")
print(f"  First line: {text_rdd.first()}")

# 3. RDD TRANSFORMATIONS
print("\n3. RDD TRANSFORMATIONS (LAZY)")
print("-"*80)

# map - Transform each element
print("map - Square each number:")
squared = numbers_rdd.map(lambda x: x ** 2)
print(f"  Original: {numbers_rdd.collect()}")
print(f"  Squared: {squared.collect()}")

# filter - Keep elements matching condition
print("\nfilter - Keep even numbers only:")
evens = numbers_rdd.filter(lambda x: x % 2 == 0)
print(f"  Even numbers: {evens.collect()}")

# flatMap - Map then flatten
print("\nflatMap - Split text into words:")
words_rdd = text_rdd.flatMap(lambda line: line.split())
print(f"  Total words: {words_rdd.count()}")
print(f"  First 10 words: {words_rdd.take(10)}")

# distinct - Remove duplicates
print("\ndistinct - Unique words:")
unique_words = words_rdd.map(lambda w: w.lower()).distinct()
print(f"  Unique words count: {unique_words.count()}")
print(f"  Sample words: {unique_words.take(10)}")

# 4. RDD ACTIONS (TRIGGER EXECUTION)
print("\n4. RDD ACTIONS (TRIGGER EXECUTION)")
print("-"*80)

# collect - Return all elements to driver
print("collect - Get all elements:")
all_numbers = numbers_rdd.collect()
print(f"  {all_numbers}")

# count - Number of elements
print(f"\ncount - Total elements: {numbers_rdd.count()}")

# first - First element
print(f"\nfirst - First element: {numbers_rdd.first()}")

# take - First N elements
print(f"\ntake(3) - First 3 elements: {numbers_rdd.take(3)}")

# reduce - Aggregate elements
print("\nreduce - Sum all numbers:")
total = numbers_rdd.reduce(lambda a, b: a + b)
print(f"  Sum: {total}")

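# fold - Like reduce, but the zero value is applied once per partition and once more in the final merge
print("\nfold - Sum with zero value 100 (applied per partition and at the merge):")
total_with_init = numbers_rdd.fold(100, lambda a, b: a + b)
print(f"  Sum: {total_with_init}  (55 + 100 * (4 partitions + 1) = 555, not 155)")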

# 5. WORD COUNT EXAMPLE (CLASSIC MAPREDUCE)
print("\n5. WORD COUNT EXAMPLE (CLASSIC MAPREDUCE)")
print("-"*80)

# Sample text
text = [
    "spark is fast",
    "spark is powerful",
    "python is easy",
    "spark and python together"
]

text_rdd = sc.parallelize(text)

# Word count pipeline
word_counts = text_rdd \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(add) \
    .sortBy(lambda x: x[1], ascending=False)

print("Word Count Results:")
for word, count in word_counts.collect():
    print(f"  {word}: {count}")

# 6. PAIR RDD OPERATIONS
print("\n6. PAIR RDD OPERATIONS (KEY-VALUE PAIRS)")
print("-"*80)

# Create pair RDD - Sales by category
sales_data = [
    ("Electronics", 1200),
    ("Books", 50),
    ("Electronics", 800),
    ("Clothing", 150),
    ("Books", 30),
    ("Electronics", 500),
    ("Clothing", 200)
]

sales_rdd = sc.parallelize(sales_data)

# reduceByKey - Sum by category
print("reduceByKey - Total sales by category:")
total_by_category = sales_rdd.reduceByKey(add)
for category, total in total_by_category.collect():
    print(f"  {category}: ${total}")

# groupByKey - Group values by key
print("\ngroupByKey - All sales by category:")
grouped = sales_rdd.groupByKey()
for category, sales in grouped.collect():
    print(f"  {category}: {list(sales)}")

# mapValues - Transform only values
print("\nmapValues - Add 10% tax:")
with_tax = sales_rdd.mapValues(lambda x: x * 1.10)
print(f"  Sample: {with_tax.take(3)}")

# 7. BROADCAST VARIABLES
print("\n7. BROADCAST VARIABLES (EFFICIENT DATA SHARING)")
print("-"*80)

# Small lookup table standing in for a larger one shared across the cluster
category_names = {
    "E": "Electronics",
    "B": "Books",
    "C": "Clothing",
    "F": "Food"
}

# Broadcast the dictionary
broadcast_categories = sc.broadcast(category_names)

print(f"Broadcast variable created")
print(f"  Size: {len(broadcast_categories.value)} categories")
print(f"  Value: {broadcast_categories.value}")

# Use broadcast variable
category_codes = [("E", 100), ("B", 50), ("C", 75), ("E", 150)]
codes_rdd = sc.parallelize(category_codes)

# Map codes to names using broadcast variable
with_names = codes_rdd.map(
    lambda x: (broadcast_categories.value[x[0]], x[1])
)

print("\nUsing broadcast variable to map codes:")
for category, value in with_names.collect():
    print(f"  {category}: ${value}")

# 8. ACCUMULATORS (SHARED COUNTERS)
print("\n8. ACCUMULATORS (SHARED COUNTERS)")
print("-"*80)

# Create accumulator
error_count = sc.accumulator(0)
valid_count = sc.accumulator(0)

# Data with some invalid entries
data_with_errors = ["10", "20", "invalid", "30", "error", "40", "50"]
data_rdd = sc.parallelize(data_with_errors)

def parse_number(value):
    try:
        num = int(value)
        valid_count.add(1)
        return num
    except ValueError:
        error_count.add(1)
        return None

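# Process data (accumulators are updated when the action below runs)
# Note: updates made inside transformations such as map() can be double-counted if the RDD is recomputed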
parsed = data_rdd.map(parse_number).filter(lambda x: x is not None)
result = parsed.collect()  # Trigger action

print(f"Processing complete:")
print(f"  Valid numbers: {valid_count.value}")
print(f"  Errors: {error_count.value}")
print(f"  Parsed values: {result}")

# 9. PARTITIONING
print("\n9. PARTITIONING CONTROL")
print("-"*80)

# Create RDD with specific partitions
data = list(range(1, 101))  # 1 to 100
rdd_2_partitions = sc.parallelize(data, 2)
rdd_4_partitions = sc.parallelize(data, 4)
rdd_8_partitions = sc.parallelize(data, 8)

print(f"Same data with different partitions:")
print(f"  2 partitions: {rdd_2_partitions.getNumPartitions()}")
print(f"  4 partitions: {rdd_4_partitions.getNumPartitions()}")
print(f"  8 partitions: {rdd_8_partitions.getNumPartitions()}")

# repartition - Increase partitions (shuffle)
repartitioned = rdd_2_partitions.repartition(8)
print(f"\nAfter repartition(8): {repartitioned.getNumPartitions()} partitions")

# coalesce - Decrease partitions (no shuffle)
coalesced = rdd_8_partitions.coalesce(2)
print(f"After coalesce(2): {coalesced.getNumPartitions()} partitions")

# 10. RDD PERSISTENCE
print("\n10. RDD PERSISTENCE (CACHING)")
print("-"*80)

# Create expensive computation
expensive_rdd = sc.parallelize(range(1000000)).map(lambda x: x ** 2)

print("Without caching:")
import time
start = time.time()
count1 = expensive_rdd.count()
time1 = time.time() - start
print(f"  First count: {time1:.3f} seconds")

start = time.time()
count2 = expensive_rdd.count()
time2 = time.time() - start
print(f"  Second count: {time2:.3f} seconds (recomputed)")

# Cache the RDD
expensive_rdd.cache()

print("\nWith caching:")
start = time.time()
count3 = expensive_rdd.count()
time3 = time.time() - start
print(f"  First count (cached): {time3:.3f} seconds")

start = time.time()
count4 = expensive_rdd.count()
time4 = time.time() - start
print(f"  Second count (from cache): {time4:.3f} seconds (faster!)")

# Unpersist when done
expensive_rdd.unpersist()

# 11. CLEANUP
print("\n11. CLEANUP")
print("-"*80)

# Unpersist broadcast variable
broadcast_categories.unpersist()
print("✓ Broadcast variable unpersisted")

# Stop SparkContext (via SparkSession)
spark.stop()
print("✓ SparkSession stopped")

print("\n" + "="*80)
print("SUMMARY")
print("="*80)
print("✓ SparkContext provides low-level RDD API")
print("✓ Access via spark.sparkContext (don't create directly)")
print("✓ RDDs good for unstructured data, custom logic")
print("✓ Transformations are lazy (map, filter, flatMap)")
print("✓ Actions trigger execution (collect, count, reduce)")
print("✓ Broadcast variables for efficient data sharing")
print("✓ Accumulators for shared counters")
print("✓ Control partitioning with parallelize, repartition, coalesce")
print("✓ Cache expensive computations with .cache()")
print("✓ Use DataFrames for structured data (preferred)")
print("="*80)