In [1]:
from pyspark.mllib.recommendation import Rating
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql import SparkSession
import pandas as pd

#build spark session
spark = SparkSession.builder.getOrCreate()

In [2]:
# get movie information

# movie titles might include characters like ' " ,

movieId = []
title = []
genres = []

counter = 0
with open('./ml-25m/movies.csv', 'r') as lines:
    header = lines.readline().split(',')
    for line in lines:
        counter +=1
        indexFirstComma = line.find(',')
        indexLastComma = line.rindex(',')
        m = line[:indexFirstComma]
        t = line[indexFirstComma+1:indexLastComma]
        g = line[indexLastComma+1:]
        
        
        movieId.append(m)
        title.append(t)
        genres.append(g.strip('\n'))

In [3]:
header[-1] = header[-1].strip('\n')

In [4]:
movies_pdf = pd.DataFrame(list(zip(movieId, title, genres)), columns= header)

In [5]:
movies_pdf.head(10)

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
5,6,Heat (1995),Action|Crime|Thriller
6,7,Sabrina (1995),Comedy|Romance
7,8,Tom and Huck (1995),Adventure|Children
8,9,Sudden Death (1995),Action
9,10,GoldenEye (1995),Action|Adventure|Thriller


In [6]:
# get ratings

ratings = spark.read.load('./ml-25m/ratings.csv', format='csv', sep=',', header="true", inferSchema="true")

In [7]:
# drop timestamp column
ratings = ratings.drop('timestamp')

In [8]:
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    296|   5.0|
|     1|    306|   3.5|
|     1|    307|   5.0|
|     1|    665|   5.0|
|     1|    899|   3.5|
|     1|   1088|   4.0|
|     1|   1175|   3.5|
|     1|   1217|   3.5|
|     1|   1237|   5.0|
|     1|   1250|   4.0|
|     1|   1260|   3.5|
|     1|   1653|   4.0|
|     1|   2011|   2.5|
|     1|   2012|   2.5|
|     1|   2068|   2.5|
|     1|   2161|   3.5|
|     1|   2351|   4.5|
|     1|   2573|   4.0|
|     1|   2632|   5.0|
|     1|   2692|   5.0|
+------+-------+------+
only showing top 20 rows



In [9]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [10]:
ratings.count()

25000095

In [11]:
# while working on local computer execute this line
# since the dataset is too large it might not fit into memory
(ratings, drop) = ratings.randomSplit([0.5, 0.5])

In [12]:
(training, test) = ratings.randomSplit([0.8, 0.2])

In [13]:
print('Training set: ', training.count())
print('Test set: ', test.count())

Training set:  9999085
Test set:  2500022


### Build/Run Model

In [14]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop", implicitPrefs=False)

In [15]:
model = als.fit(training)

In [16]:
predictions = model.transform(test)

In [17]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|124207|    148|   3.0| 1.4777418|
|  8249|    148|   3.0| 2.8008215|
| 33354|    148|   3.0| 2.9119759|
|  5055|    148|   3.0|  2.913327|
| 38679|    148|   3.0| 2.6689863|
| 64112|    148|   3.0| 3.5638697|
|105987|    148|   3.0|  2.344711|
| 73827|    148|   4.0|  4.564262|
| 49403|    148|   2.0|  2.291417|
| 75209|    148|   2.0|  2.050297|
| 73977|    148|   2.0| 2.5904522|
|115095|    148|   4.0|  2.963902|
| 58615|    148|   3.0| 2.4747012|
|110863|    148|   3.0| 4.3533397|
|105793|    148|   2.0| 2.9515069|
| 60999|    148|   4.0|  2.105264|
| 64415|    148|   3.0|  2.997541|
|137835|    148|   4.0| 3.3728433|
|100318|    148|   4.0| 3.1876101|
| 68169|    148|   2.0| 2.8420095|
+------+-------+------+----------+
only showing top 20 rows



### Evaluate Model

In [18]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('Root mean squared error:', rmse)

Root mean squared error: 0.863797565625846


### Save/Load Model

In [19]:
#save the model
model.write().overwrite().save('recommender_model')

#load the saved model
# model = ALSModel.load('recommender_model')

### Testing

- Combine predictions with movie names and genres to evaluate the model

In [20]:
# create Spark DataFrame from Pandas DataFrame
moviesDF = spark.createDataFrame(movies_pdf)

In [21]:
# create tables for SQL
moviesDF.createOrReplaceTempView('movies')
predictions.createOrReplaceTempView('predictions')

In [22]:
spark.sql(
    'SELECT *\
    FROM predictions AS p INNER JOIN movies AS m ON p.movieId = m.movieId \
    WHERE p.userId IN (100,200,300)\
    ORDER BY 1' ).show(100, truncate=False)

+------+-------+------+----------+-------+-----------------------------------------------------+-------------------------------------------+
|userId|movieId|rating|prediction|movieId|title                                                |genres                                     |
+------+-------+------+----------+-------+-----------------------------------------------------+-------------------------------------------+
|100   |647    |3.0   |3.2151697 |647    |Courage Under Fire (1996)                            |Action|Crime|Drama|War                     |
|100   |766    |3.0   |4.869132  |766    |I Shot Andy Warhol (1996)                            |Drama                                      |
|100   |648    |3.0   |2.8397493 |648    |Mission: Impossible (1996)                           |Action|Adventure|Mystery|Thriller          |
|100   |1393   |4.0   |3.687466  |1393   |Jerry Maguire (1996)                                 |Drama|Romance                              |
|100   |1391 

- Recommend movies for a specific user

In [23]:
user_subset = ratings.where(ratings.userId == 300)
userRecs = model.recommendForUserSubset(user_subset, 3)

In [24]:
userRecs.show(truncate=False)

+------+---------------------------------------------------------------+
|userId|recommendations                                                |
+------+---------------------------------------------------------------+
|300   |[{182759, 15.541958}, {175169, 14.156495}, {148857, 13.793576}]|
+------+---------------------------------------------------------------+



In [25]:
movies_pdf[movies_pdf.movieId.isin(['90963','171781','153184'])]

Unnamed: 0,movieId,title,genres
17344,90963,"""Last Mountain, The (2011)""",Documentary
37687,153184,Vergeef me,(no genres listed)
46113,171781,Tank 432 (2015),Action|Horror|Thriller
