# Lets make a anime recommendation system

I got a free trial for Microsoft Fabric so I will be using pyspark in notebook provided by Microsoft

First of all let's clean the data

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("pgpu").master("local[*]").getOrCreate()  

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 5, Finished, Available)

In [4]:
rating = spark.read.csv('Files/rating.csv')
rating.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 6, Finished, Available)

+-------+--------+------+
|    _c0|     _c1|   _c2|
+-------+--------+------+
|user_id|anime_id|rating|
|      1|      20|    -1|
|      1|      24|    -1|
|      1|      79|    -1|
|      1|     226|    -1|
|      1|     241|    -1|
|      1|     355|    -1|
|      1|     356|    -1|
|      1|     442|    -1|
|      1|     487|    -1|
|      1|     846|    -1|
|      1|     936|    -1|
|      1|    1546|    -1|
|      1|    1692|    -1|
|      1|    1836|    -1|
|      1|    2001|    -1|
|      1|    2025|    -1|
|      1|    2144|    -1|
|      1|    2787|    -1|
|      1|    2993|    -1|
+-------+--------+------+
only showing top 20 rows



In [5]:
anime = spark.read.csv('Files/anime.csv')
anime.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 7, Finished, Available)

+--------+--------------------+--------------------+-----+--------+------+-------+
|     _c0|                 _c1|                 _c2|  _c3|     _c4|   _c5|    _c6|
+--------+--------------------+--------------------+-----+--------+------+-------+
|anime_id|                name|               genre| type|episodes|rating|members|
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|
|    5114|Fullmetal Alchemi...|Action, Adventure...|   TV|      64|  9.26| 793665|
|   28977|            Gintama°|Action, Comedy, H...|   TV|      51|  9.25| 114262|
|    9253|         Steins;Gate|    Sci-Fi, Thriller|   TV|      24|  9.17| 673572|
|    9969|       Gintama&#039;|Action, Comedy, H...|   TV|      51|  9.16| 151266|
|   32935|Haikyuu!!: Karasu...|Comedy, Drama, Sc...|   TV|      10|  9.15|  93351|
|   11061|Hunter x Hunter (...|Action, Adventure...|   TV|     148|  9.13| 425855|
|     820|Ginga Eiyuu Densetsu|Drama, Military, ...|  OVA|     110|  9.11|  80679|
|   

The data is stored in DataLake so we can directly read with schema but here I will do it manually as I'm trying to learn spark

In [6]:
anim_sch = StructType([
    StructField('anime_id', IntegerType(), nullable=False),
    StructField('name', StringType(), nullable=False),
    StructField('genre', StringType(), nullable=False),
    StructField('type', StringType(), nullable=False),
    StructField('episodes', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
    StructField('members', IntegerType(), nullable=True),
])

rat_sch = StructType([
    StructField('user_id', IntegerType(), nullable=False),
    StructField('anime_id', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
])

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 8, Finished, Available)

In [7]:
anime = spark.read.option("header", True).schema(anim_sch).csv('Files/anime.csv')
anime.show()

rating = spark.read.option("header", True).schema(rat_sch).csv('Files/rating.csv')
rating.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 9, Finished, Available)

+--------+--------------------+--------------------+-----+--------+------+-------+
|anime_id|                name|               genre| type|episodes|rating|members|
+--------+--------------------+--------------------+-----+--------+------+-------+
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|
|    5114|Fullmetal Alchemi...|Action, Adventure...|   TV|      64|  9.26| 793665|
|   28977|            Gintama°|Action, Comedy, H...|   TV|      51|  9.25| 114262|
|    9253|         Steins;Gate|    Sci-Fi, Thriller|   TV|      24|  9.17| 673572|
|    9969|       Gintama&#039;|Action, Comedy, H...|   TV|      51|  9.16| 151266|
|   32935|Haikyuu!!: Karasu...|Comedy, Drama, Sc...|   TV|      10|  9.15|  93351|
|   11061|Hunter x Hunter (...|Action, Adventure...|   TV|     148|  9.13| 425855|
|     820|Ginga Eiyuu Densetsu|Drama, Military, ...|  OVA|     110|  9.11|  80679|
|   15335|Gintama Movie: Ka...|Action, Comedy, H...|Movie|       1|   9.1|  72534|
|   

Let's see summary of anime DataFrame

In [8]:
anime.summary().show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 10, Finished, Available)

+-------+------------------+-----------------+------+-----+------------------+------------------+-----------------+
|summary|          anime_id|             name| genre| type|          episodes|            rating|          members|
+-------+------------------+-----------------+------+-----+------------------+------------------+-----------------+
|  count|             12294|            12294| 12232|12269|             11954|             12064|            12294|
|   mean|14058.221652838783|         166778.5|  null| null|12.382549774134182| 6.473901690633607|18071.33886448674|
| stddev|11455.294700988177|330891.6746051493|  null| null| 46.86535196440979|1.0267463037708622|54820.67692490701|
|    min|                 1|    &quot;0&quot;|Action|Movie|                 1|              1.67|                5|
|    25%|              3483|              1.0|  null| null|                 1|              5.88|              225|
|    50%|             10259|           1989.0|  null| null|             

In the episode column we can see that mean is 12.38 but the max number is 1818. Such a difference is definitely due to series like Doremon, One Piece etc. which has large number of episodes compared to newer animes which typically has 12-24 episodes

In [9]:
# We will need all these functions below
from pyspark.sql.functions import desc, count, when, percentile_approx, mean, mode, col, trim, round as rnd

anime.groupBy("episodes").agg(count('*')).orderBy(anime.episodes).show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 11, Finished, Available)

