# Hybrid Banking Recommendation System: ALS + LLM
## Zenith Bank Product Recommendation Engine

This notebook implements a hybrid recommendation system that combines:
1. **Collaborative Filtering (ALS)** - For data-driven product recommendations
2. **LLM (GPT-4)** - For context awareness, explanations, and personalization

**Architecture:**
- ALS generates base recommendations from transaction patterns
- LLM enhances with context, explanations, and customer segmentation
- Feature engineering creates rich customer profiles

## Cell 1: Install Required Libraries

In [1]:
# Install required libraries
#!pip install -q openai python-dotenv implicit scikit-learn scipy pyspark

## Cell 2: Import Libraries

In [2]:
import os
import json
import warnings
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple

import pandas as pd
import numpy as np
from scipy.sparse import csr_matrix, coo_matrix
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, ndcg_score

from pyspark.sql import SparkSession
import pandas as pd


import time
import openai
from openai import OpenAI
from dotenv import load_dotenv
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import Window
import json
from decimal import Decimal

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Configure
warnings.filterwarnings('ignore')
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

print("‚úÖ All libraries imported successfully")

‚úÖ All libraries imported successfully


## Cell 3: Configuration and Setup

In [3]:
# ============================================================================
# API KEYS AND CONFIGURATION (loaded from .env)
# ============================================================================
from dotenv import load_dotenv
import os
import json

load_dotenv()

openai_api_key = os.getenv("OPENAI_API_KEY")

CONFIG = {
    'als': {
        'factors': int(os.getenv('ALS_FACTORS', 50)),
        'regularization': float(os.getenv('ALS_REGULARIZATION', 0.01)),
        'iterations': int(os.getenv('ALS_ITERATIONS', 15)),
        'alpha': float(os.getenv('ALS_ALPHA', 40.0))
    },
    'llm': {
        'model': os.getenv('LLM_MODEL', 'gpt-4o-mini'),
        'temperature': float(os.getenv('LLM_TEMPERATURE', 0.3)),
        'max_tokens': int(os.getenv('LLM_MAX_TOKENS', 2000))
    },
    'recommendation': {
        'top_n': int(os.getenv('RECOMMENDATION_TOP_N', 5)),
        'final_n': int(os.getenv('RECOMMENDATION_FINAL_N', 3))
    },
    'feature_engineering': {
        'recency_days': int(os.getenv('FEATURE_RECENCY_DAYS', 90)),
        'min_transactions': int(os.getenv('FEATURE_MIN_TRANSACTIONS', 3))
    }
}

print("\nüìä Configuration Loaded from .env:")
print(json.dumps(CONFIG, indent=2))



üìä Configuration Loaded from .env:
{
  "als": {
    "factors": 50,
    "regularization": 0.01,
    "iterations": 15,
    "alpha": 40.0
  },
  "llm": {
    "model": "gpt-4o-mini",
    "temperature": 0.3,
    "max_tokens": 2000
  },
  "recommendation": {
    "top_n": 5,
    "final_n": 3
  },
  "feature_engineering": {
    "recency_days": 90,
    "min_transactions": 3
  }
}


## Cell 4: Load Data

In [4]:

# Initialize Spark
spark = SparkSession.builder \
    .appName("PinnacleAI_DataLoad") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("‚úÖ Spark session initialized successfully")

# File paths
TRANSACTIONS_TABLE = r'C:\Users\adeye\Documents\Pinnacle-AI\dataset\transaction.csv'
PRODUCTS_TABLE = r'C:\Users\adeye\Documents\Pinnacle-AI\dataset\product.csv'
CONVERSATIONS_TABLE = r'C:\Users\adeye\Documents\Pinnacle-AI\dataset\interaction_pd.csv'
CUSTOMERS_TABLE = r'C:\Users\adeye\Documents\Pinnacle-AI\dataset\customers.csv'

print("üìÇ Loading datasets...\n")

try:
    df_transactions = spark.read.option("header", True).csv(TRANSACTIONS_TABLE)
    df_products = spark.read.option("header", True).csv(PRODUCTS_TABLE)
    df_conversations = spark.read.option("header", True).csv(CONVERSATIONS_TABLE)
    df_customers = spark.read.option("header", True).csv(CUSTOMERS_TABLE)

    print(f"‚úÖ Loaded {df_transactions.count():,} transactions")
    print(f"‚úÖ Loaded {df_products.count():,} products")
    print(f"‚úÖ Loaded {df_conversations.count():,} conversations")
    print(f"‚úÖ Loaded {df_customers.count():,} customers")

    print("\nüìä Schema Preview:")
    df_transactions.printSchema()

