In [5]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that the later cells read their data through.
spark = (
    SparkSession.builder
    .appName("BookGenomeAnalysis")
    .getOrCreate()
)


In [7]:
# Load the per-book tag counts (columns: item_id, num, tag_id).
# Forward slashes keep the relative path portable across operating systems and
# match how tags.json is loaded later in this notebook; backslashes are a
# Windows-only path separator.
tag_df = spark.read.json("book-genome/book_dataset/raw/tag_count.json")
tag_df.show(50)


+-------+----+------+
|item_id| num|tag_id|
+-------+----+------+
|    115|  52|    13|
|    115| 180|    25|
|    115| 110|    38|
|    115|  42|    47|
|    115| 478|    52|
|    115|  94|   104|
|    115|  72|   129|
|    115| 607|   139|
|    115|1610|   141|
|    115|2429|   142|
|    115|3510|   151|
|    115| 123|   161|
|    115| 208|   212|
|    115|  46|   214|
|    115|  41|   225|
|    115|  40|   247|
|    115|2025|   259|
|    115|  43|   275|
|    115|  96|   326|
|    115| 163|   381|
|    115| 337|   383|
|    115| 143|   395|
|    115| 259|   435|
|    115| 113|   436|
|    115| 140|   481|
|    115| 270|   547|
|    115|  60|   571|
|    115| 380|   579|
|    115| 101|   657|
|    115|  46|   659|
|    115|2064|   672|
|    115| 131|   725|
|    387|  10|    23|
|    387|   7|    47|
|    387|  18|    75|
|    387|  84|    93|
|    387|   4|   114|
|    387|   5|   166|
|    387|  10|   186|
|    387|   4|   223|
|    387|   6|   230|
|    387|  15|   247|
|    387| 

In [9]:
# Keep the five most-applied tags for each book (rank 1 = highest `num`).
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# How many top-ranked tags to retain per book.
TOP_TAGS_PER_BOOK = 5

# row_number() assigns ranks arbitrarily among rows that tie on the ordering,
# so tag_id is added as a secondary key to make the ranking reproducible when
# two tags of the same book share the same `num`.
window_spec = Window.partitionBy("item_id").orderBy(col("num").desc(), col("tag_id").asc())

top_tags_df = (
    tag_df
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") <= TOP_TAGS_PER_BOOK)
)
top_tags_df.show(50)


+-------+----+------+----+
|item_id| num|tag_id|rank|
+-------+----+------+----+
|    115|3510|   151|   1|
|    115|2429|   142|   2|
|    115|2064|   672|   3|
|    115|2025|   259|   4|
|    115|1610|   141|   5|
|    387| 529|   478|   1|
|    387| 146|   534|   2|
|    387|  84|    93|   3|
|    387|  71|   426|   4|
|    387|  67|   589|   5|
|    423|7034|   580|   1|
|    423|1060|   259|   2|
|    423| 330|   706|   3|
|    423| 222|   151|   4|
|    423| 202|   438|   5|
|    434| 797|   462|   1|
|    434| 671|   179|   2|
|    434| 629|   259|   3|
|    434| 299|   477|   4|
|    434| 235|   669|   5|
|    466| 862|   512|   1|
|    466| 289|   259|   2|
|    466| 288|   602|   3|
|    466| 257|   249|   4|
|    466| 138|   342|   5|
|    505|1663|   249|   1|
|    505| 772|   672|   2|
|    505| 267|   259|   3|
|    505| 241|   564|   4|
|    505| 215|   326|   5|
|    574|1413|   478|   1|
|    574| 824|   238|   2|
|    574| 153|   342|   3|
|    574|  99|   602|   4|
|

In [11]:
# Total number of applications of each tag over all books, largest first.
votes_by_tag = tag_df.groupBy("tag_id").sum("num")
tag_counts = votes_by_tag.orderBy("sum(num)", ascending=False)
tag_counts.show(50)


