In [1]:
# Goals:
#  - remove books with a blank title
#  - sort by the number of reviews as a way of gauging popularity
#  - normalize the ratings distribution field (maybe)
#  - create an "average rating" field
#  - create a truncated version of the dataset by taking the top N books by ratings count (out of 1,521,962); the code below currently keeps N = 2,000
#  - use a python package to infer the language for rows where there is no language (polyglot / langdetect?)
#  - import the genres dataset and join it to the dataframe

# the 'books' data has: author, isbn, image, link to goodreads
# the 'works' data has: reviews / ratings

# since works has ratings count, we can sort by that and join in 'books'

In [2]:
import sys
import os
import itertools
from operator import add
from csv import reader
from itertools import chain
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, expr, udf, collect_list, struct, array, lit, array_max
from pyspark.sql.types import FloatType, StringType, ArrayType, IntegerType

#from polyglot.detect import Detector
from langdetect import detect

In [3]:
from pyspark.sql import SparkSession

# Start (or reuse) a SparkContext configured for client deploy mode.
cf = SparkConf().set("spark.submit.deployMode", "client")
sc = SparkContext.getOrCreate(cf)

# The builder's getOrCreate() picks up the SparkContext started above.
# NOTE(review): the appName and "spark.some.config.option" entry look like
# placeholders copied from the Spark quick-start docs; consider replacing them.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/12 11:45:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Goodreads "works" data: ratings/review counts per work_id.
path = "./Datasets/goodreads_book_works.json"
bookDF = spark.read.format("json").load(path)

                                                                                

In [5]:
# Spot-check a single work. best_book_id is the book id shared with the reviews,
# genres and books datasets.
bookDF.where(col("best_book_id") == 17310087).show()

[Stage 3:>                                                          (0 + 5) / 5]

+------------+-----------+--------------------------+---------------------------------+----------+--------------------+------------------------+--------------------------+-------------------------+--------------------+--------------------+-------------+-----------+-------------+------------------+--------+
|best_book_id|books_count|default_chaptering_book_id|default_description_language_code|media_type|original_language_id|original_publication_day|original_publication_month|original_publication_year|      original_title|         rating_dist|ratings_count|ratings_sum|reviews_count|text_reviews_count| work_id|
+------------+-----------+--------------------------+---------------------------------+----------+--------------------+------------------------+--------------------------+-------------------------+--------------------+--------------------+-------------+-----------+-------------+------------------+--------+
|    17310087|          1|                          |                       

                                                                                

In [6]:
## COUNTS
# TOTAL ROWS: 1,521,962 
# MEDIA TYPE BOOK: 1,102,395
# HAS ORIGINAL TITLE: 646,906
# CLEANED: 475,143

In [7]:
# Keep works that have a non-empty original title and whose media type is "book".
# Rows where either field is null are dropped as well (a null predicate is not true).
bookDF = bookDF.where(
    (col("original_title") != "") & (col("media_type") == "book")
)

In [8]:
# Keep the TOP_N_BOOKS most-rated works. ratings_count appears to be stored as a
# string, hence the cast before sorting. best_book_id is a secondary sort key so
# works tied on ratings_count at the cut-off are chosen deterministically.
TOP_N_BOOKS = 2000
bookDF = bookDF.orderBy(
    col("ratings_count").cast("int").desc(),
    col("best_book_id"),
).limit(TOP_N_BOOKS)

In [9]:
# Spot-check a well-known series among the selected works.
(
    bookDF
    .where(col("original_title").like("%Potter%"))
    .select([
        "best_book_id",
        "original_publication_year",
        "original_title",
        "rating_dist",
        "ratings_count",
        "reviews_count",
        "work_id",
    ])
    .show()
)



+------------+-------------------------+--------------------+--------------------+-------------+-------------+--------+
|best_book_id|original_publication_year|      original_title|         rating_dist|ratings_count|reviews_count| work_id|
+------------+-------------------------+--------------------+--------------------+-------------+-------------+--------+
|           3|                     1997|Harry Potter and ...|5:3131920|4:11907...|      4972886|      5801988| 4640799|
|           5|                     1999|Harry Potter and ...|5:1303937|4:51915...|      2019176|      2496720| 2402163|
|       15881|                     1998|Harry Potter and ...|5:1097387|4:56060...|      1955144|      2431183| 6231171|
|           6|                     2000|Harry Potter and ...|5:1227707|4:50391...|      1912948|      2375359| 3046572|
|      136251|                     2007|Harry Potter and ...|5:1351479|4:39082...|      1889600|      2383409| 2963218|
|           2|                     2003|

                                                                                

