# To make sure and compare, I used and generated different similarity metrics and similarty scores.

# jaccard_similarity

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_set, size, array_distinct, expr
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
spark = SparkSession.builder.appName("JaccardSimilarity").getOrCreate()
ratings_df = spark.read.csv("ratings.csv", header=True, inferSchema=True)
movies_per_user_df = ratings_df.groupBy("userId").agg(collect_set("movieId").alias("movies")) # Here I create a list of movies rated by each user
user_pairs_df = movies_per_user_df.alias("u1").join(movies_per_user_df.alias("u2"), col("u1.userId") < col("u2.userId")) # Join users with themselves to get all possible pairs
# I will calculate intersection and union now to find Jaccard Similarity
user_pairs_df = user_pairs_df.withColumn("intersection", size(expr("array_intersect(u1.movies, u2.movies)"))) \
                             .withColumn("union", size(expr("array_distinct(concat(u1.movies, u2.movies))"))) # I concat the two lists and find the distinct elements to get the union
user_pairs_df = user_pairs_df.withColumn("jaccard_similarity", col("intersection") / col("union"))
user_pairs_df = user_pairs_df.filter(col("jaccard_similarity") > 0).select(col("u1.userId").alias("user1"), col("u2.userId").alias("user2"), "jaccard_similarity") # Filter out users with 0 similarity
# Finding top 10 
window = Window.partitionBy("user1").orderBy(col("jaccard_similarity").desc())
top_10_similar_users = user_pairs_df.withColumn("rank", rank().over(window)) \
                                    .filter(col("rank") <= 10) \
                                    .select("user1", "user2", "jaccard_similarity")
top_10_similar_users.show()



CodeCache: size=131072Kb used=27531Kb max_used=27567Kb free=103540Kb
 bounds [0x00000001081e0000, 0x0000000109d00000, 0x00000001101e0000]
 total_blobs=10603 nmethods=9635 adapters=879
 compilation: disabled (not enough contiguous free space left)


[Stage 13:>                                                         (0 + 1) / 1]

+-----+-----+-------------------+
|user1|user2| jaccard_similarity|
+-----+-----+-------------------+
|    1|  313|0.23010752688172043|
|    1|  330|0.20351758793969849|
|    1|  452|0.19889502762430938|
|    1|  266|0.19420289855072465|
|    1|   45|0.18832391713747645|
|    1|   57|0.18791946308724833|
|    1|  469|0.18739352640545145|
|    1|  577|0.18731117824773413|
|    1|  135|0.18561484918793503|
|    1|   39|0.18149466192170818|
|    2|  366|0.17647058823529413|
|    2|  378|0.16666666666666666|
|    2|  417|0.14285714285714285|
|    2|  461|0.14285714285714285|
|    2|  550|               0.14|
|    2|  189|0.13953488372093023|
|    2|  433|0.13333333333333333|
|    2|  435|0.12698412698412698|
|    2|   65|              0.125|
|    2|  209|0.12280701754385964|
+-----+-----+-------------------+
only showing top 20 rows



                                                                                

# pearson_correlation

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
from pyspark.sql.functions import sqrt

spark = SparkSession.builder.appName("PearsonCorrelation").getOrCreate()
ratings_df = spark.read.csv("ratings.csv", header=True, inferSchema=True)
mean_ratings_df = ratings_df.groupBy("userId").agg(avg("rating").alias("mean_rating")) # I calculate mean ratings for each user
ratings_with_mean_df = ratings_df.join(mean_ratings_df, "userId") # I join mean ratings with ratings dataframe to get mean ratings for each rating
# Now I will calculate Pearson Correlation Coefficient for each pair of users.
pairs_df = ratings_with_mean_df.alias("r1").join(ratings_with_mean_df.alias("r2"), col("r1.movieId") == col("r2.movieId")) \
                               .filter(col("r1.userId") < col("r2.userId"))
