In [None]:
import urllib.request as urllib
# u.data: the full MovieLens 100k rating set -- 100000 ratings by 943 users on 1682 items.
#   * Every user has rated at least 20 movies.
#   * Users and items are numbered consecutively from 1.
#   * The rows are randomly ordered; each row is a tab separated list of
#       user id | item id | rating | timestamp
urllib.urlretrieve(
    url="http://files.grouplens.org/datasets/movielens/ml-100k/u.data",
    filename="u.data",
)
# u.item: information about the items (movies), one movie per row, with the fields
#       movie id | movie title | release date | video release date | IMDb URL |
#       unknown | Action | Adventure | Animation | Children's | Comedy | Crime |
#       Documentary | Drama | Fantasy | Film-Noir | Horror | Musical | Mystery |
#       Romance | Sci-Fi | Thriller | War | Western |
#   * The last 19 fields are genre flags: 1 means the movie is of that genre, 0 means it is not.
#   * The movie ids are the same ids used in u.data.
#   * NOTE(review): the fields in the file itself appear to be separated by '|' rather than
#     tabs -- confirm against the sample line printed after loading.
urllib.urlretrieve(
    url="http://files.grouplens.org/datasets/movielens/ml-100k/u.item",
    filename="u.item",
)

In [None]:
from pyspark import SparkContext
# Reuse the SparkContext that is already running when this cell is executed again.
# Calling SparkContext() a second time in the same kernel raises
# "ValueError: Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

### The aim of this exercise is to recommend movies to users. The exercise is divided into three parts.
#### In the first part, you will preprocess the data, transform it into a meaningful format, and use simple mathematical calculations to make recommendations.
#### In the second part, we will use machine learning methods to make recommendations in a much more efficient way.
#### In the third part, you will recommend movies for yourself based on the ratings you supply manually.

#### 1 A. Create the ratings and movies RDDs

In [None]:
# Read both downloaded files as RDDs of raw text lines, asking Spark for at
# least 20 partitions each.
ratings = sc.textFile('u.data', minPartitions=20)
movies = sc.textFile('u.item', minPartitions=20)

# Show one raw line from each file so the field layout and delimiter are visible.
print(ratings.take(1))
print(movies.take(1))

#### 1 B. Feature Extraction: Extracting the relevant features for our problem

In [None]:
# Write down the code for parsing the ratings of the above generated RDD called ratings
def ratings_parse(x):
    """
    Parse one raw line of u.data into a rating record.

    Args:
        x: one line of u.data, a tab separated string of
           user id, item id, rating, timestamp.
    Returns: (user_id, movie_id(item_id), rating)
        user_id and movie_id are ints and rating is a float; the timestamp is dropped.
    Raises:
        ValueError: if the line has fewer than three fields or a field is not numeric.
    """
    # Only the first three fields are needed; the trailing timestamp is ignored.
    user_id, movie_id, rating = x.strip().split('\t')[:3]
    return (int(user_id), int(movie_id), float(rating))


def movies_parse(x):
    """
    Parse one raw line of u.item into a movie record.

    Args:
        x: one line of u.item; the fields are separated by '|' and the first two
           are the movie id and the movie title.
    Returns: (movie_id, movie_title)
        movie_id is an int, movie_title is the title string exactly as in the file.
    Raises:
        ValueError: if the first field is not an integer.
        IndexError: if the line has no title field.
    """
    # Split off only the two leading fields; release dates, URL and genre flags
    # stay unparsed in the remainder.
    fields = x.split('|', 2)
    return (int(fields[0]), fields[1])

# Parse the raw rating lines and keep the result cached, since it is reused by
# every later step; then show a small sample and the record count.
ratingsRDD = ratings.map(ratings_parse)
ratingsRDD.cache()
print(ratingsRDD.take(5))
print(ratingsRDD.count())

