# Import libraries

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Recommender System").config("spark.sql.crossJoin.enabled","true").getOrCreate()

In [2]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as f

# Load and verify data

In [3]:
from pyspark.sql.types import StringType, DoubleType, IntegerType, StructType, StructField
schema = StructType([StructField('userId', IntegerType(), True),
                     StructField('movieId', IntegerType(), True),
                     StructField('rating', IntegerType(), True),
                     StructField('timestamp', DoubleType(), True)])


In [4]:
data = spark.read.csv('ratings.dat',sep = '::', header = False, schema = schema)
# Data taken from https://grouplens.org/datasets/movielens/1m/

In [5]:
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: double (nullable = true)



In [6]:
data.head(3)

[Row(userId=1, movieId=1193, rating=5, timestamp=978300760.0),
 Row(userId=1, movieId=661, rating=3, timestamp=978302109.0),
 Row(userId=1, movieId=914, rating=3, timestamp=978301968.0)]

In [7]:
for item in data.head(1)[0]:
    print(item)

1
1193
5
978300760.0


In [8]:
data.columns

['userId', 'movieId', 'rating', 'timestamp']

In [9]:
data.describe().show()

+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|           1000209|           1000209|           1000209|             1000209|
|   mean| 3024.512347919285|1865.5398981612843| 3.581564453029317| 9.722436954046655E8|
| stddev|1728.4126948999715|1096.0406894572482|1.1171018453732606|1.2152558939916052E7|
|    min|                 1|                 1|                 1|        9.56703932E8|
|    max|              6040|              3952|                 5|        1.04645459E9|
+-------+------------------+------------------+------------------+--------------------+



# Train Test split

In [10]:
train_data,test_data = data.randomSplit([0.7,0.3])

In [11]:
train_data.describe().show()

+-------+------------------+------------------+-----------------+--------------------+
|summary|            userId|           movieId|           rating|           timestamp|
+-------+------------------+------------------+-----------------+--------------------+
|  count|            700438|            700438|           700438|              700438|
|   mean| 3025.572510343528|1864.5546857823247|3.582501234941565| 9.722283748928485E8|
| stddev|1729.2303682458576|1096.2424422575243|1.116184544935731|1.2140714962225813E7|
|    min|                 1|                 1|                1|        9.56703932E8|
|    max|              6040|              3952|                5|        1.04645432E9|
+-------+------------------+------------------+-----------------+--------------------+



In [12]:
test_data.describe().show()

+-------+------------------+------------------+------------------+--------------------+
|summary|            userId|           movieId|            rating|           timestamp|
+-------+------------------+------------------+------------------+--------------------+
|  count|            299771|            299771|            299771|              299771|
|   mean| 3022.035196866942|1867.8419226676363|3.5793755900337256| 9.722794929589853E8|
| stddev|1726.5009734897073| 1095.567508110294|1.1192410652643803|1.2180133568646537E7|
|    min|                 1|                 1|                 1|        9.56703954E8|
|    max|              6040|              3952|                 5|        1.04645459E9|
+-------+------------------+------------------+------------------+--------------------+



# Build Model

In [13]:
recommender = ALS(maxIter = 5, regParam = 0.01, userCol='userId', itemCol='movieId', ratingCol='rating')
# recommender = ALS(maxIter = 5, regParam = 0.01, userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy="drop")

In [14]:
model = recommender.fit(train_data)

# Evaluate Model

In [15]:
pred_data = model.transform(test_data)

In [16]:
pred_data.show()

+------+-------+------+-------------+----------+
|userId|movieId|rating|    timestamp|prediction|
+------+-------+------+-------------+----------+
|   673|    148|     5| 9.75620824E8|  3.508952|
|  4169|    148|     3| 9.76588402E8|   2.15964|
|  3184|    148|     4| 9.68708953E8| 3.0696592|
|  2383|    148|     2| 9.74417654E8| 3.5550392|
|  3539|    148|     3| 9.66932408E8| 3.4837592|
|   482|    148|     2| 9.76219954E8| 2.9147143|
|   424|    148|     4|1.027003224E9| 2.7909567|
|  2507|    148|     4| 9.74082717E8|   3.60089|
|  4858|    463|     3| 9.63746396E8| 2.9577634|
|  3328|    463|     4| 9.67918151E8| 3.3567643|
|  4040|    463|     1| 9.65505136E8| 2.2276387|
|  4277|    463|     4| 9.65469038E8|  3.728101|
|  2051|    463|     1| 9.74663178E8|  2.348032|
|   660|    463|     3| 9.75690189E8|  2.701022|
|  5306|    463|     2|  9.6101316E8| 3.1144714|
|  3032|    463|     4| 9.70356224E8| 4.2642455|
|  3717|    463|     2| 9.67228367E8| 3.7871368|
|  5511|    463|    

