In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import numpy as np
import pandas as pd
import pyspark

In [2]:
# create entry points to spark
try:
    #stop sparkcontext if running
    sc.stop()
except:
    pass
finally:
    sc = SparkSession.builder.appName("MovieRecommendation_Project").config("spark.some.config.option", "some-value").getOrCreate()

In [3]:
rating = sc.read.csv(r"C:\Users\babam\OneDrive\Desktop\Data Science\Hands_On\rating.csv",header=True,inferSchema=True)
movies = sc.read.csv(r"C:\Users\babam\OneDrive\Desktop\Data Science\Hands_On\movie.csv",header=True,inferSchema=True)

In [5]:
movies_filename=sc.read.csv(r"C:\Users\babam\OneDrive\Desktop\Data Science\Hands_On\movie.csv",header=True,inferSchema=True)

In [6]:
rating.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [7]:
rating.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



In [8]:
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [9]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [10]:
from pyspark.sql.types import *
#working only on movie.csv right now

movies_with_genres_df_schema =  StructType([
    StructField('ID', IntegerType()),
    StructField('title',StringType()),
    StructField('genres', StringType())
])

movies_df_schema = StructType([
    StructField('ID',IntegerType()),
    StructField('title', StringType())
]) #droping the genres. Also, we will tranform the df to iclude the year later

In [14]:
# creating the dataframes
movies_df = sc.read.format("csv") \
      .option("header", True) \
      .schema(movies_df_schema) \
      .load(r"C:\Users\babam\OneDrive\Desktop\Data Science\Hands_On\movie.csv")
movies_with_genres_df = sc.read.format("csv") \
      .option("header", True) \
      .schema(movies_with_genres_df_schema) \
      .load(r"C:\Users\babam\OneDrive\Desktop\Data Science\Hands_On\movie.csv")

In [15]:
movies_df.show(5)

+---+--------------------+
| ID|               title|
+---+--------------------+
|  1|    Toy Story (1995)|
|  2|      Jumanji (1995)|
|  3|Grumpier Old Men ...|
|  4|Waiting to Exhale...|
|  5|Father of the Bri...|
+---+--------------------+
only showing top 5 rows



In [16]:
movies_with_genres_df.show(5,truncate=False)

+---+----------------------------------+-------------------------------------------+
|ID |title                             |genres                                     |
+---+----------------------------------+-------------------------------------------+
|1  |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2  |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3  |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4  |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5  |Father of the Bride Part II (1995)|Comedy                                     |
+---+----------------------------------+-------------------------------------------+
only showing top 5 rows



In [17]:
from pyspark.sql.functions import split, regexp_extract

movies_with_year_df = movies_df.select("ID","title",regexp_extract('title',r'\((\d+)\)',1).alias('year'))
movies_with_year_df.select('year').distinct().count()

122

In [18]:
movies_with_year_df.show(5,truncate=False)

+---+----------------------------------+----+
|ID |title                             |year|
+---+----------------------------------+----+
|1  |Toy Story (1995)                  |1995|
|2  |Jumanji (1995)                    |1995|
|3  |Grumpier Old Men (1995)           |1995|
|4  |Waiting to Exhale (1995)          |1995|
|5  |Father of the Bride Part II (1995)|1995|
+---+----------------------------------+----+
only showing top 5 rows



# Now we will use the inbuilt functionaluty of Databricks for some insights

In [19]:
# from here we can look at the count and find that the maximum number of movies are produced in 2000

movies_with_year_df.groupBy('year').count().orderBy('count',ascending=False).show()

+----+-----+
|year|count|
+----+-----+
|2009| 1112|
|2012| 1022|
|2011| 1016|
|2013| 1011|
|2008|  979|
|2010|  962|
|2007|  902|
|2006|  855|
|2005|  741|
|2014|  740|
|2004|  706|
|2002|  678|
|2003|  655|
|2001|  633|
|2000|  613|
|1998|  555|
|1999|  542|
|1997|  528|
|1996|  509|
|1995|  474|
+----+-----+
only showing top 20 rows