except Exception as e:
    print(f"‚ùå Error: {e}")


‚úÖ Spark session initialized successfully
üìÇ Loading datasets...

‚úÖ Loaded 1,200,000 transactions
‚úÖ Loaded 42 products
‚úÖ Loaded 3,164 conversations
‚úÖ Loaded 100,000 customers

üìä Schema Preview:
root
 |-- Customer_ID: string (nullable = true)
 |-- Trans_Amount: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Deb_or_credit: string (nullable = true)
 |-- Narration: string (nullable = true)
 |-- Tran_Id: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Account_Type: string (nullable = true)



## Cell 5: Data Exploration

In [5]:
print("\n TRANSACTIONS SAMPLE:")
df_transactions.show(5, truncate=False)

print("\n PRODUCTS SAMPLE:")
df_products.show(5, truncate=False)

print("\n CUSTOMERS SAMPLE:")
df_customers.show(5, truncate=False)

print("\n CONVERSATIONS SAMPLE:")
df_conversations.show(5, truncate=False)

print("\n TRANSACTION STATISTICS:")
df_transactions.describe().show(truncate=False)

print("\n Missing Values Per Column:")
missing_df = df_transactions.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_transactions.columns
])
missing_df.show(truncate=False)

print("\n DATA OVERVIEW:")
print(f"Total Rows: {df_transactions.count():,}")
for col in df_transactions.columns:
    unique_count = df_transactions.select(col).distinct().count()
    print(f"Unique {col}: {unique_count:,}")



 TRANSACTIONS SAMPLE:
+-----------+------------+----------+---------------------+-------------+--------------------+-----------+---------+----------------------------+
|Customer_ID|Trans_Amount|Date      |Destination          |Deb_or_credit|Narration           |Tran_Id    |Category |Account_Type                |
+-----------+------------+----------+---------------------+-------------+--------------------+-----------+---------+----------------------------+
|ZB060544   |2161947.29  |2023-01-01|Maintenance Services |D            |service charge      |TR000054680|Housing  |Timeless Account Savings    |
|ZB032114   |11331.88    |2023-01-01|Mobil Filling Station|D            |premium motor spirit|TR000874920|Transport|Gold Premium Current Account|
|ZB096997   |497269.45   |2023-01-01|JAMB Registration    |D            |school fees payment |TR000315856|Education|Timeless Account Current    |
|ZB011060   |382993.75   |2023-01-01|Slot Limited         |D            |fashion items       |TR00082

## Cell 6: Feature Engineering - Create Interaction Matrix

In [None]:
# ============================================================================
# PINNACLE AI - INTERACTION MATRIX CREATION (OPENAI + PYSPARK, PYTHON 3.13 SAFE)
# ============================================================================

import os
import json
import time
import pandas as pd
from openai import OpenAI
from pyspark.sql import SparkSession, functions as F

# ----------------------------------------------------------------------------
# ENVIRONMENT AND CONFIGURATION
# ----------------------------------------------------------------------------

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise EnvironmentError("OPENAI_API_KEY not found in environment. Please set it in .env")

CONFIG = {
    'als': {
        'factors': 50,
        'regularization': 0.01,
        'iterations': 15,
        'alpha': 40.0
    },
    'llm': {
        'model': 'gpt-4o-mini',
        'temperature': 0.3,
        'max_tokens': 2000
    },
    'recommendation': {
        'top_n': 5,
        'final_n': 3
    },
    'feature_engineering': {
        'recency_days': 90,
        'min_transactions': 3
    }
}

print("Configuration Loaded:")
print(json.dumps(CONFIG, indent=2))

# ----------------------------------------------------------------------------
# INITIALIZE SPARK SESSION
# ----------------------------------------------------------------------------

spark = SparkSession.builder.appName("PinnacleAI_InteractionMatrix").getOrCreate()

# ----------------------------------------------------------------------------
# FUNCTION: CREATE CUSTOMER-PRODUCT INTERACTIONS
# ----------------------------------------------------------------------------

