In [1]:
# Reading Dataset
import numpy as np
import pandas as pd

# Visualization
import plotly.express as px
from plotly.offline import init_notebook_mode, iplot
init_notebook_mode(connected=True)

from datetime import datetime
import pyspark.sql.functions as f

In [2]:
# Start the Spark session with the Cassandra connector on the classpath.
# The host and driver memory used to be hard-coded; they can now be overridden via
# environment variables without editing the notebook. The defaults are the values
# this notebook was originally run with.
# NOTE: getOrCreate() reuses an existing session, in which case static settings such
# as spark.driver.memory / spark.jars.packages are not re-applied.
import os

from pyspark.sql import SparkSession

SPARK_DRIVER_MEMORY = os.environ.get("SPARK_DRIVER_MEMORY", "12g")
CASSANDRA_HOST = os.environ.get("CASSANDRA_HOST", "172.19.0.2")
CASSANDRA_CONNECTOR_PACKAGE = "com.datastax.spark:spark-cassandra-connector_2.12:3.2.0"

spark = (
    SparkSession.builder
    .config("spark.driver.memory", SPARK_DRIVER_MEMORY)
    .config("spark.cassandra.connection.host", CASSANDRA_HOST)
    .config("spark.jars.packages", CASSANDRA_CONNECTOR_PACKAGE)
    .getOrCreate()
)

24/08/14 00:35:59 WARN Utils: Your hostname, ubuntu20 resolves to a loopback address: 127.0.1.1; using 192.168.1.107 instead (on interface wlp0s20f3)
24/08/14 00:35:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/hungpm/Work/Master/BigData/my-anime-recommendation/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/hungpm/.ivy2/cache
The jars for the packages stored in: /home/hungpm/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c7b701db-994f-4419-b22d-b9b1df77d996;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.2.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.2.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	

## Preprocessing

### Load dataset

In [3]:
# Load the anime dataset with an explicit, fully-nullable schema.
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("anime_id", IntegerType()),
        ("Name", StringType()),
        ("English name", StringType()),
        ("Other name", StringType()),
        ("Score", FloatType()),
        ("Genres", StringType()),
        ("Synopsis", StringType()),
        ("Type", StringType()),
        ("Episodes", FloatType()),
        ("Aired", StringType()),
        ("Premiered", StringType()),
        ("Status", StringType()),
        ("Producers", StringType()),
        ("Licensors", StringType()),
        ("Studios", StringType()),
        ("Source", StringType()),
        ("Duration", StringType()),
        ("Rating", StringType()),
        ("Rank", FloatType()),
        ("Popularity", IntegerType()),
        ("Favorites", IntegerType()),
        ("Scored By", FloatType()),
        ("Members", FloatType()),
        ("Image URL", StringType()),
    )
])

# multiLine lets quoted fields span several lines; escape='"' treats a doubled quote
# inside a quoted field as a literal quote character.
df_anime = spark.read.csv(
    '../dataset/myanimelist-dataset/processed-dataset/anime-dataset-2023.csv',
    schema=schema,
    header=True,
    multiLine=True,
    quote='\"',
    escape='\"',
)

In [4]:
# Disabled: export of (anime_id, name, score) to the Cassandra table
# anime.test_animes through the spark-cassandra-connector. Kept for reference only.
# df_anime.select(f.col('anime_id'), f.col('Name').alias('name'), f.col('Score').alias('score')).write \
#   .format("org.apache.spark.sql.cassandra") \
#   .mode("append") \
#   .options(table="test_animes", keyspace="anime") \
#   .save()

In [4]:
# Import the user details dataset using an explicit schema (all columns nullable).
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("Mal ID", IntegerType()),
        ("Username", StringType()),
        ("Gender", StringType()),
        ("Birthday", StringType()),
        ("Location", StringType()),
        ("Joined", StringType()),
        ("Days Watched", FloatType()),
        ("Mean Score", FloatType()),
        ("Watching", FloatType()),
        ("Completed", FloatType()),
        ("On Hold", FloatType()),
        ("Dropped", FloatType()),
        ("Plan to Watch", FloatType()),
        ("Total Entries", FloatType()),
        ("Rewatched", FloatType()),
        ("Episodes Watched", FloatType()),
    )
])

df_user = spark.read.csv(
    "../dataset/myanimelist-dataset/processed-dataset/users-details-2023.csv",
    schema=schema,
    header=True,
)

In [5]:
# Import the user score dataset: one row per (user, anime, rating).
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("user_id", IntegerType()),
        ("Username", StringType()),
        ("anime_id", IntegerType()),
        ("Anime Title", StringType()),
        ("rating", IntegerType()),
    )
])

df_score = spark.read.csv(
    "../dataset/myanimelist-dataset/processed-dataset/users-score-2023.csv",
    schema=schema,
    header=True,
)

### Filter anime dataset

In [6]:
# Drop titles with unknown genres or studios, as well as Hentai titles.
# If a column is null its condition evaluates to null, so such rows are dropped as well.
df_anime = df_anime.filter(
    ~f.col('Genres').contains('UNKNOWN')
    & ~f.col('Genres').contains('Hentai')
    & ~f.col('Studios').contains('UNKNOWN')
)

df_anime.show()

+--------+--------------------+--------------------+------------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+----------------+--------------------+--------------------+----------------+-----------+-------------+--------------------+------+----------+---------+---------+---------+--------------------+
|anime_id|                Name|        English name|                    Other name|Score|              Genres|            Synopsis| Type|Episodes|               Aired|  Premiered|          Status|           Producers|           Licensors|         Studios|     Source|     Duration|              Rating|  Rank|Popularity|Favorites|Scored By|  Members|           Image URL|
+--------+--------------------+--------------------+------------------------------+-----+--------------------+--------------------+-----+--------+--------------------+-----------+----------------+--------------------+--------------------+----------------+-

