In [0]:
# Schema for the ratings file as a DDL-style string: three INT columns
# (UserID, MovieID, Rating) followed by a bigint Timestamp column.
# NOTE(review): Timestamp is presumably Unix epoch time -- confirm the unit against the dataset docs.
ratings_schema = "`UserID` INT, `MovieID` INT, `Rating` INT, `Timestamp` bigint"

In [0]:
# DBFS path of the ratings file that is loaded into Ratings_DF.
ratings_file_location = "dbfs:/FileStore/tables/ratings.dat"

In [0]:
# Read the "::"-delimited ratings file using the explicit schema (no inference).
Ratings_DF = (
    spark.read
    .format("csv")
    .option("delimiter", "::")
    .schema(ratings_schema)
    .load(ratings_file_location)
)

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Load the "::"-delimited movies file with an explicit three-column schema.
movies_file_location = "dbfs:/FileStore/tables/movies.dat"
movies_schema = StructType(
    [
        StructField("MovieID", IntegerType(), True),
        StructField("Title", StringType(), True),
        StructField("Genres", StringType(), True),
    ]
)
Movies_DF = (
    spark.read
    .format("csv")
    .option("delimiter", "::")
    .schema(movies_schema)
    .load(movies_file_location)
)

# Split "Genres" on "|" and explode it, so each (movie, genre) pair gets its own row.
Final_Movies_DF = Movies_DF.withColumn(
    "Genres",
    explode(split("Genres", "[|]")),
)
Final_Movies_DF.show()

In [0]:
# Attach every rating to its movie row(s) and drop the duplicate MovieID column
# coming from the ratings side. Final_Movies_DF holds one row per (movie, genre),
# so a rating appears once for each genre of its movie.
joinDF = (
    Final_Movies_DF
    .join(Ratings_DF, on=(Final_Movies_DF.MovieID == Ratings_DF.MovieID))
    .drop(Ratings_DF.MovieID)
)

In [0]:
# Preview the joined movies/ratings rows (show() prints the first 20 rows by default).
joinDF.show()

In [0]:
# What are the titles of the top 5 most popular movies, i.e. the ones with the
# highest total rating in the whole dataset?
from pyspark.sql.functions import col, desc, sum as spark_sum

# Aggregate over the un-exploded Movies_DF rather than joinDF: joinDF holds one row
# per (movie, genre), so summing there counts every rating once per genre and
# inflates the totals of multi-genre movies.
# Spark's sum is imported under an explicit alias so the cell cannot silently pick
# up Python's builtin sum.
(
    Movies_DF.join(Ratings_DF, "MovieID")
    .groupBy("Title")
    .agg(spark_sum("Rating").alias("popularity"))
    .filter(col("popularity").isNotNull())
    .sort(desc("popularity"))
    .show(5)
)

In [0]:
# What are the top 5 movie genres by average rating in the whole dataset?
# joinDF has one row per (movie, genre, rating), so grouping by "Genres" averages
# every rating given to a movie of that genre.
(
    joinDF.groupBy("Genres")
    .agg(avg("Rating").alias("popularity_genres"))
    .filter(col("popularity_genres").isNotNull())
    .sort(desc("popularity_genres"))
    .show(5)
)

In [0]:
#How many movies have been ranked the most consecutive days?
from pyspark.sql.functions import from_utc_timestamp
# Convert the raw bigint "Timestamp" column to a Spark timestamp.
# MovieLens ratings.dat stores the rating time as seconds since the Unix epoch, and
# Spark's numeric -> timestamp cast already interprets the number as seconds, so no
# scaling is needed. The earlier division by 1e6 placed every rating within the
# first minutes of 1970-01-01, making any per-day analysis meaningless.
# Re-running this cell is harmless: casting a timestamp column to timestamp is a no-op.
joinDF = joinDF.withColumn("Timestamp", col("Timestamp").cast("timestamp"))



In [0]:
# Preview joinDF again, now that "Timestamp" has been cast to a timestamp type.
joinDF.show()

In [0]:
# Sparsity of the user x movie rating matrix: the percentage of (user, movie)
# cells that have no rating.

# Observed cells: one per row of the ratings DataFrame.
numerator = Ratings_DF.select("Rating").count()

