In [0]:
from pyspark.sql.functions import current_timestamp, col

# Bronze layer: copy the synthetic source table into the raw bronze table.
df_raw = spark.table("bits_pilani.bronze_sch.synthetic_retail_transactions")

# Overwrite in place rather than DROP + recreate: the replacement is a single
# Delta commit, so the table never disappears between two separate steps.
# overwriteSchema lets the stored schema follow the DataFrame if columns changed.
(
    df_raw.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bits_pilani.bronze_sch.retail_transactions_raw")
)

In [0]:
from pyspark.sql.functions import to_date, when, col

df_bronze = spark.table("bits_pilani.bronze_sch.retail_transactions_raw")

# Spark SQL type that each numeric column is cast to.
numeric_casts = {
    "QUANTITY": "int",
    "UNIT_REGULAR_PRICE_USD": "double",
    "EXTENDED_PRICE_USD": "double",
    "GROSS_MARGIN_AMT_USD": "double",
}

# Silver layer: derive TXN_DATE from the timestamp, apply the casts in place,
# then append GUEST_TYPE (rows with a null GUEST_ID are tagged ANONYMOUS).
df_silver = df_bronze.withColumn("TXN_DATE", to_date("TXN_TIMESTAMP"))
for column_name, spark_type in numeric_casts.items():
    df_silver = df_silver.withColumn(column_name, col(column_name).cast(spark_type))

guest_type_expr = when(col("GUEST_ID").isNull(), "ANONYMOUS").otherwise("IDENTIFIED")
df_silver = df_silver.withColumn("GUEST_TYPE", guest_type_expr)

# Overwrite in place rather than DROP + recreate: the replacement is a single
# Delta commit, so the table never disappears between two separate steps.
# overwriteSchema lets the stored schema follow the DataFrame if columns changed.
(
    df_silver.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bits_pilani.silver_sch.retail_transactions_clean")
)

In [0]:
from pyspark.sql.functions import (
    avg, col, concat_ws, count, datediff, expr, lit, lower,
    max as _max, row_number, sum as _sum, when
)
from pyspark.sql.window import Window

# Silver transactions, restricted to rows that carry a MASTER_GUEST_ID.
df_silver = spark.table("bits_pilani.silver_sch.retail_transactions_clean")
has_master_guest = col("MASTER_GUEST_ID").isNotNull()
linked_df = df_silver.filter(has_master_guest)

# Reference date: the newest TXN_TIMESTAMP among the linked rows, collected to
# the driver so it can be used as a literal in the recency calculation.
max_ts_rows = linked_df.agg(_max("TXN_TIMESTAMP").alias("max_ts")).collect()
ref_date = max_ts_rows[0]["max_ts"]

# ========== STEP 1: Transaction Count per Channel ==========
# One 0/1 indicator column per order origin (txn_STORE, txn_ECOM, txn_APP);
# summing the indicators per guest yields the row count for each channel.
order_origins = ("STORE", "ECOM", "APP")

channel_counts_df = linked_df
for origin in order_origins:
    is_origin = col("MASTER_ORDER_ORIGIN") == origin
    channel_counts_df = channel_counts_df.withColumn(
        f"txn_{origin}", when(is_origin, 1).otherwise(0)
    )

channel_summary = channel_counts_df.groupBy("MASTER_GUEST_ID").agg(
    *[_sum(f"txn_{name}").alias(f"{name.lower()}_txns") for name in order_origins]
)

# ========== STEP 2: Top Category using Window ==========
# Rows per (guest, department).
dept_counts = linked_df.groupBy("MASTER_GUEST_ID", "MMS_DEPT_NAME") \
                       .agg(count("*").alias("dept_count"))

# Rank each guest's departments by row count. The department name is a
# secondary sort key so that ties on dept_count resolve the same way on every
# run; ordering by dept_count alone lets row_number() pick any of the tied rows.
dept_window = Window.partitionBy("MASTER_GUEST_ID").orderBy(
    col("dept_count").desc(), col("MMS_DEPT_NAME")
)

# Keep only the top-ranked department per guest.
top_dept = dept_counts.withColumn("rank", row_number().over(dept_window)) \
                      .filter(col("rank") == 1) \
                      .select("MASTER_GUEST_ID", col("MMS_DEPT_NAME").alias("top_category"))

# ========== STEP 3: Main Aggregation ==========
# One row per guest with volume, spend, discount, recency and the most frequent
# channel / market / coupon. The SQL `mode()` aggregates are built with `expr`,
# which must be imported from pyspark.sql.functions alongside the other helpers
# used in this cell (it was previously missing, causing a NameError).
# NOTE(review): count("TRANSACTION_ID") counts rows with a non-null id, not
# distinct ids - confirm the silver table holds one row per transaction.
summary = linked_df.groupBy("MASTER_GUEST_ID").agg(
    count("TRANSACTION_ID").alias("total_transactions"),
    _sum("EXTENDED_PRICE_USD").alias("total_spend_usd"),
    avg("DISCOUNT_PCT").alias("avg_discount_pct"),
    _max("TXN_TIMESTAMP").alias("last_txn_date"),
    expr("mode() within group (order by MASTER_ORDER_ORIGIN)").alias("top_channel"),
    expr("mode() within group (order by SELLING_MARKET)").alias("top_market"),
    # top_coupon may be NULL; the persona step checks for that explicitly.
    expr("mode() within group (order by COUPON_CODE)").alias("top_coupon")
).withColumn(
    # Whole days between the newest linked transaction overall (ref_date) and
    # this guest's latest transaction.
    "days_since_last_txn", datediff(lit(ref_date), col("last_txn_date"))
)

# ========== STEP 4: Join Enrichments ==========
# Left joins keep every guest from the main aggregation; top category is
# attached first, then the per-channel counts.
for enrichment_df in (top_dept, channel_summary):
    summary = summary.join(enrichment_df, on="MASTER_GUEST_ID", how="left")

# ========== STEP 5: Persona Columns ==========
# Each persona attribute is defined as a column expression first and then
# attached to `summary` in order, because the sentence refers to the label
# columns added before it.
txn_total = col("total_transactions")
frequency_expr = (
    when(txn_total >= 10, "frequent")
    .when(txn_total >= 5, "regular")
    .otherwise("occasional")
)

spend_total = col("total_spend_usd")
spender_expr = (
    when(spend_total >= 500, "high spender")
    .when(spend_total >= 200, "mid-level spender")
    .otherwise("low spender")
)

discount_avg = col("avg_discount_pct")
deal_type_expr = (
    when(discount_avg >= 20, "deal-seeker")
    .when(discount_avg >= 5, "value-conscious")
    .otherwise("full-price shopper")
)

days_idle = col("days_since_last_txn")
recency_expr = (
    when(days_idle <= 30, "recent")
    .when(days_idle <= 90, "somewhat active")
    .otherwise("lapsed")
)

# Mention the guest's most frequent coupon when there is one.
coupon_phrase_expr = when(
    col("top_coupon").isNotNull(),
    concat_ws(" ", lit("frequently uses promo codes like"), col("top_coupon")),
).otherwise(lit("rarely uses promotions"))

# Space-separated fragments of the sentence; the inner concat_ws("") calls glue
# punctuation directly onto the preceding value.
persona_parts = [
    col("frequency"),
    lower(col("top_channel")), lit("shopper from"),
    col("top_market"), lit("focused on"),
    concat_ws("", col("top_category"), lit(", a")),
    col("spender"), lit("and"), concat_ws("", col("deal_type"), lit(",")),
    col("recency"), lit("guest who"), concat_ws("", col("coupon_phrase"), lit(",")),
    lit("last purchased"), days_idle.cast("string"), lit("days ago."),
    lit("Has"), col("store_txns").cast("string"), lit("STORE,"),
    col("ecom_txns").cast("string"), lit("ECOM,"),
    col("app_txns").cast("string"), lit("APP transactions."),
]
persona_sentence_expr = concat_ws(" ", *persona_parts)

persona_columns = [
    ("frequency", frequency_expr),
    ("spender", spender_expr),
    ("deal_type", deal_type_expr),
    ("recency", recency_expr),
    ("coupon_phrase", coupon_phrase_expr),
    ("persona_sentence", persona_sentence_expr),
]
for persona_column, persona_expr in persona_columns:
    summary = summary.withColumn(persona_column, persona_expr)

# ========== STEP 6: Write to Gold Layer ==========
# Overwrite in place rather than DROP + recreate: the replacement is a single
# Delta commit, so the table never disappears between two separate steps.
# overwriteSchema lets the stored schema follow the DataFrame if columns changed.
(
    summary.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("bits_pilani.gold_sch.guest_persona_summary")
)


In [0]:
# NOTE(review): kaleido is presumably installed for Plotly static-image export
# (`fig.write_image`); the only such call visible in this notebook is commented
# out - confirm the install is still needed, and pin a version if it is.
!pip install -U kaleido

In [0]:
# 📘 guest_persona_clustering.py (Restricted Cluster Version with Labels & Visualization)

import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd

# Step 1: Load Gold Summary Table
df_spark = spark.table("bits_pilani.gold_sch.guest_persona_summary")

# Step 2: Convert to Pandas for local ML
pdf = df_spark.select(
    "MASTER_GUEST_ID", "total_transactions", "total_spend_usd", "avg_discount_pct","top_category",
    "days_since_last_txn", "store_txns", "ecom_txns", "app_txns", "persona_sentence"
).toPandas()

# Step 3: Manual One-Hot Encoding for 'top_channel'
# (disabled: 'top_channel' is not among the columns selected above)
# pdf = pd.get_dummies(pdf, columns=["top_channel"], prefix="channel")

# Numeric columns used as model inputs.
features = [
    "total_transactions", "total_spend_usd", "avg_discount_pct",
    "days_since_last_txn", "store_txns", "ecom_txns", "app_txns"
]

# Step 4: Fill missing values in the numeric features only.
# A frame-wide fillna(0) would also write the integer 0 into text columns such
# as "top_category" and "persona_sentence", leaving mixed str/int columns that
# break string concatenation and conversion back to a Spark DataFrame.
pdf[features] = pdf[features].fillna(0)

# Step 5: Standardize Features
scaler = StandardScaler()
pdf_scaled = scaler.fit_transform(pdf[features])

# Step 6: PCA - project the standardized features onto two components, which
# are used both for clustering and as plot coordinates.
pca = PCA(n_components=2)
pca_result = pca.fit_transform(pdf_scaled)
pdf["pca_x"] = pca_result[:, 0]
pdf["pca_y"] = pca_result[:, 1]

# Step 7: KMeans Clustering (fitted on the 2-D PCA projection, seeded for repeatability)
# NOTE(review): the manual labels assigned in a later cell are keyed by these
# integer cluster ids - re-check them whenever the input data changes.
kmeans = KMeans(n_clusters=5, random_state=42)
pdf["persona_cluster"] = kmeans.fit_predict(pca_result)

# Step 8: Summarize clusters for labeling (per-cluster mean of each feature)
cluster_summary = pdf.groupby("persona_cluster").agg({
    "total_spend_usd": "mean",
    "total_transactions": "mean",
    "avg_discount_pct": "mean",
    "days_since_last_txn": "mean",
    "store_txns": "mean",
    "ecom_txns": "mean",
    "app_txns": "mean"
}).reset_index()
display(cluster_summary)




In [0]:
def generate_cluster_sentence(row):
    """Describe a cluster in two sentences from its summary metrics.

    Parameters
    ----------
    row : mapping-like
        Any object indexable by column name that provides
        ``total_spend_usd``, ``total_transactions``, ``days_since_last_txn``,
        ``store_txns``, ``ecom_txns``, ``app_txns`` and ``avg_discount_pct``.

    Returns
    -------
    str
        "These are <spend tier> who are <engagement> and <recency>. They
        primarily shop via <CHANNEL>, and are typically <deal type>."
    """
    # Spend tier: 1500 and above is high, 1000 and above is mid, the rest low.
    spend = row['total_spend_usd']
    spender = (
        "high spenders" if spend >= 1500
        else "mid spenders" if spend >= 1000
        else "low spenders"
    )

    # Engagement: strictly more than 10 transactions counts as highly engaged.
    engagement = "moderately engaged"
    if row['total_transactions'] > 10:
        engagement = "highly engaged"

    # Recency buckets at 30 and 60 days since the last transaction.
    idle_days = row['days_since_last_txn']
    recency = (
        "recently active" if idle_days <= 30
        else "somewhat active" if idle_days <= 60
        else "lapsed"
    )

    # Dominant channel; on a tie the earliest entry (STORE, ECOM, APP) wins.
    channel_volume = {
        "STORE": row["store_txns"],
        "ECOM": row["ecom_txns"],
        "APP": row["app_txns"],
    }
    top_channel = max(channel_volume, key=channel_volume.get)

    # Discount sensitivity: above 8 is a deal seeker, above 4 value-conscious.
    discount = row['avg_discount_pct']
    deal_type = (
        "deal seekers" if discount > 8
        else "value-conscious" if discount > 4
        else "full-price shoppers"
    )

    profile_part = "These are {} who are {} and {}. ".format(spender, engagement, recency)
    shopping_part = "They primarily shop via {}, and are typically {}.".format(top_channel, deal_type)
    return profile_part + shopping_part

# Run the sentence generator once per row (axis=1) of the cluster summary and
# keep the result in a new "cluster_sentence" column, then show the table.
cluster_summary["cluster_sentence"] = cluster_summary.apply(generate_cluster_sentence, axis=1)
display(cluster_summary)

| Cluster | Label                                  | Reasoning                                                             |
| ------: | -------------------------------------- | --------------------------------------------------------------------- |
|   **0** | **Mid-Spend Omnichannel Guests**       | Moderate spend (\$1153), balanced use of store/ecom/app               |
|   **1** | **High-Spend Multi-Channel Loyalists** | Highest spend (\$1921), highest transaction count, low recency        |
|   **2** | **High-Spend Deal Seekers**            | Second-highest spend (\$1359, which falls in the \$1000–1500 mid tier below), high discount usage (8.6%), high app + ecom mix |
|   **3** | **Low-Spend Store Shoppers**           | Lower spend (\$799), mostly in-store, longer recency                  |
|   **4** | **Lapsed Low-Spend Guests**            | Lowest spend (\$572), low frequency, lapsed (98+ days), least engaged |




### 🧠 How These Labels Were Decided

Labels were assigned based on the following cluster-level patterns:

#### 💰 Spending Tiers
- **\$1500+** → High spender  
- **\$1000–1500** → Mid spender  
- **< \$1000** → Low spender  

#### 🔄 Engagement
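- **`total_transactions` > 10** → Highly engaged; otherwise → Moderately engaged  
- **`days_since_last_txn` ≤ 30** → Recently active  
- **`days_since_last_txn` > 30 and ≤ 60** → Somewhat active  
- **`days_since_last_txn` > 60** → Lapsed or inactive  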

#### 🛍️ Channel Preference
- Determined based on the dominant channel among:
  - `store_txns`
  - `ecom_txns`
  - `app_txns`

#### 🎯 Discount Sensitivity
- **`avg_discount_pct` > 8** → Deal seeker  
- **`avg_discount_pct` > 4 (up to 8)** → Value-conscious shopper  
- Otherwise → Full-price shopper


In [0]:
# Step 9: Manual label assignment (keyed by the integer KMeans cluster id)
cluster_labels = {
    0: "Mid-Spend Omnichannel Guests",
    1: "High-Spend Multi-Channel Loyalists",
    2: "High-Spend Deal Seekers",
    3: "Low-Spend Store Shoppers",
    4: "Lapsed Low-Spend Guests"
}

# Step 10: Attach the label to both the cluster summary and the guest-level frame
for labelled_frame in (cluster_summary, pdf):
    labelled_frame["persona_cluster_label"] = labelled_frame["persona_cluster"].map(cluster_labels)

# Look-up from cluster label to the generated cluster sentence
cluster_sentence_map = {
    label: sentence
    for label, sentence in zip(
        cluster_summary["persona_cluster_label"],
        cluster_summary["cluster_sentence"],
    )
}

# Give every guest the sentence of the cluster they belong to
pdf["cluster_sentence"] = pdf["persona_cluster_label"].map(cluster_sentence_map)

# Guest-level sentence followed by the cluster-level sentence
guest_text = pdf["persona_sentence"]
cluster_text = pdf["cluster_sentence"]
pdf["final_persona_sentence"] = guest_text + " " + cluster_text


# Per-label centroid in PCA space plus the label's average spend
center_metrics = ("pca_x", "pca_y", "total_spend_usd")
cluster_centers = pdf.groupby("persona_cluster_label").agg(
    {metric: "mean" for metric in center_metrics}
).reset_index()

# Scatter of every guest in PCA space, coloured by persona label
scatter_options = dict(
    x="pca_x",
    y="pca_y",
    color="persona_cluster_label",
    title="🧬 Guest Persona Clusters (PCA Projection)",
    hover_data=[
        "MASTER_GUEST_ID",
        "persona_sentence",
        "total_spend_usd",
        "total_transactions",
        "avg_discount_pct",
        "days_since_last_txn"
    ],
    width=950,
    height=600,
)
fig = px.scatter(pdf, **scatter_options)

# One text-only trace per cluster, placed at its centroid and kept out of the legend
for _, center in cluster_centers.iterrows():
    center_caption = "{}<br>Avg Spend: ${:.0f}".format(
        center["persona_cluster_label"], center["total_spend_usd"]
    )
    center_trace = go.Scatter(
        x=[center["pca_x"]],
        y=[center["pca_y"]],
        text=[center_caption],
        mode="text",
        showlegend=False,
    )
    fig.add_trace(center_trace)

fig.update_layout(legend_title_text='Persona Cluster')
fig.show()


# Static PNG export is disabled; the interactive HTML export below is used instead.
# fig.write_image("/dbfs/tmp/guest_persona_clusters.png", width=950, height=600, scale=2)
plot_html_path = "/Volumes/bits_pilani/raw_sch/raw_data/guest_persona_clusters.html"
fig.write_html(plot_html_path)
plot_link_html = "<a href='/Volumes/bits_pilani/raw_sch/raw_data/guest_persona_clusters.html' target='_blank'>📥 Open & Download Interactive Plot</a>"
displayHTML(plot_link_html)


# Provide download link
# displayHTML("<a href='/files/tmp/guest_persona_clusters.png' target='_blank'>📥 Download PNG</a>")



# Step 11: Select the output columns and preview the first rows
output_columns = [
    "MASTER_GUEST_ID", "persona_cluster", "persona_cluster_label", "persona_sentence",
    "cluster_sentence","final_persona_sentence",
    "total_spend_usd", "total_transactions", "days_since_last_txn","top_category",
    "store_txns", "ecom_txns", "app_txns", "avg_discount_pct", "pca_x", "pca_y"
]
pdf_final = pdf[output_columns]
display(pdf_final.head())

# CSV copy, written relative to the current working directory
pdf_final.to_csv("final_persona_profiles.csv", index=False)

# Spark copy, saved as a gold-schema table (overwrite mode, mergeSchema enabled)
df_enriched = spark.createDataFrame(pdf_final)
enriched_writer = df_enriched.write.mode("overwrite").option("mergeSchema", "true")
enriched_writer.saveAsTable("bits_pilani.gold_sch.guest_persona_enriched")
