# Import libraries

In [7]:
# import libraries
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Recommender System").config("spark.sql.crossJoin.enabled","true").getOrCreate()
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as f

# Load and verify data

In [9]:
# load and verify data
from pyspark.sql.types import StringType, DoubleType, IntegerType, StructType, StructField
schema = StructType([StructField('userId', IntegerType(), True),
                     StructField('movieId', IntegerType(), True),
                     StructField('rating', IntegerType(), True),
                     StructField('timestamp', DoubleType(), True)])

data = spark.read.csv('ratings.dat',sep = '::', header = False, schema = schema)
# Data taken from https://grouplens.org/datasets/movielens/1m/

In [11]:
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: double (nullable = true)



In [12]:
data.head(3)

[Row(userId=1, movieId=1193, rating=5, timestamp=978300760.0),
 Row(userId=1, movieId=661, rating=3, timestamp=978302109.0),
 Row(userId=1, movieId=914, rating=3, timestamp=978301968.0)]

In [13]:
for item in data.head(1)[0]:
    print(item)

1
1193
5
978300760.0


In [14]:
data.columns

['userId', 'movieId', 'rating', 'timestamp']

In [15]:
data.describe().show()

23/10/01 13:56:43 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|           1000209|           1000209|           1000209|             1000209|
|   mean| 3024.512347919285|1865.5398981612843| 3.581564453029317| 9.722436954046655E8|
| stddev|1728.4126948999715|1096.0406894572482|1.1171018453732606|1.2152558939916052E7|
|    min|                 1|                 1|                 1|        9.56703932E8|
|    max|              6040|              3952|                 5|        1.04645459E9|
+-------+------------------+------------------+------------------+--------------------+



# Train Test split

In [16]:
# test train split
train_data,test_data = data.randomSplit([0.7,0.3])

In [17]:
train_data.describe().show()

+-------+------------------+------------------+------------------+-------------------+
|summary|            userId|           movieId|            rating|          timestamp|
+-------+------------------+------------------+------------------+-------------------+
|  count|            699988|            699988|            699988|             699988|
|   mean| 3027.150701154877| 1865.975916730001|3.5819642622444956|9.722421961229407E8|
| stddev|1727.9631295847996|1095.5489410345056|1.1170462349542134|1.216796736602763E7|
|    min|                 1|                 1|                 1|       9.56703932E8|
|    max|              6040|              3952|                 5|       1.04645459E9|
+-------+------------------+------------------+------------------+-------------------+



In [18]:
test_data.describe().show()

+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|            300221|            300221|            300221|              300221|
|   mean|3018.3608275237243| 1864.523287844621|3.5806322675628954|  9.72247191093561E8|
| stddev|1729.4476829678836|1097.1875348383253|1.1172327993377649|1.2116576434228351E7|
|    min|                 1|                 1|                 1|        9.56703954E8|
|    max|              6040|              3952|                 5|       1.046454338E9|
+-------+------------------+------------------+------------------+--------------------+



# Build Model

In [19]:
# model building
recommender = ALS(maxIter = 5, regParam = 0.01, userCol='userId', itemCol='movieId', ratingCol='rating')
model = recommender.fit(train_data)

23/10/01 13:56:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/10/01 13:56:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/10/01 13:56:48 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


# Evaluate Model

In [21]:
# Predition of testing data
pred_data = model.transform(test_data)
pred_data.show()

In [22]:
pred_data.show()

+------+-------+------+------------+----------+
|userId|movieId|rating|   timestamp|prediction|
+------+-------+------+------------+----------+
|     1|    588|     4|9.78824268E8| 3.8116057|
|     1|   2018|     4|9.78301777E8| 4.8739533|
|     1|   2355|     5|9.78824291E8| 3.2990558|
|     1|   2692|     4| 9.7830157E8| 3.5244827|
|     1|   2791|     4|9.78302188E8|  4.382037|
|     2|    163|     4|9.78299809E8| 2.8751786|
|     2|    165|     3|9.78300002E8| 3.1004095|
|     1|    594|     4|9.78302268E8| 4.8510346|
|     1|   2687|     3|9.78824268E8|  4.271018|
|     2|    736|     4|  9.783001E8| 3.3499422|
|     2|    648|     4|9.78299913E8| 3.3781297|
|     2|    356|     5|9.78299686E8| 4.5485916|
|     1|   1907|     4| 9.7882433E8| 4.2699113|
|     2|     95|     2|9.78300143E8| 2.6824057|
|     2|     21|     1|9.78299839E8|  3.269908|
|     2|    515|     5|9.78298542E8| 3.4440072|
|     1|    783|     4|9.78824291E8|  3.894491|
|     2|    110|     5|9.78298625E8| 4.1

                                                                                

