In [None]:
#Create the SPARK_HOME and PYLIB env vars and prepend the PySpark/py4j archives to sys.path

In [None]:
import glob
import os
import sys

# Point SPARK_HOME at the HDP Spark 2 client and derive the directory that
# holds PySpark's bundled Python archives.
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# The py4j archive name embeds its version, which differs between Spark releases.
# Look it up instead of hard-coding it; fall back to the 0.10.4 name when nothing
# matches so the original setup behaves exactly as before.
# NOTE: the sort is lexicographic, so it is only meaningful when a single py4j zip
# is present (presumably the normal layout — confirm on your install).
_py4j_zips = sorted(glob.glob(os.environ["PYLIB"] + "/py4j-*-src.zip"))
_py4j_zip = _py4j_zips[-1] if _py4j_zips else os.environ["PYLIB"] + "/py4j-0.10.4-src.zip"

# Insert py4j first, then pyspark, so pyspark.zip ends up ahead of py4j on sys.path.
sys.path.insert(0, _py4j_zip)
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [None]:
import findspark

In [None]:
# NOTE(review): this passes a different Spark installation ("/usr/local/spark/")
# than the SPARK_HOME set in the earlier cell ("/usr/hdp/current/spark2-client").
# findspark.init presumably re-points SPARK_HOME / sys.path at the path given here,
# so the earlier setup may be overridden — confirm which Spark install is intended
# and keep only one of the two configurations.
findspark.init("/usr/local/spark/")

In [None]:
from pyspark.sql import SparkSession

In [None]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Application configuration: run Spark in local mode under a descriptive app name.
conf = SparkConf().setAppName("Movie Recommendation Application").setMaster('local')

# getOrCreate() reuses an already-running session instead of failing the way a
# second SparkContext(conf=conf) call does, so this cell can be re-run safely.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Expose the underlying SparkContext as `sc` for code that expects it.
sc = spark.sparkContext

In [None]:
#Loading the dependent libraries

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.sql.functions import isnan, when, count, col, countDistinct

In [None]:
'''
Problem Statement
Building a system that predicts the rating a user gives to a particular movie.
Data Dictionary
Ratings Data File Structure (ratings.csv)
All ratings are contained in the file ratings.csv. Each line of this file after the header row represents one rating of one movie by one user, and has the following format:
userId, movieId, rating, timestamp

The lines within this file are ordered first by userId, then, within user, by movieId.
Ratings are made on a 5-star scale, with half-star increments (0.5 stars - 5.0 stars).
Timestamps represent seconds since midnight Coordinated Universal Time (UTC) of January 1, 1970.
Movies Data File Structure (movies.csv)
Movie information is contained in the file movies.csv. Each line of this file after the header row represents one movie, and has the following format:
movieId, title, genres

Genres are a pipe-separated list, and are selected from the following:
Action
Adventure
Animation
Children's
Comedy
Crime
Documentary
Drama
Fantasy
Film-Noir
Horror
Musical
Mystery
Romance
Sci-Fi
Thriller
War
Western
(no genres listed)
'''

In [None]:
#Reading the movies and ratings data and creating a dataframe

In [None]:
## Read data and create a dataframe
import os

# Directory holding the MovieLens CSV files. Set MOVIELENS_DATA_DIR to run the
# notebook on another machine instead of editing hard-coded absolute paths.
DATA_DIR = os.environ.get(
    "MOVIELENS_DATA_DIR",
    "/Users/pavantej/Desktop/SCIT/sem2/big data/20180701_Batch39_CSE7322c_Recommendation/ml-latest-small",
)


def read_csv_with_header(path):
    """Load a local CSV file that has a header row into a Spark DataFrame, inferring column types.

    path: absolute local filesystem path; it is prefixed with the file:// scheme.
    """
    return (spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load("file://" + path))


ratingsData = read_csv_with_header(os.path.join(DATA_DIR, "rating_edx.csv"))
moviesData = read_csv_with_header(os.path.join(DATA_DIR, "movies.csv"))
    


