In [12]:
#!pip install pyspark

In [112]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext                                            #import to libraries
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import udf,col,when
import numpy as np
from IPython.display import Image
from IPython.display import display
from IPython.display import clear_output

In [113]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("moive analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()                                                              #Evaluating to Spark session

### DATA ANALYSIS AND EXPLORATION

In [114]:
movies_df = spark.read.load("movies.txt", format='csv', header = True)
ratings_df = spark.read.load("ratings.txt", format='csv', header = True)        #Taking tables 
links_df = spark.read.load("links.txt", format='csv', header = True)
tags_df = spark.read.load("tags.txt", format='csv', header = True)

In [115]:
print(type(movies_df))                                                          #movies table row count
movies_df.count()

<class 'pyspark.sql.dataframe.DataFrame'>


8570

In [116]:
movies_df.show(5)                                                               #movies table first 5 rows
movies_df.createOrReplaceTempView("movies_df")
display (spark.sql("SELECT * FROM movies_df limit 5"))

+-------+--------------------+----+--------------------+
|movieId|               title|year|              genres|
+-------+--------------------+----+--------------------+
|      1|           Toy Story|1995|Adventure|Animati...|
|      2|             Jumanji|1995|Adventure|Childre...|
|      3|    Grumpier Old Men|1995|      Comedy|Romance|
|      4|   Waiting to Exhale|1995|Comedy|Drama|Romance|
|      5|Father of the Bri...|1995|              Comedy|
+-------+--------------------+----+--------------------+
only showing top 5 rows



DataFrame[movieId: string, title: string, year: string, genres: string]

In [19]:
ratings_df.show(5)                                                              #ratings table for 5 rows
ratings_df.createOrReplaceTempView("ratings_df")
display (spark.sql("SELECT * FROM ratings_df limit 5"))

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      6|   2.0|980730861|
|     1|     22|   3.0|980731380|
|     1|     32|   2.0|980731926|
|     1|     50|   5.0|980732037|
|     1|    110|   4.0|980730408|
+------+-------+------+---------+
only showing top 5 rows



DataFrame[userId: string, movieId: string, rating: string, timestamp: string]

In [20]:
links_df.show(5)
links_df.createOrReplaceTempView("links_df")                                    #links table first 5 rows
display (spark.sql("SELECT * FROM links_df limit 5"))

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



DataFrame[movieId: string, imdbId: string, tmdbId: string]

In [120]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


In [22]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

2808 out of 8552 movies are rated by only one user


In [121]:
sc = spark.sparkContext
sqlContext=SQLContext(sc)



In [125]:
ratings_df=spark.read.csv("ratings.txt",inferSchema=True,header=True) #columns value types in ratings table
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [126]:
ratings_df.show()                           #taking ratings table

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      6|   2.0|980730861|
|     1|     22|   3.0|980731380|
|     1|     32|   2.0|980731926|
|     1|     50|   5.0|980732037|
|     1|    110|   4.0|980730408|
|     1|    164|   3.0|980731766|
|     1|    198|   3.0|980731282|
|     1|    260|   5.0|980730769|
|     1|    296|   4.0|980731208|
|     1|    303|   3.0|980732235|
|     1|    318|   3.0|980731417|
|     1|    350|   3.0|980731745|
|     1|    366|   2.0|980731621|
|     1|    367|   4.0|980731380|
|     1|    431|   2.0|980731312|
|     1|    432|   2.0|980732235|
|     1|    451|   1.0|980731789|
|     1|    457|   4.0|980730816|
|     1|    474|   3.0|980730816|
|     1|    480|   4.0|980731903|
+------+-------+------+---------+
only showing top 20 rows



In [127]:
movies_df=spark.read.csv('movies.txt',inferSchema=True,header=True)  #columns value types 
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- genres: string (nullable = true)



In [128]:
movies_df.show()

+-------+--------------------+----+--------------------+
|movieId|               title|year|              genres|
+-------+--------------------+----+--------------------+
|      1|           Toy Story|1995|Adventure|Animati...|
|      2|             Jumanji|1995|Adventure|Childre...|
|      3|    Grumpier Old Men|1995|      Comedy|Romance|
|      4|   Waiting to Exhale|1995|Comedy|Drama|Romance|
|      5|Father of the Bri...|1995|              Comedy|
|      6|                Heat|1995|Action|Crime|Thri...|
|      7|             Sabrina|1995|      Comedy|Romance|
|      8|        Tom and Huck|1995|  Adventure|Children|
|      9|        Sudden Death|1995|              Action|
|     10|           GoldenEye|1995|Action|Adventure|...|
|     11|American Presiden...|1995|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|1995|       Comedy|Horror|
|     13|               Balto|1995|Adventure|Animati...|
|     14|               Nixon|1995|               Drama|
|     15|    Cutthroat Island|1

In [131]:
links_df=spark.read.csv('links.txt',inferSchema=True,header=True)
links_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



DATA TRAIN TEST SPLIT

In [132]:
training_df,validation_df = ratings_df.randomSplit([0.8,0.2])  #split to data for train and test

In [133]:
iterations=10                                                 #model parameters 
regularization_parameter=0.1
rank=4
error=[]
err=0

ALTERNATING LEAST SQUARES RECOMMENDED MODEL

In [134]:
#I used ALS recommended model for my data Alternating Least squares method
als = ALS(maxIter=iterations,regParam=regularization_parameter,rank=5,userCol="userId",itemCol="movieId",ratingCol="rating")

model = als.fit(training_df)                         #training data trained by als model

predictions = model.transform(validation_df)         #and tested model with test data

new_predictions = predictions.filter(col('prediction')!=np.nan)

evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction") 
                                                      #we look root mean square error for regression model 
rmse = evaluator.evaluate(new_predictions)

print("Root Mean Square Error= "+str(rmse))           #taking perfect  RMSE scores 

Root Mean Square Error= 0.9390473765159149


In [135]:
for rank in range(4,10):
    als = ALS(maxIter=iterations,regParam=regularization_parameter,rank=rank,userCol="userId",itemCol="movieId",ratingCol="rating")
    model = als.fit(training_df)
    predictions = model.transform(validation_df)
    new_predictions = predictions.filter(col('prediction')!=np.nan)
    evaluator = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")
    rmse = evaluator.evaluate(new_predictions)
    print("Root Mean Square Error= "+str(rmse))               

Root Mean Square Error= 0.9372678988760661
Root Mean Square Error= 0.9390473765159146
Root Mean Square Error= 0.9388196042742588
Root Mean Square Error= 0.9439639521307659
Root Mean Square Error= 0.9401075667945087
Root Mean Square Error= 0.9419780113792817


In [153]:
predictions.show(10)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|    858|   5.0|980730742|  3.747875|
|     1|    451|   1.0|980731789|       NaN|
|     1|   1201|   4.0|980730742|  3.290426|
|     1|   1034|   3.0|980731208| 2.7763813|
|     1|     22|   3.0|980731380| 3.1554518|
|     1|   1197|   4.0|980730769| 3.8877766|
|     1|    590|   2.0|980732165|   3.03294|
|     1|    457|   4.0|980730816| 3.7875729|
|     1|    608|   5.0|980732037|  3.138443|
|     1|    996|   2.0|980732235| 2.7168202|
+------+-------+------+---------+----------+
only showing top 10 rows



In [33]:
from pyspark.ml.tuning import *

In [136]:
from pyspark.ml.tuning import ParamGridBuilder
#Grid Search and cross validation for our model  
als1 = ALS(maxIter=iterations,regParam=regularization_parameter,rank=rank,userCol="userId",itemCol="movieId",ratingCol="rating")
paramGrid = ParamGridBuilder()\
.addGrid(als1.regParam,[0.1,0.01,0.18])\
.addGrid(als1.rank,range(4,10))\
.build()
evaluator=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")
crossval=CrossValidator(estimator=als1,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=5)
cvModel=crossval.fit(training_df)

In [147]:
CV_Pred = cvModel.transform(validation_df)
new_prediction = CV_Pred.filter(col('prediction')!=np.nan)

evaluator = RegressionEvaluator(metricName="r2",labelCol="rating",predictionCol="prediction")
evaluator_rmse = RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

rsquare = evaluator_rmse.evaluate(new_prediction)
rmse = evaluator.evaluate(new_prediction)
print("r2= "+str(rsquare)) #I took r2 and rmse scores   
print("rmse= "+str(rmse))  #rmse score is perfect but r2 is explane:This data not 
                           #suitable for prediction but we are working on recommendation staff so it is not important

r2= 0.9372678988760661
rmse= 0.21520010750095808


In [152]:
CV_Pred.show(10)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|    858|   5.0|980730742|  3.779253|
|     1|    451|   1.0|980731789|       NaN|
|     1|   1201|   4.0|980730742| 3.2785628|
|     1|   1034|   3.0|980731208|  3.267778|
|     1|     22|   3.0|980731380| 3.0015154|
|     1|   1197|   4.0|980730769|  3.917275|
|     1|    590|   2.0|980732165| 3.2255154|
|     1|    457|   4.0|980730816| 3.8312576|
|     1|    608|   5.0|980732037| 3.2846289|
|     1|    996|   2.0|980732235| 2.8194199|
+------+-------+------+---------+----------+
only showing top 10 rows



In [154]:
predictions.join(movies_df,"movieId").select("userId","title","genres","prediction").show(15)

+------+--------------------+--------------------+----------+
|userId|               title|              genres|prediction|
+------+--------------------+--------------------+----------+
|    31|American Tail: Fi...|Adventure|Animati...| 2.1850998|
|    31|Land Before Time,...|Adventure|Animati...| 2.4595604|
|   516|Devil's Advocate,...|Drama|Mystery|Thr...| 2.8890162|
|   516|                  10|      Comedy|Romance| 2.1996143|
|    85|   American Splendor|        Comedy|Drama|  4.577711|
|    53|        Galaxy Quest|Adventure|Comedy|...| 3.1859865|
|   481|Men in Black (a.k...|Action|Comedy|Sci-Fi|  3.894961|
|   633|Men in Black (a.k...|Action|Comedy|Sci-Fi| 2.4802399|
|   597|       Out of Africa|       Drama|Romance|   4.02375|
|   155|Hellbound: Hellra...|              Horror|  3.100603|
|   193|Hudsucker Proxy, The|              Comedy|   4.20068|
|   126|       Dirty Dancing|Drama|Musical|Rom...| 3.7039447|
|   183|Men in Black (a.k...|Action|Comedy|Sci-Fi| 3.3027043|
|   210|

In [155]:
for_one_user = predictions.filter(col("userId")==599).join(movies_df,"movieId").join(links_df,"movieId").select("userId","title","genres","tmdbId","prediction")
for_one_user.show(5)

+------+--------------------+--------------------+------+----------+
|userId|               title|              genres|tmdbId|prediction|
+------+--------------------+--------------------+------+----------+
|   599|        Strange Days|Action|Crime|Dram...|   281| 3.7138426|
|   599|Clear and Present...|Action|Crime|Dram...|  9331|  3.656493|
|   599|        True Romance|      Crime|Thriller|   319| 4.0647445|
|   599|              Eraser|Action|Drama|Thri...|  9268| 2.8951352|
|   599|    Army of Darkness|Action|Adventure|...|   766|  4.067372|
+------+--------------------+--------------------+------+----------+
only showing top 5 rows



In [156]:
import webbrowser 
link="https://www.themoviedb.org/movie/"           #I get movie titles
for movie in for_one_user.take(5):
    movieURL=link+str(movie.tmdbId)
    print(movie.title)
    webbrowser.open(movieURL)

Replacement Killers, The
Army of Darkness
Ronin
True Romance
Eraser


In [157]:
userRecommends=model.recommendForAllUsers(5)
movieRecommends=model.recommendForAllItems(5)



In [158]:
userRecommends.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [159]:
userRecommends.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{2660, 4.57362},...|
|     3|[{31116, 5.746894...|
|     5|[{72605, 5.328123...|
|     6|[{115170, 4.89595...|
|     9|[{1216, 4.986879}...|
+------+--------------------+
only showing top 5 rows



In [41]:
userRecommends.select("userId","recommendations.movieId").show(10,False)  # I recommendations

+------+------------------------------------+
|userId|movieId                             |
+------+------------------------------------+
|1     |[5294, 2649, 7088, 260, 1428]       |
|3     |[2649, 1428, 5792, 1283, 1404]      |
|5     |[71379, 3262, 3055, 493, 5613]      |
|6     |[115170, 98243, 106841, 1238, 85056]|
|9     |[8675, 7338, 2649, 83, 678]         |
|12    |[2966, 123, 3161, 745, 53894]       |
|13    |[8675, 7338, 7088, 7070, 106841]    |
|15    |[7940, 1306, 1238, 2295, 3204]      |
|16    |[27873, 3466, 7088, 4703, 4187]     |
|17    |[963, 4675, 2398, 5264, 3784]       |
+------+------------------------------------+
only showing top 10 rows



In [51]:
movieRecommends.printSchema()

root
 |-- movieId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- userId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [52]:
movieRecommends.select("movieId","recommendations.userId").show(10,False)

+-------+-------------------------+
|movieId|userId                   |
+-------+-------------------------+
|1      |[241, 268, 530, 444, 691]|
|3      |[95, 452, 163, 124, 612] |
|5      |[241, 530, 133, 479, 118]|
|6      |[201, 530, 82, 306, 112] |
|9      |[452, 675, 265, 612, 133]|
|12     |[504, 675, 220, 480, 452]|
|13     |[241, 268, 530, 133, 691]|
|15     |[480, 504, 366, 249, 452]|
|16     |[241, 530, 181, 334, 201]|
|17     |[241, 334, 530, 181, 268]|
+-------+-------------------------+
only showing top 10 rows



In [53]:
users=ratings_df.select("userId").distinct().limit(5)
users.show()

+------+
|userId|
+------+
|   148|
|   463|
|   471|
|   496|
|   243|
+------+



In [54]:
userSubsetRecs = model.recommendForUserSubset(users,10)   
userSubsetRecs.show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{27873, 5.133694...|
|   463|[{115170, 5.28641...|
|   243|[{6669, 5.170791}...|
|   496|[{37857, 5.92129}...|
|   148|[{8675, 4.008882}...|
+------+--------------------+



In [55]:
userSubsetRecs.select("userId","recommendations.movieId").show(10,False) #recomendations movieIds for userIDs
                                                                         #end it is done !!!!

+------+-------------------------------------------------------------------+
|userId|movieId                                                            |
+------+-------------------------------------------------------------------+
|471   |[27873, 3466, 2810, 31193, 4187, 7088, 2083, 4703, 4873, 8477]     |
|463   |[115170, 98243, 3598, 1414, 93721, 106696, 2810, 74754, 6645, 8477]|
|243   |[6669, 27846, 2920, 26840, 6783, 6643, 8477, 53906, 5607, 26171]   |
|496   |[37857, 31193, 4275, 4228, 1713, 1934, 2184, 5942, 4777, 531]      |
|148   |[8675, 7338, 26840, 7070, 6669, 26171, 5607, 1104, 3550, 5899]     |
+------+-------------------------------------------------------------------+



In [56]:
movies=ratings_df.select("movieId").distinct().limit(5)  #movieIds for recommeds Users
movies.show()

+-------+
|movieId|
+-------+
|   1580|
|   1645|
|    471|
|   1088|
|   2142|
+-------+



In [57]:
movieSubsetRecs = model.recommendForItemSubset(movies,10)
movieSubsetRecs.select("movieId","recommendations.userId").show(10,False)



+-------+--------------------------------------------------+
|movieId|userId                                            |
+-------+--------------------------------------------------+
|1580   |[241, 530, 265, 133, 452, 444, 268, 289, 124, 217]|
|471    |[164, 185, 691, 289, 201, 111, 158, 538, 625, 458]|
|2142   |[241, 452, 25, 594, 698, 265, 95, 124, 429, 41]   |
|1645   |[452, 280, 124, 612, 241, 95, 265, 133, 217, 479] |
|1088   |[241, 268, 472, 265, 530, 133, 3, 112, 283, 82]   |
+-------+--------------------------------------------------+



In [160]:
movie_ids=[1580,3175,2366,1590]  #model tried for new users
user_ids=[543,543,543,543]
new_user_preds=sqlContext.createDataFrame(zip(movie_ids,user_ids),schema=['movieId','userId'])
new_predictions=model.transform(new_user_preds)
new_predictions.show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   1580|   543|  3.381722|
|   3175|   543| 3.2734551|
|   2366|   543| 2.8578584|
|   1590|   543| 2.5245814|
+-------+------+----------+

