# DAY 3 (11/01/26) – PySpark Transformations Deep Dive

## Create the First PySpark Session

In [0]:
from pyspark.sql import SparkSession

# Obtain a Spark session: getOrCreate() returns the already-active session
# when there is one, otherwise it starts a new one under this app name.
spark = (
    SparkSession.builder
    .appName("PySpark Transformations Tutorial")
    .getOrCreate()
)

# Sanity check: report which Spark version the session is running
print(f"Spark Version: {spark.version}")

Spark Version: 4.0.0


## Task 1: Load E-commerce Dataset

In [0]:
# Build a small in-memory e-commerce dataset: 5 customers and 8 orders.
# Column types are inferred from the Python values; dates stay as
# 'yyyy-MM-dd' strings. Note that customer 5 (Eve) has no orders, which
# the LEFT JOIN examples below rely on.
customers = spark.createDataFrame(
    data=[
        (1, 'Alice', 'alice@email.com', 'New York', '2023-01-15'),
        (2, 'Bob', 'bob@email.com', 'London', '2023-02-20'),
        (3, 'Charlie', 'charlie@email.com', 'Paris', '2023-03-10'),
        (4, 'David', 'david@email.com', 'Tokyo', '2023-04-05'),
        (5, 'Eve', 'eve@email.com', 'Berlin', '2023-05-12'),
    ],
    schema=['customer_id', 'name', 'email', 'city', 'registration_date'],
)

orders = spark.createDataFrame(
    data=[
        (101, 1, '2024-01-01', 250.50, 'Electronics'),
        (102, 2, '2024-01-02', 150.75, 'Clothing'),
        (103, 1, '2024-01-03', 300.00, 'Electronics'),
        (104, 3, '2024-01-04', 75.25, 'Books'),
        (105, 1, '2024-01-05', 120.00, 'Clothing'),
        (106, 4, '2024-01-06', 200.00, 'Electronics'),
        (107, 2, '2024-01-07', 90.50, 'Books'),
        (108, 3, '2024-01-08', 180.00, 'Electronics'),
    ],
    schema=['order_id', 'customer_id', 'order_date', 'amount', 'category'],
)

print("✅ Data loaded successfully!")
customers.show()
orders.show()

✅ Data loaded successfully!
+-----------+-------+-----------------+--------+-----------------+
|customer_id|   name|            email|    city|registration_date|
+-----------+-------+-----------------+--------+-----------------+
|          1|  Alice|  alice@email.com|New York|       2023-01-15|
|          2|    Bob|    bob@email.com|  London|       2023-02-20|
|          3|Charlie|charlie@email.com|   Paris|       2023-03-10|
|          4|  David|  david@email.com|   Tokyo|       2023-04-05|
|          5|    Eve|    eve@email.com|  Berlin|       2023-05-12|
+-----------+-------+-----------------+--------+-----------------+

+--------+-----------+----------+------+-----------+
|order_id|customer_id|order_date|amount|   category|
+--------+-----------+----------+------+-----------+
|     101|          1|2024-01-01| 250.5|Electronics|
|     102|          2|2024-01-02|150.75|   Clothing|
|     103|          1|2024-01-03| 300.0|Electronics|
|     104|          3|2024-01-04| 75.25|      Book

## Task 2: Perform Complex Joins

In [0]:
# Q1 — INNER JOIN: only customers with at least one matching order survive;
# each output row is one (customer, order) pair.
print("Customers with orders (INNER JOIN):")
result = customers.join(orders, 'customer_id', 'inner')
result.select('name', 'email', 'order_id', 'amount', 'category').show()

# Q2 — LEFT JOIN: every customer is kept; customers without orders get
# NULLs in the order columns.
print("All customers (LEFT JOIN):")
result = customers.join(orders, 'customer_id', 'left')
result.show()

# `F` is otherwise first imported in the Task 3 cell, further down, so without
# this import the cell fails with NameError on a fresh kernel (Run All).
import pyspark.sql.functions as F

# Question 3: Calculate total spent per customer
print("Total spending per customer:")
result = orders.groupBy('customer_id') \
    .agg(F.sum('amount').alias('total_spent'),
         F.count('order_id').alias('order_count'))

# Join with customer names. The LEFT JOIN keeps customers with no orders;
# their aggregate columns come back NULL and are replaced with 0.
final = customers.join(result, on='customer_id', how='left') \
    .select('name', 'total_spent', 'order_count') \
    .fillna(0, subset=['total_spent', 'order_count'])