+--------+--------+
|episodes|count(1)|
+--------+--------+
|    null|     340|
|       1|    5677|
|       2|    1076|
|       3|     505|
|       4|     327|
|       5|     121|
|       6|     268|
|       7|      72|
|       8|      60|
|       9|      40|
|      10|     114|
|      11|      72|
|      12|     816|
|      13|     572|
|      14|      32|
|      15|      21|
|      16|      30|
|      17|       5|
|      18|      11|
|      19|       4|
+--------+--------+
only showing top 20 rows



There seems to be bunch of movies and animes with episodes betweeen 1-13, that was the reason for the huge gap between mean and max value.

So let's fill the NULL values. If the type is movie then the episode will be 1 for the ova other stuffs we will use median. Also we will drop animes that has no genre

In [10]:
anime = anime.filter(anime.genre.isNotNull())

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 12, Finished, Available)

In [11]:
condn = (anime.type == "Movie") & (anime.episodes.isNull())

anime = anime.withColumn('episodes', when(condn, 1).otherwise(anime.episodes))

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 13, Finished, Available)

Now for remaining NULL values we will fill them with their respective types median or mode eg. we will only calculate median or mode of ova, ona etc and use it to fill it ofr other ova and ona.

But before that let's see if any animes having movie or ova in title has NULL values

In [12]:
anime.filter((anime.name.like('%Movie%') | anime.name.like("%OVA%")) & anime.episodes.isNull()).show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 14, Finished, Available)

+--------+----+-----+----+--------+------+-------+
|anime_id|name|genre|type|episodes|rating|members|
+--------+----+-----+----+--------+------+-------+
+--------+----+-----+----+--------+------+-------+



...

In [13]:
anime.groupBy("type").agg(*[percentile_approx(col(x), 0.5) for x in ['episodes', 'rating']]).show()

anime.groupBy("type").agg(*[mode(col(x)) for x in ['episodes', 'rating']]).show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 15, Finished, Available)

+-------+---------------------------------------+-------------------------------------+
|   type|percentile_approx(episodes, 0.5, 10000)|percentile_approx(rating, 0.5, 10000)|
+-------+---------------------------------------+-------------------------------------+
|     TV|                                     24|                                 6.94|
|   null|                                   null|                                 null|
|Special|                                      1|                                 6.63|
|    OVA|                                      2|                                 6.38|
|  Music|                                      1|                                 5.62|
|  Movie|                                      1|                                 6.49|
|    ONA|                                      2|                                 5.76|
+-------+---------------------------------------+-------------------------------------+

+-------+--------------+-------

Looking at the result median makes much sense

In [14]:
mdns = anime.groupBy("type").agg(
    *[percentile_approx(col(x), 0.5).alias(f'median_{x}') for x in ['episodes', 'rating']]
)

mdns.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 16, Finished, Available)