# Same treatment for the raw movie lines.
moviesRDD = movies.map(movies_parse)
moviesRDD.cache()
print(moviesRDD.take(5))
print(moviesRDD.count())

#### 1 C. First, we will try to recommend movies to the general public. The most basic way is to show the movies that have the highest average ratings. We have to display the name, number of ratings, and average rating of at least 20 movies with the highest average rating. We should also filter our records by a review threshold, i.e. select only those movies whose total number of reviews is above a certain threshold value.

In [None]:
# You need to implement a helper function which can help in the desired mathematical calculations
def getCountsAndAverages(movieIDandRatingsItem):
    """ Calculate average rating of a movie
    Args:
        movieIDandRatingsItem: (movie_id, (rating1, rating2, ...))
            The ratings may be any iterable (tuple, list, iterator, ...).
    Returns:
        (movie_id, (total number of ratings, averageRating))
            averageRating is a float. For an empty ratings iterable the result is
            (movie_id, (0, 0.0)) instead of a ZeroDivisionError.
    """
    movie_id, ratings = movieIDandRatingsItem
    # Materialize once so that one-shot iterables can be both counted and summed.
    ratings = list(ratings)
    count = len(ratings)
    if count == 0:
        return (movie_id, (0, 0.0))
    return (movie_id, (count, float(sum(ratings)) / count))

#### -> 1 D. Bring all the reviews for a movie together and then using the above helper function calculate the total count of ratings and average rating

In [None]:
# Map the ratingsRDD in such a way that it contains only (movie_id, rating)
# Then bring all the ratings for a particular movie_id together
# movieIDsWithRatingsRDD = .....
# print(movieIDsWithRatingsRDD.mapValues(list).take(1))  # [(movie_id, [rating1, rating2 ....])]


# Use the helper function getCountsAndAverages to get the total number of ratings for a particular movie and their average
# movieIDsWithAvgRatingsRDD = .....
# print(movieIDsWithAvgRatingsRDD.take(5))

####  -> 1 E. Attach the name of the movie in the movieIDsWithAvgRatings RDD using moviesRDD which contains the movie name

In [None]:
# Attach the name from the moviesRDD to movieIDsWithAvgRatingsRDD first
# _movieNameWithAvgRatingsRDD = ....
# print(_movieNameWithAvgRatingsRDD.take(1))  # (movie_id, (movie name, (total_ratings, avg_rating)))

# Transform the RDD into this form -> (average rating, movie name, number of ratings)
# movieNameWithAvgRatingsRDD = ....
# print(movieNameWithAvgRatingsRDD.take(1))

#### -> 1 F. Selecting only those movies that have more than 200 reviews, to appeal to a broader audience

In [None]:
# First select only those records where the total number of reviews is greater than the threshold, which is 200
# Then sort the final results by average rating in descending order so that the highest average rating is on top
# Then show first 20 records
# movieLimitedAndSortedByRatingRDD = ....
# print(movieLimitedAndSortedByRatingRDD.take(20))  # Top 20 Movies for general public

### Option 2: Now, with a more advanced approach, we can tackle the same problem more efficiently using one of the machine learning techniques known as collaborative filtering. Benefits?

In [None]:
# Use the ALS algorithm for performing Collaborative filtering
from pyspark.mllib.recommendation import ALS, Rating
# Divide the dataset into three parts as explained in the slides
# ratingsRDD -> (user_id, movie_id, rating)
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0)
print(validationRDD.take(1))

# Build the recommendation model using Alternating Least Squares
ranks = [2, 3, 4, 8, 12]  # Rank is a factor which can be tuned to get the best model for our dataset
numIterations = 5
regularizationParameter = 0.1

# Both views of the validation set are identical for every rank, so they are built
# once here instead of being re-derived on each loop iteration.
validation_data = validationRDD.map(lambda p: (p[0], p[1]))  # (user, movie) pairs to predict
validationRatings = validationRDD.map(lambda r: ((r[0], r[1]), r[2]))  # ((user, movie), actual rating)

