In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os

In [2]:
spark = SparkSession.builder \
    .appName("Customer360-Transform") \
    .master("local[*]") \
    .config("spark.sql.legacy.parquet.int96AsTimestamp", "true") \
    .config("spark.sql.parquet.enableNestedSchemaPruning", "true") \
    .getOrCreate()

In [3]:
df_crm = spark.read.parquet(r"/home/jovyan/work/staging/crm.parquet")
df_txn = spark.read.parquet(r"/home/jovyan/work/staging/txns.parquet")

In [4]:
   # Join CRM and Transactions on customer ID
df_joined = df_txn.join(df_crm, df_txn.CustomerID == df_crm.customer_id)

In [5]:
df_joined.show()

+---------+----------+--------+---------+-------------------+-----------+----------+---------+--------------------+--------------------+-----------+
|InvoiceNo|CustomerID|Quantity|UnitPrice|        InvoiceDate|customer_id|first_name|last_name|               email|             country|signup_date|
+---------+----------+--------+---------+-------------------+-----------+----------+---------+--------------------+--------------------+-----------+
|  INV1000|        46|       1|     29.6|2025-04-10 00:00:00|         46|    Ronald| Thompson|jessicaclarke@exa...|             Finland| 2025-01-06|
|  INV1001|        45|       8|    36.28|2025-04-09 00:00:00|         45|  Mitchell|   Clarke|kristy94@example.org|          Guadeloupe| 2022-11-03|
|  INV1002|        81|       6|     9.95|2023-11-06 00:00:00|         81|    Martin| Robinson|leachlisa@example...|Libyan Arab Jamah...| 2023-07-16|
|  INV1003|        70|       3|    44.33|2023-09-11 00:00:00|         70|     Kelly| Mcdaniel| jacob43@exa

In [6]:
df_ltv = df_joined.groupBy("customer_id").sum("Quantity", "UnitPrice")

In [7]:
df_ltv.show()

+-----------+-------------+------------------+
|customer_id|sum(Quantity)|    sum(UnitPrice)|
+-----------+-------------+------------------+
|         29|           32|246.93999999999997|
|         26|           33|            157.69|
|         65|           19|            137.53|
|         54|           39|            151.75|
|         19|           30|            113.53|
|         22|           20| 92.89999999999999|
|          7|           43|            199.52|
|         77|           39|            124.56|
|         34|           34|138.52999999999997|
|         94|           44|186.89000000000004|
|         50|           49|            216.52|
|         57|           19|             68.09|
|         32|           11|             59.43|
|         43|           12|            138.07|
|         84|           25|             75.55|
|         31|           44|206.70000000000002|
|         98|           12|126.16999999999999|
|         39|           23| 87.97999999999999|
|         25|

In [8]:
df_ltv = df_ltv.withColumn(
    "lifetime_value",
    col("sum(Quantity)") * col("sum(UnitPrice)")
).select("customer_id", "lifetime_value")

In [9]:
df_ltv.show()

+-----------+------------------+
|customer_id|    lifetime_value|
+-----------+------------------+
|         29| 7902.079999999999|
|         26|5203.7699999999995|
|         65|           2613.07|
|         54|           5918.25|
|         19|            3405.9|
|         22|1857.9999999999998|
|          7|           8579.36|
|         77|           4857.84|
|         34| 4710.019999999999|
|         94| 8223.160000000002|
|         50|10609.480000000001|
|         57|           1293.71|
|         32|            653.73|
|         43|           1656.84|
|         84|           1888.75|
|         31| 9094.800000000001|
|         98|           1514.04|
|         39|2023.5399999999997|
|         25|5238.8099999999995|
|         95|           1356.84|
+-----------+------------------+
only showing top 20 rows



In [10]:
df_ltv.write.mode("overwrite").parquet(r"/home/jovyan/work/staging/fact_ltv.parquet")

In [11]:
print("✅ Transformation complete. Output saved to staging/fact_ltv.parquet")

✅ Transformation complete. Output saved to staging/fact_ltv.parquet


In [12]:
spark.stop()