In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CustomerETL").getOrCreate()

In [2]:
df_orders = spark.read.option("header", True).csv("hdfs://hdfs-namenode:9000/customer_etl/input/orders.csv")
df_products = spark.read.json("hdfs://hdfs-namenode:9000/customer_etl/input/products.json")
df_customers = spark.read.option("header", True).csv("hdfs://hdfs-namenode:9000/customer_etl/input/customers.csv")

In [3]:
df_orders.createOrReplaceTempView("orders")
df_products.createOrReplaceTempView("products")
df_customers.createOrReplaceTempView("customers")

In [4]:
spark.sql("""
    SELECT *
    FROM orders o
""").show()

+--------+-----------+----------+--------+----------+
|order_id|customer_id|product_id|quantity|order_date|
+--------+-----------+----------+--------+----------+
|    O001|       C101|      P201|       1|2025-05-01|
|    O002|       C101|      P202|       2|2025-05-02|
|    O003|       C102|      P201|       1|2025-05-02|
|    O004|       C101|      P203|       2|2025-05-03|
|    O005|       C103|      P204|       1|2025-05-05|
|    O006|       C102|      P201|       2|2025-05-06|
|    O007|       C101|      P202|       1|2025-05-07|
|    O008|       C104|      P203|       2|2025-05-07|
|    O009|       C104|      P202|       1|2025-05-08|
|    O010|       C105|      P201|       1|2025-05-08|
+--------+-----------+----------+--------+----------+



In [5]:
spark.sql("""
    SELECT *
    FROM products
""").show()

+---------------+-----------+----------+----------+
|_corrupt_record|   category|product_id|unit_price|
+---------------+-----------+----------+----------+
|              [|       null|      null|      null|
|           null|      Books|      P201|       250|
|           null|Electronics|      P202|      1200|
|           null|     Health|      P203|       400|
|           null| Stationery|      P204|       150|
|              ]|       null|      null|      null|
+---------------+-----------+----------+----------+



In [6]:
spark.sql("""
    SELECT *
    FROM customers
""").show()

+-----------+---------------+-------------+-----+-----------+
|customer_id|  customer_name|         city|state|signup_date|
+-----------+---------------+-------------+-----+-----------+
|       C101|Olivia Thompson|      Seattle|   WA| 2024-10-15|
|       C102|  Ethan Johnson|       Austin|   TX| 2025-01-05|
|       C103|     Emma Davis|     New York|   NY| 2025-03-20|
|       C104|    Liam Garcia|      Chicago|   IL| 2025-01-25|
|       C105|   Ava Martinez|San Francisco|   CA| 2025-02-10|
+-----------+---------------+-------------+-----+-----------+



## Step 2: Enrich + Aggregate + Classify (Using SQL)

In [7]:
# Enrich with price
spark.sql("""
    CREATE OR REPLACE TEMP VIEW enriched_orders AS
    SELECT
        o.order_id,
        o.customer_id,
        o.product_id,
        o.quantity,
        o.order_date,
        p.category,
        p.unit_price,
        o.quantity * p.unit_price AS total_price
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
""")

DataFrame[]

In [8]:
spark.sql("""
    SELECT *
    FROM enriched_orders
""").show()

+--------+-----------+----------+--------+----------+-----------+----------+-----------+
|order_id|customer_id|product_id|quantity|order_date|   category|unit_price|total_price|
+--------+-----------+----------+--------+----------+-----------+----------+-----------+
|    O001|       C101|      P201|       1|2025-05-01|      Books|       250|      250.0|
|    O002|       C101|      P202|       2|2025-05-02|Electronics|      1200|     2400.0|
|    O003|       C102|      P201|       1|2025-05-02|      Books|       250|      250.0|
|    O004|       C101|      P203|       2|2025-05-03|     Health|       400|      800.0|
|    O005|       C103|      P204|       1|2025-05-05| Stationery|       150|      150.0|
|    O006|       C102|      P201|       2|2025-05-06|      Books|       250|      500.0|
|    O007|       C101|      P202|       1|2025-05-07|Electronics|      1200|     1200.0|
|    O008|       C104|      P203|       2|2025-05-07|     Health|       400|      800.0|
|    O009|       C104

