In [None]:
# CELL 1: DATA GENERATION
import pandas as pd
import numpy as np
import random
from faker import Faker

print("Initializing Data Generator...")

# --- REPRODUCIBILITY ---
# Three RNGs feed this cell, and all three must be seeded:
#   * Faker           -> names, timestamps
#   * numpy (global)  -> view/purchase event type
#   * stdlib `random` -> age, income, credit score, customer pick, product pick
# Previously `random` was never seeded, so the dataset changed on every run.
SEED = 42
fake = Faker()
Faker.seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

# --- CONFIGURATION ---
NUM_CUSTOMERS = 10_000
NUM_INTERACTIONS = 100_000
PRODUCTS = [
    {'id': 101, 'name': 'Super Saver Account', 'category': 'Savings'},
    {'id': 102, 'name': 'Gold Credit Card', 'category': 'Credit'},
    {'id': 103, 'name': 'Platinum Miles Card', 'category': 'Credit'},
    {'id': 104, 'name': 'Home Improvement Loan', 'category': 'Loan'},
    {'id': 105, 'name': 'Auto Loan', 'category': 'Loan'},
    {'id': 106, 'name': 'Crypto Wallet', 'category': 'Investment'},
    {'id': 107, 'name': 'Retirement Fund', 'category': 'Investment'},
    {'id': 108, 'name': 'Student Checking', 'category': 'Savings'},
]

# Segmentation rule used to bias which products each customer interacts with.
WEALTHY_INCOME_THRESHOLD = 100_000   # income strictly above this -> wealthy pool
YOUNG_AGE_THRESHOLD = 25             # age strictly below this -> young pool
WEALTHY_PRODUCT_IDS = {103, 106, 107}  # Platinum Miles Card, Crypto Wallet, Retirement Fund
YOUNG_PRODUCT_IDS = {101, 108}         # Super Saver Account, Student Checking

# Built once up front; these pools do not change between interactions.
# Catalogue order is preserved, so random.choice sees the same list order as before.
WEALTHY_PRODUCTS = [p for p in PRODUCTS if p['id'] in WEALTHY_PRODUCT_IDS]
YOUNG_PRODUCTS = [p for p in PRODUCTS if p['id'] in YOUNG_PRODUCT_IDS]


def pick_product(customer: dict) -> dict:
    """Pick one product for `customer` according to the segmentation rule.

    Income is checked before age, so a wealthy customer under 25 draws from
    the wealthy pool. Everyone else draws uniformly from the full catalogue.

    Args:
        customer: a customer record with at least 'income' and 'age' keys.

    Returns:
        One entry of PRODUCTS (a dict with 'id', 'name', 'category').
    """
    if customer['income'] > WEALTHY_INCOME_THRESHOLD:
        return random.choice(WEALTHY_PRODUCTS)
    if customer['age'] < YOUNG_AGE_THRESHOLD:
        return random.choice(YOUNG_PRODUCTS)
    return random.choice(PRODUCTS)


# --- 1. GENERATE CUSTOMERS ---
customers = [
    {
        'customer_id': i + 1,
        'name': fake.name(),
        'age': random.randint(18, 75),
        'income': random.randint(20000, 150000),
        'credit_score': random.randint(300, 850),
    }
    for i in range(NUM_CUSTOMERS)
]
df_customers = pd.DataFrame(customers)
assert df_customers['customer_id'].is_unique

# --- 2. GENERATE INTERACTIONS ---
interactions = []
for _ in range(NUM_INTERACTIONS):
    # Uniform pick from the in-memory list. The previous
    # df_customers.sample(1).iloc[0] built a new DataFrame on every iteration.
    cust = random.choice(customers)
    prod = pick_product(cust)

    # Implicit rating: 1 = view, 5 = purchase (70% / 30% split).
    event_type = np.random.choice(['view', 'purchase'], p=[0.7, 0.3])
    rating = 5 if event_type == 'purchase' else 1

    interactions.append({
        'customer_id': cust['customer_id'],
        'product_id': prod['id'],
        'rating': rating,
        # NOTE(review): the window is relative to "now", so timestamps shift
        # between runs on different days even with the seeds above.
        'timestamp': fake.date_time_between(start_date='-1y', end_date='now')
    })

df_interactions = pd.DataFrame(interactions)
df_products = pd.DataFrame(PRODUCTS)
assert len(df_interactions) == NUM_INTERACTIONS
assert df_interactions['product_id'].isin(df_products['id']).all()

