In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar xf spark-3.2.4-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
findspark.find()

'/content/spark-3.2.4-bin-hadoop3.2'

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType
import numpy as np
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Initialize Spark session
spark = SparkSession.builder.appName("UserSimilarity").getOrCreate()

# Load the ratings data
ratings_df = spark.read.csv("/content/ratings.csv", header=True, inferSchema=True)

# Select only necessary columns and cast them to appropriate types
ratings_df = ratings_df.select(
    col("userId").cast("int"),
    col("movieId").cast("int"),
    col("rating").cast("float")
)

# Define ALS model
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True
)

# Fit ALS model to the data
model = als.fit(ratings_df)

# Extract feature vectors for users
user_features = model.userFactors

# Defining cosine_similarity
def cosine_similarity(vec1, vec2):
    return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

cosine_similarity_udf = udf(cosine_similarity, FloatType())


# Compute pairwise similarities
users_cartesian = user_features.alias("uf1").crossJoin(user_features.alias("uf2"))
similarity_df = users_cartesian.withColumn("similarity", cosine_similarity_udf(col("uf1.features"), col("uf2.features")))

# Add user IDs back to the similarity dataframe
similarity_df = similarity_df.select(
    col("uf1.id").alias("userId1"),
    col("uf2.id").alias("userId2"),
    col("similarity")
)

# Extract top 10 similar users for each user
windowSpec = Window.partitionBy("userId1").orderBy(col("similarity").desc())
top_similar_users = similarity_df.withColumn("rank", rank().over(windowSpec)) \
                                 .filter(col("rank") <= 11) \
                                 .filter(col("userId1") != col("userId2"))

# Show or save results
top_similar_users.select("userId1", "userId2", "similarity").show()



+-------+-------+----------+
|userId1|userId2|similarity|
+-------+-------+----------+
|      1|    119| 0.9447821|
|      1|     54| 0.9331167|
|      1|    137| 0.9270684|
|      1|    273| 0.9262865|
|      1|    349| 0.9169113|
|      1|    169|0.91677123|
|      1|    453|0.91462857|
|      1|    401| 0.9142054|
|      1|    515| 0.9130469|
|      1|    484| 0.9116573|
|      3|    567| 0.8955833|
|      3|    301| 0.8747417|
|      3|    442| 0.8595087|
|      3|    245| 0.8054378|
|      3|    310|0.77963835|
|      3|    394|0.77755773|
|      3|    106|0.73008585|
|      3|    299| 0.7245862|
|      3|    375| 0.7101733|
|      3|     16|0.70192784|
+-------+-------+----------+
only showing top 20 rows



Since, I kept getting memory errors for cosine similarity and other similarity metrics, I had to use another method. For that purpose, I found ALS model to be useful.

ALS is preferred for large, sparse datasets like user-movie ratings because it efficiently handles sparse data and scales well with large datasets. It extracts latent features that represent underlying user preferences and item characteristics, providing deeper insights than direct rating comparisons.
