### Spark HW2 Movie Recommendation
In this notebook, we use the Alternating Least Squares (ALS) algorithm from Spark's APIs to predict movie ratings in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).
The deadline is 03/10/2019

In [2]:
import math
import warnings
from csv import reader

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import *

In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python2"

## Part 1: Data ETL and Data Exploration

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .getOrCreate()

In [6]:
movies = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [7]:
movies.show(5)

In [8]:
ratings.show(5)

In [9]:
tmp1 = ratings.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [10]:
tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))
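Both statistics above come from grouping the ratings by a key and counting rows per group. As an illustration only, the same logic can be sketched in plain Python with `collections.Counter` over a few made-up (userId, movieId) pairs:

```python
from collections import Counter

# Hypothetical (userId, movieId) pairs standing in for rows of ratings.csv
pairs = [(1, 10), (1, 20), (1, 30), (2, 10), (2, 40), (3, 10), (3, 20)]

# Counterparts of ratings.groupBy("userId").count() and ratings.groupBy("movieId").count()
ratings_per_user = Counter(user for user, _ in pairs)
ratings_per_movie = Counter(movie for _, movie in pairs)

min_per_user = min(ratings_per_user.values())
min_per_movie = min(ratings_per_movie.values())

# Movies that received exactly one rating, out of all distinct rated movies
single_rated = sum(1 for n in ratings_per_movie.values() if n == 1)

print(min_per_user, min_per_movie, single_rated, len(ratings_per_movie))  # 2 1 2 4
```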

## Part 1: Spark SQL and OLAP

### Q1: The number of Users

In [13]:
ratings.createOrReplaceTempView("ratingtable")
users_num=spark.sql("select count(distinct userId) from ratingtable")
users_num.show()

### Q2: The number of Movies

In [15]:
movie_num=spark.sql("select count(distinct movieId) from ratingtable")
movie_num.show()

### Q3: How many movies have been rated by users? List the movies that have not been rated

In [17]:
movies.createOrReplaceTempView("moviestable")
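# Total number of movies listed in movies.csv (rated or not); Q2 gives the number that have been rated
total_movies=spark.sql("select count(distinct movieId) from moviestable")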
display(total_movies)

count(DISTINCT movieId)
9742


In [18]:
# List movies that have never been rated
not_listed=spark.sql("select distinct movieId, title, genres from moviestable a where not exists (select 1 from ratingtable b where b.movieId = a.movieId)")
display(not_listed)

movieId,title,genres
4194,I Know Where I'm Going! (1945),Drama|Romance|War
1076,"Innocents, The (1961)",Drama|Horror|Thriller
30892,In the Realms of the Unreal (2004),Animation|Documentary
26085,Mutiny on the Bounty (1962),Adventure|Drama|Romance
5721,"Chosen, The (1981)",Drama
32160,Twentieth Century (1934),Comedy
2939,Niagara (1953),Drama|Thriller
25855,"Roaring Twenties, The (1939)",Crime|Drama|Thriller
32371,Call Northside 777 (1948),Crime|Drama|Film-Noir
6849,Scrooge (1970),Drama|Fantasy|Musical
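The `NOT EXISTS` subquery is an anti-join: it keeps each movie whose `movieId` never appears in the ratings. The DataFrame API expresses the same thing as `movies.join(ratings, on="movieId", how="left_anti")`. In plain Python, with made-up IDs, the idea reduces to a membership test against the set of rated IDs:

```python
# Hypothetical stand-ins for moviestable and ratingtable
movie_titles = {1: "Toy Story (1995)", 2: "Jumanji (1995)", 3: "Grumpier Old Men (1995)"}
rated_movie_ids = {1, 3}  # movieIds that appear at least once in the ratings

# Anti-join: keep movies with no matching row on the ratings side
never_rated = {mid: title for mid, title in movie_titles.items() if mid not in rated_movie_ids}
print(never_rated)  # {2: 'Jumanji (1995)'}
```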


### Q4: List Movie Genres

In [20]:
# Preview the movies table: each movie stores its genres as one pipe-delimited string
movie_genres=spark.sql("select distinct * from moviestable")
display(movie_genres)

movieId,title,genres
484,Lassie (1994),Adventure|Children
555,True Romance (1993),Crime|Thriller
762,Striptease (1996),Comedy|Crime
1125,"Return of the Pink Panther, The (1975)",Comedy|Crime
1328,"Amityville Curse, The (1990)",Horror
1757,Fallen Angels (Duo luo tian shi) (1995),Drama|Romance
1947,West Side Story (1961),Drama|Musical|Romance
2060,BASEketball (1998),Comedy
2338,I Still Know What You Did Last Summer (1998),Horror|Mystery|Thriller
2587,Life (1999),Comedy|Crime|Drama


In [21]:
import pyspark.sql.functions as f

df=movies.select(
        "movieId","title",
        f.split("genres", "\\|").alias("genres"),
        f.posexplode(f.split("genres", "\\|")).alias("pos", "val")
    )

df.createOrReplaceTempView("genrestable")

display(df)

movieId,title,genres,pos,val
1,Toy Story (1995),"List(Adventure, Animation, Children, Comedy, Fantasy)",0,Adventure
1,Toy Story (1995),"List(Adventure, Animation, Children, Comedy, Fantasy)",1,Animation
1,Toy Story (1995),"List(Adventure, Animation, Children, Comedy, Fantasy)",2,Children
1,Toy Story (1995),"List(Adventure, Animation, Children, Comedy, Fantasy)",3,Comedy
1,Toy Story (1995),"List(Adventure, Animation, Children, Comedy, Fantasy)",4,Fantasy
2,Jumanji (1995),"List(Adventure, Children, Fantasy)",0,Adventure
2,Jumanji (1995),"List(Adventure, Children, Fantasy)",1,Children
2,Jumanji (1995),"List(Adventure, Children, Fantasy)",2,Fantasy
3,Grumpier Old Men (1995),"List(Comedy, Romance)",0,Comedy
3,Grumpier Old Men (1995),"List(Comedy, Romance)",1,Romance
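`f.split` turns the pipe-delimited `genres` string into an array (its pattern is a regular expression, hence the escaped `\\|`), and `f.posexplode` emits one row per array element together with its position. A plain-Python sketch of that row expansion for the Jumanji row shown above:

```python
# One row of movies.csv, with genres stored as a pipe-delimited string
movie_id, title, genres = 2, "Jumanji (1995)", "Adventure|Children|Fantasy"

# str.split is literal (not a regex), so no escaping is needed here
genre_list = genres.split("|")

# posexplode: one output row per genre, carrying its position (pos) and value (val)
exploded = [(movie_id, title, genre_list, pos, val) for pos, val in enumerate(genre_list)]

for row in exploded:
    print(row)
```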


In [22]:
movie_genres=spark.sql("select distinct val from genrestable")
movie_genres.createOrReplaceTempView("movie_genres")
display(movie_genres)

val
Crime
Romance
Thriller
Adventure
Drama
War
Documentary
Fantasy
Mystery
Musical


### Q5: Movies for Each Category

In [24]:
movie_category=spark.sql("select a.val, a.movieId, a.title from genrestable a right join movie_genres b on b.val=a.val group by 1,2,3 order by 1 DESC")
display(movie_category)

val,movieId,title
Western,2921,High Plains Drifter (1973)
Western,7070,Red River (1948)
Western,32392,800 Bullets (800 Balas) (2002)
Western,4534,Return to Snowy River (a.k.a. The Man From Snowy River II) (1988)
Western,553,Tombstone (1993)
Western,54121,Broken Arrow (1950)
Western,55363,"Assassination of Jesse James by the Coward Robert Ford, The (2007)"
Western,5550,Love Me Tender (1956)
Western,4042,"Alamo, The (1960)"
Western,5699,Tom Horn (1980)
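Q5 pairs every genre with the movies tagged with it, which amounts to an inverted index from genre to movies. A plain-Python sketch using three rows copied from the sample output earlier in this notebook:

```python
from collections import defaultdict

# (movieId, title, genres) rows taken from the sample output above
rows = [
    (1, "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
    (2, "Jumanji (1995)", "Adventure|Children|Fantasy"),
    (3, "Grumpier Old Men (1995)", "Comedy|Romance"),
]

# genre -> [(movieId, title), ...], like exploding genres and grouping by val
movies_by_genre = defaultdict(list)
for movie_id, title, genres in rows:
    for genre in genres.split("|"):
        movies_by_genre[genre].append((movie_id, title))

# Genres in descending order, mirroring "order by 1 DESC"
for genre in sorted(movies_by_genre, reverse=True):
    print(genre, [title for _, title in movies_by_genre[genre]])
```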


## Part 2: Spark ALS-Based Approach for Training the Model
The collaborative-filtering guide for the RDD-based [pyspark.mllib](https://spark.apache.org/docs/2.1.1/mllib-collaborative-filtering.html) API describes the same algorithm; the code below uses the DataFrame-based `ALS` from `pyspark.ml.recommendation`. We reload "ratings.csv" using ``sc.textFile``, convert each line into a (user, item, rating) tuple, and build a DataFrame from those tuples.

### Reference: Spark ML ALS model example
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4999972933037924/899848065201823/8135547933712821/latest.html
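Before calling the library, it may help to see what ALS does. It factors the sparse rating matrix into user factors $U$ and item factors $V$ of width `rank`, so a predicted rating is a dot product. Holding $V$ fixed, each user's vector solves a small ridge regression over the items that user rated, $(V_u^\top V_u + \lambda n_u I)\,x_u = V_u^\top r_u$; then the roles swap. The NumPy sketch below uses a tiny hypothetical rating set and scales the penalty by the number of ratings per sub-problem, the "ALS-WR" convention the Spark docs describe. It is an illustration, not Spark's implementation (no blocking, implicit feedback, or non-negativity option).

```python
import numpy as np

def group_ratings(ratings, key):
    """Map each user (key=0) or item (key=1) id to (ids on the other side, their ratings)."""
    groups = {}
    for triple in ratings:
        own, other, r = triple[key], triple[1 - key], triple[2]
        ids, vals = groups.setdefault(own, ([], []))
        ids.append(other)
        vals.append(r)
    return {k: (np.array(ids), np.array(vals, dtype=float)) for k, (ids, vals) in groups.items()}

def solve_side(fixed, groups, n_rows, rank, reg):
    """Update one side's factors by ridge regression while `fixed` holds the other side."""
    out = np.zeros((n_rows, rank))
    for row, (ids, r) in groups.items():
        F = fixed[ids]  # factor vectors of the counterparts this row has ratings with
        # ALS-WR style: the penalty is scaled by the number of ratings in this sub-problem
        A = F.T @ F + reg * len(ids) * np.eye(rank)
        out[row] = np.linalg.solve(A, F.T @ r)
    return out

def als(ratings, n_users, n_items, rank=2, reg=0.05, n_iters=10, seed=0):
    """Explicit-feedback ALS on (user, item, rating) triples with 0-based integer ids."""
    rng = np.random.default_rng(seed)
    V = rng.normal(scale=0.1, size=(n_items, rank))  # random item factors; users are solved first
    by_user, by_item = group_ratings(ratings, 0), group_ratings(ratings, 1)
    for _ in range(n_iters):
        U = solve_side(V, by_user, n_users, rank, reg)  # items fixed, solve for users
        V = solve_side(U, by_item, n_items, rank, reg)  # users fixed, solve for items
    return U, V

def rmse(ratings, U, V):
    """Root-mean-square error of the dot-product predictions U[u] . V[i]."""
    errors = [r - U[u] @ V[i] for u, i, r in ratings]
    return float(np.sqrt(np.mean(np.square(errors))))

# Hypothetical ratings: 4 users x 4 items, 9 observed entries
toy = [(0, 0, 4.0), (0, 1, 3.5), (0, 2, 5.0), (1, 0, 4.5), (1, 2, 4.0),
       (2, 1, 2.0), (2, 3, 3.0), (3, 0, 5.0), (3, 3, 1.5)]
U, V = als(toy, n_users=4, n_items=4)
print("training RMSE:", rmse(toy, U, V))
```

In Spark's `ALS`, `rank`, `regParam` and `maxIter` play the roles of `rank`, `reg` and `n_iters` here.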

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [27]:
movie_rating = sc.textFile("/FileStore/tables/ratings.csv")

In [28]:
# Drop the header row and keep (userId, movieId, rating) from each line
header = movie_rating.take(1)[0]
rating_data = (movie_rating
               .filter(lambda line: line != header)
               .map(lambda line: line.split(","))
               .map(lambda tokens: (tokens[0], tokens[1], tokens[2]))
               .cache())

# Convert the RDD to a DataFrame, then cast the string columns to numeric types.
# Ratings use half-star steps (e.g. 3.5), so rating is cast to a float; an integer cast would lose the half star.
ratings = spark.createDataFrame(rating_data, ['userId', 'movieId', 'rating'])

from pyspark.sql.types import FloatType, IntegerType
rating_table1 = ratings.withColumn("userId", ratings["userId"].cast(IntegerType()))
rating_table2 = rating_table1.withColumn("movieId", rating_table1["movieId"].cast(IntegerType()))
rating_table3 = rating_table2.withColumn("rating", rating_table2["rating"].cast(FloatType()))
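MovieLens ratings use half-star increments from 0.5 to 5.0, which is why the rating should be parsed as a floating-point number. The parsing step can be sketched in plain Python; the sample lines below are hypothetical but follow the `userId,movieId,rating,timestamp` header of ratings.csv:

```python
import csv
import io

# Hypothetical contents shaped like ratings.csv
raw = "userId,movieId,rating,timestamp\n1,1,4.0,964982703\n1,3,3.5,964981247\n"

rows = csv.reader(io.StringIO(raw))
next(rows)  # skip the header row, as the filter on `header` does above

# (user, item, rating) tuples; float() keeps 3.5 intact, whereas int("3.5") would raise ValueError
tuples = [(int(user), int(movie), float(rating)) for user, movie, rating, _ in rows]
print(tuples)  # [(1, 1, 4.0), (1, 3, 3.5)]
```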

In [29]:
# check three rows
rating_table3.take(3)

Now we split the data into training/validation/testing sets using a 6/2/2 ratio.

In [31]:
train, validation, test = rating_table3.randomSplit([0.6,0.2,0.2])
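`randomSplit` assigns each row to a split independently, so the 6/2/2 proportions are only approximate, and the split changes from run to run unless a `seed` is passed. A plain-Python sketch of weighted per-row assignment (an illustration of the idea, not Spark's exact sampler):

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. [0.6, 0.8, 1.0] for weights [0.6, 0.2, 0.2]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose cumulative bound exceeds the draw; fall back to the last split
        idx = next((i for i, b in enumerate(bounds) if u < b), len(weights) - 1)
        splits[idx].append(row)
    return splits

train_rows, val_rows, test_rows = random_split(range(10000), [0.6, 0.2, 0.2], seed=42)
print(len(train_rows), len(val_rows), len(test_rows))  # roughly 6000 / 2000 / 2000
```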

In [32]:
train.cache()
display(train.take(5))


userId,movieId,rating
1,1,4
1,3,4
1,6,4
1,47,5
1,50,5


In [33]:
validation.cache()

In [34]:
test.cache()

### ALS Model Selection and Evaluation

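ALS has several hyperparameters, chiefly the rank (number of latent factors) and the regularization parameter `regParam`. We first fit a baseline model, then run a grid search over rank and `regParam` and keep the combination with the lowest RMSE on the validation set.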

In [36]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(train)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
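`recommendForAllUsers(10)` ranks items for each user by predicted rating, which for ALS is the dot product of the user's and the item's latent factor vectors, and keeps the 10 highest. A plain-Python sketch of that scoring with hypothetical rank-2 factors (a fitted model exposes its real factors as the DataFrames `model.userFactors` and `model.itemFactors`):

```python
import heapq

# Hypothetical latent factors (rank 2), keyed by userId / movieId
user_factors = {1: [1.2, 0.3], 2: [0.1, 1.5]}
item_factors = {10: [3.0, 0.5], 20: [0.4, 2.8], 30: [2.0, 2.0]}

def predict(u, i):
    """Predicted rating = dot product of the user and item factor vectors."""
    return sum(a * b for a, b in zip(user_factors[u], item_factors[i]))

def recommend(u, k):
    """Top-k (item, score) pairs for user u, highest predicted rating first."""
    scores = ((i, predict(u, i)) for i in item_factors)
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])

for user in user_factors:
    print(user, [item for item, _ in recommend(user, 2)])
```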


In [37]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """Grid-search rank and regParam; return the model with the lowest validation RMSE."""
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    for rank in ranks:
        for reg in reg_param:
            # Train an ALS model with this (rank, regParam) pair
            als = ALS(maxIter=num_iters, rank=rank, regParam=reg,
                      userCol="userId", itemCol="movieId", ratingCol="rating",
                      coldStartStrategy="drop")
            model = als.fit(train_data)

            # Predict on the validation set and compute the RMSE
            predictions = model.transform(validation_data)
            error = evaluator.evaluate(predictions)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))

            # Keep the best model seen so far
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [38]:
num_iterations = 10
ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.2, 0.4, 0.6, 0.8]

import time
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))
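The nested loops in `train_ALS` are an exhaustive grid search: every (rank, regParam) pair is trained and scored on the validation set, and the pair with the lowest RMSE wins, so 4 ranks and 5 regularization values mean 20 model fits. Spark also ships `ParamGridBuilder` and `TrainValidationSplit` in `pyspark.ml.tuning` for this. The selection logic alone can be sketched with `itertools.product`; `fake_validation_rmse` is a hypothetical, made-up error surface standing in for fitting ALS and evaluating it:

```python
import itertools

ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.2, 0.4, 0.6, 0.8]

def fake_validation_rmse(rank, reg):
    """Hypothetical stand-in for: fit ALS(rank=rank, regParam=reg), then evaluate on validation."""
    return abs(rank - 8) * 0.01 + abs(reg - 0.2) * 0.1

# Score every combination and keep the one with the smallest validation error
results = {(rank, reg): fake_validation_rmse(rank, reg)
           for rank, reg in itertools.product(ranks, reg_params)}
best_rank, best_reg = min(results, key=results.get)
print(len(results), best_rank, best_reg)  # 20 8 0.2
```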

### Model testing on the test data
Finally, we make predictions on the test set and compute the test error.

In [40]:
# Final hyperparameters, taken from the grid-search result (adjust if the search picks different values)
num_iterations = 10
best_rank = 6
best_reg = 0.2

als = ALS(maxIter=num_iterations, rank=best_rank, regParam=best_reg,
          userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
# Fit on the training data only; the test set stays unseen until evaluation
model = als.fit(train)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Test root-mean-square error = " + str(rmse))
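For reference, `RegressionEvaluator(metricName="rmse")` computes the root-mean-square error over the prediction rows it receives. With `coldStartStrategy="drop"`, rows whose prediction is NaN (because the user or movie never appeared in the training split) are removed first, so the sum runs only over the remaining set $\mathcal{T}$ of $N$ (user, movie) pairs:

$$
\mathrm{RMSE} = \sqrt{\frac{1}{N} \sum_{(u,i) \in \mathcal{T}} \left( r_{ui} - \hat{r}_{ui} \right)^2}, \qquad \hat{r}_{ui} = \mathbf{x}_u^\top \mathbf{y}_i
$$

Here $r_{ui}$ is the observed rating and $\hat{r}_{ui}$ is the model's prediction, the dot product of the user's latent factor vector $\mathbf{x}_u$ and the movie's latent factor vector $\mathbf{y}_i$.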