# PySpark with Docker Example

This notebook demonstrates how to connect to a Spark cluster running in Docker and perform basic operations.

In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import os

In [3]:
# Initialize Spark Session with proper configuration
spark = SparkSession.builder \
    .appName("Docker Spark Example") \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "1") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Set log level to reduce verbosity
spark.sparkContext.setLogLevel("WARN")

print(f"Spark Session created successfully!")
print(f"Spark Version: {spark.version}")
print(f"Spark Master: {spark.sparkContext.master}")
print(f"Spark App Name: {spark.sparkContext.appName}")

Spark Session created successfully!
Spark Version: 3.5.0
Spark Master: spark://spark-master:7077
Spark App Name: Docker Spark Example


In [4]:
# Check data directory exists and create if needed
data_dir = "/home/jovyan/data"
if not os.path.exists(data_dir):
    os.makedirs(data_dir)
    print(f"Created data directory: {data_dir}")
else:
    print(f"Data directory exists: {data_dir}")

# List contents of data directory
print(f"Contents of {data_dir}: {os.listdir(data_dir) if os.path.exists(data_dir) else 'Directory not found'}")

Data directory exists: /home/jovyan/data
Contents of /home/jovyan/data: ['sample_data.parquet']


In [5]:
# Test basic Spark functionality
print("Testing basic Spark RDD operations...")
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
result = rdd.map(lambda x: x * 2).collect()
print(f"RDD map result: {result}")

Testing basic Spark RDD operations...
RDD map result: [2, 4, 6, 8, 10]


In [6]:
# Create a simple DataFrame
print("Creating DataFrame...")
data = [("Alice", 25, "Engineer"),
        ("Bob", 30, "Data Scientist"),
        ("Charlie", 35, "Manager"),
        ("Diana", 28, "Analyst"),
        ("Eve", 32, "Developer")]

schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Job", StringType(), True)
])

df = spark.createDataFrame(data, schema)
print("DataFrame created successfully!")
df.show()

Creating DataFrame...
DataFrame created successfully!
+-------+---+--------------+
|   Name|Age|           Job|
+-------+---+--------------+
|  Alice| 25|      Engineer|
|    Bob| 30|Data Scientist|
|Charlie| 35|       Manager|
|  Diana| 28|       Analyst|
|    Eve| 32|     Developer|
+-------+---+--------------+



In [7]:
# Perform some basic operations
print("DataFrame Schema:")
df.printSchema()

print(f"\nDataFrame Count: {df.count()}")

print("\nFiltering data (Age > 30):")
df.filter(col("Age") > 30).show()

DataFrame Schema:
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Job: string (nullable = true)


DataFrame Count: 5

Filtering data (Age > 30):
+-------+---+---------+
|   Name|Age|      Job|
+-------+---+---------+
|Charlie| 35|  Manager|
|    Eve| 32|Developer|
+-------+---+---------+



In [8]:
# Group by and aggregate
print("Average age by job:")
df.groupBy("Job").agg(avg("Age").alias("Average_Age")).show()

Average age by job:
+--------------+-----------+
|           Job|Average_Age|
+--------------+-----------+
|Data Scientist|       30.0|
|      Engineer|       25.0|
|     Developer|       32.0|
|       Analyst|       28.0|
|       Manager|       35.0|
+--------------+-----------+



In [9]:
# Create a larger dataset for demonstration
import random

print("Creating larger dataset...")
# Create sample data
large_data = []
jobs = ["Engineer", "Data Scientist", "Manager", "Analyst", "Developer", "Designer"]

for i in range(1000):
    name = f"Person_{i}"
    age = random.randint(22, 65)
    job = random.choice(jobs)
    salary = random.randint(50000, 150000)
    large_data.append((name, age, job, salary))

large_schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Job", StringType(), True),
    StructField("Salary", IntegerType(), True)
])

large_df = spark.createDataFrame(large_data, large_schema)
print(f"Large dataset created with {large_df.count()} rows")
large_df.show(10)

Creating larger dataset...
Large dataset created with 1000 rows
+--------+---+--------------+------+
|    Name|Age|           Job|Salary|
+--------+---+--------------+------+
|Person_0| 44|      Engineer|135187|
|Person_1| 29|Data Scientist|122086|
|Person_2| 52|      Designer| 65222|
|Person_3| 59|       Manager| 60684|
|Person_4| 49|       Manager| 50889|
|Person_5| 37|     Developer| 65507|
|Person_6| 33|       Manager|142709|
|Person_7| 65|     Developer|115918|
|Person_8| 29|Data Scientist|104199|
|Person_9| 60|       Analyst|120133|
+--------+---+--------------+------+
only showing top 10 rows