+-------+---------------+-------------+
|   type|median_episodes|median_rating|
+-------+---------------+-------------+
|     TV|             24|         6.94|
|   null|           null|         null|
|Special|              1|         6.63|
|    OVA|              2|         6.38|
|  Music|              1|         5.62|
|  Movie|              1|         6.49|
|    ONA|              2|         5.76|
+-------+---------------+-------------+



Now we will join it with our anime dataframe

In [15]:
anime = anime.join(mdns, on='type', how='left').withColumn(
    'episodes', when(anime.episodes.isNull(), mdns.median_episodes)\
    .otherwise(anime.episodes))\
    .withColumn('rating', when(anime.rating.isNull(), mdns.median_rating)\
    .otherwise(anime.rating))

anime.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 17, Finished, Available)

+-----+--------+--------------------+--------------------+--------+------+-------+---------------+-------------+
| type|anime_id|                name|               genre|episodes|rating|members|median_episodes|median_rating|
+-----+--------+--------------------+--------------------+--------+------+-------+---------------+-------------+
|Movie|   32281|      Kimi no Na wa.|Drama, Romance, S...|       1|  9.37| 200630|              1|         6.49|
|   TV|    5114|Fullmetal Alchemi...|Action, Adventure...|      64|  9.26| 793665|             24|         6.94|
|   TV|   28977|            Gintama°|Action, Comedy, H...|      51|  9.25| 114262|             24|         6.94|
|   TV|    9253|         Steins;Gate|    Sci-Fi, Thriller|      24|  9.17| 673572|             24|         6.94|
|   TV|    9969|       Gintama&#039;|Action, Comedy, H...|      51|  9.16| 151266|             24|         6.94|
|   TV|   32935|Haikyuu!!: Karasu...|Comedy, Drama, Sc...|      10|  9.15|  93351|             2

Now let's drop the extra two columns

In [16]:
anime = anime.drop('median_episodes', 'median_rating')
anime.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 18, Finished, Available)

+-----+--------+--------------------+--------------------+--------+------+-------+
| type|anime_id|                name|               genre|episodes|rating|members|
+-----+--------+--------------------+--------------------+--------+------+-------+
|Movie|   32281|      Kimi no Na wa.|Drama, Romance, S...|       1|  9.37| 200630|
|   TV|    5114|Fullmetal Alchemi...|Action, Adventure...|      64|  9.26| 793665|
|   TV|   28977|            Gintama°|Action, Comedy, H...|      51|  9.25| 114262|
|   TV|    9253|         Steins;Gate|    Sci-Fi, Thriller|      24|  9.17| 673572|
|   TV|    9969|       Gintama&#039;|Action, Comedy, H...|      51|  9.16| 151266|
|   TV|   32935|Haikyuu!!: Karasu...|Comedy, Drama, Sc...|      10|  9.15|  93351|
|   TV|   11061|Hunter x Hunter (...|Action, Adventure...|     148|  9.13| 425855|
|  OVA|     820|Ginga Eiyuu Densetsu|Drama, Military, ...|     110|  9.11|  80679|
|Movie|   15335|Gintama Movie: Ka...|Action, Comedy, H...|       1|   9.1|  72534|
|   

In [17]:
anime.summary().show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 19, Finished, Available)

+-------+-----+------------------+-----------------+------+------------------+-----------------+------------------+
|summary| type|          anime_id|             name| genre|          episodes|           rating|           members|
+-------+-----+------------------+-----------------+------+------------------+-----------------+------------------+
|  count|12210|             12232|            12232| 12232|             12210|            12210|             12232|
|   mean| null|13970.041530412034|         166778.5|  null|12.525470925470925|6.481859950488166|18159.224329627206|
| stddev| null|11415.656332338984|330891.6746051493|  null|46.399754577783575|1.016821609030229|54945.277559897164|
|    min|Movie|                 1|    &quot;0&quot;|Action|                 1|             1.67|                 5|
|    25%| null|              3464|              1.0|  null|                 1|              5.9|               229|
|    50%| null|             10197|           1989.0|  null|             

If we look at the 4th row of above output in name we can see HTML escape character. Let's remove it

In [18]:
from html import unescape
from pyspark.sql.functions import udf

unescape_udf = udf(unescape, StringType())

anime = anime.withColumn("name", unescape_udf(anime.name))

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 20, Finished, Available)

