# Test the PySpark installation

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Reuse a running SparkSession if there is one, otherwise start a new one
# with default settings.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Smoke test: a one-row DataFrame built from a dict, printed as a table.
df = spark.createDataFrame([{"hello": "world"}])
df.show()

                                                                                

+-----+
|hello|
+-----+
|world|
+-----+



# Stop the Spark session

In [2]:
# Stop the session started in the cell above so a later builder call can
# create a new one with different settings; without this, getOrCreate()
# would presumably hand back this existing session instead.
spark.stop()

# Start Spark with configuration options

In [4]:
import pyspark
from pyspark.sql import SparkSession

# Build a session with an explicit application name and driver-memory size.
# NOTE(review): spark.driver.memory is normally a launch-time setting; if the
# JVM from an earlier cell is still alive, this value may not take effect —
# confirm in the Spark UI "Environment" tab.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Repeat the one-row smoke test against the configured session.
df = spark.createDataFrame([{"hello": "world"}])
df.show()

+-----+
|hello|
+-----+
|world|
+-----+



# Simple Spark ETL (extract, transform, load) test

In [4]:
from dateutil.parser import parse
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, initcap, trim, regexp_replace
from pyspark.sql.functions import udf, to_date, datediff, current_date, months_between


@udf("date")
def parse_date(date_str):
    """Parse a free-form date value into a ``datetime.date``.

    Runs row-by-row as a Spark UDF returning a SQL ``date``. Parsing is
    delegated to ``dateutil.parser.parse``, so mixed layouts such as
    ``1985-03-15``, ``19921201``, ``1978/08/22`` and ``2000.06.10`` are
    accepted.

    Args:
        date_str: the raw cell value; ``None`` for a null CSV cell.

    Returns:
        The parsed date, or ``None`` (SQL NULL) for null/blank input or a
        value dateutil cannot interpret, so one bad row does not fail the job.
    """
    # Null cells arrive as None; parse(None) raises TypeError, which a
    # ValueError-only handler would let escape and fail the Spark task.
    if date_str is None:
        return None
    # If inferSchema typed the column (e.g. as an int or date), normalise it
    # back to text so dateutil can read it.
    text = str(date_str).strip()
    if not text:
        return None
    try:
        # NOTE(review): components missing from the string are filled from
        # today's date, and "01/02/2000" is read month-first by default.
        return parse(text).date()
    except (ValueError, OverflowError):
        # ValueError covers dateutil's ParserError; OverflowError is raised
        # for out-of-range numeric components.
        return None
        
# 1. Create SparkSession
#
# NOTE(review): getOrCreate() returns an already-running session if one
# exists (the "MyApp" session built earlier in this notebook is never
# stopped), in which case the static settings below may be ignored. Restart
# the kernel or stop the old session first if they matter.
# NOTE(review): executor cores/instances are consumed by cluster managers
# such as YARN or Kubernetes; presumably they have no effect in local mode.
spark = (
    SparkSession.builder
    .appName("ETL Example")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.cores", 5)       # cores per executor
    .config("spark.executor.instances", 3)   # number of executors
    .getOrCreate()
)

# 2. Extract (read from CSV)
#
# Keep the raw frame under its own name so it stays inspectable after the
# transforms below.
raw_df = spark.read.csv("data/customer_data.csv", header=True, inferSchema=True)
print('raw data')
raw_df.show()

# 3. Transform
#   a. Clean names: strip leading/trailing spaces, then title-case.
#      NOTE(review): per the Spark docs initcap only treats whitespace as a
#      word boundary, so "Mary-Ann" is expected to become "Mary-ann".
df = (
    raw_df
    .withColumn("firstName", initcap(trim(col("firstName"))))
    .withColumn("lastName", initcap(trim(col("lastName"))))
)

#   b. Clean phone numbers: keep digits only.
df = df.withColumn("phone", regexp_replace(col("phone"), "[^0-9]", ""))

#   c. Derived columns.
# Normalise the free-form birthdate values into a date column; values the
# parser cannot handle become NULL.
df = df.withColumn("birthdate", parse_date("birthdate"))

# Ages relative to today, as completed units (casting to int truncates).
# Whole years come from months_between / 12 rather than datediff / 365:
# dividing days by 365 ignores leap days, which accumulate over the years and
# over-report the age by one year in the days just before a birthday.
df = (
    df
    .withColumn("age_years", (months_between(current_date(), col("birthdate")) / 12).cast("int"))
    .withColumn("age_months", months_between(current_date(), col("birthdate")).cast("int"))
    .withColumn("age_days", datediff(current_date(), col("birthdate")).cast("int"))
)

# 4. Display the cleaned data
print('cleaned data')
df.show()

# 5. Load (write to Parquet - a columnar format ideal for big data).
#
# mode("overwrite") replaces output from a previous run, so the cell can be
# re-executed without failing on an existing path.
df.write.mode("overwrite").parquet("data/cleaned_customer_data")

raw data
+---------+---------+-----------------+--------------------+-----------+
|firstName| lastName|            phone|               email|  birthdate|
+---------+---------+-----------------+--------------------+-----------+
|    john |   doe   |  (555) 123-4567 | johndoe@example.com| 1985-03-15|
|     Jane|   SMITH |    555-9876543  | jane.smith@email...|   19921201|
|  ROBERT | WILLIAMS|      5551112222 | robertwilliams@e...| 1978/08/22|
| Mary-Ann|    Brown|   555) 333 4444 |maryannbrown@exam...|2000.06.10 |
+---------+---------+-----------------+--------------------+-----------+

cleaned data
+---------+--------+----------+--------------------+----------+---------+----------+--------+
|firstName|lastName|     phone|               email| birthdate|age_years|age_months|age_days|
+---------+--------+----------+--------------------+----------+---------+----------+--------+
|     John|     Doe|5551234567| johndoe@example.com|1985-03-15|       39|       472|   14368|
|     Jane|   Smi