In [8]:
import pyspark 
import random
from pyspark.sql import SQLContext, functions as func, types as typ
from pyspark.sql.functions  import explode, split, avg, col, desc, asc, count, substring
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

sc = pyspark.SparkContext.getOrCreate() 
## Read the data into memory

sqlContest = SQLContext(sc)

SMALL_DATASET_PATH = 'ml-latest-small'
DATASET_PATH = 'ml-latest'

def load_data(dataPath):
    #Load the movies and rating CSV files
    movies = sqlContest.read.csv(dataPath + '/movies.csv', header=True)
    ratings = sqlContest.read.csv(dataPath + '/ratings.csv', header=True)
    
    return ratings, movies


def clean_data(ratings, movies):
    
    ##Convert the timestamp to date 
    # Drop NA values
    # Convert data Type of columns intro integer and float
    clean_ratings = ratings.na.drop()\
    .withColumn('RatingDate', func.date_format(ratings.timestamp.
                                                           cast(dataType= typ.LongType()).
                                                           cast(dataType= typ.TimestampType()),
                                                           "yyyy-MM-dd"))\
    .withColumn("rating", ratings.rating.cast(dataType = typ.FloatType()))\
    .withColumn("userId", ratings.userId.cast(dataType = typ.IntegerType()))\
    .withColumn("movieId", ratings.movieId.cast(dataType = typ.IntegerType()))
    
    # Drop NA values
    # Convert data Type of columns into integer
    
    clean_movies = movies.na.drop()\
    .withColumn("movieId", movies.movieId.cast(dataType = typ.IntegerType()))\
    .withColumn("production_year", substring(col("title"), -5, 4))
    
    #Split the genres column into rows based on the number of the separator | 

    movies_geners =movies.withColumn("Genr",explode(split("genres","[|]")))

    ## return the clean datafrmes (ratings, movies, and movies genres)
    
    return clean_ratings, clean_movies, movies_geners


def search_user_by_id(user_id, ratings, movies):
    
    ##filter the users by the ID
    filtered_users = ratings.where("userId =" + str(user_id))\
    .join(movies, on=['movieId'], how ='inner')
    
    return filtered_users

def search_users_by_ids(ids, ratings, movies):
    
    ## filtere users by a list of users ids using isin  
    filtered_users = ratings.filter(col("userId").isin(ids))\
    .join(movies, on=['movieId'], how ='inner')
                                  
    return filtered_users

def search_movie_by_id(movieId, movies):
    
    ##filter the movies by the movie id    
    filtered_movies = movies.where("movieId = " + str(movieId))
    
    return filtered_movies

def search_movie_by_title(title, movies):
    
    ##filter the movies by title
    filtered_movies = movies.where("title like \'%" + 
                                  str(title) + "%\'")
    
    return filtered_movies

def search_genre(genr, movies):
    #search one genre using like operator to accept incomplete names
    filtered_movies = movies.where("genres like \'%" + 
                                  str(genr) + "%\'")
    #filtered_movies.show()
    return filtered_movies

def search_genres(genrs, movies_geners, movies):
    
    ##filter all movies that are contained in the genres
    genrs_movies = movies_geners.filter(col("Genr").isin(genrs))
    ##join the filtered movies id with the movies dataframe
    filtered_movies = genrs_movies.join(movies, on=['movieId'], how = 'inner')
        
    return filtered_movies

def search_genres_separated(genrs, movies_geners, movies):
    
    ##Search each Genr, show how many movies it contains and show the first five movies
    for genr in genrs:
        genr_movies = search_genre(genr, movies)
        print("Genre (" + genr + ") contains " + str(genr_movies.count()))
        genr_movies.show(5)
    
    return


def summarize_movie(movies, ratings):
    
    ##summarize movies by getting the number of watchings and average rating
    ## and rename the aggregated fields
    summary = movies.join(ratings, on=['movieId'], how='inner')\
    .groupBy("movieId")\
    .agg(avg(col("rating")), count(col("movieId")))\
    .withColumnRenamed("avg(rating)", "Average_Rating")\
    .withColumnRenamed("count(movieId)", "Number_OF_Watchings")
    
    return summary

def summarize_genres(movies_genres, ratings):
    
    ##summarize movies by getting the number of watchings and average rating
    ## and rename the aggregated fields
    summary = movies_genres.join(ratings, on='movieId', how='inner')\
    .groupBy("Genr")\
    .agg(count(col('movieId')), average(col('rating')))\
    .withColumnRenamed("avg(rating)", "Average_Rating")\
    .withColumnRenamed("count(movieId)", "Number_OF_Watchings")
    
    return summary

