Implement a collaborative filtering (CF) algorithm, using the item-item approach, to recommend new
movies to users.

Start with the 100,000 ratings (Small) dataset, and afterwards try to apply
your methods to the larger datasets (1M, 10M, 20M, 25M).

You will need to implement an efficient approach for finding the near neighbors
needed to predict new ratings (either LSH or clustering).

Validate your method by leaving out 10% of the available ratings.

---

# Setup

set up the Spark session and read the data

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ItemItemCF") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

data = spark.read.csv("data/100k.csv", header=True, inferSchema=True) \
            .select("userId", "movieId", "rating")

data.take(5)

[Row(userId=1, movieId=1, rating=4.0),
 Row(userId=1, movieId=3, rating=4.0),
 Row(userId=1, movieId=6, rating=4.0),
 Row(userId=1, movieId=47, rating=5.0),
 Row(userId=1, movieId=50, rating=5.0)]

hold out about 10% of the ratings as the test set (random 90/10 split into train and test)

In [2]:
ratings, test = data.randomSplit([0.9, 0.1], seed=42)
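
`randomSplit` splits rows by random sampling rather than cutting at an exact count, so the test set is roughly, not exactly, 10% of the ratings. A minimal plain-Python sketch of the same per-rating hold-out idea; the `holdout_split` helper and the toy triples are hypothetical, for illustration only:

```python
import random

def holdout_split(rows, test_fraction=0.1, seed=42):
    """Send each rating to the test set independently with probability test_fraction."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (test if rng.random() < test_fraction else train).append(row)
    return train, test

# hypothetical toy (userId, movieId, rating) triples: 50 users x 20 movies
toy = [(u, m, 4.0) for u in range(1, 51) for m in range(1, 21)]
train_toy, test_toy = holdout_split(toy)
print(len(train_toy), len(test_toy))  # test size is close to 100 but depends on the seed
```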

# Fitting

create a sparse item vector for each movie, indexed by user (`userId - 1`) and holding that user's rating

e.g.: `movieId: {userId: rating}`
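
As a plain-Python illustration of that `movieId: {userId: rating}` layout (the Spark cell below stores the same information as a `SparseVector` with index `userId - 1`), a minimal sketch; `build_item_vectors` and the toy ratings are hypothetical:

```python
from collections import defaultdict

def build_item_vectors(rows):
    """Group (userId, movieId, rating) triples into {movieId: {userId: rating}}."""
    items = defaultdict(dict)
    for user_id, movie_id, rating in rows:
        items[movie_id][user_id] = rating
    return dict(items)

# hypothetical toy ratings, in the same shape as the userId/movieId/rating columns
toy_ratings = [(1, 1, 4.0), (1, 3, 4.0), (2, 1, 5.0), (3, 3, 2.0)]
item_vecs = build_item_vectors(toy_ratings)
print(item_vecs)  # {1: {1: 4.0, 2: 5.0}, 3: {1: 4.0, 3: 2.0}}
```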

In [3]:
from pyspark.sql.functions import collect_list, struct, max as sql_max
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

# vector size = largest userId, because userId - 1 is used as the index
# (a distinct count would be too small if some userId is missing from the training split)
num_users = ratings.agg(sql_max("userId")).first()[0]

# build one movie's sparse vector of ratings, indexed by userId - 1
def to_sparse_vector(user_ratings, size):
    # Sort by userId to get strictly increasing indices
    sorted_pairs = sorted(user_ratings, key=lambda x: x.userId)
    indices = [x.userId - 1 for x in sorted_pairs]
    values = [x.rating for x in sorted_pairs]
    return Vectors.sparse(size, indices, values)

# group by movieId and collect user ratings
item_user = ratings.groupBy("movieId") \
    .agg(collect_list(struct("userId", "rating")).alias("user_ratings"))

# convert that to a sparse vector
item_vector_rdd = item_user.rdd.map(
    lambda row: Row(
        movieId=row["movieId"],
        features=to_sparse_vector(row["user_ratings"], num_users)
    )
)

# convert back to a DataFrame, since Normalizer (spark.ml) works on DataFrame columns
item_vectors = spark.createDataFrame(item_vector_rdd)

item_vectors.take(5)



[Row(movieId=1, features=SparseVector(610, {0: 4.0, 4: 4.0, 6: 4.5, 16: 4.5, 17: 3.5, 18: 4.0, 20: 3.5, 26: 3.0, 30: 5.0, 31: 3.0, 32: 3.0, 39: 5.0, 42: 5.0, 43: 3.0, 44: 4.0, 45: 5.0, 49: 3.0, 53: 3.0, 56: 5.0, 62: 5.0, 63: 4.0, 65: 4.0, 67: 2.5, 70: 5.0, 72: 4.5, 77: 4.0, 81: 2.5, 85: 4.0, 88: 3.0, 90: 4.0, 92: 3.0, 95: 5.0, 97: 4.5, 102: 4.0, 106: 4.0, 111: 3.0, 118: 3.5, 120: 4.0, 129: 3.0, 131: 2.0, 133: 3.0, 134: 4.0, 136: 4.0, 139: 3.0, 140: 4.0, 143: 3.5, 144: 5.0, 150: 5.0, 152: 2.0, 154: 3.0, 155: 4.0, 158: 4.5, 159: 4.0, 160: 4.0, 165: 5.0, 166: 3.5, 168: 4.5, 170: 5.0, 176: 5.0, 177: 4.0, 178: 4.0, 181: 4.0, 184: 4.0, 185: 4.0, 190: 4.0, 192: 2.0, 200: 5.0, 201: 4.0, 205: 5.0, 212: 3.5, 213: 3.0, 215: 3.0, 216: 4.0, 222: 3.5, 225: 3.5, 231: 3.5, 232: 3.0, 238: 4.0, 239: 5.0, 246: 5.0, 248: 4.0, 251: 4.5, 262: 4.0, 263: 4.0, 265: 2.0, 269: 5.0, 272: 5.0, 273: 4.0, 274: 5.0, 275: 4.0, 276: 4.0, 278: 3.0, 279: 4.5, 287: 4.5, 289: 4.0, 290: 4.0, 291: 4.0, 292: 3.0, 297: 2.0, 30

normalize the item vectors to unit L2 length: cosine similarity depends only on a vector's direction, so an item rated [1, 2] by two users is treated the same as one rated [2, 4]
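
A minimal plain-Python sketch of what `Normalizer(p=2.0)` does to each vector, using the [1, 2] vs [2, 4] example; the `l2_normalize` helper is hypothetical and works on `{index: value}` dicts rather than Spark vectors:

```python
import math

def l2_normalize(vec):
    """Scale a sparse {index: value} vector to unit Euclidean length."""
    norm = math.sqrt(sum(v * v for v in vec.values()))
    return {k: v / norm for k, v in vec.items()}

a = l2_normalize({0: 1.0, 1: 2.0})
b = l2_normalize({0: 2.0, 1: 4.0})
print(a, b)  # both are {0: 1/sqrt(5), 1: 2/sqrt(5)}
```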

In [4]:
from pyspark.ml.feature import Normalizer

# normalizing with L2 (Euclidean) norm (p=2)
normalizer = Normalizer(inputCol="features", outputCol="norm_features", p=2.0)
normalized = normalizer.transform(item_vectors)

normalized.take(5)

[Row(movieId=1, features=SparseVector(610, {0: 4.0, 4: 4.0, 6: 4.5, 16: 4.5, 17: 3.5, 18: 4.0, 20: 3.5, 26: 3.0, 30: 5.0, 31: 3.0, 32: 3.0, 39: 5.0, 42: 5.0, 43: 3.0, 44: 4.0, 45: 5.0, 49: 3.0, 53: 3.0, 56: 5.0, 62: 5.0, 63: 4.0, 65: 4.0, 67: 2.5, 70: 5.0, 72: 4.5, 77: 4.0, 81: 2.5, 85: 4.0, 88: 3.0, 90: 4.0, 92: 3.0, 95: 5.0, 97: 4.5, 102: 4.0, 106: 4.0, 111: 3.0, 118: 3.5, 120: 4.0, 129: 3.0, 131: 2.0, 133: 3.0, 134: 4.0, 136: 4.0, 139: 3.0, 140: 4.0, 143: 3.5, 144: 5.0, 150: 5.0, 152: 2.0, 154: 3.0, 155: 4.0, 158: 4.5, 159: 4.0, 160: 4.0, 165: 5.0, 166: 3.5, 168: 4.5, 170: 5.0, 176: 5.0, 177: 4.0, 178: 4.0, 181: 4.0, 184: 4.0, 185: 4.0, 190: 4.0, 192: 2.0, 200: 5.0, 201: 4.0, 205: 5.0, 212: 3.5, 213: 3.0, 215: 3.0, 216: 4.0, 222: 3.5, 225: 3.5, 231: 3.5, 232: 3.0, 238: 4.0, 239: 5.0, 246: 5.0, 248: 4.0, 251: 4.5, 262: 4.0, 263: 4.0, 265: 2.0, 269: 5.0, 272: 5.0, 273: 4.0, 274: 5.0, 275: 4.0, 276: 4.0, 278: 3.0, 279: 4.5, 287: 4.5, 289: 4.0, 290: 4.0, 291: 4.0, 292: 3.0, 297: 2.0, 30

now apply LSH with `BucketedRandomProjectionLSH`, which hashes with random projections and targets Euclidean distance; because the vectors are normalized, Euclidean distance is tied directly to cosine similarity

$$
\cos(\theta) = \frac{a \cdot b}{\|a\|_2 \, \|b\|_2} \Rightarrow \cos(\theta) = \frac{a \cdot b}{1 \cdot 1} = a \cdot b
$$

so minimizing the Euclidean distance is equivalent to maximizing the cosine similarity, which turns Euclidean LSH on unit vectors into an approximation of cosine LSH

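*Note: this workaround is needed because Spark MLlib has no cosine LSH; it only provides `BucketedRandomProjectionLSH` (Euclidean distance) and `MinHashLSH` (Jaccard distance). On unit vectors the Euclidean version ranks neighbors in the same order as cosine similarity.*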
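
To make the random-projection idea concrete, a minimal plain-Python sketch of the hash family `BucketedRandomProjectionLSH` is documented to use, h(x) = floor(x · v / bucketLength) with v a random unit vector drawn from a Gaussian, combining several tables by OR (a pair becomes a candidate if it shares a bucket in at least one table). The helpers `make_hash_tables`, `brp_hash`, and `is_candidate` are hypothetical; this is an illustration, not Spark's code:

```python
import math
import random

def make_hash_tables(dim, num_tables, seed=0):
    """Draw one random unit vector per hash table (Gaussian direction, then normalized)."""
    rng = random.Random(seed)
    tables = []
    for _ in range(num_tables):
        v = [rng.gauss(0.0, 1.0) for _ in range(dim)]
        norm = math.sqrt(sum(x * x for x in v))
        tables.append([x / norm for x in v])
    return tables

def brp_hash(x, tables, bucket_length):
    """One bucket id per table: floor(x . v / bucket_length)."""
    return [math.floor(sum(a * b for a, b in zip(x, v)) / bucket_length) for v in tables]

def is_candidate(x, y, tables, bucket_length):
    """OR-amplification: candidate pair if the buckets match in at least one table."""
    hx = brp_hash(x, tables, bucket_length)
    hy = brp_hash(y, tables, bucket_length)
    return any(a == b for a, b in zip(hx, hy))

tables = make_hash_tables(dim=3, num_tables=3, seed=42)
x = [0.6, 0.8, 0.0]  # a unit vector
print(brp_hash(x, tables, bucket_length=1.5))
print(is_candidate(x, x, tables, 1.5))  # True: identical vectors always collide
```

Because the inputs here are unit vectors, x · v always lies in [-1, 1]; with `bucket_length=1.5` (the `bucketLength` used in the next cell) each table can only produce buckets -1 and 0, so it effectively records the sign of the projection.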

In [5]:
from pyspark.ml.feature import BucketedRandomProjectionLSH

lsh = BucketedRandomProjectionLSH(
    inputCol="norm_features",
    outputCol="hashes",
    bucketLength=1.5,
    numHashTables=3
)

# fit the model
lsh_model = lsh.fit(normalized)

get approximate neighbor pairs with `approxSimilarityJoin`: LSH proposes candidate pairs that share a bucket, and only pairs whose Euclidean distance is below the threshold are kept; the returned distance is then converted to cosine similarity

$$
\cos(\theta) = a \cdot b = 1 - \frac{1}{2} \| a - b \|_2^2
$$

because the vectors are normalized: for unit vectors $\|a - b\|_2^2 = \|a\|_2^2 + \|b\|_2^2 - 2\, a \cdot b = 2 - 2\, a \cdot b$
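
A quick numeric check of this identity on two hand-picked unit vectors; `cosine_from_distance` is a hypothetical helper mirroring the `1 - distance**2 / 2` expression in the next cell:

```python
import math

def cosine_from_distance(distance):
    """Cosine similarity of two unit vectors from their Euclidean distance."""
    return 1.0 - distance ** 2 / 2.0

a = [0.6, 0.8]  # unit length
b = [1.0, 0.0]  # unit length
dot = sum(x * y for x, y in zip(a, b))  # 0.6
dist = math.dist(a, b)                  # sqrt(0.16 + 0.64) = sqrt(0.8)
print(dot, cosine_from_distance(dist))  # both about 0.6
```

The same mapping explains the `threshold=1.0` below: `cosine_from_distance(1.0)` is 0.5, so keeping pairs with distance under 1.0 keeps only pairs with cosine similarity above 0.5.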

In [6]:
from pyspark.sql.functions import col

# get the approximate neighbors: pairs whose Euclidean distance is below the threshold
neighbors = lsh_model.approxSimilarityJoin(
    normalized,
    normalized,
    threshold=1.0,  # distance < 1.0  <=>  cosine_sim > 0.5
    distCol="distance"
).filter(col("datasetA.movieId") < col("datasetB.movieId"))  # keep each unordered pair once, drop self-pairs

# convert the distance to cosine similarity
neighbors_cosine = neighbors.withColumn(
    "cosine_sim",
    1 - (col("distance") ** 2) / 2
).select(
    col("datasetA.movieId").alias("movie_i"),
    col("datasetB.movieId").alias("movie_j"),
    "cosine_sim"
)

# add reverse pairs: (i,j) -> (i,j) and (j,i)
reverse = neighbors_cosine.selectExpr("movie_j as movie_i", "movie_i as movie_j", "cosine_sim")
similarities = neighbors_cosine.union(reverse)

similarities.show(5, truncate=False)



+-------+-------+------------------+
|movie_i|movie_j|cosine_sim        |
+-------+-------+------------------+
|78     |211    |0.5144957554275266|
|104    |216    |0.5043347820644946|
|119    |1647   |0.6837634587578277|
|119    |4138   |0.7071067811865476|
|179    |5258   |0.6324555320336759|
+-------+-------+------------------+
only showing top 5 rows

# Test

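join each test (user, movie) pair with the target movie's neighbors and with the user's training ratings of those neighbors, which is everything the prediction needs; because these are inner joins, test pairs where the user rated none of the target's neighbors get no prediction and are left out of the evaluation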

In [7]:
test_with_ratings = test.alias("t") \
    .join(similarities.alias("s"), col("t.movieId") == col("s.movie_i")) \
    .join(ratings.alias("r"), (col("t.userId") == col("r.userId")) & (col("s.movie_j") == col("r.movieId"))) \
    .select(
        col("t.userId"),
        col("t.movieId").alias("target_movie"),
        col("s.movie_j").alias("neighbor_movie"),
        col("s.cosine_sim"),
        col("r.rating").alias("neighbor_rating")
    )

test_with_ratings.show(5, truncate=False)



+------+------------+--------------+------------------+---------------+
|userId|target_movie|neighbor_movie|cosine_sim        |neighbor_rating|
+------+------------+--------------+------------------+---------------+
|590   |104         |216           |0.5043347820644946|2.0            |
|103   |104         |216           |0.5043347820644946|4.0            |
|68    |104         |216           |0.5043347820644946|3.5            |
|599   |209         |210           |0.7786628214276602|2.0            |
|592   |253         |316           |0.5044063048397824|4.0            |
+------+------------+--------------+------------------+---------------+
only showing top 5 rows
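
The two inner joins above can be sketched in plain Python as dictionary lookups, assuming similarities stored in both directions as `{movie_i: {movie_j: cosine_sim}}` and training ratings keyed by `(userId, movieId)`. The `gather_neighbor_ratings` helper and the toy data are hypothetical:

```python
def gather_neighbor_ratings(test_pairs, similarities, train_ratings):
    """For each (user, target) pair, list (cosine_sim, rating) for the neighbors the user rated.

    Pairs with no rated neighbor are dropped, as with the inner joins in the Spark cell.
    """
    out = {}
    for user, target in test_pairs:
        found = [(sim, train_ratings[(user, nb)])
                 for nb, sim in similarities.get(target, {}).items()
                 if (user, nb) in train_ratings]
        if found:
            out[(user, target)] = found
    return out

# hypothetical toy data
similarities = {10: {20: 0.9, 30: 0.6}, 20: {10: 0.9}, 30: {10: 0.6}}
train_ratings = {(1, 20): 4.0, (2, 30): 3.0}
test_pairs = [(1, 10), (2, 10), (3, 10)]
print(gather_neighbor_ratings(test_pairs, similarities, train_ratings))
# {(1, 10): [(0.9, 4.0)], (2, 10): [(0.6, 3.0)]}  (user 3 rated no neighbor, so no entry)
```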



                                                                                

predict each test rating as the average of the user's ratings of the neighbor movies, weighted by their cosine similarity to the target movie

In [8]:
from pyspark.sql.functions import sum as sql_sum, col

predictions = test_with_ratings.groupBy("userId", "target_movie").agg(
    (sql_sum(col("cosine_sim") * col("neighbor_rating")) / sql_sum(col("cosine_sim"))).alias("pred_rating")
)

predictions.show(5, truncate=False)



+------+------------+------------------+
|userId|target_movie|pred_rating       |
+------+------------+------------------+
|298   |296         |3.8626930722377613|
|414   |4509        |2.535889787179544 |
|599   |6811        |1.9619018308384149|
|201   |2054        |4.670705945113926 |
|474   |1667        |2.4185295298828886|
+------+------------+------------------+
only showing top 5 rows

# Result

compare the predicted ratings with the actual ratings

In [9]:
final = predictions.alias("p").join(
    test.alias("t"),
    (col("p.userId") == col("t.userId")) & (col("p.target_movie") == col("t.movieId"))
).select(
    col("t.userId"),
    col("t.movieId"),
    col("p.pred_rating"),
    col("t.rating").alias("actual_rating")
)

final.show(5, truncate=False)



+------+-------+------------------+-------------+
|userId|movieId|pred_rating       |actual_rating|
+------+-------+------------------+-------------+
|298   |296    |3.8626930722377613|4.5          |
|414   |4509   |2.535889787179544 |3.0          |
|599   |6811   |1.9619018308384149|3.0          |
|201   |2054   |4.670705945113926 |4.0          |
|474   |1667   |2.4185295298828886|2.0          |
+------+-------+------------------+-------------+
only showing top 5 rows

evaluate the predicted ratings using RMSE and MAE

$$
\mathrm{MAE} = \frac{1}{n} \sum_{i=1}^{n} \left| y_i - \hat{y}_i \right| \qquad \mathrm{RMSE} = \sqrt{ \frac{1}{n} \sum_{i=1}^{n} \left( y_i - \hat{y}_i \right)^2 }
$$
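
A plain-Python transcription of the two formulas, as a sanity check on tiny hypothetical data (the `rmse` and `mae` helpers are illustrative; the cell below uses Spark's `RegressionEvaluator`):

```python
import math

def rmse(actual, predicted):
    """Root mean squared error."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(actual, predicted)) / len(actual))

def mae(actual, predicted):
    """Mean absolute error."""
    return sum(abs(y - p) for y, p in zip(actual, predicted)) / len(actual)

actual = [4.0, 3.0, 2.0]
predicted = [3.5, 3.0, 3.0]
print(rmse(actual, predicted), mae(actual, predicted))  # about 0.645 and 0.5
```

RMSE is never smaller than MAE, and the gap grows when a few predictions are far off, since squaring weights large errors more heavily.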

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="actual_rating", predictionCol="pred_rating")
rmse = evaluator.evaluate(final)
print(f"RMSE: {rmse:.4f}")

# MAE
mae_evaluator = RegressionEvaluator(metricName="mae", labelCol="actual_rating", predictionCol="pred_rating")
mae = mae_evaluator.evaluate(final)
print(f"MAE: {mae:.4f}")


                                                                                

RMSE: 0.9150
MAE: 0.6638