In [11]:
from pyspark import SparkConf
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.sql import SparkSession, functions, types
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

In [12]:
# Start (or reuse) a Spark session running on the local master.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Recommendation")
    .getOrCreate()
)

# Both files are read with header=True, and column types are inferred by Spark.
movies = spark.read.csv("./movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv("./ratings.csv", header=True, inferSchema=True)

In [13]:
# Preview five rows of each table; movie rows are printed without truncation.
movies.show(n=5, truncate=False)
ratings.show(n=5)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|   

First, on the ratings table, I use **groupBy()** to group the rows by movieId and **count()** to count the ratings of each movie, which produces a new column "count". I then sort in descending order of "count" with **orderBy()** and **functions.desc()**, and use **limit()** to keep the ten rows with the highest counts. This gives the IDs of the 10 movies with the largest number of ratings.

Finally, I left-join these top-10 movie IDs with the movies table on the "movieId" column and use **select()** to keep the "title" column, which gives the names of the top-10 movies.

In [14]:
# Ten most-rated movies: count the ratings per movieId and keep the ten largest counts.
topCounts = (
    ratings.groupBy("movieId")
    .count()
    .orderBy(functions.desc("count"))
    .limit(10)
)
topIds = topCounts.select("movieId")

# Spark does not promise that a join keeps the row order of its inputs, so the
# "count" column is carried through the join and the ranking is applied again
# before only the title is kept.
topNames = (
    topCounts.join(movies, topCounts["movieId"] == movies["movieId"], "left")
    .orderBy(functions.desc("count"))
    .select("title")
)
topNames.show(truncate=False)

+-----------------------------------------+
|title                                    |
+-----------------------------------------+
|Forrest Gump (1994)                      |
|Shawshank Redemption, The (1994)         |
|Pulp Fiction (1994)                      |
|Silence of the Lambs, The (1991)         |
|Matrix, The (1999)                       |
|Star Wars: Episode IV - A New Hope (1977)|
|Jurassic Park (1993)                     |
|Braveheart (1995)                        |
|Terminator 2: Judgment Day (1991)        |
|Schindler's List (1993)                  |
+-----------------------------------------+





Since each movie may belong to multiple genres, I first use **functions.split()** and **functions.explode()** to split every movie that has several genres into several rows, each row containing exactly one genre.

Then I used **groupBy()** and **avg()** on the ratings table to average all ratings for each movie.

After this, I use **join()** to combine the two results, so that each row contains a movie's title, one of its genres, and the average rating users gave it.

Finally, for each genre, I use **filter()** to keep the movies of that genre, sort them in descending order of average rating as in the previous question, and take the titles of the top 10.

For simplicity, only the results of the first three genres are printed.

In [15]:
# One row per (movie, genre): split the pipe-separated genre string and explode it.
genreTokens = functions.split("genres", "\\|")
moviesG = movies.withColumn("genres", functions.explode(genreTokens))

# Mean rating per movie; the aggregate column is selected below as "avg(rating)".
ratingsAvg = ratings.groupBy("movieId").avg("rating")

# Left join keeps every (movie, genre) row, including movies without any rating.
sameMovie = moviesG["movieId"] == ratingsAvg["movieId"]
moviesGR = moviesG.join(ratingsAvg, sameMovie, "left").select("title", "genres", "avg(rating)")

# Print the ten best-rated titles for the first three distinct genres only.
genreRows = moviesGR.select("genres").distinct().collect()
count = 0
for count, genre in enumerate(genreRows[:3], start=1):
    print(genre[0])
    genreMovies = moviesGR.filter(moviesGR["genres"] == genre[0])
    topTen = genreMovies.orderBy(functions.desc("avg(rating)")).limit(10)
    topTen.select("title").show(truncate=False)

Crime
+-------------------------------------------------------+
|title                                                  |
+-------------------------------------------------------+
|Ex Drummer (2007)                                      |
|Villain (1971)                                         |
|Mother (Madeo) (2009)                                  |
|Going Places (Valseuses, Les) (1974)                   |
|12 Angry Men (1997)                                    |
|American Friend, The (Amerikanische Freund, Der) (1977)|
|Sisters (Syostry) (2001)                               |
|Little Murders (1971)                                  |
|Faster (2010)                                          |
|Decalogue, The (Dekalog) (1989)                        |
+-------------------------------------------------------+

Romance
+----------------------------------------------------------------+
|title                                                           |
+--------------------------------------

I first select the "movieId", "userId" columns in the ratings table and the ids of the first 100 movies in the movies table.

Two nested for loops iterate over all movie pairs; the inner loop starts just after the current position of the outer loop, so that no pair is counted twice. For each pair, **filter()** selects the ratings of the first and of the second movie, an inner join on "userId" finds the users who rated both movies, and **count()** gives the number of such users.

For simplicity, only the first 5 results are printed.

In [16]:
# Number of movie pairs whose common support is printed as a preview.
PAIRS_TO_SHOW = 5

movieUserId = ratings.select("movieId", "userId")
# Collected Rows (one per movie) for the first 100 rows of the movies table.
# The misspelled name is kept so that any other cell referring to it still works.
frist100MovieId = movies.select("movieId").limit(100).collect()

# Every unordered pair exactly once: the second id always comes later in the list.
moviePairs = [
    (first[0], second[0])
    for i, first in enumerate(frist100MovieId)
    for second in frist100MovieId[i + 1:]
]

# Slicing the pair list caps the preview directly, instead of combining an inner
# `break` with an unconditional outer `break` (which never got past the first movie).
for movieId1, movieId2 in moviePairs[:PAIRS_TO_SHOW]:
    userMovieId1 = movieUserId.filter(movieUserId["movieId"] == movieId1)
    userMovieId2 = movieUserId.filter(movieUserId["movieId"] == movieId2)
    # Common support = number of joined rows, i.e. users appearing in both rating sets.
    commonSupport = userMovieId1.join(
        userMovieId2, userMovieId1["userId"] == userMovieId2["userId"], "inner"
    ).count()
    print(f"({movieId1},{movieId2}) : {commonSupport}")

(1,2) : 68
(1,3) : 32
(1,4) : 2
(1,5) : 32
(1,6) : 58


I use **randomSplit()** to split the data into a training set and a test set.
I build an explicit-feedback recommendation model with **ALS()**.
The model is trained on the training set and then used to predict the users' ratings for the movies in the test set.

In [17]:
# Fixed seed so that the train/test split and the ALS initialisation, and therefore
# the RMSE and the recommendations derived from them, are the same on every run.
SEED = 42

# 80 % of the ratings for training, 20 % held out for testing.
trainingSet, testSet = ratings.randomSplit([0.8, 0.2], seed=SEED)

# Explicit-feedback ALS on (userId, movieId, rating). coldStartStrategy="drop"
# makes transform() discard rows whose prediction would be NaN.
alsExplicit = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=SEED,
)
modelExplicit = alsExplicit.fit(trainingSet)

