# Imports and setup 

In [54]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [55]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Add Check for .tgz file present
!wget -q https://mirrors.estointernet.in/apache/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf /content/spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [56]:
#Step 2: Imports

#Spark
import findspark
findspark.init()
findspark.find()
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, CountVectorizer
from pyspark.ml import Pipeline
import regex as re
import string
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.stat import Summarizer
import matplotlib.pyplot as plt
import numpy as np

In [57]:
spark = SparkSession.builder\
         .master("local")\
         .appName("Colab")\
         .config('spark.ui.port', '4050')\
         .getOrCreate()

In [58]:
ROOT = "/content/drive/MyDrive/MyAnimeList-parquet/"

In [59]:
anime_df = spark.read.format("parquet").option("header", "true") \
                                   .option("headers", "true") \
                                   .option('escape','"') \
                                   .option("inferSchema", "true") \
                                   .load(ROOT + "anime_cleaned.parquet", sep=',')

In [60]:
anime_df.show(5)

+--------+-------------------+--------------------+----------------------+--------------------+--------------------+----+--------+--------+---------------+------+--------------------+--------------------+---------------+--------------------+-----+---------+------+----------+-------+---------+--------------------+-----------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------------+--------------------+------------+---------------+
|anime_id|              title|       title_english|        title_japanese|      title_synonyms|           image_url|type|  source|episodes|         status|airing|        aired_string|               aired|       duration|              rating|score|scored_by|  rank|popularity|members|favorites|          background|  premiered|         broadcast|             related|            producer|        licensor|          studio|               genre|       opening_theme|        ending_theme|du

In [61]:
user_df = spark.read.format("parquet").option("header", "true") \
                                   .option("headers", "true") \
                                   .option('escape','"') \
                                   .option("inferSchema", "true") \
                                   .load(ROOT + "users_cleaned.parquet", sep=',')

In [62]:
user_df.show(5)

+--------------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+-------------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+
|      username|user_id|user_watching|user_completed|user_onhold|user_dropped|user_plantowatch|user_days_spent_watching|gender|           location|         birth_date|access_rank|          join_date|        last_online|stats_mean_score|stats_rewatched|stats_episodes|
+--------------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+-------------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+
|      karthiga|2255153|            3|            49|          1|           0|               0|       55.09166666666667|Female|    Chennai, India |1990-04-29 00:00:00|       null|2013-03-03 00:00:

In [63]:
UA_df = spark.read.format("parquet").option("header", "true") \
                                   .option("headers", "true") \
                                   .option('escape','"') \
                                   .option("inferSchema", "true") \
                                   .load(ROOT + "animelists_cleaned.parquet", sep=',')

In [64]:
UA_df.show(5)

+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|username|anime_id|my_watched_episodes|my_start_date|my_finish_date|my_score|my_status|my_rewatching|my_rewatching_ep|    my_last_updated|my_tags|
+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|karthiga|      21|                586|   0000-00-00|    0000-00-00|       9|        1|         null|               0|2013-03-03 10:52:53|   null|
|karthiga|      59|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-03-10 13:54:51|   null|
|karthiga|      74|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-04-27 16:43:35|   null|
|karthiga|     120|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|

# Preparing data

anime df

In [65]:
#filling null genre values
anime_df = anime_df.fillna('', subset=['genre'])

In [66]:
anime_df.show(2)

+--------+----------------+--------------------+--------------+--------------------+--------------------+----+------+--------+---------------+------+--------------------+--------------------+---------------+--------------------+-----+---------+------+----------+-------+---------+--------------------+-----------+------------------+--------------------+--------------------+----------------+----------------+--------------------+--------------------+--------------------+------------+---------------+
|anime_id|           title|       title_english|title_japanese|      title_synonyms|           image_url|type|source|episodes|         status|airing|        aired_string|               aired|       duration|              rating|score|scored_by|  rank|popularity|members|favorites|          background|  premiered|         broadcast|             related|            producer|        licensor|          studio|               genre|       opening_theme|        ending_theme|duration_min|aired_from_year