In [17]:
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')
print("Root-mean-square error = " + str(evaluator.evaluate(pred_data)))

Root-mean-square error = nan


In [18]:
# A NaN result is due to SPARK-14489 and because the model can't predict values for users for which there's no data. 
# A temporary workaround is to exclude rows with predicted NaN values or to replace them with a constant, for instance,
# the general mean rating. However, to map to a real business problem, the data scientist, in collaboration with the 
# business owner, must define what happens if such an event occurs. For example, you can provide no recommendation for 
# a user until that user rates a few items. Alternatively, before user rates five items, you can use a user-based recommender
# system that's based on the user's profile (that's another recommender system to develop).

# Replace predicted NaN values with the average rating and evaluate the model:

In [19]:
avgRatings = data.select('rating').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))

The average rating in the dataset is: 3.581564453029317


In [20]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(pred_data.na.fill(avgRatings))))

The root mean squared error for our model is: 0.9072353327216


In [21]:
# Now exclude predicted NaN values and evaluate the model:

In [22]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(pred_data.na.drop())))

The root mean squared error for our model is: 0.9071309268091656


# Define Movie details 

In [23]:
schema = StructType([StructField('movieId', IntegerType(), True),
                     StructField('title', StringType(), True),
                     StructField('genres', StringType(), True)])
movieDetails = spark.read.csv('movies.dat',sep = '::', header = False, schema = schema)
movieDetails.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [24]:
movieDetails.head(3)