# Now lets move to Ratings

In [20]:
ratings_df_schema = StructType([
    StructField("userId",IntegerType()),
    StructField('movieId', IntegerType()),
    StructField('rating', DoubleType())
])#we are droping the line Time Stamp Column

In [23]:
ratings_df = movies_with_genres_df = sc.read.format("csv") \
      .option("header", True) \
      .schema(ratings_df_schema) \
      .load(r"C:\Users\babam\OneDrive\Desktop\Data Science\Hands_On\rating.csv")

In [24]:
ratings_df.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
+------+-------+------+
only showing top 5 rows



In [25]:
# we will cache both the dataframe so that we can access them quickly

ratings_df.cache()
movies_df.cache()

DataFrame[ID: int, title: string]

In [26]:
from pyspark.sql import functions as F

movie_ids_with_avg_ratings_df = ratings_df.groupBy('movieId').agg(F.count(ratings_df.rating).alias('count'),\
                                                                 F.avg(ratings_df.rating).alias('average'))
movie_ids_with_avg_ratings_df.show(5,truncate=False)

+-------+-----+------------------+
|movieId|count|average           |
+-------+-----+------------------+
|3997   |2047 |2.0703468490473864|
|1580   |35580|3.55831928049466  |
|3918   |1246 |2.918940609951846 |
|2366   |6627 |3.5492681454655197|
|3175   |13945|3.600717102904267 |
+-------+-----+------------------+
only showing top 5 rows



In [27]:
movie_ids_with_avg_ratings_df.columns

['movieId', 'count', 'average']

In [28]:
movies_names_with_avg_rating_df = movie_ids_with_avg_ratings_df.join(movies_df, F.col('movieID') == F.col('ID')).drop("ID")
movies_names_with_avg_rating_df.show(5)

+-------+-----+------------------+--------------------+
|movieId|count|           average|               title|
+-------+-----+------------------+--------------------+
|   3997| 2047|2.0703468490473864|Dungeons & Dragon...|
|   1580|35580|  3.55831928049466|Men in Black (a.k...|
|   3918| 1246| 2.918940609951846|Hellbound: Hellra...|
|   2366| 6627|3.5492681454655197|    King Kong (1933)|
|   3175|13945| 3.600717102904267| Galaxy Quest (1999)|
+-------+-----+------------------+--------------------+
only showing top 5 rows



In [29]:
from pyspark.sql.functions import col,isnan,when,count

movies_names_with_avg_rating_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in movies_names_with_avg_rating_df.columns]).show(10)

+-------+-----+-------+-----+
|movieId|count|average|title|
+-------+-----+-------+-----+
|      0|    0|      0|    0|
+-------+-----+-------+-----+



In [30]:
# so let us see the global popularity

movies_with_500_ratings_or_more = movies_names_with_avg_rating_df.filter(movies_names_with_avg_rating_df['count'] \
                                                                        >= 500).orderBy('average', ascending=False)
movies_with_500_ratings_or_more.show(truncate=False)

+-------+-----+------------------+---------------------------------------------------------------------------+
|movieId|count|average           |title                                                                      |
+-------+-----+------------------+---------------------------------------------------------------------------+
|318    |63366|4.446990499637029 |Shawshank Redemption, The (1994)                                           |
|858    |41355|4.364732196832306 |Godfather, The (1972)                                                      |
|50     |47006|4.334372207803259 |Usual Suspects, The (1995)                                                 |
|527    |50054|4.310175010988133 |Schindler's List (1993)                                                    |
|1221   |27398|4.275640557704942 |Godfather: Part II, The (1974)                                             |
|2019   |11611|4.2741796572216   |Seven Samurai (Shichinin no samurai) (1954)                                |
|

In [31]:
movies_with_500_ratings_or_more.withColumn("average", movies_with_500_ratings_or_more["average"].cast(IntegerType())).show(6)

