In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 88.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=74b0210f39434d722faa8802be968e580a9d031669168340ea3308cf3c4829d8
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [2]:
# Mount Google Drive so files under /content/drive/ are readable from this runtime.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [3]:
import pandas as pd
import numpy as np
import re
import warnings
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, udf, desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.simplefilter("ignore")
%matplotlib inline

In [4]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import IntegerType

In [5]:
# Spark configuration: the web UI port is set explicitly to 4050.
conf = SparkConf().set("spark.ui.port", "4050")


# create the context
# getOrCreate reuses an already-running context, so re-executing this cell in a
# live kernel no longer fails with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [6]:
# Bare expression: renders the SparkSession summary as the cell output.
spark

In [9]:
# Both CSV files sit in the same Drive folder. The header row supplies the
# column names and Spark infers each column's type from the data.
data_dir = "/content/drive/MyDrive/Project Rector"
anime = spark.read.csv(f"{data_dir}/anime.csv", header=True, inferSchema=True)
rating = spark.read.csv(f"{data_dir}/rating.csv", header=True, inferSchema=True)

Q1. Identify and describe the number of columns in the two dataset files.

In [10]:
# Print the column names and inferred types of the anime catalogue.
anime.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- type: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- members: integer (nullable = true)



In [11]:
# Summary statistics (count, mean, stddev, min, max) for every anime column.
anime.describe().show()

+-------+------------------+-----------------+------+-----+------------------+-----------------+-----------------+
|summary|          anime_id|             name| genre| type|          episodes|           rating|          members|
+-------+------------------+-----------------+------+-----+------------------+-----------------+-----------------+
|  count|             12294|            12294| 12232|12269|             12294|            12064|            12294|
|   mean|14058.221652838783|         166778.5|  null| null|12.382549774134182|6.473901690981445|18071.33886448674|
| stddev|11455.294700988177|330891.6746051493|  null| null| 46.86535196440979|1.026746306898068|54820.67692490701|
|    min|                 1|    &quot;0&quot;|Action|Movie|                 1|             1.67|                5|
|    max|             34527|                ◯|  Yaoi|   TV|           Unknown|             10.0|          1013917|
+-------+------------------+-----------------+------+-----+------------------+--

In [12]:
# Print the column names and inferred types of the user-rating table.
rating.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [13]:
# Summary statistics (count, mean, stddev, min, max) for every rating column.
rating.describe().show()

+-------+------------------+-----------------+------------------+
|summary|           user_id|         anime_id|            rating|
+-------+------------------+-----------------+------------------+
|  count|           7813737|          7813737|           7813737|
|   mean|36727.956744640884|8909.072104295294| 6.144029546937656|
| stddev|20997.946118973723| 8883.94963588107|3.7278004201098067|
|    min|                 1|                1|                -1|
|    max|             73516|            34519|                10|
+-------+------------------+-----------------+------------------+



Q2. Merge/Join/Combine the two datasets and identify the key common column on which you performed the join.

In [14]:
# Per the dataset documentation, a rating of -1 means the user watched the
# title but did not rate it. Assuming that carries no useful information,
# those rows are removed before merging.
ratings = rating.where(col("rating") != -1)

In [15]:
# Give the two clashing columns distinct names before joining:
# the catalogue's `name` becomes `anime_title`, the user's `rating` becomes `user_rating`.
anime = anime.withColumnRenamed("name", "anime_title")
ratings = ratings.withColumnRenamed("rating", "user_rating")

# Inner join on the shared key, so only anime with at least one kept rating survive.
anime_fulldata = anime.join(ratings, on="anime_id", how="inner")
anime_fulldata.show()