In [10]:
# Keep only the columns used downstream and derive the mean star rating.
# The ratings columns appear to be read as strings, so they are cast to double
# explicitly instead of relying on implicit string-to-numeric coercion in "/".
bookDF = bookDF.select(
    "best_book_id",
    "original_publication_day",
    "original_publication_month",
    "original_publication_year",
    "original_title",
    "rating_dist",
    "ratings_count",
    "ratings_sum",
    "reviews_count",
    "text_reviews_count",
    "work_id",
).withColumn(
    "avg_rating",
    col("ratings_sum").cast("double") / col("ratings_count").cast("double"),
)

In [11]:
# UDF to infer a language from the title. Per the original author, no rows have a
# value in `default_description_language_code`, so there is nothing to check before
# filling in the inferred language.
def get_language(title):
    """Best-effort language guess for a book title.

    Args:
        title: the original title string (may be null/empty).

    Returns:
        The code returned by ``langdetect.detect`` (e.g. ``'en'``), or ``''`` when
        the title is null/empty or detection fails.
    """
    if not title:
        # Nothing to classify; skip the detector call entirely.
        return ''
    # langdetect is randomized unless its seed is fixed, so repeated runs can label
    # the same title differently. Seeding here (not only on the driver) also covers
    # the Python worker processes that execute this function as a Spark UDF.
    from langdetect import DetectorFactory
    DetectorFactory.seed = 0
    try:
        return detect(title)
    except Exception:
        # Deliberately best-effort: any detection failure yields an empty code.
        return ''

# Wrap get_language as a Spark UDF producing a string column.
lang_udf = udf(get_language, returnType=StringType())

In [12]:
# Add the title-based language guess (a language code, despite the "_id" suffix).
bookDFLanguages = bookDF.withColumn(
    'inferred_language_id', lang_udf(col('original_title'))
)

In [13]:
# Row count after the top-N cut; adding the language column does not change it.
bookDFLanguages.count()

                                                                                

2000

In [14]:
# join in the genres information
pathG = "./Datasets/goodreads_book_genres_initial.json"
genreDF = spark.read.json(pathG)

                                                                                

In [15]:
# Inspect the nested `genres` struct before flattening it.
genreDF.printSchema()
genreDF.show(n=5, truncate=False)

root
 |-- book_id: string (nullable = true)
 |-- genres: struct (nullable = true)
 |    |-- children: long (nullable = true)
 |    |-- comics, graphic: long (nullable = true)
 |    |-- fantasy, paranormal: long (nullable = true)
 |    |-- fiction: long (nullable = true)
 |    |-- history, historical fiction, biography: long (nullable = true)
 |    |-- mystery, thriller, crime: long (nullable = true)
 |    |-- non-fiction: long (nullable = true)
 |    |-- poetry: long (nullable = true)
 |    |-- romance: long (nullable = true)
 |    |-- young-adult: long (nullable = true)

