<a href="https://colab.research.google.com/github/habibur8rahaman/Parallel-Recommendation-System/blob/main/Parallel_Recommendation_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Dataset: https://www.kaggle.com/datasets/somnambwl/bookcrossing-dataset?select=Users.csv

Only the Books and Ratings files are needed. Before loading them, edit each file as follows:
1. Replace every ',' with '' (i.e. delete the existing commas).
2. Then replace every ';' with ','.
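
The two replacements can also be scripted instead of being done by hand in an editor. The following is a minimal sketch: the helper names `convert_delimiters` and `convert_file` and the example paths are hypothetical, and it assumes the raw files are semicolon-delimited with no commas that need to be preserved.

```python
from pathlib import Path


def convert_delimiters(text: str) -> str:
    """Delete existing commas (step 1), then turn semicolons into commas (step 2)."""
    return text.replace(",", "").replace(";", ",")


def convert_file(src: str, dst: str, encoding: str = "utf-8") -> None:
    """Rewrite one semicolon-delimited file as a comma-delimited file."""
    raw = Path(src).read_text(encoding=encoding, errors="replace")
    Path(dst).write_text(convert_delimiters(raw), encoding=encoding)


# Hypothetical usage; adjust the paths to wherever the Kaggle files were saved.
# convert_file("Books.csv", "/content/Books.csv")
# convert_file("Ratings.csv", "/content/Ratings.csv")
```

The order matters: removing commas first ensures that commas inside titles do not end up being read as column separators.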

**A Simple Python Solution**

This was part of our initial work and is not included in the project report. However, it is useful for demonstrating how a large dataset can strain computational resources even with a simple algorithm (K-Nearest Neighbours).
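
To make the resource cost concrete: a pivoted user-book matrix filled with zeros holds one cell for every (user, book) pair, whether or not a rating exists. The sketch below is a rough back-of-the-envelope estimate. The user and book counts are hypothetical, not figures measured on this dataset, and 8 bytes per cell assumes float64 values.

```python
def dense_matrix_gib(n_users: int, n_items: int, bytes_per_cell: int = 8) -> float:
    """Approximate size in GiB of a dense users x items matrix."""
    return n_users * n_items * bytes_per_cell / 1024**3


# Hypothetical sizes, not measured on the Book-Crossing data.
n_users, n_items = 20_000, 50_000
print(f"{dense_matrix_gib(n_users, n_items):.2f} GiB")  # prints 7.45 GiB
```

This is why the code below samples only 20% of each file before pivoting.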

In [None]:
###=================Recommender system using a simple Python approach===========

import pandas as pd
from sklearn.neighbors import NearestNeighbors
from scipy.sparse import csr_matrix


books = pd.read_csv("/content/Books.csv").sample(frac=0.20)
ratings = pd.read_csv("/content/Ratings.csv").sample(frac=0.20)

# merging ratings with books
full_data = pd.merge(ratings, books, on='ISBN')

# create user-book-rating matrix
user_ratings = full_data.pivot_table(index='User-ID', columns='ISBN', values='Rating').fillna(0)
matrix = csr_matrix(user_ratings.values)

# KNN model
model_knn = NearestNeighbors(metric='cosine', algorithm='brute')
model_knn.fit(matrix)



def recommend_books_collab(user_id, n_recommendations=5):

      # finding similar users
      user_index = user_ratings.index.get_loc(user_id)
      distances, indices = model_knn.kneighbors(
          user_ratings.iloc[user_index, :].values.reshape(1, -1),
          n_neighbors=11  # similar users + self
      )

      # books liked by similar users
      similar_users = user_ratings.index[indices.flatten()[1:]]
      similar_users_ratings = user_ratings.loc[similar_users]

      # books the user has already rated
      user_rated = set(full_data[full_data['User-ID'] == user_id]['ISBN'])

      # average ratings from similar users
      avg_ratings = similar_users_ratings.mean(axis=0)

      # Filtering out books already rated and get top recommendations
      recommendations = avg_ratings[~avg_ratings.index.isin(user_rated)]
      recommendations = recommendations.sort_values(ascending=False).head(n_recommendations)

      #recommended book details
      recommended_books = books[books['ISBN'].isin(recommendations.index)]

      return recommended_books



### Example usage:

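# a user that exists in the merged data (user_ratings is built from full_data,
# so a user whose rated books were all dropped by sampling would raise a KeyError)
existing_users = full_data['User-ID'].unique()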

# recommendations
if len(existing_users) > 0:

    user_id = existing_users[0]

    print(f"Recommendations for user {user_id}:")
    recommendations = recommend_books_collab(user_id)
    if isinstance(recommendations, pd.DataFrame):
        print(recommendations[['Title', 'Author']])
    else:
        print(recommendations)

Recommendations for user 138844:
                                   Title                Author
244117    The Clue in the Crumbling Wall         Carolyn Keene
182796  The Revenge of Murray the Mantis      Maureen Spurgeon
1                           Clara Callan  Richard Bruce Wright
179431              The Underground City           Jules Verne
107835               The Moon of Gomrath           Alan Garner


**Alternating Least Squares Method from PySpark**

This code block uses the ALS method from MLlib to train on the dataset.

In [None]:
###==========================using ALS(Alternating Least Squares) model from Spark=========================


from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, explode, udf, lit
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType


#Spark session
spark = SparkSession.builder \
    .appName("BookRecommendationSystem") \
    .getOrCreate()


books_df = spark.read.csv("/content/Books.csv", header=True, inferSchema=True)
ratings_df = spark.read.csv("/content/Ratings.csv", header=True, inferSchema=True)


#keeping ratings in the 1-10 range and removing rows with nulls
ratings_df = ratings_df.filter((ratings_df['Rating'] > 0) &
                              (ratings_df['Rating'] <= 10)).dropna()
books_df = books_df.dropna()

#keeping the ratings for books that exist in Books.csv
ratings_df = ratings_df.join(books_df.select("ISBN"), "ISBN", "inner")

# index User-ID
user_indexer = StringIndexer(inputCol="User-ID", outputCol="userIndex")
user_model = user_indexer.fit(ratings_df)
ratings_df = user_model.transform(ratings_df)


#combining all ISBNs
all_isbns = ratings_df.select("ISBN").union(books_df.select("ISBN")).distinct()

# Fitting the indexer on all ISBN
isbn_indexer = StringIndexer(inputCol="ISBN", outputCol="itemIndex")
isbn_model = isbn_indexer.fit(all_isbns)


# consistent indexer (Otherwise was getting NULLs in result)
ratings_df = isbn_model.transform(ratings_df)
books_df = isbn_model.transform(books_df)

#ALS model
als = ALS(
    maxIter=15,
    regParam=0.05,
    rank=20,
    userCol="userIndex",
    itemCol="itemIndex",
    ratingCol="Rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False
)

# 20% of data for evaluation
train, test = ratings_df.randomSplit([0.8, 0.2])
model = als.fit(train)

**Serialized Lookup Approach**

Dot product computation in a serial manner.

In the output, the predictedRating column is the dot product of the user's and the book's latent factor vectors. A higher value means the user is more likely to enjoy the book. Because the dot product is not constrained to the rating scale, its value can fall outside the usual 0-10 range.
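
As a small illustration of how a score can exceed 10, the sketch below ranks items by the dot product of a user vector with each item vector. The vectors are made-up values, not factors from the trained model, and the helper names `dot` and `top_k_items` are hypothetical.

```python
from heapq import nlargest


def dot(u, v):
    """Dot product of two equal-length factor vectors."""
    return sum(a * b for a, b in zip(u, v))


def top_k_items(user_vec, item_factors, k):
    """Return the k (item_id, score) pairs with the highest dot-product score."""
    scored = ((item_id, dot(user_vec, vec)) for item_id, vec in item_factors.items())
    return nlargest(k, scored, key=lambda pair: pair[1])


# Made-up 3-dimensional factors for illustration only.
user_vec = [2.0, 1.0, 3.0]
item_factors = {
    "book_a": [1.0, 0.0, 1.0],  # score 5.0
    "book_b": [2.0, 2.0, 2.0],  # score 12.0, above the rating scale
    "book_c": [0.5, 0.5, 0.0],  # score 1.5
}
print(top_k_items(user_vec, item_factors, k=2))
# prints [('book_b', 12.0), ('book_a', 5.0)]
```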

In [None]:
###===============================Serialized Lookup======


from time import time


#recommendation function
def recommend_books_for_user(user_id, model, ratings_df, books_df):

    start_time = time()

    #userIndex finding
    user_index_lookup = ratings_df.select("User-ID", "userIndex").distinct()
    target_user_index_df = user_index_lookup.filter(col("User-ID") == user_id)

    # first() returns None when the user has no rows in ratings_df
    target_row = target_user_index_df.select("userIndex").first()
    if target_row is None:
        print(f"User ID {user_id} not found or has no ratings.")
        return
    target_user_index = target_row[0]

    #user DataFrame
    user_df = spark.createDataFrame([(target_user_index,)], ["userIndex"])

    # recommendations
    user_recs = model.recommendForUserSubset(user_df, 10)

    if user_recs.count() == 0:
        print(f"No recommendations available for user {user_id}")
        return

    recs = user_recs.select("userIndex", explode("recommendations").alias("rec")) \
                    .select("userIndex", col("rec.itemIndex").alias("itemIndex"),
                            col("rec.rating").alias("predictedRating"))

    # Join with books
    books_info = books_df.select("itemIndex", "ISBN", "Title", "Author", "Publisher") \
                         .dropDuplicates(["itemIndex"])

    recommended_books = recs.join(books_info, on="itemIndex", how="inner") \
                           .filter(col("Title").isNotNull()) \
                           .orderBy(col("predictedRating").desc())

    recommended_books.select("Title", "Author", "Publisher",
                               "predictedRating").show(truncate=False, n=10)

    end_time = time()
    print(f"Time taken: {end_time - start_time} seconds")


### Recommendation for entered user:
recommend_books_for_user("175070", model, ratings_df, books_df)

+--------------------------------------------------------------------+-----------------------+--------------------+---------------+
|Title                                                               |Author                 |Publisher           |predictedRating|
+--------------------------------------------------------------------+-----------------------+--------------------+---------------+
|Sacred Honor                                                        |Lillian Cauldwell      |Publishamerica      |15.001067      |
|Sticks & stones &amp, ice cream cones,: The craft book for children |Phyllis Fiarotta       |Workman Pub. Co     |14.970831      |
|Hitchhiking Vietnam                                                 |Karin Muller           |Globe Pequot        |14.352041      |
|And Then There Were None                                            |Agatha Christie        |St. Martin's Griffin|14.111767      |
|Fried Butter: A Food Memoir                                         |Abe Op

**Parallelized Lookup**

Splits the lookup across partitions so that similarity scores are computed and recommendations are generated in parallel.

This run uses 100 partitions, which is why it takes longer than the serial computation. With fewer partitions, however, the lookup is much faster than the serial ALS lookup. To see the effect on computation time, change the partition count in the line `item_scores = model.itemFactors.repartition(100, "id")` from 100 to another value. A detailed comparison of completion times for different partition counts is given in the report.
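
The idea behind the partitioned lookup can be sketched without Spark: split the item factors into chunks, find a local top-k in each chunk in parallel, then merge the local winners. This is a plain-Python analogy using a thread pool, not the Spark code path, and all names and values in it are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from heapq import nlargest


def score(user_vec, item_vec):
    """Dot product of the user vector with one item vector."""
    return sum(a * b for a, b in zip(user_vec, item_vec))


def local_top_k(user_vec, chunk, k):
    """Top-k (item_id, score) pairs within one partition."""
    scored = [(item_id, score(user_vec, vec)) for item_id, vec in chunk]
    return nlargest(k, scored, key=lambda pair: pair[1])


def partitioned_top_k(user_vec, items, k, n_partitions):
    """Split items into n_partitions chunks, rank each chunk, merge the winners."""
    chunks = [items[i::n_partitions] for i in range(n_partitions)]
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        partials = list(pool.map(lambda chunk: local_top_k(user_vec, chunk, k), chunks))
    merged = [pair for part in partials for pair in part]
    return nlargest(k, merged, key=lambda pair: pair[1])


# Made-up factors: item i has vector [i, 1], so its score is 2*i + 1.
items = [(i, [float(i), 1.0]) for i in range(20)]
user_vec = [2.0, 1.0]
print(partitioned_top_k(user_vec, items, k=3, n_partitions=4))
# prints [(19, 39.0), (18, 37.0), (17, 35.0)]
```

Each chunk only needs to return its own k best items, so the merge step stays small. With many small chunks, though, scheduling and merge overhead can outweigh the parallel gain, which is in line with the slower 100-partition run noted above.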

In [None]:
###=============Lookups in parallel across multiple nodes==================


import numpy as np

#dot product UDF for parallel computation
dot_prod_udf = udf(lambda user_vec, item_vec: float(np.dot(user_vec, item_vec)), FloatType())

def recommend_books_for_user(user_id, model, ratings_df, books_info):

    start_time = time()

    # converts user_id to index
    user_index_lookup = ratings_df.select("User-ID", "userIndex").distinct()
    target_user_index_df = user_index_lookup.filter(col("User-ID") == user_id)

    # first() returns None when the user has no rows in ratings_df
    target_row = target_user_index_df.select("userIndex").first()
    if target_row is None:
        print(f"User ID {user_id} not found or has no ratings.")
        return
    target_user_index = target_row[0]

    # getting user factor vector
    user_factors = model.userFactors
    user_vector = user_factors.filter(col("id") == target_user_index) \
                            .select("features").first()[0]

    # Broadcasts user vector to all workers
    bc_user_vector = spark.sparkContext.broadcast(user_vector)

    # parallel dot product computation
    item_scores = model.itemFactors.repartition(100, "id") \
        .withColumn("score", dot_prod_udf(col("features"), lit(bc_user_vector.value))) \
        .orderBy(col("score").desc()).limit(10)

    # joining with book info and showing results
    recommendations = item_scores.join(
        books_info,
        item_scores["id"] == books_info["itemIndex"],
        "inner"
    ).select(
        "Title", "Author", "Publisher", "score"
    ).orderBy(col("score").desc())

    recommendations.show(truncate=False, n=10)

    end_time = time()
    print(f"Time taken: {end_time - start_time} seconds")

    # Clean up
    bc_user_vector.unpersist()

# Recommendation for entered user
recommend_books_for_user("175070", model, ratings_df, books_df)

+--------------------------------------------------------------------+-----------------------+--------------------+----------+
|Title                                                               |Author                 |Publisher           |score     |
+--------------------------------------------------------------------+-----------------------+--------------------+----------+
|Sacred Honor                                                        |Lillian Cauldwell      |Publishamerica      |15.001067 |
|Sticks & stones &amp, ice cream cones,: The craft book for children |Phyllis Fiarotta       |Workman Pub. Co     |14.970831 |
|Hitchhiking Vietnam                                                 |Karin Muller           |Globe Pequot        |14.352042 |
|And Then There Were None                                            |Agatha Christie        |St. Martin's Griffin|14.111768 |
|Fried Butter: A Food Memoir                                         |Abe Opincar            |Soho Press       