+--------+--------------------+--------------------+-----+--------+------+-------+-------+-----------+
|anime_id|         anime_title|               genre| type|episodes|rating|members|user_id|user_rating|
+--------+--------------------+--------------------+-----+--------+------+-------+-------+-----------+
|    8074|Highschool of the...|Action, Ecchi, Ho...|   TV|      12|  7.46| 535892|      1|         10|
|   11617|     High School DxD|Comedy, Demons, E...|   TV|      12|   7.7| 398660|      1|         10|
|   11757|    Sword Art Online|Action, Adventure...|   TV|      25|  7.83| 893100|      1|         10|
|   15451| High School DxD New|Action, Comedy, D...|   TV|      12|  7.87| 266657|      1|         10|
|   11771|    Kuroko no Basket|Comedy, School, S...|   TV|      25|  8.46| 338315|      2|         10|
|      20|              Naruto|Action, Comedy, M...|   TV|     220|  7.81| 683297|      3|          8|
|     154|         Shaman King|Action, Adventure...|   TV|      64|  7.83

>*Many titles in anime_title contain leftover HTML entities and other symbols. Let’s remove them with the function below so the titles are clean.*

In [16]:
def text_cleaning(text):
    """Remove HTML-entity residue and the literal ``.hack//`` prefix from a title.

    Args:
        text: Raw title string, or None.

    Returns:
        The cleaned string. None is returned unchanged so a null title does
        not raise when this function runs inside a Spark UDF.
    """
    if text is None:
        return None

    # All patterns are plain literals, so str.replace is used instead of regex.
    # The original regex r'.hack//' had an unescaped dot and also stripped any
    # single character followed by "hack//"; only a literal ".hack//" is removed now.
    # The former "A&#039;s" and "I&#039;" rules ran after "&#039;" had already
    # been stripped and could never match, so they are dropped; output for
    # those titles is unchanged.
    replacements = (
        ('&quot;', ''),
        ('.hack//', ''),
        ('&#039;', ''),
        ('&amp;', 'and'),
    )
    for needle, substitute in replacements:
        text = text.replace(needle, substitute)

    return text

# Expose the Python cleaner to Spark as a string-returning UDF.
text_cleaningUDF = udf(text_cleaning, StringType())

# Replace anime_title with its cleaned version.
cleaned_title = text_cleaningUDF(col("anime_title"))
anime_fulldata = anime_fulldata.withColumn("anime_title", cleaned_title)

Q3. Find the top 10 anime based on user rating. Use tabular/graphical presentation to provide evidence of your analysis.

In [17]:
# Drop every row that has a null in any column; such rows add nothing here.
anime_fulldata = anime_fulldata.dropna(how='any')

# Per title: average of the catalogue `rating` column and the number of user
# ratings, ordered by that count (most-rated first).
# The DataFrame is bound to `newdf` and shown separately: DataFrame.show()
# returns None, so chaining .show(10) onto the assignment left `newdf` as None.
# NOTE(review): Q3 asks for the top anime by user rating, but this ranks by
# rating count and averages the catalogue score - confirm that is intended.
newdf = (
    anime_fulldata
    .groupby('anime_id', 'anime_title')
    .agg({'rating': 'avg', 'user_rating': 'count'})
    .orderBy(desc('count(user_rating)'))
)
newdf.show(10)

+--------+--------------------+-----------------+------------------+
|anime_id|         anime_title|      avg(rating)|count(user_rating)|
+--------+--------------------+-----------------+------------------+
|    1535|          Death Note|8.710000000001498|             34226|
|   11757|    Sword Art Online|7.830000000001458|             26310|
|   16498|  Shingeki no Kyojin|8.539999999997944|             25290|
|    1575|Code Geass: Hangy...|8.830000000001466|             24126|
|    6547|        Angel Beats!|8.389999999999478|             23565|
|     226|          Elfen Lied|7.850000000000961|             23528|
|      20|              Naruto|7.809999999998774|             22071|
|    5114|Fullmetal Alchemi...| 9.25999999999865|             21494|
|     121| Fullmetal Alchemist|8.330000000001412|             21332|
|    2904|Code Geass: Hangy...|8.979999999999716|             21124|
+--------+--------------------+-----------------+------------------+
only showing top 10 rows



Q4. Find the top 10 genres based on user rating. Use tabular/graphical presentation to provide evidence of your analysis.

