# Movie Recommender
*(Using PySpark)*


**Summary:** 

Given a UserID and the MovieID of a movie that the user has liked, the goal is to design a system that recommends the top 5 movies to that user.

To solve the problem, the following data files are provided:

1. *ratings.csv:* - UserID, MovieID, Rating (out of 5.0), timestamp
2. *movies.csv:*  - MovieID, title, genre
3. *tags.csv:*    - UserID, MovieID, tags, timestamp
4. *links.csv:*   - MovieID, ImdbId, tmdbId

**===============================================================================**

*Note:*
1. A user is considered to like a movie if the rating is 3.0 or higher
2. All the IDs are consistent across the data files

**===============================================================================**

**Importing all the required libraries:**


In [1]:
from functools import reduce
#from pyspark import SparkContext 
#from pyspark.sql import SQLContext 
#import pandas as pd 
#from pyspark.sql.types import StringType
#sqlContext = SQLContext(sc)

**Getting Input from the user and declaring Global variables**

In [2]:
#Taking userID and movieID from user

givenMovieId = input("Enter the movieID between 1 and 163949: ")
givenUserId = input("Enter the userID between 1 and 671: ")
avgRating = 3.0  #minimum rating for a movie to count as liked

#It is assumed that the input is valid and that givenMovieId is the movie for which the user wants recommendations

Enter the movieID between 1 and 163949: 1
Enter the userID between 1 and 671: 1


**Reading all the given .csv files**

* Reading the files as DataFrames
* Finding the list of all movies that the user likes, sorted by rating
* Finding the genres of the movies liked by the givenUserId
* Performing a join to get a consolidated table with the following columns
    * UserId | MovieId | Rating | timestamp | genre 
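
The "liked, sorted by rating" step can be checked on a tiny in-memory sample before running it on Spark. The following is a minimal plain-Python sketch, not the notebook's implementation: `liked_movies` and `LIKE_THRESHOLD` are illustrative names, and the two rows marked as hypothetical are made up to show what gets filtered out.

```python
LIKE_THRESHOLD = 3.0  # assumed to match avgRating in the notebook


def liked_movies(ratings, user_id, threshold=LIKE_THRESHOLD):
    """Return (movieId, rating) pairs the user liked, highest rating first."""
    liked = [(m, r) for u, m, r in ratings if u == user_id and r >= threshold]
    # sorted() is stable, so equal ratings keep their input order
    return sorted(liked, key=lambda pair: -pair[1])


# (userId, movieId, rating) rows
ratings = [
    (1, 1029, 3.0),
    (1, 1172, 4.0),
    (1, 1339, 3.5),
    (1, 9999, 2.0),  # hypothetical low rating, filtered out
    (2, 1172, 5.0),  # hypothetical other user, filtered out
]

result = liked_movies(ratings, user_id=1)
print(result)
```

A rating of exactly 3.0 is kept because the comparison is `>=`, which matches the filter used on the RDD.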
      


In [40]:
ratingsDF = spark.read.csv("ratings.csv", header =True, inferSchema =True)
moviesDF = spark.read.csv("movies.csv", header =True, inferSchema =True)
tagsDF = spark.read.csv("tags.csv", header =True, inferSchema =True)
linksDF = spark.read.csv("links.csv", header =True, inferSchema =True)

#Converting All the DataFrames to RDDs

ratingsRDD = ratingsDF.rdd
moviesRDD = moviesDF.rdd
tagRDD = tagsDF.rdd
linksRDD = linksDF.rdd

#reducedRDD = ratingsDF.rdd.map(lambda x: (x[0],x[1],x[2]))

#Getting the list of all movies that the user has liked
#(rating of at least avgRating = 3.0) from ratings.csv => userId,movieId,rating,time
highRatedUserMoviesRDD = ratingsRDD.filter(lambda x: x[0] == int(givenUserId) and x[2] >= avgRating).sortBy(lambda x: x[2],ascending=False)

#filters the RDD to give the data for only input userID
tempData = ratingsRDD.filter(lambda x: x[0] == int(givenUserId)) #[userID|movieID|rating|time]
moviesWatchedByUser = spark.createDataFrame(tempData)

allMoviesDF = spark.createDataFrame(moviesRDD)
print("--------------------------------")

#creating join for the movies watched by the user
joinedDF = allMoviesDF.join(moviesWatchedByUser,["movieId"])

#creating a complete join == for all userIDs and all movies
joinedCompleteDF = moviesDF.join(ratingsDF,["movieId"])

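#Note: this removes only the given user's own rating rows;
#rows from other users for the same movies are still present
moviesNotWatchedByUser = joinedCompleteDF.subtract(joinedDF)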

#converting to DataFrames to perform the join
highRatedUserMoviesDF = spark.createDataFrame(highRatedUserMoviesRDD)
#print("High Rated User Movies #Rating above 3.0")
#highRatedUserMoviesDF.show()


#Using Join to get a consolidated table of highLiked User Movies with the genre
combinedLikedMoviesAndGenreDF = highRatedUserMoviesDF.join(moviesDF, ["movieId"])
print("Finding the genre of high rated user movies and print it as a consolidated table")
combinedLikedMoviesAndGenreDF.show()

#Removing Unused Columns eg. timestamp and title
data = combinedLikedMoviesAndGenreDF.rdd.map(lambda x: (x[0],x[1],x[2],x[5]))
reduced_combinedLikedMoviesAndGenreDF = spark.createDataFrame(data)

#reduced_combinedLikedMoviesAndGenreDF.show()



--------------------------------
Finding the genre of high rated user movies and print it as a consolidated table
+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|   1172|     1|   4.0|1260759205|Cinema Paradiso (...|               Drama|
|   1953|     1|   4.0|1260759191|French Connection...|Action|Crime|Thri...|
|   2105|     1|   4.0|1260759139|         Tron (1982)|Action|Adventure|...|
|   1339|     1|   3.5|1260759125|Dracula (Bram Sto...|Fantasy|Horror|Ro...|
|   1029|     1|   3.0|1260759179|        Dumbo (1941)|Animation|Childre...|
|   1061|     1|   3.0|1260759182|     Sleepers (1996)|            Thriller|
|   2150|     1|   3.0|1260759194|Gods Must Be Craz...|    Adventure|Comedy|
|   3671|     1|   3.0|1260759117|Blazing Saddles (...|      Comedy|Western|
+-------+------+------+----------+--------------------+--------------------+


**Finding the top Genre liked by the user**

1. Example: the user likes 2 movieIDs, 1 and 2
2. Their respective genres are **Adventure|Animation|Children|Comedy|Fantasy** and __Adventure|Children|Fantasy__
3. __Aim:__ the result should be [(Adventure,2),(Children,2),(Fantasy,2),(Comedy,1),(Animation,1)]
4. So the genres most likely to be liked by the user are __Adventure,Children,Fantasy__
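
The counting in this example can be reproduced without Spark. This is a small plain-Python sketch using `collections.Counter`; `count_genres` is an illustrative helper name, and the alphabetical tie-break is an assumption added only to make the order deterministic.

```python
from collections import Counter


def count_genres(genre_strings):
    """Count how often each genre appears across pipe-separated genre strings."""
    counts = Counter()
    for genres in genre_strings:
        counts.update(genres.split("|"))
    return counts


# Genres of the two example movies (movieIDs 1 and 2)
liked = [
    "Adventure|Animation|Children|Comedy|Fantasy",
    "Adventure|Children|Fantasy",
]

genre_counts = count_genres(liked)
# Sort by descending count; ties are broken alphabetically (an assumption)
ranked = sorted(genre_counts.items(), key=lambda kv: (-kv[1], kv[0]))
top_three = [name for name, _ in ranked[:3]]
print(ranked)
print(top_three)
```

Without an explicit tie-break, genres with equal counts can come back in any order, so the "top 3" may differ between runs when counts are tied.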


In [41]:
#separating the genre of movies liked by the user from the table
onlyGenreOfLikedMovies = reduced_combinedLikedMoviesAndGenreDF.rdd.map(lambda x: (x[3]))

#splitting the genre on "|"
seperateGenre = onlyGenreOfLikedMovies.map(lambda x: (x.split("|")))
allGenre = seperateGenre.flatMap((list))

equalCountAllGenre= allGenre.map(lambda x: (x,1))
    
countGenre = equalCountAllGenre.reduceByKey(lambda x,y:x+y)

sortedCountForGenre = countGenre.sortBy(lambda x: x[1],ascending=False)

bestFitGenre = sortedCountForGenre.take(3)

print("The top 3 genre and its equivalent count aka genre most liked by the user: Genre | count")
print(bestFitGenre)
#keeping only the genre names; this also works if the user has fewer than 3 liked genres
topThreeGenreNames = [genreName for genreName, _ in bestFitGenre]
topThreeGenreNamesSet= set(topThreeGenreNames)

#print(topThreeGenreNamesSet)

#print(bestFitGenre[1][0])

The top 3 genre and its equivalent count aka genre most liked by the user: Genre | count
[('Thriller', 3), ('Drama', 2), ('Action', 2)]


**Taking into account the genre of Input MovieID**

1. Finding the genres of the input movieID
2. Checking whether the user's top 3 genres include any genre of the input movie
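
The two steps above amount to a lookup followed by a set intersection. Here is a minimal plain-Python sketch using the values from this notebook's run for userID 1 and movieID 1; the `movies` dictionary is a hypothetical in-memory stand-in for movies.csv and `genres_of` is an illustrative helper.

```python
def genres_of(movie_id, movies):
    """Return the set of genres for movie_id, or an empty set if it is unknown."""
    genres = movies.get(movie_id)
    return set(genres.split("|")) if genres else set()


# Hypothetical stand-in for movies.csv: movieId -> pipe-separated genres
movies = {1: "Adventure|Animation|Children|Comedy|Fantasy"}
top_three = {"Thriller", "Drama", "Action"}

input_genres = genres_of(1, movies)
common = top_three & input_genres
print(sorted(input_genres))
print(common)
```

Returning an empty set for an unknown movieID avoids the index error that an unchecked lookup would raise on invalid input.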

In [42]:
#Getting the details of the input MovieId given by the user
inputMovieDetailsFromMoviesCSV = moviesRDD.filter(lambda x:x[0] == int(givenMovieId)).collect()

inputMovieGenreSet = set(inputMovieDetailsFromMoviesCSV[0][2].split("|"))
inputMovieGenreList =list(inputMovieGenreSet)
print("Input Movie Genre are as follows: ")
print(inputMovieGenreSet,'\n')

commonGenresSet = topThreeGenreNamesSet.intersection(inputMovieGenreSet)
commonGenresList = list(commonGenresSet)
print("Displaying common genre(If any) between the top Genres of the user and the genre of the Input Movie")
print(commonGenresSet)



Input Movie Genre are as follows: 
{'Comedy', 'Adventure', 'Animation', 'Children', 'Fantasy'} 

Displaying common genre(If any) between the top Genres of the user and the genre of the Input Movie
set()


**Finding the relation between the User's most liked Genre and Genre of the input movie**

1. Checking the relation between the input movie's genres and the user's top 3 genres
2. Creating the modified genre list
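
One way to read "creating the modified genre" is: use the genres shared with the user's top 3 when there are any, and otherwise fall back to the genres of the input movie. The sketch below illustrates that reading in plain Python; `select_target_genres` and its `limit` parameter are assumptions for illustration, not names from the notebook.

```python
def select_target_genres(ranked_genres, input_genres, limit=3):
    """Pick the genres to search for recommendations.

    ranked_genres: list of (genre, count) pairs, most liked first.
    input_genres: set of genres of the input movie.
    """
    # Keep the user's preference order and drop the counts
    common = [name for name, _ in ranked_genres if name in input_genres]
    if common:
        return common[:limit]
    # No overlap: fall back to every genre of the input movie
    return sorted(input_genres)


top_three = [("Thriller", 3), ("Drama", 2), ("Action", 2)]

# No overlap, as in the run for userID 1 and movieID 1
no_overlap = select_target_genres(
    top_three, {"Adventure", "Animation", "Children", "Comedy", "Fantasy"}
)
# Hypothetical input movie sharing two genres with the user's top 3
overlap = select_target_genres(top_three, {"Action", "Comedy", "Thriller"})
print(no_overlap)
print(overlap)
```

The function returns plain genre names rather than `(genre, count)` tuples, so the result can be compared directly against the entries of a split genre string.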

In [52]:
if(len(commonGenresList) != 0):
    print("There are common genre between users top genre and genre of the input movie")
    #The movie given as input shares at least one genre with the user's top 3 genres
    mostLikelyGenre = sortedCountForGenre.filter(lambda x: x[0] in commonGenresList).take(3)
    #take() returns (genre, count) tuples, so keep only the genre names
    inputMovieGenreList = [x[0] for x in mostLikelyGenre]
    
    #Here it is important to note the following scenarios:
        #If the no. of common genres is 1:
        #the user has previously liked this genre and the current movie also has this genre,
        #so the user is likely to like any high rated movie of this genre
        
        #If the no. of common genres is 2 or 3 (G1, G2 and G3), the user is likely to like any movie with genre G1, G2, G3
        #or any combination of G1|G2|G3
        
        #All this processing is done in the below section
        
#else:
    # If there are no common genres, this is a movie of a new genre which the user has not liked before
    # So we can do the following:
    #1. Find the users who have liked the movieID given by the user
    #2. Find the top rated movies in the new genre liked by these users
    
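#Note: data is overwritten on every iteration, so only the last genre
#in inputMovieGenreList is actually used to select movies
for genre in inputMovieGenreList:
    data = moviesRDD.filter(lambda x: (genre in x[2].split("|")))
    

detailsForInputMovieGenre = spark.createDataFrame(data)  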


#Creating the list for movies in inputGenre which the user has not watched

likelyMoviesForUser = moviesNotWatchedByUser.join(detailsForInputMovieGenre,["movieId","genres","title"])


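#Note: likelyMoviesForUser already carries userId, rating and timestamp, so this join repeats those columns
recommendedmoviesDF = likelyMoviesForUser.join(ratingsDF,["movieId"])


#Note: x[2] is the title column here, so the rows end up ordered by title (descending), not by rating
recommendedRDD = recommendedmoviesDF.rdd.map(list).sortBy(lambda x:x[2],ascending=False )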
print("top movies recommended are:")
recommendedRDD.take(5)


top movies recommended are:


[[47384,
  'Adventure|Comedy|Drama|Fantasy',
  'Zoom (2006)',
  213,
  2.5,
  1462644086,
  213,
  2.5,
  1462644086],
 [109187,
  'Drama|Fantasy|Sci-Fi',
  'Zero Theorem, The (2013)',
  615,
  0.5,
  1454913681,
  624,
  2.0,
  1431182418],
 [109187,
  'Drama|Fantasy|Sci-Fi',
  'Zero Theorem, The (2013)',
  615,
  0.5,
  1454913681,
  615,
  0.5,
  1454913681],
 [109187,
  'Drama|Fantasy|Sci-Fi',
  'Zero Theorem, The (2013)',
  615,
  0.5,
  1454913681,
  270,
  4.0,
  1469278482],
 [109187,
  'Drama|Fantasy|Sci-Fi',
  'Zero Theorem, The (2013)',
  615,
  0.5,
  1454913681,
  15,
  0.5,
  1465794437]]