In [23]:
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')
print("Root-mean-square error = " + str(evaluator.evaluate(pred_data)))

Root-mean-square error = nan


In [24]:
# A NaN result is due to SPARK-14489 and because the model can't predict values for users for which there's no data. 
# A temporary workaround is to exclude rows with predicted NaN values or to replace them with a constant, for instance,
# the general mean rating. However, to map to a real business problem, the data scientist, in collaboration with the 
# business owner, must define what happens if such an event occurs. For example, you can provide no recommendation for 
# a user until that user rates a few items. Alternatively, before user rates five items, you can use a user-based recommender
# system that's based on the user's profile (that's another recommender system to develop).

# Replace predicted NaN values with the average rating and evaluate the model:

In [25]:
avgRatings = data.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))

The average rating in the dataset is: 3.581564453029317


In [26]:
# Model Evaluation
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(pred_data.na.fill(avgRatings))))
# The root mean squared error for our model is: 0.9082971185858143

The root mean squared error for our model is: 0.9082971185858143


In [27]:
# Now exclude predicted NaN values and evaluate the model:

In [28]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(pred_data.na.drop())))

The root mean squared error for our model is: 0.9081952083327971


# Define Movie details 

In [29]:
# Defining Movie Details
schema = StructType([StructField('movieId', IntegerType(), True),
                     StructField('title', StringType(), True),
                     StructField('genres', StringType(), True)])
movieDetails = spark.read.csv('movies.dat',sep = '::', header = False, schema = schema)
movieDetails.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [30]:
movieDetails.head(3)

