# PySpark Complete Learning Guide - Beginner to Advanced

This notebook contains a comprehensive collection of PySpark programs organized from beginner to advanced level. Each section includes theoretical concepts, practical examples, and interview-focused problems.

## Table of Contents
1. **Environment Setup & Basic Concepts**
2. **RDD Operations (Foundation)**
3. **DataFrame Basics**
4. **Joins**
5. **Window Functions**
6. **Common PySpark Interview Questions & Solutions**
7. **Performance Optimization**
8. **Study Plan for PySpark Interviews**

## 1. Environment Setup & Basic Concepts

### What is PySpark?
- Python API for Apache Spark
- Distributed computing framework
- Handles big data processing across clusters
- Supports SQL, streaming, machine learning, and graph processing

In [None]:
# Install PySpark (run this if not already installed)
# !pip install pyspark

# Import necessary libraries
from pyspark.sql import SparkSession
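# Note: this wildcard import replaces Python built-ins such as sum, max, min, and round
# with their Spark column-function counterparts for the rest of the notebook
from pyspark.sql.functions import *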
from pyspark.sql.types import *
import pandas as pd

# Create SparkSession - Entry point for all Spark functionality
spark = SparkSession.builder \
    .appName("PySpark Learning") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Set log level to reduce verbose output
spark.sparkContext.setLogLevel("WARN")

print("SparkSession created successfully!")
print(f"Spark Version: {spark.version}")
print(f"Python Version: {spark.sparkContext.pythonVer}")

# Basic Spark Context information
sc = spark.sparkContext
print(f"Application Name: {sc.appName}")
print(f"Master: {sc.master}")

## 2. RDD Operations (Foundation)

**RDD (Resilient Distributed Dataset)** is the fundamental data structure of Spark. Understanding RDDs is crucial for interviews.

### Key Concepts:
- **Immutable**: Once created, cannot be changed
- **Distributed**: Spread across multiple nodes
- **Fault-tolerant**: Can recover from node failures
- **Lazy Evaluation**: Transformations are not executed until an action is called

In [None]:
# BEGINNER: Creating RDDs and Basic Operations

# 1. Creating RDDs from collections
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd_numbers = sc.parallelize(numbers)

# 2. Basic transformations (lazy operations)
squared_rdd = rdd_numbers.map(lambda x: x ** 2)
even_rdd = rdd_numbers.filter(lambda x: x % 2 == 0)
doubled_rdd = rdd_numbers.map(lambda x: x * 2)

# 3. Basic actions (trigger execution)
print("Original numbers:", rdd_numbers.collect())
print("Squared numbers:", squared_rdd.collect())
print("Even numbers:", even_rdd.collect())
print("First 3 numbers:", rdd_numbers.take(3))
print("Count:", rdd_numbers.count())
print("Sum:", rdd_numbers.reduce(lambda x, y: x + y))

# 4. flatMap example
words_list = [["hello", "world"], ["spark", "python"], ["big", "data"]]
words_rdd = sc.parallelize(words_list)
flattened_rdd = words_rdd.flatMap(lambda x: x)
print("Original nested list:", words_rdd.collect())
print("Flattened list:", flattened_rdd.collect())

In [None]:
# INTERMEDIATE: Key-Value RDD Operations (Important for interviews)

# 1. Creating key-value pairs
students_data = [("Alice", 85), ("Bob", 90), ("Alice", 95), ("Charlie", 78), ("Bob", 88)]
students_rdd = sc.parallelize(students_data)

# 2. reduceByKey - combine values for the same key
total_scores = students_rdd.reduceByKey(lambda x, y: x + y)
print("Total scores by student:", total_scores.collect())

# 3. groupByKey - group all values for each key
#    (every value is shuffled; prefer reduceByKey when you only need an aggregate)
grouped_scores = students_rdd.groupByKey().mapValues(list)
print("All scores by student:", grouped_scores.collect())

# 4. mapValues - transform only values
students_with_grade = students_rdd.mapValues(lambda score: "A" if score >= 90 else "B" if score >= 80 else "C")
print("Students with grades:", students_with_grade.collect())

# 5. keys() and values()
print("All student names:", students_rdd.keys().distinct().collect())
print("All scores:", students_rdd.values().collect())

# 6. sortByKey
sorted_students = students_rdd.sortByKey(ascending=True)
print("Sorted by name:", sorted_students.collect())

## 3. DataFrame Basics

**DataFrames** are the most commonly used abstraction in PySpark. They provide:
- Higher-level API than RDDs
- Better performance optimization
- Schema enforcement
- SQL-like operations

### Key Interview Points:
- DataFrames are built on top of RDDs
- They use Catalyst optimizer for query optimization
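- The schema is resolved when the query plan is analyzed at run time, not at compile time; compile-time type safety belongs to the Scala/Java Dataset API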

In [None]:
# BEGINNER: Creating DataFrames and Basic Operations

# 1. Creating DataFrames from various sources
employee_data = [
    (1, "John", "Engineering", 75000, 28),
    (2, "Sarah", "Marketing", 65000, 32),
    (3, "Mike", "Engineering", 80000, 35),
    (4, "Lisa", "HR", 60000, 29),
    (5, "David", "Engineering", 85000, 31),
    (6, "Emma", "Marketing", 70000, 27)
]

# Define schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("age", IntegerType(), True)
])

# Create DataFrame
df = spark.createDataFrame(employee_data, schema)

# 2. Basic DataFrame operations
print("=== Basic DataFrame Info ===")
df.printSchema()
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")
print("Column names:", df.columns)

# 3. Display data
print("\n=== Show DataFrame ===")
df.show()

# 4. Basic statistics
print("\n=== Basic Statistics ===")
df.describe().show()

# 5. Select specific columns
print("\n=== Select Columns ===")
df.select("name", "department", "salary").show()

In [None]:
# BEGINNER-INTERMEDIATE: DataFrame Filtering, Transformations & Aggregations

# 1. Filtering operations
print("=== Filtering Examples ===")

# Filter by single condition
high_salary_df = df.filter(df.salary > 70000)
print("Employees with salary > 70000:")
high_salary_df.show()

# Filter by multiple conditions
engineering_high_salary = df.filter((df.department == "Engineering") & (df.salary > 75000))
print("Engineering employees with salary > 75000:")
engineering_high_salary.show()

# 2. Column operations
print("\n=== Column Operations ===")

# Add new column
df_with_bonus = df.withColumn("bonus", df.salary * 0.1)
df_with_bonus.select("name", "salary", "bonus").show()

# Rename column
df_renamed = df.withColumnRenamed("name", "employee_name")
df_renamed.select("employee_name", "department").show()

# 3. Sorting
print("\n=== Sorting Examples ===")
df.orderBy("salary", ascending=False).show()

# 4. Aggregations
print("\n=== Aggregation Examples ===")
from pyspark.sql.functions import avg, sum, count, max, min

# Basic aggregations
print("Average salary:", df.agg(avg("salary")).collect()[0][0])
print("Total salary budget:", df.agg(sum("salary")).collect()[0][0])

# GroupBy operations
dept_stats = df.groupBy("department").agg(
    count("*").alias("employee_count"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary")
)
dept_stats.show()

## 4. Joins (Very Important for Interviews)

Joins are one of the most commonly asked topics in PySpark interviews. Understanding different types of joins and their performance implications is crucial.

In [None]:
# INTERMEDIATE-ADVANCED: All Types of Joins

# Create additional DataFrames for join examples
departments_data = [
    ("Engineering", "Tech", "Building A"),
    ("Marketing", "Business", "Building B"),
    ("HR", "Support", "Building C"),
    ("Finance", "Business", "Building B")
]

dept_schema = StructType([
    StructField("department", StringType(), True),
    StructField("category", StringType(), True),
    StructField("location", StringType(), True)
])

departments_df = spark.createDataFrame(departments_data, dept_schema)

print("=== Original DataFrames ===")
print("Employees:")
df.show()
print("Departments:")
departments_df.show()

# 1. INNER JOIN (most common)
print("\n=== INNER JOIN ===")
inner_join_result = df.join(departments_df, "department", "inner")
inner_join_result.select("name", "department", "salary", "category", "location").show()

# 2. LEFT JOIN (LEFT OUTER)
print("\n=== LEFT JOIN ===")
left_join_result = df.join(departments_df, "department", "left")
left_join_result.select("name", "department", "salary", "category", "location").show()

# 3. RIGHT JOIN (RIGHT OUTER)
print("\n=== RIGHT JOIN ===")
right_join_result = df.join(departments_df, "department", "right")
right_join_result.select("name", "department", "salary", "category", "location").show()

# 4. FULL OUTER JOIN
print("\n=== FULL OUTER JOIN ===")
full_join_result = df.join(departments_df, "department", "outer")
full_join_result.select("name", "department", "salary", "category", "location").show()

## 5. Window Functions (Advanced - Frequently Asked in Interviews)

Window functions are very powerful and commonly asked in advanced PySpark interviews. They allow you to perform calculations across a set of rows related to the current row.

In [None]:
# ADVANCED: Window Functions Examples

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, first, last

# Create dataset for window functions
sales_data = [
    ("Alice", "Q1", 2023, 1000),
    ("Alice", "Q2", 2023, 1200),
    ("Alice", "Q3", 2023, 1100),
    ("Alice", "Q4", 2023, 1300),
    ("Bob", "Q1", 2023, 800),
    ("Bob", "Q2", 2023, 900),
    ("Bob", "Q3", 2023, 950),
    ("Bob", "Q4", 2023, 1000),
    ("Charlie", "Q1", 2023, 1500),
    ("Charlie", "Q2", 2023, 1400),
    ("Charlie", "Q3", 2023, 1600),
    ("Charlie", "Q4", 2023, 1700)
]

sales_schema = StructType([
    StructField("salesperson", StringType(), True),
    StructField("quarter", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("sales_amount", IntegerType(), True)
])

sales_df = spark.createDataFrame(sales_data, sales_schema)
sales_df.show()

# 1. ROW_NUMBER, RANK, DENSE_RANK
print("\n=== Ranking Functions ===")
window_spec = Window.partitionBy("salesperson").orderBy(col("sales_amount").desc())

ranked_df = sales_df.withColumn("row_number", row_number().over(window_spec)) \
                   .withColumn("rank", rank().over(window_spec)) \
                   .withColumn("dense_rank", dense_rank().over(window_spec))
ranked_df.show()

# 2. LAG and LEAD functions
print("\n=== LAG and LEAD Functions ===")
# Order by year first so the window stays correct if the data spans several years
time_window = Window.partitionBy("salesperson").orderBy("year", "quarter")

lag_lead_df = sales_df.withColumn("previous_quarter_sales", lag("sales_amount", 1).over(time_window)) \
                     .withColumn("next_quarter_sales", lead("sales_amount", 1).over(time_window)) \
                     .withColumn("sales_growth", 
                               col("sales_amount") - lag("sales_amount", 1).over(time_window))
lag_lead_df.show()

# 3. Cumulative operations
print("\n=== Cumulative Operations ===")
cumulative_window = Window.partitionBy("salesperson").orderBy("year", "quarter") \
                         .rowsBetween(Window.unboundedPreceding, Window.currentRow)

cumulative_df = sales_df.withColumn("cumulative_sales", 
                                   sum("sales_amount").over(cumulative_window)) \
                       .withColumn("running_avg", 
                                 avg("sales_amount").over(cumulative_window))
cumulative_df.show()

## 6. Common PySpark Interview Questions & Solutions

Here are interview-style problems that test your PySpark knowledge. Try to solve them before looking at the solutions!

In [None]:
# INTERVIEW QUESTIONS & SOLUTIONS

# Create sample data for interview problems
transactions_data = [
    ("2023-01-15", "A", "Product1", 100, 2),
    ("2023-01-16", "B", "Product2", 150, 1),
    ("2023-01-17", "A", "Product1", 100, 3),
    ("2023-01-18", "C", "Product3", 200, 1),
    ("2023-01-19", "B", "Product2", 150, 2),
    ("2023-01-20", "A", "Product3", 200, 1),
    ("2023-02-01", "C", "Product1", 100, 4),
    ("2023-02-02", "B", "Product1", 100, 1),
]

transactions_schema = StructType([
    StructField("date", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("quantity", IntegerType(), True)
])

transactions_df = spark.createDataFrame(transactions_data, transactions_schema)
transactions_df = transactions_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
transactions_df.show()

print("\n" + "="*80)
print("INTERVIEW QUESTION 1: Find the top 3 customers by total purchase amount")
print("="*80)

# Solution 1
top_customers = transactions_df.groupBy("customer_id") \
    .agg(sum(col("price") * col("quantity")).alias("total_amount")) \
    .orderBy(col("total_amount").desc()) \
    .limit(3)
top_customers.show()

print("\n" + "="*80)
print("INTERVIEW QUESTION 2: Find customers who bought the same product more than once")
print("="*80)

# Solution 2
repeat_customers = transactions_df.groupBy("customer_id", "product") \
    .agg(count("*").alias("purchase_count")) \
    .filter(col("purchase_count") > 1) \
    .select("customer_id", "product", "purchase_count")
repeat_customers.show()

print("\n" + "="*80)
print("INTERVIEW QUESTION 3: Find the running total of sales for each customer")
print("="*80)

# Solution 3
customer_window = Window.partitionBy("customer_id").orderBy("date") \
                        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

running_total = transactions_df.withColumn("amount", col("price") * col("quantity")) \
    .withColumn("running_total", sum("amount").over(customer_window)) \
    .select("customer_id", "date", "product", "amount", "running_total") \
    .orderBy("customer_id", "date")
running_total.show()

print("\n" + "="*80)
print("INTERVIEW QUESTION 4: Find the second highest salary in each department")
print("="*80)

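# Solution 4: Using our employee data
# dense_rank treats tied salaries as one rank; a department with only one
# distinct salary (HR in this data) has no rank 2 and returns no row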
dept_salary_window = Window.partitionBy("department").orderBy(col("salary").desc())

second_highest = df.withColumn("rank", dense_rank().over(dept_salary_window)) \
                  .filter(col("rank") == 2) \
                  .select("department", "name", "salary")
second_highest.show()

## 7. Performance Optimization (Advanced Interview Topic)

Performance optimization is crucial for PySpark interviews. Understanding how to optimize Spark jobs can set you apart from other candidates.

In [None]:
# ADVANCED: Performance Optimization Techniques

# 1. Caching and Persistence
print("=== Caching Examples ===")
from pyspark import StorageLevel

# Cache a DataFrame that will be used multiple times
# (cache() is lazy: the first action below materializes the cached data)
cached_df = df.cache()

print("Available Storage Levels:")
print("MEMORY_ONLY:", StorageLevel.MEMORY_ONLY)
print("MEMORY_AND_DISK:", StorageLevel.MEMORY_AND_DISK)
print("DISK_ONLY:", StorageLevel.DISK_ONLY)

# Example: Multiple operations on cached data
print("\nOperations on cached DataFrame:")
print("Count:", cached_df.count())
print("Average salary:", cached_df.agg(avg("salary")).collect()[0][0])
print("Max salary:", cached_df.agg(max("salary")).collect()[0][0])

# Unpersist when done
cached_df.unpersist()

# 2. Partitioning
print("\n=== Partitioning Examples ===")

# Check current partitions
print(f"Current partitions: {df.rdd.getNumPartitions()}")

# Repartition for better parallelism (note: repartition triggers a full shuffle)
repartitioned_df = df.repartition(4)
print(f"After repartitioning: {repartitioned_df.rdd.getNumPartitions()}")

# Repartition by column (useful for joins and aggregations)
partitioned_by_dept = df.repartition("department")
print("Partitioned by department")

# 3. Performance Best Practices
print("\n=== Performance Best Practices ===")
performance_tips = [
    "1. Use appropriate file formats (Parquet for analytics)",
    "2. Partition data by frequently filtered columns",
    "3. Use broadcast joins for small tables (Spark auto-broadcasts below spark.sql.autoBroadcastJoinThreshold, 10 MB by default)",
    "4. Cache DataFrames used multiple times",
    "5. Use columnar storage formats",
    "6. Avoid UDFs when built-in functions are available",
    "7. Use appropriate number of partitions (2-4x number of cores)",
    "8. Push down filters as early as possible",
    "9. Use bucketing for large tables that are frequently joined",
    "10. Monitor and tune garbage collection"
]

for tip in performance_tips:
    print(tip)

## 8. Study Plan for PySpark Interviews

### Week 1-2: Foundations
- Master RDD operations (map, filter, reduce, etc.)
- Understand DataFrames and basic operations
- Practice creating different types of schemas
- Learn file I/O operations (reading/writing different formats)

### Week 3-4: Intermediate Concepts
- Master all types of joins and their performance implications
- Practice aggregations and window functions extensively
- Learn SQL operations in PySpark
- Understand partitioning and data skewness

### Week 5-6: Advanced Topics
- Performance optimization techniques
- UDFs and when to use them
- Streaming concepts (if applicable to your role)
- Error handling and debugging

### Week 7-8: Interview Preparation
- Practice coding problems without looking at solutions first
- Time yourself solving problems
- Practice explaining your solutions
- Review Spark architecture and internals

## Key Interview Topics to Master

### Technical Questions You Should Be Ready For:
1. **Explain Spark Architecture** - Driver, Executors, Cluster Manager
2. **RDD vs DataFrame vs Dataset** - When to use which
3. **Lazy Evaluation** - How it works and benefits
4. **Partitioning Strategies** - Hash vs Range partitioning
5. **Join Strategies** - Broadcast hash, Sort-merge, Shuffle hash joins
6. **Performance Tuning** - Common bottlenecks and solutions
7. **Memory Management** - How Spark manages memory
8. **Fault Tolerance** - How Spark handles failures

### Coding Patterns to Practice:
1. Data cleaning and transformation pipelines
2. Complex aggregations with multiple grouping levels
3. Window function problems (ranking, running totals, etc.)
4. Join optimization scenarios
5. Handling null values and data quality issues

## Final Tips for Success

### During the Interview:
- Always explain your thought process
- Consider edge cases (null values, empty datasets)
- Discuss performance implications of your solutions
- Ask clarifying questions about data size and requirements
- Be prepared to optimize your initial solution

### Code Quality:
- Write readable and well-structured code
- Use meaningful variable names
- Add comments for complex logic
- Consider error handling

### Remember:
- Practice is key - solve problems regularly
- Understand the "why" behind each solution
- Be familiar with Spark UI for debugging
- Know when to use which optimization technique

Good luck with your PySpark interviews! 🚀

In [None]:
# Cleanup - Always close SparkSession when done
print("Stopping SparkSession...")
spark.stop()
print("SparkSession stopped successfully!")