def create_customer_product_interactions(
    df_custs,
    df_products,
    openai_client=None,
    model_name="gpt-4o-mini",
    customer_sample_size=None,
    batch_size=30,
    temperature=0.3,
    top_n_products=None,
    rate_limit_delay=0.5,
    additional_rules=None,
    test_mode=False
):
    start_time = time.time()
    print("\nCreating full interaction matrix (dense scoring)...\n")

    # Prepare product data
    all_product_names = [r["Product_Name"] for r in df_products.select("Product_Name").collect()]
    print(f"Loaded {len(all_product_names)} products")

    # Keyword extraction
    df_custs_kw = df_custs.withColumn(
        "account_keywords",
        F.regexp_extract(F.lower(F.col("Account_Type")),
                         r"(current|savings|sme|student|premium|platinum|children|aspire)", 1)
    )
    df_prod_kw = df_products.withColumn(
        "product_keywords",
        F.regexp_extract(F.lower(F.col("Product_Name")),
                         r"(current|savings|sme|student|premium|platinum|children|aspire)", 1)
    )

    keyword_map = df_prod_kw.filter(F.col("product_keywords") != "").select(
        "product_keywords", "Product_Name"
    ).distinct()

    df_cust_current = df_custs_kw.join(
        keyword_map,
        df_custs_kw.account_keywords == keyword_map.product_keywords,
        "left"
    ).withColumnRenamed("Product_Name", "current_product")

    customer_current_products_df = df_cust_current.filter(
        F.col("current_product").isNotNull()
    ).groupBy("Customer_ID").agg(F.collect_set("current_product").alias("current_products"))

    print(f"Found current products for {customer_current_products_df.count()} customers")

    customer_current_products = {
        r["Customer_ID"]: set(r["current_products"])
        for r in customer_current_products_df.collect()
    }

    # Sampling
    total_customers = df_custs.count()
    if customer_sample_size and total_customers > customer_sample_size:
        df_custs = df_custs.sample(fraction=customer_sample_size / total_customers, seed=42).limit(customer_sample_size)
        print(f"Sampled {customer_sample_size} customers from {total_customers}")

    # Base rules
    general_rules = [
        "- Match products to customer demographics (age, income, occupation)",
        "- Exclude products already owned by the customer",
        "- Respect target audience and minimum balance criteria",
        "- Score every product for every customer, even if not relevant"
    ]
    if additional_rules:
        general_rules += [f"- {r}" for r in additional_rules]
    rules_text = "\n".join(general_rules)

    cust_rows = df_custs.collect()
    all_interactions = []

    # --------------------------------------------------------------------
    # Backoff-enabled API call helper
    # --------------------------------------------------------------------
    @backoff.on_exception(backoff.expo, Exception, max_tries=3, jitter=None)
    def call_openai(prompt):
        return openai_client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are a precise JSON generator."},
                {"role": "user", "content": prompt}
            ],
            temperature=temperature,
            response_format={"type": "json_object"}
        )

    # --------------------------------------------------------------------
    # TEST MODE
    # --------------------------------------------------------------------
    if test_mode:
        print("Running in TEST MODE (no OpenAI calls)...")
        for cust in cust_rows:
            for prod in all_product_names:
                score = float(round(5 + 5 * (hash(cust["Customer_ID"] + prod) % 100) / 100, 2))
                all_interactions.append({
                    "Customer_ID": cust["Customer_ID"],
                    "Product_Name": prod,
                    "interaction_score": score
                })
    else:
        if not openai_client:
            raise ValueError("OpenAI client must be initialized when test_mode=False")

        for i in range(0, len(cust_rows), batch_size):
            batch = cust_rows[i:i + batch_size]
            summaries = []
            for cust in batch:
                details = [f"Customer {cust['Customer_ID']}"]
                for col in df_custs.columns:
                    if col != "Customer_ID":
                        val = cust[col]
                        if val:
                            details.append(f"- {col}: {val}")
                summaries.append("\n".join(details))

            prompt = f"""
            You are a banking recommendation AI. 
            For every customer, assign a product fit score between 0 and 10 for each product ‚Äî even if the fit is low.

            CUSTOMERS:
            {chr(10).join(summaries)}

            PRODUCTS:
            {'; '.join(all_product_names)}

            RULES:
            {rules_text}

            Return only JSON:
            {{
                "matches": [
                    {{"customer_id": "C001", "product": "Product Name", "score": 8.5}},
                    ...
                ]
            }}
            """

            try:
                response = call_openai(prompt)
                content = response.choices[0].message.content
                result = json.loads(content)

                if "matches" not in result:
                    print(f"‚ö†Ô∏è No matches returned in batch {i//batch_size + 1}")
                    continue

                for match in result["matches"]:
                    all_interactions.append({
                        "Customer_ID": match["customer_id"],
                        "Product_Name": match["product"],
                        "interaction_score": float(match["score"])
                    })
                print(f"‚úÖ Batch {i//batch_size + 1}: {len(result['matches'])} scores generated")

            except Exception as e:
                print(f"‚ùå Error in batch {i//batch_size + 1}: {e}")
                continue

            time.sleep(rate_limit_delay)

    # --------------------------------------------------------------------
    # Convert to Spark DataFrame (Python 3.13 safe)
    # --------------------------------------------------------------------
    if not all_interactions:
        print("‚ö†Ô∏è No interactions generated; using fallback values.")
        fallback_data = [(cust["Customer_ID"], all_product_names[0], 5.0) for cust in cust_rows]
        pdf = pd.DataFrame(fallback_data, columns=["Customer_ID", "Product_Name", "interaction_score"])
    else:
        pdf = pd.DataFrame(all_interactions)

    interaction_df = spark.createDataFrame(pdf)

    # Remove already-owned products
    if customer_current_products:
        current_pairs = [(cust, prod) for cust, prods in customer_current_products.items() for prod in prods]
        current_df = spark.createDataFrame(current_pairs, ["Customer_ID", "Product_Name"])
        interaction_df = interaction_df.join(current_df, ["Customer_ID", "Product_Name"], "left_anti")

    # Consolidate max score per pair
    interaction_df = interaction_df.groupBy("Customer_ID", "Product_Name").agg(
        F.max("interaction_score").alias("interaction_score")
    )

    elapsed = time.time() - start_time
    print(f"\n‚úÖ Completed in {elapsed:.2f}s with {interaction_df.count()} interactions.")

    os.makedirs("data", exist_ok=True)
    interaction_df.write.mode("overwrite").parquet("data/interactions.parquet")
    print("üíæ Saved to data/interactions.parquet")

    return interaction_df