In [19]:
to_rmv = anime.filter(anime.rating.isNull())
anime = anime.filter(anime.rating.isNotNull())
anime = anime.withColumn('genre', trim(anime.genre))

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 21, Finished, Available)

to_rmv contains animes that are not rated, we might need to remove it from rating column also. 

In [20]:
to_rmv.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 22, Finished, Available)

+----+--------+--------------------+--------------------+--------+------+-------+
|type|anime_id|                name|               genre|episodes|rating|members|
+----+--------+--------------------+--------------------+--------+------+-------+
|null|   30484|       Steins;Gate 0|    Sci-Fi, Thriller|    null|  null|  60999|
|null|   34437|Code Geass: Fukka...|Action, Drama, Me...|    null|  null|  22748|
|null|   33352|   Violet Evergarden|      Drama, Fantasy|    null|  null|  20564|
|null|   33248|    K: Seven Stories|Action, Drama, Su...|    null|  null|  22133|
|null|   33845|    Free! (Shinsaku)|      School, Sports|    null|  null|   8666|
|null|   33475|Busou Shoujo Mach...|Action, School, S...|    null|  null|   1896|
|null|   31456|Code:Realize: Sou...|Adventure, Fantas...|    null|  null|   4017|
|null|   34280|             Gamers!|Comedy, Romance, ...|    null|  null|   1045|
|null|   32455|             Gekidol|               Music|    null|  null|    586|
|null|   31433|G

# let's look at rating dataframe

In [21]:
# here we are remvoing all unrated animes along with above to_rmv and noramlize the rating values

rating = rating.filter(rating.rating!=-1)
rating = rating.subtract(rating.join(to_rmv, 'anime_id', how='inner').select('user_id', 'anime_id', rating.rating))

# let's separate data for content based filtering
for_content = rating

usr_cnt = rating.groupBy(rating.user_id).count()

#Note: Here I have not rounded the normalized rating

rating = rating.join(usr_cnt, on='user_id', how='inner').filter(col('count')>10).select(rating.columns)

min_rating, max_rating = (1, 10)

rating = rating.withColumn("rating", 4*(col("rating") - min_rating) / (max_rating - min_rating)+1)

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 23, Finished, Available)

# Let's start with Content based filtering

In [22]:
from pyspark.ml.feature import HashingTF, IDF, RegexTokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

def get_tfidf(df):
    tk = RegexTokenizer(inputCol='genre', outputCol='tokens', pattern=r',\s*').transform(df)
    num_features = tk.selectExpr('explode(tokens)').distinct().count()
    tf = HashingTF(inputCol='tokens', outputCol='tf', numFeatures=num_features).transform(tk)
    idf = IDF(inputCol='tf', outputCol='tfidf').fit(tf)
    return idf.transform(tf).drop('episodes', 'members', 'tf', 'tokens')

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 24, Finished, Available)

In [23]:
# this function will return list of animes that user has already watched
def get_user_profile(anime_tfidf, for_content, uid):
    r_animes = for_content.filter(for_content.user_id==uid).select("anime_id")

    return anime_tfidf.join(r_animes, r_animes.anime_id==anime_tfidf.anime_id, 'inner').drop(for_content.anime_id)

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 25, Finished, Available)

In [24]:
def get_sim_mat(df, for_content, uid):
    anime_tfidf = get_user_profile(df, for_content, uid).select("tfidf").first()[0]

    def cosine_similarity(vector):
        return float(vector.dot(anime_tfidf) / (vector.norm(2) * anime_tfidf.norm(2)))

    sim_df = df.withColumn("similarity", udf(cosine_similarity)(col("tfidf")))
    return sim_df
    

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 26, Finished, Available)

In [25]:
def return_top_animes(anime_tfidf, for_content, uid):
    sim_df = get_sim_mat(anime_tfidf, for_content, uid)
    return sim_df.orderBy(desc("similarity")).drop('tfidf', 'similarity').limit(10)

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 27, Finished, Available)

# Collaborative

First of all we wll split the data in train and test part

In [26]:
train, test = rating.randomSplit([0.7,0.3], seed=420)

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 28, Finished, Available)

Now initialize the ALS method. The optimal rank and regParam is obtained through CrossValidation which is commented for now.