# Adds a "prediction" column to the held-out ratings.
predictionsExplicit = modelExplicit.transform(testSet)
predictionsExplicit.show()



+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   597|    471|   2.0| 941558175|  3.155017|
|    91|    471|   1.0|1112713817| 2.2742562|
|   372|    471|   3.0| 874415126| 3.4207525|
|   474|    471|   3.0| 974668858|  4.519757|
|   520|    471|   5.0|1326609921| 3.2034023|
|   411|    471|   4.0| 835532928| 2.6807187|
|    44|    833|   2.0| 869252237| 3.7162113|
|   307|    833|   1.0|1186172725| 2.4606986|
|   608|    833|   0.5|1117506344| 2.6023002|
|   606|   1088|   3.0|1171926956|  4.141743|
|   599|   1088|   2.5|1498515232| 2.8825223|
|   169|   1088|   4.5|1059427717| 4.5254645|
|    64|   1088|   4.0|1161559902| 2.9987643|
|    41|   1088|   1.5|1458939142| 3.8409963|
|   387|   1088|   1.5|1095040878| 2.9089382|
|   200|   1088|   4.0|1229887977|  5.091709|
|   188|   1088|   4.0| 962560253| 4.0625443|
|    68|   1088|   3.5|1158534614|  4.072585|
|   600|   1088|   3.5|1237851304|

                                                                                

Evaluate the model based on the root mean squared error of rating predictions

In [18]:
# Score the test-set predictions: RMSE between the "rating" and "prediction" columns.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmseExplicit = evaluator.evaluate(predictionsExplicit)
print(f"Explicit:Root-mean-square error = {rmseExplicit}")



Explicit:Root-mean-square error = 1.1240775356812054




For a given user ID, **getRecommend()** displays the titles of the 5 movies with the highest predicted ratings for that user, which completes the recommendation.

In [9]:
def getRecommend(userId, topN=5):
    """Show the titles of the ``topN`` highest-predicted movies for one user.

    Candidates are taken from ``predictionsExplicit``, i.e. the model's
    predictions on ``testSet``, so only movies that the user rated in the
    held-out split can appear in the result.

    Args:
        userId: Value matched against the ``userId`` column.
        topN: Number of titles to display; defaults to 5.

    Returns:
        None. The titles are printed with ``show()``.
    """
    userPredictions = predictionsExplicit.filter(predictionsExplicit["userId"] == userId)
    recommendIds = userPredictions.orderBy(functions.desc("prediction")).limit(topN)

    # A join is not guaranteed to keep the order of its left input, so the rows
    # are sorted by prediction again before only the title is kept.
    recommendMovies = (
        recommendIds.join(movies, recommendIds["movieId"] == movies["movieId"], "left")
        .orderBy(functions.desc("prediction"))
        .select("title")
    )

    recommendMovies.show(truncate=False)


getRecommend(5)



+----------------------------------------------------------+
|title                                                     |
+----------------------------------------------------------+
|Like Water for Chocolate (Como agua para chocolate) (1992)|
|Remains of the Day, The (1993)                            |
|Shawshank Redemption, The (1994)                          |
|In the Line of Fire (1993)                                |
|Legends of the Fall (1994)                                |
+----------------------------------------------------------+



                                                                                