In [9]:
# Aggregate per customer
spark.sql("""
    CREATE OR REPLACE TEMP VIEW customer_metrics AS
    SELECT
        customer_id,
        COUNT(order_id) AS total_orders,
        SUM(total_price) AS total_spent,
        COUNT(DISTINCT order_date) AS days_active,
        COUNT(DISTINCT category) AS categories_bought
    FROM enriched_orders
    GROUP BY customer_id
""")

DataFrame[]

In [10]:
spark.sql("""
    SELECT *
    FROM customer_metrics
""").show()

+-----------+------------+-----------+-----------+-----------------+
|customer_id|total_orders|total_spent|days_active|categories_bought|
+-----------+------------+-----------+-----------+-----------------+
|       C104|           2|     2000.0|          2|                2|
|       C102|           2|      750.0|          2|                1|
|       C103|           1|      150.0|          1|                1|
|       C105|           1|      250.0|          1|                1|
|       C101|           4|     4650.0|          4|                3|
+-----------+------------+-----------+-----------+-----------------+



In [11]:
# Add loyalty label
spark.sql("""
    CREATE OR REPLACE TEMP VIEW customer_loyalty AS
    SELECT
        m.customer_id,
        c.customer_name,
        c.city,
        c.state,
        c.signup_date,
        m.total_orders,
        m.total_spent,
        m.days_active,
        m.categories_bought,
        CASE
            WHEN m.total_orders >= 3 AND m.days_active >= 2 AND m.categories_bought >= 2 THEN 'Loyal'
            WHEN m.total_orders >= 2 AND (m.days_active >= 2 OR m.categories_bought >= 2) THEN 'Engaged'
            ELSE 'Casual'
        END AS loyalty_status
    FROM customer_metrics m
    JOIN customers c ON m.customer_id = c.customer_id
""")

DataFrame[]

In [12]:
spark.sql("""
    SELECT *
    FROM customer_loyalty
""").show()

+-----------+---------------+-------------+-----+-----------+------------+-----------+-----------+-----------------+--------------+
|customer_id|  customer_name|         city|state|signup_date|total_orders|total_spent|days_active|categories_bought|loyalty_status|
+-----------+---------------+-------------+-----+-----------+------------+-----------+-----------+-----------------+--------------+
|       C104|    Liam Garcia|      Chicago|   IL| 2025-01-25|           2|     2000.0|          2|                2|       Engaged|
|       C102|  Ethan Johnson|       Austin|   TX| 2025-01-05|           2|      750.0|          2|                1|       Engaged|
|       C103|     Emma Davis|     New York|   NY| 2025-03-20|           1|      150.0|          1|                1|        Casual|
|       C105|   Ava Martinez|San Francisco|   CA| 2025-02-10|           1|      250.0|          1|                1|        Casual|
|       C101|Olivia Thompson|      Seattle|   WA| 2024-10-15|           4|  

## Step 3: Write Final Output to HDFS (Parquet only)

In [13]:
df_loyalty = spark.sql("SELECT * FROM customer_loyalty")

In [14]:
df_loyalty.show()

+-----------+---------------+-------------+-----+-----------+------------+-----------+-----------+-----------------+--------------+
|customer_id|  customer_name|         city|state|signup_date|total_orders|total_spent|days_active|categories_bought|loyalty_status|
+-----------+---------------+-------------+-----+-----------+------------+-----------+-----------+-----------------+--------------+
|       C104|    Liam Garcia|      Chicago|   IL| 2025-01-25|           2|     2000.0|          2|                2|       Engaged|
|       C102|  Ethan Johnson|       Austin|   TX| 2025-01-05|           2|      750.0|          2|                1|       Engaged|
|       C103|     Emma Davis|     New York|   NY| 2025-03-20|           1|      150.0|          1|                1|        Casual|
|       C105|   Ava Martinez|San Francisco|   CA| 2025-02-10|           1|      250.0|          1|                1|        Casual|
|       C101|Olivia Thompson|      Seattle|   WA| 2024-10-15|           4|  

In [15]:
df_loyalty.write.mode("overwrite").option("header", True).csv("hdfs://hdfs-namenode:9000/customer_etl/output/loyalty_snapshot")