In [None]:
#Understanding Data

In [None]:
#Printing Schema

In [None]:
# Print the inferred schema of each input DataFrame: ratings first, then movies.
for frame in (ratingsData, moviesData):
    frame.printSchema()

In [None]:
#Total number of Columns and Records

In [None]:
# Number of rating rows, shown as this cell's output.
n_rating_rows = ratingsData.count()
n_rating_rows

In [None]:
# Gather the dimensions (columns and rows) of both DataFrames, then report them.
rating_cols, rating_rows = len(ratingsData.columns), ratingsData.count()
movie_cols, movie_rows = len(moviesData.columns), moviesData.count()

print("No. of Columns in Ratings data= {}".format(rating_cols))
print('No. of Records in rating data= {}'.format(rating_rows))
print("No. of Columns in movies data = {}".format(movie_cols))
print('No. of Records in movies data= {}'.format(movie_rows))

In [None]:
#Look at the first 3 rows of each dataframe

In [None]:
# Peek at the first three rows of each DataFrame.
for frame in (ratingsData, moviesData):
    frame.show(n=3)

In [None]:
# Summary statistics for each input DataFrame.
for frame in (ratingsData, moviesData):
    frame.describe().show()

In [None]:
# Count distinct userIds and movieIds among the ratings, and distinct movieIds
# in the movie catalogue (moviesData).
n_users = ratingsData.select('userId').distinct().count()
n_rated_movies = ratingsData.select('movieId').distinct().count()
n_catalogue_movies = moviesData.select('movieId').distinct().count()

print("Number of different users: " + str(n_users))
print("Number of different movies: " + str(n_rated_movies))
# Labelled separately: this figure comes from moviesData, not from the ratings,
# so it must not share the label of the line above.
print("Number of different movies in movies data: " + str(n_catalogue_movies))

In [None]:
# Split the ratings into training and test sets (20% held out for testing).
# A fixed seed makes the split, and therefore the evaluation metrics computed on
# it, reproducible across kernel restarts; with no seed randomSplit draws a new
# random seed on every call.
(trainingData, testData) = ratingsData.randomSplit([0.8, 0.2], seed=42)

In [None]:
'''
Model Building and Evaluation
ALS model params
numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10; in pyspark.ml it is set via numUserBlocks and numItemBlocks).
rank is the number of latent factors in the model (defaults to 10).
maxIter is the maximum number of iterations to run (defaults to 10).
regParam specifies the regularization parameter in ALS (the pyspark.ml ALS constructor defaults it to 0.1).
implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).
'''

In [None]:
from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data.
# Cold start strategy 'drop' removes rows whose user/item was unseen during
# training, so the evaluation metrics don't come out as NaN.
# An explicit seed pins ALS's random initialisation so retraining is reproducible.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy='drop', seed=42)

In [None]:
# Fit the ALS estimator on the training split.
model = als.fit(dataset=trainingData)

In [None]:
# Score the held-out test data with the fitted model and show the first 50 predictions.
predictions = model.transform(dataset=testData)
predictions.show(n=50)

In [None]:
# Evaluator comparing the true "rating" column with the model's "prediction" via RMSE.
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName='rmse',
)

In [None]:
# Evaluate on the test data: RMSE of the predicted vs. actual ratings.
# The predictions themselves are already displayed by the prediction cell, so
# they are not dumped again here.
rmse = evaluator.evaluate(predictions)
print("RMSE Error =" + str(rmse))

In [None]:
# Recommend movies: the top 10 movie recommendations for every user.
userRecs = model.recommendForAllUsers(numItems=10)

In [None]:
# Display the recommendations for the first 10 users.
userRecs.show(n=10)

In [None]:
# The top 10 user recommendations for every movie.
movieRecs = model.recommendForAllItems(numUsers=10)

In [None]:
# Shut down the Spark session created at the top of the notebook now that the analysis is done.
spark.stop()