In [1]:
from pyspark.sql import SparkSession
import math
# Create (or reuse) the Spark session, then keep a handle on its SparkContext
# because the rest of the notebook works with the RDD API.
spark = (
    SparkSession.builder
    .appName("MoviesSimilarities")
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# Load movie names into dictionary
def loadMoviesName(path="movies.dat"):
    """Build a movie-ID -> title lookup from a '::'-delimited movies file.

    The file is read as ISO-8859-1. On each non-blank line the first field is
    parsed as the integer movie ID and the second field is kept as the title;
    any further fields are ignored.

    Args:
        path: Location of the movies file. Defaults to "movies.dat" in the
            current working directory.

    Returns:
        dict mapping int movie ID to title string.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line has no second field.
    """
    movieNames = {}
    with open(path, encoding="ISO-8859-1") as f:
        for line in f:
            # Drop the line terminator so a two-field line does not keep a
            # trailing newline inside the title.
            line = line.rstrip("\r\n")
            # Blank lines (e.g. at the end of the file) carry no record.
            if not line.strip():
                continue
            fields = line.split("::")
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Module-level ID -> title lookup; getSimilarMovies reads it when printing results.
nameDict = loadMoviesName()

In [3]:
# Load ratings.dat and parse each '::'-delimited line into a keyed record:
# (int(field 0), (int(field 1), float(field 2))). Downstream cells treat these
# as (user, (movie, rating)).
ratings = (
    sc.textFile("ratings.dat")
    .map(lambda line: line.split("::"))
    .map(lambda parts: (int(parts[0]), (int(parts[1]), float(parts[2]))))
)


In [4]:
# Group all movie ratings by user: each key ends up with an iterable of its
# (movie, rating) values.
userRatings = ratings.groupByKey()


In [5]:
# Generate all movie-movie pairs rated by the same user
def createPairs(userRatings):
    """Return every pair of distinct movies found in one user's ratings.

    Pair keys are canonical: the lower movie ID always comes first, with the
    ratings ordered to match. Without this, the same two movies could be
    emitted as (A, B) for one user and (B, A) for another, which would split
    their co-ratings across two different keys when grouped later.

    Args:
        userRatings: Iterable of (movieID, rating) tuples for a single user.

    Returns:
        list of ((movie1, movie2), (rating1, rating2)) with movie1 < movie2.
        Empty when fewer than two distinct movies are present.
    """
    # Sort by movie ID so position order and ID order agree.
    movieRatings = sorted(userRatings, key=lambda movieRating: movieRating[0])
    pairs = []
    for i, (movie1, rating1) in enumerate(movieRatings):
        for movie2, rating2 in movieRatings[i + 1:]:
            # The same movie rated twice would otherwise pair with itself.
            if movie1 == movie2:
                continue
            pairs.append(((movie1, movie2), (rating1, rating2)))
    return pairs

# Expand each user's grouped ratings into movie-pair records; the user key is
# dropped because only the value (the ratings iterable) is passed on.
moviePairs = userRatings.flatMap(lambda userEntry: createPairs(userEntry[1]))

In [6]:
# Cosine similarity
def cosineSimilarity(ratingPairs):
    """Compute the cosine similarity of two rating vectors.

    Args:
        ratingPairs: Iterable of (ratingX, ratingY) tuples, one per
            co-rating of the two items being compared.

    Returns:
        (score, numPairs) where score is sum(x*y) / (sqrt(sum(x*x)) *
        sqrt(sum(y*y))) and numPairs is the number of tuples consumed.
        When the denominator is zero the similarity is undefined; the score
        is reported as 0.0 and numPairs still reflects the real count.
    """
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0

    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    denominator = math.sqrt(sum_xx) * math.sqrt(sum_yy)

    # Keep the true co-rating count even when no score can be computed, so
    # callers filtering on the count see consistent data.
    score = sum_xy / denominator if denominator else 0.0
    return (score, numPairs)

# Group by movie pair and compute similarity
moviePairsRatings = moviePairs.groupByKey()
# Cache the scored pairs: every getSimilarMovies() call launches a new job on
# this RDD, and without caching each call would recompute the whole
# pairing/grouping lineage from ratings.dat.
moviePairSimilarities = moviePairsRatings.mapValues(cosineSimilarity).cache()

In [7]:
def getSimilarMovies(movieID, scoreThreshold=0.95, coOccurenceThreshold=3, topN=10):
    """Print the movies most similar to ``movieID``.

    Scans ``moviePairSimilarities`` for pairs that contain ``movieID`` and
    whose score and co-rating count are both strictly above the given
    thresholds, then prints up to ``topN`` of them ranked by
    (score, co-rating count) in descending order.

    Args:
        movieID: ID of the movie to find neighbours for.
        scoreThreshold: A pair's score must be strictly greater than this.
        coOccurenceThreshold: A pair's co-rating count must be strictly
            greater than this.
        topN: Maximum number of results to print. Defaults to 10.

    Returns:
        None. Results are written to stdout.
    """
    def titleOf(mid):
        # A rated movie may be missing from the names lookup; fall back to a
        # placeholder instead of raising KeyError after the job has already run.
        return nameDict.get(mid, f"Unknown movie {mid}")

    filtered = moviePairSimilarities.filter(
        lambda x:
        (x[0][0] == movieID or x[0][1] == movieID) and
        x[1][0] > scoreThreshold and
        x[1][1] > coOccurenceThreshold
    )

    # top() returns the topN largest records (by the (score, count) tuple) in
    # descending order, without a full distributed sortByKey.
    results = filtered.top(topN, key=lambda x: x[1])

    print("\nTop similar movies for: " , titleOf(movieID), "\n")
    for pair, sim in results:
        score, count = sim
        # The queried movie can sit on either side of the pair key.
        other = pair[1] if pair[0] == movieID else pair[0]
        print(f"{titleOf(other)} | score={score:.3f} | co-ratings={count}")

In [8]:
getSimilarMovies(1)  # Change the movie ID here to query a different movie



Top similar movies for:  Movie 1 (2001) 