In [7]:
# Keep only the columns the content-based features are built from.
df_anime_reduced = df_anime.select(['anime_id', 'Name', 'Genres', 'Synopsis', 'Studios'])
df_anime_reduced.show(n=10, truncate=False)

+--------+-------------------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Preprocessing Anime Names

In [8]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.types import ArrayType

# Split each name into lowercase tokens made of ASCII letters, digits, "-" and "_";
# every other character is treated as a separator and discarded.
nameRegexTokenizer = (
    RegexTokenizer()
    .setInputCol("Name")
    .setOutputCol("Name_tokens")
    .setPattern("[a-zA-Z0-9-_]+")
    .setGaps(False)
)
df_anime_reduced = nameRegexTokenizer.transform(df_anime_reduced)

In [12]:
df_anime_reduced.select(['anime_id', 'Name', 'Name_tokens']).show(n=10, truncate=False)

+--------+-------------------------------+------------------------------------+
|anime_id|Name                           |Name_tokens                         |
+--------+-------------------------------+------------------------------------+
|1       |Cowboy Bebop                   |[cowboy, bebop]                     |
|5       |Cowboy Bebop: Tengoku no Tobira|[cowboy, bebop, tengoku, no, tobira]|
|6       |Trigun                         |[trigun]                            |
|7       |Witch Hunter Robin             |[witch, hunter, robin]              |
|8       |Bouken Ou Beet                 |[bouken, ou, beet]                  |
|15      |Eyeshield 21                   |[eyeshield, 21]                     |
|16      |Hachimitsu to Clover           |[hachimitsu, to, clover]            |
|17      |Hungry Heart: Wild Striker     |[hungry, heart, wild, striker]      |
|18      |Initial D Fourth Stage         |[initial, d, fourth, stage]         |
|19      |Monster                       

In [9]:
# udf to remove "-" and "_" from the tokens.
# Tokens that consist only of these characters (e.g. the "-" in "Title - Subtitle")
# used to become empty strings. StopWordsRemover keeps "" and HashingTF then hashes it
# into one feature shared by many unrelated anime, so such tokens are now dropped.
# Null arrays are passed through as null instead of raising in the Python worker.
import re

_HYPHEN_UNDERSCORE_RE = re.compile('[-_]')  # the old '[-|_]' also (needlessly) matched '|'


def _remove_hyphens(tokens):
    """Strip '-' and '_' from every token and drop tokens that end up empty."""
    if tokens is None:
        return None
    stripped = (_HYPHEN_UNDERSCORE_RE.sub('', token) for token in tokens)
    return [token for token in stripped if token]


remove_hyphen_udf = f.udf(_remove_hyphens, ArrayType(StringType()))

In [10]:
df_anime_reduced = df_anime_reduced.withColumn('Name_tokens', remove_hyphen_udf('Name_tokens'))

In [15]:
df_anime_reduced.show(n=10)

[Stage 5:>                                                          (0 + 1) / 1]

