## RFM ANALYSIS FOR EMEKA AND SONS

In [11]:
# IMPORT RELEVANT LIBRARIES

from datetime import date, datetime, timedelta

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DateType, TimestampType

StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 13, Finished, Available, Finished)

In [12]:
# LOAD CONFIGURATIONS. The RFM calculation reads its inputs from the silver layer
# and writes the enriched customer dimension to the gold layer.

LAKEHOUSE_NAME = "Emeka_and_sons"
SILVER_SCHEMA, GOLD_SCHEMA = "silver", "gold"

# Fully qualified table names take the form <lakehouse>.<schema>.<table>
FACT_SALES_TABLE = ".".join([LAKEHOUSE_NAME, SILVER_SCHEMA, "silver_transaction_details"])
DIM_CUSTOMER_TABLE = ".".join([LAKEHOUSE_NAME, SILVER_SCHEMA, "silver_customers"])
OUTPUT_CUSTOMER_TABLE = ".".join([LAKEHOUSE_NAME, GOLD_SCHEMA, "dim_customer_enriched"])

StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 14, Finished, Available, Finished)

In [13]:
# LOAD THE FACT AND DIMENSION TABLES USING THE TABLE-NAME VARIABLES FROM THE CONFIG CELL.
# A read failure is printed and then re-raised, so the notebook stops here instead of
# continuing with undefined DataFrames.

print(f"Loading data from {FACT_SALES_TABLE} and {DIM_CUSTOMER_TABLE}...")
try:
    fact_sales_df = spark.read.table(FACT_SALES_TABLE)
    dim_customer_df = spark.read.table(DIM_CUSTOMER_TABLE)

    print("data loaded successfully.")

except Exception as e:
    print(f"Error loading tables: {e}")

    # Bare `raise` re-raises the active exception with its original traceback intact.
    raise

StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 15, Finished, Available, Finished)

Loading data from Emeka_and_sons.silver.silver_transaction_details and Emeka_and_sons.silver.silver_customers...
data loaded successfully.


In [14]:
# Quick look at the first 5 rows of the transaction table.
fact_sales_preview_df = fact_sales_df.limit(5)
display(fact_sales_preview_df)

StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 16, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8af38a60-9738-4e65-8ae0-0019d5462053)

In [15]:
# DETERMINE THE SNAPSHOT DATE: one day after the latest transaction_ts in the fact table.
# With max + 1 day, a customer whose last purchase is on the latest day gets recency_days = 1.
# Falls back to today's date when the fact table has no transaction_ts values.

print("calculating snapshot date....")

snapshot_date_row = fact_sales_df.agg(F.max("transaction_ts").alias("MaxTimestamp")).first()
max_transaction_ts = snapshot_date_row["MaxTimestamp"] if snapshot_date_row else None

if max_transaction_ts is not None:
    # A TimestampType column comes back as datetime, a DateType column as date.
    # datetime subclasses date, so only call .date() when it is actually a datetime.
    if isinstance(max_transaction_ts, datetime):
        max_transaction_date = max_transaction_ts.date()
    else:
        max_transaction_date = max_transaction_ts
    snapshot_date = max_transaction_date + timedelta(days=1)
    print(f"Snapshot date determined as: {snapshot_date}")
else:
    print("Warning: No transaction data found to determine snapshot date. Using current date.")
    snapshot_date = date.today()

# Spark literal used by the recency calculation (datediff against each customer's last purchase).
snapshot_date_lit = F.lit(snapshot_date).cast(DateType())

StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 17, Finished, Available, Finished)

calculating snapshot date....
Snaptshot date Determined as: 2024-01-01


In [16]:
# FILTER TRANSACTIONS FOR KNOWN CUSTOMERS
# RFM is computed per customer, so transactions with a NULL customer_id are excluded.

print("Filtering transactions to include only those with a non-null customer_ID...")

original_sales_count = fact_sales_df.count()

fact_sales_known_customers_df = fact_sales_df.filter(F.col("customer_id").isNotNull())

filtered_sales_count = fact_sales_known_customers_df.count()

print(f"Filtered from {original_sales_count} to {filtered_sales_count} transaction records.")
# Make the size of the excluded share explicit; it can be a large fraction of the table.
print(f"Dropped {original_sales_count - filtered_sales_count} transaction records without a customer_id.")

StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 18, Finished, Available, Finished)

Filtering transactions to include only those with a non-null customer_ID...
Filtered from 1656515 to 711381 transaction records.


