In [0]:
# ALS - Collaborative filtering
# Imports
from pyspark.sql.types import StructField, IntegerType, StructType
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [0]:
# Get (or create) the Spark session, requesting a "local" master.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)

In [0]:
# Path of the ratings CSV file
ratings_location = "/FileStore/tables/ratings-2.csv"

# Reader settings for CSV input
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# NOTE: this single-column schema is built here but is not handed to the
# reader below; the reader is asked to infer the schema instead.
data_schema = [StructField('userId', IntegerType(), True)]
final_struct = StructType(fields = data_schema)

# Read the ratings file with the CSV settings above (these options only apply
# to CSV input), then preview the first rows.
ratings = (
    spark.read.format("csv")
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(ratings_location)
)

ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [0]:
# Path of the movies CSV file
movies_location = "/FileStore/tables/movies.csv"

# Read the movies file with the same CSV settings used for the ratings file
# (infer_schema, first_row_is_header, delimiter), then preview the first rows.
movies = (
    spark.read.format("csv")
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(movies_location)
)

movies.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [0]:
# Splitting the ratings data into train (80%) and test (20%) sets.
# A fixed seed makes the random split repeatable, so re-running the notebook
# on the same input puts the same rows in each set instead of reshuffling.
train, test = ratings.randomSplit([0.80, 0.20], seed=42)

In [0]:
# Configure the ALS estimator: the user / item / rating columns, the rank,
# the iteration count and the cold-start strategy.
# The seed is pinned so repeated fits start from the same random state and
# their results can be compared between runs.
als = ALS()
(als.setRank(1)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setMaxIter(5)
  .setColdStartStrategy("drop")
  .setSeed(42))

Out[6]: ALS_f5f21c974d15

In [0]:
# Fit the configured ALS estimator on the training split.
model = als.fit(dataset=train)

In [0]:
# Run the fitted model over the test split and preview the resulting rows.
prediction = model.transform(dataset=test)
prediction.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   148|    296|   5.0|1454942620| 4.1387672|
|   148|    356|   4.0|1454943237| 3.9963617|
|   148|    541|   4.0|1454942653| 4.1102123|
|   148|    589|   4.5|1454942891| 3.9110081|
|   148|    899|   4.0|1454942737| 4.0923066|
|   148|    953|   4.0|1454942762|  4.068539|
|   148|   1080|   3.5|1454942839| 4.0323925|
|   148|   1089|   4.5|1454942649| 4.0936685|
|   148|   1136|   4.5|1454942642|  4.144229|
|   148|   1201|   5.0|1454943142|  4.150764|
|   148|   1203|   5.0|1454943026| 4.2665935|
|   148|   1204|   4.0|1454943147| 4.1851087|
|   148|   1207|   4.0|1454943114|  4.171539|
|   148|   1221|   4.5|1454942629|  4.252945|
|   148|   1266|   4.0|1454942827| 4.0388007|
|   148|   1267|   4.5|1454942691|  4.139818|
|   148|   1276|   3.5|1454942714| 4.1260514|
|   148|   1284|   4.0|1454943174| 4.2268267|
|   148|   1292|   3.5|1454942862|

In [0]:
def get_movie_name(movie_id, movies):
    """Print the rows of ``movies`` whose ``movieId`` equals ``movie_id``.

    The matching rows are collected and printed as a list. Nothing is
    returned.

    Args:
        movie_id: Id of the movie to look up.
        movies: Table-like object exposing a ``movieId`` column, ``where``
            and ``collect``.
    """
    is_requested_movie = movies.movieId == movie_id
    matching_rows = movies.where(is_requested_movie).collect()
    print(matching_rows)

In [0]:
# Take one distinct value of the ALS user column from the ratings data.
users = ratings.select(als.getUserCol()).distinct().limit(1)

# Request a single recommendation for that user subset and bring the first
# result row back as a Python list.
rd = model.recommendForUserSubset(users, 1).take(1)

In [0]:
# Index into the first result row: the first entry of its 'recommendations'
# list, and that entry's first field, which is the id of the recommended movie.
top_recommendations = rd[0]['recommendations']
recommendation = top_recommendations[0][0]

# Print the matching row(s) of the movies table (id, title and genres).
get_movie_name(recommendation, movies)

[Row(movieId=205277, title='Inside Out (1991)', genres='Comedy|Drama|Romance')]


In [0]:
# Import the requisite packages
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# Hyperparameter grid: every combination of the rank and regParam values below.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

In [0]:
# RMSE evaluator comparing the "rating" label column with the "prediction"
# column, followed by a count of the parameter combinations in param_grid.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("Num models to be tested: ", len(param_grid))

Num models to be tested:  16


In [0]:
# 5-fold cross-validator wiring together the ALS estimator, the parameter grid
# and the evaluator. This cell only constructs it; no fit is performed here.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)

In [0]:
# Score the `prediction` DataFrame with the RMSE evaluator and print the value.
RMSE = evaluator.evaluate(dataset=prediction)
print(RMSE)

0.8654144407083533