# Matrix dimensions: distinct movies and distinct users that appear in the ratings.
num_movies = Ratings_DF.select("MovieID").distinct().count()
num_users = Ratings_DF.select("UserID").distinct().count()

# All possible cells of the matrix.
denominator = num_users * num_movies

# Share of empty cells, expressed as a percentage.
sparsity = (1.0 - float(numerator) / denominator) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

In [0]:
 #Group data by userId, count ratings
# Number of ratings per user, largest counts first.
userId_ratings = (
    Ratings_DF.groupBy("UserID")
    .count()
    .orderBy("count", ascending=False)
)
userId_ratings.show()

In [0]:
# Number of ratings per movie (grouped by MovieID), largest counts first.
movieId_ratings = (
    Ratings_DF.groupBy("MovieID")
    .count()
    .orderBy("count", ascending=False)
)
movieId_ratings.show()

In [0]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
 #Create test and train set
# 80/20 random split of the ratings, with a fixed seed.
train, test = Ratings_DF.randomSplit([0.8, 0.2], seed=1234)

# Explicit-feedback ALS estimator with non-negative factors.
# coldStartStrategy="drop" drops rows with NaN predictions at transform time
# (users/movies unseen during fitting), so evaluation metrics stay finite.
als = ALS(
    userCol="UserID",
    itemCol="MovieID",
    ratingCol="Rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
)

# Display the estimator's type to confirm that "als" was created.
type(als)

In [0]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
# NOTE(review): this cell repeats the split and ALS setup of the earlier cell with
# the same seed and the same parameters; one of the two is likely redundant.

# Train/test split (80% / 20%, fixed seed).
train, test = Ratings_DF.randomSplit(
    [0.8, 0.2],
    seed=1234,
)

# ALS estimator for explicit ratings.
als = ALS(
    userCol="UserID",
    itemCol="MovieID",
    ratingCol="Rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
)

# Display the estimator's type to confirm that "als" was created.
type(als)

In [0]:
# Import the requisite items
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter grid: every combination of the rank and regParam values below
# is tried (4 x 4 = 16 candidates). maxIter is not tuned and keeps the value
# configured on the estimator.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

# RMSE evaluator. labelCol names the ratings column exactly as it exists in the
# data ("Rating", the same name given to ALS as ratingCol); the previous
# lower-case "rating" only worked when column lookup is case-insensitive.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")

# Number of parameter combinations in the grid.
print ("Num models to be tested: ", len(param_grid))

In [0]:
# 5-fold cross-validation of the ALS estimator over the parameter grid,
# scored with the evaluator defined above.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)

# Print the CrossValidator to confirm it was built.
print(cv)

In [0]:
# Fit the cross validator on the 'train' dataset (runs the search over param_grid with 5 folds).
model = cv.fit(train)

# Extract the best model found by the cross-validation run above.
best_model = model.bestModel

In [0]:
# Report the type of the winning model.
print(type(best_model))

print("**Best Model**")

# The hyperparameters are read from the model's JVM-side parent estimator,
# reached through the private _java_obj handle.
parent_estimator = best_model._java_obj.parent()
print("  Rank:", parent_estimator.getRank())
print("  MaxIter:", parent_estimator.getMaxIter())
print("  RegParam:", parent_estimator.getRegParam())

In [0]:
# Score the held-out test split with the best model, then compute and print the
# evaluator's metric (configured as RMSE) on those predictions.
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

In [0]:
# Generate the top 5 recommendations for every user, then preview 10 rows.
nrecommendations = best_model.recommendForAllUsers(5)
nrecommendations.limit(10).show()

In [0]:
# Flatten the per-user recommendation arrays into one row per (user, movie) pair.
#
# Built directly from best_model (same top-5 request as the preceding cell) instead
# of reassigning nrecommendations from itself: the self-referential form failed when
# the cell was run a second time, because the "recommendations" column was gone.
#
# Columns are selected with the exact casing ALS emits ("UserID", and "MovieID"
# inside the struct), so the cell does not depend on case-insensitive column
# resolution. The aliases keep the output column names userId / movieId / rating.
nrecommendations = (
    best_model.recommendForAllUsers(5)
    .withColumn("rec_exp", explode("recommendations"))
    .select(
        col("UserID").alias("userId"),
        col("rec_exp.MovieID").alias("movieId"),
        col("rec_exp.rating").alias("rating"),
    )
)

nrecommendations.limit(10).show()