stats_df = pairs_df.groupBy("r1.userId", "r2.userId").agg(
    count(col("r1.movieId")).alias("num_common_movies"),
    sum((col("r1.rating") - col("r1.mean_rating")) * (col("r2.rating") - col("r2.mean_rating"))).alias("dot_product"),
    sum((col("r1.rating") - col("r1.mean_rating"))**2).alias("rating_sq_sum_user1"),
    sum((col("r2.rating") - col("r2.mean_rating"))**2).alias("rating_sq_sum_user2")
)
pearson_df = stats_df.withColumn("pearson_correlation", col("dot_product") / (sqrt("rating_sq_sum_user1") * sqrt("rating_sq_sum_user2")))
filtered_pearson_df = pearson_df.filter((col("pearson_correlation") > 0) & (col("num_common_movies") > 3)) # here I filter out users with 0 correlation and less than 3 common movies
# Finding top 10 
window = Window.partitionBy("r1.userId").orderBy(col("pearson_correlation").desc())
top_10_similar_users = filtered_pearson_df.withColumn("rank", rank().over(window)) \
                                          .filter(col("rank") <= 10) \
                                          .select(col("r1.userId").alias("user1"), col("r2.userId").alias("user2"), "pearson_correlation")
top_10_similar_users.show()


[Stage 32:>                                                         (0 + 1) / 1]

+-----+-----+-------------------+
|user1|user2|pearson_correlation|
+-----+-----+-------------------+
|    1|   77| 1.0000000000000002|
|    1|   13| 0.9478788458420679|
|    1|  157|  0.901774575834698|
|    1|  139| 0.8903416876712336|
|    1|  401| 0.8713212630061857|
|    1|  511| 0.8655816064894617|
|    1|  473|  0.840746829564101|
|    1|  366| 0.8352756122978426|
|    1|  258| 0.8320502943378436|
|    1|   65| 0.8235706673737208|
|    2|  246| 0.7901797012290985|
|    2|   91| 0.7173712228393851|
|    2|  189| 0.6281794253820807|
|    2|  332| 0.5764940711934559|
|    2|  326| 0.5539173064473961|
|    2|  393| 0.5480433465368264|
|    2|  308| 0.5123980142682039|
|    2|  596|  0.492892933147424|
|    2|  209| 0.4904972228097824|
|    2|  567|0.48496919806182326|
+-----+-----+-------------------+
only showing top 20 rows



                                                                                

# adjusted_cosine_similarity

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, sqrt
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
spark = SparkSession.builder.appName("AdjustedCosineSimilarity").getOrCreate()
ratings_df = spark.read.csv("ratings.csv", header=True, inferSchema=True)
avg_ratings_df = ratings_df.groupBy("userId").agg(avg("rating").alias("avg_rating")) # I calculate mean ratings for each user
normalized_ratings_df = ratings_df.join(avg_ratings_df, "userId").withColumn("norm_rating", col("rating") - col("avg_rating")) # I join mean ratings with ratings dataframe to get mean ratings for each rating

user_pairs_df = normalized_ratings_df.alias("r1").join(normalized_ratings_df.alias("r2"), col("r1.movieId") == col("r2.movieId")) \
                                     .filter(col("r1.userId") < col("r2.userId"))
# Now I will calculate Adjusted Cosine Similarity for each pair of users.
adj_cosine_components_df = user_pairs_df.groupBy("r1.userId", "r2.userId").agg(
    sum(col("r1.norm_rating") * col("r2.norm_rating")).alias("dot_product"),
    sqrt(sum(col("r1.norm_rating")**2)).alias("norm_user1"),
    sqrt(sum(col("r2.norm_rating")**2)).alias("norm_user2")
)
adj_cosine_similarity_df = adj_cosine_components_df.withColumn("adjusted_cosine_similarity", col("dot_product") / (col("norm_user1") * col("norm_user2")))

# Finding top 10
window = Window.partitionBy("r1.userId").orderBy(col("adjusted_cosine_similarity").desc())
top_10_similar_users = adj_cosine_similarity_df.withColumn("rank", rank().over(window)) \
                                                .filter(col("rank") <= 10) \
                                                .select(col("r1.userId").alias("user1"), col("r2.userId").alias("user2"), "adjusted_cosine_similarity")

top_10_similar_users.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/25 16:56:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/25 16:56:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


CodeCache: size=131072Kb used=23510Kb max_used=23510Kb free=107561Kb
 bounds [0x000000010c1e0000, 0x000000010d900000, 0x00000001141e0000]
 total_blobs=9464 nmethods=8511 adapters=862
 compilation: disabled (not enough contiguous free space left)


                                                                                