In [10]:
# Perform analytics on the larger dataset
print("Statistics by Job:")
stats_df = large_df.groupBy("Job").agg(
    count("*").alias("Count"),
    avg("Age").alias("Avg_Age"),
    avg("Salary").alias("Avg_Salary"),
    max("Salary").alias("Max_Salary"),
    min("Salary").alias("Min_Salary")
).orderBy("Count", ascending=False)

stats_df.show()

Statistics by Job:
+--------------+-----+------------------+------------------+----------+----------+
|           Job|Count|           Avg_Age|        Avg_Salary|Max_Salary|Min_Salary|
+--------------+-----+------------------+------------------+----------+----------+
|       Manager|  187| 43.07486631016043| 101971.2192513369|    149800|     50217|
|      Designer|  166| 44.04216867469879|101986.78313253012|    149920|     50060|
|     Developer|  164|43.390243902439025| 98738.24390243902|    149860|     50028|
|Data Scientist|  162|42.370370370370374| 97090.63580246913|    149957|     50091|
|      Engineer|  162| 43.50617283950617|101106.51851851853|    149921|     50587|
|       Analyst|  159|42.710691823899374| 102314.2075471698|    149951|     51731|
+--------------+-----+------------------+------------------+----------+----------+



In [11]:
# Convert to Pandas for visualization (small dataset)
print("Converting small DataFrame to Pandas:")
pandas_df = df.toPandas()
print(pandas_df)
print(f"\nPandas DataFrame type: {type(pandas_df)}")

Converting small DataFrame to Pandas:
      Name  Age             Job
0    Alice   25        Engineer
1      Bob   30  Data Scientist
2  Charlie   35         Manager
3    Diana   28         Analyst
4      Eve   32       Developer

Pandas DataFrame type: <class 'pandas.core.frame.DataFrame'>


In [12]:
# Save DataFrame as Parquet with error handling
try:
    print("Saving data as Parquet file...")
    
    # Use a path that's accessible to all containers
    parquet_path = "/home/jovyan/data/sample_data.parquet"
    
    # Ensure directory exists
    import os
    os.makedirs(os.path.dirname(parquet_path), exist_ok=True)
    
    # Coalesce to single partition for easier file handling
    large_df.coalesce(1).write.mode("overwrite").parquet(parquet_path)
    print("Data saved successfully!")
    
    # Read it back to verify
    print("Reading Parquet file back...")
    read_df = spark.read.parquet(parquet_path)
    print(f"Read {read_df.count()} rows from Parquet file")
    
    # Show first few rows
    print("First few rows from saved file:")
    read_df.show(5)
    
except Exception as e:
    print(f"Error saving/reading Parquet file: {e}")
    print("Trying alternative approach - saving as CSV instead...")
    
    try:
        csv_path = "/home/jovyan/data/sample_data.csv"
        large_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(csv_path)
        print("Data saved as CSV successfully!")
        
        # Read CSV back
        read_csv_df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_path)
        print(f"Read {read_csv_df.count()} rows from CSV file")
        
    except Exception as csv_error:
        print(f"Error with CSV: {csv_error}")
        print("File operations may require additional configuration for this environment.")

Saving data as Parquet file...
Data saved successfully!
Reading Parquet file back...
Read 1000 rows from Parquet file
First few rows from saved file:
+--------+---+--------------+------+
|    Name|Age|           Job|Salary|
+--------+---+--------------+------+
|Person_0| 44|      Engineer|135187|
|Person_1| 29|Data Scientist|122086|
|Person_2| 52|      Designer| 65222|
|Person_3| 59|       Manager| 60684|
|Person_4| 49|       Manager| 50889|
+--------+---+--------------+------+
only showing top 5 rows



In [13]:
# Show Spark UI URL and execution summary
print("\n=== Spark Session Summary ===")
print(f"Spark UI: http://localhost:8080")
print(f"Application ID: {spark.sparkContext.applicationId}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")
print(f"Spark Version: {spark.version}")

# Check if files were created
import os
data_files = [f for f in os.listdir("/home/jovyan/data") if f.endswith(('.parquet', '.csv', '.json'))]
print(f"\nData files created: {data_files if data_files else 'None'}")


=== Spark Session Summary ===
Spark UI: http://localhost:8080
Application ID: app-20250601151341-0000
Default Parallelism: 2
Spark Version: 3.5.0

Data files created: ['sample_data.parquet']


In [14]:
# Stop Spark session
spark.stop()
print("Spark session stopped successfully!")

Spark session stopped successfully!