In [27]:
from pyspark.ml.recommendation import ALS

als = ALS(userCol='user_id', ratingCol='rating', itemCol='anime_id', nonnegative = True, coldStartStrategy="drop", regParam=0.3, rank=40)

model = als.fit(train)

# from pyspark.ml.tuning import CrossValidator
# from pyspark.ml.tuning import ParamGridBuilder

# paramGrid = ParamGridBuilder().addGrid(als.regParam, [0.1, 0.3, 0.5, 0.7, 0.9]).addGrid(als.rank, [25, 30, 35, 40, 45]).build()

# crossvalidation = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=eval,numFolds=5)

# model = crossvalidation.fit(train).bestModel


StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 29, Finished, Available)



I have downloaded the model from fabric to use locally. We can load it as below

In [28]:
# from pyspark.ml.recommendation import ALSModel
# model = ALSModel.load('model')

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 30, Finished, Available)

We will calculate rmse value using code below

In [29]:
pred = model.transform(test)

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 31, Finished, Available)

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator

eval=RegressionEvaluator(metricName="rmse",labelCol="rating",predictionCol="prediction")

rmse=eval.evaluate(pred)

rmse

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 32, Finished, Available)



0.6065302180725447

Our RMSE score looks bad :(

Here I wanted to implement Online learning but I am currently not able to work in it. I will complete it in future

In [31]:


# def get_train_data(ani_id, rate):
#     rated_animes = rating.filter(rating.user_id==uid).select("anime_id")

#     md_anime = anime.join(rated_animes, rated_animes.anime_id==anime.anime_id, "left_anti")
        
#     lst = spark.createDataFrame([(uid, ani_id, rate)], rating.columns)

#     lst = lst.withColumn("rating", 4*(col("rating") - 1) / (10 - 1)+1)

#     return train.union(lst), for_content.union(lst)


StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 33, Finished, Available)

Below function will return the prediction in proper form

In [32]:
def get_predicted_values(uid):
    rec = model.recommendForAllUsers(10)
    rec = rec.where(rec.user_id == uid).selectExpr("user_id", "explode(recommendations) as recm")
    rec = rec.select(rec.recm.getItem("anime_id").alias("anime_id"), rec.recm.getItem("rating").alias("rating"))

    return anime.join(rec, rec.anime_id==anime.anime_id, "inner").drop(rec.rating, rec.anime_id, 'episodes', 'members')

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 34, Finished, Available)

In [33]:
# It seems like you cannot take input in  fabric notebook
#uid = int(input("Enter user id: "))

uid = 1645

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 35, Finished, Available)

In [34]:
tfidf = get_tfidf(anime)
con = return_top_animes(tfidf, rating, 1645)
col = get_predicted_values(uid)

# we will combine the result from both content and collaborative filtering and show it to the user
rslt = con.union(col)

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 36, Finished, Available)

In [35]:
print(f"Here is the recommendation for user {uid}")
rslt.show()

StatementMeta(, e7210410-a7d2-427c-8cf1-81c59883a9b7, 37, Finished, Available)

Here is the recommendation for user 1645
+-------+--------+--------------------+--------------------+------+
|   type|anime_id|                name|               genre|rating|
+-------+--------+--------------------+--------------------+------+
|     TV|    6586| Yume-iro Pâtissière|Kids, School, Shoujo|  8.07|
|Special|    8894|Yume-iro Pâtissiè...|Kids, School, Shoujo|  7.29|
|Special|   28723|Aikatsu!: Dai Sta...|Music, School, Sh...|  6.85|
|     TV|    4908|     Bihada Ichizoku|Drama, Parody, Sh...|  5.87|
|    OVA|    3421|    Kodomo no Omocha|Comedy, Drama, Sc...|  7.24|
|Special|   28963|Nekota no Koto ga...|      School, Shoujo|  6.41|
|    ONA|   30842|Nekota no Koto ga...|      School, Shoujo|  6.46|
|    OVA|   24985|        Elite Jack!!|      School, Shoujo|  5.29|
|Special|    9366|Kaichou wa Maid-s...|Comedy, School, S...|  7.55|
|Special|   30625|Princess Princess...|Comedy, School, S...|  6.75|
|     TV|   32400|           KochinPa!|              Comedy|  4.94|
|  Musi