# Batch Scoring

This notebook batch-scores every customer and every customer cluster in advance, so that their recommendations are precomputed and ready for fast retrieval. It also precomputes the most popular items per month.

## Imports

In [None]:
import sys
# Sanity check: show which Python interpreter backs this kernel, so it is easy
# to confirm the notebook runs in the environment where PySpark is installed.
kernel_python = sys.executable
print(kernel_python)

In [68]:
# PySpark core
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, explode, year, month, lit, when, row_number, exp
from pyspark.sql.functions import sum as _sum
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# MLlib models
from pyspark.ml.recommendation import ALSModel
from pyspark.ml.feature import StringIndexerModel, StringIndexer, IndexToString


# File handling
import os

In [None]:
# Create (or reuse, via getOrCreate) the Spark session used for batch scoring.
# The master URL and application name default to the original local setup, but
# they can be overridden through environment variables. That way the same
# notebook can run against a cluster without code edits.
SPARK_MASTER = os.environ.get("SPARK_MASTER", "local[*]")
SPARK_APP_NAME = os.environ.get("SPARK_APP_NAME", "LocalSparkForTesting")

spark = (
    SparkSession.builder
    .appName(SPARK_APP_NAME)
    .master(SPARK_MASTER)
    .getOrCreate()
)

## Load models

In [24]:
MODELS_PATH = os.path.join('/sparkdata/wholesale-recommender', 'models')


def _model_dir(name):
    """Return the directory of the saved model called ``name`` under MODELS_PATH."""
    return os.path.join(MODELS_PATH, name)


# ALS recommenders: per customer (scores products) and per cluster (scores categories)
als_model_individual = ALSModel.load(_model_dir("individual_cust_rec"))
als_model_clusters = ALSModel.load(_model_dir("cluster_cat_rec"))

# Fitted StringIndexers, needed to go between original IDs and model indices
customer_index_model = StringIndexerModel.load(_model_dir("customer_indexer_model"))
product_index_model = StringIndexerModel.load(_model_dir("product_indexer_model"))
category_index_model = StringIndexerModel.load(_model_dir("category_indexer_model"))

## Recommendations: Individual Customers

### Create dataframe with all customers

In [None]:
DATA_PATH = os.path.join('/sparkdata/wholesale-recommender', 'processed')

# Load the customer feature table and normalise the ID column name...
customers = (
    spark.read.parquet(os.path.join(DATA_PATH, "customers_features"))
    .withColumnRenamed("Customer ID", "customer_id")
)

# ...then attach the numeric index the ALS model was trained on, using the
# fitted customer StringIndexerModel.
indexed_customers = (
    customer_index_model
    .transform(customers.select("customer_id"))
    .select("customer_id", "customer_index")
)

### Generate top 10 recommendations

In [None]:
top_n = 10

# One row per customer, carrying an array of recommendation structs
individual_recs_nested = als_model_individual.recommendForUserSubset(indexed_customers, top_n)

# Flatten to one row per (customer, recommended product) with its predicted score
rec_struct = explode("recommendations").alias("rec")
recs_individual_exploded = (
    individual_recs_nested
    .select(col("customer_index"), rec_struct)
    .select(
        "customer_index",
        col("rec.product_index").alias("product_index"),
        col("rec.rating").alias("score"),
    )
)

### Map indices back to original IDs

In [None]:
# Translate model indices back to the original customer / product IDs.
# IndexToString (also used for the cluster categories further down) does the
# lookup as a column transform using the fitted labels. This avoids building a
# Python Row per label on the driver and joining twice. An index outside the
# label list raises an error, instead of silently writing a null ID as the
# previous left join would.
customer_index_to_id = IndexToString(inputCol="customer_index",
                                     outputCol="customer_id",
                                     labels=customer_index_model.labels)
product_index_to_id = IndexToString(inputCol="product_index",
                                    outputCol="product_id",
                                    labels=product_index_model.labels)

# Drop ID columns left over from a previous run of this cell (it reassigns its
# own input), so re-running does not fail on already-existing output columns.
recs_individual_exploded = product_index_to_id.transform(
    customer_index_to_id.transform(
        recs_individual_exploded.drop("customer_id", "product_id")
    )
# Keep the column order the previous join-based version produced.
).select("product_index", "customer_index", "score", "customer_id", "product_id")

### Save

In [None]:
# Root directory for every batch-scoring output of this notebook
OUTPUT_PATH = os.path.abspath(os.path.join('/sparkdata/wholesale-recommender', 'results'))

# Persist the top-N recommendations, one row per (customer, product)
individual_recs_path = os.path.join(OUTPUT_PATH, "customer_individual_recs")
recs_individual_exploded.write.parquet(individual_recs_path, mode="overwrite")

## Recommendations: Clusters

### Create dataframe with all clusters

In [9]:
DATA_PATH = os.path.join('/sparkdata/wholesale-recommender', 'processed')

# One row per distinct cluster id: these are the "users" scored by the
# cluster-level ALS model below.
cluster_assignments = spark.read.parquet(os.path.join(DATA_PATH, "customer_cluster"))
clusters = cluster_assignments.select("cluster").distinct()

### Generate top 10 recommendations

In [32]:
top_n = 10

# One row per cluster, carrying an array of category recommendation structs
cluster_recs_nested = als_model_clusters.recommendForUserSubset(clusters, top_n)

# Flatten to one row per (cluster, recommended category) with its predicted score
rec_struct = explode("recommendations").alias("rec")
recs_clusters_exploded = (
    cluster_recs_nested
    .select(col("cluster"), rec_struct)
    .select(
        "cluster",
        col("rec.category_index").alias("category_index"),
        col("rec.rating").alias("score"),
    )
)

### Map indices back to original category names

In [33]:
# Map each category_index back to its category name, using the labels of the
# fitted category StringIndexerModel.
index_to_string = IndexToString(inputCol="category_index",
                                outputCol="category_name",
                                labels=category_index_model.labels)

# This cell reassigns recs_clusters_exploded from itself, and IndexToString
# refuses to write to an output column that already exists. Dropping a leftover
# category_name (a no-op on the first run) makes the cell safe to re-run.
recs_clusters_exploded = index_to_string.transform(
    recs_clusters_exploded.drop("category_name")
)

### Save

In [34]:
# Persist the top-N category recommendations, one row per (cluster, category)
cluster_recs_path = os.path.join(OUTPUT_PATH, "customer_cluster_recs")
recs_clusters_exploded.write.parquet(cluster_recs_path, mode="overwrite")

## Popular items per month (for lookup by delivery date)

### Get all order lines

In [58]:
DATA_PATH = os.path.join('/sparkdata/wholesale-recommender', 'cleaned')

# Order lines, with the date and product columns used below renamed to snake_case
order_lines = (
    spark.read.parquet(os.path.join(DATA_PATH, "order_lines"))
    .withColumnRenamed('Date Order was placed', 'order_date')
    .withColumnRenamed('Product ID', 'product_id')
)

### Get the most popular items per month

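Each order line's quantity is weighted with an exponential time decay, $w = e^{-0.5\,\Delta y}$, where $\Delta y$ is the number of years before the most recent order year. Recent orders therefore count more than older ones.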

In [None]:
# Tunables for the monthly popularity ranking
POPULARITY_DECAY_RATE = 0.5   # weight = exp(-rate * years before the latest order year)
POPULAR_ITEMS_PER_MONTH = 10  # number of items kept per calendar month

# Extract the calendar year and month of each order
order_lines_dated = (
    order_lines
    .withColumn("year", year("order_date"))
    .withColumn("month", month("order_date"))
)

# Most recent order year: the reference point for the time decay
max_year = order_lines_dated.agg(F.max("year")).collect()[0][0]

# Exponential time decay: orders from max_year get weight 1, and each older
# year multiplies the weight by exp(-POPULARITY_DECAY_RATE). Weighting the
# ordered quantity makes popularity reflect volume, not just the number of
# order lines.
order_lines_weighted = (
    order_lines_dated
    .withColumn("year_diff", lit(max_year) - col("year"))
    .withColumn("decay_weight", exp(-POPULARITY_DECAY_RATE * col("year_diff")))
    .withColumn("weighted_qty", col("decay_weight") * col("Quantity Ordered"))
)

# Aggregate across all years per (calendar month, product)
monthly_popularity = (
    order_lines_weighted
    .groupBy("month", "product_id")
    .agg(_sum("weighted_qty").alias("popularity_score"))
)

# Rank items within each month. product_id breaks ties, so row_number is
# deterministic and repeated batch runs produce the same top list.
window = Window.partitionBy("month").orderBy(col("popularity_score").desc(), col("product_id"))

# Keep the top POPULAR_ITEMS_PER_MONTH items for each month
top10_per_month = (
    monthly_popularity
    .withColumn("rank", row_number().over(window))
    .filter(col("rank") <= POPULAR_ITEMS_PER_MONTH)
)

### Save

In [72]:
# Persist the monthly top items next to the other batch-scoring outputs
items_per_month_path = os.path.join(OUTPUT_PATH, "items_per_month")
top10_per_month.write.parquet(items_per_month_path, mode="overwrite")