In [186]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StringType, FloatType, StructType

In [69]:
spark = SparkSession.builder.getOrCreate()

# Reading data from Animelist.csv

In [110]:
data_schema = [StructField('id', StringType(), True),
 StructField('workId', StringType(), True),
 StructField('url', StringType(), True),
 StructField('jpName', StringType(), True),
 StructField('engName', StringType(), True),
 StructField('synonymsName', StringType(), True),
 StructField('workType', StringType(), True),
 StructField('episodes', StringType(), True),
 StructField('status', StringType(), True),
 StructField('aired', StringType(), True),
 StructField('premiered', StringType(), True),
 StructField('producer', StringType(), True),
 StructField('broadcast', StringType(), True),
 StructField('licensors', StringType(), True),
 StructField('studios', StringType(), True),
 StructField('genres', StringType(), True),
 StructField('themes', StringType(), True),
 StructField('demographic', StringType(), True),
 StructField('source', StringType(), True),
 StructField('duration', StringType(), True),
 StructField('rating', StringType(), True),
 StructField('score', FloatType(), True),
 StructField('allRank', StringType(), True),
 StructField('popularityRank', StringType(), True),
 StructField('members', StringType(), True),
 StructField('favorites', StringType(), True),
 StructField('scoredByUser', FloatType(), True),
 StructField('lastUpdate', StringType(), True)]

final_struct = StructType(fields=data_schema)
anime_list_df = spark.read.option("header",True).csv('data/anime_list.csv', schema=final_struct)

In [71]:
anime_list_df.show()

+---+------+--------------------+---------------------------------------+--------------------+--------------------+--------+--------+----------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+---------------+--------------------+-----+-------+--------------+---------+---------+------------+-------------------+
| id|workId|                 url|                                 jpName|             engName|        synonymsName|workType|episodes|          status|               aired|  premiered|            producer|           broadcast|           licensors|             studios|              genres|              themes|demographic|      source|       duration|              rating|score|allRank|popularityRank|  members|favorites|scoredByUser|         lastUpdate|
+---+------+--------------------+---------------------------------------+-------------------

In [73]:
anime_list_df.createOrReplaceTempView('animelist_table')

In [111]:
anime_list_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- workId: string (nullable = true)
 |-- url: string (nullable = true)
 |-- jpName: string (nullable = true)
 |-- engName: string (nullable = true)
 |-- synonymsName: string (nullable = true)
 |-- workType: string (nullable = true)
 |-- episodes: string (nullable = true)
 |-- status: string (nullable = true)
 |-- aired: string (nullable = true)
 |-- premiered: string (nullable = true)
 |-- producer: string (nullable = true)
 |-- broadcast: string (nullable = true)
 |-- licensors: string (nullable = true)
 |-- studios: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- themes: string (nullable = true)
 |-- demographic: string (nullable = true)
 |-- source: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- score: float (nullable = true)
 |-- allRank: string (nullable = true)
 |-- popularityRank: string (nullable = true)
 |-- members: string (nullable = true)
 |-- favorites:

# Query a list of anime with action genre

Steps:
 1) Sort the anime table in descending order
 2) Query only the distinct title (some duplicate title exist) and corresponding genre and overall score
 3) Filter only with Action genre and Manga as source
 4) Clean data by filtering score within (0 - 10) range

In [166]:
anime_list = spark.sql("SELECT DISTINCT engName, workId, genres, score \
                       FROM animelist_table \
                       WHERE (score >= 0 AND score <= 10) \
                       AND genres LIKE 'Action%' \
                       AND source LIKE '%Manga%' \
                       ORDER BY score DESC")
anime_list.show()

