# Anime Recommender System

## Team members
  - Nguyễn Quốc Bảo - 19133002
  - Võ Hoàng Khả Diệu - 19133014
  
This notebook explains how to use the [Anime Datasets]() to build an anime recommender using [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) with [Spark's Alternating Least Squares](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.recommendation.ALS.html) implementation. It is organised in two parts. The first covers loading and parsing the anime and ratings data into Spark RDDs. The second covers building and using the recommender, then persisting it for later use in our online recommender system.

## Getting and processing the data

In [98]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer
from pyspark.sql import Row
from pyspark.sql.functions import col,isnan, when, count


In [99]:
# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("ALS recommendation spark session") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "25g") \
    .config('spark.cores.max', '16') \
    .enableHiveSupport() \
    .getOrCreate()
#    .master('spark://192.168.1.171:7077') \
#    .config("spark.driver.host", "192.168.1.171") \
#    .config("spark.driver.port", "10027") \
#    .config("spark.submit.deployMode", "cluster") \
#    .config("spark.driver.bindAddress", "0.0.0.0") \
#    .config("spark.dynamicAllocation.enabled", False) \

In [100]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)
import os

In [4]:
datasets_path = os.path.join('../data')
rating_file_path = os.path.join(datasets_path, 'rating.csv')
rating_raw_RDD = sc.textFile(rating_file_path)
anime_file_path = os.path.join(datasets_path, 'anime.csv')
anime_raw_RDD = sc.textFile(anime_file_path)

In [7]:
rating_data_raw_header = rating_raw_RDD.take(1)[0]
anime_data_raw_header = anime_raw_RDD.take(1)[0]

In [119]:
# Drop the header row, split each line on commas, and keep (user_id, anime_id, rating)
rating_RDD = rating_raw_RDD.filter(lambda line: line != rating_data_raw_header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), int(tokens[2]))).cache()
# (anime_id, name); note that a plain split(",") mis-parses quoted fields that contain commas
anime_title_RDD = anime_raw_RDD.filter(lambda line: line != anime_data_raw_header) \
    .map(lambda line: line.split(",")) \
    .map(lambda x: (int(x[0]), x[1])).cache()
# (anime_id, name, genre), kept for the content-based part
anime_genre_tf_idf_rdd = anime_raw_RDD.filter(lambda line: line != anime_data_raw_header) \
    .map(lambda line: line.split(",")) \
    .map(lambda x: (int(x[0]), x[1], x[2])).cache()
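Splitting on every comma breaks rows whose title or genre list is quoted and itself contains commas. Below is a minimal sketch of a quote-aware parser built on Python's standard `csv` module. The helper name `parse_anime_line` and the sample row are illustrative assumptions, not part of the original notebook.

```python
import csv

def parse_anime_line(line):
    """Parse one CSV line into (anime_id, name, genre), honouring quoted fields."""
    # csv.reader accepts any iterable of lines, so wrap the single line in a list.
    fields = next(csv.reader([line]))
    return int(fields[0]), fields[1], fields[2]

# Hypothetical sample row: both the name and the genre are quoted and contain commas.
sample = '1,"Example, The Anime","Action, Comedy",TV'

print(sample.split(",")[1])         # naive split cuts the quoted name in half
print(parse_anime_line(sample)[1])  # csv-based parsing keeps the name whole
```

If this approach fits the data, the function could stand in for the `line.split(",")` step, for example `anime_raw_RDD.filter(...).map(parse_anime_line)`.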

In [13]:
rating_RDD.take(5)

[(1, 20, -1), (1, 24, -1), (1, 79, -1), (1, 226, -1), (1, 241, -1)]

In [14]:
anime_title_RDD.take(5)

[(32281, 'Kimi no Na wa.'),
 (5114, 'Fullmetal Alchemist: Brotherhood'),
 (28977, 'Gintama°'),
 (9253, 'Steins;Gate'),
 (9969, 'Gintama&#039;')]

In [15]:
def change_rating(rating):
    # Map ratings 6..10 onto a 1..5 scale; any other value yields None
    if 6 <= rating <= 10:
        return rating - 5
    return None

In [16]:
#Test rating from 1 to 10
#rating_RDD_data = rating_RDD.filter(lambda line: line!=rating_data_raw_header)\
#    .filter(lambda x: x[2] != -1)

In [17]:
# Keep only ratings from 6 to 10 (dropping -1, i.e. "not rated", and 1 to 5), then rescale to 1-5
rating_RDD_data = rating_RDD \
    .filter(lambda x: 6 <= x[2] <= 10) \
    .map(lambda x: (x[0], x[1], change_rating(x[2])))

In [22]:
rating_RDD_data.take(10)

[(1, 8074, 5),
 (1, 11617, 5),
 (1, 11757, 5),
 (1, 15451, 5),
 (2, 11771, 5),
 (3, 20, 3),
 (3, 154, 1),
 (3, 170, 4),
 (3, 199, 5),
 (3, 225, 4)]

In [23]:
rating_RDD_data.count()

5868892

In [66]:
#create dataframe from rdd
ratings_df = spark.createDataFrame(data = rating_RDD_data, schema = ["user_id", "anime_id", "rating"])

In [67]:
ratings_df.show(5)

+-------+--------+------+
|user_id|anime_id|rating|
+-------+--------+------+
|      1|    8074|     5|
|      1|   11617|     5|
|      1|   11757|     5|
|      1|   15451|     5|
|      2|   11771|     5|
+-------+--------+------+
only showing top 5 rows



## Building and using the recommender system

### Content-based Filtering

### Collaborative Filtering - ALS

#### Building and training the model

In [26]:
# Split the data into training and test sets at an 8:2 ratio
(training, testing) = ratings_df.randomSplit([0.8, 0.2])


In [27]:
# Build the recommendation model with the ALS algorithm on the training set
from datetime import datetime
start_time = datetime.now()
als = ALS(maxIter=10, regParam=0.1, userCol="user_id", itemCol="anime_id", ratingCol="rating")
model = als.fit(training)
end_time = datetime.now()

print('Execute time {}'.format(end_time - start_time))

Execute time 0:01:03.786310
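To make the idea behind `ALS` more concrete, here is a toy, pure-Python sketch of alternating least squares with a single latent factor per user and item. It is not Spark's implementation, which differs in rank, distribution, and regularization details. The function name `als_rank1` and the sample ratings are assumptions made for illustration.

```python
def als_rank1(ratings, n_iter=50, reg=0.01):
    """Toy rank-1 ALS on (user, item, rating) triples.

    Returns two dicts: user factors x and item factors y, so that the
    predicted rating for (u, i) is x[u] * y[i].
    """
    users = {u for u, _, _ in ratings}
    items = {i for _, i, _ in ratings}
    x = {u: 1.0 for u in users}  # user factors
    y = {i: 1.0 for i in items}  # item factors
    for _ in range(n_iter):
        # Hold item factors fixed; each user factor has a closed-form
        # regularized least-squares solution.
        for u in users:
            num = sum(r * y[i] for uu, i, r in ratings if uu == u)
            den = reg + sum(y[i] ** 2 for uu, i, _ in ratings if uu == u)
            x[u] = num / den
        # Hold user factors fixed; solve for each item factor the same way.
        for i in items:
            num = sum(r * x[u] for u, ii, r in ratings if ii == i)
            den = reg + sum(x[u] ** 2 for u, ii, _ in ratings if ii == i)
            y[i] = num / den
    return x, y

# Hypothetical ratings: user 2 rates everything twice as high as user 1.
# The pair (user 2, item 30) is left out and then predicted.
toy_ratings = [
    (1, 10, 2.0), (1, 20, 1.5), (1, 30, 2.5),
    (2, 10, 4.0), (2, 20, 3.0),
]
x, y = als_rank1(toy_ratings)
print("predicted rating for user 2 on item 30:", x[2] * y[30])
```

Because the toy data is consistent with a single factor, the held-out prediction should land close to 5, twice user 1's rating for the same item.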


In [28]:
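# Evaluate on the test set; "drop" removes rows whose user or anime was unseen in training (NaN predictions)
model.setColdStartStrategy("drop")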
predictions = model.transform(testing)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                               predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.9385759774524401
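For reference, the `rmse` metric is the square root of the mean squared difference between actual and predicted ratings, $\sqrt{\frac{1}{n}\sum_{k}(r_k - \hat{r}_k)^2}$. The sketch below computes it by hand on a few hypothetical (actual, predicted) pairs. The function name and the data are illustrative assumptions.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical (actual, predicted) ratings on the 1-5 scale.
example = [(5, 4.0), (3, 3.0), (1, 2.0), (4, 4.0)]
print(rmse(example))
```

Read this way, an RMSE of about 0.94 on the rescaled 1-5 ratings suggests that predictions are typically off by roughly one rating step.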


#### Using the model for recommendations

In [58]:
# Recommend the top 10 anime for a single user
user_subset = ratings_df.where(ratings_df.user_id == 215)
user_subset_recs = model.recommendForUserSubset(user_subset, 10)
list_user_predictions = list(user_subset_recs.select('recommendations').toPandas()['recommendations'])
user_prediction_rdd = sc.parallelize(list_user_predictions[0])

In [61]:
# Join the anime titles onto the predictions and keep the 10 highest predicted ratings
list_complete_user_prediction = user_prediction_rdd.join(anime_title_RDD) \
    .map(lambda x: (x[0], x[1][1], x[1][0])) \
    .takeOrdered(10, key=lambda x: -x[2])
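The indexing in the `map` step follows from the shape that a pair-RDD join produces: `(key, (left_value, right_value))`. The plain-Python sketch below emulates that shape on hypothetical data so the reordering to `(anime_id, name, rating)` is easier to follow. `join_pairs` is an illustrative helper, not a Spark API.

```python
def join_pairs(left, right):
    """Emulate the (key, (left_value, right_value)) shape of a pair-RDD inner join."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

# Hypothetical data: (anime_id, predicted_rating) and (anime_id, title).
predictions = [(1, 4.5), (2, 3.9), (3, 4.8)]
titles = [(1, "Title A"), (2, "Title B"), (3, "Title C")]

joined = join_pairs(predictions, titles)
# x[1][1] is the title and x[1][0] is the rating, as in the notebook's map step.
reordered = [(x[0], x[1][1], x[1][0]) for x in joined]
# Equivalent of takeOrdered(2, key=lambda x: -x[2]): highest rating first.
top = sorted(reordered, key=lambda x: -x[2])[:2]
print(top)
```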

In [71]:
# Change list to dataframe
user_subset_recs_columns = ["anime_id","name", "rating"]
user_subset_recs_DF = spark.createDataFrame(data=list_complete_user_prediction, schema = user_subset_recs_columns)

In [125]:
# Show output
print("Top 10 anime recommended for userID is " + str(user_subset.collect()[0][0]))
user_subset_recs_DF.show(10)  # show() prints the table itself and returns None, so it is not wrapped in print()

Top 10 anime recommended for userID is 215
+--------+--------------------+-----------------+
|anime_id|                name|           rating|
+--------+--------------------+-----------------+
|    7416|              Socket|6.557953357696533|
|   32400|           KochinPa!|6.261995792388916|
|   29978|                 001|6.244216442108154|
|   29995|The Embryo Develo...|6.110090732574463|
|    7485|      Urashima Tarou|6.042207717895508|
|   22059|Kakumeiteki Broad...|5.638498306274414|
|   22615|Kero Kero Keroppi...|5.526596546173096|
|   22445|Hello Kitty no Ya...|5.526596546173096|
|   17985|Kero Kero Keroppi...|5.526596546173096|
|   22607|Ahiru no Pekkle n...|5.526596546173096|
+--------+--------------------+-----------------+


#### Persisting the model

In [122]:
model_path = os.path.join('model', 'als_model')
model.save(model_path)
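Since the model is persisted for later use, the counterpart step is loading it back. A minimal sketch, assuming the model was saved as above and a Spark session is active when the helper is called. `load_als_model` is a hypothetical helper name, while `ALSModel.load` is the PySpark reader for fitted ALS models.

```python
def load_als_model(path):
    """Load a previously saved ALS model from `path` (hypothetical helper)."""
    # Imported inside the function so the helper can be defined
    # even where PySpark is not installed.
    from pyspark.ml.recommendation import ALSModel
    return ALSModel.load(path)
```

A later session could then call `loaded_model = load_als_model(model_path)` and reuse `loaded_model.recommendForUserSubset(...)` as before. Note that `model.save` raises an error if the path already exists. `model.write().overwrite().save(model_path)` is the usual way to replace an existing saved model.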