In [28]:
# import pyspark
from pyspark.sql import SparkSession , DataFrame
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import SparseVector , VectorUDT , Vectors
from pyspark.sql.functions import col, collect_list, struct , udf , avg , max , sum as spark_sum
from pyspark.sql.types import FloatType , ArrayType, StructType, StructField , IntegerType
import collections
import os
import time

In [29]:
# I need this to run comment this code if you don't need it
os.environ['PYSPARK_PYTHON'] = '.venv/Scripts/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = '.venv/Scripts/python.exe'
print(os.environ.get("JAVA_HOME"))

C:\Program Files\Eclipse Adoptium\jdk-11.0.27.6-hotspot\


In [30]:
spark: SparkSession = SparkSession.builder \
    .appName("CF movielens") \
    .getOrCreate()

In [31]:
file_path = "ml-latest-small/ml-latest-small/ratings.csv"
ratings_df = spark.read.csv(file_path, header=True, inferSchema=True) \
    .drop("timestamp")
print("Number of ratings:", ratings_df.count())
ratings_df.show(5)

Number of ratings: 100836
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [32]:
train_df, test_df = ratings_df.randomSplit([0.9, 0.1], seed=42)
print("Number of training ratings:", train_df.count())

Number of training ratings: 90673


In [33]:
avg_rating = train_df.agg({"rating": "avg"}).collect()[0][0]
print("Average rating in training set:", avg_rating)
# (avg rating of user x ) - μ
rating_deviation_of_user = train_df.groupBy("userId") \
    .agg(avg("rating").alias("avg_rating")) \
    .withColumn("rating_deviation_user", col("avg_rating") - avg_rating) \
    .select("userId", "rating_deviation_user")
    
rating_deviation_of_user_dict = rating_deviation_of_user.rdd \
    .map(lambda row: (row.userId, row.rating_deviation_user)) \
    .collectAsMap()
rating_deviation_of_user.show(5)

# (avg rating of user x - ration deviation ) - μ
rating_deviation_of_movie = train_df \
    .groupBy("movieId") \
    .agg(avg("rating").alias("avg_rating")) \
    .withColumn("rating_deviation_movie", col("avg_rating") - avg_rating) \
    .select("movieId", "rating_deviation_movie")
rating_deviation_of_movie.show(5)
rating_deviation_of_movie_dict = rating_deviation_of_movie.rdd \
    .map(lambda row: (row.movieId, row.rating_deviation_movie)) \
    .collectAsMap()

def calculate_baseline_rating(userId, movieId):
    user_deviation = rating_deviation_of_user_dict.get(userId, 0.0)
    movie_deviation = rating_deviation_of_movie_dict.get(movieId, 0.0)
    return avg_rating + user_deviation + movie_deviation

calculate_baseline_rating_udf = udf(calculate_baseline_rating, FloatType())

train_df = train_df.withColumn(
    "baseline_rating",
    calculate_baseline_rating_udf(col("userId"), col("movieId"))
)
train_df.select("userId", "movieId", "baseline_rating" ,"rating").show(5)


Average rating in training set: 3.503325135376573
+------+---------------------+
|userId|rating_deviation_user|
+------+---------------------+
|   148|  0.23417486462342696|
|   463|   0.3932265887613582|
|   471|   0.4057657737143363|
|   496| -0.15147328352472123|
|   243|    0.610960578909141|
+------+---------------------+
only showing top 5 rows

+-------+----------------------+
|movieId|rating_deviation_movie|
+-------+----------------------+
|   1580|  -0.02144107740555823|
|   2366|   0.13667486462342726|
|   3175|   0.08363138636255751|
|  32460|    0.7466748646234271|
|   1238|    0.5522304201789825|
+-------+----------------------+
only showing top 5 rows

+------+-------+---------------+------+
|userId|movieId|baseline_rating|rating|
+------+-------+---------------+------+
|     1|      1|      4.7871666|   4.0|
|     1|      3|      4.1031566|   4.0|
|     1|      6|       4.805284|   4.0|
|     1|     47|      4.8449597|   5.0|
|     1|     50|      5.1087117|   5.0|
+---

In [34]:
test_df.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    101|   5.0|
|     1|    151|   5.0|
|     1|    943|   4.0|
|     1|   1031|   5.0|
|     1|   1220|   5.0|
+------+-------+------+
only showing top 5 rows