+-------+---------------------------------------------------------+
|book_id|genres                                                   |
+-------+---------------------------------------------------------+
|5333265|{NULL, NULL, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL}|
|1333909|{NULL, NULL, NULL, 219, 5, NULL, NULL, NULL, NULL, NULL} |
|7327624|{NULL, NULL, 31, 8, NULL, 1, NULL, 1, NULL, NULL}        |
|6066819|{NUL

In [16]:
# Flatten the `genres` struct into one column per genre. The struct's field names
# contain commas, spaces and hyphens, so they are backtick-quoted and given
# identifier-safe aliases (the aliases are later used inside a SQL expression).
genre_field_aliases = [
    ("children", "children"),
    ("comics, graphic", "comics_graphic"),
    ("fantasy, paranormal", "fantasy_paranormal"),
    ("fiction", "fiction"),
    ("history, historical fiction, biography", "history_biography"),
    ("mystery, thriller, crime", "mystery_thriller_crime"),
    ("non-fiction", "non_fiction"),
    ("poetry", "poetry"),
    ("romance", "romance"),
    ("young-adult", "young_adult"),
]
expandedGenreDF = genreDF.select(
    col("book_id"),
    *[col(f"genres.`{field}`").alias(alias) for field, alias in genre_field_aliases],
)

In [17]:
def filter_genres(genre_list):
    """Return the genre labels from ``genre_list``, dropping null/empty entries.

    Order is preserved. Raises TypeError if ``genre_list`` itself is None.
    """
    return [genre for genre in genre_list if genre]

# Wrap filter_genres as a Spark UDF returning array<string>.
filter_genres_udf = udf(filter_genres, returnType=ArrayType(StringType()))

In [18]:
# For each book, build an array holding the name of every genre column that is
# non-null (and a null slot for each one that is null), then strip the nulls.
genre_columns = expandedGenreDF.columns[1:]
case_clauses = ", ".join(
    f"CASE WHEN {name} IS NOT NULL THEN '{name}' END" for name in genre_columns
)
df_with_genres = (
    expandedGenreDF
    .withColumn("genres", expr(f"array({case_clauses})"))
    .select("book_id", "genres")
)

filtered_genres = df_with_genres.withColumn(
    'genres', filter_genres_udf(col('genres'))
)
# CSV output cannot hold an array column; if this is written to CSV, serialize it:
#filtered_genres = filtered_genres.withColumn("genres", col("genres").cast("string"))
filtered_genres.show(truncate=False)

+--------+------------------------------------------------------------------------+
|book_id |genres                                                                  |
+--------+------------------------------------------------------------------------+
|5333265 |[history_biography]                                                     |
|1333909 |[fiction, history_biography]                                            |
|7327624 |[fantasy_paranormal, fiction, mystery_thriller_crime, poetry]           |
|6066819 |[fiction, mystery_thriller_crime, romance]                              |
|287140  |[non_fiction]                                                           |
|287141  |[children, fantasy_paranormal, fiction, history_biography, young_adult] |
|378460  |[fiction]                                                               |
|6066812 |[children, fantasy_paranormal, fiction, young_adult]                    |
|34883016|[romance]                                                         

In [19]:
# Attach genres to each selected work (works without a genres row are dropped).
bookGenreDF = bookDFLanguages.join(
    filtered_genres,
    on=col('best_book_id') == col('book_id'),
    how='inner',
)

In [20]:
#bookGenreDF.filter(bookGenreDF.book_id == 27036533).show()

In [21]:
# 2000 works in, 1998 rows out: presumably two works have no matching genres row
# and are dropped by the inner join.
bookGenreDF.count() # 1998

                                                                                

1998

In [22]:
# Book-level details (ISBNs, format, URLs, author list).
# NOTE(review): schema inference scans this whole (large) file and was interrupted
# in the saved run; supplying an explicit schema would avoid that extra pass.
pathB = "./Datasets/goodreads_books.json"
bookDetailDF = spark.read.format("json").load(pathB)
bookDetailDF.printSchema()

ERROR:root:KeyboardInterrupt while sending command.              (20 + 10) / 69]
Traceback (most recent call last):
  File "/Users/shirleyberry/Library/Python/3.9/lib/python/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/shirleyberry/Library/Python/3.9/lib/python/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt

KeyboardInterrupt: 

In [None]:
# Keep ids, display fields and the raw author list (flattened in a later cell).
# Not kept: "description", "link".
bookDetailDF = bookDetailDF.select(
    "book_id",
    "work_id",
    "isbn",
    "isbn13",
    "title",
    "format",
    "url",
    "image_url",
    "authors",
)

In [None]:
# Books carry a list of authors; flatten it by keeping only the first author's id.
def get_author_id(authors_list):
    """Return the first author's id as an int, or 0 when it cannot be determined.

    Args:
        authors_list: the value of the ``authors`` column, presumably an array of
            structs; each entry must support ``entry['author_id']`` (Spark Rows,
            or dicts in tests).

    Returns:
        ``int(authors_list[0]['author_id'])``. Returns 0 in any of these cases:
        the list is null or empty, its first entry is empty, or the id is
        missing, null, or not an integer string.
    """
    try:
        if authors_list and authors_list[0]:
            return int(authors_list[0]['author_id'])
        return 0
    except (KeyError, IndexError, TypeError, ValueError):
        # A malformed entry should not fail the whole Spark job; map it to 0 so the
        # `author_id != 0` filter applied later drops the row.
        return 0