+--------------------+------+--------------------+-----+
|             engName|workId|              genres|score|
+--------------------+------+--------------------+-----+
|Fullmetal Alchemi...|  5114|Action, Adventure...|  9.1|
|Bleach: Thousand-...| 41467|Action, Adventure...| 9.07|
|Attack on Titan S...| 38524|       Action, Drama| 9.06|
|Attack on Titan: ...| 51535|Action, Drama, Su...| 9.06|
|    Gintama Season 4| 28977|Action, Comedy, S...| 9.06|
|Gintama: The Very...| 39486|Action, Comedy, D...| 9.04|
|     Hunter x Hunter| 11061|Action, Adventure...| 9.04|
|    Gintama Season 2|  9969|Action, Comedy, S...| 9.04|
|  Gintama: Enchousen| 15417|Action, Comedy, S...| 9.03|
|    Gintama Season 5| 34096|Action, Comedy, S...| 8.98|
|             Gintama|   918|Action, Comedy, S...| 8.94|
|Gintama: The Movi...| 15335|Action, Comedy, S...| 8.91|
|Gintama.: Silver ...| 37491|Action, Comedy, S...| 8.88|
|   Kingdom: Season 3| 40682|              Action| 8.81|
|Gintama.: Silver ...| 36838|Ac

In [167]:
anime_list.createOrReplaceTempView('anime_list')

# Reading data from Reviewlist.csv

In [144]:
data_schema = [StructField('id',StringType(), True),
 StructField('workId',StringType(), True),
 StructField('url',StringType(), True),
 StructField('workName',StringType(), True),
 StructField('postTime',StringType(), True),
 StructField('episodesSeen',StringType(), True),
 StructField('overallRating',StringType(), True),
 StructField('author',StringType(), True),
 StructField('reviewId',StringType(), True),
 StructField('review',StringType(), True),
 StructField('recommendationStatus', StringType(), True),
 StructField('nice',StringType(), True),
 StructField('loveIt',StringType(), True),
 StructField('funny',StringType(), True),
 StructField('confusing',StringType(), True),
 StructField('informative',StringType(), True),
 StructField('wellWritten',StringType(), True),
 StructField('creative', StringType(), True)]

final_struct = StructType(fields=data_schema)
reviews_df = spark.read.option("header",True).csv('data/reviews.csv')

In [145]:
reviews_df.columns

['id',
 'workId',
 'url',
 'workName',
 'postTime',
 'episodesSeen',
 'overallRating',
 'author',
 'reviewId',
 'review',
 'recommendationStatus',
 'nice',
 'loveIt',
 'funny',
 'confusing',
 'informative',
 'wellWritten',
 'creative']

In [146]:
reviews_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- workId: string (nullable = true)
 |-- url: string (nullable = true)
 |-- workName: string (nullable = true)
 |-- postTime: string (nullable = true)
 |-- episodesSeen: string (nullable = true)
 |-- overallRating: string (nullable = true)
 |-- author: string (nullable = true)
 |-- reviewId: string (nullable = true)
 |-- review: string (nullable = true)
 |-- recommendationStatus: string (nullable = true)
 |-- nice: string (nullable = true)
 |-- loveIt: string (nullable = true)
 |-- funny: string (nullable = true)
 |-- confusing: string (nullable = true)
 |-- informative: string (nullable = true)
 |-- wellWritten: string (nullable = true)
 |-- creative: string (nullable = true)



In [147]:
reviews_df.show()

+--------------------+--------------------+--------------------+--------------------+-----------+------------+-------------+------------+--------+--------------------+--------------------+----+------+-----+---------+-----------+-----------+--------+
|                  id|              workId|                 url|            workName|   postTime|episodesSeen|overallRating|      author|reviewId|              review|recommendationStatus|nice|loveIt|funny|confusing|informative|wellWritten|creative|
+--------------------+--------------------+--------------------+--------------------+-----------+------------+-------------+------------+--------+--------------------+--------------------+----+------+-----+---------+-----------+-----------+--------+
|              155870|                 196|https://myanimeli...|Onegai%E2%98%86Twins|Apr 1, 2008|        null|            7|    CapitalJ|    3533|The plot is prett...|                null|null|  null| null|     null|       null|       null|    null|


