In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Steps 1-2: Read both CSV extracts (first row is the header) and type the
# date and amount columns. No schema is supplied and inferSchema is not
# enabled, so the remaining columns are left exactly as the reader yields them.
# NOTE(review): cast("int") cannot keep fractional amounts -- confirm that
# amounts in transactions.csv are always whole numbers.
cust_df = (
    spark.read.csv("dbfs:/FileStore/customers.csv", header=True)
    .withColumn("signup_date", to_date(col("signup_date")))
)
txn_df = (
    spark.read.csv("dbfs:/FileStore/transactions.csv", header=True)
    .withColumn("transaction_date", to_date(col("transaction_date")))
    .withColumn("amount", col("amount").cast("int"))
)

# Step 3: Left join on customer_id so that customers without any transaction
# are still present (their transaction columns are null).
df = cust_df.join(txn_df, "customer_id", "left")

# Step 4: Number each customer's rows from the newest transaction_date to the
# oldest and keep only row 1, i.e. one row per customer.
# NOTE(review): the ordering uses transaction_date alone, so when a customer
# has several transactions on the latest date, which one is kept is arbitrary.
window_spec = (
    Window.partitionBy(col("customer_id"))
    .orderBy(col("transaction_date").desc())
)
df = df.withColumn("rank", row_number().over(window_spec))
df_last_txn = df.where(df["rank"] == 1)

# Step 5: Days elapsed between the kept transaction and today. The value is
# null when the row has no transaction_date (customer without transactions).
days_since = datediff(current_date(), col("transaction_date"))
df_last_txn = df_last_txn.withColumn("days_since_last_txn", days_since)

# Step 6: Bucket customers by recency of their last transaction:
#   <= 30 days   -> "Active"
#   31..180 days -> "Dormant"   (datediff yields whole days, so 31 == "> 30")
#   > 180 days   -> "Churn Risk"
#   null         -> "No Transactions" (no branch matches a null value)
days_idle = col("days_since_last_txn")
status_label = (
    when(days_idle <= 30, "Active")
    .when(days_idle.between(31, 180), "Dormant")
    .when(days_idle > 180, "Churn Risk")
    .otherwise("No Transactions")
)
df_final = df_last_txn.withColumn("customer_status", status_label)

# Step 7: Monthly Active Users
# Count the distinct customers that transacted in each calendar month.
# Rows whose transaction_date is null (e.g. empty in the CSV, or a value that
# to_date could not turn into a date) are dropped first; otherwise they would
# all be grouped under a meaningless null "month" row in the output. This also
# matches the status logic, which treats a null date as "No Transactions".
monthly_active_df = (
    txn_df.filter(col("transaction_date").isNotNull())
    # trunc(..., "month") maps every date to the first day of its month.
    .withColumn("month", trunc("transaction_date", "month"))
    .groupBy("month")
    .agg(countDistinct("customer_id").alias("monthly_active_users"))
    .orderBy("month")
)

# Step 8: Persist both result sets in Delta format; "overwrite" replaces
# whatever was previously stored at each path.
df_final.write.save(
    "dbfs:/mnt/data/customer_status",
    format="delta",
    mode="overwrite",
)
monthly_active_df.write.save(
    "dbfs:/mnt/data/monthly_active_users",
    format="delta",
    mode="overwrite",
)