final.show()

Customers with orders (INNER JOIN):
+-------+-----------------+--------+------+-----------+
|   name|            email|order_id|amount|   category|
+-------+-----------------+--------+------+-----------+
|  Alice|  alice@email.com|     101| 250.5|Electronics|
|    Bob|    bob@email.com|     102|150.75|   Clothing|
|  Alice|  alice@email.com|     103| 300.0|Electronics|
|Charlie|charlie@email.com|     104| 75.25|      Books|
|  Alice|  alice@email.com|     105| 120.0|   Clothing|
|  David|  david@email.com|     106| 200.0|Electronics|
|    Bob|    bob@email.com|     107|  90.5|      Books|
|Charlie|charlie@email.com|     108| 180.0|Electronics|
+-------+-----------------+--------+------+-----------+

All customers (LEFT JOIN):
+-----------+-------+-----------------+--------+-----------------+--------+----------+------+-----------+
|customer_id|   name|            email|    city|registration_date|order_id|order_date|amount|   category|
+-----------+-------+-----------------+--------+----

## Task 3: Calculate Running Totals

In [0]:
# Defensive fallback so this cell can run on its own: when `orders` has not
# been created yet (e.g. the Task 1 cell was skipped), rebuild both sample
# DataFrames with the same data as Task 1.
if 'orders' not in globals():
    customers = spark.createDataFrame(
        data=[
            (1, 'Alice', 'alice@email.com', 'New York', '2023-01-15'),
            (2, 'Bob', 'bob@email.com', 'London', '2023-02-20'),
            (3, 'Charlie', 'charlie@email.com', 'Paris', '2023-03-10'),
            (4, 'David', 'david@email.com', 'Tokyo', '2023-04-05'),
            (5, 'Eve', 'eve@email.com', 'Berlin', '2023-05-12'),
        ],
        schema=['customer_id', 'name', 'email', 'city', 'registration_date'],
    )
    orders = spark.createDataFrame(
        data=[
            (101, 1, '2024-01-01', 250.50, 'Electronics'),
            (102, 2, '2024-01-02', 150.75, 'Clothing'),
            (103, 1, '2024-01-03', 300.00, 'Electronics'),
            (104, 3, '2024-01-04', 75.25, 'Books'),
            (105, 1, '2024-01-05', 120.00, 'Clothing'),
            (106, 4, '2024-01-06', 200.00, 'Electronics'),
            (107, 2, '2024-01-07', 90.50, 'Books'),
            (108, 3, '2024-01-08', 180.00, 'Electronics'),
        ],
        schema=['order_id', 'customer_id', 'order_date', 'amount', 'category'],
    )

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Running total of orders by date.
# - order_date is an ISO 'yyyy-MM-dd' string, so sorting it lexicographically
#   is the same as sorting chronologically.
# - order_id is a tiebreaker: with a ROWS frame, orders that share an
#   order_date would otherwise be accumulated in an arbitrary order, making
#   running_total / cumulative_orders differ between runs.
# - NOTE: a window without partitionBy pulls every row into one partition;
#   that is fine for this tiny demo but does not scale to large data.
window_spec = Window.orderBy('order_date', 'order_id') \
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

result = orders.withColumn(
    'running_total',
    F.sum('amount').over(window_spec)
).withColumn(
    'cumulative_orders',
    F.count('order_id').over(window_spec)
)

print("Daily Running Totals:")
# Display in the same order the totals were accumulated
result.orderBy('order_date', 'order_id').show()

# Running total by category: the cumulative sum restarts for each category.
# order_id breaks ties between same-day orders in the same category, so the
# ROWS frame accumulates them in a deterministic order.
category_window = Window.partitionBy('category') \
                        .orderBy('order_date', 'order_id') \
                        .rowsBetween(Window.unboundedPreceding, Window.currentRow)

result = orders.withColumn(
    'category_running_total',
    F.sum('amount').over(category_window)
)

print("Running Totals by Category:")
# Display in the same order the totals were accumulated
result.orderBy('category', 'order_date', 'order_id').show()

Daily Running Totals:




+--------+-----------+----------+------+-----------+-------------+-----------------+
|order_id|customer_id|order_date|amount|   category|running_total|cumulative_orders|
+--------+-----------+----------+------+-----------+-------------+-----------------+
|     101|          1|2024-01-01| 250.5|Electronics|        250.5|                1|
|     102|          2|2024-01-02|150.75|   Clothing|       401.25|                2|
|     103|          1|2024-01-03| 300.0|Electronics|       701.25|                3|
|     104|          3|2024-01-04| 75.25|      Books|        776.5|                4|
|     105|          1|2024-01-05| 120.0|   Clothing|        896.5|                5|
|     106|          4|2024-01-06| 200.0|Electronics|       1096.5|                6|
|     107|          2|2024-01-07|  90.5|      Books|       1187.0|                7|
|     108|          3|2024-01-08| 180.0|Electronics|       1367.0|                8|
+--------+-----------+----------+------+-----------+-------------

## Task 4: Create Derived Features

In [0]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
# Import these here as well so the cell does not depend on the Task 3 cell
# having run first (F is used below; Window is used later in this cell).
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# 1. Customer lifetime value (total spent)
clv = orders.groupBy('customer_id') \
    .agg(F.sum('amount').alias('lifetime_value'))

# 2. Average order value
aov = orders.groupBy('customer_id') \
    .agg(F.avg('amount').alias('avg_order_value'))

# 3. Days since registration.
# registration_date is stored as a 'yyyy-MM-dd' string; convert it explicitly
# rather than relying on datediff's implicit cast.
# NOTE: current_date() makes this column change every day, so the displayed
# values are not reproducible across runs.
customers_enhanced = customers.withColumn(
    'days_since_registration',
    F.datediff(F.current_date(), F.to_date(F.col('registration_date')))
)

# 4. Customer segmentation (using UDF)
@udf(returnType=StringType())
def segment_customer(lifetime_value):
    """Map a customer's lifetime value to a tier label.

    None (no orders, so no lifetime value after the LEFT JOIN) -> 'Inactive';
    below 100 -> 'Bronze'; below 300 -> 'Silver'; anything else -> 'Gold'.
    """
    if lifetime_value is None:
        return 'Inactive'
    # Tiers are checked in ascending order of their exclusive upper bound
    for upper_bound, tier in ((100, 'Bronze'), (300, 'Silver')):
        if lifetime_value < upper_bound:
            return tier
    return 'Gold'

# Combine all features. LEFT JOINs keep customers without orders; their
# lifetime_value is NULL, which segment_customer maps to 'Inactive'.
final_customers = customers_enhanced \
    .join(clv, on='customer_id', how='left') \
    .join(aov, on='customer_id', how='left') \
    .withColumn('segment', segment_customer('lifetime_value'))

print("Customer Features:")
final_customers.show()

# 5. Category preference per customer.
# row_number() (not rank()) guarantees exactly one preferred category per
# customer. rank() would return every category tied for the highest spend,
# and the LEFT JOIN below would then duplicate that customer's profile row.
# Spend ties are broken alphabetically by category so the choice is deterministic.
category_pref = orders.groupBy('customer_id', 'category') \
    .agg(F.sum('amount').alias('category_total')) \
    .withColumn(
        'rank',
        F.row_number().over(
            Window.partitionBy('customer_id')
                  .orderBy(F.desc('category_total'), F.asc('category'))
        )
    ).filter(F.col('rank') == 1) \
    .select('customer_id', F.col('category').alias('preferred_category'))

final_customers = final_customers.join(category_pref, on='customer_id', how='left')

print("Final Customer Profile:")
final_customers.show(truncate=False)

Customer Features:
+-----------+-------+-----------------+--------+-----------------+-----------------------+--------------+---------------+--------+
|customer_id|   name|            email|    city|registration_date|days_since_registration|lifetime_value|avg_order_value| segment|
+-----------+-------+-----------------+--------+-----------------+-----------------------+--------------+---------------+--------+
|          1|  Alice|  alice@email.com|New York|       2023-01-15|                   1092|         670.5|          223.5|    Gold|
|          2|    Bob|    bob@email.com|  London|       2023-02-20|                   1056|        241.25|        120.625|  Silver|
|          3|Charlie|charlie@email.com|   Paris|       2023-03-10|                   1038|        255.25|        127.625|  Silver|
|          4|  David|  david@email.com|   Tokyo|       2023-04-05|                   1012|         200.0|          200.0|  Silver|
|          5|    Eve|    eve@email.com|  Berlin|       2023-05-1