In [35]:
# calculate RMSE
def calculate_rmse(predictions):
    predictions = predictions.withColumnRenamed("rating", "actual_rating")
    predictions = predictions.withColumnRenamed("predicted_rating", "predicted_rating")
    rmse = predictions.withColumn(
        "squared_error",
        (col("actual_rating") - col("predicted_rating")) ** 2
    ).agg({"squared_error": "avg"}).collect()[0][0] ** 0.5
    return rmse

predicted_ratings_df = test_df.withColumn(
    "predicted_rating",
    calculate_baseline_rating_udf(col("userId"), col("movieId"))
)
rmse = calculate_rmse(predicted_ratings_df)
print("RMSE of baseline model:", rmse)

RMSE of baseline model: 0.9062214757366772


In [49]:
length_user = train_df.select("userId").distinct().count() +1

@udf(returnType=VectorUDT())
def build_sparse_vector(ratings_list):
    if not ratings_list:
        return Vectors.sparse(length_user, [], [])
        
    user_rating_map = collections.OrderedDict(sorted([(r[0], r[1]) for r in ratings_list]))
    user_ids = [int(k) for k in user_rating_map.keys()] # Ensure integer indices
    values = list(user_rating_map.values())
    return Vectors.sparse(length_user, user_ids, values)

sparse_vector_df = train_df.groupBy("movieId") \
    .agg(collect_list(struct("userId", "rating")).alias("ratings")) \
    .select("movieId", build_sparse_vector(col("ratings")).alias("ratings_vector"))
    
@udf(returnType=IntegerType())
def get_num_ratings(vector: SparseVector):
    return len(vector.values)
sparse_vector_df = sparse_vector_df.withColumn(
    "num_ratings",
    get_num_ratings(col("ratings_vector"))
).filter(col("num_ratings") >= 5)

sparse_vector_df.show()
sparse_vector_df.count()