def search_movies_by_year(year, movies):
    
    ##filter movies by year
    filtered_movies = movies.where("title like \'%(" + 
                                   str(year) + ")\'")
    return filtered_movies

def list_top_rated(ratings, movies):
    
    ## group the movies by movie id and aggregate their rating
    ## then sort them by rating from heighst to lowest
    top_rated = ratings\
    .groupBy("movieId")\
    .agg(avg(col("rating")))\
    .withColumnRenamed("avg(rating)", "Average_Rating")\
    .join(movies, on=['movieId'], how='inner')\
    .sort(desc("Average_Rating"))
    
    
    return top_rated


def list_top_watched(ratings, movies):
    
    ## group the movies by movie id and aggregate their watchings
    ## then sort them by watchings from heighst to lowest
    
    most_popular = ratings\
    .groupBy("movieId")\
    .agg(func.count("movieId"))\
    .withColumnRenamed("count(movieId)", "Number_OF_Watchings")\
    .join(movies, on=['movieId'], how='inner')\
    .sort(func.desc("Number_OF_Watchings"))
    
    return most_popular


## Intermediate Requirements:

def find_users_genres(user_id, movies_genres, ratings, movies):
    
    ##Search the movies that the user has wathced
    ##group them by the user and Genre and aggregate their 
    user_movies = search_user_by_id(user_id, ratings, movies)
    user_genres = user_movies.join(movies_genres, on=['movieId'], how='inner')
    user_genres = user_genres.groupBy("userId","Genr")\
    .agg(func.round(avg(col("rating")),2), func.count(col("user_id")))\
    .withColumnRenamed("round(avg(rating), 2)", "Average_Rating")\
    .withColumnRenamed("count(user_id)", "wachings")\
    .sort(desc("Average_Rating"))#.where("Average_Rating >=3 ")\
    
    return user_genres

def find_user_favourite_genres(user_id, movies_genres, ratings, movies):
  
    user_geners = find_users_genres(user_id, movies_genres, ratings, movies)
    user_geners.show(50)
    user_averages = user_geners.groupBy("userId")\
    .agg(avg(col("Average_Rating")), avg(col("wachings")))
    user_averages.show()    
    average_rating = user_averages.collect()[0]['avg(Average_Rating)']
    average_wachings = user_averages.collect()[0]['avg(wachings)']
    print("Average_Rating >= " + str(average_rating) + "  wachings >= " + str(average_wachings))
    
    user_fav = user_geners.where("Average_Rating >= " + str(average_rating))\
    .where("wachings >= " + str(average_wachings))
    
    return user_fav
    
def compareTastes(user1_Id, user2_Id, movies_genres, ratings, movies):
    u1 = find_users_genres(user1_Id, movies_genres, ratings, movies)
    u2 = find_users_genres(user2_Id, movies_genres, ratings, movies)
    u1 = u1.withColumnRenamed("wachings","User_" + str(user1_Id) +"_Watching_Times")\
    .withColumnRenamed("Average_Rating","User_" + str(user1_Id) +"_Rating")
    u2 = u2.withColumnRenamed("wachings","User_" + str(user2_Id) +"_Watching_Times")\
    .withColumnRenamed("Average_Rating","User_" + str(user2_Id) +"_Rating")    
    summary = u1.join(u2, on=['Genr'], how='inner')
    
    u1.select('Genr').subtract(u2.select('Genr')).join(u1, on=['Genr'], how='inner').show()
    u2.select('Genr').subtract(u1.select('Genr')).join(u2, on=['Genr'], how='inner').show()

    print("Comparing the Tastes of user:" +
         str(user1_Id) + " , and user:" + str(user2_Id) + " :" )
    
    return summary

## Advanced Requirements:


def visualize_the_dataset():
    
    return 0

def recommend_movies():
    
    ##Define ALS collaborative filtering model 
    ## Max iteration 10 with learning rate 0.2, specifing the 
    als_model = ALS(maxIter=10, regParam=0.2, userCol="user_id",
         itemCol='movieId', ratingCol= "rating", coldStartStrategy="drop")
    
    ##Split the data into training and testing data
    training_set, test_set = ratings.randomSplit([0.2,0.02])
    
    ##Train the model on the data
    als_trained_model = als_model.fit(training_set)
    
    ##Predict the test data set
    prediction = als_trained_model.transform(test)
   
    ##Evaluate the accuracy 
    evaluator = RegressionEvaluator(metricName="mse", 
                               labelCol = "rating", 
                               predictionCol="prediction")
    mse = evaluator.evaluate(prediction)
    print(alsModel.recommendForUserSubset([1,2,3]))
    print(mse)
    
    return 0

