# ETL Transformations - Practice Notebook

This notebook covers common ETL transformation patterns.

## Topics Covered:
1. Data Cleaning (handling nulls, duplicates)
2. Data Type Conversions
3. String Manipulations
4. Date/Time Operations
5. Joins
6. Window Functions

In [None]:
import sys
from pathlib import Path

# Repository root = two directories above the current working directory.
# NOTE(review): assumes the notebook is launched from <root>/<dir>/<subdir> — confirm.
# The root's src/ directory is placed first on the import path.
project_root = Path.cwd().parent.parent
sys.path[:0] = [str(project_root / 'src')]

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

In [None]:
# Local Spark session that uses all available cores; getOrCreate() hands back
# an already-running session if this cell is executed again.
spark = (
    SparkSession.builder
    .appName("ETL Transformations")
    .master("local[*]")
    .getOrCreate()
)

# Only ERROR-level log lines are printed, keeping cell output readable.
spark.sparkContext.setLogLevel("ERROR")

## 1. Data Cleaning

In [None]:
# Load the raw customers sample: the first CSV row supplies column names, and
# inferSchema makes Spark take an extra pass over the file to pick column types
# instead of reading every column as a string.
customers_df = spark.read.options(header=True, inferSchema=True).csv(
    str(project_root / 'data/sample/customers.csv')
)

print("Original data:")
customers_df.show()

# Null profile: per column, add up a 1/0 "is null" flag to count missing values.
print("\nNull counts:")
customers_df.select(
    [
        F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
        for name in customers_df.columns
    ]
).show()

In [None]:
# Keep a single row per customer_id.
# NOTE(review): Spark does not guarantee which duplicate row survives; add an
# explicit ordering step if a particular row must win.
print("\nRemoving duplicates...")
cleaned_df = customers_df.drop_duplicates(subset=["customer_id"])
print(f"Before: {customers_df.count()}, After: {cleaned_df.count()}")

# Missing email addresses are replaced with a placeholder value.
cleaned_df = cleaned_df.na.fill({'email': 'no-email@example.com'})
cleaned_df.show()

## 2. Data Type Conversions

In [None]:
# Type normalisation:
#   signup_date -> DateType, parsed with the 'yyyy-MM-dd' pattern
#   age         -> int (no-op if inferSchema already produced an int)
cleaned_df = (
    cleaned_df
    .withColumn("signup_date", F.to_date("signup_date", "yyyy-MM-dd"))
    .withColumn("age", F.col("age").cast("int"))
)

cleaned_df.printSchema()

## 3. String Manipulations

In [None]:
# Create full name. concat_ws skips NULL inputs, so a customer with a missing
# first or last name still gets the part that exists (e.g. "Ada" rather than
# NULL); plain concat() would make the whole result NULL. If both parts are
# NULL the result is an empty string.
transformed_df = cleaned_df.withColumn(
    "full_name",
    F.concat_ws(" ", F.col("first_name"), F.col("last_name"))
)

# Convert email to lowercase
transformed_df = transformed_df.withColumn(
    "email",
    F.lower(F.col("email"))
)

# Extract email domain: everything after the last "@". Values without an "@"
# get NULL instead of indexing past the end of a split() array, which raises
# an error when spark.sql.ansi.enabled is on.
transformed_df = transformed_df.withColumn(
    "email_domain",
    F.when(
        F.col("email").contains("@"),
        F.substring_index(F.col("email"), "@", -1),
    )
)

transformed_df.select("full_name", "email", "email_domain").show()

## 4. Date/Time Operations

In [None]:
# Calendar parts of signup_date, plus account age in days
# (datediff(end, start) = today minus signup_date).
transformed_df = (
    transformed_df
    .withColumn("signup_year", F.year("signup_date"))
    .withColumn("signup_month", F.month("signup_date"))
    .withColumn(
        "days_since_signup",
        F.datediff(F.current_date(), F.col("signup_date")),
    )
)

transformed_df.select(
    "full_name", "signup_date", "signup_year", "signup_month", "days_since_signup"
).show()

## 5. Joins

In [None]:
# Load orders data
orders_df = spark.read.csv(
    str(project_root / 'data/sample/orders.csv'),
    header=True,
    inferSchema=True
)

print("Orders data:")
orders_df.show()

# Inner join on the shared key. Passing the column *name* via `on=` makes
# Spark emit a single customer_id column; joining on an equality expression
# (df1.customer_id == df2.customer_id) keeps both copies, so any later
# reference to "customer_id" on the result would be ambiguous.
joined_df = transformed_df.join(orders_df, on="customer_id", how="inner")

print("\nJoined data:")
joined_df.select(
    "full_name", "email", "order_id", "product_name", "price", "status"
).show()

## 6. Window Functions

In [None]:
# Total spending and order count per customer.
customer_spending = orders_df.groupBy("customer_id").agg(
    F.sum("price").alias("total_spent"),
    F.count("*").alias("order_count")
)

# Global ranking by spend, highest first. customer_id is used as a tie-breaker
# so row_number() gives the same customer the same rank on every run; ordering
# by total_spent alone leaves the order of equal totals unspecified.
# NOTE: a window without partitionBy moves all rows into a single partition,
# which is fine for this small sample but expensive on large data.
window_spec = Window.orderBy(F.col("total_spent").desc(), F.col("customer_id"))

ranked_customers = customer_spending.withColumn(
    "spending_rank",
    F.row_number().over(window_spec)
)

print("Top customers by spending:")
ranked_customers.show()

## Practice Exercises

1. Create an age category column (18-25, 26-35, 36-45, 46+)
2. Calculate average order value per customer
3. Find customers who have never placed an order (hint: a left join filtered on a NULL `order_id`, or a `left_anti` join)
4. Calculate running total of orders by date
5. Identify customers with more than 2 orders

In [None]:
# Your code here


In [None]:
# Stop the Spark session (and its underlying SparkContext) once the notebook is done.
spark.stop()