+--------+--------------------+--------------------+--------------------+----------------+--------------------+
|anime_id|                Name|              Genres|            Synopsis|         Studios|         Name_tokens|
+--------+--------------------+--------------------+--------------------+----------------+--------------------+
|       1|        Cowboy Bebop|Action, Award Win...|Crime is timeless...|         Sunrise|     [cowboy, bebop]|
|       5|Cowboy Bebop: Ten...|      Action, Sci-Fi|Another day, anot...|           Bones|[cowboy, bebop, t...|
|       6|              Trigun|Action, Adventure...|Vash the Stampede...|        Madhouse|            [trigun]|
|       7|  Witch Hunter Robin|Action, Drama, My...|Robin Sena is a p...|         Sunrise|[witch, hunter, r...|
|       8|      Bouken Ou Beet|Adventure, Fantas...|It is the dark ce...|  Toei Animation|  [bouken, ou, beet]|
|      15|        Eyeshield 21|              Sports|Shy, reserved, an...|          Gallop|     [eyeshiel

                                                                                

In [11]:
# Remove (English) stopwords from the name tokens.
# `loadDefaultStopWords` is a static method that only *returns* a word list. The
# previous bare call on the instance discarded that list, and only worked because
# English is already the default. Pass the list explicitly so the intent takes effect.
nameRemover = StopWordsRemover(
    inputCol="Name_tokens",
    outputCol="Name_tokens_removed",
    stopWords=StopWordsRemover.loadDefaultStopWords('english'),
)
df_anime_reduced = nameRemover.transform(df_anime_reduced)

In [19]:
df_anime_reduced.select(['Name', 'Name_tokens_removed']).show(n=20, truncate=False)

+----------------------------------------------------------+--------------------------------------------------------------+
|Name                                                      |Name_tokens_removed                                           |
+----------------------------------------------------------+--------------------------------------------------------------+
|Arc the Lad                                               |[arc, lad]                                                    |
|E's Otherwise                                             |[e, otherwise]                                                |
|eX-Driver the Movie                                       |[exdriver, movie]                                             |
|Aria the Animation                                        |[aria, animation]                                             |
|Saishuu Heiki Kanojo: Another Love Song                   |[saishuu, heiki, kanojo, another, love, song]                 |
|Chou He

### Preprocessing Anime Synopses

In [12]:
# Break the synopsis into lowercase tokens; characters other than letters, digits,
# "-" and "_" act as separators and are dropped.
regexTokenizer = (
    RegexTokenizer()
    .setInputCol("Synopsis")
    .setOutputCol("Synopsis_tokens")
    .setPattern("[a-zA-Z0-9-_]+")
    .setGaps(False)
)
df_anime_reduced = regexTokenizer.transform(df_anime_reduced)

In [21]:
df_anime_reduced.show(n=10)

+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+
|anime_id|                Name|              Genres|            Synopsis|         Studios|         Name_tokens| Name_tokens_removed|     Synopsis_tokens|
+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+
|       1|        Cowboy Bebop|Action, Award Win...|Crime is timeless...|         Sunrise|     [cowboy, bebop]|     [cowboy, bebop]|[crime, is, timel...|
|       5|Cowboy Bebop: Ten...|      Action, Sci-Fi|Another day, anot...|           Bones|[cowboy, bebop, t...|[cowboy, bebop, t...|[another, day, an...|
|       6|              Trigun|Action, Adventure...|Vash the Stampede...|        Madhouse|            [trigun]|            [trigun]|[vash, the, stamp...|
|       7|  Witch Hunter Robin|Action, Drama, My...|Robin Sena is a p...|   

In [13]:
# Strip "-" and "_" inside the synopsis tokens with the same UDF used for names.
df_anime_reduced = df_anime_reduced.withColumn('Synopsis_tokens', remove_hyphen_udf('Synopsis_tokens'))

In [23]:
df_anime_reduced.show(n=10)

+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+
|anime_id|                Name|              Genres|            Synopsis|         Studios|         Name_tokens| Name_tokens_removed|     Synopsis_tokens|
+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+
|       1|        Cowboy Bebop|Action, Award Win...|Crime is timeless...|         Sunrise|     [cowboy, bebop]|     [cowboy, bebop]|[crime, is, timel...|
|       5|Cowboy Bebop: Ten...|      Action, Sci-Fi|Another day, anot...|           Bones|[cowboy, bebop, t...|[cowboy, bebop, t...|[another, day, an...|
|       6|              Trigun|Action, Adventure...|Vash the Stampede...|        Madhouse|            [trigun]|            [trigun]|[vash, the, stamp...|
|       7|  Witch Hunter Robin|Action, Drama, My...|Robin Sena is a p...|   

In [14]:
# Remove (English) stopwords from the synopsis tokens.
# `loadDefaultStopWords` is a static method that only *returns* a word list. The
# previous bare call on the instance discarded that list, and only worked because
# English is already the default. Pass the list explicitly so the intent takes effect.
remover = StopWordsRemover(
    inputCol="Synopsis_tokens",
    outputCol="Synopsis_tokens_removed",
    stopWords=StopWordsRemover.loadDefaultStopWords('english'),
)
df_anime_reduced = remover.transform(df_anime_reduced)

In [25]:
df_anime_reduced.show(n=10)

+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+-----------------------+
|anime_id|                Name|              Genres|            Synopsis|         Studios|         Name_tokens| Name_tokens_removed|     Synopsis_tokens|Synopsis_tokens_removed|
+--------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+--------------------+-----------------------+
|       1|        Cowboy Bebop|Action, Award Win...|Crime is timeless...|         Sunrise|     [cowboy, bebop]|     [cowboy, bebop]|[crime, is, timel...|   [crime, timeless,...|
|       5|Cowboy Bebop: Ten...|      Action, Sci-Fi|Another day, anot...|           Bones|[cowboy, bebop, t...|[cowboy, bebop, t...|[another, day, an...|   [another, day, an...|
|       6|              Trigun|Action, Adventure...|Vash the Stampede...|        Madhouse|            [trigun]

In [15]:
# Apply NLTK's Porter stemmer so that inflected forms of a word share one feature
# (e.g. "tries" and "tried" both become "tri").
# Stemming is rule-based suffix stripping, not lemmatization: the stems are not
# guaranteed to be dictionary words (e.g. "another" -> "anoth").

from nltk.stem import PorterStemmer
from nltk.tokenize import sent_tokenize, word_tokenize

# Built-in class from NLTK
ps = PorterStemmer()


def _stem_tokens(tokens):
    """Return the Porter stem of every token; a null array stays null."""
    if tokens is None:
        return None
    return [ps.stem(token) for token in tokens]


# udf to apply stemming
stemming = f.udf(_stem_tokens, ArrayType(StringType()))
# apply udf to tokens
df_anime_reduced = df_anime_reduced.withColumn('Synopsis_tokens_stemmed', stemming(f.col('Synopsis_tokens_removed')))

In [16]:
df_anime_reduced = df_anime_reduced.select(['anime_id', 'Name', 'Genres', 'Name_tokens_removed', 'Studios', 'Synopsis_tokens_stemmed'])

In [31]:
df_anime_reduced.select(["Genres", "Studios"]).show(n=10, truncate=False)

+------------------------------------+----------------+
|Genres                              |Studios         |
+------------------------------------+----------------+
|Action, Award Winning, Sci-Fi       |Sunrise         |
|Action, Sci-Fi                      |Bones           |
|Action, Adventure, Sci-Fi           |Madhouse        |
|Action, Drama, Mystery, Supernatural|Sunrise         |
|Adventure, Fantasy, Supernatural    |Toei Animation  |
|Sports                              |Gallop          |
|Comedy, Drama, Romance              |J.C.Staff       |
|Comedy, Slice of Life, Sports       |Nippon Animation|
|Action, Drama                       |A.C.G.T.        |
|Drama, Mystery, Suspense            |Madhouse        |
+------------------------------------+----------------+
only showing top 10 rows



### Preprocessing Genres

In [17]:
# Lowercase the comma-separated Genres string and split it into an array of genres.
df_anime_reduced = df_anime_reduced.withColumn(
    "Genres",
    f.split(f.lower(f.col("Genres")), ', ').cast(ArrayType(StringType())),
)

df_anime_reduced.select("Genres").show(n=5)

+--------------------+
|              Genres|
+--------------------+
|[action, award wi...|
|    [action, sci-fi]|
|[action, adventur...|
|[action, drama, m...|
|[adventure, fanta...|
+--------------------+
only showing top 5 rows



### Preprocessing Anime Studios

In [26]:
# Show anime credited to more than one studio (a comma appears in the Studios string).
df_anime_reduced.filter(f.col('Studios').contains(',')).show()

+--------+--------------------+--------------------+--------------------+--------------------+-----------------------+
|anime_id|                Name|              Genres| Name_tokens_removed|             Studios|Synopsis_tokens_stemmed|
+--------+--------------------+--------------------+--------------------+--------------------+-----------------------+
|      30|Neon Genesis Evan...|[action, avant ga...|[neon, genesis, e...|Gainax, Tatsunoko...|   [fifteen, year, c...|
|      31|Neon Genesis Evan...|     [drama, sci-fi]|[neon, genesis, e...|Gainax, Productio...|   [year, 2015, deca...|
|      32|Neon Genesis Evan...|[avant garde, dra...|[neon, genesis, e...|Gainax, Productio...|   [shinji, ikari, l...|
|      45|Rurouni Kenshin: ...|[action, adventur...|[rurouni, kenshin...| Gallop, Studio Deen|   [final, year, bak...|
|      62|       D.C.: Da Capo|    [drama, romance]|    [d, c, da, capo]|        feel., Zexcs|   [hatsunejima, abo...|
|     112|Chou Henshin Cosp...|[action, adventur

In [18]:
# Lowercase the comma-separated Studios string and split it into an array of studios.
df_anime_reduced = df_anime_reduced.withColumn(
    "Studios",
    f.split(f.lower(f.col('Studios')), ', ').cast(ArrayType(StringType())),
)

df_anime_reduced.select("Studios").show(n=5)

+----------------+
|         Studios|
+----------------+
|       [sunrise]|
|         [bones]|
|      [madhouse]|
|       [sunrise]|
|[toei animation]|
+----------------+
only showing top 5 rows



In [19]:
# Keep the id plus the four preprocessed token arrays under their final names.
df_anime_reduced = df_anime_reduced.select(
    f.col('anime_id'),
    *[
        f.col(source).alias(target)
        for source, target in (
            ('Name_tokens_removed', 'preprocessed_name'),
            ('Genres', 'preprocessed_genres'),
            ('Synopsis_tokens_stemmed', 'preprocessed_synopsis'),
            ('Studios', 'preprocessed_studios'),
        )
    ],
)

df_anime_reduced.show(n=5)

[Stage 4:>                                                          (0 + 1) / 1]

+--------+--------------------+--------------------+---------------------+--------------------+
|anime_id|   preprocessed_name| preprocessed_genres|preprocessed_synopsis|preprocessed_studios|
+--------+--------------------+--------------------+---------------------+--------------------+
|       1|     [cowboy, bebop]|[action, award wi...| [crime, timeless,...|           [sunrise]|
|       5|[cowboy, bebop, t...|    [action, sci-fi]| [anoth, day, anot...|             [bones]|
|       6|            [trigun]|[action, adventur...| [vash, stamped, m...|          [madhouse]|
|       7|[witch, hunter, r...|[action, drama, m...| [robin, sena, pow...|           [sunrise]|
|       8|  [bouken, ou, beet]|[adventure, fanta...| [dark, centuri, p...|    [toei animation]|
+--------+--------------------+--------------------+---------------------+--------------------+
only showing top 5 rows



                                                                                

## TF-IDF & Cosine Similarity

### TF-IDF

In [20]:
# featurize data with tf-idf
def featurizeDataWithTFIDF(input_df: "pyspark.sql.DataFrame", input_col: str, output_col: str,
                           num_features: int = 1 << 18):
    """Append a TF-IDF vector column computed from a token-array column.

    Tokens in ``input_col`` are hashed into ``num_features`` buckets with HashingTF
    (into the temporary column ``"raw_" + output_col``). They are then re-weighted by
    an IDF model fitted on ``input_df`` itself.

    Args:
        input_df: Spark (not pandas) DataFrame containing ``input_col``.
        input_col: name of the array<string> column to featurize.
        output_col: name of the resulting TF-IDF vector column.
        num_features: number of hash buckets. Defaults to 2**18 (262144), which is
            HashingTF's own default, so existing callers are unaffected.

    Returns:
        ``input_df`` with ``output_col`` added. If ``output_col`` already exists it
        is recomputed, so re-running a cell no longer fails with Spark ML's
        "Column ... already exists" error.
    """
    raw_output_col = "raw_" + output_col

    # Spark ML refuses to overwrite existing columns; drop leftovers from earlier runs.
    # Dropping a column that does not exist is a no-op.
    input_df = input_df.drop(output_col, raw_output_col)

    # hashing tf
    hashingTF = HashingTF(inputCol=input_col, outputCol=raw_output_col, numFeatures=num_features)
    featurized_data_df = hashingTF.transform(input_df)

    # idf transform
    idfModel = IDF(inputCol=raw_output_col, outputCol=output_col).fit(featurized_data_df)
    return idfModel.transform(featurized_data_df).drop(raw_output_col)

In [21]:
# TF-IDF over the preprocessed name tokens
df_anime_reduced = featurizeDataWithTFIDF(df_anime_reduced, input_col="preprocessed_name", output_col="name_features")
df_anime_reduced.show(n=5)

24/08/14 00:37:09 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
[Stage 6:>                                                          (0 + 1) / 1]

+--------+--------------------+--------------------+---------------------+--------------------+--------------------+
|anime_id|   preprocessed_name| preprocessed_genres|preprocessed_synopsis|preprocessed_studios|       name_features|
+--------+--------------------+--------------------+---------------------+--------------------+--------------------+
|       1|     [cowboy, bebop]|[action, award wi...| [crime, timeless,...|           [sunrise]|(262144,[61669,21...|
|       5|[cowboy, bebop, t...|    [action, sci-fi]| [anoth, day, anot...|             [bones]|(262144,[61669,10...|
|       6|            [trigun]|[action, adventur...| [vash, stamped, m...|          [madhouse]|(262144,[231180],...|
|       7|[witch, hunter, r...|[action, drama, m...| [robin, sena, pow...|           [sunrise]|(262144,[59767,62...|
|       8|  [bouken, ou, beet]|[adventure, fanta...| [dark, centuri, p...|    [toei animation]|(262144,[38215,41...|
+--------+--------------------+--------------------+------------

                                                                                

In [22]:
# TF-IDF over the genre lists
df_anime_reduced = featurizeDataWithTFIDF(df_anime_reduced, input_col="preprocessed_genres", output_col="genres_features")
df_anime_reduced.show(n=5)

24/08/14 00:37:13 WARN DAGScheduler: Broadcasting large task binary with size 8.1 MiB
[Stage 8:>                                                          (0 + 1) / 1]

+--------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+
|anime_id|   preprocessed_name| preprocessed_genres|preprocessed_synopsis|preprocessed_studios|       name_features|     genres_features|
+--------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+
|       1|     [cowboy, bebop]|[action, award wi...| [crime, timeless,...|           [sunrise]|(262144,[61669,21...|(262144,[90693,16...|
|       5|[cowboy, bebop, t...|    [action, sci-fi]| [anoth, day, anot...|             [bones]|(262144,[61669,10...|(262144,[161038,2...|
|       6|            [trigun]|[action, adventur...| [vash, stamped, m...|          [madhouse]|(262144,[231180],...|(262144,[161038,2...|
|       7|[witch, hunter, r...|[action, drama, m...| [robin, sena, pow...|           [sunrise]|(262144,[59767,62...|(262144,[6512,685...|
|       8|  [bouken, ou, beet]|[ad

                                                                                

In [23]:
# TF-IDF over the stemmed synopsis tokens
df_anime_reduced = featurizeDataWithTFIDF(df_anime_reduced, input_col="preprocessed_synopsis", output_col="synopsis_features")
df_anime_reduced.show(n=5)

24/08/14 00:37:25 WARN DAGScheduler: Broadcasting large task binary with size 12.1 MiB


+--------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+
|anime_id|   preprocessed_name| preprocessed_genres|preprocessed_synopsis|preprocessed_studios|       name_features|     genres_features|   synopsis_features|
+--------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+
|       1|     [cowboy, bebop]|[action, award wi...| [crime, timeless,...|           [sunrise]|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|
|       5|[cowboy, bebop, t...|    [action, sci-fi]| [anoth, day, anot...|             [bones]|(262144,[61669,10...|(262144,[161038,2...|(262144,[17891,20...|
|       6|            [trigun]|[action, adventur...| [vash, stamped, m...|          [madhouse]|(262144,[231180],...|(262144,[161038,2...|(262144,[10809,11...|
|       7|[witch, hunter, r...|[action, drama,

In [24]:
# TF-IDF over the studio lists
df_anime_reduced = featurizeDataWithTFIDF(df_anime_reduced, input_col="preprocessed_studios", output_col="studios_features")
df_anime_reduced.show(n=5)

24/08/14 00:37:28 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
[Stage 12:>                                                         (0 + 1) / 1]

+--------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|anime_id|   preprocessed_name| preprocessed_genres|preprocessed_synopsis|preprocessed_studios|       name_features|     genres_features|   synopsis_features|    studios_features|
+--------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|       1|     [cowboy, bebop]|[action, award wi...| [crime, timeless,...|           [sunrise]|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|(262144,[13445],[...|
|       5|[cowboy, bebop, t...|    [action, sci-fi]| [anoth, day, anot...|             [bones]|(262144,[61669,10...|(262144,[161038,2...|(262144,[17891,20...|(262144,[41817],[...|
|       6|            [trigun]|[action, adventur...| [vash, stamped, m...|          [madhouse]|(2621

                                                                                

In [None]:
# Apply TF-IDF to every preprocessed column in one pass.
# This repeats the four per-column cells above. Outputs that already exist are
# skipped, because asking IDF to re-create an existing column raises
# "Column ... already exists" and would break Restart & Run All.
for _input_col, _output_col in (
    ("preprocessed_name", "name_features"),
    ("preprocessed_genres", "genres_features"),
    ("preprocessed_synopsis", "synopsis_features"),
    ("preprocessed_studios", "studios_features"),
):
    if _output_col not in df_anime_reduced.columns:
        df_anime_reduced = featurizeDataWithTFIDF(df_anime_reduced, _input_col, _output_col)

In [41]:
df_anime_reduced.select(['name_features', 'genres_features', 'synopsis_features', 'studios_features']).show(n=5)

24/08/13 23:54:23 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB


+--------------------+--------------------+--------------------+--------------------+
|       name_features|     genres_features|   synopsis_features|    studios_features|
+--------------------+--------------------+--------------------+--------------------+
|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|(262144,[13445],[...|
|(262144,[61669,10...|(262144,[161038,2...|(262144,[17891,20...|(262144,[41817],[...|
|(262144,[231180],...|(262144,[161038,2...|(262144,[10809,11...|(262144,[250632],...|
|(262144,[59767,62...|(262144,[6512,685...|(262144,[5674,594...|(262144,[13445],[...|
|(262144,[38215,41...|(262144,[183324,1...|(262144,[9129,114...|(262144,[98507],[...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



### Cosine Similarity

In [25]:
# Cosine Similarity Function

import numpy as np

def cos_sim(u, v):
    """Cosine similarity of two vectors that expose ``dot(other)`` and ``norm(p)``
    (e.g. pyspark.ml.linalg DenseVector / SparseVector).

    Returns 0.0 when either vector has zero L2 norm. Such vectors do occur here: a
    term present in every row gets IDF weight log((m+1)/(m+1)) = 0, and an empty
    token list hashes to an all-zero vector. Previously this case produced NaN or a
    ZeroDivisionError. Spark orders NaN above every number, so NaN pairs would have
    ranked first in a descending similarity sort.
    """
    denominator = u.norm(2) * v.norm(2)
    if denominator == 0:
        return 0.0
    return float(u.dot(v) / denominator)

# Spark UDF wrapper so cos_sim can be applied row-wise to two vector columns
compute_sim = f.udf(cos_sim, returnType=FloatType())

In [26]:
# Keep only the id and the TF-IDF vectors needed for the pairwise similarity computation
kept_columns = ['anime_id', 'name_features', 'genres_features', 'synopsis_features', 'studios_features']
df_anime_reduced = df_anime_reduced.select(kept_columns)
df_anime_reduced.show(5)

24/08/14 00:37:36 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB


+--------+--------------------+--------------------+--------------------+--------------------+
|anime_id|       name_features|     genres_features|   synopsis_features|    studios_features|
+--------+--------------------+--------------------+--------------------+--------------------+
|       1|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|(262144,[13445],[...|
|       5|(262144,[61669,10...|(262144,[161038,2...|(262144,[17891,20...|(262144,[41817],[...|
|       6|(262144,[231180],...|(262144,[161038,2...|(262144,[10809,11...|(262144,[250632],...|
|       7|(262144,[59767,62...|(262144,[6512,685...|(262144,[5674,594...|(262144,[13445],[...|
|       8|(262144,[38215,41...|(262144,[183324,1...|(262144,[9129,114...|(262144,[98507],[...|
+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [27]:
# Cross join to build the Cartesian product --> every (anime, anime) pair.
# The right-hand copy gets suffixed column names so both sides stay addressable.
right_side_renames = {
    'anime_id': 'anime_id_2',
    'name_features': 'name_features2',
    'genres_features': 'genres_features2',
    'synopsis_features': 'synopsis_features2',
    'studios_features': 'studios_features2',
}

df_anime_right = df_anime_reduced
for old_name, new_name in right_side_renames.items():
    df_anime_right = df_anime_right.withColumnRenamed(old_name, new_name)

df_anime_joined = df_anime_reduced.crossJoin(df_anime_right)

df_anime_joined.show(5)

24/08/14 00:37:39 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/14 00:37:40 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB


+--------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|anime_id|       name_features|     genres_features|   synopsis_features|    studios_features|anime_id_2|      name_features2|    genres_features2|  synopsis_features2|   studios_features2|
+--------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+
|       1|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|(262144,[13445],[...|         1|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|(262144,[13445],[...|
|       1|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|(262144,[13445],[...|         5|(262144,[61669,10...|(262144,[161038,2...|(262144,[17891,20...|(262144,[41817],[...|
|       1|(262144,[61669,21...|(262144,[90693,16..

                                                                                

In [28]:
# Keep each unordered pair exactly once. `anime_id < anime_id_2` by itself
# drops self-pairs (a, a) AND the mirrored duplicate (b, a) of every (a, b),
# so no separate `anime_id != anime_id_2` filter is needed.
print("Before remove: ", df_anime_joined.count(), " pairs")
df_anime_joined = df_anime_joined.filter(f.col('anime_id') < f.col('anime_id_2'))
print("After remove: ", df_anime_joined.count(), " pairs")

Before remove:  141610000  pairs


[Stage 21:>                                                         (0 + 1) / 1]

After remove:  70799050  pairs


                                                                                

In [29]:
# Score every pair on each feature separately: (result column, left vector, right vector)
similarity_specs = [
    ('name_cos_sim', 'name_features', 'name_features2'),
    ('genres_cos_sim', 'genres_features', 'genres_features2'),
    ('synopsis_cos_sim', 'synopsis_features', 'synopsis_features2'),
    ('studios_cos_sim', 'studios_features', 'studios_features2'),
]

computed_df = df_anime_joined
for sim_column, left_column, right_column in similarity_specs:
    computed_df = computed_df.withColumn(sim_column, compute_sim(f.col(left_column), f.col(right_column)))

computed_df.select([spec[0] for spec in similarity_specs]).show(5)

24/08/14 00:37:51 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/14 00:38:01 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB
[Stage 25:>                                                         (0 + 1) / 1]

+------------+--------------+----------------+---------------+
|name_cos_sim|genres_cos_sim|synopsis_cos_sim|studios_cos_sim|
+------------+--------------+----------------+---------------+
|   0.7017934|    0.41932264|      0.28034365|            0.0|
|         0.0|     0.3329548|     0.023258194|            0.0|
|         0.0|    0.06463374|     0.033542246|            1.0|
|         0.0|           0.0|     0.007451746|            0.0|
|         0.0|           0.0|     0.020076532|            0.0|
+------------+--------------+----------------+---------------+
only showing top 5 rows



                                                                                

In [48]:
# Inspect the full pair rows, including the per-feature similarity columns
computed_df.show(n=5)

24/08/14 00:18:06 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/14 00:18:16 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB
[Stage 42:>                                                         (0 + 1) / 1]

+--------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------+----------------+---------------+
|anime_id|       name_features|     genres_features|   synopsis_features|    studios_features|anime_id_2|      name_features2|    genres_features2|  synopsis_features2|   studios_features2|name_cos_sim|genres_cos_sim|synopsis_cos_sim|studios_cos_sim|
+--------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------+----------------+---------------+
|       1|(262144,[61669,21...|(262144,[90693,16...|(262144,[1218,720...|(262144,[13445],[...|         5|(262144,[61669,10...|(262144,[161038,2...|(262144,[17891,20...|(262144,[41817],[...|   0.7017934|    0.41932264|      0.28034365|            0

                                                                                

In [30]:
# Treat missing per-feature similarities as "no similarity" (0) before they are combined
cos_sim_columns = ['name_cos_sim', 'genres_cos_sim', 'synopsis_cos_sim', 'studios_cos_sim']
computed_df = computed_df.fillna({column: 0 for column in cos_sim_columns})

In [31]:
# Compute final cos_sim from cos_sim components
def computeFinalCosineSimilarity(name, genres, synopsis, studios,
                                 name_weight=1.0, genres_weight=1.0,
                                 synopsis_weight=1.0, studios_weight=0.05):
    """Combine the four per-feature cosine similarities into one ranking score.

    The result is a weighted sum, not itself a cosine similarity. With the
    default weights and components in [0, 1] it ranges over [0, 3.05]. Studio
    overlap is down-weighted to 0.05, presumably so a shared studio only
    nudges the ranking.

    Missing (None) components count as 0, mirroring the fillna(0) applied to
    the similarity columns.

    Args:
        name, genres, synopsis, studios: per-feature cosine similarities.
        name_weight, genres_weight, synopsis_weight, studios_weight:
            multipliers for each component. The defaults reproduce
            ``name + genres + synopsis + 0.05 * studios``.

    Returns:
        float: the weighted score.
    """
    def _or_zero(value):
        # Null-safe: a None component contributes nothing instead of raising TypeError.
        return 0.0 if value is None else value

    return (name_weight * _or_zero(name)
            + genres_weight * _or_zero(genres)
            + synopsis_weight * _or_zero(synopsis)
            + studios_weight * _or_zero(studios))

# Wrap the score combiner as a Spark UDF producing a FloatType column
computeFinalSim = f.udf(computeFinalCosineSimilarity, returnType=FloatType())

In [32]:
# Combine the four per-feature similarities into a single score per pair
component_columns = ['name_cos_sim', 'genres_cos_sim', 'synopsis_cos_sim', 'studios_cos_sim']
computed_df = computed_df.withColumn(
    'cos_sim',
    computeFinalSim(*[f.col(column) for column in component_columns]),
)

computed_df.select("anime_id", "anime_id_2", "cos_sim").show(10)

24/08/14 00:38:11 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/14 00:38:20 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB
[Stage 27:>                                                         (0 + 1) / 1]

+--------+----------+-----------+
|anime_id|anime_id_2|    cos_sim|
+--------+----------+-----------+
|       1|         5|  1.4014597|
|       1|         6| 0.35621297|
|       1|         7| 0.14817598|
|       1|         8|0.007451746|
|       1|        15|0.020076532|
|       1|        16|        0.0|
|       1|        17|0.012350674|
|       1|        18| 0.13665321|
|       1|        19|0.022622788|
|       1|        20| 0.12198547|
+--------+----------+-----------+
only showing top 10 rows



                                                                                

In [None]:
# Keep only pairs with some similarity, then report how many remain
computed_df = computed_df.where(f.col('cos_sim') > 0)
computed_df.count()

In [44]:
# Inspect the first rows of the filtered pairs (Spark's default of 20 rows)
computed_df.show(n=20)

24/08/08 15:35:59 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
[Stage 37:>                 (0 + 1) / 1][Stage 38:>                 (0 + 1) / 1]

## Recommendation

In [51]:
# Find an anime whose `Name` contains the given fragment
name_fragment = 'Sen to'
df_anime \
    .where(f.col('Name').contains(name_fragment)) \
    .select('anime_id', 'Name', 'English name') \
    .show(truncate=False)

+--------+-----------------------------+-------------+
|anime_id|Name                         |English name |
+--------+-----------------------------+-------------+
|199     |Sen to Chihiro no Kamikakushi|Spirited Away|
+--------+-----------------------------+-------------+



In [33]:
# Get chosen anime info based on anime id
anime_id = 199

print("Chosen anime: ")
chosen_columns = ['anime_id', 'Name', 'Score', 'Genres', 'Studios', 'Synopsis']
chosen_anime = df_anime.where(f.col('anime_id') == anime_id).select(chosen_columns)
chosen_anime.show(truncate=False)

Chosen anime: 
+--------+-----------------------------+-----+--------------------------------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|anime_id|Name                         |Score|Genres                                |S

In [37]:
# Get raw recommendations: every pair containing the chosen anime (on either side), best first
involves_chosen = (f.col('anime_id') == anime_id) | (f.col('anime_id_2') == anime_id)
recommendation_result = computed_df.filter(involves_chosen).sort('cos_sim', ascending=False)
recommendation_result.show(20)

24/08/14 00:52:42 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/14 00:52:51 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB


+--------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------+----------------+---------------+----------+
|anime_id|       name_features|     genres_features|   synopsis_features|    studios_features|anime_id_2|      name_features2|    genres_features2|  synopsis_features2|   studios_features2|name_cos_sim|genres_cos_sim|synopsis_cos_sim|studios_cos_sim|   cos_sim|
+--------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+------------+--------------+----------------+---------------+----------+
|     199|(262144,[17246,73...|(262144,[90693,18...|(262144,[440,868,...|(262144,[27674],[...|       523|(262144,[40546,22...|(262144,[90693,18...|(262144,[3329,871...|(262144,[27674],[...|         0.0|           1

                                                                                

In [None]:
# The chosen anime can sit on either side of a pair (recommendation_result matches
# anime_id OR anime_id_2), so report whichever id is NOT the chosen one.
# Selecting anime_id_2 directly would list the chosen anime itself for every pair
# where it is stored in the anime_id_2 column.
recommendation_result_reduced = recommendation_result.select(
    f.when(f.col('anime_id') == anime_id, f.col('anime_id_2'))
     .otherwise(f.col('anime_id'))
     .alias('anime_id_2'),
    'cos_sim',
)
recommendation_result_reduced.show()

24/08/08 10:12:41 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/08 10:12:42 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB


+----------+----------+
|anime_id_2|   cos_sim|
+----------+----------+
|       523| 1.0711282|
|     34792| 1.0050004|
|      2154| 1.0007684|
|     37976| 0.9612518|
|     24591|0.95036334|
|     10389| 0.9487332|
|      2895| 0.9463384|
|      8487|0.93051946|
|      7711|0.93019897|
|     37682|0.92877895|
|     32281| 0.9275644|
|     23987| 0.9247904|
|      2890|0.92308253|
|      1030| 0.9099737|
|     33089| 0.9077813|
|       164|0.89474857|
|     39792|0.89347494|
|     12477| 0.8797684|
|     10408| 0.8698769|
|     16664| 0.8686535|
+----------+----------+
only showing top 20 rows



                                                                                

In [38]:
# Get raw recommendations: every stored pair that contains the chosen anime.
# Each comparison MUST be parenthesised: `|` binds tighter than `==`, so
# `a == x | b == x` parses as a chained comparison and raises ValueError when
# Python tries to convert a Column to bool.
recommendation_result = computed_df \
        .filter((computed_df.anime_id == anime_id) | (computed_df.anime_id_2 == anime_id)) \
        .orderBy('cos_sim', ascending=False)
recommendation_result_reduced = recommendation_result.select('anime_id', 'anime_id_2', 'cos_sim').withColumnRenamed('anime_id', 'anime_id_1')

# Pairs were reduced to anime_id < anime_id_2, so the chosen anime is on exactly
# one side of each pair. Resolve the *other* id up front so the lookup below is a
# plain equi-join instead of an OR-condition join.
recommended_ids = recommendation_result_reduced.select(
    f.when(f.col('anime_id_1') == anime_id, f.col('anime_id_2'))
     .otherwise(f.col('anime_id_1'))
     .alias('anime_id'),
    'cos_sim',
)

# Get recommended anime info based on raw recommendations
df_anime_recommended_info = recommended_ids \
                    .join(df_anime, on='anime_id') \
                    .orderBy('cos_sim', ascending=False) \
                    .select('anime_id', 'cos_sim', 'Name', 'Score', 'Genres', 'Studios')

df_anime_recommended_info.show(truncate=False)

24/08/14 00:57:29 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/14 00:57:38 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB


+--------+----------+------------------------------------------------+-----+--------------------------------------------------+-------------------------------------+
|anime_id|cos_sim   |Name                                            |Score|Genres                                            |Studios                              |
+--------+----------+------------------------------------------------+-----+--------------------------------------------------+-------------------------------------+
|523     |1.0711282 |Tonari no Totoro                                |8.25 |Adventure, Award Winning, Supernatural            |Studio Ghibli                        |
|34792   |1.0050004 |Yoake Tsugeru Lu no Uta                         |7.39 |Adventure, Award Winning, Supernatural            |Science SARU                         |
|2154    |1.0007684 |Tekkon Kinkreet                                 |7.95 |Action, Adventure, Award Winning, Supernatural    |Studio 4°C                           |
|379

                                                                                

In [None]:
# Get recommended anime produced by the same studio as chosen anime's
number_of_recommendations = 20

chosen_row = chosen_anime.first()
if chosen_row is None:
    raise ValueError("anime_id {} not found in df_anime".format(anime_id))
# NOTE(review): `Studios` is matched as a substring. If it stores several
# studios as one comma-separated string (as `Genres` does), a multi-studio
# value only matches rows containing that exact combined text.
studio = chosen_row['Studios']

df_anime_recommended_info.filter(f.col('Studios').contains(studio)).show(number_of_recommendations, truncate=False)

24/08/07 15:04:35 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/07 15:04:36 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB


+--------+----------+----------------------------+-----+--------------------------------------------------------+-------------------------------+
|anime_id|cos_sim   |Name                        |Score|Genres                                                  |Studios                        |
+--------+----------+----------------------------+-----+--------------------------------------------------------+-------------------------------+
|523     |1.0711282 |Tonari no Totoro            |8.25 |Adventure, Award Winning, Supernatural                  |Studio Ghibli                  |
|2895    |0.9463384 |Kujiratori                  |6.07 |Adventure, Award Winning                                |Studio Ghibli                  |
|7711    |0.93019897|Karigurashi no Arrietty     |7.89 |Award Winning, Fantasy                                  |Studio Ghibli                  |
|2890    |0.92308253|Gake no Ue no Ponyo         |7.92 |Adventure, Award Winning, Fantasy                       |Studio Ghib

                                                                                

In [None]:
# Recommendations from studios other than the chosen anime's
is_same_studio = f.col('Studios').contains(studio)
other_studio_recommendations = df_anime_recommended_info.filter(~is_same_studio)
other_studio_recommendations.show(int(number_of_recommendations), truncate=False)

24/08/07 13:27:32 WARN DAGScheduler: Broadcasting large task binary with size 16.1 MiB
24/08/07 13:27:34 WARN DAGScheduler: Broadcasting large task binary with size 16.2 MiB


+--------+----------+------------------------------------------------+-----+---------------------------------------------------+-------------------------------------+
|anime_id|cos_sim   |Name                                            |Score|Genres                                             |Studios                              |
+--------+----------+------------------------------------------------+-----+---------------------------------------------------+-------------------------------------+
|34792   |1.0050004 |Yoake Tsugeru Lu no Uta                         |7.39 |Adventure, Award Winning, Supernatural             |Science SARU                         |
|2154    |1.0007684 |Tekkon Kinkreet                                 |7.95 |Action, Adventure, Award Winning, Supernatural     |Studio 4°C                           |
|37976   |0.9612518 |Zombieland Saga                                 |7.51 |Award Winning, Comedy, Supernatural                |MAPPA                                

                                                                                

### Save computed results

In [None]:
# Save the pair ids and their combined score to Cassandra (keyspace "anime", table "anime_similarity")
similarity_to_save = computed_df.select('anime_id', 'anime_id_2', 'cos_sim')
similarity_to_save.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="anime_similarity", keyspace="anime") \
    .mode("append") \
    .save()