In [0]:
pip install pandarallel

In [0]:
import multiprocessing
from pandarallel import pandarallel
from tqdm import tqdm
import pandas as pd

# Initialise pandarallel before any `parallel_apply` call (used by compute_hit_scores).
# Leave one core free and cap the pool at 8 workers, but never ask for fewer
# than one: on a single-core machine cpu_count() - 1 evaluates to 0.
pandarallel.initialize(
    nb_workers=max(1, min(multiprocessing.cpu_count() - 1, 8)),
    progress_bar=True, verbose=1
)

# Register `progress_apply` on pandas objects (used by top_k_rfm).
tqdm.pandas()
K = 5  # Top-K merchants/categories to evaluate



# 1. Confidence Score
The confidence score, as calculated by the `top_k_rfm` function, measures how "**confident**" the model is in its top-K recommendations for a specific user. 

> It quantifies how much the top items stand out from the rest of the items for that user.



`Confidence = (Sum of scores of Top-K items) / (Sum of scores of all items)`

## What it Tells Us
- A high confidence score (close to 1.0) means that the user's top-K items have significantly higher scores than their other items. This indicates a strong, concentrated preference, and we can be more confident that these top items are indeed the most important ones for this user.

- A low confidence score (closer to 0.0) suggests that the scores are more evenly distributed across all items. The distinction between the top-K items and the rest is less clear, indicating a weaker or more diverse preference.


# 2. Hit Rate
 The hit rate, calculated by the `compute_hit_scores` function, is a classic accuracy metric. 
 
 > It evaluates how well the model's recommendations predict a user's future behavior. It answers the question: "Of the K items we recommended, how many did the user actually interact with?"

## How it's Calculated
To calculate the hit rate, you first need to split your data (e.g., by time) into a training set and a test set.

- Generate top-K recommendations using the training set.

- Identify the actual top-K items the user interacted with in the test set.
 
- The hit rate is the proportion of items that appear in both lists.
 
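- `Hit Rate = (Number of items in both Train Top-K and Test Top-K) / K` (when a user has fewer than K items in the training set, the code divides by the number of items in that user's training list instead of K).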

##### Treating the Test Top-K as the relevant set, this is Precision@K, which equals Recall@K whenever both lists contain exactly K items.

## What it Tells Us
- The hit rate is a direct measure of the recommendation model's predictive power. A higher hit rate means the model is more effective at identifying items that the user will find relevant in the future.


In [0]:
def top_k_rfm(df, user_col, item_col, score_col, K=5):
    """Return each user's top-K items ranked by score, plus a confidence score.

    For every group in ``user_col`` the rows are ranked by ``score_col`` in
    descending order and the first ``K`` values of ``item_col`` are kept.

    Confidence is ``sum(top-K scores) / sum(all scores)`` for that user. It is
    reported as 0.0 when the user's total score is not positive, so the ratio
    never divides by zero.

    Args:
        df: DataFrame containing ``user_col``, ``item_col`` and ``score_col``.
        user_col: Column identifying the user to group by.
        item_col: Column holding the item label.
        score_col: Numeric column used for ranking and for the confidence ratio.
        K: Number of top rows to keep per user.

    Returns:
        DataFrame with one row per user and the columns ``user_col``,
        ``top_items`` (list of up to K items, best first) and ``confidence``.
    """
    # NOTE(review): rows are not de-duplicated per item; if an item can appear on
    # several rows for one user it can occupy several top-K slots — confirm the
    # input has one row per (user, item).
    def _get_top(x):
        # Stable sort keeps tied scores in input order, so the top-K is reproducible.
        x_sorted = x.sort_values(score_col, ascending=False, kind="stable")
        top = x_sorted.head(K)
        total = x_sorted[score_col].sum()
        # Divide by the real total. The former max(1, total) guard silently
        # divided by 1 whenever the total was below 1 (e.g. fractional scores),
        # which no longer matched "top-K share of the total score".
        conf = float(top[score_col].sum() / total) if total > 0 else 0.0
        return pd.Series({"top_items": top[item_col].tolist(), "confidence": conf})

    # Hand _get_top only the columns it reads; the grouping column then never
    # enters the applied frame.
    needed_cols = list(dict.fromkeys([item_col, score_col]))
    grouped = df.groupby(user_col, group_keys=False)[needed_cols]
    # progress_apply exists only after tqdm.pandas() has run; fall back to the
    # plain apply so the function also works without the progress-bar hook.
    apply_fn = getattr(grouped, "progress_apply", grouped.apply)
    return apply_fn(_get_top).reset_index()

def compute_hit_scores(train_top, test_top, item_label, user_col="user_id"):
    """Compute per-user hit score = overlap between train and test top-K lists.

    The two frames are inner-joined on ``user_col``, so only users present in
    both appear in the result. The hit score is the number of distinct train
    items that also occur in the test list, divided by the number of distinct
    train items (0.0 when the train list is empty).

    Args:
        train_top: Frame with ``user_col``, ``top_items`` and ``confidence``
            columns built from the training data.
        test_top: Frame with the same columns built from the test data.
        item_label: Prefix for the output columns (e.g. ``"merchant"``).
        user_col: Join key identifying the user; defaults to ``"user_id"``.

    Returns:
        DataFrame with the columns ``user_col``, ``{item_label}_confidence``
        (the train-side confidence) and ``{item_label}_hit_score``.
    """
    merged = pd.merge(train_top, test_top, on=user_col, how="inner", suffixes=("_train", "_test"))

    def _hit(row):
        # Numerator and denominator both count DISTINCT items; dividing by the
        # raw list length would understate the score if the list has duplicates.
        recommended = set(row["top_items_train"])
        return len(recommended.intersection(row["top_items_test"])) / max(1, len(recommended))

    hit_col = f"{item_label}_hit_score"
    if merged.empty:
        # A row-wise apply on an empty frame does not return a Series; assign an
        # empty float column directly instead.
        merged[hit_col] = pd.Series(index=merged.index, dtype=float)
    else:
        # Parallelized apply when pandarallel has been initialised, plain apply otherwise
        row_apply = getattr(merged, "parallel_apply", merged.apply)
        merged[hit_col] = row_apply(_hit, axis=1)
    merged = merged.rename(columns={"confidence_train": f"{item_label}_confidence"})
    return merged[[user_col, f"{item_label}_confidence", hit_col]]


In [0]:
import pandas as pd

# Read the train/test RFM extracts straight into pandas.
# (Alternative: read each CSV with spark.read.csv(..., header=True, inferSchema=True)
# and call .toPandas() — only if the result fits in memory.)
train_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/Volumes/jupiter/temp/temp/rfm_analysis_train.csv",
        "/Volumes/jupiter/temp/temp/rfm_analysis_test.csv",
    )
)

print(f"Train shape: {train_df.shape}, Test shape: {test_df.shape}")

# Preview the first rows of each split
for split_name, split_df in (("Train", train_df), ("Test", test_df)):
    print(f"{split_name} sample:")
    display(split_df.head(10))

In [0]:
# Rank merchants per user on the train and test data with identical settings
print("Computing merchant Top-K for train & test...")
merchant_rank_args = ("user_id", "merchant_standardized", "RFM_score_merchant", K)
train_merchants = top_k_rfm(train_df, *merchant_rank_args)
test_merchants = top_k_rfm(test_df, *merchant_rank_args)

# Compare the two rankings user by user
print("Evaluating merchant hit scores...")
merchant_eval = compute_hit_scores(train_merchants, test_merchants, "merchant")

print("✅ Merchant evaluation complete")
display(merchant_eval.head(5))


In [0]:
# Rank app categories per user on the train and test data with identical settings
print("Computing category Top-K for train & test...")
category_rank_args = ("user_id", "appcategory", "RFM_score_category", K)
train_categories = top_k_rfm(train_df, *category_rank_args)
test_categories = top_k_rfm(test_df, *category_rank_args)

# Compare the two rankings user by user
print("Evaluating category hit scores...")
category_eval = compute_hit_scores(train_categories, test_categories, "category")

print("✅ Category evaluation complete")
display(category_eval.head(5))

In [0]:
# One row per user; the outer join keeps users that have only merchant or only
# category metrics (the missing side is NaN).
merchant_and_category = merchant_eval.merge(category_eval, on="user_id", how="outer")

# Segment labels come from the training data: first value seen per user
train_by_user = train_df.groupby("user_id")
seg_merchants = train_by_user["customer_segment_merchant"].first().reset_index()
seg_categories = train_by_user["customer_segment_category"].first().reset_index()

merchant_segments = seg_merchants.rename(columns={"customer_segment_merchant": "merchant_segment"})
category_segments = seg_categories.rename(columns={"customer_segment_category": "category_segment"})

user_report = merchant_and_category.merge(merchant_segments, on="user_id", how="left")
user_report = user_report.merge(category_segments, on="user_id", how="left")

print("User report sample:")
display(user_report.head(10))

# 💾 Save User Report
output_path = "/Volumes/jupiter/temp/temp/rfm_backfill_user_report.csv"
user_report.to_csv(output_path, index=False)

print(f"✅ Backfill evaluation report saved to: {output_path}")

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Overall hit rate: mean per-user hit score for each item type
mean_merchant_hit = merchant_eval["merchant_hit_score"].mean()
mean_category_hit = category_eval["category_hit_score"].mean()
hit_summary = pd.DataFrame({
    "Metric": ["Merchant", "Category"],
    "Hit Rate": [mean_merchant_hit, mean_category_hit],
})

plt.figure(figsize=(6, 5))
sns.barplot(x="Metric", y="Hit Rate", data=hit_summary, palette="viridis")
plt.title(f"Top-{K} Hit Rate Comparison", fontsize=14, weight="bold")
plt.ylim(0, 1)
# Write the percentage just above each bar
for bar_pos, rate in enumerate(hit_summary["Hit Rate"]):
    plt.text(bar_pos, rate + 0.02, f"{rate:.2%}", ha="center")
plt.show()

# Confidence distributions, merchant and category overlaid on one axis
plt.figure(figsize=(8, 5))
for eval_df, conf_col, hist_color, legend_label in (
    (merchant_eval, "merchant_confidence", "blue", "Merchant"),
    (category_eval, "category_confidence", "green", "Category"),
):
    sns.histplot(eval_df[conf_col], bins=30, color=hist_color, label=legend_label, kde=True)
plt.title("Confidence Score Distribution", fontsize=14, weight="bold")
plt.xlabel("Confidence Score")
plt.legend()
plt.show()

# Mean hit score per training segment
merchant_seg_eval = (
    user_report
    .groupby("merchant_segment")["merchant_hit_score"]
    .mean()
    .reset_index()
)
category_seg_eval = (
    user_report
    .groupby("category_segment")["category_hit_score"]
    .mean()
    .reset_index()
)

# One horizontal bar chart per item type, highest hit rate first
for seg_eval, hit_col, seg_col, bar_palette, chart_title in (
    (merchant_seg_eval, "merchant_hit_score", "merchant_segment", "Blues_r", "Merchant Hit Rate by Segment"),
    (category_seg_eval, "category_hit_score", "category_segment", "Greens_r", "Category Hit Rate by Segment"),
):
    ranked = seg_eval.sort_values(hit_col, ascending=False)
    plt.figure(figsize=(10, 5))
    sns.barplot(data=ranked, x=hit_col, y=seg_col, palette=bar_palette)
    plt.title(chart_title, fontsize=14, weight="bold")
    plt.xlim(0, 1)
    # Label each bar with its percentage, just right of the bar end
    for bar_pos, rate in enumerate(ranked[hit_col]):
        plt.text(rate + 0.01, bar_pos, f"{rate:.2%}", va="center")
    plt.show()

In [0]:
# ================================
# 📊 Confidence vs Hit Score Scatter (Quadrant Analysis)
# ================================

# Prepare data (combine merchant + category for plotting)
scatter_df = (
    user_report[["user_id", "merchant_confidence", "merchant_hit_score", 
                 "category_confidence", "category_hit_score"]]
    .copy()
)

# Plot at most 5000 users so the figures stay readable. The fixed random_state
# makes the sample (and therefore the figures) reproducible across runs, and
# drawing it once means both plots show the same users.
scatter_sample = scatter_df.sample(min(5000, len(scatter_df)), random_state=42)

# Plot for merchants
plt.figure(figsize=(8,6))
sns.scatterplot(
    data=scatter_sample,
    x="merchant_confidence", y="merchant_hit_score",
    alpha=0.4, edgecolor=None
)
# Red dashed lines at 0.5 split the plane into the four quadrants
plt.axhline(0.5, color="red", linestyle="--", linewidth=1)
plt.axvline(0.5, color="red", linestyle="--", linewidth=1)
plt.title("Confidence vs Hit Score (Merchant-Level)", fontsize=14, weight="bold")
plt.xlabel("Confidence (Train RFM Concentration)")
plt.ylabel("Hit Score (Train vs Test Overlap)")
plt.show()

# Plot for categories
plt.figure(figsize=(8,6))
sns.scatterplot(
    data=scatter_sample,
    x="category_confidence", y="category_hit_score",
    alpha=0.4, edgecolor=None, color="green"
)
plt.axhline(0.5, color="red", linestyle="--", linewidth=1)
plt.axvline(0.5, color="red", linestyle="--", linewidth=1)
plt.title("Confidence vs Hit Score (Category-Level)", fontsize=14, weight="bold")
plt.xlabel("Confidence (Train RFM Concentration)")
plt.ylabel("Hit Score (Train vs Test Overlap)")
plt.show()


In [0]:
print("✅ Numerical Summaries")

# Merchant averages over all evaluated users
avg_hit_merchant = merchant_eval["merchant_hit_score"].mean()
avg_conf_merchant = merchant_eval["merchant_confidence"].mean()

# Category averages over all evaluated users
avg_hit_category = category_eval["category_hit_score"].mean()
avg_conf_category = category_eval["category_confidence"].mean()

# Hit rates as percentages, then confidences as plain ratios
print(
    f"\nMerchant Hit Rate (avg): {avg_hit_merchant:.2%}",
    f"Category Hit Rate (avg): {avg_hit_category:.2%}",
    f"\nMerchant Confidence (avg): {avg_conf_merchant:.2f}",
    f"Category Confidence (avg): {avg_conf_category:.2f}",
    sep="\n",
)

# Quadrant counts (merchant-level)
def quadrant(row):
    """Classify a user by merchant confidence and merchant hit score.

    Both metrics are split at 0.5:

    - confidence >= 0.5 and hit >= 0.5 -> "Good Predictions"
    - confidence >= 0.5 and hit <  0.5 -> "False Confident"
    - confidence <  0.5 and hit >= 0.5 -> "Noisy / Lucky"
    - confidence <  0.5 and hit <  0.5 -> "Unpredictable"

    Rows with a missing metric are labelled "No Merchant Data". Without this
    check every comparison against NaN is False, so such rows would fall
    through to "Unpredictable".

    Args:
        row: Mapping or Series with ``merchant_confidence`` and
            ``merchant_hit_score``.

    Returns:
        The quadrant label as a string.
    """
    conf = row["merchant_confidence"]
    hit = row["merchant_hit_score"]
    if pd.isna(conf) or pd.isna(hit):
        return "No Merchant Data"
    if conf >= 0.5:
        return "Good Predictions" if hit >= 0.5 else "False Confident"
    return "Noisy / Lucky" if hit >= 0.5 else "Unpredictable"

# Label every user in the report with their merchant quadrant
user_report["merchant_quadrant"] = user_report.apply(quadrant, axis=1)

# Users per quadrant, most frequent first
quadrant_tally = user_report["merchant_quadrant"].value_counts()
quad_counts = pd.DataFrame({
    "Quadrant": quadrant_tally.index,
    "Users": quadrant_tally.values,
})

# Share of all report rows, expressed in percent
quad_counts["Percentage"] = quad_counts["Users"].div(len(user_report)).mul(100)
print("\nMerchant Quadrant Distribution:")
display(quad_counts)


In [0]:
# ================================
# 🏷️ Segment-Level Analysis
# ================================

# Mean hit score and confidence per merchant segment
merchant_seg_eval = (
    user_report
    .groupby("merchant_segment")[["merchant_hit_score","merchant_confidence"]]
    .mean()
    .reset_index()
)

# Mean hit score and confidence per category segment
category_seg_eval = (
    user_report
    .groupby("category_segment")[["category_hit_score","category_confidence"]]
    .mean()
    .reset_index()
)

print("Merchant Segment Averages:")
display(merchant_seg_eval)

print("Category Segment Averages:")
display(category_seg_eval)

# One horizontal bar chart per item type (merchant first, then category),
# segments ordered from highest to lowest average hit rate
for seg_eval, item_type, bar_palette in (
    (merchant_seg_eval, "Merchant", "Blues_r"),
    (category_seg_eval, "Category", "Greens_r"),
):
    hit_col = f"{item_type.lower()}_hit_score"
    seg_col = f"{item_type.lower()}_segment"
    ranked = seg_eval.sort_values(hit_col, ascending=False)

    plt.figure(figsize=(10, 5))
    sns.barplot(data=ranked, x=hit_col, y=seg_col, palette=bar_palette)
    plt.title(f"{item_type} Hit Rate by Segment", fontsize=14, weight="bold")
    plt.xlabel("Avg Hit Rate")
    plt.ylabel(f"{item_type} Segment")
    plt.xlim(0, 1)
    # Label each bar with its percentage, just right of the bar end
    for bar_pos, rate in enumerate(ranked[hit_col]):
        plt.text(rate + 0.01, bar_pos, f"{rate:.2%}", va="center")
    plt.show()


In [0]:
# ================================
# 🔎 Per-User Drilldown Function
# ================================
def user_drilldown(user_id):
    """Print one user's evaluation metrics and Top-K lists.

    Reads the notebook-level frames ``merchant_eval``, ``category_eval``,
    ``train_merchants``, ``test_merchants``, ``train_categories`` and
    ``test_categories``. Any section for which the user has no row is skipped
    silently.

    Args:
        user_id: Value to match against the ``user_id`` column of each frame.
    """
    print("=" * 60)
    print(f"🔎 User: {user_id}")

    def _rows_for(frame):
        # Rows of `frame` belonging to the requested user
        return frame[frame["user_id"] == user_id]

    # Confidence and hit score, merchants first and then categories
    metric_sources = (
        ("Merchant", merchant_eval, "merchant_confidence", "merchant_hit_score"),
        ("Category", category_eval, "category_confidence", "category_hit_score"),
    )
    for label, eval_df, conf_col, hit_col in metric_sources:
        match = _rows_for(eval_df)
        if match.empty:
            continue
        print(f"\n{label} Confidence: {match[conf_col].values[0]:.2f}")
        print(f"{label} Hit Score: {match[hit_col].values[0]:.2f}")

    # Top-K lists from train and test, merchants first and then categories
    topk_sources = (
        ("Merchants", train_merchants, test_merchants),
        ("Categories", train_categories, test_categories),
    )
    for plural, train_top, test_top in topk_sources:
        train_match = _rows_for(train_top)
        test_match = _rows_for(test_top)
        if not train_match.empty:
            print(f"\nTrain Top {plural}:", train_match["top_items"].values[0])
        if not test_match.empty:
            print(f"Test Top {plural}:", test_match["top_items"].values[0])

# Example usage
# user_drilldown("0000b6d5-f969-4996-ac9c-0635f1eed680")
