In [0]:
from pyspark.sql import SparkSession

In [0]:
# Obtain a SparkSession for this notebook under the application name 'myproj'
# (getOrCreate returns an already-active session when there is one).
spark = SparkSession.builder.appName('myproj').getOrCreate()

In [0]:
# Load the ratings CSV into a Spark DataFrame and expose it to SQL cells.

file_location = "/FileStore/tables/ratings-1.csv"
file_type = "csv"

# Reader settings: infer column types, treat the first row as the header,
# and split fields on commas.
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

ratings = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(ratings)

# Register a temporary view so SQL cells can query the data as `ratings`.
ratings.createOrReplaceTempView("ratings")

userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
1,307,5.0,1147868828
1,665,5.0,1147878820
1,899,3.5,1147868510
1,1088,4.0,1147868495
1,1175,3.5,1147868826
1,1217,3.5,1147878326
1,1237,5.0,1147868839
1,1250,4.0,1147868414


In [0]:
# Load the movies CSV into a Spark DataFrame and expose it to SQL cells.
file_location = "/FileStore/tables/movies.csv"
file_type = "csv"

# Reader settings: infer column types, treat the first row as the header,
# and split fields on commas.
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

movies = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(movies)

# Register a temporary view so SQL cells can query the data as `movies`.
movies.createOrReplaceTempView("movies")

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [0]:
# Convert the movies Spark DataFrame into a pandas DataFrame and preview its first rows.
# NOTE(review): `moives_df` looks like a typo for `movies_df`; the name is kept
# in case other cells reference it.

moives_df = movies.toPandas()
moives_df.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [0]:
# Preview the first five rows of the ratings DataFrame.
ratings.show(5)

In [0]:
# Count how many ratings each movie has received.
# The column is spelled `movieId`, exactly as in the CSV header, so the lookup
# does not depend on Spark's case-insensitive column resolution.

ratings.groupBy('movieId').count().show()

In [0]:
%sql
-- Number of ratings per movie title, most-rated movies first.
SELECT
  m.title,
  r.movieId,
  COUNT(*) AS c
FROM ratings AS r
INNER JOIN movies AS m
  ON m.movieId = r.movieId
GROUP BY r.movieId, m.title
ORDER BY c DESC

title,movieId,c
Forrest Gump (1994),356,81491
"Shawshank Redemption, The (1994)",318,81482
Pulp Fiction (1994),296,79672
"Silence of the Lambs, The (1991)",593,74127
"Matrix, The (1999)",2571,72674
Star Wars: Episode IV - A New Hope (1977),260,68717
Jurassic Park (1993),480,64144
Schindler's List (1993),527,60411
Braveheart (1995),110,59184
Fight Club (1999),2959,58773


In [0]:
# Keep only the user, movie and rating columns, with `rating` cast to double.
ratings = ratings.select(
    "userId",
    "movieId",
    ratings["rating"].cast("double"),
)

In [0]:
# Measure how sparse the user x movie rating matrix is.

# Total number of rows in the ratings DataFrame.
numerator = ratings.select("rating").count()

# Distinct users and distinct movies that appear in the data.
num_users = ratings.select("userId").distinct().count()
num_items = ratings.select("movieId").distinct().count()

# Number of cells the matrix would have if every user had rated every movie.
denominator = num_users * num_items

# An empty DataFrame would otherwise surface as an opaque ZeroDivisionError.
if denominator == 0:
    raise ValueError("Cannot compute sparsity: the ratings dataframe is empty.")

# Percentage of user/movie pairs that have no rating.
sparsity = (1.0 - float(numerator) / denominator) * 100
# Single format string: the old mix of ',' and '+' printed a double space.
print("The ratings dataframe is %.2f%% empty." % sparsity)

In [0]:
# Show the ten movies with the smallest number of ratings (ascending by count).
print("Item with the fewest ratings: ")
ratings.groupBy("movieId").count().orderBy("count").show(10)

In [0]:
# Rating count per user, limited to users with at least five ratings;
# show the ten most active users.
(
    ratings
    .groupBy("userId")
    .count()
    .where("`count` >= 5")
    .sort("count", ascending=False)
    .show(10)
)

Split the data into training and test sets for collaborative filtering using the Alternating Least Squares (ALS) method.

In [0]:
# Randomly split the ratings into roughly 80% training and 20% test data.
# A fixed seed makes the split repeatable across runs on the same input, so the
# reported RMSE and the recommendations can be reproduced.
training, test = ratings.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Preview the first five rows of the held-out test split.
test.show(5)

**Import ALS and the regression evaluator used to compute the RMSE.**

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [0]:
# ALS estimator for explicit ratings (implicitPrefs=False).
# - coldStartStrategy="drop": rows whose prediction would be NaN are dropped
#   from the transform output, so the evaluator does not receive NaN values.
# - nonnegative=True: constrain the learned factors to be non-negative.
# - seed: fixes the random factor initialisation so repeated fits are reproducible.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    implicitPrefs=False,
    seed=42,
)

In [0]:
# Display the class of the ALS estimator object (inspection aid).
type(als)

In [0]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter grid for ALS: every combination of the listed rank, maxIter
# and regParam values (4 x 4 x 4 = 64 parameter maps).
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 75, 100])
    .addGrid(als.maxIter, [5, 50, 75, 100])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