+-----+-----+--------------------------+
|user1|user2|adjusted_cosine_similarity|
+-----+-----+--------------------------+
|    1|   77|        1.0000000000000002|
|    1|  291|                       1.0|
|    1|  358|                       1.0|
|    1|   85|                       1.0|
|    1|  388|                       1.0|
|    1|   12|                       1.0|
|    1|  253|                       1.0|
|    1|    2|        0.9999999999999998|
|    1|  146|        0.9990496408681655|
|    1|  278|        0.9710607611177227|
|    2|  333|                       1.0|
|    2|  299|                       1.0|
|    2|  426|                       1.0|
|    2|  213|                       1.0|
|    2|  267|                       1.0|
|    2|  563|                       1.0|
|    2|  225|                       1.0|
|    2|  174|                       1.0|
|    2|  101|                       1.0|
|    2|  216|                       1.0|
+-----+-----+--------------------------+
only showing top

# euclidean_similarity

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sqrt, sum as spark_sum
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

spark = SparkSession.builder.appName("EuclideanSimilarity").getOrCreate()
ratings_df = spark.read.csv("ratings.csv", header=True, inferSchema=True)
avg_ratings_df = ratings_df.groupBy("userId").agg(avg("rating").alias("avg_rating")) # I calculate mean ratings for each user
normalized_ratings_df = ratings_df.join(avg_ratings_df, "userId").withColumn("norm_rating", col("rating") - col("avg_rating")) # I join mean ratings with ratings dataframe to get mean ratings for each rating
# Now I will create pairs of users who have rated the same movie
user_pairs_df = normalized_ratings_df.alias("r1").join(normalized_ratings_df.alias("r2"), col("r1.movieId") == col("r2.movieId")) \
                                     .filter(col("r1.userId") < col("r2.userId"))
#Euclidean Distance
euclidean_components_df = user_pairs_df.groupBy("r1.userId", "r2.userId").agg(
    spark_sum((col("r1.norm_rating") - col("r2.norm_rating"))**2).alias("squared_distance")
)
# Now, I need to convert this to similarity. I will use 1 / (1 + distance) to convert distance to similarity. Inverted Euclidean Distance.
euclidean_similarity_df = euclidean_components_df.withColumn("euclidean_similarity", 1 / (sqrt("squared_distance") + 1))  # Add 1 to avoid division by zero
# Finding top 10
window = Window.partitionBy("r1.userId").orderBy(col("euclidean_similarity").desc())
top_10_similar_users = euclidean_similarity_df.withColumn("rank", rank().over(window)) \
                                              .filter(col("rank") <= 10) \
                                              .select(col("r1.userId").alias("user1"), col("r2.userId").alias("user2"), "euclidean_similarity")
top_10_similar_users.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/26 13:20:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/26 13:20:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


CodeCache: size=131072Kb used=23769Kb max_used=24037Kb free=107302Kb
 bounds [0x000000010a1e0000, 0x000000010b980000, 0x00000001121e0000]
 total_blobs=9507 nmethods=8554 adapters=862
 compilation: disabled (not enough contiguous free space left)


[Stage 6:>                                                          (0 + 1) / 1]

+-----+-----+--------------------+
|user1|user2|euclidean_similarity|
+-----+-----+--------------------+
|    1|  253|   0.984915545117876|
|    1|  291|  0.9022707314013297|
|    1|  358|  0.9006722848215131|
|    1|  388|  0.7972508591065292|
|    1|  472|  0.7387674456990245|
|    1|  550|  0.6951179407743328|
|    1|   85|    0.60222934799206|
|    1|  278|  0.5899813771892649|
|    1|  366|  0.5785567963282364|
|    1|   12|  0.5742521316444738|
|    2|  168|  0.9857168685590311|
|    2|  369|  0.9462501580877704|
|    2|   96|  0.9401496259351623|
|    2|  329|  0.9270326615705352|
|    2|  368|  0.9041114102436268|
|    2|  213|  0.8921442959164991|
|    2|  172|  0.8917800118273215|
|    2|  186|  0.8838840188806469|
|    2|  287|  0.8521991300144997|
|    2|  291|  0.8401869158878504|
+-----+-----+--------------------+
only showing top 20 rows



                                                                                