def enterId(message):
    
    try:
        user_id = int(input('Enter the ' + message + ' ID (>0):').strip())
        
        if user_id < 0:            
            raise Exception( message +   " ID should be larger than or equal to 1!")
            
        return user_id
    
    except ValueError:
            print("You should enter only a number larger than 0!")
            

def stripList(lis):
    strippedList = []
    for item in lis:
        strippedList.append(item.strip())
    return strippedList

            
def enterUsersIds():
    
    usersIdsString = input("Enter a list of users IDs separated by comma (,). Note: IDs should be larger than 0!:").strip()
    usersIdsListOfStrings = usersIdsString.split(",")
    usersIdsListOfIntegers = []
    for user_id in usersIdsListOfStrings:
        try:
            user_idInt = int(user_id.strip())
            usersIdsListOfIntegers.append(user_idInt)
        except ValueError:
            print("Some inputs are not appropriate!")           
    
    return usersIdsListOfIntegers

def enterGenres():
    genresStr = input("Enter Genres list separated by comma (,):").strip()
    genresList = genresStr.split(",")
    genresList = stripList(genresList)
    
    return genresList




def controlPanel():
    
    #Select the dataset
    dataPath = SMALL_DATASET_PATH
    
    #Load the datasets
    ratings, movies = load_data(dataPath)
    
    #Clean the dataset
    ratings, movies, movies_genres = clean_data(ratings, movies)
    
    #presist the data in memory
    movies.persist()
    ratings.persist()
    
    #Set the number of records to show
    N = 7
    
    #Interact with the dataset
    while True:
        print("Choices:")
        print("1- Search user | 2-Search Users | 3-Search Genre | 4-Search Genres")
        print("5- Search Movie By ID | 6-search Movie by title | 7- search movies by year")
        print("8-List top rating movies | 9- List top watching movies")
        print("10- Show User's favourite genres | 11- compare two users' taste | 12- Show visualizations")
        print("13- Recommend Movies for user | 14- Exit")
        
        try:
            choice = int(input('Enter your choice number from the list above:').strip())
            
            if choice == 1:
                user_id = enterId("user ")
                search_user_by_id(user_id, ratings, movies).show()

            elif choice == 2:
                usersIds = enterUsersIds()
                search_users_by_ids(usersIds, ratings, movies).show()

            elif choice == 3:
                genre = input("Please enter the genre name:").strip()
                search_genre(genre, movies).show()

            elif choice == 4:
                genres = enterGenres()
                print(genres)
                search_genres(genres, movies_genres, movies).show()

            elif choice == 5:
                movieId = enterId("Movie ")
                summarize_movie( search_movie_by_id(movieId, movies), ratings).show()

            elif choice == 6:
                title = input("Please enter the movie title:").strip()
                summarize_movie( search_movie_by_title(title, movies), ratings).show()

            elif choice == 7:
                year = input("Please enter a year between 1750 - 2018")
                search_movies_by_year(year, movies).show()

            elif choice == 8:
                n = int(input("please enter the number of top ratings to show:").strip())
                list_top_rated(ratings, movies).show(n)

            elif choice == 9:
                n = int(input("please enter the number of top watching movies to show:").strip())
                list_top_watched(ratings, movies).show(n)

            elif choice == 10:
                user_id = enterId("user ")
                find_user_favourite_genres(user_id, movies_genres, ratings, movies).show()

            elif choice == 11:
                user1_Id = enterId("First user ")
                user2_Id = enterId("Second user ")
                compareTastes(user1_Id, user2_Id, movies_genres, ratings, movies).show()
            elif choice == 12:
                genresSummary = summarize_genres(movies_genres, ratings)
                plt.bar(genresSummary.select("Genr").collect(), genresSummary.select())
            elif choice == 16:
                break
            
            
        
        
        except ValueError:
            print("You should enter only a number between 1 and 15!")



controlPanel()