# --- 3. SAVE TO SPARK TABLES (The "Lakehouse" Layer) ---
# Convert Pandas -> Spark DataFrame
spark_cust = spark.createDataFrame(df_customers)
spark_inter = spark.createDataFrame(df_interactions)
spark_prod = spark.createDataFrame(df_products)

# Save as managed tables so later cells can reload them with spark.table(...) or SQL
spark_cust.write.mode("overwrite").saveAsTable("customers")
spark_inter.write.mode("overwrite").saveAsTable("interactions")
spark_prod.write.mode("overwrite").saveAsTable("products")

print(f"SUCCESS: Generated {NUM_CUSTOMERS} customers and saved to Spark Tables.")

In [None]:
# CELL 2: DISTRIBUTED TRAINING (ALS)
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, explode

MODEL_SEED = 42          # shared by the train/test split and ALS factor initialisation
NUM_RECOMMENDATIONS = 3  # "next best offers" generated per customer

# 1. Load data from the tables written by the generation cell
df_interactions = spark.table("interactions")

# The products table keys on `id`. Rename it to `product_id` so it matches
# the column name in the ALS recommendations, which the join in step 7 needs.
df_products = spark.table("products").withColumnRenamed("id", "product_id")

# 2. Prep data: ALS needs integer user/item ids and a numeric rating column
training_data = df_interactions.select(
    col("customer_id").cast("integer"),
    col("product_id").cast("integer"),
    col("rating").cast("float")
)
(train, test) = training_data.randomSplit([0.8, 0.2], seed=MODEL_SEED)

# 3. Train ALS model
print("Training Distributed ALS Model...")
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="customer_id",
    itemCol="product_id",
    ratingCol="rating",
    # Drop test rows whose user/item was unseen in training. Otherwise their
    # NaN predictions would make the RMSE below NaN.
    coldStartStrategy="drop",
    # Without an explicit seed, the factor initialisation (and therefore the
    # recommendations) is not pinned, even though the split above is.
    seed=MODEL_SEED,
)
model = als.fit(train)

# 4. Evaluate (RMSE on the held-out split)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Model Trained. Root-mean-square error = {rmse:.4f}")

# 5. Generate recommendations (NBO)
print("Generating 'Next Best Offer' for all users...")
user_recs = model.recommendForAllUsers(NUM_RECOMMENDATIONS)

# 6. Flatten: one row per (customer, recommended product)
user_recs_exploded = user_recs.select(
    col("customer_id"),
    explode("recommendations").alias("rec")
).select(
    col("customer_id"),
    col("rec.product_id"),
    col("rec.rating").alias("prediction_score")
)

# 7. Attach product names/categories. Ordering makes the displayed and
# exported output stable from run to run.
final_nbo = (
    user_recs_exploded
    .join(df_products, "product_id")
    .select("customer_id", "product_id", "name", "category", "prediction_score")
    .orderBy("customer_id", col("prediction_score").desc())
)

# Show a sample
display(final_nbo)

In [None]:
# CELL 3: EXPORT TO CSV
#
# Writes `final_nbo` as a single CSV under DBFS /FileStore and prints a
# browser download link. Files under dbfs:/FileStore/<dir> are served at
# https://<workspace>/files/<dir>. Both paths are therefore derived from one
# constant so they cannot drift apart.

# 1. Define the export location once
EXPORT_DIR_NAME = "nbo_export_csv"
save_path = f"dbfs:/FileStore/{EXPORT_DIR_NAME}"

# 2. Write the CSV.
# coalesce(1) collapses the output to one partition, so Spark emits a single
# part-*.csv file. This suits small results; all rows go through one task.
print("Writing CSV to DBFS...")
final_nbo.coalesce(1).write.mode("overwrite").option("header", "true").csv(save_path)

# 3. Locate the generated part file. Spark chooses its name, so list the
# folder rather than guessing it.
files = dbutils.fs.ls(save_path)
csv_name = next((f.name for f in files if f.name.endswith(".csv")), None)
if csv_name is None:
    raise FileNotFoundError(f"No .csv part file was written under {save_path}")

# 4. Build the download link for the current workspace
workspace_url = spark.conf.get("spark.databricks.workspaceUrl")
download_url = f"https://{workspace_url}/files/{EXPORT_DIR_NAME}/{csv_name}"

print("SUCCESS! Download your NBO Data here:")
print("-" * 30)
print(download_url)
print("-" * 30)