[Row(movieId=1, title='Toy Story (1995)', genres="Animation|Children's|Comedy"),
 Row(movieId=2, title='Jumanji (1995)', genres="Adventure|Children's|Fantasy"),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance')]

In [31]:
schema = StructType([StructField('UserID', IntegerType(), True),
                     StructField('Gender', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Occupation', IntegerType(), True),
                     StructField('Zipcode', IntegerType(), True)])
occupation ={
  0:  "other", 
  1:  "academic/educator",
  2:  "artist",
  3:  "clerical/admin",
  4:  "college/grad student",
  5:  "customer service",
  6:  "doctor/health care",
  7:  "executive/managerial",
  8:  "farmer",
  9:  "homemaker",
 10:  "K-12 student",
 11:  "lawyer",
 12:  "programmer",
 13:  "retired",
 14:  "sales/marketing",
 15:  "scientist",
 16:  "self-employed",
 17:  "technician/engineer",
 18:  "tradesman/craftsman",
 19:  "unemployed",
 20:  "writer"  
    
}
userDetails = spark.read.csv('users.dat',sep = '::', header = False, schema = schema)
userDetails.printSchema()
userDetails.head(3)

root
 |-- UserID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)



In [32]:
userDetails.head(3)

[Row(UserID=1, Gender='F', Age=1, Occupation=10, Zipcode=48067),
 Row(UserID=2, Gender='M', Age=56, Occupation=16, Zipcode=70072),
 Row(UserID=3, Gender='M', Age=25, Occupation=15, Zipcode=55117)]

# Movie recommendations 

In [33]:
# Movie Recommendations
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# userRecs.show()
userRecsExplode = userRecs.select(userRecs.userId,f.explode(userRecs.recommendations)).orderBy(userRecs.userId)
userRecsExplode.show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[{2545, 6.608972}...|
|    27|[{526, 9.283108},...|
|    28|[{2785, 6.45993},...|
|    31|[{583, 7.960484},...|
|    34|[{2157, 7.7260346...|
|    44|[{128, 7.1540003}...|
|    53|[{793, 6.404241},...|
|    65|[{3854, 7.442017}...|
|    76|[{3636, 6.9789863...|
|    78|[{1504, 6.8341}, ...|
|    81|[{341, 9.037582},...|
|    85|[{649, 10.921098}...|
|   101|[{2388, 8.584005}...|
|   103|[{341, 8.10499}, ...|
|   108|[{2674, 9.1541395...|
|   115|[{2964, 11.108625...|
|   126|[{2128, 9.695844}...|
|   133|[{2128, 8.40657},...|
|   137|[{2512, 6.748901}...|
|   148|[{3854, 7.0310674...|
+------+--------------------+
only showing top 20 rows



                                                                                

In [34]:
userRecsExplode = userRecs.select(userRecs.userId,f.explode(userRecs.recommendations)).orderBy(userRecs.userId)
userRecsExplode.show()



+------+-----------------+
|userId|              col|
+------+-----------------+
|     1|  {297, 7.651825}|
|     1|  {2984, 7.62075}|
|     1| {1181, 7.365942}|
|     1| {583, 7.3479505}|
|     1| {860, 7.0517154}|
|     1|{2758, 7.0380554}|
|     1| {3092, 7.028389}|
|     1| {3010, 6.685078}|
|     1|{1426, 6.6842637}|
|     1|   {72, 6.668399}|
|     2| {1038, 6.575716}|
|     2|{1058, 6.5158105}|
|     2| {2209, 6.379738}|
|     2| {844, 6.1593995}|
|     2|   {128, 6.10542}|
|     2| {2175, 5.911818}|
|     2|{2897, 5.8956857}|
|     2|    {59, 5.87255}|
|     2|{3874, 5.4844913}|
|     2| {1564, 5.457382}|
+------+-----------------+
only showing top 20 rows



                                                                                

In [35]:
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# movieRecs.join(movieDetails,movieRecs.movieId==movieDetails.movieId,"left").select([movieRecs.movieId,movieDetails.title,movieDetails.genres,movieRecs.recommendations]).show()
movieRecsExplode = movieRecs.select(movieRecs.movieId,f.explode(movieRecs.recommendations)).orderBy(movieRecs.movieId)
movieRecsExplode.join(movieDetails,movieRecsExplode.movieId==movieDetails.movieId,"left").select([movieRecsExplode.movieId,movieDetails.title,movieDetails.genres,movieRecsExplode.col.alias('recommendation')]).show()



+-------+--------------------+--------------------+--------------------+
|movieId|               title|              genres|     recommendations|
+-------+--------------------+--------------------+--------------------+
|     12|Dracula: Dead and...|       Comedy|Horror|[{4504, 10.700329...|
|     26|      Othello (1995)|               Drama|[{917, 6.8927784}...|
|     27| Now and Then (1995)|               Drama|[{4056, 9.164587}...|
|     28|   Persuasion (1995)|             Romance|[{5320, 7.19597},...|
|     31|Dangerous Minds (...|               Drama|[{5297, 6.774862}...|
|     34|         Babe (1995)|Children's|Comedy...|[{3165, 6.19402},...|
|     44|Mortal Kombat (1995)|    Action|Adventure|[{5320, 6.6046824...|
|     53|     Lamerica (1994)|               Drama|[{1445, 9.687789}...|
|     65|     Bio-Dome (1996)|              Comedy|[{2441, 8.251137}...|
|     76|    Screamers (1995)|     Sci-Fi|Thriller|[{3165, 6.1382136...|
|     78|Crossing Guard, T...|               Drama|

                                                                                

In [36]:
movieRecsExplode = movieRecs.select(movieRecs.movieId,f.explode(movieRecs.recommendations)).orderBy(movieRecs.movieId)
movieRecsExplode.join(movieDetails,movieRecsExplode.movieId==movieDetails.movieId,"left").select([movieRecsExplode.movieId,movieDetails.title,movieDetails.genres,movieRecsExplode.col.alias('recommendation')]).show()



+-------+--------------------+-------------+-----------------+
|movieId|               title|       genres|   recommendation|
+-------+--------------------+-------------+-----------------+
|     12|Dracula: Dead and...|Comedy|Horror|{4504, 10.700329}|
|     12|Dracula: Dead and...|Comedy|Horror| {3404, 8.776536}|
|     12|Dracula: Dead and...|Comedy|Horror| {2502, 8.501911}|
|     12|Dracula: Dead and...|Comedy|Horror|  {527, 8.216371}|
|     12|Dracula: Dead and...|Comedy|Horror|{2908, 7.9429936}|
|     12|Dracula: Dead and...|Comedy|Horror|{3222, 7.6486673}|
|     12|Dracula: Dead and...|Comedy|Horror| {6027, 7.617967}|
|     12|Dracula: Dead and...|Comedy|Horror| {606, 7.5262337}|
|     12|Dracula: Dead and...|Comedy|Horror|{1310, 7.5227346}|
|     12|Dracula: Dead and...|Comedy|Horror| {4758, 7.521691}|
|     26|      Othello (1995)|        Drama| {917, 6.8927784}|
|     26|      Othello (1995)|        Drama| {5760, 6.836006}|
|     26|      Othello (1995)|        Drama| {906, 6.60

                                                                                

In [37]:
# Generate top 10 movie recommendations for a specified set of users
singleUser = test_data.filter(test_data.userId==11).select(['movieId','userId'])
singleUser.join(movieDetails,singleUser.movieId==movieDetails.movieId,"left").select([singleUser.userId,singleUser.movieId,movieDetails.title,movieDetails.genres]).show()

+------+-------+--------------------+--------------------+
|userId|movieId|               title|              genres|
+------+-------+--------------------+--------------------+
|    11|     47|Seven (Se7en) (1995)|      Crime|Thriller|
|    11|    246|  Hoop Dreams (1994)|         Documentary|
|    11|    272|Madness of King G...|               Drama|
|    11|    318|Shawshank Redempt...|               Drama|
|    11|    333|    Tommy Boy (1995)|              Comedy|
|    11|    441|Dazed and Confuse...|              Comedy|
|    11|    481|   Kalifornia (1993)|      Drama|Thriller|
|    11|    515|Remains of the Da...|               Drama|
|    11|    551|Nightmare Before ...|Children's|Comedy...|
|    11|    586|   Home Alone (1990)|   Children's|Comedy|
|    11|    764|        Heavy (1995)|       Drama|Romance|
|    11|   1089|Reservoir Dogs (1...|      Crime|Thriller|
|    11|   1188|Strictly Ballroom...|      Comedy|Romance|
|    11|   1198|Raiders of the Lo...|    Action|Adventur

In [38]:
userSubsetRecs = model.recommendForUserSubset(singleUser, 10)
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    11|[{2157, 9.082993}...|
+------+--------------------+



In [39]:
userSubsetRecsExplode = userSubsetRecs.select(userSubsetRecs.userId,f.explode(userSubsetRecs.recommendations))
userSubsetRecsExplode.show()

+------+-----------------+
|userId|              col|
+------+-----------------+
|    11| {2157, 9.082993}|
|    11|  {793, 7.806675}|
|    11|{1539, 7.0899663}|
|    11| {2892, 7.001297}|
|    11|  {561, 6.989329}|
|    11| {2466, 6.879111}|
|    11|{2209, 6.8694057}|
|    11| {3711, 6.842184}|
|    11| {495, 6.5796084}|
|    11|{2602, 6.4957123}|
+------+-----------------+



In [40]:
recommendations = model.transform(singleUser)
userRecommendations= recommendations.orderBy('prediction',ascending=False)
userRecommendations.show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   1923|    11| 4.4603453|
|    318|    11| 4.4476805|
|   1394|    11|  4.422484|
|   1732|    11|  4.422385|
|   1213|    11|  4.354578|
|    333|    11|  4.270006|
|   3552|    11| 4.2437677|
|   1089|    11|  4.204986|
|   1704|    11| 4.1901064|
|   2918|    11| 4.1666775|
|   2329|    11| 4.0828342|
|   2804|    11| 4.0135565|
|   1244|    11| 3.9353898|
|   2706|    11| 3.8801434|
|    246|    11| 3.8608074|
|     47|    11| 3.8278868|
|    441|    11| 3.8173943|
|    551|    11| 3.7835927|
|   1198|    11| 3.7584832|
|   1259|    11|  3.693514|
+-------+------+----------+
only showing top 20 rows



In [41]:
userRecommendations.join(movieDetails,userRecommendations.movieId==movieDetails.movieId,"left").select([userRecommendations.userId,movieDetails.title,movieDetails.genres,userRecommendations.prediction]).show()

+------+--------------------+--------------------+----------+
|userId|               title|              genres|prediction|
+------+--------------------+--------------------+----------+
|    11|Groundhog Day (1993)|      Comedy|Romance| 3.6058524|
|    11|   Kalifornia (1993)|      Drama|Thriller| 2.8247309|
|    11|Raiders of the Lo...|    Action|Adventure| 3.7584832|
|    11|  Stand by Me (1986)|Adventure|Comedy|...|  3.693514|
|    11|    Tommy Boy (1995)|              Comedy|  4.270006|
|    11|  Hoop Dreams (1994)|         Documentary| 3.8608074|
|    11|Seven (Se7en) (1995)|      Crime|Thriller| 3.8278868|
|    11|    Manhattan (1979)|Comedy|Drama|Romance| 3.9353898|
|    11|Reservoir Dogs (1...|      Crime|Thriller|  4.204986|
|    11|Raising Arizona (...|              Comedy|  4.422484|
|    11|Deconstructing Ha...|        Comedy|Drama| 3.1456385|
|    11|        Heavy (1995)|       Drama|Romance| 1.8397661|
|    11|Shawshank Redempt...|               Drama| 4.4476805|
|    11|

In [42]:
# Generate top 10 user recommendations for a specified set of movies
singleMovie = test_data.filter(test_data.movieId==1).select(['movieId','userId'])
singleMovie.join(movieDetails,singleMovie.movieId==movieDetails.movieId,"left").select([singleMovie.movieId,movieDetails.title,movieDetails.genres,singleMovie.userId]).show()

+-------+----------------+--------------------+------+
|movieId|           title|              genres|userId|
+-------+----------------+--------------------+------+
|      1|Toy Story (1995)|Animation|Childre...|     6|
|      1|Toy Story (1995)|Animation|Childre...|    28|
|      1|Toy Story (1995)|Animation|Childre...|    44|
|      1|Toy Story (1995)|Animation|Childre...|    45|
|      1|Toy Story (1995)|Animation|Childre...|    51|
|      1|Toy Story (1995)|Animation|Childre...|    60|
|      1|Toy Story (1995)|Animation|Childre...|    68|
|      1|Toy Story (1995)|Animation|Childre...|    73|
|      1|Toy Story (1995)|Animation|Childre...|    76|
|      1|Toy Story (1995)|Animation|Childre...|    78|
|      1|Toy Story (1995)|Animation|Childre...|    99|
|      1|Toy Story (1995)|Animation|Childre...|   112|
|      1|Toy Story (1995)|Animation|Childre...|   121|
|      1|Toy Story (1995)|Animation|Childre...|   123|
|      1|Toy Story (1995)|Animation|Childre...|   134|
|      1|T

In [43]:
movieSubSetRecs = model.recommendForItemSubset(singleMovie, 10)
movieSubSetRecs.join(movieDetails,movieSubSetRecs.movieId==movieDetails.movieId,"left").select([movieSubSetRecs.movieId,movieDetails.title,movieDetails.genres,movieSubSetRecs.recommendations]).show()

+-------+----------------+--------------------+--------------------+
|movieId|           title|              genres|     recommendations|
+-------+----------------+--------------------+--------------------+
|      1|Toy Story (1995)|Animation|Childre...|[{1341, 6.233338}...|
+-------+----------------+--------------------+--------------------+



In [44]:
movieSubSetRecsExplode = movieSubSetRecs.select(movieSubSetRecs.movieId,f.explode(movieSubSetRecs.recommendations))
movieSubSetRecsExplode.join(movieDetails,movieSubSetRecsExplode.movieId==movieDetails.movieId,"left").select([movieSubSetRecsExplode.movieId,movieDetails.title,movieDetails.genres,movieSubSetRecsExplode.col.alias('recommendation')]).show()

+-------+----------------+--------------------+-----------------+
|movieId|           title|              genres|   recommendation|
+-------+----------------+--------------------+-----------------+
|      1|Toy Story (1995)|Animation|Childre...| {1341, 6.233338}|
|      1|Toy Story (1995)|Animation|Childre...|{2432, 5.9119587}|
|      1|Toy Story (1995)|Animation|Childre...| {2867, 5.817516}|
|      1|Toy Story (1995)|Animation|Childre...|  {535, 5.653746}|
|      1|Toy Story (1995)|Animation|Childre...|{2431, 5.5299263}|
|      1|Toy Story (1995)|Animation|Childre...|{1081, 5.4990344}|
|      1|Toy Story (1995)|Animation|Childre...|{4672, 5.4836683}|
|      1|Toy Story (1995)|Animation|Childre...| {1620, 5.482055}|
|      1|Toy Story (1995)|Animation|Childre...|{5072, 5.4706435}|
|      1|Toy Story (1995)|Animation|Childre...| {665, 5.4675655}|
+-------+----------------+--------------------+-----------------+