+------+--------+
|tag_id|sum(num)|
+------+--------+
|   672| 5627942|
|   249| 5355792|
|   259| 4929087|
|   151| 2631087|
|   580| 2158948|
|   564| 1996633|
|   207| 1324692|
|   478| 1266736|
|   462| 1223647|
|   166|  885600|
|   495|  779765|
|   142|  616277|
|   686|  541936|
|   481|  514698|
|   304|  513694|
|   141|  512189|
|    23|  497055|
|   694|  471663|
|   669|  448372|
|    25|  434346|
|   335|  427146|
|   410|  379831|
|   326|  357972|
|   136|  329706|
|   395|  301460|
|   342|  298289|
|   426|  290027|
|   465|  280647|
|   659|  268079|
|   327|  259846|
|   179|  242524|
|   159|  235094|
|   643|  232561|
|    38|  230741|
|   139|  228207|
|    13|  225679|
|    93|  207374|
|   547|  200120|
|   650|  168959|
|   435|  158783|
|   579|  158186|
|   230|  158090|
|   293|  156553|
|   214|  146530|
|   602|  137988|
|   629|  135673|
|   236|  121605|
|   383|  119780|
|   464|  118175|
|   397|  115633|
+------+--------+
only showing top 50 rows



In [13]:
# Rank books by the total number of tag votes they received across all tags.
votes_by_book = tag_df.groupBy("item_id").sum("num")
tags_per_book = votes_by_book.withColumnRenamed("sum(num)", "total_tag_votes")
tags_per_book.orderBy("total_tag_votes", ascending=False).show(50)


+--------+---------------+
| item_id|total_tag_votes|
+--------+---------------+
| 4640799|         973596|
| 2792775|         764914|
| 8812783|         493724|
| 6231171|         400278|
| 2402163|         396207|
|13155899|         368561|
| 3275794|         327339|
|15524549|         299644|
| 2963218|         284836|
|41335427|         277272|
| 6171458|         271994|
|16827462|         260108|
| 3060926|         248843|
| 1708725|         224763|
|15524542|         222136|
|14863741|         219249|
|15545385|         195102|
|  878368|         182824|
| 3036731|         181205|
| 1970226|         180983|
|  153313|         179926|
| 6366642|         164380|
| 2422333|         162791|
| 2982101|         161923|
|  245494|         153033|
|14345371|         152760|
| 1272463|         150123|
|41107568|         148520|
| 3212258|         145218|
|13306276|         138813|
| 2207778|         137801|
|13355552|         131896|
| 2267189|         123783|
|14245059|         123017|
|

In [15]:
# Count the distinct tags attached to each book and list the most varied first.
from pyspark.sql.functions import countDistinct

unique_tag_count = countDistinct("tag_id").alias("unique_tags")
tag_diversity = tag_df.groupBy("item_id").agg(unique_tag_count)
tag_diversity.orderBy(col("unique_tags").desc()).show(50)


+--------+-----------+
| item_id|unique_tags|
+--------+-----------+
|26129516|         47|
|25370670|         46|
|25209900|         46|
|25589246|         46|
|42804162|         46|
| 3303888|         45|
|24803357|         45|
| 3389674|         44|
|   47950|         43|
|28166399|         43|
|45363962|         43|
|   69081|         43|
|41106601|         43|
| 1947012|         43|
| 2000351|         42|
|45620597|         42|
|45900469|         42|
|41941424|         42|
|43165888|         42|
|51383662|         42|
|19694996|         42|
|17441876|         42|
|24070983|         42|
|18166592|         42|
|13344769|         42|
| 1743336|         42|
| 6127168|         41|
|42421291|         41|
|21987573|         41|
|50652749|         41|
| 1995335|         41|
|26415493|         41|
| 6568189|         41|
|49343156|         41|
|17912072|         41|
|18285292|         41|
| 1782551|         41|
| 3271379|         40|
|21768943|         40|
|41912642|         40|
|42397963| 

In [18]:
# Load the tag lookup table (columns: id, tag) and preview it without truncation.
# NOTE(review): the key column here is `id`, while tag_df uses `tag_id`, so a
# join that names `tag_id` as a shared column cannot resolve against this frame.
# Presumably `id` is the same key as tag_df.tag_id -- confirm before joining.
tags_path = "book-genome/book_dataset/raw/tags.json"
tags_df = spark.read.json(tags_path)
tags_df.show(5, truncate=False)


+---+------------+
|id |tag         |
+---+------------+
|0  |18th century|
|1  |1920s       |
|2  |1930s       |
|3  |1950s       |
|4  |1960s       |
+---+------------+
only showing top 5 rows



AnalysisException: [UNRESOLVED_USING_COLUMN_FOR_JOIN] USING column `tag_id` cannot be resolved on the right side of the join. The right-side columns: [`id`, `tag`].