+-------+-----+-------+--------------------+
|movieId|count|average|               title|
+-------+-----+-------+--------------------+
|    318|63366|      4|Shawshank Redempt...|
|    858|41355|      4|Godfather, The (1...|
|     50|47006|      4|Usual Suspects, T...|
|    527|50054|      4|Schindler's List ...|
|   1221|27398|      4|Godfather: Part I...|
|   2019|11611|      4|Seven Samurai (Sh...|
+-------+-----+-------+--------------------+
only showing top 6 rows



# Splitting in Train, Test and Validation dataset

As with all the Machine Learning Algorithms in practice we have to tune parameters and then accuracy. For this we will split the data into 3 parts Train, Test(Checking the final accuracy) and Validation(optimizing hyperparameters) data.

In [32]:
seed = 4
(split_60_df, split_a_20_df, split_b_20_df) = ratings_df.randomSplit([0.6,0.2,0.2],seed)

training_df = split_60_df.cache()
validation_df =split_a_20_df.cache()
test_df = split_b_20_df.cache()

print('Training: {0}, validation: {1}, test: {2}\n'.format(training_df.count(),validation_df.count(),test_df.count()))

training_df.show(5)
validation_df.show(5)
test_df.show()

Training: 11999231, validation: 4000853, test: 4000179

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     29|   3.5|
|     1|     32|   3.5|
|     1|     47|   3.5|
|     1|     50|   3.5|
|     1|    112|   3.5|
+------+-------+------+
only showing top 5 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    260|   4.0|
|     1|    318|   4.0|
|     1|    541|   4.0|
|     1|    589|   3.5|
|     1|   1036|   4.0|
+------+-------+------+
only showing top 5 rows

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      2|   3.5|
|     1|    223|   4.0|
|     1|    367|   3.5|
|     1|    924|   3.5|
|     1|   1080|   3.5|
|     1|   1090|   4.0|
|     1|   1200|   4.0|
|     1|   1201|   3.0|
|     1|   1208|   3.5|
|     1|   1240|   4.0|
|     1|   1259|   4.0|
|     1|   1262|   3.5|
|     1|   1374|   4.0|
|     1|   1525|   3.0|
|     1|   1848|   3.5|
|     1|   2140|   4.0|
|     

In [33]:
from pyspark.ml.recommendation import ALS

als = ALS()

als.setPredictionCol("prediction").setMaxIter(5) \
.setSeed(seed).setRegParam(0.1).setUserCol('userId')\
.setItemCol('movieId').setRatingCol('rating').setRank(8) # we got rank 8 as optimal

my_ratings_model = als.fit(training_df)

In [35]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

#Create an RMSE evaluator using the label and predicted columns
#it will essentially calculate the rmse score based on these columns

reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")
my_predict_df = my_ratings_model.transform(test_df)

# Remove NaN values from prediction
predicted_test_my_ratings_df = my_predict_df.filter(my_predict_df.prediction != float('nan'))

# Run the previously create RMSE evaluator, reg_eval, on the predicted_test_my_ratings_df DataFrame
test_RMSE_my_ratings = reg_eval.evaluate(predicted_test_my_ratings_df)
print('The model had a RMSE on the test set of {0}'.format(test_RMSE_my_ratings))
#ll = predicted_test_my_ratings_df.filer(col('userId') == uid)


The model had a RMSE on the test set of 0.8149067145486095


In [52]:
user_input = int(input())
ll = predicted_test_my_ratings_df.filter(col('userId') == user_input)
ll.show()

1
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|      2|   3.5| 3.3235312|
|     1|    223|   4.0|  3.704118|
|     1|    367|   3.5|  3.294037|
|     1|    924|   3.5| 3.5627913|
|     1|   1080|   3.5| 3.8593206|
|     1|   1090|   4.0| 3.6929808|
|     1|   1200|   4.0| 3.9582977|
|     1|   1201|   3.0|  3.909423|
|     1|   1208|   3.5| 3.6677313|
|     1|   1240|   4.0| 3.8941927|
|     1|   1259|   4.0| 3.8882084|
|     1|   1262|   3.5| 3.9833024|
|     1|   1374|   4.0| 3.7711673|
|     1|   1525|   3.0|  2.838419|
|     1|   1848|   3.5| 3.1693645|
|     1|   2140|   4.0| 3.7380056|
|     1|   2173|   4.0|  3.417886|
|     1|   2716|   3.5| 3.8197331|
|     1|   2761|   3.0|  3.768232|
|     1|   2762|   4.0| 3.9792001|
+------+-------+------+----------+
only showing top 20 rows



In [54]:
ll.sort('rating', ascending=True).limit(10).show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|   1525|   3.0|  2.838419|
|     1|   1201|   3.0|  3.909423|
|     1|   2761|   3.0|  3.768232|
|     1|   4915|   3.0| 3.2821014|
|     1|   7449|   3.5| 2.7528682|
|     1|      2|   3.5| 3.3235312|
|     1|   3476|   3.5| 3.5421002|
|     1|   1208|   3.5| 3.6677313|
|     1|   2716|   3.5| 3.8197331|
|     1|   1848|   3.5| 3.1693645|
+------+-------+------+----------+



In [89]:
predicted_test_my_ratings_df.join(movies_df,F.col('movieID') == F.col('ID')).drop("ID").orderBy('movieId', ascending=False) \
.distinct().show(10)

+------+-------+------+----------+--------------------+
|userId|movieId|rating|prediction|               title|
+------+-------+------+----------+--------------------+
| 41169|   2338|   0.5|0.26039073|I Still Know What...|
|  2974|  42721|   0.5|0.66575694|   BloodRayne (2005)|
| 26655|   3593|   2.0|0.75884736|Battlefield Earth...|
| 12710|  31698|   1.5| 0.8623145|Son of the Mask (...|
|  7388|   4255|   1.0| 0.9529568|Freddy Got Finger...|
| 31302|   3962|   1.0|0.99716854|  Ghoulies II (1987)|
| 46072|  92719|   0.5| 1.0189399|Tim and Eric's Bi...|
| 45815|   2385|   1.0| 1.0475426|   Home Fries (1998)|
| 16050|   1839|   0.5| 1.0862435|     My Giant (1998)|
| 37253|   1192|   0.5| 1.0993501|Paris Is Burning ...|
+------+-------+------+----------+--------------------+
only showing top 10 rows



In [91]:
#my_predict_df.join(movies_df,F.col('movieID') == F.col('ID')).select('title','rating','movieId').orderBy('title','rating',ascending= False).distinct().show(10)
my_predict_df.join(movies_df,F.col('movieID') == F.col('ID')).drop("ID").sort('movieId','rating',ascending= False).distinct().show(10)

+------+-------+------+----------+--------------------+
|userId|movieId|rating|prediction|               title|
+------+-------+------+----------+--------------------+
| 41169|   2338|   0.5|0.26039073|I Still Know What...|
|  2974|  42721|   0.5|0.66575694|   BloodRayne (2005)|
| 26655|   3593|   2.0|0.75884736|Battlefield Earth...|
| 12710|  31698|   1.5| 0.8623145|Son of the Mask (...|
|  7388|   4255|   1.0| 0.9529568|Freddy Got Finger...|
| 31302|   3962|   1.0|0.99716854|  Ghoulies II (1987)|
| 46072|  92719|   0.5| 1.0189399|Tim and Eric's Bi...|
| 45815|   2385|   1.0| 1.0475426|   Home Fries (1998)|
| 16050|   1839|   0.5| 1.0862435|     My Giant (1998)|
| 37253|   1192|   0.5| 1.0993501|Paris Is Burning ...|
+------+-------+------+----------+--------------------+
only showing top 10 rows