# ----------------------------------------------------------------------------
# EXECUTION
# ----------------------------------------------------------------------------

custom_rules = [
    "Premium products require demonstrated high transaction volumes",
    "Savings products are suitable for customers with stable income"
]

client = OpenAI(api_key=OPENAI_API_KEY)

interaction_df = create_customer_product_interactions(
    df_customers,
    df_products,
    openai_client=client,
    model_name=CONFIG["llm"]["model"],
    customer_sample_size=10,
    batch_size=30,
    temperature=CONFIG["llm"]["temperature"],
    top_n_products=CONFIG["recommendation"]["top_n"],
    rate_limit_delay=0.5,
    additional_rules=custom_rules,
    test_mode=False
)

print("\nSample Interactions:")
interaction_df.show(10)

print("\nScore Distribution:")
interaction_df.describe(["interaction_score"]).show()

print("\nTop Products by Interaction Count:")
top_products = interaction_df.groupBy("Product_Name").count().orderBy(F.desc("count")).limit(10)
top_products.show()

Configuration Loaded:
{
  "als": {
    "factors": 50,
    "regularization": 0.01,
    "iterations": 15,
    "alpha": 40.0
  },
  "llm": {
    "model": "gpt-4o-mini",
    "temperature": 0.3,
    "max_tokens": 2000
  },
  "recommendation": {
    "top_n": 5,
    "final_n": 3
  },
  "feature_engineering": {
    "recency_days": 90,
    "min_transactions": 3
  }
}

Creating interaction matrix...

Loaded 42 products
Found current products for 92202 customers
Sampled 10 customers from 100000
Batch 1: 5 matches


AttributeError: 'DataFrame' object has no attribute 'iteritems'

In [None]:
# ============================================================
# PYSPARK NOTEBOOK/SCRIPT ‚Äî ALS RECOMMENDATIONS FROM CSV
# ============================================================
# Requirements:
#   - PySpark 3.4+ (recommended 3.5.x)
#   - JDK 11/17 (JAVA_HOME set)
#
# Input:
#   interaction_pd.csv   (required) columns: Customer_ID, Product_Name, interaction_score
#   products.csv         (optional)
#   customers.csv        (optional)
#
# Outputs (folders with CSVs):
#   als_recommendations_csv/
#   recommendations_pivot_csv/
# ============================================================

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, row_number, desc, explode, first, count, lit
)
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

