In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, regexp_replace, split, explode, trim,
    countDistinct, desc
)

# Session settings applied on top of the builder: the driver is bound to the
# loopback address and shuffles are limited to 8 partitions.
_session_conf = {
    "spark.driver.host": "127.0.0.1",
    "spark.driver.bindAddress": "127.0.0.1",
    "spark.sql.shuffle.partitions": "8",
}

# Local-mode session ("local[*]") named "AnimeGenres".
_builder = SparkSession.builder.appName("AnimeGenres").master("local[*]")
for _conf_key, _conf_value in _session_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

# Location of the ratings CSV on the local machine.
PATH = r"C:\Users\Rodion\Documents\BigDataPy\lab3\data\final_anime_ratings.csv"

# The first row is a header, and the double quote serves as both the quote
# and the escape character.
_csv_reader = spark.read.options(header="true", escape='"', quote='"')

# Only the title id and its raw genre string are needed downstream.
df = _csv_reader.csv(PATH).select("anime_id", "Genres")

# Step 1: drop rows without a genre string.
_with_genres = df.filter(col("Genres").isNotNull())

# Step 2: rewrite ';' separators as ',' so a single delimiter remains.
_unified = _with_genres.withColumn(
    "Genres", regexp_replace(col("Genres"), ";", ",")
)

# Step 3: one output row per comma-separated token. The generator has to sit
# at the top level of the projection, so trimming happens in the next step.
_exploded = _unified.select(
    "*", explode(split(col("Genres"), ",")).alias("genre")
)

# Step 4: strip surrounding whitespace and discard empty tokens.
df_genres = (
    _exploded
    .withColumn("genre", trim(col("genre")))
    .filter(col("genre") != "")
)

# Number of distinct titles per genre, most common genres first.
result = (
    df_genres
    .groupBy("genre")
    .agg(countDistinct("anime_id").alias("anime_cnt"))
    # Sorting by the count alone leaves genres with equal counts in an
    # arbitrary order, so the rows that make a top-N cut-off could differ
    # between runs. The genre name is a deterministic tie-breaker.
    .orderBy(desc("anime_cnt"), col("genre"))
)

# Print the 20 most common genres without truncating long names. The session
# is stopped even if the job fails, so a failed run does not leave the local
# Spark session alive.
try:
    result.show(20, truncate=False)
finally:
    spark.stop()

+-------------+---------+
|genre        |anime_cnt|
+-------------+---------+
|Comedy       |5987     |
|Action       |3861     |
|Fantasy      |3258     |
|Adventure    |2944     |
|Drama        |2617     |
|Kids         |2616     |
|Sci-Fi       |2571     |
|Music        |2234     |
|Shounen      |2000     |
|Romance      |1897     |
|Slice of Life|1891     |
|School       |1639     |
|Supernatural |1472     |
|Hentai       |1409     |
|Historical   |1138     |
|Mecha        |1097     |
|Magic        |1079     |
|Seinen       |829      |
|Ecchi        |767      |
|Mystery      |723      |
+-------------+---------+
only showing top 20 rows