for rank in ranks:
    # A fixed seed makes the errors comparable between ranks and between re-runs
    # of this cell.
    model = ALS.train(trainingRDD, rank, numIterations, lambda_=regularizationParameter, seed=0)

    # Evaluate the model on the validation data (not on the training data)
    predictions = model.predictAll(validation_data).map(lambda r: ((r[0], r[1]), r[2]))
    # Join actual and predicted ratings on (user, movie) -> ((user, movie), (actual, predicted))
    ratesAndPreds = validationRatings.join(predictions)
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    # Report the rank next to its error so the best rank can be read off the output.
    print("Rank " + str(rank) + ": Mean Squared Error = " + str(MSE))

# These two samples come from the last rank evaluated in the loop above.
print('Predictions RDD: ', predictions.take(1))
print('Ratings and Predictions combined RDD: ', ratesAndPreds.take(1))

In [None]:
# Fill the best rank by observing the errors from several runs.
# NOTE(review): the template had a non-parsing `....` here, which made the whole cell a
# SyntaxError. 4 is only a runnable placeholder taken from `ranks`; replace it with the
# rank that produced the lowest validation error in the sweep.
bestRank = 4
# Train the final model on the training split with the chosen rank and the same
# iteration count, regularization and seed as before.
myModel = ALS.train(trainingRDD, bestRank, seed=0, iterations=numIterations,
                      lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda item: (item[0], item[1]) )  # (user, movie, rating) -> (user, movie)
predictedTestRDD = myModel.predictAll(testForPredictingRDD)
# Check how does it look like after predicting from the model
predictedTestRDD.take(2)

In [None]:
# Calculate the average of all predicted ratings for the whole predicted test RDD.
# RDD.mean() computes this in a single Spark job (the previous reduce()/count() pair
# ran two) and matches how the MSE is averaged in the rank sweep.
predictedTestRDD.map(lambda item: item[2]).mean()  # item[2] = rating

In [None]:
# Show the first two records of the held-out test split to recall their layout
# before averaging them (presumably (user_id, movie_id, rating), as produced by
# ratings_parse -- confirm from the output).
testRDD.take(2) # verify again what it looks like

In [None]:
# Calculate the average of the actual ratings in the test RDD.
# RDD.mean() computes this in a single Spark job (the previous reduce()/count() pair
# ran two) and matches how the MSE is averaged in the rank sweep.
testRDD.map(lambda item: item[2]).mean()  # item[2] = rating

#### 3. In this section you will use the above demonstration to calculate recommendations for yourself. You need to provide manual ratings for at least 10 of the movies from the list and then repeat the same training and testing steps.

In [None]:
# Execute this section to get the list of movies you need to select from.
# movieLimitedAndSortedByRatingRDD must already exist; in this notebook it is only
# sketched as a commented-out assignment in section 1 F, so that step has to be
# completed first.
print('List of movies with maximum number of ratings')
print('(average rating, movie name, number of reviews)')
# Print the first 50 records of the RDD, one tuple per line.
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(50):
    print(ratingsTuple)

In [None]:
# Show the first two records of moviesRDD; the titles in it are what the manual
# ratings below are matched against.
moviesRDD.take(2)  # Verify again what moviesRDD looks like: (movie_id, movie_name)

In [None]:
my_user_id = 0  # This is your user id, do not change it
# The rows below refer to the id as myUserID, which was never defined (NameError).
# Bind both spellings to the same value so the rows and the later my_user_id
# references all resolve.
myUserID = my_user_id

# Each row is (user id, movie title, your rating). Fill in the title exactly as it
# appears in moviesRDD (the movie name printed in the list above) together with your
# rating; rows whose title is still empty are skipped.
myRatedMoviesName = [
    (myUserID, u'' , 0),
    (myUserID, u'' , 0),
    (myUserID, u'' , 0),
    (myUserID, u'', 0),
    (myUserID, u'', 0),
    (myUserID, u'', 0),
    (myUserID, u'', 0),
    (myUserID, u'', 0),
    (myUserID, u'', 0),
    (myUserID, u'', 0),
    ]