[Row(movieId=1, title='Toy Story (1995)', genres="Animation|Children's|Comedy"),
 Row(movieId=2, title='Jumanji (1995)', genres="Adventure|Children's|Fantasy"),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance')]

In [25]:
schema = StructType([StructField('UserID', IntegerType(), True),
                     StructField('Gender', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Occupation', IntegerType(), True),
                     StructField('Zipcode', IntegerType(), True)])
occupation ={
  0:  "other", 
  1:  "academic/educator",
  2:  "artist",
  3:  "clerical/admin",
  4:  "college/grad student",
  5:  "customer service",
  6:  "doctor/health care",
  7:  "executive/managerial",
  8:  "farmer",
  9:  "homemaker",
 10:  "K-12 student",
 11:  "lawyer",
 12:  "programmer",
 13:  "retired",
 14:  "sales/marketing",
 15:  "scientist",
 16:  "self-employed",
 17:  "technician/engineer",
 18:  "tradesman/craftsman",
 19:  "unemployed",
 20:  "writer"  
    
}
userDetails = spark.read.csv('users.dat',sep = '::', header = False, schema = schema)
userDetails.printSchema()

root
 |-- UserID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)



In [26]:
userDetails.head(3)

[Row(UserID=1, Gender='F', Age=1, Occupation=10, Zipcode=48067),
 Row(UserID=2, Gender='M', Age=56, Occupation=16, Zipcode=70072),
 Row(UserID=3, Gender='M', Age=25, Occupation=15, Zipcode=55117)]

# Movie recommendations 

In [27]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[2962, 6.6843476...|
|  4900|[[2962, 8.419462]...|
|  5300|[[2962, 6.404892]...|
|   471|[[2998, 7.0608788...|
|  1591|[[1421, 6.5675516...|
|  4101|[[2892, 10.938805...|
|  1342|[[2962, 6.4749775...|
|  2122|[[3161, 6.862848]...|
|  2142|[[2185, 10.207331...|
|   463|[[1743, 7.9208336...|
|   833|[[2964, 9.368681]...|
|  5803|[[2964, 10.681781...|
|  3794|[[3349, 7.422888]...|
|  1645|[[119, 8.433569],...|
|  3175|[[1846, 6.5974584...|
|  4935|[[1000, 10.929255...|
|   496|[[3021, 10.489981...|
|  2366|[[1421, 7.181728]...|
|  2866|[[2931, 7.101722]...|
|  5156|[[2897, 6.506299]...|
+------+--------------------+
only showing top 20 rows



In [28]:
userRecsExplode = userRecs.select(userRecs.userId,f.explode(userRecs.recommendations)).orderBy(userRecs.userId)
userRecsExplode.show()

+------+-----------------+
|userId|              col|
+------+-----------------+
|     1|[2938, 6.7937236]|
|     1| [1471, 6.970809]|
|     1| [1421, 9.322677]|
|     1|[1547, 6.7079444]|
|     1| [811, 6.7112656]|
|     1|[1565, 7.2358603]|
|     1|[2893, 6.7922587]|
|     1| [1930, 8.169257]|
|     1|[2964, 8.5539255]|
|     1| [2825, 6.792793]|
|     2|[1651, 5.8887258]|
|     2|[1519, 5.6566043]|
|     2|  [572, 7.178902]|
|     2|   [131, 5.86207]|
|     2| [1725, 5.781336]|
|     2|[1715, 5.7014794]|
|     2|[3860, 6.0539355]|
|     2| [1755, 5.964891]|
|     2|[2185, 6.3663654]|
|     2| [3245, 5.884728]|
+------+-----------------+
only showing top 20 rows



In [29]:
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
movieRecs.join(movieDetails,movieRecs.movieId==movieDetails.movieId,"left").select([movieRecs.movieId,movieDetails.title,movieDetails.genres,movieRecs.recommendations]).show()

+-------+--------------------+--------------------+--------------------+
|movieId|               title|              genres|     recommendations|
+-------+--------------------+--------------------+--------------------+
|   1580| Men in Black (1997)|Action|Adventure|...|[[1341, 5.604971]...|
|    471|Hudsucker Proxy, ...|      Comedy|Romance|[[665, 6.5844216]...|
|   1591|        Spawn (1997)|Action|Adventure|...|[[5052, 6.049133]...|
|   1342|     Candyman (1992)|              Horror|[[5052, 6.9310412...|
|   2122|Children of the C...|     Horror|Thriller|[[5642, 6.928201]...|
|   2142|American Tail: Fi...|Animation|Childre...|[[1823, 5.4887695...|
|    463|Guilty as Sin (1993)|Crime|Drama|Thriller|[[5804, 7.643492]...|
|    833|High School High ...|              Comedy|[[5642, 7.1914544...|
|   3794| Chuck & Buck (2000)|        Comedy|Drama|[[4441, 11.361889...|
|   1645|Devil's Advocate,...|Crime|Horror|Myst...|[[5642, 6.888715]...|
|   3175| Galaxy Quest (1999)|Adventure|Comedy|...|

In [30]:
movieRecsExplode = movieRecs.select(movieRecs.movieId,f.explode(movieRecs.recommendations)).orderBy(movieRecs.movieId)
movieRecsExplode.join(movieDetails,movieRecsExplode.movieId==movieDetails.movieId,"left").select([movieRecsExplode.movieId,movieDetails.title,movieDetails.genres,movieRecsExplode.col.alias('recommendation')]).show()

+-------+--------------------+--------------------+-----------------+
|movieId|               title|              genres|   recommendation|
+-------+--------------------+--------------------+-----------------+
|   1580| Men in Black (1997)|Action|Adventure|...| [1341, 5.604971]|
|   1580| Men in Black (1997)|Action|Adventure|...| [3765, 5.367282]|
|   1580| Men in Black (1997)|Action|Adventure|...|[4383, 5.3232975]|
|   1580| Men in Black (1997)|Action|Adventure|...| [4565, 5.297894]|
|   1580| Men in Black (1997)|Action|Adventure|...| [5052, 5.214266]|
|   1580| Men in Black (1997)|Action|Adventure|...|[5856, 5.2086186]|
|   1580| Men in Black (1997)|Action|Adventure|...| [2449, 5.192043]|
|   1580| Men in Black (1997)|Action|Adventure|...|[4216, 5.1595926]|
|   1580| Men in Black (1997)|Action|Adventure|...|  [790, 5.155496]|
|   1580| Men in Black (1997)|Action|Adventure|...|[2925, 5.1428547]|
|    471|Hudsucker Proxy, ...|      Comedy|Romance| [665, 6.5844216]|
|    471|Hudsucker P

In [31]:
# Generate top 10 movie recommendations for a specified set of users
singleUser = test_data.filter(test_data.userId==11).select(['movieId','userId'])
singleUser.join(movieDetails,singleUser.movieId==movieDetails.movieId,"left").select([singleUser.userId,singleUser.movieId,movieDetails.title,movieDetails.genres]).show()

+------+-------+--------------------+--------------------+
|userId|movieId|               title|              genres|
+------+-------+--------------------+--------------------+
|    11|     36|Dead Man Walking ...|               Drama|
|    11|     50|Usual Suspects, T...|      Crime|Thriller|
|    11|     88|  Black Sheep (1996)|              Comedy|
|    11|    104|Happy Gilmore (1996)|              Comedy|
|    11|    246|  Hoop Dreams (1994)|         Documentary|
|    11|    249|Immortal Beloved ...|       Drama|Romance|
|    11|    318|Shawshank Redempt...|               Drama|
|    11|    345|Adventures of Pri...|        Comedy|Drama|
|    11|    356| Forrest Gump (1994)|  Comedy|Romance|War|
|    11|    441|Dazed and Confuse...|              Comedy|
|    11|    543|So I Married an A...|Comedy|Romance|Th...|
|    11|    586|   Home Alone (1990)|   Children's|Comedy|
|    11|    708|Truth About Cats ...|      Comedy|Romance|
|    11|    788|Nutty Professor, ...|Comedy|Fantasy|Ro..

In [32]:
userSubsetRecs = model.recommendForUserSubset(singleUser, 10)
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    11|[[2962, 7.8672667...|
+------+--------------------+



In [33]:
userSubsetRecsExplode = userSubsetRecs.select(userSubsetRecs.userId,f.explode(userSubsetRecs.recommendations))
userSubsetRecsExplode.show()

+------+-----------------+
|userId|              col|
+------+-----------------+
|    11|[2962, 7.8672667]|
|    11|[3349, 7.3768806]|
|    11| [3854, 7.253388]|
|    11|[3880, 6.8059125]|
|    11| [1502, 6.143665]|
|    11|[2157, 5.9470844]|
|    11|  [771, 5.913026]|
|    11| [1585, 5.874009]|
|    11| [2892, 5.872451]|
|    11|  [1455, 5.80228]|
+------+-----------------+



In [34]:
recommendations = model.transform(singleUser)
userRecommendations= recommendations.orderBy('prediction',ascending=False)
userRecommendations.show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   2959|    11|  4.832191|
|   2109|    11| 4.3804784|
|   1136|    11|   4.25353|
|     50|    11| 4.2288938|
|    318|    11| 4.1427226|
|   1288|    11|  4.142036|
|   3552|    11|  4.118287|
|   1244|    11|  4.083081|
|   1704|    11| 3.8887162|
|   2710|    11|  3.827378|
|   1265|    11| 3.7715447|
|   1682|    11|  3.733756|
|    246|    11|  3.709584|
|   2396|    11| 3.6656115|
|    543|    11| 3.6215792|
|    441|    11| 3.5810618|
|   2507|    11| 3.5496643|
|   2795|    11|  3.532816|
|    104|    11| 3.5320704|
|     36|    11|  3.507107|
+-------+------+----------+
only showing top 20 rows



In [35]:
userRecommendations.join(movieDetails,userRecommendations.movieId==movieDetails.movieId,"left").select([userRecommendations.userId,movieDetails.title,movieDetails.genres,userRecommendations.prediction]).show()

+------+--------------------+--------------------+----------+
|userId|               title|              genres|prediction|
+------+--------------------+--------------------+----------+
|    11|Groundhog Day (1993)|      Comedy|Romance| 3.7715447|
|    11|Shakespeare in Lo...|      Comedy|Romance| 3.6656115|
|    11|    Superstar (1999)|              Comedy| 2.2992592|
|    11| Benny & Joon (1993)|      Comedy|Romance| 3.3752885|
|    11|Austin Powers: Th...|              Comedy|     3.464|
|    11|When Harry Met Sa...|      Comedy|Romance| 3.4213314|
|    11|Christmas Story, ...|        Comedy|Drama| 3.4937685|
|    11|  Hoop Dreams (1994)|         Documentary|  3.709584|
|    11|    Manhattan (1979)|Comedy|Drama|Romance|  4.083081|
|    11|    Jerk, The (1979)|              Comedy| 4.3804784|
|    11|This Is Spinal Ta...|Comedy|Drama|Musical|  4.142036|
|    11|Truman Show, The ...|               Drama|  3.733756|
|    11|Vegas Vacation (1...|              Comedy| 2.3880935|
|    11|

In [36]:
# Generate top 10 user recommendations for a specified set of movies
singleMovie = test_data.filter(test_data.movieId==1).select(['movieId','userId'])
singleMovie.join(movieDetails,singleMovie.movieId==movieDetails.movieId,"left").select([singleMovie.movieId,movieDetails.title,movieDetails.genres,singleMovie.userId]).show()

+-------+----------------+--------------------+------+
|movieId|           title|              genres|userId|
+-------+----------------+--------------------+------+
|      1|Toy Story (1995)|Animation|Childre...|    10|
|      1|Toy Story (1995)|Animation|Childre...|    23|
|      1|Toy Story (1995)|Animation|Childre...|    26|
|      1|Toy Story (1995)|Animation|Childre...|    28|
|      1|Toy Story (1995)|Animation|Childre...|    36|
|      1|Toy Story (1995)|Animation|Childre...|    44|
|      1|Toy Story (1995)|Animation|Childre...|    48|
|      1|Toy Story (1995)|Animation|Childre...|    65|
|      1|Toy Story (1995)|Animation|Childre...|    68|
|      1|Toy Story (1995)|Animation|Childre...|    76|
|      1|Toy Story (1995)|Animation|Childre...|    92|
|      1|Toy Story (1995)|Animation|Childre...|    96|
|      1|Toy Story (1995)|Animation|Childre...|   114|
|      1|Toy Story (1995)|Animation|Childre...|   118|
|      1|Toy Story (1995)|Animation|Childre...|   121|
|      1|T

In [37]:
movieSubSetRecs = model.recommendForItemSubset(singleMovie, 10)
movieSubSetRecs.join(movieDetails,movieSubSetRecs.movieId==movieDetails.movieId,"left").select([movieSubSetRecs.movieId,movieDetails.title,movieDetails.genres,movieSubSetRecs.recommendations]).show()

+-------+----------------+--------------------+--------------------+
|movieId|           title|              genres|     recommendations|
+-------+----------------+--------------------+--------------------+
|      1|Toy Story (1995)|Animation|Childre...|[[1341, 6.6795297...|
+-------+----------------+--------------------+--------------------+



In [38]:
movieSubSetRecsExplode = movieSubSetRecs.select(movieSubSetRecs.movieId,f.explode(movieSubSetRecs.recommendations))
movieSubSetRecsExplode.join(movieDetails,movieSubSetRecsExplode.movieId==movieDetails.movieId,"left").select([movieSubSetRecsExplode.movieId,movieDetails.title,movieDetails.genres,movieSubSetRecsExplode.col.alias('recommendation')]).show()

+-------+----------------+--------------------+-----------------+
|movieId|           title|              genres|   recommendation|
+-------+----------------+--------------------+-----------------+
|      1|Toy Story (1995)|Animation|Childre...|[1341, 6.6795297]|
|      1|Toy Story (1995)|Animation|Childre...|[3723, 6.0799565]|
|      1|Toy Story (1995)|Animation|Childre...| [883, 6.0258837]|
|      1|Toy Story (1995)|Animation|Childre...| [2269, 5.894488]|
|      1|Toy Story (1995)|Animation|Childre...| [2490, 5.845372]|
|      1|Toy Story (1995)|Animation|Childre...| [2902, 5.772177]|
|      1|Toy Story (1995)|Animation|Childre...|[5899, 5.7563214]|
|      1|Toy Story (1995)|Animation|Childre...|[3009, 5.7245474]|
|      1|Toy Story (1995)|Animation|Childre...| [3034, 5.698413]|
|      1|Toy Story (1995)|Animation|Childre...|[4383, 5.6596065]|
+-------+----------------+--------------------+-----------------+

