# A Hybrid Recommendation System for Movies
**Author:** `'Bode Oyeneye`

#### Preamble
This notebook implements a hybrid recommendation system that recommends movies to a user based on the behavior of similar users (collaborative filtering) and the content (or features) of the movies that the user has viewed (content-based filtering). The hybrid score weights collaborative filtering at 70% and content-based filtering at 30% to arrive at the final recommendation.

Notably, this work demonstrates a machine learning (ML) engineering concept: the step that follows model development by data scientists, where the goal is to deploy the candidate ML system into production. Hence, it is not intended to establish the "right" or "most appropriate" model, but rather to show a simple workflow for deploying it behind a Flask API for real-time serving.

#### Objectives
 - Develop a hybrid recommendation system that leverages user behaviour and movie features for personalization
 - Demonstrate collaborative and content-based filtering via distributed processing (Spark Infrastructure)

***Initial Setup***: Matrix factorization raises scalability concerns (computational complexity and memory requirements), and this project was run on a MacBook Air M2 (8-core CPU, 10-core GPU, 8 GB RAM). I therefore used PySpark and first tuned its Spark configuration. Specifically, I adjusted `spark.driver.memory`, `spark.executor.memory`, `spark.executor.cores`, `spark.driver.maxResultSize` and `spark.sql.shuffle.partitions` based on the resources available on my laptop, and set a checkpoint directory so that checkpointing can break the lineage of transformations and reduce memory usage. NB: checkpointing is used only when needed.

The machine learning workflow for this task is as follows:

- ***Getting the movie data***: The movie data were made publicly available by GroupLens Research, a research lab at the University of Minnesota (see [link](https://grouplens.org/datasets/movielens/)). This work uses their recommended dataset for education and development, last updated September 2018, which consists of approximately 100,000 ratings and 3,600 tag applications applied to 9,000 movies by 600 users. While the dataset contains several CSV files, only ratings.csv (fields: userId, movieId, rating and timestamp) and movies.csv (fields: movieId, title and genres) are used here.

- ***Feature Engineering***: Movies and users are each one-hot encoded, and TF-IDF is then applied to weight each term in the genres field by its relevance. This is achieved using the HashingTF and IDF classes available in the PySpark API.

- ***Collaborative Filtering Modeling***: Collaborative filtering is implemented with the PySpark API via alternating least squares (ALS) matrix factorization.

- ***Content-based Filtering Modeling***: For this, I employed random forest regression on top of the content features.

- ***Hybrid Model***: The developed model system was diagnosed against underfitting, overfitting or both, and assessed to determine its ability to generalize to previously unseen data using suitable evaluation metrics (e.g., RMSE)

- <b><i>Summary</i></b>: The overall summary of my findings is presented here
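
As a minimal sketch of the RMSE metric named in the workflow above, the pure-Python function below computes it for paired lists of actual and predicted ratings. The function name `rmse` and the sample values are illustrative assumptions; on Spark DataFrames, the `RegressionEvaluator` imported later in this notebook would be the natural choice.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical ratings, for illustration only
print(rmse([4.0, 3.0, 5.0], [3.5, 3.0, 4.0]))
```

A lower value means predictions sit closer to the observed ratings; comparing RMSE on training data against held-out data is one common way to spot overfitting.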

#### 0. Import Relevant Libraries

In [1]:
# Ensure the driver and workers use the same Python interpreter when several virtual environments exist
import os
import requests
import sys
import time

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['SPARK_LOCAL_IP'] = '10.0.0.115' 

In [2]:
from flask import Flask, request, jsonify
from IPython.display import display, Javascript
from pyspark import StorageLevel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import (
    IDF,
    HashingTF,
    StringIndexer,
    OneHotEncoder,
    VectorAssembler
)
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import threading

#### 1. Load Movies Data

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MovieRecommender") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.shuffle.partitions", "100") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/10 23:56:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Set log level and checkpoint directory
# spark.sparkContext.setLogLevel("ERROR")
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

ratings_df = spark.read.csv("../datasets/ml-latest-small/ratings.csv", header=True, inferSchema=True)
movies_df = spark.read.csv("../datasets/ml-latest-small/movies.csv", header=True, inferSchema=True)

In [5]:
model_df = ratings_df.join(movies_df, on="movieId", how="left")
model_df = model_df.withColumn("genres_list", F.split(F.col("genres"),r"\|"))
model_df.show(5)

+-------+------+------+---------+--------------------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|         genres_list|
+-------+------+------+---------+--------------------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|[Action, Crime, T...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller| [Mystery, Thriller]|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|[Crime, Mystery, ...|
+-------+------+------+---------+--------------------+--------------------+--------------------+
only showing top 5 rows



#### 2. Feature Engineering

In [6]:
# User features
user_indexer = StringIndexer(inputCol="userId", outputCol="user_index")
user_encoder = OneHotEncoder(inputCol="user_index", outputCol="user_vector")

# Movie features
movie_indexer = StringIndexer(inputCol="movieId", outputCol="movie_index")
movie_encoder = OneHotEncoder(inputCol="movie_index", outputCol="movie_vector")

# Movie genre features (assuming each movie's genres list is available)
hashingTF = HashingTF(inputCol="genres_list", outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol="raw_features", outputCol="movie_genre_features")

pipeline = Pipeline(stages=[user_indexer, user_encoder, movie_indexer, movie_encoder, hashingTF, idf])
model = pipeline.fit(model_df)
features_df = model.transform(model_df)
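
The genre features above rely on the hashing trick followed by inverse document frequency weighting. The pure-Python sketch below illustrates both steps on a toy corpus. It uses `zlib.crc32` as a stand-in hash (Spark's `HashingTF` uses MurmurHash3, so bucket indices will differ) and the smoothed IDF formula documented for Spark ML, `log((m + 1) / (df + 1))`. The helper names and the toy genre lists are assumptions made for illustration.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # toy size; the notebook uses numFeatures=1000


def hashed_term_counts(terms, num_features=NUM_FEATURES):
    """Map each term to a bucket index and count occurrences per bucket."""
    buckets = Counter()
    for term in terms:
        # Stand-in hash, NOT Spark's MurmurHash3
        index = zlib.crc32(term.encode("utf-8")) % num_features
        buckets[index] += 1
    return dict(buckets)


def idf_weights(docs_tf):
    """Smoothed IDF per bucket: log((m + 1) / (df + 1))."""
    m = len(docs_tf)
    doc_freq = Counter()
    for tf in docs_tf:
        doc_freq.update(tf.keys())  # each document counts once per bucket
    return {idx: math.log((m + 1) / (df + 1)) for idx, df in doc_freq.items()}


def tf_idf(tf, idf):
    """Scale raw bucket counts by their IDF weight."""
    return {idx: count * idf[idx] for idx, count in tf.items()}


# Toy genre lists, for illustration only
docs = [["Comedy", "Romance"], ["Comedy"], ["Crime", "Thriller"]]
docs_tf = [hashed_term_counts(d) for d in docs]
idf = idf_weights(docs_tf)
features = [tf_idf(tf, idf) for tf in docs_tf]
print(features)
```

A genre that appears in every document gets an IDF of zero under this formula, so it contributes nothing to the feature vector, while rarer genres are weighted up.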

                                                                                

#### 3. Collaborative Filtering Modeling

In [7]:
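# Initialize ALS (implicitPrefs=True treats ratings as confidence signals rather than explicit scores)
als = ALS(
    userCol="user_index", itemCol="movie_index", ratingCol="rating",
    coldStartStrategy="drop", nonnegative=True, implicitPrefs=True, alpha=1.0
)

# Fit the model on the full ratings data (no train/test split is made here)
collaborative_model = als.fit(features_df)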

25/03/10 23:56:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/03/10 23:56:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
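
Because the ALS estimator above is configured with `implicitPrefs=True`, the ratings are treated as confidence signals rather than as values to reproduce. Under the implicit-feedback formulation (Hu, Koren and Volinsky), each non-negative rating r becomes a binary preference p (1 if r > 0, else 0) with confidence c = 1 + alpha * r, and the model's predictions approximate p. The predicted scores are therefore expected to sit on a preference scale rather than on the 0.5 to 5 star scale. The sketch below illustrates that mapping in plain Python; the function name is hypothetical and this is not Spark's internal code.

```python
def implicit_feedback_terms(rating, alpha=1.0):
    """Return (preference, confidence) for one observed, non-negative rating.

    preference: 1.0 if the rating is positive, else 0.0
    confidence: 1 + alpha * rating (higher ratings weigh more in the loss)
    """
    preference = 1.0 if rating > 0 else 0.0
    confidence = 1.0 + alpha * rating
    return preference, confidence


for r in (0.5, 4.0, 5.0):
    print(r, implicit_feedback_terms(r))
```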


#### 4. Content-based Filtering Modeling

In [8]:
assembler = VectorAssembler(inputCols=["movie_vector", "movie_genre_features"], outputCol="movie_features")
rf = RandomForestRegressor(featuresCol="movie_features", labelCol="rating")

content_pipeline = Pipeline(stages=[assembler, rf])
content_model = content_pipeline.fit(features_df)

25/03/10 23:56:39 WARN DAGScheduler: Broadcasting large task binary with size 1015.4 KiB
25/03/10 23:56:39 WARN DAGScheduler: Broadcasting large task binary with size 1015.5 KiB
25/03/10 23:56:39 WARN DAGScheduler: Broadcasting large task binary with size 1030.3 KiB
25/03/10 23:56:40 WARN DAGScheduler: Broadcasting large task binary with size 1578.3 KiB
25/03/10 23:56:41 WARN MemoryStore: Not enough space to cache rdd_329_0 in memory! (computed 1665.0 MiB so far)
25/03/10 23:56:41 WARN BlockManager: Persisting block rdd_329_0 to disk instead.
25/03/10 23:56:46 WARN MemoryStore: Not enough space to cache rdd_329_0 in memory! (computed 1665.0 MiB so far)
25/03/10 23:56:58 WARN DAGScheduler: Broadcasting large task binary with size 1864.6 KiB
25/03/10 23:56:59 WARN MemoryStore: Not enough space to cache rdd_329_0 in memory! (computed 1665.0 MiB so far)
25/03/10 23:57:12 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/03/10 23:57:13 WARN MemoryStore: Not enough space

#### 5. Hybrid Model Prediction
The collaborative and content-based filtering models are combined with weights of 70% and 30%, respectively.

In [9]:
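def hybrid_predict(userId, movieId, genres_list):
    """Return the 70/30 weighted hybrid score for one (user, movie) pair."""
    # Build a one-row DataFrame and run it through the fitted feature pipeline.
    # Note: StringIndexer's default handleInvalid="error" raises for IDs unseen at fit time.
    input_df = spark.createDataFrame([(userId, movieId, genres_list)], ["userId", "movieId", "genres_list"])
    transformed_df = model.transform(input_df)

    # Score the row with each model
    cf_prediction = collaborative_model.transform(transformed_df.select("user_index", "movie_index"))
    cb_prediction = content_model.transform(transformed_df.select("movie_vector", "movie_genre_features"))

    cf_score = cf_prediction.select("prediction").collect()[0][0]
    cb_score = cb_prediction.select("prediction").collect()[0][0]

    # Weighted combination: 70% collaborative, 30% content-based
    return 0.7 * cf_score + 0.3 * cb_score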
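
The weighted combination above assumes both models return a score. As a hedged sketch of one way to make the blend explicit and tolerant of a missing collaborative score (for example, if the collaborative model yields no prediction for a pair), the helper below falls back to the content-based score alone. The function name `blend_scores` and the fallback policy are assumptions, not part of the notebook.

```python
def blend_scores(cf_score, cb_score, cf_weight=0.7):
    """Weighted average of collaborative (cf) and content-based (cb) scores.

    If cf_score is None, fall back to the content-based score alone
    (an assumed policy, chosen here for illustration).
    """
    if not 0.0 <= cf_weight <= 1.0:
        raise ValueError("cf_weight must be between 0 and 1")
    if cf_score is None:
        return cb_score
    return cf_weight * cf_score + (1.0 - cf_weight) * cb_score


print(blend_scores(0.8, 3.5))   # both scores available
print(blend_scores(None, 3.5))  # collaborative score missing
```

Keeping the weight as a parameter also makes it straightforward to try splits other than 70/30 without touching the prediction code.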

#### 6. Real Time Serving

In [10]:
# Global flag to control the server loop
server_active = True
flask_thread = None

app = Flask(__name__)
# shutdown_event = threading.Event()
# server_running = True

@app.route('/', methods=['GET'])
def home():
    return "<h1>Movie Recommendation Service</h1><p>Use /recommend endpoint for recommendations.</p>"
    
@app.route('/recommend', methods=['GET', 'POST'])
def recommend():
    if request.method == 'POST':
        userId = request.json['userId']
        movieIds = request.json['movieIds']
        genres_lists = request.json['genres_lists']
        
        recommendations = []
        for movieId, genres_list in zip(movieIds, genres_lists):
            score = hybrid_predict(userId, movieId, genres_list)
            recommendations.append({'movieId': movieId, 'score': score})
        
        return jsonify(sorted(recommendations, key=lambda x: x['score'], reverse=True))
    
    if request.method == 'GET':
        return "<h1>This endpoint is for recommending movies. Please use POST to submit data.</h1>"

@app.route('/shutdown', methods=['POST'])
def shutdown():
    global server_active
    server_active = False
    
    # Use a separate thread to handle the actual shutdown
    def shutdown_worker():
        time.sleep(0.1)
        for thread in threading.enumerate():
            if thread.name == 'werkzeug_server':
                thread.join(0.1)
                break
    
    threading.Thread(target=shutdown_worker).start()
    return jsonify({"status": "Server shutting down..."})

    
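def run_flask(app):
    from werkzeug.serving import make_server
    
    server = make_server('localhost', 8080, app, threaded=True)
    # Let handle_request() return periodically so the loop can notice server_active changing
    server.timeout = 0.5
    threading.current_thread().name = 'werkzeug_server'
    print("Flask server starting on http://localhost:8080")

    global server_active
    server_active = True
    while server_active:
        server.handle_request()
    # Release the listening socket so the port can be reused on restart
    server.server_close()
    print("Flask server has stopped")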
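
The `/recommend` handler above pairs `movieIds` with `genres_lists` using `zip`, which silently truncates to the shorter list, and it fails if a key is missing from the body. A minimal sketch of a validation step that could run before scoring is shown below; `validate_payload` is a hypothetical helper and the error messages are illustrative.

```python
def validate_payload(payload):
    """Return an error message for a malformed /recommend body, or None if valid."""
    if not isinstance(payload, dict):
        return "request body must be a JSON object"
    for key in ("userId", "movieIds", "genres_lists"):
        if key not in payload:
            return f"missing field: {key}"
    movie_ids = payload["movieIds"]
    genres_lists = payload["genres_lists"]
    if not isinstance(movie_ids, list) or not isinstance(genres_lists, list):
        return "movieIds and genres_lists must be lists"
    if len(movie_ids) != len(genres_lists):
        return "movieIds and genres_lists must have the same length"
    return None


# Mismatched lengths are reported instead of being silently truncated
print(validate_payload({"userId": 1, "movieIds": [1, 3], "genres_lists": [["Comedy"]]}))
```

Inside the route, a non-`None` result could be returned to the client with a 400 status, for example `return jsonify({"error": message}), 400`.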

In [11]:
# Implement clean flask starting and stopping
## since we are using a Jupyter notebook, the goal here is to avoid killing the kernel but rather ensuring the proper management of the
## server thread so that it can start and stop gracefully within the same notebook session.

def start_flask():
    global flask_thread
    if flask_thread and flask_thread.is_alive():
        print("Stopping existing server...")
        global server_active
        server_active = False
        flask_thread.join(timeout=1)

    flask_thread = threading.Thread(target=run_flask, args=(app,))
    flask_thread.daemon = True
    flask_thread.start()

    time.sleep(0.5)
    print("Server started on http://localhost:8080")


def stop_flask():
    global server_active
    if not flask_thread or not flask_thread.is_alive():
        return "No server running"
    server_active = False
    time.sleep(0.5)
    print("Server stopped")

In [12]:
# Start and run flask server
start_flask()

Flask server starting on http://localhost:8080
Server started on http://localhost:8080


127.0.0.1 - - [10/Mar/2025 23:57:56] "POST /recommend HTTP/1.1" 200 -


In [13]:
import requests

# Define the input data
input_data = {
    "userId": 1,
    "movieIds": [1,3,6,47,50],
    "genres_lists": [
        ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy'],
        ['Comedy', 'Romance'],
        ['Action', 'Crime', 'Thriller'],
        ['Mystery', 'Thriller'],
        ['Crime', 'Mystery', 'Thriller']
    ]
}

# Send POST request to the Flask API
response = requests.post('http://localhost:8080/recommend', json=input_data)

# Print the response
print(response.json())

[{'movieId': 1, 'score': 1.6396249131962164}, {'movieId': 50, 'score': 1.5550469616953946}, {'movieId': 47, 'score': 1.493649643748086}, {'movieId': 6, 'score': 1.4145915742734445}, {'movieId': 3, 'score': 1.336171597348429}]


In [14]:
# Stop the flask server and the spark session
stop_flask()
spark.stop()

Server stopped


### Additional Note
> The route http://localhost:8080/recommend is meant to be called with POST requests. Visiting the URL directly in a browser issues a GET request, which the route answers with a short informational message rather than with recommendations.
> To see the recommendation results as plain text or JSON directly in a browser, one would have to send a POST request through a tool like Postman or a browser extension that can send POST requests, or use a web page whose script sends the JSON body via POST (a plain HTML form submits form-encoded data, which the handler's `request.json` does not read).
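
As a small illustration of the note above, the sketch below builds a JSON POST request for the `/recommend` endpoint using only the standard library. It constructs the request without sending it, so it runs even when the server is down. The payload values are taken from the earlier example, and the helper name `build_recommend_request` is an assumption.

```python
import json
import urllib.request


def build_recommend_request(payload, url="http://localhost:8080/recommend"):
    """Build (but do not send) a JSON POST request for the recommender."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_recommend_request(
    {"userId": 1, "movieIds": [3], "genres_lists": [["Comedy", "Romance"]]}
)
print(req.get_method(), req.full_url)

# To send it while the server is running:
# with urllib.request.urlopen(req) as resp:
#     print(json.loads(resp.read()))
```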