# Import Libraries

In [37]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pyspark
import numpy as np
from pyspark.sql.functions import col, explode

In [38]:
ratings_df=pd.read_csv("data/ratings.csv")

In [39]:
movies_df=pd.read_csv("data/updated_movies.csv")

In [40]:
samples_movie_df=movies_df[movies_df["genres"]!="UnKnown"]

In [41]:
samples_movie_df=samples_movie_df[((samples_movie_df['avg_user_rating']>=4)& 
                                   (samples_movie_df['year']>=1980))]


In [42]:
sample_ratings_df=ratings_df.merge(samples_movie_df,on="movieId")

In [43]:
sample_ratings_df.count()

userId             2506487
movieId            2506487
rating             2506487
timestamp          2506487
title              2506487
genres             2506487
avg_user_rating    2506487
year               2506487
comb               2506487
dtype: int64

In [44]:
sample_ratings_df.to_csv("data/sample_ratings_df.csv")

# Initiate Spark Session

In [45]:
scSpark = SparkSession \
        .builder \
        .master("local[*]") \
        .config("spark.driver.memory", "6g") \
        .getOrCreate()


In [46]:
# Load data into spark dataframe
ratings = scSpark.read\
            .option("inferSchema", "true")\
            .csv("data/sample_ratings_df.csv", header=True, sep=",")

In [47]:
# Check dataframe schema
ratings.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- avg_user_rating: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- comb: string (nullable = true)



In [48]:
#Check for first 20 records
ratings.show(20,False)

+---+------+-------+------+----------+-------------------+---------------------------+-----------------+----+---------------------------------------+
|_c0|userId|movieId|rating|timestamp |title              |genres                     |avg_user_rating  |year|comb                                   |
+---+------+-------+------+----------+-------------------+---------------------------+-----------------+----+---------------------------------------+
|0  |1     |296    |5.0   |1147880044|Pulp Fiction (1994)|Comedy Crime Drama Thriller|4.188912039361382|1994|Comedy Crime Drama Thriller 4.1889 1994|
|1  |3     |296    |5.0   |1439474476|Pulp Fiction (1994)|Comedy Crime Drama Thriller|4.188912039361382|1994|Comedy Crime Drama Thriller 4.1889 1994|
|2  |4     |296    |4.0   |1573938898|Pulp Fiction (1994)|Comedy Crime Drama Thriller|4.188912039361382|1994|Comedy Crime Drama Thriller 4.1889 1994|
|3  |5     |296    |4.0   |830786155 |Pulp Fiction (1994)|Comedy Crime Drama Thriller|4.188912039361

In [49]:
#drop timestamp
ratings = ratings.drop(*['timestamp','avg_user_rating','year','comb'])

In [50]:
ratings.show()

+---+------+-------+------+-------------------+--------------------+
|_c0|userId|movieId|rating|              title|              genres|
+---+------+-------+------+-------------------+--------------------+
|  0|     1|    296|   5.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  1|     3|    296|   5.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  2|     4|    296|   4.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  3|     5|    296|   4.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  4|     7|    296|   4.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  5|     8|    296|   5.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  6|    10|    296|   4.5|Pulp Fiction (1994)|Comedy Crime Dram...|
|  7|    12|    296|   5.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  8|    13|    296|   5.0|Pulp Fiction (1994)|Comedy Crime Dram...|
|  9|    14|    296|   5.0|Pulp Fiction (1994)|Comedy Crime Dram...|
| 10|    15|    296|   5.0|Pulp Fiction (1994)|Comedy Crime Dram...|
| 11|    18|    296|   3.0|Pulp Fi

In [51]:
#Checking the number of records in dataframe
ratings.count()

2506487

In [53]:
#Reducing the size of dataset initially to train model
sample_ratings_df = ratings.sample(fraction=0.02, seed=42)
sample_ratings_df.count()

Py4JJavaError: An error occurred while calling o17367.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 4419.0 failed 1 times, most recent failure: Lost task 7.0 in stage 4419.0 (TID 50411, DESKTOP-LEV8C79, executor driver): java.io.FileNotFoundException: C:\Users\vipul\AppData\Local\Temp\blockmgr-3afa0c21-1a0b-4a04-b4f9-d5950e1a0b3b\23\temp_shuffle_2abcc70f-0573-4bd3-99d2-d398b5ab2cb9 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:105)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:118)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:385)
	at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:2979)
	at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:2978)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2978)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: C:\Users\vipul\AppData\Local\Temp\blockmgr-3afa0c21-1a0b-4a04-b4f9-d5950e1a0b3b\23\temp_shuffle_2abcc70f-0573-4bd3-99d2-d398b5ab2cb9 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:105)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:118)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:245)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:158)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [None]:
sample_ratings_df.show()

In [None]:
spark = pyspark.sql.SparkSession \
    .builder \
    .appName("spark_lens") \
    .master("local[2]") \
    .getOrCreate()

In [None]:
# Register temp table.
#checking for nulls 
sample_ratings_df.registerTempTable('rating')
# Query table for number of nulls.
spark.sql('''
    SELECT COUNT(rating) AS nulls
    FROM rating 
    WHERE rating=null
''').show()

# Build model

In [None]:
#Split Training and Test data
training, test = sample_ratings_df.randomSplit([0.8, 0.2])

In [None]:
training.count()

In [None]:
#Create Basic Model
als = ALS(nonnegative=True, implicitPrefs=False, coldStartStrategy="drop")\
.setMaxIter(5)\
.setRegParam(0.01)\
.setUserCol("userId")\
.setItemCol("movieId")\
.setRatingCol("rating")\


