In [1]:
import numpy as np
import re
import gc
import warnings
warnings.filterwarnings('ignore')

# pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# eda
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

# machine learning
import pyspark.ml.feature
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.stat import Summarizer

# Seed passed to the randomSplit calls for the train/validation/test splits
SEED = 42

In [2]:
# Spark configuration: fixed UI port, and broadcast joins disabled
# (autoBroadcastJoinThreshold = -1).
conf = (
    SparkConf()
    .set('spark.ui.port', '4050')
    .set('spark.sql.autoBroadcastJoinThreshold', '-1')
)

# getOrCreate() reuses an already-running session, so re-executing this cell
# does not fail the way constructing a second SparkContext(conf=conf) would.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

22/08/24 02:36:12 WARN Utils: Your hostname, kevin-H resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface wlo1)
22/08/24 02:36:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/08/24 02:36:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Load the raw anime table. `escape='"'` makes a doubled quote inside a quoted
# field be read as a literal quote character.
anime_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('escape', '"')
    .option('inferSchema', 'true')
    .load('AnimeList.csv')
)

# Convert only the rows being displayed: toPandas() on the whole frame would
# collect every row to the driver just to show five.
anime_df.limit(5).toPandas()

22/08/24 02:36:24 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,anime_id,title,title_english,title_japanese,title_synonyms,image_url,type,source,episodes,status,...,background,premiered,broadcast,related,producer,licensor,studio,genre,opening_theme,ending_theme
0,11013,Inu x Boku SS,Inu X Boku Secret Service,妖狐×僕SS,Youko x Boku SS,https://myanimelist.cdn-dena.com/images/anime/...,TV,Manga,12,Finished Airing,...,Inu x Boku SS was licensed by Sentai Filmworks...,Winter 2012,Fridays at Unknown,"{'Adaptation': [{'mal_id': 17207, 'type': 'man...","Aniplex, Square Enix, Mainichi Broadcasting Sy...",Sentai Filmworks,David Production,"Comedy, Supernatural, Romance, Shounen","['""Nirvana"" by MUCC']","['#1: ""Nirvana"" by MUCC (eps 1, 11-12)', '#2: ..."
1,2104,Seto no Hanayome,My Bride is a Mermaid,瀬戸の花嫁,The Inland Sea Bride,https://myanimelist.cdn-dena.com/images/anime/...,TV,Manga,26,Finished Airing,...,,Spring 2007,Unknown,"{'Adaptation': [{'mal_id': 759, 'type': 'manga...","TV Tokyo, AIC, Square Enix, Sotsu",Funimation,Gonzo,"Comedy, Parody, Romance, School, Shounen","['""Romantic summer"" by SUN&LUNAR']","['#1: ""Ashita e no Hikari (明日への光)"" by Asuka Hi..."
2,5262,Shugo Chara!! Doki,Shugo Chara!! Doki,しゅごキャラ！！どきっ,"Shugo Chara Ninenme, Shugo Chara! Second Year",https://myanimelist.cdn-dena.com/images/anime/...,TV,Manga,51,Finished Airing,...,,Fall 2008,Unknown,"{'Adaptation': [{'mal_id': 101, 'type': 'manga...","TV Tokyo, Sotsu",,Satelight,"Comedy, Magic, School, Shoujo","['#1: ""Minna no Tamago (みんなのたまご)"" by Shugo Cha...","['#1: ""Rottara Rottara (ロッタラ ロッタラ)"" by Buono! ..."
3,721,Princess Tutu,Princess Tutu,プリンセスチュチュ,,https://myanimelist.cdn-dena.com/images/anime/...,TV,Original,38,Finished Airing,...,Princess Tutu aired in two parts. The first pa...,Summer 2002,Fridays at Unknown,"{'Adaptation': [{'mal_id': 1581, 'type': 'mang...","Memory-Tech, GANSIS, Marvelous AQL",ADV Films,Hal Film Maker,"Comedy, Drama, Magic, Romance, Fantasy","['""Morning Grace"" by Ritsuko Okazaki']","['""Watashi No Ai Wa Chiisaikeredo"" by Ritsuko ..."
4,12365,Bakuman. 3rd Season,Bakuman.,バクマン。,Bakuman Season 3,https://myanimelist.cdn-dena.com/images/anime/...,TV,Manga,25,Finished Airing,...,,Fall 2012,Unknown,"{'Adaptation': [{'mal_id': 9711, 'type': 'mang...","NHK, Shueisha",,J.C.Staff,"Comedy, Drama, Romance, Shounen","['#1: ""Moshimo no Hanashi (もしもの話)"" by nano.RIP...","['#1: ""Pride on Everyday"" by Sphere (eps 1-13)..."


# Anime Data Processing

In [4]:
# Drop free-text and metadata columns that are not used as item features.
cols = ['title', 'title_english', 'title_japanese', 'title_synonyms', 'image_url', 'aired_string', 'background',
       'broadcast', 'related', 'opening_theme', 'ending_theme', 'studio', 'premiered', 'producer', 'licensor',
       'rank', 'aired', 'duration', 'airing']
anime_df = anime_df.drop(*cols)

# Missing genres become '' so the split below yields an array instead of null.
anime_df = anime_df.fillna('', subset=['genre'])

# Turn the comma-separated genre string into an array of genre tokens.
# Spaces are stripped first ("Comedy, Drama" -> "Comedy,Drama").
# split('') returns [''], so empty tokens are removed; otherwise a spurious
# '' genre would enter the CountVectorizer vocabulary.
anime_df = anime_df.withColumn(
    'genre',
    array_remove(split(regexp_replace('genre', ' ', ''), ','), '')
)

# Cast numeric columns to float. With Spark's default (non-ANSI) casting,
# values that do not parse as numbers become null.
cols_float = ['episodes', 'score', 'scored_by', 'popularity', 'members', 'favorites']
for col_name in cols_float:
    anime_df = anime_df.withColumn(col_name, col(col_name).cast('float'))

In [5]:
# Bag-of-genres: learn the genre vocabulary, encode each anime's genre list as
# a count vector in `genre_fv`, then discard the raw array column.
genre_vectorizer = CountVectorizer(inputCol='genre', outputCol='genre_fv')
genre_vectorizer_model = genre_vectorizer.fit(anime_df)
anime_df = genre_vectorizer_model.transform(anime_df).drop('genre')

                                                                                

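We will use StringIndexer on the categorical columns. StringIndexer encodes a string column of labels into a column of label indices. By default, labels are ordered by frequency, so the most frequent label gets index 0. If the input column is numeric, it is cast to a string and the string values are indexed. The indices are in [0, numLabels). Each index column is then one-hot encoded with OneHotEncoder.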

In [6]:
# For every categorical column, add two pipeline stages:
#   StringIndexer: '<col>'      -> '<col>Index'    (label index)
#   OneHotEncoder: '<col>Index' -> '<col>classVec' (one-hot vector)
cat_cols = ['type', 'source', 'status', 'rating']
stages = []
for cat in cat_cols:
    index_col = cat + 'Index'
    stages.append(StringIndexer(inputCol=cat, outputCol=index_col))
    stages.append(OneHotEncoder(inputCols=[index_col], outputCols=[cat + 'classVec']))

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models. Here we combine the one-hot encoded categorical columns, the numeric columns, and the genre count vector into a single item feature vector.

In [7]:
# Item feature inputs: the one-hot categorical vectors, the numeric columns and
# the genre count vector (VectorAssembler accepts scalar and vector inputs).
num_cols = ['episodes', 'score', 'scored_by', 'popularity', 'members', 'favorites', 'genre_fv']

assemblerInputs = [f'{c}classVec' for c in cat_cols]
assemblerInputs.extend(num_cols)

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='item_features_profile')
stages.append(assembler)

In [8]:
@udf("array<integer>")
def indices(v):
    """Return the indices of the active entries of an ML vector as Python ints.

    For a SparseVector these are its stored indices; for a DenseVector they are
    the positions of its non-zero values. A null input yields null.
    """
    if v is None:
        return None
    if hasattr(v, 'indices'):
        # SparseVector: `indices` holds the stored positions.
        return [int(i) for i in v.indices]
    # DenseVector has no `indices` attribute; derive the non-zero positions.
    return [int(i) for i in np.flatnonzero(v.toArray())]

In [9]:
# Fit all item stages (indexers, encoders, assembler) on the anime table, then
# apply the fitted transformations to it.
pipelineModel = Pipeline(stages=stages).fit(anime_df)
anime_df = pipelineModel.transform(anime_df)

                                                                                

In [10]:
anime_df.select(col('item_features_profile')).show(n=5)

+---------------------+
|item_features_profile|
+---------------------+
| (79,[0,8,21,23,29...|
| (79,[0,8,21,23,29...|
| (79,[0,8,21,25,29...|
| (79,[0,7,21,23,29...|
| (79,[0,8,21,23,29...|
+---------------------+
only showing top 5 rows



# User Data Processing

In [11]:
# Load the raw user table. `escape='"'` makes a doubled quote inside a quoted
# field be read as a literal quote character. (',' is already the CSV default
# separator, and 'headers' is not a Spark CSV option, so both were dropped.)
user_df = (
    spark.read.format('csv')
    .option('header', 'true')
    .option('escape', '"')
    .option('inferSchema', 'true')
    .load('UserList.csv')
)




                                                                                

In [12]:
n_users = user_df.count()  # triggers a full scan of the user CSV
n_users

302676

In [13]:
list(user_df.columns)

['username',
 'user_id',
 'user_watching',
 'user_completed',
 'user_onhold',
 'user_dropped',
 'user_plantowatch',
 'user_days_spent_watching',
 'gender',
 'location',
 'birth_date',
 'access_rank',
 'join_date',
 'last_online',
 'stats_mean_score',
 'stats_rewatched',
 'stats_episodes']

In [14]:
# Keep only rows that have a username and an episode count.
user_df = user_df.filter(col('username').isNotNull() & col('stats_episodes').isNotNull())

# Drop columns that are not used as user features.
cols = ['location', 'access_rank', 'stats_mean_score', 'birth_date', 'gender', 
        'join_date', 'last_online']
user_df = user_df.drop(*cols)

# Cast the id and the activity statistics to float.
cols_float = ['user_id', 'user_watching', 'user_completed', 'user_onhold', 'user_dropped',
                'user_plantowatch', 'user_days_spent_watching', 'stats_rewatched', 'stats_episodes']
for feat in cols_float:
    user_df = user_df.withColumn(feat, col(feat).cast('float'))

# Columns assembled into the user feature vector.
cols = ['user_watching', 'user_completed', 'user_onhold', 'user_dropped', 'user_plantowatch', 'user_days_spent_watching', 
        'stats_rewatched', 'stats_episodes']

# VectorAssembler's default handleInvalid='error' raises on null/NaN inputs, and
# the casts above can produce nulls. Drop such rows explicitly first.
user_df = user_df.na.drop(subset=cols)

assembler = VectorAssembler(
    inputCols=cols,
    outputCol='user_feats_profile'
)

stages = [assembler]

In [15]:
# Fit the user-feature stages and apply them to the user table.
pipelineModel = Pipeline(stages=stages).fit(user_df)
user_df = pipelineModel.transform(user_df)

In [16]:
# Keep the join key (username), the id and the assembled user feature vector.
user_df = user_df.select('username', 'user_id', 'user_feats_profile')
user_df.show(5)

+----------------+---------+--------------------+
|        username|  user_id|  user_feats_profile|
+----------------+---------+--------------------+
|        karthiga|2255153.0|[3.0,49.0,1.0,0.0...|
|RedvelvetDaisuki|1897606.0|[61.0,396.0,39.0,...|
|       Damonashu|  37326.0|[45.0,195.0,27.0,...|
|           bskai| 228342.0|[25.0,414.0,2.0,5...|
|       shuzzable|2347781.0|[36.0,72.0,16.0,2...|
+----------------+---------+--------------------+
only showing top 5 rows



# User-Anime Data Preprocessing

In [17]:
# Per-user anime list. No inferSchema here, so every column is read as a string.
user_anime_df = (
    spark.read
    .option('header', 'true')
    .csv('UserAnimeList.csv')
)

In [18]:
user_anime_df.show(n=20, truncate=True)

+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+---------------+-------+
|username|anime_id|my_watched_episodes|my_start_date|my_finish_date|my_score|my_status|my_rewatching|my_rewatching_ep|my_last_updated|my_tags|
+--------+--------+-------------------+-------------+--------------+--------+---------+-------------+----------------+---------------+-------+
|karthiga|      21|                586|   0000-00-00|    0000-00-00|       9|        1|         null|               0|     1362307973|   null|
|karthiga|      59|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|     1362923691|   null|
|karthiga|      74|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|     1367081015|   null|
|karthiga|     120|                 26|   0000-00-00|    0000-00-00|       7|        2|         null|               0|     1362308037|   null|

In [19]:
# Keep only (username, anime_id, my_score).
cols = ['my_watched_episodes', 'my_start_date', 'my_finish_date', 'my_status', 
        'my_rewatching', 'my_rewatching_ep', 'my_last_updated', 'my_tags']
user_anime_df = user_anime_df.drop(*cols)

# Cast id and score to float.
cols_float = ['anime_id', 'my_score']
for feat in cols_float:
    user_anime_df = user_anime_df.withColumn(feat, col(feat).cast('float'))

# Drop rows with any missing value (including unparseable casts).
user_anime_df = user_anime_df.na.drop()

# Keep only actual ratings in [1, 10]. 0 is excluded (presumably "not scored"),
# as are negative or other out-of-range values.
user_anime_df = user_anime_df.filter(col('my_score').between(1, 10))

user_anime_df.show(5)

+--------+--------+--------+
|username|anime_id|my_score|
+--------+--------+--------+
|karthiga|    21.0|     9.0|
|karthiga|    59.0|     7.0|
|karthiga|    74.0|     7.0|
|karthiga|   120.0|     7.0|
|karthiga|   178.0|     7.0|
+--------+--------+--------+
only showing top 5 rows



In [20]:
# Attach user features (on username) and anime features (on anime_id).
# Inner joins: ratings without a matching user or anime row would be all-null
# on the joined side and removed by na.drop() anyway. Joining inner prunes them
# up front and states the intent.
user_anime_df = user_anime_df.join(user_df, 'username', how='inner')

user_anime_df = user_anime_df.drop('username')

user_anime_df = user_anime_df.join(anime_df, 'anime_id', how='inner')

# Drop rows with any remaining missing value.
user_anime_df = user_anime_df.na.drop()

In [21]:
# Model input: the user feature vector followed by the item feature vector.
feature_assembler = VectorAssembler(
    outputCol='features',
    inputCols=['user_feats_profile', 'item_features_profile'],
)
user_anime_df = feature_assembler.transform(user_anime_df)
user_anime_df.show(5)




+--------+--------+---------+--------------------+----+-------+--------+---------------+------+-----+---------+----------+-------+---------+--------------------+---------+-------------+-----------+--------------+-----------+--------------+-----------+--------------+---------------------+--------------------+
|anime_id|my_score|  user_id|  user_feats_profile|type| source|episodes|         status|rating|score|scored_by|popularity|members|favorites|            genre_fv|typeIndex| typeclassVec|sourceIndex|sourceclassVec|statusIndex|statusclassVec|ratingIndex|ratingclassVec|item_features_profile|            features|
+--------+--------+---------+--------------------+----+-------+--------+---------------+------+-----+---------+----------+-------+---------+--------------------+---------+-------------+-----------+--------------+-----------+--------------+-----------+--------------+---------------------+--------------------+
|   714.0|     7.0|1216451.0|[10.0,423.0,2.0,1...|  TV|Unknown|    86.

                                                                                

In [22]:
# Seeded 80/20 train/test split, then 10% of the train part is held out as validation.
train_val_df, test = user_anime_df.randomSplit(weights=[0.8, 0.2], seed=SEED)
training, valid = train_val_df.randomSplit(weights=[0.9, 0.1], seed=SEED)

# Popularity-Based Baseline

In [27]:
# Mean training score per anime: the popularity-baseline prediction `preds_0`.
avg_score_by_anime = (
    training
    .groupBy('anime_id')
    .agg(avg('my_score').alias('preds_0'))
)

In [28]:
# Global mean training score (a single-row, single-column frame).
avg_score = training.select(avg('my_score').alias('overall_average'))

In [29]:
# A global aggregate always has exactly one row, so taking one row is the full result.
c = avg_score.take(1)

                                                                                

In [30]:
# Popularity baseline: predict the anime's mean training score. Anime unseen in
# training fall back to the global training mean.
# Any `preds_0` left by a previous run of this cell is dropped first (a no-op
# otherwise), so re-running the cell does not create a duplicate column.
valid = (
    valid.drop('preds_0')
    .join(avg_score_by_anime, 'anime_id', how='left')
    .fillna(c[0].overall_average, subset=['preds_0'])
)


In [31]:
# RMSE of the popularity baseline on the validation split.
rmse = RegressionEvaluator(
    labelCol='my_score',
    predictionCol='preds_0',
    metricName='rmse',
).evaluate(valid)
print(f"RMSE: {str(rmse)} ")

22/08/24 03:09:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:52 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:54 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/08/24 03:09:56 WARN RowBasedKeyValueBatch: Calling spill() on

KeyboardInterrupt: 