In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC ##  Refined Layer - Analytical Metrics and Insights

# COMMAND ----------

from pyspark.sql.functions import col, countDistinct, avg, explode, lit
from pyspark.sql.window import Window

# Load Trusted data
df_trusted = spark.read.parquet("/FileStore/trusted/events_joined")

# COMMAND ----------

#  Most effective marketing channel (by offer completion rate)

# Filter completed and received events
completed = df_trusted.filter(col("event") == "offer completed")
received = df_trusted.filter(col("event") == "offer received")

# Explode channels for accurate grouping
completed_exploded = completed.withColumn("channel", explode(col("channels")))
received_exploded = received.withColumn("channel", explode(col("channels")))

# Group and count unique customers per channel
completed_channel = completed_exploded.groupBy("channel").agg(countDistinct("customer_id").alias("completed"))
received_channel = received_exploded.groupBy("channel").agg(countDistinct("customer_id").alias("received"))

# Join and calculate completion rate
channel_stats = completed_channel.join(received_channel, "channel") \
    .withColumn("completion_rate", (col("completed") / col("received")) * 100)

display(channel_stats.orderBy(col("completion_rate").desc()))


[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkConnectGrpcException[0m                 Traceback (most recent call last)
File [0;32m<command-7911428906803174>, line 26[0m
[1;32m     23[0m received_exploded [38;5;241m=[39m received[38;5;241m.[39mwithColumn([38;5;124m"[39m[38;5;124mchannel[39m[38;5;124m"[39m, explode(col([38;5;124m"[39m[38;5;124mchannels[39m[38;5;124m"[39m)))
[1;32m     25[0m [38;5;66;03m# Group and count unique customers per channel[39;00m
[0;32m---> 26[0m completed_channel [38;5;241m=[39m completed_exploded[38;5;241m.[39mgroupBy([38;5;124m"[39m[38;5;124mchannel[39m[38;5;124m"[39m)[38;5;241m.[39magg(countDistinct([38;5;124m"[39m[38;5;124mcustomer_id[39m[38;5;124m"[39m)[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mcompleted[39m[38;5;124m"[39m))
[1;32m     27[0m received_channel [38;5;241m=[39m received_exploded[38;5;241m.[39mgroupBy([38;5;124m"[39m[38;5;124mc

In [0]:
# Age distribution: offer completed vs not completed

# Customers who completed
completed_age = completed.select("customer_id", "age").dropDuplicates().withColumn("status", lit("completed"))

# Customers who received but did not complete
received_ids = received.select("customer_id", "offer_id").distinct()
completed_ids = completed.select("customer_id", "offer_id").distinct()

not_completed = received_ids.subtract(completed_ids)
not_completed_age = not_completed.join(df_trusted.select("customer_id", "age"), "customer_id").dropDuplicates() \
    .withColumn("status", lit("not completed"))

# Combine both for comparison
age_comparison = completed_age.unionByName(not_completed_age)

display(age_comparison.groupBy("status", "age").count().orderBy("age"))


In [0]:
#  Average time to complete an offer after receiving it

# Filter and rename for join
received_time = df_trusted.filter(col("event") == "offer received") \
    .select("customer_id", "offer_id", col("time").alias("start_time"))

completed_time = df_trusted.filter(col("event") == "offer completed") \
    .select("customer_id", "offer_id", col("time").alias("end_time"))

# Join and calculate duration
time_df = received_time.join(completed_time, ["customer_id", "offer_id"])
time_df = time_df.withColumn("completion_time", col("end_time") - col("start_time"))

# Average
avg_time = time_df.select(avg("completion_time").alias("avg_completion_time_hours"))
display(avg_time)


[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkConnectGrpcException[0m                 Traceback (most recent call last)
File [0;32m<command-7911428906803177>, line 16[0m
[1;32m     14[0m [38;5;66;03m# Average[39;00m
[1;32m     15[0m avg_time [38;5;241m=[39m time_df[38;5;241m.[39mselect(avg([38;5;124m"[39m[38;5;124mcompletion_time[39m[38;5;124m"[39m)[38;5;241m.[39malias([38;5;124m"[39m[38;5;124mavg_completion_time_hours[39m[38;5;124m"[39m))
[0;32m---> 16[0m display(avg_time)

File [0;32m/databricks/python_shell/lib/dbruntime/display.py:142[0m, in [0;36mDisplay.display[0;34m(self, input, *args, **kwargs)[0m
[1;32m    140[0m [38;5;66;03m# This version is for Serverless + Spark Connect dogfooding.[39;00m
[1;32m    141[0m [38;5;28;01melif[39;00m [38;5;28mself[39m[38;5;241m.[39mspark_connect_enabled [38;5;129;01mand[39;00m [38;5;28misinstance[39m([38;5;28minput[39m, ConnectDataFrame):
[0