# Readme

Before running, import into cluster FileStore/tables. All files should be `gzip`ed already.

* All of [IMDB Non-Commerical Dataset](https://datasets.imdbws.com/) in .gz format
    * `title.akas.tsv.gz`
    * `title.ratings.tsv.gz`
    * `title.principals.tsv.gz`
    * `title.episode.tsv.gz`
    * `title.crew.tsv.gz`
    * `title.basics.tsv.gz`
    * `name.basics.tsv.gz`
* All files from the [Kaggle Anime Dataset](https://www.kaggle.com/datasets/dbdmobile/myanimelist-dataset)
    * `anime_dataset.csv.gz`
        > This dataset contains comprehensive details of 24,905 anime entries.
    * `anime_filtered.csv.gz`
        > This dataset provide information about the different attributes and characteristics of each anime (Based on 2020 data).
    * `final_animedataset.csv.gz`
        * Note, this file needs to be compressed with gzip and uploaded by itself
        > This dataset contains user ratings and information about various anime titles. It is curated for building an anime recommendation system(Based on 2018 data).
    * `user_filtered.csv.gz`
        > This dataset contains the user's ratings for every anime they watched and rated(Based on 2020 data).
    * `users_details_2023.csv.gz`
        > This dataset comprises information on 731,290 users registered on the MyAnimeList platform. It is worth noting that while a significant portion of these users are genuine anime enthusiasts, there may be instances of bots, inactive accounts, and alternate profiles present within the dataset.
    * `users_score_2023.csv.gz`
        > This dataset comprises anime scores provided by 270,033 users, resulting in a total of 24,325,191 rows or samples.
* Justin Huang's Anime IMDB scrape
    * `anime_omdb_data.csv.gz`
        > This dataset is an extraction of the top 800ish animes using IMDB filters, which is superior due to additional annotations that don't exist in the non-commerical IMDB dataset.

# Import data

Check status at http://localhost:4040/

In [1]:
import os
memory = '2g'
cores = '4'
pyspark_submit_args = ' --driver-memory ' + memory + ' --executor-memory ' + memory + ' --executor-cores ' + cores + ' pyspark-shell'
# pyspark_submit_args = ' --driver-memory ' + memory + ' --executor-memory ' + memory + ' --executor-cores ' + cores + ' --bindAddress  127.0.0.1 pyspark-shell'
os.environ["SPARK_LOCAL_IP"]="127.0.0.1"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [2]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
import os
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.functions import lit
from pyspark.sql.functions import col, collect_list, concat_ws
from pyspark.sql.types import StructType, StructField, StringType

In [3]:
spark = SparkSession.builder \
    .appName("project") \
    .master("local[*]") \
    .config("spark.driver.bindAddress", "127.0.0.1")\
    .config("spark.sql.pivotMaxValues", "99999")\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/13 23:17:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#files = dbutils.fs.ls("dbfs:/FileStore/tables/")
files = os.listdir("./data")
for file in files:
    print(file)

.DS_Store
kaggle
spark-export
imdb-scrape
imdb
export


In [5]:
def add_prefix_to_columns(df: DataFrame, prefix: str) -> DataFrame:
    return df.select([col(c).alias(prefix + "_" + c) for c in df.columns])

## Import Kaggle

In [6]:
schema = StructType([
    StructField("anime_id", StringType()),
    StructField("Name", StringType()),
    StructField("English name", StringType()),
    StructField("Other name", StringType()),
    StructField("Score", StringType()),
    StructField("Genres", StringType()),
    StructField("Synopsis", StringType()),
    StructField("Type", StringType()),
    StructField("Episodes", StringType()),
    StructField("Aired", StringType()),
    StructField("Premiered", StringType()),
    StructField("Status", StringType()),
    StructField("Producers", StringType()),
    StructField("Licensors", StringType()),
    StructField("Studios", StringType()),
    StructField("Source", StringType()),
    StructField("Duration", StringType()),
    StructField("Rating", StringType()),
    StructField("Rank", StringType()),
    StructField("Popularity", StringType()),
    StructField("Favorites", StringType()),
    StructField("Scored By", StringType()),
    StructField("Members", StringType()),
    StructField("Image URL", StringType())
])

anime_dataset = spark.read.option("header", "true") \
    .option("inferSchema", "false") \
    .option("escape", "\"") \
    .schema(schema) \
    .csv("data/kaggle/anime-dataset-2023.csv.gz")

# anime_dataset = spark.read.option("header", "true") \
#     .option("inferSchema", "true") \
#     .option("multiLine", "true") \
#     .csv("data/kaggle/anime-dataset-2023.csv.gz")
    #.csv("dbfs:/FileStore/tables/anime_dataset_2023_csv.gz")

prefix = "anime"
anime_dataset = add_prefix_to_columns(anime_dataset, prefix)

# anime_filtered = spark.read.option("header", "true") \
#     .option("inferSchema", "true") \
#     .option("multiLine", "true") \
#     .csv("data/kaggle/anime-filtered.csv.gz")
#     # .csv("dbfs:/FileStore/tables/anime_filtered_csv.gz")

# prefix = "anime_filtered"
# anime_filtered = add_prefix_to_columns(anime_filtered, prefix)

# final_animedataset = spark.read.option("header", "true") \
#     .option("inferSchema", "true") \
#     .option("multiLine", "true") \
#     .csv("data/kaggle/final-animedataset.csv.gz")
#     # .csv("dbfs:/FileStore/tables/final_animedataset_csv.gz")

# prefix = "final_animedataset"
# final_animedataset = add_prefix_to_columns(final_animedataset, prefix)

# user_filtered = spark.read.option("header", "true") \
#     .option("inferSchema", "true") \
#     .option("multiLine", "true") \
#     .csv("data/kaggle/user-filtered.csv.gz")
#     # .csv("dbfs:/FileStore/tables/user_filtered_csv.gz")

# prefix = "user_filtered"
# user_filtered = add_prefix_to_columns(user_filtered, prefix)

# users_details_2023 = spark.read.option("header", "true") \
#     .option("inferSchema", "true") \
#     .option("multiLine", "true") \
#     .csv("data/kaggle/users-details-2023.csv.gz")
#     # .csv("dbfs:/FileStore/tables/users_details_2023_csv.gz")

# prefix = "users_details_2023"
# users_details_2023 = add_prefix_to_columns(users_details_2023, prefix)

# users_score_2023 = spark.read.option("header", "true") \
#     .option("inferSchema", "true") \
#     .option("multiLine", "true") \
#     .csv("data/kaggle/users-score-2023.csv.gz")
#     # .csv("dbfs:/FileStore/tables/users_score_2023_csv.gz")

# prefix = "users_score_2023"
# users_score_2023 = add_prefix_to_columns(users_score_2023, prefix)

# anime_dataset.csv.gz
# anime_filtered.csv.gz
# final_animedataset.csv.gz
# user_filtered.csv.gz
# users_details_2023.csv.gz
# users_score_2023.csv.gz

## Import IMDB Non-Commerical

In [7]:
imdb_title = spark.read.option("header", "true")\
    .option("delimiter", "\t")\
    .option("inferSchema", "true")\
    .csv("data/imdb/title.akas.tsv.gz")

prefix = "imdb_title"
imdb_title = add_prefix_to_columns(imdb_title, prefix)

# imdb_ratings = spark.read.option("header", "true")\
#     .option("delimiter", "\t")\
#     .option("inferSchema", "true")\
#     .csv("data/imdb//title.ratings.tsv.gz")

# prefix = "imdb_ratings"
# imdb_ratings = add_prefix_to_columns(imdb_ratings, prefix)

# imdb_principals = spark.read.option("header", "true")\
#     .option("delimiter", "\t")\
#     .option("inferSchema", "true")\
#     .csv("data/imdb/title.principals.tsv.gz")

# prefix = "imdb_principals"
# imdb_principals = add_prefix_to_columns(imdb_principals, prefix)

# imdb_episode = spark.read.option("header", "true")\
#     .option("delimiter", "\t")\
#     .option("inferSchema", "true")\
#     .csv("data/imdb/title.episode.tsv.gz")

# prefix = "imdb_episode"
# imdb_episode = add_prefix_to_columns(imdb_episode, prefix)

# imdb_crew = spark.read.option("header", "true")\
#     .option("delimiter", "\t")\
#     .option("inferSchema", "true")\
#     .csv("data/imdb/title.crew.tsv.gz")

# prefix = "imdb_crew"
# imdb_crew = add_prefix_to_columns(imdb_crew, prefix)

# imdb_title_basics = spark.read.option("header", "true")\
#     .option("delimiter", "\t")\
#     .option("inferSchema", "true")\
#     .csv("data/imdb/title.basics.tsv.gz")

# prefix = "imdb_title_basics"
# imdb_title_basics = add_prefix_to_columns(imdb_title_basics, prefix)

# imdb_name_basics = spark.read.option("header", "true")\
#     .option("delimiter", "\t")\
#     .option("inferSchema", "true")\
#     .csv("data/imdb/name.basics.tsv.gz")

# prefix = "imdb_name_basics"
# imdb_name_basics = add_prefix_to_columns(imdb_name_basics, prefix)



24/03/13 23:17:34 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

## Import IMDB Scrape from Justin Huang

In [8]:
imdb_scrape = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data/imdb-scrape/imdb_scraped_datav2.csv.gz")
    #.csv("dbfs:/FileStore/tables/imdb_scraped_datav2_csv.gz")

prefix = "imdb_scrape"
imdb_scrape = add_prefix_to_columns(imdb_scrape, prefix)

# Inspect data

## IMDB Data

In [9]:
imdb_title.printSchema()
x = imdb_title.count()
print(x)
imdb_title.show(n=5)

root
 |-- imdb_title_titleId: string (nullable = true)
 |-- imdb_title_ordering: integer (nullable = true)
 |-- imdb_title_title: string (nullable = true)
 |-- imdb_title_region: string (nullable = true)
 |-- imdb_title_language: string (nullable = true)
 |-- imdb_title_types: string (nullable = true)
 |-- imdb_title_attributes: string (nullable = true)
 |-- imdb_title_isOriginalTitle: string (nullable = true)



[Stage 4:>                                                          (0 + 1) / 1]

38853125
+------------------+-------------------+--------------------+-----------------+-------------------+----------------+---------------------+--------------------------+
|imdb_title_titleId|imdb_title_ordering|    imdb_title_title|imdb_title_region|imdb_title_language|imdb_title_types|imdb_title_attributes|imdb_title_isOriginalTitle|
+------------------+-------------------+--------------------+-----------------+-------------------+----------------+---------------------+--------------------------+
|         tt0000001|                  1|          Карменсіта|               UA|                 \N|     imdbDisplay|                   \N|                         0|
|         tt0000001|                  2|          Carmencita|               DE|                 \N|              \N|        literal title|                         0|
|         tt0000001|                  3|Carmencita - span...|               HU|                 \N|     imdbDisplay|                   \N|                       

                                                                                

In [10]:
# imdb_title.filter(F.lower(F.col("title")) == 'cowboy bebop').show()

In [11]:
# imdb_title.filter(F.col("titleId") == "tt0213338").show(n=100)

In [12]:
# imdb_title.filter((F.col("titleId") == "tt0213338") & (F.col("region") == "US")).show()

In [13]:
# imdb_title.filter((F.col("titleId") == "tt0213338") & (F.col("region") == "JP")).show()

In [14]:
# imdb_title_basics.filter(F.col("tconst") == "tt0213338").show(truncate=False)

In [15]:
# imdb_ratings.filter(F.col("tconst") == "tt0213338").show(truncate=False)

In [16]:
# imdb_episode.filter(F.col("parentTconst") == "tt0213338").show(truncate=False)

In [17]:
# imdb_name_basics.printSchema()
# x = imdb_name_basics.count()
# print(x)
# imdb_name_basics.show(n=5)

In [18]:
# imdb_crew.printSchema()
# x = imdb_crew.count()
# print(x)
# imdb_crew.show(n=5)

In [19]:
# imdb_episode.printSchema()
# x = imdb_episode.count()
# print(x)
# imdb_episode.show(n=5)

In [20]:
# imdb_principals.printSchema()
# x = imdb_principals.count()
# print(x)
# imdb_principals.show(n=5)

## Kaggle Anime dataset

In [21]:
x = anime_dataset.count()
print(x)
anime_dataset.printSchema()
anime_dataset.show(n=5, truncate=False)
anime_dataset.filter(F.col('anime_id') == '1').collect()

40779
root
 |-- anime_anime_id: string (nullable = true)
 |-- anime_Name: string (nullable = true)
 |-- anime_English name: string (nullable = true)
 |-- anime_Other name: string (nullable = true)
 |-- anime_Score: string (nullable = true)
 |-- anime_Genres: string (nullable = true)
 |-- anime_Synopsis: string (nullable = true)
 |-- anime_Type: string (nullable = true)
 |-- anime_Episodes: string (nullable = true)
 |-- anime_Aired: string (nullable = true)
 |-- anime_Premiered: string (nullable = true)
 |-- anime_Status: string (nullable = true)
 |-- anime_Producers: string (nullable = true)
 |-- anime_Licensors: string (nullable = true)
 |-- anime_Studios: string (nullable = true)
 |-- anime_Source: string (nullable = true)
 |-- anime_Duration: string (nullable = true)
 |-- anime_Rating: string (nullable = true)
 |-- anime_Rank: string (nullable = true)
 |-- anime_Popularity: string (nullable = true)
 |-- anime_Favorites: string (nullable = true)
 |-- anime_Scored By: string (nullable

[Row(anime_anime_id='1', anime_Name='Cowboy Bebop', anime_English name='Cowboy Bebop', anime_Other name='カウボーイビバップ', anime_Score='8.75', anime_Genres='Action, Award Winning, Sci-Fi', anime_Synopsis='Crime is timeless. By the year 2071, humanity has expanded across the galaxy, filling the surface of other planets with settlements like those on Earth. These new societies are plagued by murder, drug use, and theft, and intergalactic outlaws are hunted by a growing number of tough bounty hunters.', anime_Type=None, anime_Episodes=None, anime_Aired=None, anime_Premiered=None, anime_Status=None, anime_Producers=None, anime_Licensors=None, anime_Studios=None, anime_Source=None, anime_Duration=None, anime_Rating=None, anime_Rank=None, anime_Popularity=None, anime_Favorites=None, anime_Scored By=None, anime_Members=None, anime_Image URL=None),
 Row(anime_anime_id='1', anime_Name='200 years after this epic battle', anime_English name=" Enno's descendant", anime_Other name=' Chiaki', anime_Score=

In [22]:
# find anime title with title Ace Attorney

# anime_dataset.filter(F.lower(F.col("anime_Other name")) == 'Ace Attorney').show()
# anime_dataset.filter(F.lower(F.col("anime_anime_id")) == '31630').show()

# 31630,"Gyakuten Saiban: Sono ""Shinjitsu"", Igi Ari!",Ace Attorney,逆転裁判 ～その「真実」、異議あり！～,

In [23]:
# anime_filtered.printSchema()
# x = anime_filtered.count()
# print(x)
# anime_filtered.show(n=5)
# anime_filtered.filter(F.col('anime_id') == 1).collect()

In [24]:
# final_animedataset.printSchema()
# x = final_animedataset.count()
# print(x)
# final_animedataset.show(n=5)
# final_animedataset.filter(F.col('anime_id') == 1)

In [25]:
# user_filtered.printSchema()
# x = user_filtered.count()
# print(x)
# user_filtered.show(n=5)

In [26]:
# users_details_2023.printSchema()
# x = users_details_2023.count()
# print(x)
# users_details_2023.show(n=5)

In [27]:
# users_score_2023.printSchema()
# x = users_score_2023.count()
# print(x)
# users_score_2023.show(n=5)

## IMDB Scrape

In [28]:
imdb_scrape.printSchema()
x = imdb_scrape.count()
print(x)
imdb_scrape.show(n=5)

root
 |-- imdb_scrape__c0: string (nullable = true)
 |-- imdb_scrape_title: string (nullable = true)
 |-- imdb_scrape_description: string (nullable = true)
 |-- imdb_scrape_url: string (nullable = true)
 |-- imdb_scrape_image: string (nullable = true)
 |-- imdb_scrape_rating_value: string (nullable = true)
 |-- imdb_scrape_genre: string (nullable = true)
 |-- imdb_scrape_content_rating: string (nullable = true)
 |-- imdb_scrape_creator: string (nullable = true)
 |-- imdb_scrape_main_cast: string (nullable = true)
 |-- imdb_scrape_keywords: string (nullable = true)
 |-- imdb_scrape_duration: string (nullable = true)
 |-- imdb_scrape_soundtrack_trackname: string (nullable = true)
 |-- imdb_scrape_lyrics: string (nullable = true)
 |-- imdb_scrape_music: string (nullable = true)
 |-- imdb_scrape_arrangement: string (nullable = true)
 |-- imdb_scrape_performed_by: string (nullable = true)
 |-- imdb_scrape_imdb_id: string (nullable = true)

53786
+---------------+--------------------+-------

24/03/13 23:17:58 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , title, description, url, image, rating_value, genre, content_rating, creator, main_cast, keywords, duration, soundtrack_trackname, lyrics, music, arrangement, performed_by, imdb_id
 Schema: _c0, title, description, url, image, rating_value, genre, content_rating, creator, main_cast, keywords, duration, soundtrack_trackname, lyrics, music, arrangement, performed_by, imdb_id
Expected: _c0 but found: 
CSV file: file:///Users/jonathanlin/Documents/cse6242/project/data/imdb-scrape/imdb_scraped_datav2.csv.gz


# Clean data

## IMDB Scrape data

In [29]:
imdb_scrape.printSchema()
imdb_scrape_row_count = imdb_scrape.count()
print(imdb_scrape_row_count)

root
 |-- imdb_scrape__c0: string (nullable = true)
 |-- imdb_scrape_title: string (nullable = true)
 |-- imdb_scrape_description: string (nullable = true)
 |-- imdb_scrape_url: string (nullable = true)
 |-- imdb_scrape_image: string (nullable = true)
 |-- imdb_scrape_rating_value: string (nullable = true)
 |-- imdb_scrape_genre: string (nullable = true)
 |-- imdb_scrape_content_rating: string (nullable = true)
 |-- imdb_scrape_creator: string (nullable = true)
 |-- imdb_scrape_main_cast: string (nullable = true)
 |-- imdb_scrape_keywords: string (nullable = true)
 |-- imdb_scrape_duration: string (nullable = true)
 |-- imdb_scrape_soundtrack_trackname: string (nullable = true)
 |-- imdb_scrape_lyrics: string (nullable = true)
 |-- imdb_scrape_music: string (nullable = true)
 |-- imdb_scrape_arrangement: string (nullable = true)
 |-- imdb_scrape_performed_by: string (nullable = true)
 |-- imdb_scrape_imdb_id: string (nullable = true)

53786


In [30]:
imdb_scrape_unique = imdb_scrape.select("imdb_scrape_imdb_id", "imdb_scrape_title").distinct()
imdb_scrape_unique_row_count = imdb_scrape_unique.count()
print(imdb_scrape_unique_row_count)


18957


In [31]:
# imdb_scrape_music_unique_notnull = imdb_scrape_unique.filter(
#     col("imdb_scrape_soundtrack_trackname").isNotNull() |
#     col("imdb_scrape_lyrics").isNotNull() |
#     col("imdb_scrape_music").isNotNull() |
#     col("imdb_scrape_arrangement").isNotNull() |
#     col("imdb_scrape_performed_by").isNotNull())

# imdb_scrape_music_unique_notnull.show(n = 10)

# imdb_scrape_music_unique_notnull_row_count = imdb_scrape_music_unique_notnull.count()
# print(imdb_scrape_music_unique_notnull_row_count)

## V3 - Iterative Data Matching

First start with higher confidence joins. With items that are not joinable, join on Japanese Language

In [32]:
# imdb_scrape gives is the in-scope animes. We need to join it to all possible names in imdb_title. This is imdb_title_inscope.

# Then we take all the possibilies in anime_dataset and and left join it

v3_imdb_titles = imdb_title.join(imdb_scrape_unique, imdb_title["imdb_title_titleId"] == imdb_scrape["imdb_scrape_imdb_id"], "inner")\
    .drop(imdb_scrape["imdb_scrape_title"])

priorityExpr = F.when(F.col("imdb_title_region") == "US", 1)\
    .when((F.col("imdb_title_region") == "JP") & (F.col("imdb_title_language") == "ja"), 2)\
    .when(F.col("imdb_title_types") == "original", 3)\
    .otherwise(4)

v3_imdb_titles_priority = v3_imdb_titles\
    .withColumn("priority", priorityExpr)\
    .orderBy("priority", "imdb_title_ordering")

v3_imdb_titles_priority.show()

[Stage 29:>                                                         (0 + 1) / 1]

+------------------+-------------------+--------------------+-----------------+-------------------+----------------+---------------------+--------------------------+-------------------+--------+
|imdb_title_titleId|imdb_title_ordering|    imdb_title_title|imdb_title_region|imdb_title_language|imdb_title_types|imdb_title_attributes|imdb_title_isOriginalTitle|imdb_scrape_imdb_id|priority|
+------------------+-------------------+--------------------+-----------------+-------------------+----------------+---------------------+--------------------------+-------------------+--------+
|         tt0101137|                  1|          Luna Varga|               US|                 \N|     imdbDisplay|                   \N|                         0|          tt0101137|       1|
|         tt0170180|                  1|Lupin III: Dead o...|               US|                 \N|     imdbDisplay|                   \N|                         0|          tt0170180|       1|
|         tt0103179|     

                                                                                

In [33]:
v3_imdb_titles_priority.filter(F.col("imdb_title_titleId") == "tt0213338").show(n=100)

[Stage 33:>                                                         (0 + 1) / 1]

+------------------+-------------------+--------------------+-----------------+-------------------+----------------+---------------------+--------------------------+-------------------+--------+
|imdb_title_titleId|imdb_title_ordering|    imdb_title_title|imdb_title_region|imdb_title_language|imdb_title_types|imdb_title_attributes|imdb_title_isOriginalTitle|imdb_scrape_imdb_id|priority|
+------------------+-------------------+--------------------+-----------------+-------------------+----------------+---------------------+--------------------------+-------------------+--------+
|         tt0213338|                 21|        Cowboy Bebop|               US|                 \N|     imdbDisplay|                   \N|                         0|          tt0213338|       1|
|         tt0213338|                 18|  カウボーイビバップ|               JP|                 ja|     imdbDisplay|                   \N|                         0|          tt0213338|       2|
|         tt0213338|              

                                                                                

In [34]:
v3_join1 = anime_dataset.join(v3_imdb_titles_priority, anime_dataset["anime_Name"] == v3_imdb_titles_priority["imdb_title_title"], "inner")
v3_antijoin1 = anime_dataset.join(v3_imdb_titles_priority, anime_dataset["anime_Name"] == v3_imdb_titles_priority["imdb_title_title"], "left_anti")
v3_join2 = v3_antijoin1.join(v3_imdb_titles_priority, v3_antijoin1["anime_Other name"] == v3_imdb_titles_priority["imdb_title_title"], "inner")
v3_antijoin2 = v3_antijoin1.join(v3_imdb_titles_priority, v3_antijoin1["anime_Other name"] == v3_imdb_titles_priority["imdb_title_title"], "left_anti")
v3_join3 = v3_antijoin2.join(v3_imdb_titles_priority, v3_antijoin2["anime_English name"] == v3_imdb_titles_priority["imdb_title_title"], "inner")

v3_imdb_found_in_animedataset = v3_join1.union(v3_join2).union(v3_join3)


In [35]:
x = v3_imdb_found_in_animedataset.count()
print(x)

[Stage 47:>                                                         (0 + 3) / 3]

16957


                                                                                

In [36]:
windowSpec = Window.partitionBy("anime_Name").orderBy("priority")  
v3_imdb_found_in_animedataset_ranked = v3_imdb_found_in_animedataset.withColumn("rank", F.row_number().over(windowSpec))

v3_imdb_found_in_animedataset_ranked_final = v3_imdb_found_in_animedataset_ranked.filter(F.col("rank") == 1).drop("rank")

v3_imdb_found_in_animedataset_ranked_final.show()


24/03/13 23:19:55 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+--------------+--------------------+--------------------+-----------------------------+-----------+--------------------+--------------------+----------+--------------+--------------------+---------------+---------------+--------------------+--------------------+-------------+------------+--------------+--------------------+----------+----------------+---------------+---------------+-------------+--------------------+------------------+-------------------+-----------------------------+-----------------+-------------------+----------------+---------------------+--------------------------+-------------------+--------+
|anime_anime_id|          anime_Name|  anime_English name|             anime_Other name|anime_Score|        anime_Genres|      anime_Synopsis|anime_Type|anime_Episodes|         anime_Aired|anime_Premiered|   anime_Status|     anime_Producers|     anime_Licensors|anime_Studios|anime_Source|anime_Duration|        anime_Rating|anime_Rank|anime_Popularity|anime_Favorites|anime_

                                                                                

In [37]:
x = v3_imdb_found_in_animedataset_ranked_final.count()
print(x)



5256


                                                                                

In [38]:
v3_imdb_found_in_animedataset_ranked_final.filter(F.col("imdb_title_titleId") == "tt0213338").show()

[Stage 106:>                                                        (0 + 3) / 3]

+--------------+------------+------------------+------------------+-----------+--------------------+--------------------+----------+--------------+-----------+---------------+------------+---------------+---------------+-------------+------------+--------------+------------+----------+----------------+---------------+---------------+-------------+---------------+------------------+-------------------+----------------+-----------------+-------------------+----------------+---------------------+--------------------------+-------------------+--------+
|anime_anime_id|  anime_Name|anime_English name|  anime_Other name|anime_Score|        anime_Genres|      anime_Synopsis|anime_Type|anime_Episodes|anime_Aired|anime_Premiered|anime_Status|anime_Producers|anime_Licensors|anime_Studios|anime_Source|anime_Duration|anime_Rating|anime_Rank|anime_Popularity|anime_Favorites|anime_Scored By|anime_Members|anime_Image URL|imdb_title_titleId|imdb_title_ordering|imdb_title_title|imdb_title_region|imdb_t

                                                                                

In [39]:
v3_imdb_titles.unpersist()
v3_imdb_titles_priority.unpersist()
v3_join1.unpersist()
v3_antijoin1.unpersist()
v3_join2.unpersist()
v3_antijoin2.unpersist()
v3_join3.unpersist()
v3_imdb_found_in_animedataset.unpersist()

DataFrame[anime_anime_id: string, anime_Name: string, anime_English name: string, anime_Other name: string, anime_Score: string, anime_Genres: string, anime_Synopsis: string, anime_Type: string, anime_Episodes: string, anime_Aired: string, anime_Premiered: string, anime_Status: string, anime_Producers: string, anime_Licensors: string, anime_Studios: string, anime_Source: string, anime_Duration: string, anime_Rating: string, anime_Rank: string, anime_Popularity: string, anime_Favorites: string, anime_Scored By: string, anime_Members: string, anime_Image URL: string, imdb_title_titleId: string, imdb_title_ordering: int, imdb_title_title: string, imdb_title_region: string, imdb_title_language: string, imdb_title_types: string, imdb_title_attributes: string, imdb_title_isOriginalTitle: string, imdb_scrape_imdb_id: string, priority: int]

# Export

In [40]:
final_df = v3_imdb_found_in_animedataset_ranked_final

def remove_newlines(df):
    for column_name in df.columns:
        # Check if the column type is string
        if isinstance(df.schema[column_name].dataType, StringType):
            df = df.withColumn(column_name, regexp_replace(col(column_name), "\r|\n", " "))
    return df

final_final_df = remove_newlines(final_df)

# remove buggy row tt12853970

final_final_df = final_final_df.filter(F.col("imdb_title_titleId") != "tt12853970")

In [41]:
final_final_df.printSchema()

root
 |-- anime_anime_id: string (nullable = true)
 |-- anime_Name: string (nullable = true)
 |-- anime_English name: string (nullable = true)
 |-- anime_Other name: string (nullable = true)
 |-- anime_Score: string (nullable = true)
 |-- anime_Genres: string (nullable = true)
 |-- anime_Synopsis: string (nullable = true)
 |-- anime_Type: string (nullable = true)
 |-- anime_Episodes: string (nullable = true)
 |-- anime_Aired: string (nullable = true)
 |-- anime_Premiered: string (nullable = true)
 |-- anime_Status: string (nullable = true)
 |-- anime_Producers: string (nullable = true)
 |-- anime_Licensors: string (nullable = true)
 |-- anime_Studios: string (nullable = true)
 |-- anime_Source: string (nullable = true)
 |-- anime_Duration: string (nullable = true)
 |-- anime_Rating: string (nullable = true)
 |-- anime_Rank: string (nullable = true)
 |-- anime_Popularity: string (nullable = true)
 |-- anime_Favorites: string (nullable = true)
 |-- anime_Scored By: string (nullable = tru

In [42]:
imdb_columns = sorted([col for col in final_final_df.columns if col.startswith('imdb_')])
anime_columns = sorted([col for col in final_final_df.columns if col.startswith('anime_')])

selected_columns = imdb_columns + anime_columns

final_final_final_df = final_final_df.select(*selected_columns)

In [43]:
(final_final_final_df\
  .coalesce(1)
  .write\
  .mode('overwrite')\
  .option("header", "true")\
  .option("quoteAll", "true")\
  .option("delimiter", "|")\
  .option("nullValue", "null")\
  .csv("data/export/imdb_imdbscrape_kaggle_master.csv"))

                                                                                

In [44]:
spark.stop()