# Wrap get_author_id as a Spark UDF producing an integer column.
get_author_id_udf = udf(get_author_id, returnType=IntegerType())

In [None]:
# Replace the nested author list with the first author's id.
bookDetailDF = (
    bookDetailDF
    .withColumn('author_id', get_author_id_udf('authors'))
    .drop('authors')
)

In [None]:
# Prefix the detail ids so they don't collide with the works+genres columns
# (work_id, book_id) when the two frames are joined.
bookDetailDF = (
    bookDetailDF
    .withColumnRenamed('book_id', 'det_book_id')
    .withColumnRenamed('work_id', 'det_work_id')
)

In [None]:
# Drop books whose author id could not be determined (get_author_id returned 0).
# NOTE(review): the original author marked this filter as optional; the inner
# author join further down would also drop these rows unless an author has id 0.
bookDetailDF = bookDetailDF.where(col('author_id') != 0)

In [None]:
# Number of book-detail rows with a non-zero author id.
bookDetailDF.count()

In [None]:
# Combine works+genres with book details (ISBNs, URLs, author_id).
fullBookDF = bookGenreDF.join(
    bookDetailDF,
    on=bookGenreDF['best_book_id'] == bookDetailDF['det_book_id'],
    how='inner',
)
#fullBookDF.count() # 1998

In [None]:
# Author records, used to look up each author's display name.
pathA = "./Datasets/goodreads_book_authors.json"
authorDetailDF = spark.read.format("json").load(pathA)

In [None]:
# Keep only the id and name; rename the id so it does not clash with
# fullBookDF.author_id in the join.
authorDetailDF = authorDetailDF.select(
    col('author_id').alias('a_author_id'),
    col('name'),
)

In [None]:
# Attach the author's name, then drop the duplicate key column.
fullBookDF = (
    fullBookDF
    .join(authorDetailDF, fullBookDF['author_id'] == authorDetailDF['a_author_id'])
    .drop('a_author_id')
)
#fullBookDF.count() # 1998

In [None]:
# Collapse to one partition so a CSV write would produce a single part file.
fullBookDF = fullBookDF.repartition(numPartitions=1)
# NOTE(review): the CSV writer cannot store the array-typed `genres` column produced
# by filter_genres_udf; serialize it (e.g. cast to string) before enabling this write.
#fullBookDF.write.csv("full_book_data.csv", header=True, mode="overwrite")

In [None]:
# Collect the selected book ids to the driver (one entry per fullBookDF row);
# they are used below to filter the interactions dataset.
bookIDs = [row['best_book_id'] for row in fullBookDF.select("best_book_id").collect()]

In [None]:
# The interactions dataset is very large; it is filtered down to bookIDs next.
pathI = "./Datasets/goodreads_interactions.csv"
interactionsDF = spark.read.csv(pathI, header=True)

In [None]:
# Inspect the columns. Without inferSchema, every CSV column is read as a string.
interactionsDF.printSchema()
#interactionsDF.count() # prefiltered count: 228,648,342

In [None]:
# Keep only "read" interactions for the selected books. is_read is a string column,
# so the comparison with 1 relies on Spark's implicit cast.
# NOTE(review): in the UCSD Goodreads release, goodreads_interactions.csv appears to
# use remapped book ids (see book_id_map.csv), which may not equal best_book_id;
# confirm the mapping before relying on this filter.
interactionsDF = (
    interactionsDF
    .where(col('is_read') == 1)             # 112,131,203
    .where(col('book_id').isin(bookIDs))    # 193,937
)
#interactionsDF.count() #1,381,758

In [None]:
#interactionsDF = interactionsDF.join(idsDF, "book_id", "inner")

In [None]:
#filteredInteractionsDF.count() # 

In [None]:
# Write the filtered interactions as a single CSV part file. "interactions_data.csv"
# is an output directory, replaced on each run.
interactionsDF = interactionsDF.repartition(numPartitions=1)
interactionsDF.write.mode("overwrite").option("header", True).csv("interactions_data.csv")

                                                                                