# PySpark Crash Course

This notebook is a hands-on introduction to PySpark, from basic setup through DataFrames, SQL, MLlib, and performance tuning.

## Table of Contents
1. [Environment Setup](#1-environment-setup)
2. [SparkSession and SparkContext](#2-sparksession-and-sparkcontext)
3. [RDDs (Resilient Distributed Datasets)](#3-rdds-resilient-distributed-datasets)
4. [DataFrames](#4-dataframes)
5. [Data Loading and Saving](#5-data-loading-and-saving)
6. [Data Transformations](#6-data-transformations)
7. [Data Actions and Aggregations](#7-data-actions-and-aggregations)
8. [SQL Operations](#8-sql-operations)
9. [Machine Learning with MLlib](#9-machine-learning-with-mllib)
10. [Performance Optimization](#10-performance-optimization)
11. [Best Practices and Tips](#11-best-practices-and-tips)

## 1. Environment Setup

First, let's import the necessary libraries and set up our Spark environment.

In [1]:
import os
import sys
import pandas as pd
from pyspark.sql import SparkSession
# Star imports are convenient in a tutorial, but they shadow Python built-ins such as
# min, max, sum, round, abs, and filter with Spark column functions.
# In production code prefer: from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *

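# Set up environment variables. These paths are machine-specific (Homebrew on an
# Apple Silicon Mac); adjust or remove them for your system. They must be set before
# the SparkSession starts. A pip-installed pyspark bundles Spark, so SPARK_HOME can be
# left unset; if set, it should point to an installation of the same Spark version.
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home'
os.environ['SPARK_HOME'] = '/opt/homebrew/Cellar/apache-spark/4.0.0/libexec'
os.environ['PYSPARK_PYTHON'] = sys.executable  # run Python workers with this interpreter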

print("Environment setup complete!")
print(f"Java Home: {os.environ.get('JAVA_HOME')}")
print(f"Spark Home: {os.environ.get('SPARK_HOME')}")
print(f"Python: {sys.executable}")

Environment setup complete!
Java Home: /opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home
Spark Home: /opt/homebrew/Cellar/apache-spark/4.0.0/libexec
Python: /Users/vamsi_mbmax/Documents/Documents - raghuvamsi’s MacBook Pro/VAM_Documents/01_vam_PROJECTS/LEARNING/proj_Productivity/dev_proj_Productivity/practise_prod_python_tools/.venv/bin/python


## 2. SparkSession and SparkContext

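SparkSession is the unified entry point for DataFrame and SQL functionality (since Spark 2.0). It wraps a SparkContext, which remains the entry point for the lower-level RDD API. Let's create a session and inspect it.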

In [2]:
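# Create SparkSession (getOrCreate() returns the running session if one already exists)
spark = (
    SparkSession.builder
    .appName("PySpark Crash Course")
    # Adaptive Query Execution (AQE) is enabled by default since Spark 3.2;
    # it is set explicitly here for clarity
    .config("spark.sql.adaptive.enabled", "true")
    # Let AQE merge small shuffle partitions at runtime
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)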

# Get SparkContext from SparkSession
sc = spark.sparkContext

print(f"Spark Version: {spark.version}")
print(f"Spark Context: {sc}")
print(f"Application Name: {sc.appName}")
print(f"Master: {sc.master}")
print(f"Default Parallelism: {sc.defaultParallelism}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/21 17:59:38 WARN Utils: Your hostname, raghuvamsis-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.164 instead (on interface en0)
25/06/21 17:59:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/21 17:59:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 4.0.0
Spark Context: <SparkContext master=local[*] appName=PySpark Crash Course>
Application Name: PySpark Crash Course
Master: local[*]
Default Parallelism: 10


## 3. RDDs (Resilient Distributed Datasets)

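RDDs are Spark's low-level abstraction: immutable collections split into partitions that are processed in parallel and can be recomputed from their lineage if a partition is lost. DataFrames are built on top of them. Let's create and manipulate a few RDDs.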

In [3]:
# Creating RDDs
# 1. From a Python collection
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
numbers_rdd = sc.parallelize(numbers)

print("Original numbers:", numbers)
print("RDD partitions:", numbers_rdd.getNumPartitions())
print("RDD collect:", numbers_rdd.collect())

# 2. From another Python collection (to read a text file line by line, use sc.textFile(path))
sample_text = ["Hello Spark", "PySpark is awesome", "Big Data processing", "Distributed computing"]
text_rdd = sc.parallelize(sample_text)

print("\nText RDD:")
for line in text_rdd.collect():
    print(line)

Original numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
RDD partitions: 10
RDD collect: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Text RDD:
Hello Spark
PySpark is awesome
Big Data processing
Distributed computing
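`sc.parallelize` cuts the list into contiguous slices, one per partition, which is why 10 elements with a `defaultParallelism` of 10 end up one per partition. The sketch below is a pure-Python model of that slicing, using the start/end offset formula from Spark's Scala `ParallelCollectionRDD`; PySpark batches elements before handing them to the JVM, so real boundaries can differ slightly. `slice_positions` and `slice_list` are hypothetical helpers. To see the actual layout, `numbers_rdd.glom().collect()` returns one list per partition.

```python
def slice_positions(length, num_slices):
    """(start, end) offsets of each slice; end is exclusive."""
    return [
        ((i * length) // num_slices, ((i + 1) * length) // num_slices)
        for i in range(num_slices)
    ]


def slice_list(data, num_slices):
    """Split data into num_slices contiguous chunks whose sizes differ by at most one."""
    return [data[start:end] for start, end in slice_positions(len(data), num_slices)]


numbers = list(range(1, 11))
print(slice_list(numbers, 10))  # [[1], [2], ..., [10]]: one element per slice
print(slice_list(numbers, 3))   # [[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
```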


In [4]:
# RDD Transformations (Lazy operations)
print("=== RDD Transformations ===")

# Map transformation
squared_rdd = numbers_rdd.map(lambda x: x ** 2)
print("Squared numbers:", squared_rdd.collect())

# Filter transformation
even_rdd = numbers_rdd.filter(lambda x: x % 2 == 0)
print("Even numbers:", even_rdd.collect())

# FlatMap transformation
words_rdd = text_rdd.flatMap(lambda line: line.split(" "))
print("Words:", words_rdd.collect())

# Distinct transformation
duplicate_numbers = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]
duplicate_rdd = sc.parallelize(duplicate_numbers)
distinct_rdd = duplicate_rdd.distinct()  # needs a shuffle, so output order is not guaranteed
print("Distinct numbers:", distinct_rdd.collect())

=== RDD Transformations ===
Squared numbers: [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
Even numbers: [2, 4, 6, 8, 10]
Words: ['Hello', 'Spark', 'PySpark', 'is', 'awesome', 'Big', 'Data', 'processing', 'Distributed', 'computing']
Distinct numbers: [1, 2, 3, 4]
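Transformations such as `map` and `filter` only record a step in the RDD's lineage; nothing runs until an action such as `collect()` asks for results, and without caching every action re-runs the whole lineage. A toy pure-Python model of that behaviour (the `LazyDataset` class is hypothetical and ignores partitioning and distribution):

```python
def _apply_step(kind, func, items):
    """Generator for one recorded step; arguments are bound when the step is created."""
    if kind == "map":
        for item in items:
            yield func(item)
    elif kind == "filter":
        for item in items:
            if func(item):
                yield item
    else:  # "flat_map"
        for item in items:
            yield from func(item)


class LazyDataset:
    """Transformations return a new dataset with one more recorded step; collect() runs them."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = steps

    def _with_step(self, kind, func):
        return LazyDataset(self._source, self._steps + ((kind, func),))

    def map(self, func):
        return self._with_step("map", func)

    def filter(self, func):
        return self._with_step("filter", func)

    def flat_map(self, func):
        return self._with_step("flat_map", func)

    def collect(self):
        items = iter(self._source)
        for kind, func in self._steps:
            items = _apply_step(kind, func, items)
        return list(items)


calls = []


def square(x):
    calls.append(x)  # record every real invocation
    return x * x


pipeline = LazyDataset(range(1, 11)).map(square).filter(lambda x: x % 2 == 0)
print(len(calls))          # 0: defining transformations did no work
print(pipeline.collect())  # [4, 16, 36, 64, 100]
print(len(calls))          # 10
pipeline.collect()
print(len(calls))          # 20: a second action recomputes the lineage
```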


In [None]:
# RDD Actions (Trigger computation)
print("=== RDD Actions ===")

# Count
print(f"Count of numbers: {numbers_rdd.count()}")

# First
print(f"First element: {numbers_rdd.first()}")

# Take
print(f"First 3 elements: {numbers_rdd.take(3)}")

# Reduce
sum_result = numbers_rdd.reduce(lambda a, b: a + b)
print(f"Sum of all numbers: {sum_result}")

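# Collect: returns every element to the driver, so use it only on small results
all_numbers = numbers_rdd.collect()
print(f"All numbers: {all_numbers}")

# Sample without replacement: 0.5 is the expected fraction, not an exact size,
# so the sample may hold more or fewer than 5 elements
sample_data = numbers_rdd.sample(False, 0.5, seed=42).collect()
print(f"Sample (~50%): {sample_data}")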
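`reduce` first combines elements inside each partition and then combines the per-partition results, so the function should be associative and commutative; otherwise the answer depends on how the data happens to be partitioned. A pure-Python model of that two-level reduction (hypothetical `partitioned_reduce`, not PySpark's source):

```python
import functools


def partitioned_reduce(func, partitions):
    """Model of RDD.reduce: reduce inside each non-empty partition, then reduce the partial results."""
    partials = [functools.reduce(func, part) for part in partitions if part]
    return functools.reduce(func, partials)


def add(a, b):
    return a + b


def subtract(a, b):
    return a - b


two_parts = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
flat = [x for part in two_parts for x in part]

# Addition is associative and commutative: partitioning does not change the answer
print(partitioned_reduce(add, two_parts), functools.reduce(add, flat))            # 55 55
# Subtraction is neither: the answer depends on how the data was partitioned
print(partitioned_reduce(subtract, two_parts), functools.reduce(subtract, flat))  # 15 -53
```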

## 4. DataFrames

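DataFrames organize data into named, typed columns. Because Spark knows the schema, its Catalyst optimizer can plan each query, and built-in DataFrame operations execute in the JVM instead of in Python worker processes, so they are usually much faster than equivalent RDD code. Let's explore DataFrame operations.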

In [None]:
# Creating DataFrames
print("=== Creating DataFrames ===")

# 1. From Python data
data = [("Alice", 25, "Engineer", 75000),
        ("Bob", 30, "Manager", 85000),
        ("Charlie", 35, "Director", 95000),
        ("Diana", 28, "Analyst", 65000),
        ("Eve", 32, "Engineer", 78000)]

columns = ["name", "age", "job", "salary"]
df = spark.createDataFrame(data, columns)

print("DataFrame Schema:")
df.printSchema()

print("\nDataFrame Content:")
df.show()

In [None]:
# 2. From Pandas DataFrame
pandas_df = pd.DataFrame({
    'product': ['A', 'B', 'C', 'D', 'E'],
    'price': [10.5, 25.0, 15.75, 30.0, 12.25],
    'quantity': [100, 50, 75, 25, 80],
    'category': ['Electronics', 'Clothing', 'Electronics', 'Furniture', 'Clothing']
})

products_df = spark.createDataFrame(pandas_df)
print("Products DataFrame:")
products_df.show()

# 3. With explicit schema
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("score", DoubleType(), True),
    StructField("active", BooleanType(), True)
])

test_data = [(1, "Test1", 85.5, True),
             (2, "Test2", 92.0, False),
             (3, "Test3", 78.5, True)]

test_df = spark.createDataFrame(test_data, schema)
print("\nTest DataFrame with explicit schema:")
test_df.printSchema()
test_df.show()

In [None]:
# DataFrame Basic Operations
print("=== DataFrame Basic Operations ===")

# Select columns
print("Select specific columns:")
df.select("name", "salary").show()

# Filter rows
print("\nFilter high earners (salary > 70000):")
df.filter(df.salary > 70000).show()

# Add new column (withColumn returns a new DataFrame; df itself is unchanged)
print("\nAdd bonus column (10% of salary):")
df_with_bonus = df.withColumn("bonus", df.salary * 0.1)
df_with_bonus.show()

# Rename column
print("\nRename 'job' to 'position':")
df.withColumnRenamed("job", "position").show()

# Drop column
print("\nDrop 'age' column:")
df.drop("age").show()

## 5. Data Loading and Saving

Let's explore different ways to load and save data with PySpark.

In [None]:
# Create sample data files
import os

# Create data directory if it doesn't exist (relative to the notebook's working directory)
data_dir = "../data/pyspark_samples"
os.makedirs(data_dir, exist_ok=True)

# Save DataFrame as CSV. Spark writes a *directory* named employees.csv holding
# part-* files; coalesce(1) makes it contain a single part file.
print("Saving DataFrame as CSV...")
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(f"{data_dir}/employees.csv")

# Save DataFrame as JSON (also a directory; one JSON object per line)
print("Saving DataFrame as JSON...")
df.coalesce(1).write.mode("overwrite").json(f"{data_dir}/employees.json")

# Save DataFrame as Parquet (no coalesce, so Spark may write several part files)
print("Saving DataFrame as Parquet...")
df.write.mode("overwrite").parquet(f"{data_dir}/employees.parquet")

print("Data saved successfully!")
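Because each writer above produces a directory of `part-*` files rather than one file, a downstream tool that expects a single plain file needs one extra step. A standard-library sketch, assuming `coalesce(1)` left exactly one `part-*` file with the given extension in the directory; `export_single_part` is a hypothetical helper, not a Spark API:

```python
import glob
import os
import shutil


def export_single_part(spark_output_dir, dest_file, suffix=".csv"):
    """Copy the only part-* file in a Spark output directory to dest_file."""
    pattern = os.path.join(spark_output_dir, f"part-*{suffix}")
    parts = sorted(glob.glob(pattern))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    shutil.copyfile(parts[0], dest_file)  # metadata files like _SUCCESS are ignored
    return dest_file
```

Usage in this notebook would look like `export_single_part(f"{data_dir}/employees.csv", f"{data_dir}/employees_single.csv")`, or with `suffix=".json"` for the JSON output.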

In [None]:
# Loading data from different formats
print("=== Loading Data ===")

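# Load CSV (inferSchema makes Spark read the data an extra time to guess column types)
print("Loading from CSV:")
csv_df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"{data_dir}/employees.csv")
csv_df.show()
csv_df.printSchema()

# Load JSON (an inferred JSON schema lists the columns in alphabetical order)
print("\nLoading from JSON:")
json_df = spark.read.json(f"{data_dir}/employees.json")
json_df.show()

# Load Parquet (the schema is stored in the files, so no inference pass is needed)
print("\nLoading from Parquet:")
parquet_df = spark.read.parquet(f"{data_dir}/employees.parquet")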
parquet_df.show()
parquet_df.printSchema()
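The cost of `inferSchema` comes from having to look at every value before the schema is known. The sketch below is a deliberately simplified pure-Python model of that idea (Spark's real CSV inference also handles decimals, booleans, dates, timestamps, and more): each column starts at the narrowest type and widens whenever a value does not fit. `infer_column_types`, `fits`, and the three-type ladder are hypothetical.

```python
import csv
import io

# Candidate types from narrowest to widest (a simplification of Spark's much longer list)
TYPE_ORDER = ["int", "double", "string"]
CONVERTERS = {"int": int, "double": float}


def fits(value, type_name):
    """True if value can be parsed as type_name."""
    if type_name == "string":
        return True
    try:
        CONVERTERS[type_name](value)
    except ValueError:
        return False
    return True


def infer_column_types(csv_text):
    """One full pass over the rows, widening each column's type until every value fits."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    types = ["int"] * len(header)
    for row in reader:
        for i, value in enumerate(row):
            if value == "":
                continue  # empty fields do not constrain the type in this model
            while not fits(value, types[i]):
                types[i] = TYPE_ORDER[TYPE_ORDER.index(types[i]) + 1]
    return dict(zip(header, types))


sample = "name,age,score\nAlice,25,85.5\nBob,30,92\n"
print(infer_column_types(sample))  # {'name': 'string', 'age': 'int', 'score': 'double'}
```

Passing an explicit `StructType` (as in section 4) to `spark.read.schema(...)` skips this pass entirely.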

## 6. Data Transformations

Let's explore various data transformation operations in PySpark.

In [None]:
# Advanced DataFrame Transformations
print("=== Advanced Transformations ===")

# Group by and aggregations
print("Group by job and calculate average salary:")
df.groupBy("job").agg(avg("salary").alias("avg_salary"), count("*").alias("count")).show()

# Multiple aggregations (min and max here are Spark functions from the star import, not Python built-ins)
print("\nMultiple aggregations:")
df.groupBy("job").agg(
    avg("salary").alias("avg_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary"),
    count("*").alias("count")
).show()

# Window functions
from pyspark.sql.window import Window

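print("\nWindow functions - row number by salary (highest first):")
# Without partitionBy, Spark moves every row into a single partition (and logs a
# warning): fine for small data, but it does not scale.
# row_number() gives distinct numbers even to tied salaries; rank() would share them.
window_spec = Window.orderBy(desc("salary"))
df.withColumn("rank", row_number().over(window_spec)).show()

# Partitioned window
print("\nPartitioned window - row number within each job:")
window_spec_partitioned = Window.partitionBy("job").orderBy(desc("salary"))
df.withColumn("job_rank", row_number().over(window_spec_partitioned)).show()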
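The sample salaries contain no ties, so `row_number()`, `rank()`, and `dense_rank()` would print the same numbers here; they diverge as soon as two rows share a salary. A pure-Python model of the three functions over keys that are already sorted as the window's `orderBy` would sort them (hypothetical `py_*` names, chosen so they don't shadow Spark's functions in the notebook; the tied 85000 is made-up data):

```python
def py_row_number(sorted_keys):
    """1, 2, 3, ...: tied rows still get distinct numbers."""
    return list(range(1, len(sorted_keys) + 1))


def py_rank(sorted_keys):
    """Ties share a rank and the next rank skips ahead, e.g. 1, 2, 2, 4."""
    ranks = []
    for i, key in enumerate(sorted_keys):
        ranks.append(ranks[-1] if i > 0 and key == sorted_keys[i - 1] else i + 1)
    return ranks


def py_dense_rank(sorted_keys):
    """Ties share a rank with no gaps, e.g. 1, 2, 2, 3."""
    ranks = []
    for i, key in enumerate(sorted_keys):
        if i == 0:
            ranks.append(1)
        elif key == sorted_keys[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1)
    return ranks


salaries_desc = [95000, 85000, 85000, 78000]  # ordered like orderBy(desc("salary"))
print(py_row_number(salaries_desc))  # [1, 2, 3, 4]
print(py_rank(salaries_desc))        # [1, 2, 2, 4]
print(py_dense_rank(salaries_desc))  # [1, 2, 2, 3]
```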

In [None]:
# String operations
print("=== String Operations ===")

# String functions
df.select(
    "name",
    upper("name").alias("name_upper"),
    lower("name").alias("name_lower"),
    length("name").alias("name_length"),
    substring("name", 1, 3).alias("name_substr")  # 1-based position: first 3 characters
).show()

# Conditional operations: branches are checked in order (under 30, 30-34, 35 and over)
print("\nConditional operations:")
df.select(
    "name",
    "age",
    when(col("age") < 30, "Young")
    .when(col("age") < 35, "Middle")
    .otherwise("Senior").alias("age_group")
).show()

# Date operations: first build a small DataFrame of hire dates
from datetime import datetime

# Add hire dates
hire_dates = [("Alice", datetime(2020, 1, 15)),
              ("Bob", datetime(2019, 6, 10)),
              ("Charlie", datetime(2018, 3, 22)),
              ("Diana", datetime(2021, 9, 5)),
              ("Eve", datetime(2020, 11, 30))]

dates_df = spark.createDataFrame(hire_dates, ["name", "hire_date"])

# Join with original DataFrame
df_with_dates = df.join(dates_df, "name")

print("\nDate operations:")
df_with_dates.select(
    "name",
    "hire_date",
    year("hire_date").alias("hire_year"),
    month("hire_date").alias("hire_month"),
    datediff(current_date(), "hire_date").alias("days_employed")  # depends on today's date
).show()
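`when(...).when(...).otherwise(...)` compiles to a SQL `CASE WHEN`: branches are tested in order and the first condition that is true wins. Comparing a null `age` with 30 yields null, which is not true, so such a row falls through every branch to `otherwise` and is labeled "Senior". The pure-Python model below mimics that three-valued logic with `None`; `sql_lt` and `age_group` are hypothetical helpers. Putting `col("age").isNull()` in the first branch avoids the surprise.

```python
def sql_lt(a, b):
    """SQL-style a < b: None (unknown) if either side is null."""
    if a is None or b is None:
        return None
    return a < b


def age_group(age):
    # Mirrors: when(age < 30, "Young").when(age < 35, "Middle").otherwise("Senior")
    branches = [(sql_lt(age, 30), "Young"), (sql_lt(age, 35), "Middle")]
    for condition, label in branches:
        if condition is True:  # only a definite True selects a branch
            return label
    return "Senior"


for age in [25, 30, 35, None]:
    print(age, age_group(age))  # None ends up as "Senior"
```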

## 7. Data Actions and Aggregations

Let's explore various actions and aggregation operations.

In [None]:
# DataFrame Actions
print("=== DataFrame Actions ===")

# Basic statistics
print("DataFrame count:", df.count())
print("DataFrame columns:", df.columns)

# Describe - statistical summary
print("\nStatistical summary:")
df.describe().show()

# Summary statistics for specific columns
print("\nSummary for salary:")
df.select("salary").describe().show()

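# collect() returns every row to the driver as a list of Row objects; avoid it on large data
print("\nCollected data:")
collected_data = df.collect()
for row in collected_data[:3]:  # Show first 3 rows
    print(row)

# Convert to Pandas (this also pulls all rows to the driver, so use it only for small data)
print("\nConvert to Pandas DataFrame:")
employees_pd = df.toPandas()  # new name, so the earlier products pandas_df is not overwritten
print(type(employees_pd))
print(employees_pd.head())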
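`describe()` reports count, mean, stddev, min, and max (as strings); its stddev is the sample standard deviation, dividing by n - 1. The same numbers for the five salaries in `df` can be checked with Python's `statistics` module. `describe` below is a hypothetical helper, not Spark's implementation:

```python
import builtins
import statistics


def describe(values):
    """count, mean, sample stddev, min, max: the rows df.describe() reports for a numeric column."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation (divides by n - 1)
        # builtins.* because the notebook's star import shadows min/max with Spark functions
        "min": builtins.min(values),
        "max": builtins.max(values),
    }


salaries = [75000, 85000, 95000, 65000, 78000]  # the salary column of df
print(describe(salaries))
```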

## 8. SQL Operations

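PySpark lets you run SQL queries on DataFrames once they are registered as temporary views. A temporary view is scoped to the current SparkSession and disappears when the session stops.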

In [None]:
# SQL Operations
print("=== SQL Operations ===")

# Create temporary view
df.createOrReplaceTempView("employees")
products_df.createOrReplaceTempView("products")

# Basic SQL queries
print("Basic SELECT query:")
spark.sql("SELECT name, salary FROM employees WHERE salary > 70000").show()

print("\nAggregation query:")
spark.sql("""
    SELECT job,
           AVG(salary) as avg_salary,
           COUNT(*) as count
    FROM employees
    GROUP BY job
    ORDER BY avg_salary DESC
""").show()

print("\nComplex query with window functions:")
spark.sql("""
    SELECT name, job, salary,
           RANK() OVER (ORDER BY salary DESC) as overall_rank,
           RANK() OVER (PARTITION BY job ORDER BY salary DESC) as job_rank
    FROM employees
""").show()

# Products analysis
print("\nProducts analysis:")
spark.sql("""
    SELECT category,
           COUNT(*) as product_count,
           AVG(price) as avg_price,
           SUM(quantity) as total_quantity
    FROM products
    GROUP BY category
    ORDER BY category
""").show()
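To sanity-check the products query, the same `GROUP BY` can be computed in plain Python over the rows of the pandas DataFrame from section 4 (copied by hand below; `aggregate_by_category` is a hypothetical helper). Without an `ORDER BY`, Spark does not guarantee the order of grouped rows, so the sketch sorts by category:

```python
products = [
    ("A", 10.5, 100, "Electronics"),
    ("B", 25.0, 50, "Clothing"),
    ("C", 15.75, 75, "Electronics"),
    ("D", 30.0, 25, "Furniture"),
    ("E", 12.25, 80, "Clothing"),
]


def aggregate_by_category(rows):
    """Plain-Python GROUP BY category with COUNT(*), AVG(price), SUM(quantity)."""
    groups = {}
    for _product, price, quantity, category in rows:
        g = groups.setdefault(category, {"count": 0, "price_total": 0.0, "quantity": 0})
        g["count"] += 1
        g["price_total"] += price
        g["quantity"] += quantity
    return {
        category: (g["count"], g["price_total"] / g["count"], g["quantity"])
        for category, g in sorted(groups.items())
    }


for category, row in aggregate_by_category(products).items():
    print(category, row)  # (product_count, avg_price, total_quantity)
```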

## 9. Machine Learning with MLlib

Let's explore PySpark's machine learning capabilities.

In [None]:
# Machine Learning with MLlib
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

print("=== Machine Learning Example ===")

# Create a larger dataset for ML
import random
random.seed(42)

# Generate synthetic data
ml_data = []
jobs = ['Engineer', 'Manager', 'Director', 'Analyst']
for i in range(100):
    job = random.choice(jobs)
    age = random.randint(22, 60)
    experience = random.randint(0, age - 22)

    # Create salary based on job, age, and experience with some noise
    base_salary = {'Engineer': 70000, 'Manager': 85000, 'Director': 100000, 'Analyst': 60000}[job]
    salary = base_salary + (age * 1000) + (experience * 2000) + random.randint(-10000, 10000)

    ml_data.append((f"Person_{i}", age, experience, job, salary))

ml_df = spark.createDataFrame(ml_data, ["name", "age", "experience", "job", "salary"])

print("ML Dataset:")
ml_df.show(10)
ml_df.describe().show()

In [None]:
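# Prepare data for ML
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

print("=== Data Preparation for ML ===")

# StringIndexer turns job labels into numeric indices (most frequent label -> 0.0)
job_indexer = StringIndexer(inputCol="job", outputCol="job_index")

# The indices have no natural order, so one-hot encode them instead of feeding
# job_index to the linear model as if it were a measurement
job_encoder = OneHotEncoder(inputCols=["job_index"], outputCols=["job_vec"])

# Vector assembler to combine features
assembler = VectorAssembler(
    inputCols=["age", "experience", "job_vec"],
    outputCol="features"
)

# Create pipeline for data preparation
prep_pipeline = Pipeline(stages=[job_indexer, job_encoder, assembler])
prep_model = prep_pipeline.fit(ml_df)
prepared_df = prep_model.transform(ml_df)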

print("Prepared data:")
prepared_df.select("features", "salary").show(10, truncate=False)

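# Split into training and test sets. randomSplit's weights are approximate, so sizes only
# roughly follow 80/20. (This reassigns test_df, replacing the DataFrame from section 4.)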
train_df, test_df = prepared_df.randomSplit([0.8, 0.2], seed=42)
print(f"\nTraining set size: {train_df.count()}")
print(f"Test set size: {test_df.count()}")
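`StringIndexer` numbers labels by frequency (most frequent gets 0.0; according to Spark's documentation, ties under the default `frequencyDesc` order are broken alphabetically). Those numbers carry no real order, so a linear model should see a one-hot vector rather than the raw index. A pure-Python model of both steps; `string_index` and `one_hot` are hypothetical names, and `drop_last=True` mirrors `OneHotEncoder`'s default of dropping the last category so the encoding is not collinear with the intercept:

```python
from collections import Counter


def string_index(labels):
    """Label -> index, most frequent first; ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Index -> 0/1 vector; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector


job_labels = ["Engineer", "Analyst", "Engineer", "Manager", "Analyst", "Engineer"]
mapping = string_index(job_labels)
print(mapping)  # {'Engineer': 0.0, 'Analyst': 1.0, 'Manager': 2.0}
for job in ["Engineer", "Analyst", "Manager"]:
    print(job, one_hot(mapping[job], len(mapping)))
```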

In [None]:
# Linear Regression Model
print("=== Linear Regression Model ===")

# Create and train linear regression model
lr = LinearRegression(featuresCol="features", labelCol="salary")
lr_model = lr.fit(train_df)

# Make predictions
predictions = lr_model.transform(test_df)

print("Predictions vs Actual:")
predictions.select("name", "salary", "prediction").show(10)

# Evaluate model
evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"\nRoot Mean Square Error: {rmse:.2f}")

# Model coefficients
print(f"\nModel Coefficients: {lr_model.coefficients}")
print(f"Model Intercept: {lr_model.intercept:.2f}")

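# Model summary: this r2 is computed on the training data
print(f"R-squared (training): {lr_model.summary.r2:.4f}")

# R-squared on the held-out test set
r2_evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="r2")
print(f"R-squared (test): {r2_evaluator.evaluate(predictions):.4f}")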
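RMSE is in the same units as `salary`, so it reads as a typical prediction error in dollars, while R² is the share of the label's variance the model explains. `lr_model.summary.r2` is computed on the training data, so it usually looks better than a test-set score. Plain-Python versions of both metrics (hypothetical helpers following the standard definitions that `RegressionEvaluator` uses for `"rmse"` and `"r2"`):

```python
import math


def rmse(actual, predicted):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    squared = [(y - p) ** 2 for y, p in zip(actual, predicted)]
    return math.sqrt(math.fsum(squared) / len(squared))


def r2(actual, predicted):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = math.fsum(actual) / len(actual)
    ss_res = math.fsum((y - p) ** 2 for y, p in zip(actual, predicted))
    ss_tot = math.fsum((y - mean_y) ** 2 for y in actual)
    return 1 - ss_res / ss_tot


print(rmse([3, 5], [1, 5]))          # sqrt(2)
print(r2([1, 2, 3], [2, 2, 2]))      # 0.0: no better than predicting the mean
```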

## 10. Performance Optimization

Let's explore various performance optimization techniques in PySpark.

In [None]:
# Performance Optimization Techniques
print("=== Performance Optimization ===")

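# 1. Caching
print("1. Caching DataFrames:")
# cache() = persist() with the default storage level (memory, spilling to disk).
# It is lazy: the data is materialized by the first action that scans df.
df.cache()
df.count()  # an action, so the cache is actually filled
print(f"DataFrame is cached: {df.is_cached}")
print(f"Storage level: {df.storageLevel}")

# Persist with a different storage level. Persisting already-cached data is a
# no-op, so unpersist first.
from pyspark import StorageLevel
df.unpersist()
df.persist(StorageLevel.MEMORY_AND_DISK)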

# 2. Partitioning
print("\n2. Partitioning:")
print(f"Current partitions: {df.rdd.getNumPartitions()}")

# Repartition: a full shuffle that redistributes rows evenly; can raise or lower the count
repartitioned_df = df.repartition(4)
print(f"After repartitioning: {repartitioned_df.rdd.getNumPartitions()}")

# Coalesce: merges existing partitions without a full shuffle; can only lower the count
coalesced_df = repartitioned_df.coalesce(2)
print(f"After coalescing: {coalesced_df.rdd.getNumPartitions()}")

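# 3. Broadcast variables: ship a read-only lookup table to each executor once,
# instead of serializing it with every task
print("\n3. Broadcast Variables:")
job_mapping = {"Engineer": "ENG", "Manager": "MGR", "Director": "DIR", "Analyst": "ANA"}
broadcast_mapping = sc.broadcast(job_mapping)

def map_job_code(job):
    return broadcast_mapping.value.get(job, "UNK")

# Python UDFs move rows between the JVM and Python workers, so prefer
# built-in functions whenever they can express the same logic
from pyspark.sql.functions import udf
map_job_udf = udf(map_job_code, StringType())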

df.withColumn("job_code", map_job_udf("job")).show()
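`repartition(n)` shuffles every row to rebalance partitions, while `coalesce(n)` only merges existing partitions: it avoids the shuffle but keeps any skew and cannot increase the partition count. A simplified pure-Python model of the difference (hypothetical helpers; Spark's actual coalesce also weighs data locality, and its round-robin repartitioning starts at a random offset):

```python
def coalesce_partitions(partitions, n):
    """Merge neighbouring partitions into n groups; rows never move between groups."""
    size = len(partitions)
    if n >= size:
        return partitions  # coalesce cannot increase the partition count
    merged = []
    for i in range(n):
        group = []
        for part in partitions[(i * size) // n:((i + 1) * size) // n]:
            group.extend(part)
        merged.append(group)
    return merged


def repartition_round_robin(partitions, n):
    """Full shuffle: deal every row to the n output partitions in turn."""
    out = [[] for _ in range(n)]
    position = 0
    for part in partitions:
        for row in part:
            out[position % n].append(row)
            position += 1
    return out


skewed = [[1, 2, 3, 4, 5, 6], [7], [8], []]
print(coalesce_partitions(skewed, 2))      # [[1, 2, 3, 4, 5, 6, 7], [8]]: skew is kept
print(repartition_round_robin(skewed, 2))  # [[1, 3, 5, 7], [2, 4, 6, 8]]: balanced
```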

In [None]:
# 4. Query Optimization
print("=== Query Optimization ===")

# Use explain() to see query plans
print("Query execution plan:")
df.filter(df.salary > 70000).select("name", "salary").explain()

# 5. Join Optimization
print("\n5. Join Optimization:")

# Create a small lookup table
job_info = [("Engineer", "Technical", "IC"),
            ("Manager", "Leadership", "Management"),
            ("Director", "Leadership", "Executive"),
            ("Analyst", "Technical", "IC")]

job_df = spark.createDataFrame(job_info, ["job", "department", "level"])

# Spark broadcasts tables below spark.sql.autoBroadcastJoinThreshold (10MB by default)
# automatically; broadcast() adds an explicit hint
from pyspark.sql.functions import broadcast
joined_df = df.join(broadcast(job_df), "job")
print("Broadcast join result:")
joined_df.show()

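# 6. Column pruning and predicate pushdown
print("\n6. Optimized query with column pruning:")
# Catalyst pushes filters down and drops unused columns automatically; the benefit is
# largest with columnar sources like Parquet, where unread columns are never loaded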
optimized_query = df.filter(df.salary > 70000).select("name", "job", "salary")
optimized_query.show()
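A broadcast join ships a copy of the small table to every executor, which builds an in-memory hash table and probes it with its own partitions of the large table, so the large table never has to be shuffled. A pure-Python model of that inner hash join (hypothetical `broadcast_hash_join`, rows as dicts; the "Zoe"/"Intern" row is made up to show an unmatched key):

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Inner join: hash the small side once, then probe it with each row of the large side."""
    lookup = {}
    for row in small_rows:  # in Spark, every executor receives a copy of this table
        lookup.setdefault(row[key], []).append(row)
    joined = []
    for row in large_rows:  # large-side rows stay where they are: no shuffle
        for match in lookup.get(row[key], []):
            joined.append({**row, **match})
    return joined


employees = [
    {"name": "Alice", "job": "Engineer"},
    {"name": "Bob", "job": "Manager"},
    {"name": "Zoe", "job": "Intern"},  # no match: dropped by the inner join
]
job_table = [
    {"job": "Engineer", "department": "Technical"},
    {"job": "Manager", "department": "Leadership"},
]
for row in broadcast_hash_join(employees, job_table, "job"):
    print(row)
```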

## 11. Best Practices and Tips

Here are some important best practices when working with PySpark.

In [None]:
# Best Practices
print("=== PySpark Best Practices ===")

print("""
1. **Data Formats:**
   - Use Parquet for better performance and compression
   - Avoid CSV for large datasets (slow parsing)
   - Use a table format such as Delta Lake (a separate package) for ACID transactions

2. **Memory Management:**
   - Cache/persist DataFrames that are used multiple times
   - Use appropriate storage levels (MEMORY_ONLY, MEMORY_AND_DISK, etc.)
   - Unpersist DataFrames when no longer needed

3. **Partitioning:**
   - Partition data by frequently filtered columns
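   - Avoid many tiny partitions; for file reads Spark targets ~128MB per partition by default (spark.sql.files.maxPartitionBytes)
   - Use coalesce() instead of repartition() when reducing partitions (it avoids a full shuffle)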

4. **Joins:**
   - Use broadcast joins for small tables (Spark does this automatically below spark.sql.autoBroadcastJoinThreshold, 10MB by default)
   - Prefer bucketing for large table joins
   - Filter data before joins when possible

5. **UDFs (User Defined Functions):**
   - Avoid UDFs when built-in functions are available
   - Use vectorized UDFs (pandas UDFs) for better performance
   - Consider using SQL expressions instead of UDFs

6. **Resource Management:**
   - Configure executor memory and cores appropriately
   - Monitor Spark UI for performance bottlenecks
   - Use dynamic allocation when possible
""")

# Example of efficient vs inefficient operations
print("\n=== Efficient vs Inefficient Examples ===")

# Inefficient: each action below recomputes the filter from scratch
print("Inefficient approach (multiple scans):")
high_earners = df.filter(df.salary > 70000)
count1 = high_earners.count()
avg_salary1 = high_earners.agg(avg("salary")).collect()[0][0]
print(f"Count: {count1}, Avg Salary: {avg_salary1:.2f}")

# Efficient: Cache and reuse
print("\nEfficient approach (cache and reuse):")
high_earners_cached = df.filter(df.salary > 70000).cache()
count2 = high_earners_cached.count()
avg_salary2 = high_earners_cached.agg(avg("salary")).collect()[0][0]
print(f"Count: {count2}, Avg Salary: {avg_salary2:.2f}")

# Clean up cache
high_earners_cached.unpersist()
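The size rules of thumb in the best-practices list translate into simple arithmetic. A sketch, assuming you already know or have estimated the data size in bytes; 128 MB matches the default `spark.sql.files.maxPartitionBytes` and 10 MB the default `spark.sql.autoBroadcastJoinThreshold`, while the helper names are hypothetical (Spark's own size estimates can differ from on-disk sizes):

```python
import math

MB = 1024 * 1024
DEFAULT_MAX_PARTITION_BYTES = 128 * MB  # spark.sql.files.maxPartitionBytes default
DEFAULT_BROADCAST_THRESHOLD = 10 * MB   # spark.sql.autoBroadcastJoinThreshold default


def target_partitions(total_bytes, partition_bytes=DEFAULT_MAX_PARTITION_BYTES):
    """Smallest partition count that keeps each partition at or under partition_bytes."""
    count = math.ceil(total_bytes / partition_bytes)
    return count if count > 0 else 1


def fits_auto_broadcast(table_bytes, threshold=DEFAULT_BROADCAST_THRESHOLD):
    """Whether a table's estimated size is within the auto-broadcast threshold."""
    return table_bytes <= threshold


print(target_partitions(10 * 1024 * MB))  # 10 GB of data -> 80 partitions
print(fits_auto_broadcast(5 * MB), fits_auto_broadcast(50 * MB))  # True False
```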

In [None]:
# Monitoring and Debugging
print("=== Monitoring and Debugging ===")

# Spark UI information
print(f"Spark UI URL: {spark.sparkContext.uiWebUrl}")
print(f"Application ID: {spark.sparkContext.applicationId}")

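# Configuration settings (in local mode the driver runs every task, so executor
# settings are often unset and print "default")
print("\nImportant Spark configurations:")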
conf = spark.sparkContext.getConf()
print(f"Executor Memory: {conf.get('spark.executor.memory', 'default')}")
print(f"Executor Cores: {conf.get('spark.executor.cores', 'default')}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")

# Lineage of the DataFrame's underlying RDD (for DataFrame queries, explain(True) is usually more informative)
print("\nDataFrame lineage (RDD lineage):")
complex_df = df.filter(df.salary > 70000).select("name", "salary").orderBy("salary")
print(complex_df.rdd.toDebugString().decode())

In [None]:
# Clean up resources
print("=== Cleanup ===")

# Unpersist cached DataFrames
df.unpersist()

# Remove cached broadcast copies from executors (destroy() would release it permanently)
broadcast_mapping.unpersist()

# Stop Spark session
print("Stopping Spark session...")
spark.stop()
print("Spark session stopped successfully!")

print("""
🎉 Congratulations! You've completed the PySpark Crash Course!

You've learned:
✅ Setting up PySpark environment
✅ Working with RDDs and DataFrames
✅ Data loading and saving in various formats
✅ Data transformations and actions
✅ SQL operations on DataFrames
✅ Machine learning with MLlib
✅ Performance optimization techniques
✅ Best practices and debugging tips

Next steps:
🚀 Practice with real datasets
🚀 Explore Structured Streaming
🚀 Learn about Delta Lake
🚀 Deploy Spark applications on clusters
""")