In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit

In [2]:
# load up movie ID -> movie name dictionary
def loadMovieNames():
  movieNames = {}
  with open("/dbfs/FileStore/tables/u.item", encoding = "ISO-8859-1") as f:
    for line in f:
      fields = line.split('|')
      movieNames[int(fields[0])] = fields[1]
  return movieNames
# Concert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
  fields = line.value.split()
  return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]))
  
if __name__ == '__main__':
#   Create a SparkSession (the config bit is only for Windows)
  spark = SparkSession.builder.appName('Movie_Recommender').getOrCreate()
  spark.conf.set("spark.sql.crossJoin.enabled", "true")
#   load up our movieID -> name dictionary
  movieNames = loadMovieNames()
#   Get the raw data
  lines = spark.read.text("/FileStore/tables/u.data").rdd
#   Cinvert it to a RDD of Row objects with (userID, movieID, rating)
  ratingsRDD = lines.map(parseInput)
#   Convert it to a DataFrame and cache it
  ratings = spark.createDataFrame(ratingsRDD).cache()
#   Create an ALS collaborative filtering model from the complete data set
  als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
  model = als.fit(ratings)
  
#   Print out ratings from user 196:
  print("\nRatings for userID 196:")
  userRatings = ratings.filter("userID = 196")
  for rating in userRatings.collect():
    print(movieNames[rating['movieID']], rating['rating'])
  
  print("\nTop 20 recommendations:")
#   Find movies rated more than 100 times
  ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
#   Construct a "test" dataframe for user 196 with every movie rated more than 100 times
  popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(196))
  
#   Run our model on that list of popular movies for user ID 196
  recommendations = model.transform(popularMovies)
#   Get the top 20 movies with the highest predicted rating for this user
  topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)

  for recommendation in topRecommendations:
    print(movieNames[recommendation['movieID']], recommendation['prediction'])
    
#   spark.stop()
  

In [3]:
spark.stop()