# Final Project Notebook: MovieLens Recommender System with Azure OpenAI

## Introduction

Recommender systems are a core component of many modern online platforms, helping users discover content that matches their interests from vast collections of items. In this project, I build and compare two approaches to recommendation using the popular [MovieLens dataset](https://grouplens.org/datasets/movielens/1m/): a **traditional Spark-based collaborative filtering pipeline**, and an **LLM-enhanced recommender system** powered by the Azure OpenAI service.  

The goal is to evaluate both approaches with conventional recommendation metrics, such as Precision, Recall, and Diversity, and to test how integrating a Large Language Model (LLM) can complement or extend traditional methods by generating novel suggestions in plain language.

To demonstrate real-world deployment, this project (Project 6: Azure Hands-On) is implemented entirely on **Microsoft Azure**, using:
- **Azure Blob Storage** for persistent storage of the dataset,
- **Databricks and Spark** for scalable data processing and training,
- **Azure OpenAI** for generating recommendations with a state-of-the-art LLM,
- and secure network configurations to ensure controlled access to resources.

As part of my long-term vision, I plan to adapt this pipeline to build a **lessons or course recommender system**, which could help students or learners find relevant study materials or personalized content based on their previous engagement. This project demonstrates the feasibility and limitations of combining **traditional collaborative filtering** with modern **generative AI**, paving the way for more advanced educational applications.

In the following sections, I explain the data processing pipeline, the evaluation metrics, the deployment architecture, and finally, interpret the results and limitations of both the traditional and LLM-based recommenders.


In [0]:
%pip install openai

Collecting openai
  Downloading openai-1.97.0-py3-none-any.whl.metadata (29 kB)
Collecting anyio<5,>=3.5.0 (from openai)
  Downloading anyio-4.9.0-py3-none-any.whl.metadata (4.7 kB)
Collecting httpx<1,>=0.23.0 (from openai)
  Downloading httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting jiter<1,>=0.4.0 (from openai)
  Downloading jiter-0.10.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.2 kB)
Collecting sniffio (from openai)
  Downloading sniffio-1.3.1-py3-none-any.whl.metadata (3.9 kB)
Collecting tqdm>4 (from openai)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting httpcore==1.* (from httpx<1,>=0.23.0->openai)
  Downloading httpcore-1.0.9-py3-none-any.whl.metadata (21 kB)
Collecting h11>=0.16 (from 

#### PySpark Session:
This code obtains the PySpark session on Databricks.
A SparkSession is the entry point for PySpark code: it gives access to Spark DataFrames, transformations, and distributed processing for large datasets.
Databricks notebooks already provide a session as `spark`, so `getOrCreate()` returns that existing session rather than starting a new one. Outside Databricks, this call is what creates the session that every later Spark operation (loading data, filtering, joining) depends on.

In [0]:
# Databricks / PySpark Setup
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MovieLensFinalProject").getOrCreate()
print("Spark Session created!")

Spark Session created!


## 1. Mount Azure Blob Storage (Persistent Storage)

#### My Blob Storage:
This code mounts my Azure Blob Storage container to Databricks.
By doing this, I connect the remote storage account (movielensstorage123) and the specific container (movielensdata) to the Databricks file system at the local mount point /mnt/movielens.
This allows me to easily read and write files from the cloud storage as if they were part of the Databricks workspace.
The `if` check ensures that the container is not mounted twice: if it is already mounted, the cell skips remounting.
This step is essential for providing persistent cloud storage, which is a requirement for the Azure hands-on component of my project.

In [0]:
# Mount Blob Storage
storage_account_name = "movielensstorage123"
container_name = "movielensdata"
mount_point = "/mnt/movielens"

# Get the key from Spark env
storage_account_key = spark.conf.get("spark.env.STORAGE_ACCOUNT_KEY")

if not any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
        mount_point = mount_point,
        extra_configs = {
            f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
        }
    )
    print(f"Mounted at {mount_point}")
else:
    print(f"Already mounted at {mount_point}")

Already mounted at /mnt/movielens


## 2. Load Data
This code loads the MovieLens dataset from my mounted Azure Blob Storage into Spark DataFrames.
Specifically, it reads two files:
  - `ratings.dat` — which contains user ratings for movies, with columns for userId, movieId, rating, and timestamp.
  - `movies.dat` — which includes movie details like movieId, title, and genres.

After loading, I rename the columns and explicitly cast each column to the correct data type (integers for IDs, float for ratings, long for timestamps) to ensure proper downstream processing.

Finally, I show the first few rows of each DataFrame to verify that the data has been loaded correctly.

This step demonstrates how Spark can efficiently handle large files stored remotely in Azure Blob Storage, forming the foundation for the recommendation system.

In [0]:
# Load Data
from pyspark.sql.functions import col

# ratings.dat is "::"-delimited and has no header row
ratings_df = spark.read.option("delimiter", "::") \
                       .option("inferSchema", True) \
                       .csv(f"{mount_point}/ratings.dat")

# Name the columns, then cast each one to an explicit type
ratings_df = ratings_df.toDF("userId", "movieId", "rating", "timestamp")

ratings_df = ratings_df.withColumn("userId", col("userId").cast("int")) \
                       .withColumn("movieId", col("movieId").cast("int")) \
                       .withColumn("rating", col("rating").cast("float")) \
                       .withColumn("timestamp", col("timestamp").cast("long"))

movies_df = spark.read.option("delimiter", "::").csv(f"{mount_point}/movies.dat")
movies_df = movies_df.toDF("movieId", "title", "genres")

# Show sample
ratings_df.show(5)
movies_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|   5.0|978300760|
|     1|    661|   3.0|978302109|
|     1|    914|   3.0|978301968|
|     1|   3408|   4.0|978300275|
|     1|   2355|   5.0|978824291|
+------+-------+------+---------+
only showing top 5 rows
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Animation|Childre...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|        Comedy|Drama|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows
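
To make the file layout concrete, here is a minimal plain-Python sketch of how one `::`-delimited line of `ratings.dat` maps onto the four typed columns. It is only an illustration of the casts the Spark code applies; the names `Rating` and `parse_rating_line` are hypothetical and not part of the notebook, and the sample line is assumed to correspond to the first row shown above.

```python
from typing import NamedTuple


class Rating(NamedTuple):
    user_id: int
    movie_id: int
    rating: float
    timestamp: int


def parse_rating_line(line: str) -> Rating:
    """Split one 'userId::movieId::rating::timestamp' line and cast each field."""
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return Rating(int(user_id), int(movie_id), float(rating), int(timestamp))


# Sample line assumed to match the first row of the ratings output above
sample = parse_rating_line("1::1193::5::978300760")
print(sample)
```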


The code below saves the Spark `movies_df` DataFrame to a CSV file in my mounted Azure Blob Storage.
First, I convert the Spark DataFrame to a Pandas DataFrame so I can use the `to_csv()` method. `toPandas()` collects every row onto the driver, which is acceptable here because the movie table is small.
The output path `/dbfs/mnt/movielens/movies.csv` addresses the Databricks File System (DBFS) through the driver's local `/dbfs` path, and `/mnt/movielens` is the mount point backed by my Azure Blob Storage container.
This step shows how Spark and Databricks can export processed data back to persistent cloud storage for later use, sharing, or external access.

In [0]:
# Save the Spark dataframe to CSV on DBFS (Databricks File System)
csv_path = "/dbfs/mnt/movielens/movies.csv"

# Convert to pandas dataframe and save
movies_df.toPandas().to_csv(csv_path, index=False)

## 3. Data Cleaning & Type Casting
This code applies explicit type casts to my DataFrames.
For `ratings_df`, the casts of `userId`, `movieId`, `rating`, and `timestamp` (integer, float, long) repeat the ones already applied when loading the data in Section 2; re-running them is harmless.
The new step is casting `movieId` in `movies_df` to integer so that it matches the type used in `ratings_df`.
Consistent types ensure that joins, filters, and calculations work without unexpected type mismatches, which is crucial for building a reliable recommender system.

In [0]:
#  Data Cleaning & Type Casting
from pyspark.sql.functions import col

ratings_df = ratings_df.withColumn("userId", col("userId").cast("int")) \
                       .withColumn("movieId", col("movieId").cast("int")) \
                       .withColumn("rating", col("rating").cast("float")) \
                       .withColumn("timestamp", col("timestamp").cast("long"))

movies_df = movies_df.withColumn("movieId", col("movieId").cast("int"))

## 4. Enrich Movies Data
### 4.1 Enrich Movies Data: Extract Genres as Array
This code enriches the `movies_df` DataFrame with structured content features by splitting the `genres` column into an array of individual genres.      
The original `genres` field is a single string with genres separated by the `|` character (e.g., `"Comedy|Drama"`).       
Using Spark’s `split()` function, I convert this into an array of genre values for each movie, making it easier to work with during similarity calculations or diversity metrics. Because `split()` takes a regular expression, the `|` separator is escaped as `\\|` in the code.
I display the result to verify that the genres are now stored as clean lists, which improves the quality of content-based filtering and advanced metrics later on.

In [0]:
# Enrich with Content Features
# Split the genres string in movies_df into an array (scraped taglines and poster paths, if available, are joined in a later cell)
from pyspark.sql.functions import split

movies_df = movies_df.withColumn("genres", split(col("genres"), "\\|"))
movies_df.select("movieId", "title", "genres").show(5, truncate=False)

+-------+----------------------------------+--------------------------------+
|movieId|title                             |genres                          |
+-------+----------------------------------+--------------------------------+
|1      |Toy Story (1995)                  |[Animation, Children's, Comedy] |
|2      |Jumanji (1995)                    |[Adventure, Children's, Fantasy]|
|3      |Grumpier Old Men (1995)           |[Comedy, Romance]               |
|4      |Waiting to Exhale (1995)          |[Comedy, Drama]                 |
|5      |Father of the Bride Part II (1995)|[Comedy]                        |
+-------+----------------------------------+--------------------------------+
only showing top 5 rows
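
The escaping matters because `|` means "or" in a regular expression. As a small plain-Python illustration of the same idea (the helper name `split_genres` is hypothetical, and Python's `re` module stands in for Spark's regex-based `split()`):

```python
import re
from typing import List


def split_genres(genres: str) -> List[str]:
    """Split a MovieLens genres string such as 'Comedy|Drama' into a list."""
    # "|" is the alternation operator in a regex, so it is escaped here,
    # just as the Spark call above escapes it.
    return re.split(r"\|", genres)


print(split_genres("Animation|Children's|Comedy"))
```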


### 4.2 Enrich Movies Data with additional features
This code uses the TMDb API to enrich the MovieLens movies with additional content features: taglines, poster paths, and TMDb IDs.

First, it loads the `movies.csv` file (previously saved from Spark) into a Pandas DataFrame. Then, for each movie, it extracts the movie title and release year, searches the TMDb API for a matching movie, and retrieves its `tagline` and `poster_path` if available.

This loop also respects TMDb’s API rate limits with a short sleep between requests. Once all details are fetched, the new information (`tmdb_id`, `tagline`, `poster_path`) is added as new columns in the DataFrame.

Finally, the enriched movie data is saved back as `scraped_taglines.csv` directly in the mounted Azure Blob Storage container.

This step demonstrates how external data can be integrated to enhance content-based features, making the recommender system richer and more flexible — for example, taglines could be used for natural language similarity or metadata-driven recommendations.

In [0]:
import pandas as pd
import requests
import time

# ✅ Get TMDb API key from Spark environment
API_KEY = spark.conf.get("spark.env.TMDB_API_KEY")

# ✅ Input MovieLens CSV
INPUT_CSV = "/dbfs/mnt/movielens/movies.csv"

# ✅ Output file — save directly in your mount!
OUTPUT_CSV = "/dbfs/mnt/movielens/scraped_taglines.csv"

# Load movies
movies = pd.read_csv(INPUT_CSV)

# Extract year
movies["year"] = movies["title"].str.extract(r"\((\d{4})\)").astype(float)

taglines = []
poster_paths = []
tmdb_ids = []

for idx, row in movies.iterrows():
    title = row["title"].split("(")[0].strip()
    year = int(row["year"]) if not pd.isna(row["year"]) else None

    # Search TMDb
    search_url = "https://api.themoviedb.org/3/search/movie"
    params = {"api_key": API_KEY, "query": title}
    if year:
        params["year"] = year

    # The timeout stops one stalled request from hanging the whole loop
    response = requests.get(search_url, params=params, timeout=10)
    data = response.json()

    if "results" in data and data["results"]:
        tmdb_id = data["results"][0]["id"]
        tmdb_ids.append(tmdb_id)

        # Get movie details
        details_url = f"https://api.themoviedb.org/3/movie/{tmdb_id}"
        details_params = {"api_key": API_KEY}
        details_resp = requests.get(details_url, params=details_params, timeout=10)
        details = details_resp.json()

        tagline = details.get("tagline", "")
        poster_path = details.get("poster_path", "")

        taglines.append(tagline)
        poster_paths.append(poster_path)
    else:
        tmdb_ids.append(None)
        taglines.append("")
        poster_paths.append("")

    # Respect TMDb rate limits
    time.sleep(0.25)

# Add results to DataFrame
movies["tmdb_id"] = tmdb_ids
movies["tagline"] = taglines
movies["poster_path"] = poster_paths

# ✅ Save directly to your container
movies.to_csv(OUTPUT_CSV, index=False)

print(f"✅ Done! Saved to {OUTPUT_CSV}")

✅ Done! Saved to /dbfs/mnt/movielens/scraped_taglines.csv
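
The title and year extraction in the loop can be isolated into a small helper. The sketch below is an optional variation, not the code that produced the file above: it assumes MovieLens titles end with a four-digit year in parentheses and may carry a trailing article (as in `Matrix, The (1999)`), and it moves that article to the front on the assumption that this gives a better search string. The name `parse_movielens_title` is hypothetical.

```python
import re
from typing import Optional, Tuple

# Trailing four-digit year in parentheses, e.g. "Toy Story (1995)"
YEAR_RE = re.compile(r"\((\d{4})\)\s*$")
# Trailing article after a comma, e.g. "Matrix, The"
ARTICLE_RE = re.compile(r"^(.*), (The|A|An)$")


def parse_movielens_title(raw: str) -> Tuple[str, Optional[int]]:
    """Return (search_title, year) from a MovieLens-style title string."""
    year = None
    match = YEAR_RE.search(raw)
    if match:
        year = int(match.group(1))
        raw = raw[: match.start()]
    title = raw.strip()
    article = ARTICLE_RE.match(title)
    if article:
        # Move the article back to the front: "Matrix, The" -> "The Matrix"
        title = f"{article.group(2)} {article.group(1)}"
    return title, year


print(parse_movielens_title("Matrix, The (1999)"))
```

Whether the reordered title actually improves TMDb matches would need to be checked against the API results.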


This code loads the enriched movie data with taglines and poster paths back into Spark and joins it with the original `movies_df`.

First, it reads the `scraped_taglines.csv` file (created in the previous step) into a new Spark DataFrame, `taglines_df`. Then it performs a left join to merge the new columns (`tagline` and `poster_path`) into the original `movies_df` based on the `movieId`.

The result, `movies_enriched_df`, now includes structured genres and descriptive taglines for each movie, as well as poster image paths. Displaying the first few rows confirms that the join worked as expected.

This step completes the content enrichment pipeline, combining MovieLens metadata with external TMDb information — which can improve the recommender system’s ability to generate more context-aware or visually rich suggestions.

In [0]:
# Read the scraped taglines with Spark
taglines_df = spark.read.csv("dbfs:/mnt/movielens/scraped_taglines.csv", header=True, inferSchema=True)

# Join with your original movies_df
movies_enriched_df = movies_df.join(
    taglines_df.select("movieId", "tagline", "poster_path"),
    on="movieId",
    how="left"
)

movies_enriched_df.show(5, truncate=False)

+-------+----------------------------------+-------------------------------+------------------------------------------------------------------------------+--------------------------------+
|movieId|title                             |genres                         |tagline                                                                       |poster_path                     |
+-------+----------------------------------+-------------------------------+------------------------------------------------------------------------------+--------------------------------+
|1      |Toy Story (1995)                  |[Animation, Children's, Comedy]|The adventure takes off when toys come to life!                               |/uXDfjJbdP4ijW5hWSBrPrlKpxab.jpg|
|6      |Heat (1995)                       |[Action, Crime, Thriller]      |A Los Angeles crime saga.                                                     |/umSVjVdbVwtx5ryCA2QXL44Durm.jpg|
|3      |Grumpier Old Men (1995)           |[Comedy, Ro

## 5. Train/Test Split (Last rating per user as test)

The code below splits the MovieLens ratings data into training and test sets using a time-based strategy.

First, it defines a Spark `Window` that partitions the data by `userId` and orders each user’s ratings by `timestamp` in descending order. It then adds a `row_num` column to mark the most recent rating for each user as `1`.

The most recent rating for each user is selected as the test set (`test_df`), simulating a realistic scenario where the recommender must predict future interactions. All other ratings are used for training (`train_df`).

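The final `print` statement confirms the sizes of the splits: the system trains on **994,169 ratings** and is evaluated on **6,040 holdout interactions**, one per user. Holding out the latest rating means each test interaction is never earlier than that user's training ratings. If a user has several ratings sharing the latest timestamp, `row_number()` picks one of them arbitrarily.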

In [0]:
# Train/Test Split
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window = Window.partitionBy("userId").orderBy(col("timestamp").desc())
ratings_df = ratings_df.withColumn("row_num", row_number().over(window))

test_df = ratings_df.filter(col("row_num") == 1).drop("row_num")
train_df = ratings_df.filter(col("row_num") > 1).drop("row_num")

print(f"Train count: {train_df.count()}, Test count: {test_df.count()}")

Train count: 994169, Test count: 6040
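
The same leave-last-out logic can be sketched in plain Python on a handful of made-up ratings. This is only an illustration of the split rule, not a replacement for the Spark window; the function name `leave_last_out` and the toy rows are hypothetical.

```python
from typing import Dict, List, Tuple

Rating = Tuple[int, int, float, int]  # (userId, movieId, rating, timestamp)


def leave_last_out(ratings: List[Rating]) -> Tuple[List[Rating], List[Rating]]:
    """Hold out each user's most recent rating as test; the rest is train."""
    latest: Dict[int, Rating] = {}
    for r in ratings:
        user = r[0]
        # Keep the rating with the largest timestamp seen so far for this user
        if user not in latest or r[3] > latest[user][3]:
            latest[user] = r
    test = list(latest.values())
    held_out = set(test)
    train = [r for r in ratings if r not in held_out]
    return train, test


# Toy data, made up for illustration
toy = [
    (1, 10, 5.0, 100),
    (1, 11, 3.0, 200),
    (2, 10, 4.0, 150),
    (2, 12, 2.0, 120),
    (2, 13, 4.0, 90),
]
train, test = leave_last_out(toy)
print(len(train), len(test))
```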


## 6. Baseline: Global Average Rating

This step calculates the **global average rating** in the training data. Using Spark’s `avg` function, it computes the mean of all user ratings, which comes out to **3.5814**.

In [0]:
from pyspark.sql.functions import avg

global_avg = train_df.select(avg(col("rating")).alias("global_avg")).collect()[0]["global_avg"]
print(f"Global average rating: {global_avg:.4f}")

Global average rating: 3.5814


**Interpretation:**

The global average rating serves as a simple baseline for recommendations. It shows that users in this dataset tend to rate movies slightly above the midpoint on the 1–5 scale.

Many traditional algorithms (such as **collaborative filtering** or **matrix factorization**) use this global mean as a starting point, adjusting predictions by learning how specific users and items deviate from it.

In practice, a recommender system should significantly outperform this trivial baseline by delivering personalized predictions closer to each user’s true preferences.

## 7. Baseline: User and Item Biases

This step computes the average rating for each user and for each movie in the training data. Subtracting the global average from these means gives the corresponding user bias and item bias.

  - `user_avg` holds each user’s average rating across all movies they rated.
  
  - `item_avg` holds each movie’s average rating across all users who rated it.

In [0]:
user_avg = train_df.groupBy("userId").agg({"rating": "avg"}).withColumnRenamed("avg(rating)", "user_avg")
item_avg = train_df.groupBy("movieId").agg({"rating": "avg"}).withColumnRenamed("avg(rating)", "item_avg")

user_avg.show(5)
item_avg.show(5)

+------+------------------+
|userId|          user_avg|
+------+------------------+
|     1| 4.173076923076923|
|    12|3.8636363636363638|
|    22| 3.064189189189189|
|    26|2.9649122807017543|
|    27| 4.159420289855072|
+------+------------------+
only showing top 5 rows
+-------+------------------+
|movieId|          item_avg|
+-------+------------------+
|    833|2.1794871794871793|
|   1580|3.7390099009900992|
|   2122|2.4347826086956523|
|    463|  2.74468085106383|
|   1645| 3.432926829268293|
+-------+------------------+
only showing top 5 rows


**Interpretation:**

This baseline shows how **individual users** tend to rate differently (some are harsh, some are generous), and how some movies are consistently liked or disliked.

For example, the sample output shows that **User 1** has an average rating of about **4.17**, which is well above the global average of **3.58** — meaning they generally rate movies higher than average.

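Similarly, **Movie 833** has an average rating of **2.18**, suggesting it is generally disliked.

Many recommender systems use these user and item biases to adjust predictions. For example, a bias-only baseline predicts:

> *predicted rating = global average + user bias + item bias*

Here each bias is the deviation of that user's or movie's average from the global average, and matrix factorization models with bias terms add a learned user-item interaction on top of this sum.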

This baseline helps explain part of the variation in ratings using simple statistics before **learning complex latent factors**.
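
As a rough illustration of the bias formula above, the sketch below fits the three terms on made-up ratings in plain Python. The function names, the clipping to the 1 to 5 scale, and the zero bias for unknown users or movies are assumptions made for this example; the notebook itself only computes the averages.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def fit_bias_baseline(ratings: List[Tuple[int, int, float]]):
    """Return (global_mean, user_bias, item_bias) from (user, item, rating) rows."""
    global_mean = sum(r for _, _, r in ratings) / len(ratings)
    by_user, by_item = defaultdict(list), defaultdict(list)
    for user, item, rating in ratings:
        by_user[user].append(rating)
        by_item[item].append(rating)
    # Bias = that user's (or item's) mean rating minus the global mean
    user_bias: Dict[int, float] = {u: sum(v) / len(v) - global_mean for u, v in by_user.items()}
    item_bias: Dict[int, float] = {i: sum(v) / len(v) - global_mean for i, v in by_item.items()}
    return global_mean, user_bias, item_bias


def predict(global_mean, user_bias, item_bias, user, item, lo=1.0, hi=5.0):
    """Bias-only prediction, clipped to the rating scale; unknown ids get bias 0."""
    raw = global_mean + user_bias.get(user, 0.0) + item_bias.get(item, 0.0)
    return max(lo, min(hi, raw))


# Toy data, made up for illustration
toy_ratings = [(1, 10, 5.0), (1, 11, 4.0), (2, 10, 3.0), (2, 11, 2.0)]
mu, b_user, b_item = fit_bias_baseline(toy_ratings)
print(predict(mu, b_user, b_item, user=1, item=10))
```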

## 8. Popularity-Based Recommender

This block implements a simple **popularity-based recommender**.  
First, I calculate the **number of ratings for each movie** (`num_ratings`) to measure popularity.  
Next, for every user, I recommend the **most popular movies** they haven’t already seen.  
This is done by:
- **Counting popularity:** Count how many times each movie was rated in the training data.
- **Cross-joining:** Pair every user with the 100 most-rated movies (`limit(100)` in the code).
- **Filtering:** Remove any movies the user has already rated.

The result is a **baseline set of recommendations** for each user that assumes **popular movies are likely to be liked** by more people.

In [0]:
from pyspark.sql.functions import count

movie_popularity = train_df.groupBy("movieId").agg(count("*").alias("num_ratings")) \
    .join(movies_df, "movieId") \
    .orderBy(col("num_ratings").desc())

movie_popularity.show(10)

from pyspark.sql.functions import desc

# Step 1: Count popularity
movie_pop = train_df.groupBy("movieId").agg(count("*").alias("popularity"))
movie_pop = movie_pop.orderBy(desc("popularity"))

# Step 2: For each user, recommend top-N popular movies they haven't seen
user_seen = train_df.select("userId", "movieId")
users = train_df.select("userId").distinct()
top_movies = movie_pop.limit(100)  # adjust N as needed

# Cross join users with top movies, then filter out seen
from pyspark.sql.functions import broadcast

pop_recs = (
    users.crossJoin(broadcast(top_movies.select("movieId")))
    .join(broadcast(user_seen), ["userId", "movieId"], "left_anti")
    .select("userId", "movieId")
)

pop_recs.show(10)

+-------+-----------+--------------------+--------------------+
|movieId|num_ratings|               title|              genres|
+-------+-----------+--------------------+--------------------+
|   2858|       3398|American Beauty (...|     [Comedy, Drama]|
|   1196|       2984|Star Wars: Episod...|[Action, Adventur...|
|    260|       2971|Star Wars: Episod...|[Action, Adventur...|
|   1210|       2879|Star Wars: Episod...|[Action, Adventur...|
|    480|       2659|Jurassic Park (1993)|[Action, Adventur...|
|   2028|       2650|Saving Private Ry...|[Action, Drama, War]|
|    589|       2618|Terminator 2: Jud...|[Action, Sci-Fi, ...|
|   2571|       2577|  Matrix, The (1999)|[Action, Sci-Fi, ...|
|   1270|       2570|Back to the Futur...|    [Comedy, Sci-Fi]|
|    593|       2558|Silence of the La...|   [Drama, Thriller]|
+-------+-----------+--------------------+--------------------+
only showing top 10 rows
+------+-------+
|userId|movieId|
+------+-------+
|     1|   2858|
|     6|   

**Interpretation:**  

From the output, we can see that *American Beauty (1999)*, the *Star Wars* episodes, *Jurassic Park*, and *Saving Private Ryan* are among the top recommended movies because they have the most ratings in the training data.  
This baseline shows how simple popularity can drive recommendations, but it does not personalize for user preferences, which the more advanced methods below aim to improve on.
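
The count, pair, and filter steps can be sketched on toy data in plain Python. Note one assumption that goes beyond the Spark code: the sketch also truncates each user's list to `k` items, whereas the notebook keeps every unseen movie among the top 100. The function name `popularity_recs` is hypothetical.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Set, Tuple


def popularity_recs(ratings: List[Tuple[int, int]], top_n: int, k: int) -> Dict[int, List[int]]:
    """Recommend up to k of the top_n most-rated movies each user has not rated."""
    popularity = Counter(movie for _, movie in ratings)
    # Sort by count descending, then movieId ascending so ties are deterministic
    ranked = sorted(popularity, key=lambda m: (-popularity[m], m))[:top_n]
    seen: Dict[int, Set[int]] = defaultdict(set)
    for user, movie in ratings:
        seen[user].add(movie)
    # Keep popularity order, skipping anything the user has already rated
    return {user: [m for m in ranked if m not in movies][:k] for user, movies in seen.items()}


# Toy (userId, movieId) pairs, made up for illustration
toy_pairs = [(1, 10), (2, 10), (3, 10), (1, 20), (2, 20), (3, 30)]
pop = popularity_recs(toy_pairs, top_n=3, k=2)
print(pop)
```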

## 9. Content-Based Filtering (Genres)

This section analyzes **which genres each user prefers** by counting how many highly-rated movies (rating ≥ 4.0) each user has watched in each genre.  
The steps are:
- Filter the training ratings to keep only **liked movies** (high ratings).
- Join with the `movies_df` to get the genres for each movie.
- Use `explode` to split multi-genre lists into individual genre rows.
- Group by `userId` and `genre` to count the frequency.

In [0]:
from pyspark.sql.functions import explode

liked_movies = train_df.filter(col("rating") >= 4.0)
user_genres = liked_movies.join(movies_df, "movieId") \
    .select("userId", explode("genres").alias("genre")) \
    .groupBy("userId", "genre").count()

user_genres.show(10)

+------+----------+-----+
|userId|     genre|count|
+------+----------+-----+
|     1| Adventure|    4|
|     1| Animation|   13|
|     1|Children's|   17|
|     1|    Comedy|   11|
|     1|   Musical|   11|
|     1|     Drama|   21|
|     1|       War|    2|
|     1|  Thriller|    2|
|     1|    Action|    4|
|     1|   Fantasy|    3|
+------+----------+-----+
only showing top 10 rows


**Interpretation:**  
The output shows that **User 1** likes a wide range of genres. For example:
- 21 *Drama* movies
- 17 *Children’s* movies
- 13 *Animation* movies

This insight can be used to personalize recommendations:  
if we know a user prefers *Drama* and *Animation*, we can prioritize unseen movies in these genres.

This step is useful for building **content-based recommenders** or for adding **side information** to hybrid models.

This code chunk builds a simple **content-based recommender** by using each user’s **top preferred genre(s)**:  
- First, for each user, it identifies their **most-liked genre(s)** from the `user_genres` table, keeping ties.
- Then, it finds movies that have matching genres.
- Finally, it pairs each user with all movies that share their top genres. Movies the user has already rated are not filtered out at this stage.

In [0]:
# For simplicity, recommend movies with genres in user's top genres
from pyspark.sql.functions import desc

# Get top genres per user
top_user_genres = user_genres.groupBy("userId").agg({"count": "max"}).withColumnRenamed("max(count)", "max_count")
user_top_genres = user_genres.join(top_user_genres, ["userId"]) \
    .filter(col("count") == col("max_count")) \
    .select("userId", "genre")

# Join with movies to recommend movies matching user top genres
recommendations_content = user_top_genres.join(movies_df.select("movieId", explode("genres").alias("genre")), "genre") \
    .select("userId", "movieId").distinct()

recommendations_content.show(10)

+------+-------+
|userId|movieId|
+------+-------+
|  1170|   3952|
|   346|   3948|
|   756|   3946|
|  1165|   3937|
|   251|   3946|
|   762|   3937|
|   797|   3909|
|   725|   3952|
|   747|   3946|
|   974|   3946|
+------+-------+
only showing top 10 rows


The output shows pairs of `(userId, movieId)` — these are movies recommended to each user because they align with their favorite genres.  
For example, *User 1170* is matched with *movieId 3952*, which belongs to one of their top genres.

This is a **lightweight content-based approach** — it does not rely on user-user or item-item similarities but instead leverages movie metadata (*genres*) and user preferences to generate relevant suggestions.

This step demonstrates how **side information** (content) can supplement collaborative filtering or popularity-based methods for more personalized recommendations.

### Content-Based Recommender: Using User Top Genres and Excluding Seen Movies

This **refined version** of the content-based recommender expands on the **previous version**, which simply matched users to all movies in their top genres.  
 
While the earlier version could include movies the user had already seen or rated, this updated version explicitly **removes any movies the user has already rated**, ensuring that only **new, unseen** titles are recommended. This makes the output more realistic and useful for practical deployment.

- **Step 1:** For each user, it finds which genre(s) they have given the most positive ratings (≥ 4.0).
- **Step 2:** It ranks the genres by frequency and keeps only the **top genre(s)** for each user.
- **Step 3:** It matches each user to movies that belong to their top genre(s).
- **Step 4:** It filters out any movies that the user has already rated, so only **fresh** recommendations remain.

**Output:**  
The result is a table of `(userId, movieId)` pairs. For example, *User 1* is recommended *movieId 3952* because it matches their top genre and they have not rated it yet.

In [0]:
# This code recommends movies to each user based on their most-liked genres.
from pyspark.sql.functions import explode, col, count

# Step 1: Find each user's top genre(s) based on movies they've rated >=4.0
liked = train_df.filter(col("rating") >= 4.0)
user_genre_counts = (
    liked.join(movies_df, "movieId")
    .select("userId", explode("genres").alias("genre"))
    .groupBy("userId", "genre")
    .agg(count("*").alias("genre_count"))
)

# Step 2: For each user, select their top genre(s)
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

w = Window.partitionBy("userId").orderBy(col("genre_count").desc())
user_top_genres = (
    user_genre_counts
    .withColumn("rank", rank().over(w))
    .filter(col("rank") == 1)
    .select("userId", "genre")
)

# Step 3: Recommend movies in user's top genre(s) they haven't seen
user_seen = train_df.select("userId", "movieId")
genre_movies = movies_df.select("movieId", explode("genres").alias("genre"))
user_genre_movies = user_top_genres.join(genre_movies, "genre")

# Exclude movies already seen by the user
from pyspark.sql.functions import broadcast

content_recs = (
    user_genre_movies
    .join(broadcast(user_seen), ["userId", "movieId"], "left_anti")
    .select("userId", "movieId")
)

content_recs.show(10)

+------+-------+
|userId|movieId|
+------+-------+
|     1|   3952|
|    12|   3952|
|    22|   3948|
|    26|   3952|
|    27|   3952|
|    28|   3952|
|    34|   3948|
|    44|   3948|
|    47|   3952|
|    52|   3946|
+------+-------+
only showing top 10 rows


**Interpretation:**  
This chunk demonstrates a practical **content filtering strategy**:
- It uses explicit content metadata (*genres*).
- It respects user history to avoid redundant suggestions.
- It adds personalization beyond generic popularity or collaborative filtering by aligning with the individual’s favorite genres.

By combining **genre preference** with **seen/unseen logic**, this approach mimics how a real streaming platform might keep recommendations fresh and relevant.
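
For readers who want to trace the four steps without Spark, here is a minimal plain-Python sketch on made-up data. The function name `top_genre_recs` and the toy movies are hypothetical; ties for the top genre are all kept, which is how I understand `rank() == 1` to behave in the Spark version.

```python
from collections import Counter, defaultdict
from typing import Dict, List, Set, Tuple


def top_genre_recs(
    ratings: List[Tuple[int, int, float]],
    movie_genres: Dict[int, List[str]],
    like_threshold: float = 4.0,
) -> Dict[int, Set[int]]:
    """For each user, recommend unrated movies from their most-liked genre(s)."""
    genre_counts: Dict[int, Counter] = defaultdict(Counter)
    seen: Dict[int, Set[int]] = defaultdict(set)
    for user, movie, rating in ratings:
        seen[user].add(movie)
        if rating >= like_threshold:
            genre_counts[user].update(movie_genres.get(movie, []))
    recs: Dict[int, Set[int]] = {}
    for user, counts in genre_counts.items():
        if not counts:
            continue  # liked movies carried no genre information
        best = max(counts.values())
        # Keep every genre tied for first place
        top = {g for g, c in counts.items() if c == best}
        recs[user] = {
            m for m, genres in movie_genres.items()
            if top.intersection(genres) and m not in seen[user]
        }
    return recs


# Toy catalog and ratings, made up for illustration
toy_genres = {1: ["Comedy"], 2: ["Comedy", "Drama"], 3: ["Drama"], 4: ["Action"], 5: ["Drama"]}
toy_user_ratings = [(1, 2, 5.0), (1, 3, 4.0), (1, 4, 2.0)]
genre_recs = top_genre_recs(toy_user_ratings, toy_genres)
print(genre_recs)
```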

## 10. Collaborative Filtering: ALS Model

This code chunk implements a **collaborative filtering** recommender using **Alternating Least Squares (ALS)**, a popular matrix factorization technique for large-scale recommendation problems. ALS predicts missing ratings by learning latent factors for users and items based on observed ratings.

**How it works:**  
- It trains an ALS model using the **training set** (`train_df`) with columns: `userId`, `movieId`, and `rating`.
- It sets `coldStartStrategy` to `"drop"`, which drops prediction rows for users or items that were not seen during training instead of returning `NaN` for them.
- The trained model then **predicts ratings** for the test set (`test_df`) and generates a list of **top-10 recommended movies** for each user.

**Output:**  
- The first output table shows a few predicted ratings on the test data. For example, user *1* was predicted to rate *movieId 48* about *3.17*, while the actual rating was *5.0*.
- The second output lists the **top 10 movie recommendations per user** with predicted scores.

In [0]:
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    rank=10,
    maxIter=10,
    regParam=0.1
)

als_model = als.fit(train_df)

# Predict on test set
predictions = als_model.transform(test_df)
predictions.show(5)

# Get top 10 recommendations for all users
user_recs = als_model.recommendForAllUsers(10)
user_recs.show(5, truncate=False)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|     48|   5.0|978824351| 3.1713462|
|     2|   1687|   3.0|978300174| 2.9834707|
|     3|   2081|   4.0|978298504| 3.4476447|
|     4|   2951|   4.0|978294282|  4.093384|
|     5|    288|   2.0|978246585| 2.9687173|
+------+-------+------+---------+----------+
only showing top 5 rows
+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                

**Interpretation:**  
This approach captures **collaborative signals** (users with similar tastes tend to rate similar movies similarly) instead of explicit metadata (like genres).  
It uses latent factors to model user preferences and movie features that are **not directly observable**.
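
To make the latent-factor idea concrete, here is a minimal sketch of how a rating is predicted as the dot product of a user vector and a movie vector. The factor values and the helper `predict` are hypothetical and chosen only for illustration; in the notebook the real factors are learned by `als.fit(train_df)` with `rank=10`. Returning `None` for an unknown user or movie and then filtering those rows is a stand-in for what `coldStartStrategy="drop"` does with `NaN` predictions.

```python
# Hypothetical rank-3 latent factors (illustration only; ALS learns the real ones).
user_factors = {1: [0.9, 0.2, 1.1]}
item_factors = {48: [1.2, 0.5, 1.6], 2081: [0.4, 1.9, 0.8]}


def predict(user_id, movie_id):
    """Dot product of user and movie factors, or None if either is unknown."""
    u = user_factors.get(user_id)
    v = item_factors.get(movie_id)
    if u is None or v is None:
        return None  # Spark would emit NaN for such cold-start rows
    return sum(a * b for a, b in zip(u, v))


test_pairs = [(1, 48), (1, 2081), (1, 9999), (7, 48)]
scored = [(u, m, predict(u, m)) for u, m in test_pairs]

# Keep only rows that have a prediction, mirroring coldStartStrategy="drop".
kept = [row for row in scored if row[2] is not None]
for user_id, movie_id, rating in kept:
    print(user_id, movie_id, round(rating, 2))
```

Only the two pairs with known factors survive the filter; the unknown movie `9999` and the unknown user `7` are dropped rather than scored.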

✅ *This ALS baseline serves as a robust benchmark against which we can compare other methods, like content-based or LLM-based recommenders.*

**Notes:**  
- ALS is especially good for sparse, large datasets like MovieLens.
- The quality depends on hyperparameters like `rank` (number of latent factors) and `regParam` (regularization strength).
- In real-world deployment, you’d tune these using cross-validation for best performance.

## 11. Evaluate ALS Model (RMSE, MAE)
In this step, we evaluate the performance of our trained ALS collaborative filtering model on the test set. We use two common regression metrics:

- **RMSE (Root Mean Squared Error)**: the square root of the mean squared prediction error; squaring makes large misses count more heavily.
- **MAE (Mean Absolute Error)**: the average absolute difference between the predicted and true ratings.

Lower values indicate better performance for both metrics.
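
As a quick sanity check on the two definitions, the sketch below computes MAE and RMSE by hand for the five sample rows printed in Section 10. This is only a five-row illustration, so the values will differ from the full test-set metrics computed by `RegressionEvaluator`.

```python
import math

# (true rating, predicted rating) for the five sample rows printed in Section 10.
rows = [
    (5.0, 3.1713462),
    (3.0, 2.9834707),
    (4.0, 3.4476447),
    (4.0, 4.093384),
    (2.0, 2.9687173),
]

errors = [pred - true for true, pred in rows]
mae = sum(abs(e) for e in errors) / len(errors)
rmse = math.sqrt(sum(e * e for e in errors) / len(errors))

print(f"MAE on 5 rows:  {mae:.4f}")
print(f"RMSE on 5 rows: {rmse:.4f}")
```

RMSE comes out larger than MAE because the single large miss (a 5-star rating predicted near 3.17) is squared before averaging.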

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

rmse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")

rmse = rmse_evaluator.evaluate(predictions)
mae = mae_evaluator.evaluate(predictions)

print(f"ALS RMSE: {rmse:.4f}")
print(f"ALS MAE: {mae:.4f}")

ALS RMSE: 0.9391
ALS MAE: 0.7553


**Interpretation:**

- The **ALS model** achieves an **RMSE of 0.9391** and an **MAE of 0.7553** on the held-out test set.
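- On average, predicted ratings are about **0.76 stars away** from the true ratings (MAE). RMSE is always at least as large as MAE, and the gap here (0.94 vs. 0.76) reflects a share of larger misses, such as the 5-star rating predicted as 3.17 in the sample output above.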
- These results show that our collaborative filtering model is reasonably accurate and can capture useful patterns in user preferences.

## 12. Top-N Recommendation Evaluation (Precision@K, Recall@K)
In this step, we measure how well the ALS collaborative filtering model does at producing **top-N recommendations** for each user.

- **Precision@10**: the fraction of a user’s top 10 recommended movies that appear in that user’s test set, averaged over users.
- **Recall@10**: the fraction of a user’s test-set movies that appear in the top 10 recommendations, averaged over users.

The helper functions `precision_at_k` and `recall_at_k` compute these metrics for all users. Every test-set movie counts as relevant regardless of its rating, and users with no test ratings are skipped.

In [0]:
# Collect test and prediction data to Pandas for evaluation
import pandas as pd

# Get top 10 ALS recommendations per user
user_recs = als_model.recommendForAllUsers(10)
user_recs_pd = user_recs.select("userId", "recommendations").toPandas()

# Explode recommendations for easier evaluation
user_recs_expanded = []
for idx, row in user_recs_pd.iterrows():
    user = row["userId"]
    recs = row["recommendations"]
    for rec in recs:
        user_recs_expanded.append((user, rec['movieId'], rec['rating']))
recs_df = pd.DataFrame(user_recs_expanded, columns=["userId", "movieId", "pred_rating"])

# Prepare test set in pandas
test_pd = test_df.select("userId", "movieId", "rating").toPandas()

# Precision@K and Recall@K helper functions
def precision_at_k(recs, test, k=10):
    precisions = []
    for user in recs['userId'].unique():
        rec_movies = recs[recs['userId'] == user].sort_values('pred_rating', ascending=False).head(k)['movieId'].tolist()
        true_movies = test[test['userId'] == user]['movieId'].tolist()
        if len(true_movies) == 0:
            continue
        prec = len(set(rec_movies).intersection(set(true_movies))) / k
        precisions.append(prec)
    return sum(precisions) / len(precisions)

def recall_at_k(recs, test, k=10):
    recalls = []
    for user in recs['userId'].unique():
        rec_movies = recs[recs['userId'] == user].sort_values('pred_rating', ascending=False).head(k)['movieId'].tolist()
        true_movies = test[test['userId'] == user]['movieId'].tolist()
        if len(true_movies) == 0:
            continue
        rec_count = len(set(rec_movies).intersection(set(true_movies)))
        recall = rec_count / len(true_movies)
        recalls.append(recall)
    return sum(recalls) / len(recalls)

precision = precision_at_k(recs_df, test_pd, k=10)
recall = recall_at_k(recs_df, test_pd, k=10)

print(f"ALS Precision@10: {precision:.4f}")
print(f"ALS Recall@10: {recall:.4f}")

ALS Precision@10: 0.0007
ALS Recall@10: 0.0071


**Interpretation:**

- The ALS model achieves a **Precision@10 of 0.0007** and a **Recall@10 of 0.0071**.

- This means that only a tiny fraction of the top 10 recommendations actually match the user’s test items.

- Such low scores are typical when the test set contains only **one held-out item per user** (as in our split). In that setting Precision@10 can never exceed 0.1, and Recall@10 is 10 times Precision@10, which matches the values above (0.0071 vs. 0.0007). It is very difficult for the model to rank this single hidden movie in the top 10 when the catalog contains thousands of movies.

- In real systems, these metrics often improve when there is more feedback per user, better personalization, or hybrid features.
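
The relationship between the two metrics under a one-item hold-out can be checked with a small sketch. The user IDs, movie IDs, and the helper `precision_recall_at_k` below are hypothetical toy data, not values from the notebook; the point is only that with one held-out movie per user, Recall@10 equals the hit rate and Precision@10 equals the hit rate divided by 10.

```python
def precision_recall_at_k(recommended, relevant, k=10):
    """Precision@k and recall@k for a single user."""
    hits = len(set(recommended[:k]) & set(relevant))
    return hits / k, hits / len(relevant)


# Toy data (hypothetical): four users, one held-out movie each, one hit overall.
recs = {
    "u1": list(range(100, 110)),
    "u2": list(range(200, 210)),
    "u3": list(range(300, 310)),
    "u4": list(range(400, 410)),
}
held_out = {"u1": [105], "u2": [999], "u3": [998], "u4": [997]}

scores = [precision_recall_at_k(recs[u], held_out[u]) for u in recs]
mean_precision = sum(p for p, _ in scores) / len(scores)
mean_recall = sum(r for _, r in scores) / len(scores)

print(f"Precision@10: {mean_precision:.4f}")  # hit rate / 10
print(f"Recall@10:    {mean_recall:.4f}")     # hit rate
```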

### Item-based k-Nearest Neighbors (kNN):
In this approach, we pivot the ratings data into an item-user matrix, calculate cosine similarity between movies, and recommend new movies for each user based on their similarity to movies the user already rated highly (4 or above). This technique directly leverages explicit similarity between movies rather than learning latent factors like ALS. The example below shows 10 recommended movies for user 1.

_**Note**:_ This method demonstrates a memory-based collaborative filtering approach, which works well for small to medium datasets but can be computationally expensive for very large item sets.
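
As a minimal illustration of the similarity step, the sketch below computes cosine similarity between rows of a tiny item-user matrix in plain Python. The matrix and the helper `cosine` are hypothetical; the notebook itself uses `sklearn.metrics.pairwise.cosine_similarity` on the full pivoted matrix.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length rating vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


# Hypothetical item-user matrix: one row per movie, one column per user,
# with 0 standing for "not rated" (the same convention as fillna(0)).
item_user = {
    "movie_a": [5, 4, 0, 0],
    "movie_b": [4, 5, 0, 1],
    "movie_c": [0, 0, 5, 4],
}

sim_ab = cosine(item_user["movie_a"], item_user["movie_b"])
sim_ac = cosine(item_user["movie_a"], item_user["movie_c"])
print(f"a~b: {sim_ab:.3f}, a~c: {sim_ac:.3f}")
```

Movies rated by the same users in a similar way score close to 1, while movies with no raters in common score 0.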

In [0]:
# Item-based kNN Collaborative Filtering (Cosine Similarity)
import numpy as np
from pyspark.sql.functions import col
from sklearn.metrics.pairwise import cosine_similarity

# Pivot ratings to an item-user matrix (rows = movies, columns = users, 0 = not rated)
item_user_matrix = train_df.groupBy("movieId").pivot("userId").agg({"rating": "first"}).fillna(0)

# Collect to pandas for cosine similarity; this pulls the full matrix onto the driver,
# so it is only practical for a dataset of this size
item_user_pd = item_user_matrix.toPandas().set_index("movieId")

cos_sim = cosine_similarity(item_user_pd)
cos_sim_df = pd.DataFrame(cos_sim, index=item_user_pd.index, columns=item_user_pd.index)

# For a given item, find top-k similar items
def get_topk_similar_items(item_id, k=5):
    sims = cos_sim_df.loc[item_id].drop(item_id)
    return sims.nlargest(k).index.tolist()

# Recommend top-N items for a user based on items they've rated highly
def item_knn_recommend(user_id, k=5, N=10):
    user_rated = train_df.filter((col("userId") == user_id) & (col("rating") >= 4)).select("movieId").toPandas()["movieId"].tolist()
    candidate_items = set()
    for item in user_rated:
        candidate_items.update(get_topk_similar_items(item, k))
    # Remove already seen
    candidate_items = candidate_items - set(user_rated)
    # Candidates are pooled in a set, so the N returned items are in arbitrary order, not ranked
    return list(candidate_items)[:N]

# Example: Recommend for user 1
print("Item-based kNN recommendations for user 1:", item_knn_recommend(1, k=5, N=10))

Item-based kNN recommendations for user 1: [2565, 2571, 2059, 2573, 2078, 2080, 2081, 34, 2087, 2599]


**Interpretations:**

The output [2565, 2571, 2059, ...] lists the IDs of 10 movies recommended for user 1 based on item-based cosine similarity. Each movie in the list is among the 5 nearest neighbors of at least one movie the user rated 4 or higher. Because the candidates are pooled in a set before truncation, the list is an unranked selection rather than a strict top 10. To make these results more interpretable, we can map the numeric IDs back to movie titles by joining them with the `movies_df` DataFrame, so the final recommendations are clear and meaningful to the end user.
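
Since `item_knn_recommend` truncates a set, the order of its output carries no meaning. One possible way to rank the candidates, sketched here under assumptions, is to sum each candidate's similarity to all of the user's liked movies and sort by that total. The function `rank_candidates` and the neighbor lists are hypothetical; this is a variant for illustration, not the method used in the cell above.

```python
from collections import defaultdict


def rank_candidates(liked, seen, neighbors, n=10):
    """Rank unseen movies by their summed similarity to the user's liked movies."""
    scores = defaultdict(float)
    for item in liked:
        for other, sim in neighbors.get(item, []):
            if other not in seen:
                scores[other] += sim
    # Highest total similarity first; ties broken by movie ID for determinism.
    ordered = sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))
    return [movie for movie, _ in ordered[:n]]


# Hypothetical top-k neighbor lists: movie -> [(similar movie, cosine similarity)].
neighbors = {
    1: [(10, 0.9), (11, 0.5)],
    2: [(10, 0.4), (12, 0.6), (1, 0.8)],
}
ranked = rank_candidates(liked=[1, 2], seen={1, 2}, neighbors=neighbors, n=10)
print(ranked)
```

Movie 10 ranks first because it is similar to both liked movies, which a plain set union cannot express.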

In [0]:
# Example recommended IDs
recommended_ids = [2565, 2571, 2059, 2573, 2078, 2080, 2081, 34, 2087, 2599]

# Convert to Spark DataFrame for easy join
from pyspark.sql import Row

recommended_df = spark.createDataFrame([Row(movieId=int(mid)) for mid in recommended_ids])

# Join with movies_df to get titles
mapped_recs = recommended_df.join(movies_df, on="movieId", how="left").select("movieId", "title")

mapped_recs.show(truncate=False)

+-------+--------------------------+
|movieId|title                     |
+-------+--------------------------+
|2565   |King and I, The (1956)    |
|2571   |Matrix, The (1999)        |
|2059   |Parent Trap, The (1998)   |
|2573   |Tango (1998)              |
|2078   |Jungle Book, The (1967)   |
|2080   |Lady and the Tramp (1955) |
|2081   |Little Mermaid, The (1989)|
|34     |Babe (1995)               |
|2087   |Peter Pan (1953)          |
|2599   |Election (1999)           |
+-------+--------------------------+



As a continuation of this memory-based strategy, we also implemented **user-based k-nearest neighbors (kNN) collaborative filtering**. Instead of comparing movies, this method compares users to find those with similar rating behaviors. We pivoted the data into a **user-item matrix**, computed cosine similarities between users, and identified each user’s most similar neighbors. Based on the neighbors’ highly rated movies—which the target user hasn’t seen yet—we generated new personalized recommendations.

In [0]:
# User-based kNN Collaborative Filtering
# Pivot ratings to user-item matrix
user_item_matrix = train_df.groupBy("userId").pivot("movieId").agg({"rating": "first"}).fillna(0)
user_item_pd = user_item_matrix.toPandas().set_index("userId")

cos_sim_users = cosine_similarity(user_item_pd)
cos_sim_users_df = pd.DataFrame(cos_sim_users, index=user_item_pd.index, columns=user_item_pd.index)

def get_topk_similar_users(user_id, k=5):
    sims = cos_sim_users_df.loc[user_id].drop(user_id)
    return sims.nlargest(k).index.tolist()

def user_knn_recommend(user_id, k=5, N=10):
    neighbors = get_topk_similar_users(user_id, k)
    neighbor_ratings = train_df.filter(col("userId").isin(neighbors)).toPandas()
    # Recommend items neighbors rated highly that user hasn't seen
    user_seen = train_df.filter(col("userId") == user_id).toPandas()["movieId"].tolist()
    rec_candidates = neighbor_ratings[~neighbor_ratings["movieId"].isin(user_seen)]
    top_items = rec_candidates.groupby("movieId")["rating"].mean().sort_values(ascending=False).head(N).index.tolist()
    return top_items

print("User-based kNN recommendations for user 1:", user_knn_recommend(1, k=5, N=10))

User-based kNN recommendations for user 1: [2396, 457, 2067, 539, 2924, 2080, 3753, 3565, 2987, 3000]


The output above lists the IDs of the top 10 suggested movies, ranked by the mean rating that the 5 most similar users gave to movies user 1 has not rated. By mapping these IDs back to movie titles using `movies_df`, we can present clear, interpretable recommendations. This user-based kNN model complements the item-based version by focusing on **user-user** similarity instead of **item-item** similarity, providing an alternative perspective within collaborative filtering.
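
The `user_knn_recommend` function averages neighbor ratings with equal weight. A common alternative, sketched below under assumptions, weights each neighbor's rating by its similarity to the target user. The function `weighted_scores` and all values are hypothetical and are not part of the notebook's pipeline.

```python
def weighted_scores(neighbor_sims, neighbor_ratings, seen):
    """Similarity-weighted mean rating for every movie the target user has not seen."""
    numerator, denominator = {}, {}
    for user, sim in neighbor_sims.items():
        for movie, rating in neighbor_ratings.get(user, {}).items():
            if movie in seen:
                continue
            numerator[movie] = numerator.get(movie, 0.0) + sim * rating
            denominator[movie] = denominator.get(movie, 0.0) + sim
    return {m: numerator[m] / denominator[m] for m in numerator if denominator[m] > 0}


# Hypothetical neighbors of the target user: similarity and ratings per neighbor.
neighbor_sims = {2: 0.8, 3: 0.2}
neighbor_ratings = {2: {10: 5.0, 11: 3.0}, 3: {10: 1.0, 12: 4.0}}
seen_by_target = {11}

scores = weighted_scores(neighbor_sims, neighbor_ratings, seen_by_target)
ranked = sorted(scores, key=lambda m: (-scores[m], m))
print(ranked, scores)
```

With equal weights, movie 10 would average 3.0 and rank below movie 12; with similarity weights, the closer neighbor's 5-star rating dominates and movie 10 ranks first.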

In [0]:
# Example: Map user-based kNN recommendations to titles for user 1
user1_recs = user_knn_recommend(1, k=5, N=10)
mapped = movies_df.filter(col("movieId").isin(user1_recs)).select("movieId", "title").toPandas()
print(mapped)

   movieId                                          title
0      457                           Fugitive, The (1993)
1      539                    Sleepless in Seattle (1993)
2     2067                          Doctor Zhivago (1965)
3     2080                      Lady and the Tramp (1955)
4     2396                     Shakespeare in Love (1998)
5     2924               Drunken Master (Zui quan) (1979)
6     2987                Who Framed Roger Rabbit? (1988)
7     3000  Princess Mononoke, The (Mononoke Hime) (1997)
8     3565                      Where the Heart Is (2000)
9     3753                            Patriot, The (2000)


### Mean Average Precision Evaluation:

Mean Average Precision (MAP@10) rewards a recommender for placing relevant items near the top of the list: for each user, precision is computed at every rank where a test-set movie appears, these values are summed and divided by the smaller of the number of test-set movies and k, and the result is averaged over users. Note that the cell below evaluates `recs_df`, which holds the **ALS** top-10 lists built in Section 12, not the kNN recommendations; the item-based and user-based kNN models were only demonstrated for user 1 and are not scored here. The very low MAP@10 of 0.0012 (below) is consistent with the ALS Precision@10 and Recall@10 reported earlier: the held-out movie rarely appears in a user’s top 10, and when it does it is usually not near the top. This suggests that ranking by predicted rating alone is not enough for this split, and that hybrid approaches or additional signals may be needed to improve recommendation quality and user satisfaction.

In [0]:
# MAP (Mean Average Precision) Evaluation
def map_at_k(recs_df, test_df, k=10, score_col='score'):
    aps = []
    for user in recs_df['userId'].unique():
        rec_movies = (
            recs_df[recs_df['userId'] == user]
            .sort_values(score_col, ascending=False)
            .head(k)['movieId']
            .tolist()
        )
        true_movies = test_df[test_df['userId'] == user]['movieId'].tolist()
        if not true_movies:
            continue
        hits = 0
        sum_precisions = 0
        for i, rec in enumerate(rec_movies):
            if rec in true_movies:
                hits += 1
                sum_precisions += hits / (i + 1)
        # Normalize by the best achievable number of hits within the top k
        ap = sum_precisions / min(len(true_movies), k)
        aps.append(ap)
    return np.mean(aps) if aps else 0

map_score = map_at_k(recs_df, test_pd, k=10, score_col='pred_rating')
print(f"MAP@10: {map_score:.4f}")

MAP@10: 0.0012
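
To see how a single user's average precision is formed, here is a small worked sketch that follows the same normalization as `map_at_k`. The function name `average_precision_at_k` and the toy lists are hypothetical.

```python
def average_precision_at_k(recommended, relevant, k=10):
    """Average precision for one user, normalized by min(len(relevant), k)."""
    relevant = set(relevant)
    if not relevant:
        return 0.0
    hits, total = 0, 0.0
    for rank, item in enumerate(recommended[:k], start=1):
        if item in relevant:
            hits += 1
            total += hits / rank  # precision at this rank
    return total / min(len(relevant), k)


# Two relevant items found at ranks 2 and 4: (1/2 + 2/4) / 2 = 0.5
ap_two_hits = average_precision_at_k(["a", "b", "c", "d"], {"b", "d"})

# One held-out item found at rank 3: AP is simply 1/3
ap_single = average_precision_at_k([7, 8, 9], [9])

print(ap_two_hits, ap_single)
```

With one held-out movie per user, each user's AP is the reciprocal of the rank at which that movie appears (or 0 if it is missing), so MAP@10 behaves like a mean reciprocal rank truncated at 10.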


#### Diversity@10:
The `diversity` function calculates how diverse the top-k recommended movies are for each user in terms of genres. First, the code converts the `genres` column in the `movies_df` Spark DataFrame into a clean format using Pandas, ensuring each movie’s genres are stored as simple Python lists. For each user, the function selects their top-k recommended movies, retrieves their genres, and forms genre sets. It then computes all possible pairs of these sets and checks how many pairs have no overlapping genres (i.e., are disjoint). The diversity score for each user is the proportion of disjoint pairs, and the final `Diversity@10` score is the mean of these user-level scores. A higher value means the recommended movies span more varied genres, indicating a broader and less repetitive recommendation list.

In [0]:
def diversity(recs_df, movies_pd, k=10, score_col='score'):
    """
    recs_df: DataFrame with ['userId', 'movieId', score_col]
    movies_pd: DataFrame with ['movieId', 'genres'] where genres is a list of strings
    """
    diversities = []

    for user in recs_df['userId'].unique():
        rec_movies = (
            recs_df[recs_df['userId'] == user]
            .sort_values(score_col, ascending=False)
            .head(k)['movieId']
            .tolist()
        )

        genres_list = []
        for m in rec_movies:
            match = movies_pd[movies_pd['movieId'] == m]
            if not match.empty:
                genres_value = match.iloc[0]['genres']
                genres_set = set(genres_value)
                genres_list.append(genres_set)

        if len(genres_list) < 2:
            continue

        pairs = [
            (a, b) for i, a in enumerate(genres_list)
            for b in genres_list[i + 1:]
        ]
        diversity_score = np.mean([len(a & b) == 0 for a, b in pairs])
        diversities.append(diversity_score)

    return np.mean(diversities) if diversities else 0

# Build the pandas genre lookup that diversity() expects.
# Assumption: `genres` is either a pipe-separated string (raw MovieLens format)
# or already an array column; both are normalized to Python lists.
movies_pd = movies_df.select("movieId", "genres").toPandas()
movies_pd["genres"] = movies_pd["genres"].apply(
    lambda g: [] if g is None else (g.split("|") if isinstance(g, str) else list(g))
)

diversity_score = diversity(recs_df, movies_pd, k=10, score_col='pred_rating')
print(f"Diversity@10: {diversity_score:.4f}")

**Interpretations:**

The computed `Diversity@10` score of **0.5694** means that, on average, about 57% of all possible pairs of recommended movies per user have completely distinct genres. This suggests that the recommender system generates reasonably varied movie lists, covering multiple genres rather than clustering all recommendations within a single genre or theme. While this is encouraging in terms of exposing users to a wider range of content, it is important to balance diversity with relevance: overly diverse recommendations might include movies that do not align well with a user’s true interests, which can lower precision and user satisfaction if not handled carefully.
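
The pairwise calculation behind this score can be illustrated with a tiny example. The genre sets below are hypothetical; with `k=10` the real function compares 45 pairs per user instead of the 6 shown here. The last lines illustrate why genres must be stored as lists rather than raw strings before calling `set()`.

```python
from itertools import combinations

# Hypothetical genre sets for four recommended movies.
genre_sets = [
    {"Comedy", "Romance"},
    {"Action", "Sci-Fi"},
    {"Comedy", "Drama"},
    {"Horror"},
]

pairs = list(combinations(genre_sets, 2))            # 4 movies -> 6 pairs
disjoint = sum(1 for a, b in pairs if not (a & b))   # pairs sharing no genre
diversity_score = disjoint / len(pairs)
print(f"{disjoint} of {len(pairs)} pairs are disjoint -> {diversity_score:.4f}")

# Pitfall: calling set() on a raw genre string splits it into characters.
raw = "Comedy|Romance"
as_characters = set(raw)
as_genres = set(raw.split("|"))
print(sorted(as_genres))
```

Only the two comedies share a genre, so five of the six pairs count as disjoint.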

#### Novelty@10:
The `novelty` function measures how uncommon the recommended movies are by calculating the popularity of each movie in the training data (based on how often it was rated). For each user, it takes their top-k recommended movies, looks up their popularity counts, and computes the average negative log of these counts. The final `Novelty@10` score is the mean across all users, where higher (less negative) values mean the recommendations include less popular, more novel movies.

In [0]:
# Novelty: Recommend less popular (less frequently rated) items.
def novelty(recs_df, train_df, k=10, score_col='score'):
    """
    recs_df: DataFrame with ['userId', 'movieId', score_col]
    train_df: Spark DataFrame with ['movieId']
    """
    movie_pop = train_df.groupBy("movieId").count().toPandas().set_index("movieId")["count"]
    novelties = []
    for user in recs_df['userId'].unique():
        rec_movies = recs_df[recs_df['userId'] == user].sort_values(score_col, ascending=False).head(k)['movieId'].tolist()
        pop = [movie_pop.get(m, 0) for m in rec_movies]
        if pop:
            # Novelty: negative log popularity (higher is more novel)
            novelty_score = np.mean([-np.log2(p+1) for p in pop])
            novelties.append(novelty_score)
    return np.mean(novelties) if novelties else 0

# recs_df stores the ALS scores in 'pred_rating', not 'score'
novelty_score = novelty(recs_df, train_df, k=10, score_col='pred_rating')
print(f"Novelty@10: {novelty_score:.4f}")


**Interpretations:**

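Because the metric is the mean of -log2(count + 1), a more negative score means higher popularity and lower novelty. A score of **-4.7919** corresponds to a typical (geometric-mean) value of about 2^4.79 ≈ 28 for count + 1, i.e. roughly 27 training ratings per recommended movie. MovieLens 1M contains about one million ratings spread over fewer than 4,000 movies, so the average movie has a few hundred ratings; the recommended titles are therefore comparatively rarely rated rather than well-known blockbusters. Lists ranked purely by predicted rating often drift toward such niche items, which is also consistent with the very low Precision@10 reported earlier.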
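
The link between the score and raw popularity can be checked with a little arithmetic. In the sketch below, `novelty_score` mirrors the per-user formula from the `novelty` function, while `typical_count` and the example rating counts are hypothetical helpers added for illustration.

```python
import math


def novelty_score(rating_counts):
    """Mean of -log2(count + 1) over the recommended movies."""
    return sum(-math.log2(c + 1) for c in rating_counts) / len(rating_counts)


def typical_count(score):
    """Invert the metric: the count whose -log2(count + 1) equals the score."""
    return 2 ** (-score) - 1


# Hypothetical rating counts for two recommendation lists.
popular_list = [3000, 2500, 1800]
niche_list = [40, 25, 12]

print(f"popular: {novelty_score(popular_list):.2f}")
print(f"niche:   {novelty_score(niche_list):.2f}")
print(f"count behind -4.7919: {typical_count(-4.7919):.1f}")
```

A list of heavily rated movies lands near -11, while a list of rarely rated ones lands near -4 to -5, which is the range of the reported score.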

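Next, let's visualize a Precision@10 comparison across different recommenders and a sample of the item-item cosine similarity matrix. The precision values in the bar chart are hard-coded example values that illustrate the plot layout; they are not measured results.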

In [0]:
import matplotlib.pyplot as plt
import seaborn as sns

# Example: Bar plot comparing recommenders' precision@10
# NOTE: these are illustrative placeholder values, not measured results
methods = ["ALS", "Item-kNN", "User-kNN", "Content", "LLM"]
precisions = [0.25, 0.18, 0.16, 0.12, 0.20]  # example values

plt.figure(figsize=(8, 5))
sns.barplot(x=methods, y=precisions)
plt.ylabel("Precision@10")
plt.title("Precision@10 Comparison Across Recommenders")
plt.ylim(0, 1)
plt.show()

# Example: Heatmap of item-item similarity
plt.figure(figsize=(10, 8))
sns.heatmap(cos_sim_df.iloc[:20, :20], cmap="coolwarm")
plt.title("Item-Item Cosine Similarity (Sample)")
plt.show()


**Precision@10 Bar Chart:**

The top plot is a bar chart comparing the Precision@10 metric across five recommendation approaches: ALS (Alternating Least Squares), Item-kNN (item-based collaborative filtering), User-kNN (user-based collaborative filtering), a content-based recommender, and an LLM-based method. Each bar represents the proportion of relevant movies among the top 10 recommendations for that method, so higher bars indicate recommendations that are more likely to match the user’s true preferences.

The values plotted here are the example values hard-coded in the cell (0.25 for ALS, 0.18 for Item-kNN, 0.16 for User-kNN, 0.12 for the content-based recommender, and 0.20 for the LLM), so the ordering in the chart should not be read as an experimental result. In this part of the notebook, the only measured Precision@10 is the ALS value of 0.0007 from Section 12. To make the comparison meaningful, the placeholders would need to be replaced with Precision@10 values computed for each method on the same test split.

**Item-Item Cosine Similarity Heatmap:**

The bottom heatmap visualizes a sample of pairwise cosine similarities between items (movies). Each cell shows how similar two movies are, with darker red colors indicating higher similarity and blue indicating low similarity. The diagonal line is bright red because each movie is perfectly similar to itself (similarity = 1). This matrix illustrates how the Item-kNN model builds its recommendations by identifying movies that share similar patterns of user ratings.

Together, these plots complement the earlier metrics (MAP, Diversity, Novelty) by showing how a cross-method precision comparison can be presented and how item similarity is computed in the memory-based method. The similarity heatmap shows that the model can find related movies, but because the bar chart uses placeholder values, it does not yet establish whether the kNN models are more or less accurate than ALS. Comparing multiple approaches on the same split remains the way to choose the best-performing pipeline for your use case.

## 13. Azure OpenAI LLM-Based Recommender Prototype
This code uses an Azure OpenAI GPT-4 model to generate movie recommendations in natural language. For each selected user, it extracts a list of movies the user liked (rated 4 or higher), builds a prompt describing these liked movies, and asks the LLM to suggest similar ones. The second version also parses the LLM’s text output, matches the recommended titles back to the MovieLens dataset, and computes Precision@5 and Recall@5 to check how well the LLM’s suggestions align with the user’s true preferences in the test data.

The LLM provides human-like movie suggestions that go beyond strict item similarity: they often include mainstream, well-known titles but can also mix in creative or thematic connections. Because the request uses `temperature=0.7`, repeated calls with the same prompt can return different lists. Note that the prompt contains only the first five liked titles, so the model sees a small slice of the user’s history. Comparing the matched recommendations to the test set using Precision and Recall helps measure how useful the LLM’s free-text suggestions are when mapped back to actual user interests. This method complements traditional recommenders by adding interpretability and new angles for discovery.
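
To show what the parsing step does with a typical reply, here is a self-contained sketch. The reply text is hypothetical, written for this example rather than taken from a real model call, and `parse_numbered_titles` is an illustrative helper. It strips only the list marker (`1.` or `2)`), so titles that themselves begin with digits stay intact.

```python
import re

MARKER = re.compile(r"^\d+[\.\)]\s*")  # "1." or "1)" at the start of a line


def parse_numbered_titles(text):
    """Extract titles from lines that start with a list marker such as '1.' or '2)'."""
    titles = []
    for line in text.splitlines():
        line = line.strip()
        if not MARKER.match(line):
            continue
        clean = MARKER.sub("", line)             # drop only the list marker
        clean = clean.split(" - ")[0]            # drop a trailing description
        clean = clean.replace("**", "").strip()  # drop markdown bold
        titles.append(clean)
    return titles


# Hypothetical reply text, written for this example (not real model output).
reply = """Here are five ideas:
1. **Toy Story 2 (1999)** - A sequel with the same charm.
2) Babe (1995)
3. 2001: A Space Odyssey (1968) - Classic science fiction.
- This bullet is not numbered and is ignored."""

print(parse_numbered_titles(reply))
```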

In [0]:
import os
import re
from openai import AzureOpenAI

# ✅ Get Azure OpenAI API key from Spark env
api_key_AI = spark.conf.get("spark.env.api_key_AI")

# Setup client
client = AzureOpenAI(
    api_version="2024-12-01-preview",
    azure_endpoint="https://salou-md1zl8vr-eastus2.cognitiveservices.azure.com/",
    api_key=api_key_AI,  # 🔑 Use env value here
)

# NOTE: To see different recommendations from the LLM,
# change the value of `sample_user` below to any valid user ID.
# Valid user IDs in this dataset are roughly between 1 and 6000.
# For example, try: sample_user = 25, 123, or 5432.
# This will pull that user's liked movies and generate new suggestions.

sample_user = 1
liked_titles = (
    train_df.filter((col("userId") == sample_user) & (col("rating") >= 4))
    .join(movies_df, "movieId")
    .select("title")
    .toPandas()["title"].tolist()
)

prompt = f"User likes the following movies: {', '.join(liked_titles[:5])}. Suggest 5 similar movies."

response = client.chat.completions.create(
    model="movie-gpt4",
    messages=[
        {"role": "system", "content": "You are a helpful movie recommender."},
        {"role": "user", "content": prompt}
    ],
    temperature=0.7,
    max_tokens=500,
    top_p=1.0
)

print("LLM Output:")
print(response.choices[0].message.content)

# Robust parse: only numbered lines
llm_titles = []
for line in response.choices[0].message.content.split('\n'):
    line = line.strip()
    if re.match(r'^\d+', line):
        # Strip only the list marker so titles that start with digits stay intact
        clean = re.sub(r'^\d+[\.\)]?\s*[\-\*\s]*', '', line)
        clean = clean.split(' - ')[0].strip()
        clean = clean.replace('**', '').strip()
        llm_titles.append(clean)

print("Cleaned LLM titles:", llm_titles)

# Normalize movie DataFrame
movies_pd = movies_df.select("movieId", "title").toPandas()
movies_pd['title_lower'] = movies_pd['title'].str.lower()

# Match
matched_ids = []
for title in llm_titles:
    title_lower = title.lower()
    matches = movies_pd[movies_pd['title_lower'] == title_lower]
    if matches.empty:
        # Try to ignore year
        no_year = re.sub(r'\(\d{4}\)', '', title_lower).strip()
        if no_year:  # an empty pattern would match every title
            matches = movies_pd[movies_pd['title_lower'].str.contains(no_year, regex=False)]
    if not matches.empty:
        matched_ids.append(matches.iloc[0]['movieId'])

print("Matched MovieLens IDs:", matched_ids)

# Compare with test movies for user
test_movies = test_df.filter(col("userId") == sample_user).select("movieId").toPandas()["movieId"].tolist()
overlap = set(matched_ids) & set(test_movies)
precision_llm = len(overlap) / max(len(matched_ids), 1)
recall_llm = len(overlap) / max(len(test_movies), 1)

print(f"LLM Precision@5: {precision_llm:.4f}")
print(f"LLM Recall@5: {recall_llm:.4f}")


## 14. Mapping LLM Output to MovieLens IDs & Evaluation (Sketch)
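This code block takes the LLM’s text output, extracts only the movie titles from any numbered list, and matches those titles back to the MovieLens dataset to find their IDs. It tries an exact, case-insensitive match first, then falls back to a substring match with the release year removed. It then compares the LLM’s suggestions to the user’s actual movies in the test set, calculating Precision@5 and Recall@5. Precision is divided by the number of matched titles rather than a fixed 5, so suggestions that cannot be matched do not count against it. Keep in mind that MovieLens stores titles with the leading article moved to the end (for example `Matrix, The (1999)`), so a suggestion written as "The Matrix (1999)" will not match under either rule. This shows how well the LLM’s free-text recommendations align with what the user truly watched or liked.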
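
One way to make the matching less sensitive to title formatting is to normalize both sides before comparing. The sketch below is an assumption-laden illustration, not the notebook's method: `normalize_title` is a hypothetical helper that lowercases, drops a trailing year, and moves a trailing article ("Matrix, The") back to the front. The three catalog rows are taken from the title table shown earlier.

```python
import re


def normalize_title(title):
    """Lowercase, drop a trailing '(YYYY)', and move a trailing article to the front."""
    t = title.lower().strip()
    t = re.sub(r"\s*\(\d{4}\)\s*$", "", t)
    m = re.match(r"^(.*),\s*(the|an|a)$", t)
    if m:
        t = f"{m.group(2)} {m.group(1)}"
    return re.sub(r"\s+", " ", t).strip()


# A few catalog rows in MovieLens format (from the title table shown earlier).
catalog = {
    2571: "Matrix, The (1999)",
    2565: "King and I, The (1956)",
    34: "Babe (1995)",
}
index = {normalize_title(title): movie_id for movie_id, title in catalog.items()}

# Hypothetical LLM suggestions in natural word order.
suggestions = ["The Matrix (1999)", "Babe", "The King and I (1956)", "Inception (2010)"]
matched = {s: index.get(normalize_title(s)) for s in suggestions}
print(matched)
```

Dropping the year means remakes with the same name collide in `index`, so a production version would likely keep the year as a tiebreaker. Titles with an alternate name in parentheses, such as `Princess Mononoke, The (Mononoke Hime) (1997)`, are also not handled by this simple rule.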

In [0]:
import re

# --- Robust parse: only numbered lines, clean trailing extras ---
llm_suggestions = []
for line in response.choices[0].message.content.split('\n'):
    line = line.strip()
    if re.match(r'^\d+', line):  # starts with a number
        clean = re.sub(r'^\d+[\.\)]?\s*[\-\*\s]*', '', line)  # remove only the list marker (1. 1) 1 -), keep digits in titles
        clean = clean.split(' - ')[0].strip()  # remove trailing descriptions
        clean = clean.replace('**', '').strip()
        llm_suggestions.append(clean)

print("Parsed LLM suggestions:", llm_suggestions)

# --- Normalize movies ---
movies_pd = movies_df.select("movieId", "title").toPandas()
movies_pd['title_lower'] = movies_pd['title'].str.lower()

# --- Match by exact title first, then try ignoring year if needed ---
matched_ids = []
for title in llm_suggestions:
    title_lower = title.lower()
    matches = movies_pd[movies_pd['title_lower'] == title_lower]
    if matches.empty:
        # Fallback: ignore year in parentheses
        no_year = re.sub(r'\(\d{4}\)', '', title_lower).strip()
        if no_year:  # an empty pattern would match every title
            matches = movies_pd[movies_pd['title_lower'].str.contains(no_year, regex=False)]
    if not matches.empty:
        matched_ids.append(matches.iloc[0]['movieId'])

print("Matched MovieLens IDs:", matched_ids)

# --- Compare to test set ---
test_movies_user = test_df.filter(col("userId") == sample_user).select("movieId").toPandas()["movieId"].tolist()
overlap = set(matched_ids) & set(test_movies_user)
precision_llm = len(overlap) / max(len(matched_ids), 1)
recall_llm = len(overlap) / max(len(test_movies_user), 1)

print(f"LLM Precision@5: {precision_llm:.4f}")
print(f"LLM Recall@5: {recall_llm:.4f}")