In [67]:
#removing scored_by as its highly correlated with members column and rank as its inversely correlated with score
cols = ['title', 'title_english', 'title_japanese', 'title_synonyms', 'image_url', 'aired_string', 'background',
       'broadcast', 'related', 'opening_theme', 'ending_theme', 'studio','scored_by','premiered','producer','licensor','rank','duration']
anime_df = anime_df.drop(*cols)

In [68]:
anime_df.show(2)

+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+-------+---------+--------------------+------------+---------------+
|anime_id|type|source|episodes|         status|airing|               aired|              rating|score|popularity|members|favorites|               genre|duration_min|aired_from_year|
+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+-------+---------+--------------------+------------+---------------+
|   11013|  TV| Manga|      12|Finished Airing| false|{'from': '2012-01...|PG-13 - Teens 13 ...| 7.63|       231| 283882|     2809|Comedy, Supernatu...|        24.0|         2012.0|
|    2104|  TV| Manga|      26|Finished Airing| false|{'from': '2007-04...|PG-13 - Teens 13 ...| 7.89|       366| 204003|     2579|Comedy, Parody, R...|        24.0|         2007.0|
+--------+----+------+--------+---------------+------+--------------------+---------------

In [69]:
anime_df = anime_df.withColumn("episodes", anime_df.episodes.cast('float'))
anime_df = anime_df.withColumn("score", anime_df.score.cast('float'))
anime_df = anime_df.withColumn("popularity", anime_df.popularity.cast('float'))
anime_df = anime_df.withColumn("members", anime_df.members.cast('float'))
anime_df = anime_df.withColumn("favorites", anime_df.favorites.cast('float'))
anime_df = anime_df.withColumn("airing", anime_df.favorites.cast('String'))

In [70]:
#converting genre to array of strings
anime_df = anime_df.withColumn(
    'genre',
    split(regexp_replace('genre', ' ', ''), ',').cast("array<string>").alias("genre")
)

In [71]:
anime_df.show(2)

+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+--------+---------+--------------------+------------+---------------+
|anime_id|type|source|episodes|         status|airing|               aired|              rating|score|popularity| members|favorites|               genre|duration_min|aired_from_year|
+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+--------+---------+--------------------+------------+---------------+
|   11013|  TV| Manga|    12.0|Finished Airing|2809.0|{'from': '2012-01...|PG-13 - Teens 13 ...| 7.63|     231.0|283882.0|   2809.0|[Comedy, Supernat...|        24.0|         2012.0|
|    2104|  TV| Manga|    26.0|Finished Airing|2579.0|{'from': '2007-04...|PG-13 - Teens 13 ...| 7.89|     366.0|204003.0|   2579.0|[Comedy, Parody, ...|        24.0|         2007.0|
+--------+----+------+--------+---------------+------+--------------------+----------

In [72]:
anime_df = CountVectorizer(inputCol="genre", outputCol="genre_fv").fit(anime_df).transform(anime_df)
anime_df = anime_df.drop('genre')

In [73]:
anime_df.show(2)

+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+--------+---------+------------+---------------+--------------------+
|anime_id|type|source|episodes|         status|airing|               aired|              rating|score|popularity| members|favorites|duration_min|aired_from_year|            genre_fv|
+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+--------+---------+------------+---------------+--------------------+
|   11013|  TV| Manga|    12.0|Finished Airing|2809.0|{'from': '2012-01...|PG-13 - Teens 13 ...| 7.63|     231.0|283882.0|   2809.0|        24.0|         2012.0|(44,[0,6,7,10],[1...|
|    2104|  TV| Manga|    26.0|Finished Airing|2579.0|{'from': '2007-04...|PG-13 - Teens 13 ...| 7.89|     366.0|204003.0|   2579.0|        24.0|         2007.0|(44,[0,6,7,8,25],...|
+--------+----+------+--------+---------------+------+--------------------+----------

In [74]:
categoricalColumns = ['type', 'source', 'status', 'rating','airing','aired']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [75]:
numericCols = ['episodes', 'score', 'popularity', 'members', 'favorites', 'genre_fv','duration_min','aired_from_year'] 
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="item_feats_profile")
stages += [assembler]

In [76]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(anime_df)
anime_df = pipelineModel.transform(anime_df)

In [77]:
anime_df.show(2)

+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+--------+---------+------------+---------------+--------------------+---------+-------------+-----------+--------------+-----------+--------------+-----------+--------------+-----------+------------------+----------+-------------------+--------------------+
|anime_id|type|source|episodes|         status|airing|               aired|              rating|score|popularity| members|favorites|duration_min|aired_from_year|            genre_fv|typeIndex| typeclassVec|sourceIndex|sourceclassVec|statusIndex|statusclassVec|ratingIndex|ratingclassVec|airingIndex|    airingclassVec|airedIndex|      airedclassVec|  item_feats_profile|
+--------+----+------+--------+---------------+------+--------------------+--------------------+-----+----------+--------+---------+------------+---------------+--------------------+---------+-------------+-----------+--------------+-----------+-------------

In [78]:
selectedCols = ['anime_id', 'item_feats_profile']
anime_df = anime_df.select(selectedCols)

In [79]:
anime_df.show(5)

+--------+--------------------+
|anime_id|  item_feats_profile|
+--------+--------------------+
|   11013|(6531,[0,5,19,20,...|
|    2104|(6531,[0,5,19,20,...|
|    5262|(6531,[0,5,19,24,...|
|     721|(6531,[0,6,19,20,...|
|   12365|(6531,[0,5,19,20,...|
+--------+--------------------+
only showing top 5 rows



user df 

In [80]:
user_df.show(2)

+---------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+----------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+
| username|user_id|user_watching|user_completed|user_onhold|user_dropped|user_plantowatch|user_days_spent_watching|gender|        location|         birth_date|access_rank|          join_date|        last_online|stats_mean_score|stats_rewatched|stats_episodes|
+---------+-------+-------------+--------------+-----------+------------+----------------+------------------------+------+----------------+-------------------+-----------+-------------------+-------------------+----------------+---------------+--------------+
| karthiga|2255153|            3|            49|          1|           0|               0|       55.09166666666667|Female| Chennai, India |1990-04-29 00:00:00|       null|2013-03-03 00:00:00|2014-02-04 01:32:00|         

In [82]:
user_df = user_df.filter(col('user_id').isNotNull())
user_df = user_df.filter(col('username').isNotNull())
user_df = user_df.filter(col('stats_episodes').isNotNull())
user_df = user_df.fillna('', subset=['location'])

In [89]:
#user_completed is correlated to stats_episodes
cols = ['access_rank', 'birth_date','join_date', 'last_online','user_completed','location']
user_df = user_df.drop(*cols)

In [90]:
user_df.show(2)

+---------+-------+-------------+-----------+------------+----------------+------------------------+------+----------------+---------------+--------------+
| username|user_id|user_watching|user_onhold|user_dropped|user_plantowatch|user_days_spent_watching|gender|stats_mean_score|stats_rewatched|stats_episodes|
+---------+-------+-------------+-----------+------------+----------------+------------------------+------+----------------+---------------+--------------+
| karthiga|2255153|            3|          1|           0|               0|       55.09166666666667|Female|            7.43|            0.0|          3391|
|Damonashu|  37326|           45|         27|          25|              59|       82.57430555555555|  Male|            6.15|            6.0|          4903|
+---------+-------+-------------+-----------+------------+----------------+------------------------+------+----------------+---------------+--------------+
only showing top 2 rows



In [91]:
categoricalColumns = ['gender']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [92]:
NumericCols = ['user_watching', 'user_onhold', 'user_dropped', 'user_plantowatch', 'user_days_spent_watching', 
        'stats_rewatched', 'stats_episodes','stats_mean_score']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + NumericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="user_feats_profile")
stages += [assembler]

In [93]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(user_df)
user_df = pipelineModel.transform(user_df)

In [94]:
selectedCols = ['username', 'user_id', 'user_feats_profile']
user_df = user_df.select(selectedCols)

In [95]:
user_df.show(5)

+--------------+-------+--------------------+
|      username|user_id|  user_feats_profile|
+--------------+-------+--------------------+
|      karthiga|2255153|[0.0,1.0,3.0,1.0,...|
|     Damonashu|  37326|[1.0,0.0,45.0,27....|
|         bskai| 228342|[1.0,0.0,25.0,2.0...|
|terune_uzumaki| 327311|[0.0,1.0,5.0,0.0,...|
|         Bas_G|5015094|[1.0,0.0,35.0,6.0...|
+--------------+-------+--------------------+
only showing top 5 rows



UA 

In [96]:
UA_df.show(5)

+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|username|anime_id|my_watched_episodes|my_start_date|my_finish_date|my_score|my_status|my_rewatching|my_rewatching_ep|    my_last_updated|my_tags|
+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+-------------------+-------+
|karthiga|      21|                586|   0000-00-00|    0000-00-00|       9|        1|         null|               0|2013-03-03 10:52:53|   null|
|karthiga|      59|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-03-10 13:54:51|   null|
|karthiga|      74|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|2013-04-27 16:43:35|   null|
|karthiga|     120|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|

In [97]:
cols = ['my_watched_episodes', 'my_start_date', 'my_finish_date', 'my_status', 'my_rewatching', 
        'my_rewatching_ep', 'my_last_updated', 'my_tags']
UA_df = UA_df.drop(*cols)

In [98]:
UA_df = UA_df.na.drop()
UA_df = UA_df.filter(col('username').isNotNull())

In [99]:
UA_df = UA_df.filter(UA_df.my_score <= 10)

In [100]:
UA_df = UA_df.filter(UA_df.my_score > 0)

In [101]:
UA_df.show(2)

+--------+--------+--------+
|username|anime_id|my_score|
+--------+--------+--------+
|karthiga|      21|       9|
|karthiga|      59|       7|
+--------+--------+--------+
only showing top 2 rows



In [103]:
UA_df = UA_df.join(user_df, 'username', how='left')

In [104]:
UA_df.show(2)

+--------+--------+--------+-------+--------------------+
|username|anime_id|my_score|user_id|  user_feats_profile|
+--------+--------+--------+-------+--------------------+
|karthiga|      21|       9|2255153|[0.0,1.0,3.0,1.0,...|
|karthiga|      59|       7|2255153|[0.0,1.0,3.0,1.0,...|
+--------+--------+--------+-------+--------------------+
only showing top 2 rows



In [105]:
UA_df = UA_df.drop('username')

In [106]:
UA_df = UA_df.join(anime_df, 'anime_id', how='left')

In [107]:
assembler = VectorAssembler(inputCols=["user_feats_profile", "item_feats_profile"], outputCol="features")

In [108]:
user_anime_df = assembler.transform(UA_df)

In [109]:
user_anime_df.show(5)

+--------+--------+-------+--------------------+--------------------+--------------------+
|anime_id|my_score|user_id|  user_feats_profile|  item_feats_profile|            features|
+--------+--------+-------+--------------------+--------------------+--------------------+
|      21|       9|2255153|[0.0,1.0,3.0,1.0,...|(6531,[0,5,20,110...|(6541,[1,2,3,6,8,...|
|      59|       7|2255153|[0.0,1.0,3.0,1.0,...|(6531,[0,5,19,20,...|(6541,[1,2,3,6,8,...|
|      74|       7|2255153|[0.0,1.0,3.0,1.0,...|(6531,[0,5,19,21,...|(6541,[1,2,3,6,8,...|
|     120|       7|2255153|[0.0,1.0,3.0,1.0,...|(6531,[0,5,19,20,...|(6541,[1,2,3,6,8,...|
|     178|       7|2255153|[0.0,1.0,3.0,1.0,...|(6531,[0,5,19,21,...|(6541,[1,2,3,6,8,...|
+--------+--------+-------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [110]:
print("No of user-anime records: ",user_anime_df.count())

No of user-anime records:  19171950