In [148]:
reviews_df.createOrReplaceTempView('review_list')

In [168]:
sql_results = spark.sql('SELECT * FROM anime_list')
sql_results.show()

+--------------------+------+--------------------+-----+
|             engName|workId|              genres|score|
+--------------------+------+--------------------+-----+
|Fullmetal Alchemi...|  5114|Action, Adventure...|  9.1|
|Bleach: Thousand-...| 41467|Action, Adventure...| 9.07|
|Attack on Titan S...| 38524|       Action, Drama| 9.06|
|Attack on Titan: ...| 51535|Action, Drama, Su...| 9.06|
|    Gintama Season 4| 28977|Action, Comedy, S...| 9.06|
|Gintama: The Very...| 39486|Action, Comedy, D...| 9.04|
|     Hunter x Hunter| 11061|Action, Adventure...| 9.04|
|    Gintama Season 2|  9969|Action, Comedy, S...| 9.04|
|  Gintama: Enchousen| 15417|Action, Comedy, S...| 9.03|
|    Gintama Season 5| 34096|Action, Comedy, S...| 8.98|
|             Gintama|   918|Action, Comedy, S...| 8.94|
|Gintama: The Movi...| 15335|Action, Comedy, S...| 8.91|
|Gintama.: Silver ...| 37491|Action, Comedy, S...| 8.88|
|   Kingdom: Season 3| 40682|              Action| 8.81|
|Gintama.: Silver ...| 36838|Ac

# Join anime table and review list for getting the author

In [176]:
anime_master = spark.sql("SELECT anime_list.engName As title, anime_list.genres As genre, \
                         review_list.author, score FROM anime_list\
                         JOIN review_list ON anime_list.workId = review_list.workId \
                         ORDER BY score DESC")
anime_master.show()



+--------------------+--------------------+----------------+-----+
|               title|               genre|          author|score|
+--------------------+--------------------+----------------+-----+
|Fullmetal Alchemi...|Action, Adventure...|           GMMPA|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|       Torec0399|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|     VashXTrigun|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|Adventure2345517|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|   crocmandiablo|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|        MSochist|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|         scribbs|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|      syaoran555|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|        har12430|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|       not_inept|  9.1|
|Fullmetal Alchemi...|Action, Adventure...|          iNVE14|  9.1|
|Fullmetal Alchemi...|Action, Adventure...| Half-MagePrince|  

                                                                                

In [177]:
anime_master.createOrReplaceTempView('anime_master_table')

# final data clean up

In [184]:
sql_results = spark.sql('SELECT title, genre, COUNT(author) As no_of_authors, score \
                        FROM anime_master_table \
                        WHERE title IS NOT NULL \
                        GROUP BY title, genre, score ORDER BY score DESC LIMIT 10')
sql_results.show()



+--------------------+--------------------+-------------+-----+
|               title|               genre|no_of_authors|score|
+--------------------+--------------------+-------------+-----+
|Fullmetal Alchemi...|Action, Adventure...|          971|  9.1|
|Bleach: Thousand-...|Action, Adventure...|          113| 9.07|
|    Gintama Season 4|Action, Comedy, S...|           80| 9.06|
|Attack on Titan S...|       Action, Drama|          212| 9.06|
|Attack on Titan: ...|Action, Drama, Su...|           83| 9.06|
|     Hunter x Hunter|Action, Adventure...|          847| 9.04|
|Gintama: The Very...|Action, Comedy, D...|           75| 9.04|
|    Gintama Season 2|Action, Comedy, S...|           39| 9.04|
|  Gintama: Enchousen|Action, Comedy, S...|           25| 9.03|
|    Gintama Season 5|Action, Comedy, S...|           27| 8.98|
+--------------------+--------------------+-------------+-----+



                                                                                

In [185]:
spark.stop()