myRatedMovies = []
# We need to pull movie_ids for the movies that you have entered from the moviesRDD
for (uid, name, rating) in myRatedMoviesName:
    if not name:
        # Placeholder row that has not been filled in yet -- nothing to look up.
        continue
    # moviesRDD holds (movie_id, movie_title); take the first record with this title.
    matches = moviesRDD.filter(lambda item: item[1] == name).take(1)
    if not matches:
        # Fail with the offending title instead of an opaque IndexError on [0][0].
        raise ValueError("No movie titled %r found in moviesRDD; "
                         "copy the title exactly as it is listed" % (name,))
    movie_id = matches[0][0]
    myRatedMovies.append( (uid, movie_id, float(rating)) )
# Convert the python list into RDD
myRatingsRDD = sc.parallelize(myRatedMovies)
print(myRatingsRDD.take(10))

In [None]:
# Use the union() function of spark to append the contents of myRatingsRDD to trainingRDD
# trainingWithMyRatingsRDD = ....
# Then train the model with the new RDD using same old parameters as before
# myRatingsModel = ....

In [None]:
# Now select all movies except the ones you rated in myRatedMovies array. 
# Hint: you can run a for loop with lambda to filter and select only the movies which were not in the myRatedMovies array 
# _myUnratedMoviesRDD = ....
# print('_myUnratedMoviesRDD', _myUnratedMoviesRDD.take(1))  # (movie_id, movie_name)

# Transform the above result into an RDD which looks like (my_user_id, movie_id)
# myUnratedMoviesRDD = ....
# print('myUnratedMoviesRDD ', myUnratedMoviesRDD.take(1))

# Remember how we converted testRDD to testForPredictingRDD by removing the ratings field from the testRDD 
# myUnratedMoviesRDD now has a user_id (my_user_id) and a movie_id.
# Hence now you can use myUnratedMoviesRDD with myRatingsModel to predict your ratings for the movies

# predictedRatingsRDD = ....
# predictedRatingsRDD.take(2) # IMPORTANT, This RDD is not made up of tuples now, it is an RDD of 'Rating' objects!

In [None]:
# Transform movieIDsWithAvgRatingsRDD from section(1 D)
# Remember it has the form (MovieID, (number of ratings, average rating)), transform into an RDD of the form (MovieID, number of ratings)
# movieCountsRDD = ....

# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating)
# predictedMoviesWithRatingsRDD = ....

# Use predictedMoviesWithRatingsRDD and movieCountsRDD (created above) to yield a new RDD of the form (Movie ID, (Predicted Rating, number of ratings))
# predictedMoviesWithRatingsAndCountsRDD  = ....

# predictedMoviesWithRatingsAndCountsRDD.take(2)

In [None]:
# Select movies from predictedMoviesWithRatingsAndCountsRDD with number of ratings more than say, 150
# Then, using predictedMoviesWithRatingsAndCountsRDD and moviesRDD (which has the movie name), we need to yield an RDD of the form
# (Predicted Rating, Movie Name, number of ratings)

# predictedMoviesWithRatingsCountsAndNamesRDD = ....
# print('predictedMoviesWithRatingsCountsAndNamesRDD ', predictedMoviesWithRatingsCountsAndNamesRDD.take(1)) 


# ratingsWithNamesRDD = ....                      
# print('ratingsWithNamesRDD ', ratingsWithNamesRDD.take(1))

# use takeOrdered instead of take and pass the lambda function in key to sort it in descending order (select 20 movies)
# ratingsWithNamesRDD.takeOrdered(20, key=lambda ....)

# These are the 20 highest-rated movies (predicted recommendations) with more than 150 reviews