In [17]:
# CALCULATING THE RFM METRIC PER CUSTOMER
# Produces one row per customer with:
#   last_purchase_timestamp - latest transaction_ts (input to Recency)
#   frequency               - count of distinct transaction_id values (Frequency)
#   monetary_sum            - sum of line_item_gross_amount (Monetary)
# NOTE(review): grouped on "Customer_id" while the filter cell uses "customer_id"; this relies
# on Spark's default case-insensitive column resolution — confirm the real column name.

print("calcualting the RFM metric per customer...")

rfm_aggregations = [
    F.max("transaction_ts").alias("last_purchase_timestamp"),
    F.countDistinct("transaction_id").alias("frequency"),
    F.sum("line_item_gross_amount").alias("monetary_sum"),
]

rfm_metrics_df = (
    fact_sales_known_customers_df
    .groupBy("Customer_id")
    .agg(*rfm_aggregations)
)

display(rfm_metrics_df.limit(10))

StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 19, Finished, Available, Finished)

calcualting the RFM metric per customer...


SynapseWidget(Synapse.DataFrame, 689a2e71-827a-48ae-b28d-ed2147cc9e9f)

In [18]:
# CALCULATE RECENCY DAYS: days from each customer's last purchase date to the snapshot date
# (snapshot_date - last_purchase_date). Smaller values mean a more recent customer.

print("calculating recency days....")

# Cast the last purchase timestamp to a date so recency is counted in whole calendar days.
# Column name is lower-case so it matches how it is referenced below without relying on
# case-insensitive column resolution.
rfm_metrics_df = rfm_metrics_df.withColumn(
    "last_purchase_date", F.col("last_purchase_timestamp").cast(DateType())
)

# datediff(end, start) returns end - start in days and already yields NULL when
# last_purchase_date is NULL, so no explicit null guard is needed.
rfm_metrics_df = rfm_metrics_df.withColumn(
    "recency_days", F.datediff(snapshot_date_lit, F.col("last_purchase_date"))
)



StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 20, Finished, Available, Finished)

calcualting recency days....


In [19]:


# --- 6. Calculate RFM Scores (Using NTILE for Quintiles) ---
# Every score runs from 1 (worst) to 5 (best); 0 marks a metric that is NULL or non-positive.
print("Calculating RFM scores...")

# Window specs. Recency is ordered ascending (most recent customers -> tile 1) and then
# inverted with 6 - tile. Frequency and monetary are ordered ASCENDING so the largest
# values land in tile 5 and score 5. Ordering them descending, as before, gave the best
# customers a score of 1.
# Customer_id is a tie-breaker: without it, customers sharing a metric value (e.g. the
# many with frequency = 1) are spread across tiles non-deterministically between runs.
# NOTE: NTILE can still split equal metric values across adjacent tiles.
# NOTE: these windows have no partitionBy, so Spark evaluates each on a single partition.
recency_window = Window.orderBy(F.col("recency_days").asc_nulls_last(), F.col("Customer_id"))
frequency_window = Window.orderBy(F.col("frequency").asc_nulls_first(), F.col("Customer_id"))
monetary_window = Window.orderBy(F.col("monetary_sum").asc_nulls_first(), F.col("Customer_id"))

# Step 1: Calculate raw NTILEs
rfm_scored_df = rfm_metrics_df \
    .withColumn("r_raw", F.ntile(5).over(recency_window)) \
    .withColumn("f_raw", F.ntile(5).over(frequency_window)) \
    .withColumn("m_raw", F.ntile(5).over(monetary_window))

# Step 2: Turn tiles into scores (0 for unusable metrics) and cast to int
rfm_scored_df = rfm_scored_df \
    .withColumn("r_score", F.when(F.col("recency_days").isNotNull(), 6 - F.col("r_raw")).otherwise(0)) \
    .withColumn("f_score", F.when(F.col("frequency") > 0, F.col("f_raw")).otherwise(0)) \
    .withColumn("m_score", F.when(F.col("monetary_sum") > 0, F.col("m_raw")).otherwise(0)) \
    .drop("r_raw", "f_raw", "m_raw") \
    .withColumn("r_score", F.col("r_score").cast(IntegerType())) \
    .withColumn("f_score", F.col("f_score").cast(IntegerType())) \
    .withColumn("m_score", F.col("m_score").cast(IntegerType()))

# --- 7. Define RFM Segments ---
# Rules are evaluated top to bottom and the first match wins, so narrower rules must
# precede broader rules that overlap them.
print("Assigning RFM segments...")

