<a href="https://colab.research.google.com/github/EonTechie/Big_Data_Processing_Spark_Projects/blob/main/spark-sql-tasks/UserSimilarityEngine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# Project Context:
# This notebook was developed as part of the "Big Data Processing" course at Sabancı University.
# The individual effort and implementation belong to @EonTechie, the owner of this GitHub account.
# All personal or identifying information has been removed or anonymized in accordance with privacy and ethical standards.

"""
Objective:
The goal of this task was to identify the top 10 most similar users for each user based on their movie genre preferences.

Methodology:
Instead of relying on individual movie ratings, I used each user's average rating per genre, for the following reasons:

- Not all users rate all movies, so the user-item rating matrix is sparse.
- In a sparse matrix, two users often share only a few commonly rated movies, which makes similarity scores unreliable.
- Aggregating ratings at the genre level gives denser, more general user profiles, which makes similarity comparisons more meaningful.

Implementation:
I used PySpark for all computations to handle the data efficiently.

First, I joined the ratings and movies datasets, and exploded the genres column to work with each genre individually.

Then, I calculated average ratings per user per genre and pivoted the data to create a user-genre matrix.

Missing values (for unrated genres) were filled with zeros.

Using VectorAssembler, I converted each user's genre ratings into a feature vector.

I calculated cosine similarity between user vectors using a custom UDF.

Finally, I selected the top 10 most similar users for each user based on similarity scores.

Conclusion:

Using genre-level averages allowed me to overcome the sparsity problem and compare users by their general movie taste rather than by individual movies.
I believe this reflects user similarity well for this dataset.
"""

from google.colab import drive
drive.mount('/content/drive')

import os

folder_path = "/content/drive/My Drive/datasets/ml-latest-small"
files = os.listdir(folder_path)
print(files)

Mounted at /content/drive
['movies.csv', 'README.txt', 'ratings.csv', 'links.csv', 'tags.csv']


In [None]:
# Create a SparkSession to initialize Spark functionalities.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Part_2_Question_2-Movielens-cosineSimilarity").getOrCreate()


In [None]:
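# Import the PySpark functions and classes used for data transformation and ML processing.

# col: Refers to a column by name in DataFrame transformations.
# lag: Window function that accesses the value of a previous row.
# regexp_replace: Cleans or modifies string values using regular expressions.
# round: Rounds numeric values (e.g., similarity scores or ratings).
# split / explode: Split a delimited string into an array and emit one row per array element.
# sum / isnull: Aggregate a column and test for nulls (used for the null and zero counts below).
# udf: Wraps a Python function so it can be applied to DataFrame columns.
# Note: this import shadows Python's built-in round() and sum() for the rest of the notebook.
from pyspark.sql.functions import col, lag, regexp_replace, round, split, explode, sum, isnull, udf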

# Window: Defines a window specification for operations like lag, rank, etc.
from pyspark.sql.window import Window

# StringIndexer: Converts string categorical variables (like user IDs or movie IDs) into numeric indices.
# VectorAssembler: Combines multiple numeric columns into a single feature vector, required by ML models.
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Vectors: Provides utilities for working with MLlib vectors (dense or sparse), used in similarity calculations.
from pyspark.ml.linalg import Vectors

from pyspark.sql.types import DoubleType
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col, sqrt, expr, sum as spark_sum

In [None]:
# Read the ratings.csv file from Google Drive using Spark.
# inferSchema=True: Automatically detects and assigns data types to columns.
# header=True: Treats the first row as column headers.
df_ratings = spark.read.option("inferSchema", "true").option("header", "true").csv("/content/drive/My Drive/datasets/ml-latest-small/ratings.csv")

# Display the first few rows of the DataFrame to understand its structure.
df_ratings.show()

# Count the total number of rows (i.e., total number of ratings in the dataset).
df_ratings.count()


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



100836

In [None]:
# Read the movies.csv file from Google Drive using Spark.
# inferSchema=True: Automatically infers data types (e.g., Integer for movieId, String for title).
# header=True: Uses the first row of the CSV file as column names.
df_movies = spark.read.option("inferSchema", "true").option("header", "true").csv("/content/drive/My Drive/datasets/ml-latest-small/movies.csv")

# Display the first few rows of the movies DataFrame to inspect movie titles and genres.
df_movies.show()

# Count the total number of movie entries available in the dataset.
df_movies.count()


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

9742

In [None]:
# Join the ratings and movies DataFrames on the 'movieId' column.
# This adds movie titles and genres to each user rating, enriching the dataset.
df = df_ratings.join(df_movies, "movieId")

# Show the combined DataFrame with userId, movieId, rating, timestamp, title, and genres.
df.show()

# Count the total number of rows in the joined DataFrame (same as ratings count if all movieIds matched).
df.count()


+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

100836

In [None]:
# Split the 'genres' string into an array using '|' as the delimiter,
# then explode the array so each genre gets its own row.
# This helps in genre-based analysis by separating multi-genre movies.
df_genre = df.withColumn("genres", explode(split(col("genres"), "\\|")))

# Show the transformed DataFrame where each row now has a single genre.
df_genre.show()

# Print the total number of rows after exploding genres (more than the original because each movie yields one row per genre).
print("Row count after exploding genres: ", df_genre.count())

# Print the number of unique users who rated movies, useful for similarity calculation.
print("Distinct User Number: ", df.select("userId").distinct().count())


+-------+------+------+---------+--------------------+---------+
|movieId|userId|rating|timestamp|               title|   genres|
+-------+------+------+---------+--------------------+---------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|
|      1|     1|   4.0|964982703|    Toy Story (1995)|Animation|
|      1|     1|   4.0|964982703|    Toy Story (1995)| Children|
|      1|     1|   4.0|964982703|    Toy Story (1995)|   Comedy|
|      1|     1|   4.0|964982703|    Toy Story (1995)|  Fantasy|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|   Comedy|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|  Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|   Action|
|      6|     1|   4.0|964982224|         Heat (1995)|    Crime|
|      6|     1|   4.0|964982224|         Heat (1995)| Thriller|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|  Mystery|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...| Thriller|
|     50|     1|   5.0|96
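
The split-and-explode step above can be illustrated without Spark. The following is a minimal plain-Python sketch, not the notebook's implementation; the helper name `explode_genres` and the two toy rows (taken from the output shown above) are illustrative assumptions.

```python
def explode_genres(rows):
    """Yield one (user_id, title, genre, rating) tuple per genre of each rated movie."""
    for user_id, title, genres, rating in rows:
        # Python's str.split takes a literal separator, so "|" needs no escaping.
        # Spark's split() takes a regular expression, which is why the cell above uses "\\|".
        for genre in genres.split("|"):
            yield user_id, title, genre, rating

# Two ratings by user 1, copied from the joined DataFrame output.
toy_rows = [
    (1, "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy", 4.0),
    (1, "Grumpier Old Men", "Comedy|Romance", 4.0),
]

exploded = list(explode_genres(toy_rows))
for row in exploded:
    print(row)
```

Each rating is repeated once per genre, so a five-genre movie counts toward five genre averages later on.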

In [None]:
# Group the data by user and genre, then calculate the average rating for each genre per user.
# This gives us each user's average preference per genre, useful for user similarity comparison.
user_genre_avg = df_genre.groupBy("userId", "genres").avg("rating")

# Display the average ratings, ordered by userId in descending order for easier inspection.
user_genre_avg.orderBy("userId", ascending=False).show()

# Print the number of rows (i.e., how many user-genre average rating combinations exist).
print("Row Number: ", user_genre_avg.count())

# Print the number of distinct users with genre ratings — confirms user coverage in this table.
print("Distinct User Number: ", user_genre_avg.select("userId").distinct().count())

"""
I decided to use each user's average rating per genre as a basis for similarity calculation.
Since not all users have rated all movies, using individual movie ratings would result in a
sparse matrix, which could negatively impact the accuracy of similarity measurements.
"""

+------+-----------+------------------+
|userId|     genres|       avg(rating)|
+------+-----------+------------------+
|   610|  Animation|3.9015151515151514|
|   610|  Film-Noir|              4.35|
|   610|     Action|3.6005802707930368|
|   610|   Thriller| 3.573529411764706|
|   610|     Sci-Fi|3.6593625498007967|
|   610|       IMAX|3.6280487804878048|
|   610|    Western| 3.742424242424242|
|   610|    Mystery|3.7666666666666666|
|   610|     Horror|3.5066006600660065|
|   610|   Children|3.6517857142857144|
|   610|      Crime|   3.8003663003663|
|   610|        War| 3.776595744680851|
|   610|    Fantasy|3.5927152317880795|
|   610|    Romance|  3.73109243697479|
|   610|      Drama| 3.874739039665971|
|   610|     Comedy|3.7311435523114356|
|   610|    Musical|3.9285714285714284|
|   610|Documentary|               4.2|
|   610|  Adventure|3.7059925093632957|
|   609|    Romance|               3.2|
+------+-----------+------------------+
only showing top 20 rows

Row Number:  1
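
As a rough illustration of what the groupBy("userId", "genres").avg("rating") step computes, here is a small plain-Python sketch. The function name `average_by_user_genre` and the toy ratings are hypothetical and exist only for this example.

```python
from collections import defaultdict

def average_by_user_genre(exploded_rows):
    """Return {(user_id, genre): mean rating} from (user_id, genre, rating) rows."""
    totals = defaultdict(lambda: [0.0, 0])  # (user, genre) -> [running sum, count]
    for user_id, genre, rating in exploded_rows:
        acc = totals[(user_id, genre)]
        acc[0] += rating
        acc[1] += 1
    return {key: total / count for key, (total, count) in totals.items()}

# Hypothetical exploded ratings: one row per (user, genre) occurrence.
toy_rows = [
    (1, "Comedy", 4.0),
    (1, "Comedy", 3.0),
    (1, "Action", 5.0),
    (2, "Comedy", 2.0),
]

averages = average_by_user_genre(toy_rows)
print(averages)
```

A user only gets an entry for genres they actually rated; the gaps are what the later pivot and zero-fill steps deal with.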

In [None]:
# Pivot the table so that each row represents a user and each column represents a genre.
# The cell values are the user's average rating for that genre.
# This creates a user-genre rating matrix for calculating user similarity.
user_genre_vector = user_genre_avg.groupBy("userId").pivot("genres").avg("avg(rating)")

# Show the resulting matrix sorted by userId.
# Each row is a user, and each column shows their average rating for a genre.
user_genre_vector.orderBy("userId", ascending=False).show()

# Print the number of rows (equal to the number of users with ratings).
print("Row Number: ", user_genre_vector.count())

# Confirm the number of distinct users in the pivoted matrix.
print("Distinct User Number: ", user_genre_vector.select("userId").distinct().count())


+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|userId|(no genres listed)|            Action|         Adventure|         Animation|          Children|            Comedy|             Crime|       Documentary|             Drama|           Fantasy|         Film-Noir|            Horror|              IMAX|           Musical|           Mystery|           Romance|            Sci-Fi|          Thriller|               War|           Western|
+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----

In [None]:
# Drop the column for '(no genres listed)' since it does not provide meaningful genre information.
# Then, fill all remaining null values with 0.
# Nulls would make VectorAssembler fail with its default settings, so unrated genres are treated as zero (no preference).
user_genre_ready_vector = user_genre_vector.drop("(no genres listed)").fillna(0)

# Show the cleaned and prepared user-genre rating matrix.
user_genre_ready_vector.show()

+------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|userId|            Action|         Adventure|         Animation|          Children|            Comedy|             Crime|      Documentary|             Drama|           Fantasy|        Film-Noir|            Horror|              IMAX|           Musical|           Mystery|           Romance|            Sci-Fi|          Thriller|               War|           Western|
+------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------
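
The pivot and zero-fill steps can be sketched in plain Python as follows. This is an illustration under assumed toy data, not the Spark code path; `pivot_user_genre` is a hypothetical helper.

```python
def pivot_user_genre(averages, fill=0.0):
    """Turn {(user, genre): avg} into (genre column order, {user: row of averages})."""
    genres = sorted({genre for _, genre in averages})   # fixed, alphabetical column order
    users = sorted({user for user, _ in averages})
    matrix = {
        user: [averages.get((user, genre), fill) for genre in genres]
        for user in users
    }
    return genres, matrix

# Hypothetical per-user, per-genre averages; user 2 never rated an Action movie.
toy_averages = {(1, "Action"): 5.0, (1, "Comedy"): 3.5, (2, "Comedy"): 2.0}

genres, matrix = pivot_user_genre(toy_averages)
print(genres)
print(matrix)
```

Note that the fill value 0.0 cannot occur as a real average if the lowest possible rating is above zero, so in this sketch a zero always means "genre not rated".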

In [None]:
# Count the null values remaining in each column as a sanity check after fillna(0).
# isnull(col(column)) flags nulls, cast("int") turns True/False into 1/0, and sum() totals them per column.
# Every count should be 0 here; unrated genres now appear as zeros instead (counted in the next cell).
null_counts = user_genre_ready_vector.select([
    sum(isnull(col(column)).cast("int")).alias(column)
    for column in user_genre_ready_vector.columns
])

# Display the number of null (missing) values for each genre column.
null_counts.show()


+------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|userId|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|     0|     0|        0|        0|       0|     0|    0|          0|    0|      0|        0|     0|   0|      0|      0|      0|     0|       0|  0|      0|
+------+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+



In [None]:
# Count how many zeros exist in each genre column (excluding 'userId').
# (col(column) == 0) checks if a cell is zero, cast("int") converts True/False to 1/0, and sum(...) totals the zeros per genre.
# Because unrated genres were filled with 0, this shows how many users never rated each genre.
zero_count = user_genre_ready_vector.select([
    sum((col(column) == 0).cast("int")).alias(column)
    for column in user_genre_ready_vector.columns if column != "userId"
])

# Show the count of zeros for each genre column.
zero_count.show()

+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|Action|Adventure|Animation|Children|Comedy|Crime|Documentary|Drama|Fantasy|Film-Noir|Horror|IMAX|Musical|Mystery|Romance|Sci-Fi|Thriller|War|Western|
+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
|     2|        4|       83|      51|     1|    7|        387|    0|     27|      371|    75| 152|    140|     30|      4|     5|       1| 59|    190|
+------+---------+---------+--------+------+-----+-----------+-----+-------+---------+------+----+-------+-------+-------+------+--------+---+-------+
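
The zero counts above make the density argument from the methodology concrete. The following back-of-the-envelope sketch uses only figures printed in this notebook. Two points are assumptions: the user count of 610 is inferred from the highest userId shown and from the 371490 ordered pairs (610 × 609) reported further down, and the 9742 movies in movies.csv are used as an upper bound on the number of rated movies.

```python
import math

# Figures printed elsewhere in this notebook.
n_ratings = 100836   # rows in ratings.csv
n_movies = 9742      # rows in movies.csv (upper bound on rated movies)
n_users = 610        # assumption: inferred from 610 * 609 = 371490 user pairs

# Zero counts per genre column, copied from the output above.
zero_counts = [2, 4, 83, 51, 1, 7, 387, 0, 27, 371, 75, 152, 140, 30, 4, 5, 1, 59, 190]

# Share of filled cells in the user-item matrix and in the user-genre matrix.
# math.fsum is used because the notebook shadows the built-in sum().
item_density = n_ratings / (n_users * n_movies)
genre_cells = n_users * len(zero_counts)
genre_density = 1 - math.fsum(zero_counts) / genre_cells

print(f"user-item density:  {item_density:.4f}")
print(f"user-genre density: {genre_density:.4f}")
```

Under these assumptions roughly 2% of the user-item cells are filled, against roughly 86% of the user-genre cells.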



In [None]:
from pyspark.ml.feature import VectorAssembler

# Select all genre columns except 'userId' to use as input features.
# These columns represent the average rating each user gave to each genre.
input_cols = [column for column in user_genre_ready_vector.columns if column != "userId"]

# Initialize a VectorAssembler to combine genre columns into a single feature vector.
# This is required for ML operations like similarity calculation.
assembler = VectorAssembler(inputCols=input_cols, outputCol="features")

# Apply the assembler to create a new DataFrame with a 'features' column.
# Each row will now contain: (userId, features), where 'features' is a vector of the user's genre ratings
# (VectorAssembler stores it in dense or sparse form, whichever is more compact).
user_vectors_df = assembler.transform(user_genre_ready_vector).select("userId", "features")

# Display the first 10 rows to verify the structure of the transformed DataFrame.
user_vectors_df.show(10, False)


+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|features                                                                                                                                                                                                                         |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|471   |[3.7083333333333335,3.6363636363636362,3.9285714285714284,3.9285714285714284,3.9642857142857144,4.0,0.0,4.041666666666667,4.333333333333333,0.0,3.5,3.625,3.5,4.0,3.9285714285714284,3.5714285714285716,3.5625,3.9,0.0]          |
|148   |[3.5555555555555554,3.661290322580645,3.710526315789

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define a basic cosine similarity function using Spark's vector operations.
# This function computes the cosine similarity between two user vectors.
# If either vector has zero length (norm), it returns 0 to avoid division by zero.
def cosine_sim(v1, v2):
    dot = float(v1.dot(v2))               # Compute dot product
    norm1 = float(v1.norm(2))             # Compute L2 norm of vector 1
    norm2 = float(v2.norm(2))             # Compute L2 norm of vector 2
    return dot / (norm1 * norm2) if norm1 != 0 and norm2 != 0 else 0.0

# Register the Python function as a Spark UDF so it can be used in DataFrame operations.
cosine_udf = udf(cosine_sim, DoubleType())

# Perform a cross join between all user vectors to form all possible user pairs.
# Filter out self-pairs (user compared to themselves).
pairs = user_vectors_df.alias("a").crossJoin(user_vectors_df.alias("b")).filter("a.userId != b.userId")

# Show the first 10 user pairs to verify the structure before applying similarity computation.
pairs.show(10, False)

+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|features                                                                                                                                                                                                               |userId|features                                                                                                                                                                                                                                                    |
+------+----------------------
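
For reference, the quantity computed by the UDF above is cos(u, v) = (u · v) / (‖u‖₂ ‖v‖₂). The sketch below restates it in plain Python so it can be checked on small inputs without Spark; the function name `cosine_similarity` and the example vectors are illustrative only.

```python
import math

def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length numeric sequences."""
    dot = math.fsum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(math.fsum(a * a for a in u))
    norm_v = math.sqrt(math.fsum(b * b for b in v))
    if norm_u == 0 or norm_v == 0:
        return 0.0  # same convention as the UDF: a zero vector is similar to nothing
    return dot / (norm_u * norm_v)

# Same direction, different magnitude: similarity is 1 up to rounding.
same_direction = cosine_similarity([2.0, 2.0, 2.0], [4.0, 4.0, 4.0])
# No genre in common: similarity is 0.
orthogonal = cosine_similarity([3.0, 0.0], [0.0, 5.0])
# Guarded case: one vector has zero norm.
zero_vector = cosine_similarity([0.0, 0.0], [1.0, 2.0])

print(same_direction, orthogonal, zero_vector)
```

Cosine similarity ignores magnitude, so a user who rates every genre 2.0 and one who rates every genre 4.0 point in the same direction. Since all average ratings are non-negative, scores between well-populated profiles can be expected to sit close to 1.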

In [None]:
# Compute cosine similarity between each pair of users by applying the cosine_udf
# The result is stored in a new column named 'Similarity'
similarities = pairs.withColumn("Similarity", cosine_udf("a.features", "b.features"))

# Sort all user pairs in descending order based on their similarity score
# This helps identify the most similar user pairs
similarities.orderBy(col("Similarity").desc()).show()

similarities.count()

+------+--------------------+------+--------------------+------------------+
|userId|            features|userId|            features|        Similarity|
+------+--------------------+------+--------------------+------------------+
|   380|[3.70578512396694...|   249|[3.71304347826086...|0.9996184750784334|
|   249|[3.71304347826086...|   380|[3.70578512396694...|0.9996184750784334|
|   599|[2.73614775725593...|   387|[3.15071770334928...|0.9994634724598694|
|   387|[3.15071770334928...|   599|[2.73614775725593...|0.9994634724598694|
|   274|[3.24559471365638...|   249|[3.71304347826086...|0.9994432810848193|
|   249|[3.71304347826086...|   274|[3.24559471365638...|0.9994432810848193|
|   570|[3.45270270270270...|   304|[3.83720930232558...|0.9993982730738167|
|   304|[3.83720930232558...|   570|[3.45270270270270...|0.9993982730738167|
|   318|[3.54772727272727...|    18|[3.58851674641148...|0.9993890590945079|
|    18|[3.58851674641148...|   318|[3.54772727272727...|0.9993890590945079|

371490
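
The output above lists every pair twice (for example 380/249 and 249/380) because the cross join produces ordered pairs. Since cosine similarity is symmetric, each unordered pair only needs to be scored once. The sketch below shows the idea in plain Python; `pairwise_symmetric`, the stand-in `dot` similarity, and the toy vectors are hypothetical, and the user count of 610 is inferred from 610 × 609 = 371490.

```python
from itertools import combinations

def pairwise_symmetric(vectors, similarity):
    """Score each unordered pair once and store it under both orderings."""
    scores = {}
    for a, b in combinations(sorted(vectors), 2):
        score = similarity(vectors[a], vectors[b])
        scores[(a, b)] = score
        scores[(b, a)] = score
    return scores

def dot(u, v):
    # Stand-in similarity for the example; any symmetric function would do.
    total = 0.0
    for x, y in zip(u, v):
        total += x * y
    return total

toy_vectors = {1: [1.0, 0.0], 2: [0.5, 0.5], 3: [0.0, 2.0]}
scores = pairwise_symmetric(toy_vectors, dot)
print(scores)

n_users = 610                            # assumption: inferred from the pair count
ordered_pairs = n_users * (n_users - 1)  # what the cross join produces
unordered_pairs = ordered_pairs // 2     # what actually needs to be scored
print(ordered_pairs, unordered_pairs)
```

In Spark, an analogous change might be to filter on a.userId < b.userId before applying the UDF and then add the mirrored rows back; that variant is not tested here.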

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Define a window specification that groups by 'a.userId'
# and orders other users by descending similarity score.
window = Window.partitionBy("a.userId").orderBy(col("Similarity").desc())

# Add a 'rank' column with row_number(), which numbers rows 1, 2, 3, ... within each user partition (ties are broken arbitrarily).
# Then, filter to keep only the top 10 most similar users per user.
top10_similar_users = similarities.withColumn("rank", row_number().over(window)).filter("rank <= 10")

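# Show the top 10 similar users for every user.
# 371490 is the total number of user pairs, which is a safe upper bound on the rows to display; pass a smaller number to limit the output.
top10_similar_users.show(371490)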


+------+--------------------+------+--------------------+------------------+----+
|userId|            features|userId|            features|        Similarity|rank|
+------+--------------------+------+--------------------+------------------+----+
|     1|[4.32222222222222...|   452|[4.50900900900900...|0.9976494482750101|   1|
|     1|[4.32222222222222...|   217|[2.84100418410041...| 0.996617955927002|   2|
|     1|[4.32222222222222...|   422|[3.45,3.4,4.0,3.2...|0.9956865142709093|   3|
|     1|[4.32222222222222...|   465|[3.84090909090909...|0.9953553008711713|   4|
|     1|[4.32222222222222...|   587|[4.125,4.11111111...|0.9947635261059706|   5|
|     1|[4.32222222222222...|   469|[3.24324324324324...|0.9943978894814555|   6|
|     1|[4.32222222222222...|   309|[3.9375,3.925,4.0...|0.9876843086157079|   7|
|     1|[4.32222222222222...|   325|[3.13725490196078...|0.9868014577768169|   8|
|     1|[4.32222222222222...|    33|[4.0,3.9,3.833333...|0.9787339519219359|   9|
|     1|[4.32222
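
The rank-and-filter step can also be pictured as a per-user top-N selection. The sketch below is a plain-Python illustration using `heapq.nlargest`; the helper `top_n_neighbours` and the toy scores are hypothetical. Unlike row_number(), ties here fall back to the larger user id, which is a choice of this sketch rather than the notebook's behaviour.

```python
import heapq
from collections import defaultdict

def top_n_neighbours(similarities, n=10):
    """Map each user to their n most similar users as (other_user, score) pairs."""
    candidates = defaultdict(list)
    for (user, other), score in similarities.items():
        candidates[user].append((score, other))
    return {
        user: [(other, score) for score, other in heapq.nlargest(n, scored)]
        for user, scored in candidates.items()
    }

# Hypothetical ordered-pair scores: (user, other_user) -> similarity.
toy_similarities = {
    (1, 2): 0.90,
    (1, 3): 0.95,
    (1, 4): 0.50,
    (2, 1): 0.90,
    (2, 3): 0.10,
}

top2 = top_n_neighbours(toy_similarities, n=2)
print(top2)
```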