# Hybrid Collaborative Filtering (CF)
In this notebook we implement a **hybrid Collaborative Filtering** algorithm to build a recommendation system using PySpark. Our method is composed of the following steps:<br>

- **User-based Collaborative Filtering:** Recommending items to the user of interest based on the preferences of similar users.

- **Item-based Collaborative Filtering:** Recommending items to the user of interest that are similar to the items this user has already rated.

- **Merging:** We take the K most similar item-based and K most similar user-based recommendations and merge them with the following equation to get the final top K.
            - (alpha * item-based) + ((1 - alpha) * user-based)
      alpha is a parameter we control to weight the contribution of each Collaborative Filtering method.
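
The merging step above can be sketched in plain Python, independent of Spark. This is a minimal illustration, not the notebook's implementation: `blend_scores` and the toy score dictionaries are hypothetical names and values, and an item missing from one method is assumed to contribute 0.0 from that side.

```python
def blend_scores(item_scores, user_scores, alpha=0.6, top_n=10):
    """Blend two {item: score} dicts with weight alpha on the item-based side.

    Hypothetical helper: an item missing from one dict contributes 0.0 there.
    """
    candidates = set(item_scores) | set(user_scores)
    blended = {
        item: alpha * item_scores.get(item, 0.0)
        + (1 - alpha) * user_scores.get(item, 0.0)
        for item in candidates
    }
    # Sort by descending score; break ties by item id for a stable order.
    return sorted(blended.items(), key=lambda kv: (-kv[1], kv[0]))[:top_n]


# Toy scores (made up for illustration only).
item_scores = {"A": 8.0, "B": 6.0}
user_scores = {"B": 9.0, "C": 10.0}
top = blend_scores(item_scores, user_scores, alpha=0.5, top_n=3)
print(top)
```

Under this assumption, an item scored by only one method has its score scaled by alpha or by 1 - alpha, so items found by both methods tend to rank higher.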


In [1]:
import math
from itertools import combinations
from pyspark.sql import SparkSession
from pyspark.sql.functions import trim, col
import pyspark.sql.functions as F

# Create (or reuse) the SparkSession
def spark_session_create():
    spark_sess = (SparkSession.builder.appName("Hybrid_Collaborative_Filtering")
        # number of partitions used when shuffling data for joins and aggregations
        .config("spark.sql.shuffle.partitions", "200")
        # enable Adaptive Query Execution
        .config("spark.sql.adaptive.enabled", "true")
        # merge small shuffle partitions into larger ones at runtime
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    )
    return spark_sess.getOrCreate()

## Data loading and Cleaning
- We use the books data as a lookup table to get titles from ISBNs.
- We use the ratings data to build the Collaborative Filtering methods.

In [2]:
# read and clean books, return df with ISBN and Title.
def read_books(spark, path):

    book_temp = (spark.read.option("header", True).option("sep", ";").csv(path)
             # cleaning: trim whitespace, then drop missing values and duplicate ISBNs
             .select(
                 trim(col("ISBN")).alias("ISBN"),
                 trim(col("Title")).alias("Title")
             )
             .dropna(subset=["ISBN","Title"])
             .dropDuplicates(["ISBN"] )
    )

    return book_temp

# read and clean ratings, return df with user, item, rating.
def read_ratings(spark, path):

    ratings_temp =  (
        spark.read.option("header", True).option("sep", ";").csv(path)
        # cleaning: trim ids, cast ratings, keep ratings in 1..10, drop duplicates
             .select(
                 trim(col("User-ID")).alias("user"),
                 trim(col("ISBN")).alias("item"),
                 col("Rating").cast("double").alias("rating")
             )
        .filter(col("rating").between(1.0, 10.0))
        .dropna()
        .dropDuplicates(["user","item"] )
    )
    return ratings_temp

### Centering Ratings by mean
We use the resulting DataFrame in the user–user and item–item similarity steps of the pipeline.

In [3]:
# calculate the mean-centered ratings, returns centered_df and centered_rdd.
def ratings_center_by_mean(ratings_df):

    mean_user_rating = ratings_df.groupBy("user").agg(F.avg("rating").alias("RatingMean"))

    ratings_center_df = ( ratings_df.join(mean_user_rating, on="user")
        # subtract the user's mean rating from each rating
        .withColumn("rating_centered", col("rating") - col("RatingMean"))
        .select("user","item","rating","rating_centered")
    )
    ratings_center_rdd = ratings_center_df.rdd.map(lambda r: (r["user"], r["item"], r["rating_centered"]))

    return ratings_center_df, ratings_center_rdd
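
To make the centering step concrete, here is a small plain-Python sketch of the same idea on a toy list of `(user, item, rating)` tuples. The function name `center_by_user_mean` and the data are hypothetical; the notebook itself does this with a DataFrame `groupBy` and `join`.

```python
from collections import defaultdict


def center_by_user_mean(ratings):
    """ratings: list of (user, item, rating) -> list of (user, item, rating - user_mean)."""
    totals = defaultdict(lambda: [0.0, 0])  # user -> [sum of ratings, count]
    for user, _, rating in ratings:
        totals[user][0] += rating
        totals[user][1] += 1
    means = {user: s / n for user, (s, n) in totals.items()}
    return [(user, item, rating - means[user]) for user, item, rating in ratings]


# Toy data (made up): u1 rates high on average, u2 rates low.
toy = [("u1", "a", 8.0), ("u1", "b", 6.0), ("u2", "a", 3.0), ("u2", "b", 5.0)]
centered = center_by_user_mean(toy)
print(centered)
```

After centering, each user's ratings average to zero, so a positive value means "above this user's usual rating" regardless of how generous the user is overall.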

### Normalizing

A user who rated many books tends to have a larger raw dot product than a user who rated only a few, so we normalize by the L2 norm of each user's (and each item's) mean-centered ratings.

In [4]:
# calculate the L2 norms of the centered ratings per user and per item
def calculate_normalization(ratings_center_rdd):

    Users_Normalized = (ratings_center_rdd.map(lambda x: (x[0], x[2]**2))
          # sum the squared centered ratings per user
          .reduceByKey(lambda a, b: a + b)
          # square root gives the L2 norm
          .mapValues(math.sqrt)
          .collectAsMap()
    )

    Items_Normalized = (ratings_center_rdd.map(lambda x: (x[1], x[2]**2))
          .reduceByKey(lambda a, b: a + b)
          .mapValues(math.sqrt)
          .collectAsMap()
    )
    return Users_Normalized, Items_Normalized
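
As a worked illustration of the norms computed above, the following plain-Python sketch groups squared centered ratings by user or by item and takes the square root. `l2_norms` and the toy rows are hypothetical and only mirror the map / reduceByKey / sqrt pattern.

```python
import math
from collections import defaultdict


def l2_norms(centered, key_index):
    """centered: list of (user, item, centered_rating).

    key_index=0 groups by user, key_index=1 groups by item.
    """
    sums = defaultdict(float)
    for row in centered:
        sums[row[key_index]] += row[2] ** 2
    return {key: math.sqrt(total) for key, total in sums.items()}


# Toy centered ratings (made up for illustration).
centered = [("u1", "a", 3.0), ("u1", "b", -4.0), ("u2", "a", 1.0)]
user_norms = l2_norms(centered, key_index=0)
item_norms = l2_norms(centered, key_index=1)
print(user_norms, item_norms)
```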

## Cosine Similarity User-User
We only consider users who rated at least one item that the target user rated.

In [6]:
# method to calculate the users most similar to the target user with cosine similarity
# the output is a dictionary {neighbor_user: similarity}.
def compute_user_neighbors(ratings_center_rdd, target_user, target_user_items, user_normali, K = 50):

    # ratings (from any user) on the items the target user rated
    ratings_target = ratings_center_rdd.filter(lambda x: x[1] in target_user_items.value)
    ratings_by_item = ratings_target.map(lambda x: (x[1], (x[0], x[2]))).groupByKey()

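    # pairwise dot products; keep only pairs that include the target user and
    # sort each pair so (u, v) and (v, u) are reduced under the same key
    dot_product_user = ratings_by_item.flatMap(lambda kv: [(tuple(sorted((u_1, u_2))), rc_1 * rc_2)
                    for (u_1, rc_1),(u_2, rc_2) in combinations(kv[1], 2)
                    if target_user in (u_1, u_2)]
    )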

    dot_product_user_sum = dot_product_user.reduceByKey(lambda k, kk: k + kk)
    dot_product_user_target_user_filter = dot_product_user_sum.filter(lambda kv: target_user in kv[0])

    # helper used in the map below: returns (other_user, cosine similarity)
    def extracttion(users_pair, dot_pro):
        u1,u2 = users_pair
        if u1 == target_user:
            other_user = u2
        else:
            other_user = u1
        norm_prod = user_normali.value[u1] * user_normali.value[u2] + 1e-8
        return other_user, dot_pro / norm_prod

    similarities_users = dot_product_user_target_user_filter.map(lambda ab: extracttion(ab[0], ab[1]))
    topk = similarities_users.takeOrdered(K, key=lambda x: -x[1])

    return dict(topk)
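
The similarity computed above can be checked by hand on a tiny example. The sketch below is a plain-Python approximation under the same conventions as the notebook (dot product over co-rated items, norms over all of a user's centered ratings, a small epsilon in the denominator); `user_similarities` and the toy data are hypothetical.

```python
import math
from collections import defaultdict


def user_similarities(centered, target_user, eps=1e-8):
    """centered: list of (user, item, centered_rating) -> {other_user: cosine similarity}."""
    sq_sums = defaultdict(float)   # user -> sum of squared centered ratings
    by_item = defaultdict(dict)    # item -> {user: centered rating}
    for user, item, rc in centered:
        sq_sums[user] += rc ** 2
        by_item[item][user] = rc

    dots = defaultdict(float)      # other user -> dot product with the target
    for raters in by_item.values():
        if target_user not in raters:
            continue               # only items the target user rated matter
        for other, rc in raters.items():
            if other != target_user:
                dots[other] += raters[target_user] * rc

    target_norm = math.sqrt(sq_sums[target_user])
    return {
        other: dot / (target_norm * math.sqrt(sq_sums[other]) + eps)
        for other, dot in dots.items()
    }


# Toy data: "u" agrees with target "t" on both items, "v" disagrees on both.
centered = [
    ("t", "a", 1.0), ("t", "b", -1.0),
    ("u", "a", 1.0), ("u", "b", -1.0),
    ("v", "a", -1.0), ("v", "b", 1.0),
]
sims = user_similarities(centered, "t")
print(sims)
```

With this data, `u` comes out close to +1 and `v` close to -1, which is the expected behavior of cosine similarity on mean-centered ratings.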

## Cosine Similarity Item-Item

In [7]:
# Calculate the top-K most similar items for each of the target user's items
# output: dictionary {item_i1: [(other_item, similarity), ...]}
def calculate_user_items_similarity(ratings_center_rdd, target_items, bc_item_norm, K = 50):

    # ratings on the target user's items
    ratings_target = ratings_center_rdd.filter(lambda a: a[1] in target_items.value)
    rating_by_user   = ratings_center_rdd.map(lambda x: (x[0], (x[1], x[2]))).groupByKey()
    rating_target_by_user = ratings_target.map(lambda x: (x[0], (x[1], x[2])))
    merged = rating_target_by_user.join(rating_by_user)

    item_dot_produ = merged.flatMap(
        lambda  ab : [((i_1, i_2), rc_1 * rc_2)
                    for (i_1, rc_1) in [ab[1][0]]
                    for (i_2, rc_2) in ab[1][1]
                    if i_2 != i_1 and i_2 not in target_items.value]
    )

    item_dot_prod_sum = item_dot_produ.reduceByKey(lambda a, b: a + b)

    item_similaty = item_dot_prod_sum.map(lambda ab: (
            ab[0][0],
            (ab[0][1], ab[1] / (bc_item_norm.value[ab[0][0]] * bc_item_norm.value[ab[0][1]] + 1e-8))
        )
    )

    similarity_per_item1 = (item_similaty.groupByKey()
          .mapValues(lambda iters: sorted(iters, key=lambda x: -x[1])[:K])
          .collectAsMap()
    )
    return similarity_per_item1
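
The last step above keeps only the K strongest neighbors per item by sorting each group. As a hedged alternative sketch, `heapq.nlargest` from the standard library selects the top K without fully sorting each group; `top_k_neighbors` and the toy pairs are hypothetical.

```python
import heapq


def top_k_neighbors(sim_pairs, k):
    """sim_pairs: iterable of (item, (neighbor, sim)) -> {item: top-k [(neighbor, sim), ...]}."""
    grouped = {}
    for item, pair in sim_pairs:
        grouped.setdefault(item, []).append(pair)
    # nlargest returns the k pairs with the highest similarity, in descending order.
    return {
        item: heapq.nlargest(k, pairs, key=lambda p: p[1])
        for item, pairs in grouped.items()
    }


# Toy similarities (made up for illustration).
pairs = [("a", ("x", 0.2)), ("a", ("y", 0.9)), ("a", ("z", 0.5)), ("b", ("x", -0.1))]
neighbors = top_k_neighbors(pairs, k=2)
print(neighbors)
```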


## Target User Items

In [8]:
# get the set of items rated by the target user
def get_target_rated_items(rating_center_df, target_user):
    tg_items = (rating_center_df
          # filter by target user
          .filter(col("user") == target_user)
          # select items column
          .select("item").distinct().rdd.map(lambda r: r["item"]).collect()
    )
    return set(tg_items)

## Merged Recommendation

In [9]:
# method to return the top-N hybrid recommendations for a target user.
def hybrid_recommendation_system(sc, ratings_df, book_isbn_to_book_title,
                      user_neighbors_set, list_neighbors_items,
                      target_user, alpha = 0.6, topN = 10):


    # turn the df into RDD of user, item, rating
    ratings_rdd = ratings_df.rdd.map(lambda row: (row["user"], row["item"], row["rating"]))

    # filter the ratings rdd to have only ratings made by the target user
    ratings_rdd_target_user = ratings_rdd.filter(lambda row: row[0] == target_user)

    # item-based collaborative filtering scores
    item_based_recommendations = (ratings_rdd_target_user
           # for each item the target user rated, emit (neighbor_item, (rating * sim, |sim|))
          .flatMap(lambda row: [(item_2, (row[2] * sim, abs(sim)))
                              for (item_2, sim) in list_neighbors_items.get(row[1], [])])
          # sum the weighted ratings and the weights per candidate item
          .reduceByKey(lambda k1, k2: (k1[0] + k2[0], k1[1] + k2[1]))
          # weighted average: divide by the total absolute similarity
          .mapValues(lambda value: value[0] / (value[1] + 1e-8))
          # collect as a dict {item: score}
          .collectAsMap()
    )

    # user-based collaborative filtering scores
    user_based_recommendations = (ratings_rdd
           # keep only ratings made by users in the neighbor set
          .filter(lambda neig: neig[0] in user_neighbors_set)
          # for each neighbor rating, emit (item, (rating * sim, |sim|))
          .map(lambda row: (row[1], (row[2] * user_neighbors_set[row[0]], abs(user_neighbors_set[row[0]]))))
          # sum per item, then take the weighted average
          .reduceByKey(lambda k1, k2: (k1[0] + k2[0], k1[1] + k2[1]))
          .mapValues(lambda value: value[0]/(value[1] + 1e-8))
          # collect as a dict {item: score}
          .collectAsMap()
    )

    # gather all items
    item_recommendations = set(item_based_recommendations) | set(user_based_recommendations)

    # calculate hybrid score: (alpha * item_based) + ((1 - alpha) * user_based)
    hybrids_recommendations = [(recommended_item,
        alpha * item_based_recommendations.get(recommended_item, 0.0) + (1 - alpha) * user_based_recommendations.get(recommended_item, 0.0))
        for recommended_item in item_recommendations
    ]

    # sort to get top
    top_recommendations = sorted(hybrids_recommendations, key=lambda x: -x[1])[:topN]

    # top_recommendations holds (ISBN, score); look up each title in the ISBN-to-title dict
    top_recommendations_with_title = [(book_isbn, book_isbn_to_book_title.get(book_isbn, "Unknown Title"), sim_score) for book_isbn, sim_score in top_recommendations]
    return top_recommendations_with_title
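
Both the item-based and the user-based scores above are similarity-weighted averages of ratings. The following plain-Python sketch shows that accumulation on toy data; `weighted_average_scores` and the `(item, rating, sim)` triples are hypothetical and only mirror the reduceByKey / mapValues pattern.

```python
def weighted_average_scores(contributions, eps=1e-8):
    """contributions: iterable of (item, rating, sim) -> {item: weighted average rating}."""
    acc = {}  # item -> (sum of rating * sim, sum of |sim|)
    for item, rating, sim in contributions:
        num, den = acc.get(item, (0.0, 0.0))
        acc[item] = (num + rating * sim, den + abs(sim))
    # eps guards against division by zero when all similarities are 0.
    return {item: num / (den + eps) for item, (num, den) in acc.items()}


# Toy contributions (made up): two neighbors vote on "x", one on "y".
contribs = [("x", 10.0, 0.5), ("x", 6.0, 0.5), ("y", 8.0, 0.25)]
scores = weighted_average_scores(contribs)
print(scores)
```

Note that an item reached through a single neighbor with positive similarity scores approximately that neighbor's rating, however small the similarity is, since the weight cancels in the ratio.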


## Run Code

In [10]:
# function to gather all the steps above and print recommendations for a given user
def run_code(target_user,k):
    # Initialize spark
    spark_sess = spark_session_create()
    sc = spark_sess.sparkContext

    # Load the data using functions defined
    df_books   = read_books(spark_sess, "Books.csv")
    dataf_ratings = read_ratings(spark_sess, "Ratings.csv")

    # center the ratings by each user's mean
    rating_centered_df, rating_centered_rdd = ratings_center_by_mean(dataf_ratings)

    # get the items rated by the target user and broadcast them to the workers
    target_user_rated_items = get_target_rated_items(rating_centered_df, target_user)
    target_user_rated_items_brodc = sc.broadcast(target_user_rated_items)

    # calculate normalization
    map_normalized_users, map_normalized_items = calculate_normalization(rating_centered_rdd)
    map_normalized_users_brodc = sc.broadcast(map_normalized_users)
    map_normalized_items_brodc = sc.broadcast(map_normalized_items)

    # compute the target user's nearest users and the nearest items to the items they rated
    list_user_neighbors = compute_user_neighbors(rating_centered_rdd, target_user, target_user_rated_items_brodc, map_normalized_users_brodc)
    list_item_neighbors = calculate_user_items_similarity(rating_centered_rdd, target_user_rated_items_brodc, map_normalized_items_brodc)

    # will use in recommendation method to get book title from isbn
    book_isbn_to_book_title = dict(df_books.rdd.map(lambda row: (row["ISBN"], row["Title"])) .collect())

    # get recommendations
    alpha = 0.5
    recommendations = hybrid_recommendation_system(sc, dataf_ratings, book_isbn_to_book_title,list_user_neighbors,
                         list_item_neighbors,
                              target_user,
                              alpha,
                              k)

    print(f"Top {k} recommendations for user with ID: {target_user}:")
    print(f"Book ISBN    |    Book Title    |     Score")
    for book_isbn, book_title, sim_score in recommendations:
        print(f"{book_isbn}   |   {book_title}   |   ({sim_score:.4f})")

    # Clean
    rating_centered_df.unpersist()
    spark_sess.stop()

In [11]:
%%time
run_code(target_user="11676",k=10)

Top 10 recommendations for user with ID: 11676:
Book ISBN    |    Book Title    |     Score
04514085856   |   Unknown Title   |   (10.0000)
0373029772   |   Ransomed Heart (Harlequin Romance, No 2977)   |   (9.5000)
0451158636   |   Devil's Daughter   |   (9.0000)
0374302995   |   And If the Moon Could Talk   |   (9.0000)
0399134204   |   Joy Luck Club   |   (8.8606)
0553289594   |   Among the Shadows: Tales from the Darker Side   |   (8.7561)
0679824243   |   Mummies in the Morning (Magic Tree House, Book 3)   |   (8.7380)
0060150890   |   Children's Hospital   |   (8.5000)
0060932139   |   The Unbearable Lightness of Being : A Novel (Perennial Classics)   |   (8.4201)
0671553011   |   HARVEST   |   (8.3635)
CPU times: user 2.07 s, sys: 312 ms, total: 2.38 s
Wall time: 3min 28s


In [12]:
%%time
run_code(target_user="35859",k=10)

Top 10 recommendations for user with ID: 35859:
Book ISBN    |    Book Title    |     Score
0345413903   |   The Murder Book   |   (10.0000)
0886774004   |   Arrow's Fall (The Heralds of Valdemar, Book 3)   |   (9.5000)
0345338588   |   On a Pale Horse (Incarnations of Immortality, Bk. 1)   |   (9.5000)
0671042858   |   The Girl Who Loved Tom Gordon   |   (9.5000)
0385502532   |   Drowning Ruth   |   (9.5000)
0670835382   |   Four Past Midnight   |   (9.0000)
0446602612   |   The Poet   |   (9.0000)
0425105334   |   The Talisman   |   (9.0000)
0399142851   |   Unnatural Exposure   |   (9.0000)
088070683X   |   Love For A Lifetime (mini) : Building A Marriage That Will Go The Distance (A Daily Reminder)   |   (9.0000)
CPU times: user 1.14 s, sys: 137 ms, total: 1.28 s
Wall time: 1min 41s


In [13]:
%%time
run_code(target_user="269566",k=10)

Top 10 recommendations for user with ID: 269566:
Book ISBN    |    Book Title    |     Score
0486282724   |   Much Ado About Nothing (Dover Thrift Editions)   |   (10.0000)
1566192951   |   The Adventures of Huckleberry Finn   |   (9.5000)
0441535771   |   Mistress Masham's Repose   |   (9.0000)
0812566351   |   Prince of Whales   |   (9.0000)
0553265571   |   The Grey Horse   |   (9.0000)
0451521358   |   Macbeth   |   (9.0000)
0590742590   |   The Second Bend in the River (Point)   |   (9.0000)
0448102269   |   The City Mouse and the Country Mouse (Pudgy Pals Series)   |   (9.0000)
0563206659   |   Complete Yes Minister   |   (9.0000)
031298250X   |   America : A Jake Grafton Novel (A Jake Grafton Novel)   |   (9.0000)
CPU times: user 991 ms, sys: 118 ms, total: 1.11 s
Wall time: 1min 19s


In [15]:
%%time
run_code(target_user="52584",k=10)

Top 10 recommendations for user with ID: 52584:
Book ISBN    |    Book Title    |     Score
0425176487   |   The Pull of the Moon   |   (10.0000)
193156146X   |   The Time Traveler's Wife   |   (9.5000)
0440215625   |   Dragonfly in Amber   |   (9.5000)
0399147799   |   Death in Paradise (Jesse Stone Novels (Hardcover))   |   (9.0000)
0002253941   |   Unknown Title   |   (9.0000)
0345337662   |   Interview with the Vampire   |   (9.0000)
0345342968   |   Fahrenheit 451   |   (8.5000)
0684814994   |   Christmas Box (Christmas Box Trilogy)   |   (8.5000)
0345339681   |   The Hobbit : The Enchanting Prelude to The Lord of the Rings   |   (8.5000)
0894805770   |   What to Expect the First Year   |   (8.5000)
CPU times: user 1.03 s, sys: 185 ms, total: 1.21 s
Wall time: 1min 24s