rfm_segmented_df = rfm_scored_df.withColumn(
    "rfm_segment",
    # A 0 score means NULL recency, frequency <= 0 or monetary_sum <= 0. This must be checked
    # first: the "<= 2" rules below also match 0, which made this branch effectively unreachable.
    F.when((F.col("r_score") == 0) | (F.col("f_score") == 0) | (F.col("m_score") == 0), "No Recent Activity")
     .when((F.col("r_score") >= 4) & (F.col("f_score") >= 4), "Champions")
     .when((F.col("r_score") >= 3) & (F.col("f_score") >= 3) & (F.col("m_score") >= 3), "Loyal Customers")
     .when((F.col("r_score") >= 4) & (F.col("f_score") <= 2), "New Customers / Promising")
     .when((F.col("r_score").between(3, 5)) & (F.col("f_score").between(3, 5)), "Potential Loyalists")
     .when((F.col("r_score") <= 2) & (F.col("f_score") >= 4), "Need Attention")
     # "Lost" (r == 1, f <= 2) overlaps "Hibernating" (r <= 2, f <= 2, m <= 2) and is the more
     # specific rule, so it comes first; placed after, it only caught customers with m_score >= 3.
     .when((F.col("r_score") == 1) & (F.col("f_score") <= 2), "Lost Customers")
     .when((F.col("r_score") <= 2) & (F.col("f_score") <= 2) & (F.col("m_score") <= 2), "Hibernating / At Risk")
     .otherwise("Undecided")
)



StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 21, Finished, Available, Finished)

Calculating RFM scores...
Assigning RFM segments...


In [20]:
# --- 8. Join RFM results back to Customer Dimension ---
print("Joining RFM results to customer dimension...")

# Recency assigned to customers with no purchases: keeps the column non-null and
# makes such customers sort as the least recent.
NO_PURCHASE_RECENCY_DAYS = 9999

# Select only the RFM columns needed from the calculated dataframe (join key first)
rfm_cols_to_join = [
    "Customer_id",
    "last_purchase_timestamp",
    "recency_days",
    "frequency",
    "monetary_sum",
    "r_score",
    "f_score",
    "m_score",
    "rfm_segment"
]
rfm_final_df = rfm_segmented_df.select(rfm_cols_to_join)

# Left join keeps every customer in the dimension, including those without transactions.
# coalesce fills the RFM columns with defaults for customers that have no RFM row.
dim_customer_enriched_df = dim_customer_df.join(
    rfm_final_df,
    on="Customer_id",
    how="left"
).select(
    dim_customer_df["*"],  # all columns from the original customer dim
    # No default: a customer without transactions has no last purchase, so this stays NULL.
    rfm_final_df["last_purchase_timestamp"].alias("last_purchase_timestamp"),
    F.coalesce(rfm_final_df["recency_days"], F.lit(NO_PURCHASE_RECENCY_DAYS)).alias("recency_days"),
    F.coalesce(rfm_final_df["frequency"], F.lit(0)).alias("frequency"),
    F.coalesce(rfm_final_df["monetary_sum"], F.lit(0.0)).alias("monetary_sum"),
    F.coalesce(rfm_final_df["r_score"], F.lit(0)).alias("r_score"),  # score 0 = no data
    F.coalesce(rfm_final_df["f_score"], F.lit(0)).alias("f_score"),
    F.coalesce(rfm_final_df["m_score"], F.lit(0)).alias("m_score"),
    F.coalesce(rfm_final_df["rfm_segment"], F.lit("Never Purchased")).alias("rfm_segment")
)

# --- 9. Write Enriched Customer Dimension to Lakehouse Gold Layer ---
# overwrite + overwriteSchema replaces both the data and the table schema on every run.
print(f"Writing enriched customer dimension to {OUTPUT_CUSTOMER_TABLE}...")
dim_customer_enriched_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(OUTPUT_CUSTOMER_TABLE)

# Derive the reported column list from rfm_cols_to_join so the message cannot drift from the code.
added_columns = [col_name for col_name in rfm_cols_to_join if col_name != "Customer_id"]

print("-----------------------------------------")
print("Successfully created/updated enriched customer dimension table:")
print(f"Table Name: {OUTPUT_CUSTOMER_TABLE}")
print("Added Columns: " + ", ".join(added_columns))
print("-----------------------------------------")



StatementMeta(, ce15cab2-b331-4946-ace1-ca5a04060655, 22, Finished, Available, Finished)

Joining RFM results to customer dimension...
Writing enriched customer dimension to Emeka_and_sons.gold.dim_customer_enriched...
-----------------------------------------
Successfully created/updated enriched customer dimension table:
Table Name: Emeka_and_sons.gold.dim_customer_enriched
Added Columns: last_purchase_timestamp, recency_days, frequency, monetary_sum, r_score, f_score, m_score, rfm_segment
-----------------------------------------