TOP_N = 5
INTERACTIONS_PATH = "Downloads/interaction_pd.csv"
PRODUCTS_PATH = "products.csv"
CUSTOMERS_PATH = "customers.csv"

spark = (SparkSession.builder
         .appName("ALS-Recommendations")
         # .config("spark.sql.shuffle.partitions","200")  # tune for cluster size
         .getOrCreate())

print("=" * 70)
print("üöÄ ALS RECOMMENDATIONS - PYSPARK VERSION")
print("=" * 70)

# ------------------------------------------------------------
# 1) LOAD DATA
# ------------------------------------------------------------
print("\nüìã Step 1: Loading data from CSV files...")

interaction_df = (spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(INTERACTIONS_PATH))

# Basic validation
required_cols = {"Customer_ID", "Product_Name", "interaction_score"}
missing = required_cols - set(interaction_df.columns)
if missing:
    raise ValueError(f"{INTERACTIONS_PATH} missing columns: {missing}")

# Drop bad rows (nulls / non-numeric ratings)
interaction_df = (interaction_df
                  .where(col("Customer_ID").isNotNull() & col("Product_Name").isNotNull())
                  .withColumn("interaction_score", col("interaction_score").cast("double"))
                  .where(col("interaction_score").isNotNull()))

print(f"‚úÖ Loaded interactions: {interaction_df.count():,} rows")
print(f"  ‚Ä¢ Unique customers: {interaction_df.select('Customer_ID').distinct().count():,}")
print(f"  ‚Ä¢ Unique products : {interaction_df.select('Product_Name').distinct().count():,}")

def try_load(path):
    try:
        df = (spark.read.option("header",True).option("inferSchema",True).csv(path))
        print(f"‚úÖ Loaded {path}: {df.count():,} rows")
        return df
    except Exception:
        print(f"‚ÑπÔ∏è {path} not found (optional)")
        return None

products_df  = try_load(PRODUCTS_PATH)
customers_df = try_load(CUSTOMERS_PATH)

# Keep only highest score per (Customer_ID, Product_Name)
w_dedup = Window.partitionBy("Customer_ID", "Product_Name").orderBy(desc("interaction_score"))
interaction_df = (interaction_df
    .withColumn("rn", row_number().over(w_dedup))
    .where(col("rn")==1)
    .drop("rn"))

print(f"üìä After de-dup: {interaction_df.count():,} interactions")

# ------------------------------------------------------------
# 2) INDEX STRING KEYS ‚Üí INTS FOR ALS
# ------------------------------------------------------------
print("\nüìã Step 2: Indexing IDs for ALS...")

user_indexer = StringIndexer(inputCol="Customer_ID", outputCol="user_idx", handleInvalid="skip")
item_indexer = StringIndexer(inputCol="Product_Name", outputCol="item_idx", handleInvalid="skip")

user_indexer_model = user_indexer.fit(interaction_df)
item_indexer_model = item_indexer.fit(interaction_df)

idx_df = user_indexer_model.transform(interaction_df)
idx_df = item_indexer_model.transform(idx_df)

idx_df = (idx_df
          .withColumn("user_int", col("user_idx").cast("int"))
          .withColumn("item_int", col("item_idx").cast("int"))
          .select("Customer_ID","Product_Name","interaction_score","user_int","item_int"))

users_map = idx_df.select("user_int","Customer_ID").distinct()
items_map = idx_df.select("item_int","Product_Name").distinct()

print(f"  ‚Ä¢ Indexed users:  {users_map.count():,}")
print(f"  ‚Ä¢ Indexed items:  {items_map.count():,}")

# ------------------------------------------------------------
# 3) TRAIN / EVALUATE ALS (explicit ratings)
# ------------------------------------------------------------
print("\nüî® Step 3: Training ALS model...")

train_df, test_df = idx_df.randomSplit([0.8, 0.2], seed=42)

als = ALS(
    userCol="user_int",
    itemCol="item_int",
    ratingCol="interaction_score",
    implicitPrefs=False,   # explicit ratings
    nonnegative=True,
    coldStartStrategy="drop",
    rank=20,
    regParam=0.1,
    maxIter=20,
    seed=42
)