In [0]:
# RMSE evaluator comparing the `rating` label column with the `prediction` column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
# Report how many parameter combinations the grid contains (not the evaluator).
# The label no longer ends in a space, because print already inserts one.
print("Num models to be tested using param_grid:", len(param_grid))

In [0]:
# 5-fold cross-validator that tunes `als` over `param_grid`, scored by `evaluator`.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)

In [0]:
# Print the CrossValidator object's string representation.
print(cv)

In [0]:
# Fit the ALS estimator, with the settings given at construction, on the training split.
# NOTE(review): the CrossValidator `cv` and `param_grid` built earlier are not
# used in the cells visible here, so no hyperparameter search is actually run.
# Confirm this is intended.
model = als.fit(training)

In [0]:
# Apply the fitted model to the test split, producing the `predictions` DataFrame.
predictions = model.transform(test)

In [0]:
# Preview the first ten prediction rows.
predictions.show(n = 10)

In [0]:
# Register the predictions as a temporary view so SQL cells can query them.
predictions.createOrReplaceTempView("predictions")

In [0]:
%sql
-- Show every column of the test-set predictions.
SELECT *
FROM predictions

userId,movieId,rating,prediction
1580,68135,4.5,3.4591599
1829,2142,4.0,3.3308475
3794,1088,2.5,2.618145
3794,1580,3.0,3.2512724
3794,44022,2.0,2.7902145
3918,1645,3.5,3.2924814
3997,1580,1.0,2.1649244
5803,471,4.5,3.9253407
7982,3175,3.5,4.149643
8592,36525,4.0,4.5157247


In [0]:
%sql
-- Attach each movie's title to its test-set prediction.
SELECT
  p.userId,
  p.movieId,
  p.rating,
  p.prediction,
  m.title
FROM predictions AS p
INNER JOIN movies AS m
  ON p.movieId = m.movieId

userId,movieId,rating,prediction,title
148,541,4.0,4.1514287,Blade Runner (1982)
148,589,4.5,3.8303318,Terminator 2: Judgment Day (1991)
148,924,5.0,4.0444565,2001: A Space Odyssey (1968)
148,1193,4.5,4.252879,One Flew Over the Cuckoo's Nest (1975)
148,1209,4.5,4.159973,Once Upon a Time in the West (C'era una volta il West) (1968)
148,1213,4.5,4.1848516,Goodfellas (1990)
148,1217,3.5,4.112922,Ran (1985)
148,1234,4.5,4.128608,"Sting, The (1973)"
148,1244,3.5,3.9517379,Manhattan (1979)
148,1247,3.5,4.019241,"Graduate, The (1967)"


In [0]:
# Score the test-set predictions with the RMSE evaluator and report the result.
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = %s" % rmse)

In [0]:
# Generate the top n recommended movies for every user known to the model
ALS_recommendations = model.recommendForAllUsers(numItems = 10) # n = 10

In [0]:
# Preview the recommendations for the first ten rows (one row per user).
ALS_recommendations.show(n = 10)

In [0]:
# Register the recommendations as the temporary view `ALS_recs_temp` for SQL access.
# createOrReplaceTempView replaces the deprecated registerTempTable and matches
# how every other view in this notebook is registered.
ALS_recommendations.createOrReplaceTempView("ALS_recs_temp")

In [0]:
# Flatten the per-user `recommendations` array into one row per
# (userId, movieId, prediction): LATERAL VIEW explode emits one row per array
# element, and the element's `movieId` and `rating` fields are then selected.
clean_recs = spark.sql("""SELECT userId,
                            movieIds_and_ratings.movieId AS movieId,
                            movieIds_and_ratings.rating AS prediction
                        FROM ALS_recs_temp
                        LATERAL VIEW explode(recommendations) exploded_table
                            AS movieIds_and_ratings""")
# Preview the flattened recommendations.
clean_recs.show()

In [0]:
# Recommendations for movies the user has not rated: left-join the
# recommendations onto the ratings and keep rows with no matching rating.
(
    clean_recs
    .join(ratings, on=["userId", "movieId"], how="left")
    .where(ratings["rating"].isNull())
    .show()
)

In [0]:
# Keep only recommendations with no matching row in `ratings`, i.e. movies the
# user has not rated yet.
new_movies = clean_recs.join(
    ratings, on=["userId", "movieId"], how="left"
).where(ratings["rating"].isNull())

In [0]:
# Print the number of rows in new_movies.
print(new_movies.count())

In [0]:
# Preview the first five rows of new_movies.
new_movies.show(5)

In [0]:
# Register new_movies as a temporary view so SQL cells can query it.
new_movies.createOrReplaceTempView("new_movies")

In [0]:
%sql
-- Highest predicted rating among the rows of new_movies.

select max(prediction) from new_movies

max(prediction)
9.983547


In [0]:
%sql
-- List the rows of new_movies with the prediction rounded to two decimals.
SELECT
  userId,
  movieId,
  ROUND(prediction, 2) AS prediction
FROM new_movies

userId,movieId,prediction
4,203882,5.19
5,194434,5.95
6,184299,5.5
6,203882,5.75
9,140024,5.74
10,120272,5.14
10,165559,5.19
10,205277,6.41
12,165689,5.0
15,183947,6.51
