<a href="https://colab.research.google.com/github/ale1995co/Final-Capstone/blob/master/ALS_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install Java, Spark, Findspark and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

!pip install -q findspark
!pip install pyspark

# mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Collecting pyspark
Collecting py4j==0.10.7
Building wheels for collected packages: pyspark
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5

In [0]:
import os
import time

# spark imports
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.recommendation import ALS  # DataFrame-based ALS, used for the model below

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [0]:
USERS_PATH = "/content/gdrive/My Drive/Colab Notebooks/Colab Datasets/Final Capstone/users_cleaned.csv" 
ANIME_PATH = "/content/gdrive/My Drive/Colab Notebooks/Colab Datasets/Final Capstone/anime_cleaned.csv"
SCORES_PATH = "/content/gdrive/My Drive/Colab Notebooks/Colab Datasets/Final Capstone/animelists_cleaned.csv"
APP_NAME = "Anime Recommender"
SPARK_URL = "local[*]"
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
sc = spark.sparkContext

# Context 

The CSVs used with PySpark are the cleaned versions. The creator of the dataset omitted problematic rows to the best of his ability. I tried running the filtered versions, but they gave me errors in PySpark.

# Load Data

In [0]:
sqlContext = SQLContext(sc)
usersdf = spark.read.load(USERS_PATH, format='csv', header=True, inferSchema=True)
animedf = spark.read.load(ANIME_PATH, format='csv', header=True, inferSchema=True)
scoresdf = spark.read.load(SCORES_PATH, format='csv', header=True, inferSchema=True)

In [5]:
usersdf.show(5)

+--------------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+-------------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+
|      username|user_id|user_watching|user_completed|user_onhold|user_dropped|user_plantowatch|user_days_spent_watching|gender|           location|         birth_date|access_rank|          join_date|        last_online|stats_mean_score|stats_rewatched|stats_episodes|
+--------------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+-------------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+
|      karthiga|2255153|            3|            49|          1|           0|               0|       55.09166666666667|Female|    Chennai, India |1990-04-29 00:00:00|       null|2013-03-03 00:00:

In [6]:
animedf.show(5)

+--------+-------------------+--------------------+----------------------+--------------------+--------------------+----+--------+--------+---------------+------+--------------------+--------------------+---------------+--------------------+-----+---------+------+----------+--------+---------+--------------------+-----------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|anime_id|              title|       title_english|        title_japanese|      title_synonyms|           image_url|type|  source|episodes|         status|airing|        aired_string|               aired|       duration|              rating|score|scored_by|  rank|popularity| members|favorites|          background|  premiered|         broadcast|             related|            producer|        licensor|          studio|               genre|       opening_theme|        

In [7]:
scoresdf.show(5)

+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|username|anime_id|my_watched_episodes|my_start_date|my_finish_date|my_score|my_status|my_rewatching|my_rewatching_ep|    my_last_updated|my_tags|
+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|karthiga|      21|                586|   0000-00-00|    0000-00-00|       9|        1|         null|               0|2013-03-03 10:52:53|   null|
|karthiga|      59|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-03-10 13:54:51|   null|
|karthiga|      74|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-04-27 16:43:35|   null|
|karthiga|     120|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|

In [8]:
scoresdf.dtypes

[('username', 'string'),
 ('anime_id', 'string'),
 ('my_watched_episodes', 'string'),
 ('my_start_date', 'string'),
 ('my_finish_date', 'string'),
 ('my_score', 'string'),
 ('my_status', 'string'),
 ('my_rewatching', 'string'),
 ('my_rewatching_ep', 'string'),
 ('my_last_updated', 'string'),
 ('my_tags', 'string')]

In [9]:
scoresdf = scoresdf.drop("my_watched_episodes", "my_start_date", "my_finish_date", "my_rewatching", 
                         "my_last_updated", "my_tags", "my_status", "my_rewatching_ep" )
scoresdf.show(5)

+--------+--------+--------+
|username|anime_id|my_score|
+--------+--------+--------+
|karthiga|      21|       9|
|karthiga|      59|       7|
|karthiga|      74|       7|
|karthiga|     120|       7|
|karthiga|     178|       7|
+--------+--------+--------+
only showing top 5 rows



The ALS model only uses three features (user, item, and rating), so the dataframe is trimmed down to the columns it needs.

## Questions I would like to answer using PySpark tools:

What are the scores?
What is the total number of users in the data sets?
What is the total number of anime in the data sets?
How many anime are rated by users? How many anime are not rated yet?

In [10]:
scoresdf.dtypes

[('username', 'string'), ('anime_id', 'string'), ('my_score', 'string')]

In [0]:
from pyspark.sql.types import IntegerType

In [0]:
scoresdf = scoresdf.withColumn("my_score", scoresdf["my_score"].cast(IntegerType()))

Cast the my_score column to integers.

What are the scores?

In [13]:
scoresdf.groupby("my_score")\
        .count()\
        .show()

+--------+--------+
|my_score|   count|
+--------+--------+
|    null|  100962|
|       1|  103177|
|       6| 2128502|
|       3|  223202|
|       5| 1085660|
|       9| 3443674|
|      17|       1|
|       4|  480871|
|       8| 4834595|
|       7| 4234726|
|      10| 2507404|
|       2|  130314|
|    2017|       1|
|       0|12111905|
+--------+--------+



In [14]:
# null, 0, 17, and 2017 are not valid scores in the 1-10 rating system.
scoresdf = scoresdf.na.drop()
scoresdf = scoresdf.filter((scoresdf["my_score"] != 0) & (scoresdf["my_score"] != 2017) & (scoresdf["my_score"] != 17 ))
scoresdf.groupby("my_score")\
        .count()\
        .show()

+--------+-------+
|my_score|  count|
+--------+-------+
|       1| 103177|
|       6|2128502|
|       3| 223202|
|       5|1085660|
|       9|3443674|
|       4| 480871|
|       8|4834595|
|       7|4234726|
|      10|2507404|
|       2| 130314|
+--------+-------+
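
The cleaning step above (cast to integer, then keep only valid scores) can be sketched in plain Python. This is only an analogy for what the Spark cast and filter do; the helper names `parse_score` and `is_valid_score` and the sample values are assumptions made for illustration.

```python
def parse_score(raw):
    """Mimic the integer cast: return an int, or None when the text is not an integer."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def is_valid_score(score):
    """A score is valid only if it is present and falls in the 1-10 range."""
    return score is not None and 1 <= score <= 10


# Hypothetical raw values, including the kinds of bad entries seen in the counts above
raw_scores = ["9", "0", "2017", "abc", None, "10", "17", "1"]
clean = [s for s in map(parse_score, raw_scores) if is_valid_score(s)]
print(clean)
```

Expressing the rule as a range check rather than a list of excluded values means any other out-of-range score would be caught as well. In Spark, a comparable filter could probably be written with `scoresdf["my_score"].between(1, 10)`.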



What is the total number of users in the data sets?

In [15]:
tmp = scoresdf.select('username').distinct().count()
print('We have a total of {} distinct users in the data sets'.format(tmp))

We have a total of 106402 distinct users in the data sets


How many anime are rated by users? How many anime are not rated yet?

In [16]:
tmp1 = animedf.select('anime_id').distinct().count()
tmp2 = scoresdf.select('anime_id').distinct().count()
print('We have a total of {} distinct anime that are rated by users in ratings table'.format(tmp2))
print('We have {} anime that are not rated yet'.format(tmp1-tmp2))

We have a total of 6598 distinct anime that are rated by users in ratings table
We have 70 anime that are not rated yet


In [17]:
animedf.createOrReplaceTempView("animedf")
scoresdf.createOrReplaceTempView("scoresdf")
print('List anime that are not rated yet: ')
# SQL query (NOTE: WHERE ... NOT IN ... == ... LEFT JOIN ... WHERE ... IS NULL)
# Approach 1
spark.sql(
    "SELECT anime_id, title "
    "FROM animedf "
    "WHERE anime_id NOT IN (SELECT distinct(anime_id) FROM scoresdf)"
).show(10)

List anime that are not rated yet: 
+--------+--------------------+
|anime_id|               title|
+--------+--------------------+
|    7639|Shounen Santa no ...|
|   33310|Peach Command Shi...|
|   28119|Kuma no Minakuro ...|
|   33484|          Shiroi Zou|
|   31020|Norasco: Cinema P...|
|   34091|    Norasco Specials|
|   32663|Tama & Friends: U...|
|   37187| Kuiba Yao Xia Zhuan|
|   20001|     Mouretsu Atarou|
|   23537|  Two Down Full Base|
+--------+--------------------+
only showing top 10 rows
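
The note in the query comment, that `NOT IN` gives the same rows as a `LEFT JOIN ... WHERE ... IS NULL`, can be checked on a toy example. The sketch below uses Python's built-in `sqlite3` instead of Spark so it runs anywhere; the table contents and titles are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE animedf (anime_id INTEGER, title TEXT);
    CREATE TABLE scoresdf (username TEXT, anime_id INTEGER, my_score INTEGER);
    INSERT INTO animedf VALUES (1, 'Title A'), (2, 'Title B'), (3, 'Title C');
    INSERT INTO scoresdf VALUES ('u1', 1, 9), ('u2', 1, 7), ('u1', 3, 8);
""")

# Approach 1: NOT IN with a subquery
not_in_rows = conn.execute(
    "SELECT anime_id, title FROM animedf "
    "WHERE anime_id NOT IN (SELECT DISTINCT anime_id FROM scoresdf) "
    "ORDER BY anime_id"
).fetchall()

# Approach 2: LEFT JOIN and keep the rows with no match
left_join_rows = conn.execute(
    "SELECT a.anime_id, a.title FROM animedf a "
    "LEFT JOIN scoresdf s ON a.anime_id = s.anime_id "
    "WHERE s.anime_id IS NULL "
    "ORDER BY a.anime_id"
).fetchall()

print(not_in_rows, left_join_rows)
conn.close()
```

The two forms agree only when the subquery returns no NULL values. If `scoresdf.anime_id` contained a NULL, the `NOT IN` version would return no rows at all, while the join version would be unaffected.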



Join the users and scores dataframes to get user_id, anime_id, and scores in a single dataframe.

In [18]:
left_join = usersdf.join(scoresdf, usersdf.username == scoresdf.username,how='left') # Could also use 'left_outer'
left_join.show()

+--------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+--------------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+--------+--------+--------+
|username|user_id|user_watching|user_completed|user_onhold|user_dropped|user_plantowatch|user_days_spent_watching|gender|            location|         birth_date|access_rank|          join_date|        last_online|stats_mean_score|stats_rewatched|stats_episodes|username|anime_id|my_score|
+--------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+--------------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+--------+--------+--------+
| -Himeko|2156961|           13|           164|          0|           0|               0|       34.59513888888889|Female|Buenos Ai

In [0]:
# Only three features are needed for the ALS model.
left_join = left_join.select("user_id", "my_score", "anime_id")

In [20]:
left_join.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- my_score: integer (nullable = true)
 |-- anime_id: string (nullable = true)



# ALS Model

ALS (alternating least squares) is a low-rank matrix approximation algorithm for collaborative filtering. It decomposes the user-item rating matrix into two low-rank matrices: a user matrix and an item matrix. In collaborative filtering, users and items are described by a small set of latent factors that can be used to predict missing entries, and ALS learns these latent factors by matrix factorization.
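
As a rough illustration of the factorization described above, the sketch below runs alternating least squares on a tiny hand-made rating matrix with NumPy. The function names (`als_factorize`, `observed_rmse`), the toy matrix, and the hyperparameters are assumptions chosen for illustration. It is not how Spark implements ALS internally, only the same idea at small scale.

```python
import numpy as np


def als_factorize(ratings, mask, rank=2, reg=0.1, iterations=20, seed=0):
    """Approximate `ratings` (users x items) as U @ V.T using only observed entries."""
    rng = np.random.default_rng(seed)
    n_users, n_items = ratings.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))  # user latent factors
    V = rng.normal(scale=0.1, size=(n_items, rank))  # item latent factors
    ridge = reg * np.eye(rank)
    for _ in range(iterations):
        # Hold V fixed: each user's factors are a small ridge regression
        for u in range(n_users):
            obs = mask[u]
            Vo = V[obs]
            U[u] = np.linalg.solve(Vo.T @ Vo + ridge, Vo.T @ ratings[u, obs])
        # Hold U fixed: solve the same kind of problem for each item
        for i in range(n_items):
            obs = mask[:, i]
            Uo = U[obs]
            V[i] = np.linalg.solve(Uo.T @ Uo + ridge, Uo.T @ ratings[obs, i])
    return U, V


def observed_rmse(ratings, mask, U, V):
    """RMSE between the reconstruction and the observed ratings only."""
    err = (U @ V.T - ratings)[mask]
    return float(np.sqrt(np.mean(err ** 2)))


# Toy matrix: 4 users x 5 anime, 0 marks a missing score
ratings = np.array([
    [9, 7, 0, 8, 0],
    [8, 0, 6, 9, 2],
    [0, 3, 9, 0, 8],
    [2, 0, 8, 3, 9],
], dtype=float)
mask = ratings > 0

U, V = als_factorize(ratings, mask)
predicted = U @ V.T  # filled-in matrix, including the missing entries
print(np.round(predicted, 2))
print("RMSE on observed entries:", observed_rmse(ratings, mask, U, V))
```

Each half-step is an ordinary least-squares problem because one of the two matrices is held fixed, which is what makes the method "alternating". The entries of `predicted` where `mask` is False are the model's guesses for scores the user never gave.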

In [0]:
left_join = left_join.withColumn("anime_id", left_join["anime_id"].cast(IntegerType()))

In [22]:
left_join.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- my_score: integer (nullable = true)
 |-- anime_id: integer (nullable = true)

In [0]:
#Split dataset to train and test
train_data, test_data = left_join.randomSplit([0.8, 0.2])

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [25]:
left_join = left_join.na.drop()
#nulls will crash the ALS model
left_join.describe()

DataFrame[summary: string, user_id: string, my_score: string, anime_id: string]

In [0]:
model = ALS(userCol='user_id', itemCol='anime_id', ratingCol='my_score').fit(left_join)


In [0]:
(trainingRatings, testRatings) = left_join.randomSplit([80.0, 20.0])


In [0]:
als = ALS(userCol='user_id', itemCol='anime_id', ratingCol='my_score')
model = als.fit(trainingRatings)
predictions = model.transform(testRatings)

In [31]:
predictions.toPandas().head()

|   | user_id | my_score | anime_id | prediction |
|---|---|---|---|---|
| 0 | 252259 | 7 | 148 | 4.653338 |
| 1 | 285987 | 7 | 148 | 6.841636 |
| 2 | 26111 | 6 | 148 | 5.92171 |
| 3 | 91084 | 6 | 148 | 6.213613 |
| 4 | 58066 | 8 | 148 | 6.544576 |


In [32]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='my_score', predictionCol='prediction')
print('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions)))

The root mean squared error for our model is: nan


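NaN appears when the model is asked to score a user or anime that was not in the training split, because it has no latent factors for them. A single NaN prediction is enough to make the overall RMSE NaN.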

Replace predicted NaN values with the average rating and evaluate the model:

In [33]:
avgRatings = left_join.select('my_score').groupBy().avg().first()[0]
print ('The average rating in the dataset is: {}'.format(avgRatings))

evaluator = RegressionEvaluator(metricName='rmse', labelCol='my_score', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.fill(avgRatings))))

The average rating in the dataset is: 7.59139401605195
The root mean squared error for our model is: 1.2301980583899745


Now exclude predicted NaN values and evaluate the model:

In [34]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='my_score', predictionCol='prediction')
print ('The root mean squared error for our model is: {}'.format(evaluator.evaluate(predictions.na.drop())))

The root mean squared error for our model is: 1.2300982176995245
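
The effect of NaN predictions on RMSE, and the two workarounds used above (fill with the average, or drop), can be reproduced with a few hand-made numbers. The `rmse` helper and the sample pairs below are assumptions for illustration, not values from the model.

```python
import math


def rmse(pairs):
    """Root mean squared error over (label, prediction) pairs."""
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))


# Hypothetical (my_score, prediction) pairs; NaN marks a cold-start row
rows = [(7, 6.5), (8, 8.5), (6, float("nan")), (9, 8.0)]

# One NaN prediction makes the whole metric NaN
raw = rmse(rows)

# Option 1: replace NaN predictions with the average observed score
mean_score = sum(y for y, _ in rows) / len(rows)
filled = rmse([(y, mean_score if math.isnan(p) else p) for y, p in rows])

# Option 2: leave out rows whose prediction is NaN
dropped = rmse([(y, p) for y, p in rows if not math.isnan(p)])

print(raw, filled, dropped)
```

Filling keeps every test row in the metric, so it measures the system a user would actually see, while dropping measures only the rows the model could score. Spark's ALS estimator also has a `coldStartStrategy` parameter; setting it to `"drop"` should remove NaN predictions inside `transform`, giving the same effect as the `na.drop()` evaluation.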


Now we can run the model to test the recommendations.

In [0]:
from pyspark.sql.functions import lit

def recommendMovies(model, user, nbRecommendations):
    # Create a Spark DataFrame with the specified user and all the anime listed in the ratings DataFrame
    dataSet = left_join.select('anime_id').distinct().withColumn('user_id', lit(user))

    # Create a Spark DataFrame with the anime that have already been rated by this user
    moviesAlreadyRated = left_join.filter(left_join.user_id == user).select('anime_id', 'user_id')

    # Apply the recommender system to the data set without the already rated anime to predict ratings
    predictions = model.transform(dataSet.subtract(moviesAlreadyRated)).dropna().orderBy('prediction', ascending=False).limit(nbRecommendations).select('anime_id', 'prediction')

    # Join with the anime DataFrame to get the titles, genres, and premiere seasons
    recommendations = predictions.join(animedf, predictions.anime_id == animedf.anime_id).select(predictions.anime_id, animedf.title, animedf.genre, predictions.prediction, animedf.premiered)

#     recommendations.show(truncate=False)
    return recommendations
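
The selection logic inside the function (skip anime the user already rated, drop NaN predictions, keep the highest predicted scores) can be sketched without Spark. The `top_n_unseen` helper and the sample scores are hypothetical and only mirror the steps in the comments above.

```python
import math


def top_n_unseen(predicted, already_rated, n):
    """Return the n best (item_id, score) pairs among items the user has not rated.

    predicted: dict mapping item_id to predicted score (NaN allowed)
    already_rated: set of item_ids the user has already scored
    """
    candidates = [
        (item, score)
        for item, score in predicted.items()
        if item not in already_rated and not math.isnan(score)
    ]
    # Highest predicted score first
    candidates.sort(key=lambda pair: pair[1], reverse=True)
    return candidates[:n]


# Hypothetical predictions for one user
predicted = {1: 9.5, 2: 8.0, 3: float("nan"), 4: 9.9, 5: 7.0}
already_rated = {4}
top_two = top_n_unseen(predicted, already_rated, 2)
print(top_two)
```

In the Spark version, the join with `animedf` happens after the `orderBy` and `limit`, and a join does not guarantee row order. Sorting by `prediction` once more after the join would likely be needed if the final table should be ranked. Predicted scores are also not bounded by the rating scale, so values above 10 can appear unless they are clipped.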

In [36]:
print('Recommendations for user 3924291:')
recommendMovies(model, 3924291, 10).toPandas()

Recommendations for user 3924291:


|   | anime_id | title | genre | prediction | premiered |
|---|---|---|---|---|---|
| 0 | 35478 | Neko no Robu | Comedy, Slice of Life | 9.476129 | Spring 2017 |
| 1 | 32281 | Kimi no Na wa. | Supernatural, Drama, Romance, School | 9.002776 | |
| 2 | 28977 | Gintama° | Action, Comedy, Historical, Parody, Samurai, S... | 9.162655 | Spring 2015 |
| 3 | 15417 | Gintama': Enchousen | Action, Comedy, Historical, Parody, Samurai, S... | 8.993153 | Fall 2012 |
| 4 | 30484 | Steins;Gate 0 | Sci-Fi, Thriller | 9.004238 | Spring 2018 |
| 5 | 28851 | Koe no Katachi | Drama, School, Shounen | 8.849479 | |
| 6 | 9969 | Gintama' | Action, Sci-Fi, Comedy, Historical, Parody, Sa... | 9.060499 | Spring 2011 |
| 7 | 34096 | Gintama. | Action, Comedy, Historical, Parody, Samurai, S... | 8.96278 | Winter 2017 |
| 8 | 263 | Hajime no Ippo | Comedy, Sports, Drama, Shounen | 8.846721 | Fall 2000 |
| 9 | 15335 | Gintama Movie 2: Kanketsu-hen - Yorozuya yo Ei... | Action, Sci-Fi, Comedy, Historical, Parody, Sa... | 8.966371 | |


In [37]:
print('Recommendations for user 303577:')
recommendMovies(model, 303577, 10).toPandas()

Recommendations for user 303577:


|   | anime_id | title | genre | prediction | premiered |
|---|---|---|---|---|---|
| 0 | 35478 | Neko no Robu | Comedy, Slice of Life | 10.540213 | Spring 2017 |
| 1 | 28977 | Gintama° | Action, Comedy, Historical, Parody, Samurai, S... | 9.528095 | Spring 2015 |
| 2 | 3784 | Evangelion: 2.0 You Can (Not) Advance | Action, Sci-Fi, Mecha | 9.567251 | |
| 3 | 5114 | Fullmetal Alchemist: Brotherhood | Action, Military, Adventure, Comedy, Drama, Ma... | 9.647413 | Spring 2009 |
| 4 | 36732 | Qin Shi Ming Yue: Tian Xing Jiu Ge | Action, Historical, Martial Arts, Fantasy | 9.684301 | |
| 5 | 9253 | Steins;Gate | Thriller, Sci-Fi | 9.560666 | Spring 2011 |
| 6 | 33473 | Tokyo Futago Athletic | Comedy, Sports | 10.158769 | Winter 2017 |
| 7 | 15417 | Gintama': Enchousen | Action, Comedy, Historical, Parody, Samurai, S... | 9.536556 | Fall 2012 |
| 8 | 35385 | Yukai na Animal Bus 2nd Season | Comedy, Kids | 10.179564 | Summer 2017 |
| 9 | 9969 | Gintama' | Action, Sci-Fi, Comedy, Historical, Parody, Sa... | 9.654662 | Spring 2011 |


The ALS algorithm returns the top ten anime recommendations for each user. Compared with the baseline, the recommended anime vary, but the two lists share some titles. ALS leans toward lesser-known anime while still including popular ones, whereas the baseline shows mostly popular anime. The RMSE for ALS is 1.22568, while the RMSE for BaselineOnly is 1.289960.

# Conclusion

If I had to use one recommendation system, it would be ALS, because it uses all of the data rather than a sample like the baseline. It also has a lower RMSE than the baseline. This recommendation system would be a good fit for the website, helping to recommend anime for users to watch, since the system caters to people who are familiar with anime. I can see it running much like the Netflix or Spotify recommendation systems. A next step would be to train the baseline and KNN models on all of the data so they can be used to their full potential.