Choices:
1- Search user | 2-Search Users | 3-Search Genre | 4-Search Genres
5- Search Movie By ID | 6-search Movie by title | 7- search movies by year
8-List top rating movies | 9- List top watching movies
10- Show User's favourite genres | 11- compare two users' taste | 12- Show visualizations
13- Recommend Movies for user | 14- Exit


Enter your choice number from the list above: 1
Enter the user  ID (>0): 1


+-------+------+------+---------+----------+--------------------+--------------------+---------------+
|movieId|userId|rating|timestamp|RatingDate|               title|              genres|production_year|
+-------+------+------+---------+----------+--------------------+--------------------+---------------+
|      1|     1|   4.0|964982703|2000-07-30|    Toy Story (1995)|Adventure|Animati...|           1995|
|      3|     1|   4.0|964981247|2000-07-30|Grumpier Old Men ...|      Comedy|Romance|           1995|
|      6|     1|   4.0|964982224|2000-07-30|         Heat (1995)|Action|Crime|Thri...|           1995|
|     47|     1|   5.0|964983815|2000-07-30|Seven (a.k.a. Se7...|    Mystery|Thriller|           1995|
|     50|     1|   5.0|964982931|2000-07-30|Usual Suspects, T...|Crime|Mystery|Thr...|           1995|
|     70|     1|   3.0|964982400|2000-07-30|From Dusk Till Da...|Action|Comedy|Hor...|           1996|
|    101|     1|   5.0|964980868|2000-07-30|Bottle Rocket (1996)|Adventur

Enter your choice number from the list above: 2
Enter a list of users IDs separated by comma (,). Note: IDs should be larger than 0!: 1


+-------+------+------+---------+----------+--------------------+--------------------+---------------+
|movieId|userId|rating|timestamp|RatingDate|               title|              genres|production_year|
+-------+------+------+---------+----------+--------------------+--------------------+---------------+
|      1|     1|   4.0|964982703|2000-07-30|    Toy Story (1995)|Adventure|Animati...|           1995|
|      3|     1|   4.0|964981247|2000-07-30|Grumpier Old Men ...|      Comedy|Romance|           1995|
|      6|     1|   4.0|964982224|2000-07-30|         Heat (1995)|Action|Crime|Thri...|           1995|
|     47|     1|   5.0|964983815|2000-07-30|Seven (a.k.a. Se7...|    Mystery|Thriller|           1995|
|     50|     1|   5.0|964982931|2000-07-30|Usual Suspects, T...|Crime|Mystery|Thr...|           1995|
|     70|     1|   3.0|964982400|2000-07-30|From Dusk Till Da...|Action|Comedy|Hor...|           1996|
|    101|     1|   5.0|964980868|2000-07-30|Bottle Rocket (1996)|Adventur

Enter your choice number from the list above: 4
Enter Genres list separated by comma (,): 1


['1']
+-------+-----+------+----+-----+------+---------------+
|movieId|title|genres|Genr|title|genres|production_year|
+-------+-----+------+----+-----+------+---------------+
+-------+-----+------+----+-----+------+---------------+

Choices:
1- Search user | 2-Search Users | 3-Search Genre | 4-Search Genres
5- Search Movie By ID | 6-search Movie by title | 7- search movies by year
8-List top rating movies | 9- List top watching movies
10- Show User's favourite genres | 11- compare two users' taste | 12- Show visualizations
13- Recommend Movies for user | 14- Exit


Enter your choice number from the list above: 4
Enter Genres list separated by comma (,): Action


['Action']
+-------+--------------------+--------------------+------+--------------------+--------------------+---------------+
|movieId|               title|              genres|  Genr|               title|              genres|production_year|
+-------+--------------------+--------------------+------+--------------------+--------------------+---------------+
|      6|         Heat (1995)|Action|Crime|Thri...|Action|         Heat (1995)|Action|Crime|Thri...|           1995|
|      9| Sudden Death (1995)|              Action|Action| Sudden Death (1995)|              Action|           1995|
|     10|    GoldenEye (1995)|Action|Adventure|...|Action|    GoldenEye (1995)|Action|Adventure|...|           1995|
|     15|Cutthroat Island ...|Action|Adventure|...|Action|Cutthroat Island ...|Action|Adventure|...|           1995|
|     20|  Money Train (1995)|Action|Comedy|Cri...|Action|  Money Train (1995)|Action|Comedy|Cri...|           1995|
|     23|    Assassins (1995)|Action|Crime|Thri...|Ac

KeyboardInterrupt: Interrupted by user