In [18]:
# Per genre string: average of the catalogue `rating` column and the number of
# user ratings, ordered by that count (most-rated first).
# The DataFrame is bound to `genredf` and shown separately: DataFrame.show()
# returns None, so chaining .show(10) onto the assignment left `genredf` as None.
# Each distinct comma-separated genre combination is its own group; individual
# genres are not split out.
genredf = (
    anime_fulldata
    .groupby('genre')
    .agg({'rating': 'avg', 'user_rating': 'count'})
    .orderBy(desc('count(user_rating)'))
)
genredf.show(10)

+--------------------+-----------------+------------------+
|               genre|      avg(rating)|count(user_rating)|
+--------------------+-----------------+------------------+
|Comedy, School, S...| 7.61611594784337|             49850|
|              Hentai|6.647734707647943|             42979|
|Comedy, Slice of ...|7.546397326004867|             42932|
|Action, Adventure...|7.611947468055045|             42260|
|Comedy, Romance, ...|8.034248223204006|             41648|
|              Comedy|6.873754236443347|             40128|
|Comedy, Seinen, S...|7.436532421412579|             38015|
|Mystery, Police, ...|8.639758809626597|             37232|
|Comedy, School, S...|7.828225356688424|             36937|
|Action, Mecha, Sc...|7.819512518327197|             35468|
+--------------------+-----------------+------------------+
only showing top 10 rows



Q5. Design a collaborative filter-based recommendation system.

>*Some users have rated only a single anime; even if such a user gave it a 5, that is not a valuable record for recommendation. I have therefore used a minimum of 200 ratings per user as the threshold.*

In [19]:
# Number of rating rows contributed by each user.
count_df = anime_fulldata.groupBy("user_id").count()

# Attach that per-user total to every row of the merged data.
join_df = anime_fulldata.join(count_df, on="user_id", how="inner")

# Keep only rows from users with at least 200 ratings.
new_animedf = join_df.where(col("count") >= 200)

In [20]:
# Keep only the user, item and rating columns (plus the title for display in results).
anime_fulldata_reco = new_animedf.select("user_id", "anime_id", "user_rating", "anime_title")

In [21]:
#Create test and train set
# 80/20 split. A fixed seed is supplied so the same split is drawn on every run;
# without it, each execution gave a different split and a different RMSE.
(train, test) = anime_fulldata_reco.randomSplit([0.8, 0.2], seed=42)

In [22]:
#Create ALS model
# coldStartStrategy="drop" discards predictions for users/items unseen during
# fitting (they would otherwise be NaN); nonnegative=True constrains the
# factors to be non-negative. A fixed seed makes the factor initialisation,
# and so the fitted model, repeatable across runs.
als = ALS(maxIter = 5, regParam = 0.17, userCol = "user_id", itemCol = "anime_id", ratingCol = "user_rating",
          coldStartStrategy = "drop", nonnegative = True, seed = 42)

In [23]:
# Fit the ALS estimator configured above on the training split.
model = als.fit(train)

# Score the held-out split; this adds a `prediction` column to the test rows.
pred = model.transform(test)

In [24]:
# Evaluate model performance: RMSE between the actual `user_rating` and the
# model's `prediction` on the held-out split.
# The evaluator is named `evaluator` rather than `eval` so the builtin eval()
# is not shadowed for the rest of the notebook.
evaluator = RegressionEvaluator(metricName="rmse", labelCol = "user_rating", predictionCol="prediction")
rmse = evaluator.evaluate(pred)
print(f"RMSE: {rmse}")

RMSE: 1.2402263470412234


Q6. Give example of best three anime recommendations for minimum of 10 users.

>*Example 1: Best three anime recommendations for user 5*

In [25]:
# Held-out rows for user 5. These come from the test split, so they are titles
# this user has already rated, not unseen ones.
user_5 = test.where(col("user_id") == 5).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec5 = model.transform(user_5)
# Show the three highest predicted ratings.
rec5.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|      5|   15335|Gintama Movie: Ka...| 6.1200695|
|      5|     170|           Slam Dunk| 5.8652463|
|      5|    1535|          Death Note|  5.606813|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 2: Best three anime recommendations for user 139*

In [26]:
# Held-out rows for user 139 (user, item and title columns only).
user_139 = test.where(col("user_id") == 139).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec139 = model.transform(user_139)
# Show the three highest predicted ratings.
rec139.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    139|      19|             Monster|   8.39358|
|    139|    7311|Suzumiya Haruhi n...|  7.792595|
|    139|   11741|Fate/Zero 2nd Season| 7.6936994|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 3: Best three anime recommendations for user 210*