+-------+--------------------+-----------+
|movieId|      ratings_vector|num_ratings|
+-------+--------------------+-----------+
|      1|(611,[1,5,7,17,18...|        197|
|      2|(611,[6,8,18,19,2...|         94|
|      3|(611,[1,6,19,32,4...|         46|
|      4|(611,[6,14,84,262...|          6|
|      5|(611,[6,31,43,45,...|         47|
|      6|(611,[1,6,11,18,2...|         94|
|      7|(611,[6,14,19,31,...|         50|
|      8|(611,[6,20,43,274...|          8|
|      9|(611,[151,179,217...|         15|
|     10|(611,[6,8,11,19,2...|        119|
|     11|(611,[6,8,33,35,3...|         55|
|     12|(611,[19,44,120,1...|         18|
|     13|(611,[6,19,20,288...|          6|
|     14|(611,[90,109,182,...|         16|
|     15|(611,[6,19,93,136...|         12|
|     16|(611,[6,18,28,42,...|         72|
|     17|(611,[6,31,33,38,...|         62|
|     18|(611,[44,66,95,10...|         18|
|     19|(611,[6,14,21,40,...|         79|
|     20|(611,[78,199,217,...|         12|
+-------+--

3456

In [50]:
# cosine similarity function
def cosine_similarity(vec1: SparseVector, vec2: SparseVector) -> float:
    """
    Calculate the cosine similarity between two SparseVectors.
    Values near 1 indicate high similarity, while values near 0 indicate low similarity.
    """
    if vec1 is None or vec2 is None:
        return None
    dot_product = vec1.dot(vec2) # type: ignore
    norm1 = vec1.norm(2)
    norm2 = vec2.norm(2)
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)

v1 = sparse_vector_df.select("ratings_vector").collect()[0][0]
v2 = sparse_vector_df.select("ratings_vector").collect()[2][0]
print("First two vectors:", v1, v2)
similarity = cosine_similarity(v1, v2)
print("Cosine similarity between first two vectors:", similarity)

First two vectors: (611,[1,5,7,17,18,19,21,27,31,32,33,40,43,44,45,46,50,54,57,63,64,66,68,71,73,78,82,86,89,91,93,96,98,103,107,112,119,121,130,132,134,135,137,140,141,144,145,151,153,155,156,159,160,161,166,167,169,171,177,178,179,182,185,186,191,193,201,202,206,213,214,216,217,223,226,232,233,239,240,247,249,252,263,264,266,270,273,274,275,276,277,279,280,288,290,291,292,293,298,304,307,314,323,328,330,332,334,336,337,339,341,347,350,353,357,359,364,367,372,373,378,380,381,382,385,389,391,396,399,411,412,420,422,432,436,438,448,451,453,456,460,462,468,469,470,471,474,476,477,480,483,484,488,490,492,500,504,509,514,517,522,524,525,528,529,533,534,541,544,550,555,559,560,561,562,567,570,572,573,579,580,584,587,590,596,597,599,600,601,603,604,605,606,607,608,609,610],[4.0,4.0,4.5,4.5,3.5,4.0,3.5,3.0,5.0,3.0,3.0,5.0,5.0,3.0,4.0,5.0,3.0,3.0,5.0,5.0,4.0,4.0,2.5,5.0,4.5,4.0,2.5,4.0,3.0,4.0,3.0,5.0,4.5,4.0,4.0,3.0,3.5,4.0,3.0,2.0,3.0,4.0,4.0,3.0,4.0,3.5,5.0,5.0,2.0,3.0,4.0,4.5,4.0,4.0,5.0,3

In [None]:
BUCKET_LENGTH = 0.01
NUM_HASH_TABLES: int  = 30
DISTANCE_THRESHOLD = 0.5

@udf(returnType=VectorUDT())
def normalize_vector(vector: SparseVector):
    if vector is None or vector.values.size == 0:
        return vector # or an empty vector of same size: Vectors.sparse(vector.size, [], [])
    
    mean_val = vector.values.mean()
    new_values = vector.values - mean_val
    norm_val = (new_values.dot(new_values)) ** 0.5
    
    if norm_val == 0.0:
        norm_val = vector.norm(2)  # Fallback to L2 norm if the new values are all zero
        return Vectors.sparse(
            vector.size,
            vector.indices,
            vector.values / norm_val
        )
        
    return Vectors.sparse(
            vector.size,
            vector.indices,
            new_values / norm_val
        )
    
df = sparse_vector_df.withColumn(
        "features",
        normalize_vector(col("ratings_vector"))
    )

brp = BucketedRandomProjectionLSH(
    inputCol="features",
    outputCol="hashes",
    bucketLength=BUCKET_LENGTH,
    numHashTables=NUM_HASH_TABLES
)

model = brp.fit(df)
transformed_df = model.transform(df)
transformed_df.show(5)

+-------+--------------------+-----------+--------------------+--------------------+
|movieId|      ratings_vector|num_ratings|            features|              hashes|
+-------+--------------------+-----------+--------------------+--------------------+
|      1|(611,[1,5,7,17,18...|        197|(611,[1,5,7,17,18...|[[1.0], [-2.0], [...|
|      2|(611,[6,8,18,19,2...|         94|(611,[6,8,18,19,2...|[[1.0], [2.0], [-...|
|      3|(611,[1,6,19,32,4...|         46|(611,[1,6,19,32,4...|[[3.0], [3.0], [-...|
|      4|(611,[6,14,84,262...|          6|(611,[6,14,84,262...|[[0.0], [-2.0], [...|
|      5|(611,[6,31,43,45,...|         47|(611,[6,31,43,45,...|[[-4.0], [1.0], [...|
+-------+--------------------+-----------+--------------------+--------------------+
only showing top 5 rows



In [53]:
def find_similar_movies(movie_vector_q: SparseVector):
    """
    Find movies similar to the given movie_id based on the LSH model.
    Returns a list of tuples (movieId, similarity_score).
    """
    similar_movies = model.approxNearestNeighbors(
        transformed_df,
        movie_vector_q,
        numNearestNeighbors=10,
        distCol="distance",
    ).filter(
        col("movieId") != movie_id
    ).withColumn(
        "distance",
        1-col("distance") ** 2 / 2 # since A and B are normalized -> ∣∣A−B∣∣^2=2−2cos(θ) <=> cos(θ) = 1 - ∣∣A−B∣∣^2/2
    ).filter(
        col("distance") > DISTANCE_THRESHOLD
    ).select("movieId", "distance")
    return similar_movies.collect()
# Example usage
movie_id = 229  # Replace with the movieId you want to find similar movies for
movie_vector = transformed_df.filter(col("movieId") == movie_id).select("features").first()[0]
t = time.time()
similar_movies = find_similar_movies(movie_vector)
print(f"Similar movies to movieId {movie_id} (took {time.time()-t:.2f}s):")
for movie, score in similar_movies:
    print(f"MovieId: {movie}, Similarity Score: {score}")

Similar movies to movieId 229 (took 1.98s):
MovieId: 347, Similarity Score: 0.7282190812544191


In [56]:
def approximate_rating(user_id: int, movie_id: int) -> float:
    """
    Approximate the rating for a user and movie using the LSH model.
    Returns the average rating of similar movies weighted by similarity.
    """
    movie_vector = transformed_df.filter(col("movieId") == movie_id).select("features").first()[0]
    similar_movies = find_similar_movies(movie_vector)
    if not similar_movies:
        return calculate_baseline_rating(user_id, movie_id)
    
    total_weighted_rating = 0.0
    total_similarity = 0.0
    
    for sim_movie_id, similarity in similar_movies:
        rating = train_df.filter(
            (col("userId") == user_id) & (col("movieId") == sim_movie_id)
        ).withColumn(
            "rating",
            col("rating") - col("baseline_rating")
        ).select("rating").first()
        
        if rating is not None:
            total_weighted_rating += rating[0] * similarity
            total_similarity += similarity
            
    if total_similarity == 0:
        return calculate_baseline_rating(user_id, movie_id)
    
    return total_weighted_rating / total_similarity + calculate_baseline_rating(user_id, movie_id)

# Example usage
user_id = 199  
movie_id = 229  
approx_rating = approximate_rating(user_id, movie_id)
actual_rating = train_df.filter(
    (col('userId') == user_id) & (col('movieId') == movie_id)
).select('rating').first()
print(f"Approximate rating for userId {user_id} and movieId {movie_id}: {approx_rating} (actual rating: {actual_rating[0] if actual_rating else 'N/A'})")


Approximate rating for userId 199 and movieId 229: 3.472315890264453 (actual rating: 3.0)


In [None]:
test_df_with_baseline = test_df.withColumn(
    "baseline_rating",
    calculate_baseline_rating_udf(col("userId"), col("movieId"))
).withColumnRenamed("rating", "actual_rating")

test_movies_features = test_df_with_baseline.select("movieId").distinct() \
    .join(transformed_df.select("movieId", "features"), "movieId", "inner") \
    .withColumnRenamed("movieId", "test_movieId") \
    .withColumnRenamed("features", "features")

In [None]:
DISTANCE_THRESHOLD_EUCLIDIAN = (2-2*DISTANCE_THRESHOLD)**0.5
similar_movies_for_test = model.approxSimilarityJoin(
    test_movies_features,
    transformed_df.select("movieId", "features"),
    DISTANCE_THRESHOLD_EUCLIDIAN,
    distCol="raw_distance"
)
similar_movies_for_test.show(5)
similar_movies_for_test.count()

+--------------------+--------------------+------------------+
|            datasetA|            datasetB|      raw_distance|
+--------------------+--------------------+------------------+
|{19, (611,[6,14,2...|{19, (611,[6,14,2...|               0.0|
|{52, (611,[4,32,8...|{52, (611,[4,32,8...|               0.0|
|{250, (611,[6,274...|{2208, (611,[186,...|0.8789764960572334|
|{347, (611,[109,1...|{229, (611,[191,1...|0.7372664630180608|
|{750, (611,[7,16,...|{750, (611,[7,16,...|               0.0|
+--------------------+--------------------+------------------+
only showing top 5 rows



12563

In [None]:
predicted_ratings_prep = similar_movies_for_test \
    .withColumn("similarity", 1 - col("raw_distance") ** 2 / 2) \
    .filter(col("similarity") > DISTANCE_THRESHOLD) \
    .filter(col("datasetA.test_movieId") != col("datasetB.movieId")) \
    .select(
        col("datasetA.test_movieId").alias("movieId"), # The movie from the test set
        col("datasetB.movieId").alias("similar_movieId"), # A movie similar to the test movie
        "similarity"
    )
predicted_ratings_prep.show(5)

+-------+---------------+------------------+
|movieId|similar_movieId|        similarity|
+-------+---------------+------------------+
|    250|           2208|0.6137001596894742|
|    347|            229|0.7282190812544191|
|   1837|            725|0.7071067811865476|
|   2841|          69849|0.6415411846289303|
|   2883|           5428|0.7071067811865476|
+-------+---------------+------------------+
only showing top 5 rows



In [None]:
predictions_with_test_info = test_df_with_baseline.alias("td") \
    .join(predicted_ratings_prep.alias("prp"), col("td.movieId") == col("prp.movieId"), "inner") \
    .select(
        col("td.userId"),
        col("td.movieId"),
        col("td.actual_rating"),
        col("td.baseline_rating"),
        col("prp.similar_movieId"),
        col("prp.similarity")
    )

final_predictions_data = predictions_with_test_info.alias("pti") \
    .join(
        train_df.alias("tnd"),
        (col("pti.userId") == col("tnd.userId")) & (col("pti.similar_movieId") == col("tnd.movieId")),
        "inner" # Use inner join to only consider similar movies that the user has rated
    ) \
    .select(
        col("pti.userId"),
        col("pti.movieId"),
        col("pti.actual_rating"),
        col("pti.baseline_rating"),
        col("pti.similarity"),
        col("tnd.rating").alias("similar_movie_rating"),
        col("tnd.baseline_rating").alias("similar_movie_baseline_rating")
    )

final_predictions_data.show(5)


+------+-------+-------------+---------------+------------------+--------------------+-----------------------------+
|userId|movieId|actual_rating|baseline_rating|        similarity|similar_movie_rating|similar_movie_baseline_rating|
+------+-------+-------------+---------------+------------------+--------------------+-----------------------------+
|    28|   2841|          2.5|      3.0604281|0.6415411846289303|                 3.5|                    2.6886334|
|   600|   7004|          3.0|      2.4673922|0.6063390625908325|                 3.0|                    2.4673922|
|   232|   8860|          3.0|      2.5806592|0.6433738724816302|                 3.0|                    2.5806592|
|   474|  27912|          3.0|      3.2876277|0.6402779119256774|                 3.5|                    2.8501277|
|   249|  33085|          2.5|      2.3897128| 0.661666819989295|                 4.0|                    3.6272128|
+------+-------+-------------+---------------+------------------

In [None]:
final_predicted_ratings_df = final_predictions_data.groupBy("userId", "movieId", "actual_rating","baseline_rating") \
    .agg(
        (spark_sum((col("similar_movie_rating")-col("similar_movie_baseline_rating")) * col("similarity")) / spark_sum(col("similarity"))).alias("weighted_normalized_prediction")
    ) \
    .withColumn("predicted_rating", col("weighted_normalized_prediction") + col("baseline_rating")) \
    .select("userId", "movieId", "actual_rating", "predicted_rating")
    
print("\nFinal Predicted Ratings (with similarity-based prediction):")
final_predicted_ratings_df.show(5)


Final Predicted Ratings (with similarity-based prediction):
+------+-------+-------------+------------------+
|userId|movieId|actual_rating|  predicted_rating|
+------+-------+-------------+------------------+
|    68|   2606|          2.5|3.7115384340286255|
|   474|   4808|          2.0| 2.642857074737549|
|   232|  51084|          3.5|3.4406007339496902|
|   599|   3388|          3.0| 2.360421714121408|
|   339|  81834|          2.5| 2.607619285583496|
+------+-------+-------------+------------------+
only showing top 5 rows



In [None]:
movies_without_similarity_prediction = test_df_with_baseline.alias("td") \
    .join(final_predicted_ratings_df.alias("fpd"),
          (col("td.userId") == col("fpd.userId")) & (col("td.movieId") == col("fpd.movieId")),
          "left_anti") \
    .select(
        col("td.userId"),
        col("td.movieId"),
        col("td.actual_rating"),
        col("td.baseline_rating").alias("predicted_rating") # Use baseline as prediction
    )

print("\nMovies where only Baseline Rating is used as prediction:")
movies_without_similarity_prediction.show(5)
full_predicted_ratings_df = final_predicted_ratings_df.unionByName(movies_without_similarity_prediction)


Movies where only Baseline Rating is used as prediction:
+------+-------+-------------+----------------+
|userId|movieId|actual_rating|predicted_rating|
+------+-------+-------------+----------------+
|     4|    599|          2.0|       4.0745645|
|    18|   1721|          4.0|       3.7142537|
|    80|  85397|          4.0|        3.751777|
|    95|   6934|          4.0|       3.6883564|
|   105|  30812|          4.0|       4.1092324|
+------+-------+-------------+----------------+
only showing top 5 rows



In [None]:
print("\nFull Predicted Ratings for Test Set:")
full_predicted_ratings_df.show(5)

rmse_lsh = calculate_rmse(full_predicted_ratings_df)
print(f"\nRMSE of LSH-based model on test set: {rmse_lsh}")


Full Predicted Ratings for Test Set:
+------+-------+-------------+------------------+
|userId|movieId|actual_rating|  predicted_rating|
+------+-------+-------------+------------------+
|    68|   2606|          2.5|3.7115384340286255|
|   474|   4808|          2.0| 2.642857074737549|
|   232|  51084|          3.5|3.4406007339496902|
|   599|   3388|          3.0| 2.360421714121408|
|   339|  81834|          2.5| 2.607619285583496|
+------+-------+-------------+------------------+
only showing top 5 rows


RMSE of LSH-based model on test set: 0.9075363974250692