# Confirm that a model called "als" was created
type(als)

In [None]:
als.explainParams()


In [None]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
           
# Define evaluator as RMSE and print length of evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

In [None]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Confirm cv was built
print(cv)

In [25]:
#Fit cross validator to the 'train' dataset
model = cv.fit(training)

#Extract best model from the cv model above
alsmodel = model.bestModel

In [26]:
# Print best_model
print(type(alsmodel))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", alsmodel._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", alsmodel._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", alsmodel._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 10
  MaxIter: 5
  RegParam: 0.15


In [27]:
# View the predictions
test_predictions = alsmodel.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

1.6255027249120588


In [28]:
test_predictions.show()

+-------+------+-------+------+--------------------+--------------------+-----------------+----+--------------------+----------+
|    _c0|userId|movieId|rating|               title|              genres|  avg_user_rating|year|                comb|prediction|
+-------+------+-------+------+--------------------+--------------------+-----------------+----+--------------------+----------+
|1597692| 29595|  48780|   5.0|Prestige, The (2006)|Drama Mystery Sci...|4.093231050865188|2006|Drama Mystery Sci...| 2.5641563|
|1598476| 35094|  48780|   4.5|Prestige, The (2006)|Drama Mystery Sci...|4.093231050865188|2006|Drama Mystery Sci...| 2.6400785|
|1607676|100007|  48780|   4.5|Prestige, The (2006)|Drama Mystery Sci...|4.093231050865188|2006|Drama Mystery Sci...| 2.3039634|
|1613577|142494|  48780|   4.0|Prestige, The (2006)|Drama Mystery Sci...|4.093231050865188|2006|Drama Mystery Sci...| 2.0484178|
|1600828| 51589|  48780|   4.0|Prestige, The (2006)|Drama Mystery Sci...|4.093231050865188|2006|D

In [29]:
nrecoomendations=alsmodel.recommendForAllUsers(5)
nrecoomendations.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1342|[[171773, 5.80972...|
|  1959|[[165575, 9.34316...|
|  2142|[[165575, 5.14053...|
|  3175|[[166291, 5.61728...|
|  3794|[[132253, 8.81997...|
|  4935|[[172197, 7.07680...|
|  5300|[[165575, 6.81612...|
|  5803|[[158894, 9.78420...|
|  6336|[[165575, 6.74957...|
|  6658|[[116975, 9.19224...|
|  7240|[[171773, 5.17630...|
|  7340|[[165575, 5.09727...|
|  7880|[[165575, 5.14053...|
|  7993|[[171773, 5.80972...|
|  9852|[[175397, 6.13809...|
| 10362|[[170683, 5.64290...|
| 10623|[[171773, 8.29960...|
| 11141|[[150696, 6.46160...|
| 15447|[[165575, 7.83186...|
| 15619|[[165575, 5.81849...|
+------+--------------------+
only showing top 20 rows



In [30]:
nrecoomendations = nrecoomendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))

nrecoomendations.limit(10).show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|  1342| 171773| 5.809721|
|  1342| 165575|5.7393336|
|  1342| 148667|5.7349095|
|  1342| 116975| 5.635259|
|  1342| 168716|  5.58637|
|  1959| 165575| 9.343166|
|  1959| 170683| 8.687488|
|  1959| 196631| 8.687488|
|  1959| 111562| 8.687488|
|  1959| 145060| 8.687488|
+------+-------+---------+



In [31]:
# Load data into spark dataframe
movies = scSpark.read\
            .option("inferSchema", "true")\
            .csv("data/movies.csv", header=True, sep=",")

In [32]:
ratings.join(movies, on='movieId').filter('userId = 7340').sort('rating', ascending=False).limit(10).show()

+-------+-------+------+------+--------------------+--------------------+------------------+----+--------------------+--------------------+--------------------+
|movieId|    _c0|userId|rating|               title|              genres|   avg_user_rating|year|                comb|               title|              genres|
+-------+-------+------+------+--------------------+--------------------+------------------+----+--------------------+--------------------+--------------------+
|   4973|  97826|  7340|   5.0|Amelie (Fabuleux ...|      Comedy Romance| 4.101282051282051|2001|Comedy Romance 4....|Amelie (Fabuleux ...|      Comedy|Romance|
|   6016| 182780|  7340|   5.0|City of God (Cida...|Action Adventure ...|  4.18158741329044|2002|Action Adventure ...|City of God (Cida...|Action|Adventure|...|
|    318| 297826|  7340|   5.0|Shawshank Redempt...|         Crime Drama| 4.413576004516335|1994|Crime Drama 4.413...|Shawshank Redempt...|         Crime|Drama|
|  44555|2163096|  7340|   5.0|Liv

In [33]:
nrecoomendations.join(movies, on='movieId').filter('userId=7340').sort('rating', ascending=False).limit(10).show()

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
| 165575|  7340|5.0972724|In guerra per amo...|  (no genres listed)|
|  99764|  7340| 4.679859|It's Such a Beaut...|Animation|Comedy|...|
| 168716|  7340|4.6706047|A Gathering of Ca...|  (no genres listed)|
| 150696|  7340| 4.443341|     Tomorrow (2015)|         Documentary|
|   5912|  7340|4.3567977|Hit the Bank (Vab...|        Comedy|Crime|
+-------+------+---------+--------------------+--------------------+



In [35]:
#Exporting model into a .pickl file for use in front-end
nrecoomendations.toPandas().to_csv("collab_filter_recommendations.csv")

In [36]:
movies.toPandas().to_csv("collab_filter_movies.csv")