In [27]:
# Held-out rows for user 210 (user, item and title columns only).
user_210 = test.where(col("user_id") == 210).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec210 = model.transform(user_210)
# Show the three highest predicted ratings.
rec210.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    210|    2904|Code Geass: Hangy...|  9.383395|
|    210|    5114|Fullmetal Alchemi...|  9.353313|
|    210|    1575|Code Geass: Hangy...|   9.18853|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 4: Best three recommendations for user 233*

In [28]:
# Held-out rows for user 233 (user, item and title columns only).
user_233 = test.where(col("user_id") == 233).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec233 = model.transform(user_233)
# Show the three highest predicted ratings.
rec233.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    233|   11757|    Sword Art Online| 10.261229|
|    233|   23273|Shigatsu wa Kimi ...| 10.235156|
|    233|   12365| Bakuman. 3rd Season| 10.116904|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 5: Best three recommendations for user 250*

In [29]:
# Held-out rows for user 250 (user, item and title columns only).
user_250 = test.where(col("user_id") == 250).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec250 = model.transform(user_250)
# Show the three highest predicted ratings.
rec250.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    250|    4181|Clannad: After Story|  8.795191|
|    250|   11741|Fate/Zero 2nd Season|  8.559677|
|    250|   30276|       One Punch Man|  8.502831|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 6: Best three recommendations for user 271*

In [30]:
# Held-out rows for user 271 (user, item and title columns only).
user_271 = test.where(col("user_id") == 271).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec271 = model.transform(user_271)
# Show the three highest predicted ratings.
rec271.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    271|   30276|       One Punch Man|  8.351711|
|    271|   23273|Shigatsu wa Kimi ...|  8.337369|
|    271|   22297|Fate/stay night: ...|  8.207536|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 7: Best three recommendations for user 308*

In [31]:
# Held-out rows for user 308 (user, item and title columns only).
user_308 = test.where(col("user_id") == 308).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec308 = model.transform(user_308)
# Show the three highest predicted ratings.
rec308.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    308|   21031|Precure All Stars...|  8.980688|
|    308|   28999|           Charlotte|  8.555373|
|    308|   17265|         Log Horizon| 8.3668585|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 8: Best three recommendations for user 593*

In [32]:
# Held-out rows for user 593 (user, item and title columns only).
user_593 = test.where(col("user_id") == 593).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec593 = model.transform(user_593)
# Show the three highest predicted ratings.
rec593.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    593|   30276|       One Punch Man|  8.840844|
|    593|   22297|Fate/stay night: ...|  8.687056|
|    593|   22535|Kiseijuu: Sei no ...|  8.600446|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 9: Best three recommendations for user 572*

In [33]:
# Held-out rows for user 572 (user, item and title columns only).
user_572 = test.where(col("user_id") == 572).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec572 = model.transform(user_572)
# Show the three highest predicted ratings.
rec572.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    572|    9253|         Steins;Gate|  6.359953|
|    572|    2251|            Baccano!| 6.2521405|
|    572|   11741|Fate/Zero 2nd Season|  6.169533|
+-------+--------+--------------------+----------+
only showing top 3 rows



>*Example 10: Best three recommendations for user 497*

In [34]:
# Held-out rows for user 497 (user, item and title columns only).
user_497 = test.where(col("user_id") == 497).select("user_id", "anime_id", "anime_title")
# Score those rows with the fitted ALS model.
rec497 = model.transform(user_497)
# Show the three highest predicted ratings.
rec497.orderBy(desc("prediction")).show(3)

+-------+--------+--------------------+----------+
|user_id|anime_id|         anime_title|prediction|
+-------+--------+--------------------+----------+
|    497|    5114|Fullmetal Alchemi...| 7.7820606|
|    497|    6702|          Fairy Tail| 7.5191374|
|    497|   16498|  Shingeki no Kyojin| 7.4918613|
+-------+--------+--------------------+----------+
only showing top 3 rows