model = als.fit(train_df)

# Evaluate
pred_test = model.transform(test_df)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="interaction_score", predictionCol="prediction")
rmse = evaluator.evaluate(pred_test)
print(f"‚úÖ Trained. Test RMSE: {rmse:.4f}")

# ------------------------------------------------------------
# 4) RECOMMENDATIONS (Top-N per user) ‚Äî FIXED FIELD NAME
# ------------------------------------------------------------
print("\nüìã Step 4: Generating recommendations...")

recs = model.recommendForAllUsers(TOP_N)

# If you're ever unsure, uncomment this to inspect schema:
# recs.printSchema()
# root
#  |-- user_int: integer (nullable = false)
#  |-- recommendations: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- item_int: integer (nullable = true)
#  |    |    |-- rating: float (nullable = true)

flat = (
    recs
    .select(col("user_int"), explode("recommendations").alias("rec"))
    .select(
        col("user_int"),
        col("rec.item_int").alias("item_int"),        # <- NOTE: item_int (not itemInt)
        col("rec.rating").alias("recommendation_score")
    )
)

full_recs = (
    flat
    .join(users_map, on="user_int", how="left")
    .join(items_map, on="item_int", how="left")
)

w_rank = Window.partitionBy("user_int").orderBy(desc("recommendation_score"))
final_recs = (full_recs
              .withColumn("rank", row_number().over(w_rank))
              .select("Customer_ID","Product_Name","recommendation_score","rank"))

print(f"‚úÖ Generated {final_recs.count():,} recommendations")
print(f"   for {final_recs.select('Customer_ID').distinct().count():,} customers")

# ------------------------------------------------------------
# 5) INSIGHTS (Top products) & SAMPLE PRINTS
# ------------------------------------------------------------
print("\nüèÜ Most Recommended Products:")
(final_recs
 .groupBy("Product_Name").agg(count("*").alias("times_recommended"))
 .orderBy(desc("times_recommended"))
 .show(10, truncate=False))

print("\nüë• Sample customer recommendations:")
sample_ids = [r["Customer_ID"] for r in final_recs.select("Customer_ID").distinct().limit(3).collect()]
for cid in sample_ids:
    print(f"\nCustomer: {cid}")
    (final_recs
     .where(col("Customer_ID")==cid)
     .orderBy("rank")
     .show(truncate=False))

# ------------------------------------------------------------
# 6) SAVE RESULTS
# ------------------------------------------------------------
print("\nüíæ Saving results...")

# a) Full recommendations (like als_recommendations.csv)
(final_recs
 .orderBy("Customer_ID","rank")
 .coalesce(1)                          # For small-to-medium outputs; remove for big data
 .write.mode("overwrite")
 .option("header", True)
 .csv("als_recommendations_csv"))

# b) Pivot: Customer_ID x Recommendation_1..N
from pyspark.sql.functions import expr
pivot = (final_recs
         .groupBy("Customer_ID")
         .pivot("rank", list(range(1, TOP_N+1)))
         .agg(first("Product_Name"))
         .orderBy("Customer_ID"))

for i in range(1, TOP_N+1):
    pivot = pivot.withColumnRenamed(str(i), f"Recommendation_{i}")

(pivot
 .coalesce(1)
 .write.mode("overwrite")
 .option("header", True)
 .csv("recommendations_pivot_csv"))

print("\n" + "=" * 70)
print("‚úÖ ALS RECOMMENDATION PIPELINE COMPLETE!")
print("=" * 70)
print("\nFolders created:")
print("  ‚Ä¢ als_recommendations_csv/")
print("  ‚Ä¢ recommendations_pivot_csv/")

# ------------------------------------------------------------
# 7) (Optional) Popularity fallback for cold-start users/items
# ------------------------------------------------------------
# Example: create a simple popularity table you can join when needed
popularity = (idx_df.groupBy("Product_Name")
              .agg(count(lit(1)).alias("interactions"))
              .orderBy(desc("interactions")))
popularity.show(10, truncate=False)

# spark.stop()  # uncomment if running as a script


üöÄ ALS RECOMMENDATIONS - PYSPARK VERSION

üìã Step 1: Loading data from CSV files...


AnalysisException: Path does not exist: file:/c:/Users/adeye/Documents/Pinnacle-AI/notebooks/Downloads/interaction_pd.csv