In [1]:
# Import Dependencies
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import col, explode, when, sum, size

# Create (or reuse, if one is already running in this kernel) the Spark session
# that every later cell uses.
spark = (
    SparkSession.builder
    .appName("BookRatingPrediction")
    .getOrCreate()
)

In [2]:
# Raw Goodreads book records (gzip-compressed JSON lines)
file_path = "Resources/data/goodreads_books.json.gz"

# Load with schema inference; Spark decompresses the .gz transparently
df = spark.read.format("json").load(file_path)

# Peek at the first rows to sanity-check the load
df.show()

23/11/22 01:40:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+--------------------+--------------+--------+------------+--------------------+-------------------+---------+--------------------+--------+----------+-------------+-----------+-------------+--------------------+---------+--------------------+---------------+-----------------+----------------+--------------------+-------------+---------+--------------------+------------------+--------------------+--------------------+--------------------+--------+
|      asin|             authors|average_rating| book_id|country_code|         description|edition_information|   format|           image_url|is_ebook|      isbn|       isbn13|kindle_asin|language_code|                link|num_pages|     popular_shelves|publication_day|publication_month|publication_year|           publisher|ratings_count|   series|       similar_books|text_reviews_count|               title|title_without_series|                 url| work_id|
+----------+--------------------+--------------+--------+------------+----

In [3]:
# Print the inferred schema. The JSON reader typed every scalar field as a
# string (e.g. average_rating, num_pages), so numeric columns are cast later.
df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- edition_information: string (nullable = true)
 |-- format: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- is_ebook: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: string (nullable = true)
 |-- kindle_asin: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- link: string (nullable = true)
 |-- num_pages: string (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- pub

In [4]:
# Count the raw book records. count() is a Spark action, so this reads the
# whole input (the result is displayed below).
df.count()

                                                                                

2360655

In [5]:
# Spread the data over several part files so later reads can be split across
# tasks (presumably the motivation: a single .gz input is read by one task).
num_partitions = 10
df_repartitioned = df.repartition(num_partitions)

# Persist the partitioned data as JSON, replacing any previous run's output
(
    df_repartitioned.write
    .mode('overwrite')
    .format("json")
    .save("Resources/data_chunks")
)


                                                                                

In [6]:
# Reload the book data from the partitioned part files written above
df = spark.read.format("json").load("Resources/data_chunks")

# Columns not carried forward into the cleaned feature table
columns_to_drop = [
    'asin', 'country_code', 'book_id', 'description', 'isbn', 'similar_books',
    'title_without_series', 'is_ebook', 'work_id', 'link', 'image_url', 'url',
    'edition_information', 'kindle_asin', 'language_code', 'publication_day',
    'publication_month', 'publisher', 'title',
]
df = df.drop(*columns_to_drop)

# The JSON source stores these numeric fields as strings; cast them.
# Values that cannot be parsed become null and are removed by dropna below.
numeric_casts = {
    "average_rating": FloatType(),
    "ratings_count": IntegerType(),
    "num_pages": IntegerType(),
    "text_reviews_count": IntegerType(),
}
for column_name, target_type in numeric_casts.items():
    df = df.withColumn(column_name, col(column_name).cast(target_type))

# Discard any row that has a null in at least one remaining column
df = df.dropna(how='any')

# Sanity-check the result
df.show()

                                                                                

+--------------------+--------------+--------------------+-------------+---------+--------------------+----------------+-------------+----------------+------------------+
|             authors|average_rating|              format|       isbn13|num_pages|     popular_shelves|publication_year|ratings_count|          series|text_reviews_count|
+--------------------+--------------+--------------------+-------------+---------+--------------------+----------------+-------------+----------------+------------------+
|[{115816, }, {400...|          3.82|           Paperback|9780890844816|      152|[{10, to-read}, {...|            1989|           20|              []|                 2|
|         [{12170, }]|          3.82|           Hardcover|9780679406181|      243|[{29, to-read}, {...|            1992|           19|              []|                 3|
|       [{5191981, }]|          4.09|           Paperback|9781844573691|       96|[{11, to-read}, {...|            2012|           11|           

In [7]:
# Print the schema after pruning and casting. Note that `publication_year` is
# still a string here; only average_rating, ratings_count, num_pages and
# text_reviews_count were cast above.
df.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- format: string (nullable = true)
 |-- isbn13: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- publication_year: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- series: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text_reviews_count: integer (nullable = true)



In [8]:
# Keep only rows whose remaining fields are populated: string fields must be
# non-null and non-empty, numeric (already cast) fields must be non-null.
#
# The previous version of this filter also tested `country_code`, but that
# column is dropped in the column-pruning cell above. Spark appears to have
# silently re-resolved it from the upstream plan; relying on that is fragile,
# so the check is removed here. If a country filter is wanted, apply it before
# the column is dropped.
required_string_columns = ["format", "publication_year", "isbn13"]
required_numeric_columns = ["num_pages", "ratings_count", "text_reviews_count", "average_rating"]

df_cleaned = df
for column_name in required_string_columns:
    df_cleaned = df_cleaned.filter(col(column_name).isNotNull() & (col(column_name) != ""))
for column_name in required_numeric_columns:
    df_cleaned = df_cleaned.filter(col(column_name).isNotNull())

df_cleaned.show()

+--------------------+--------------+--------------------+-------------+---------+--------------------+----------------+-------------+--------+------------------+
|             authors|average_rating|              format|       isbn13|num_pages|     popular_shelves|publication_year|ratings_count|  series|text_reviews_count|
+--------------------+--------------+--------------------+-------------+---------+--------------------+----------------+-------------+--------+------------------+
|[{115816, }, {400...|          3.82|           Paperback|9780890844816|      152|[{10, to-read}, {...|            1989|           20|      []|                 2|
|         [{12170, }]|          3.82|           Hardcover|9780679406181|      243|[{29, to-read}, {...|            1992|           19|      []|                 3|
|       [{5191981, }]|          4.09|           Paperback|9781844573691|       96|[{11, to-read}, {...|            2012|           11|      []|                 3|
|[{176372, }, {462...|

In [9]:
# Keep one row per ISBN-13. Which of several duplicate rows survives is not
# guaranteed by Spark, so the kept row may differ between runs.
df = df_cleaned.drop_duplicates(["isbn13"])

In [10]:
# Total "to-read" shelf count per book.
#
# `popular_shelves` is an array of {count, name} structs (count stored as a
# string). explode_outer (rather than explode) keeps books whose shelf array is
# empty. With plain explode those books vanished from this aggregate and got a
# null total after the left join below, while books that merely lacked a
# "to-read" shelf got 0. Both cases now get 0.
df_exploded = df.withColumn("shelf", F.explode_outer("popular_shelves"))

# Per shelf row: the shelf count if it is the 'to-read' shelf, otherwise 0.
# A null shelf (from an empty array) also falls through to 0.
df_with_count = df_exploded.withColumn(
    "to_read_count",
    when(col("shelf.name") == "to-read", col("shelf.count").cast("int")).otherwise(0),
)

# Sum per book. F.sum is spelled out so the aggregation does not depend on the
# builtin `sum` being shadowed by the pyspark functions import.
df_aggregated = df_with_count.groupBy("isbn13").agg(F.sum("to_read_count").alias("total_to_read_count"))

# Show the result
df_aggregated.show()





+-------------+-------------------+
|       isbn13|total_to_read_count|
+-------------+-------------------+
|0000000001976|                  7|
|0000470534982|                  5|
|0000521885841|                  7|
|0000814474233|                154|
|000155206591x|                332|
|0008520919197|               1648|
|0009773781771|                585|
|0009792204458|                984|
|0009796865300|                 21|
|0020049036464|                461|
|0020049070222|                 61|
|0021898130853|                 81|
|0023755023346|                135|
|0023755077226|                 38|
|0023755077899|                207|
|0025986210915|                 59|
|0025986232290|                 48|
|0025986236601|                 12|
|0025986347017|                 12|
|0031809029169|               5143|
+-------------+-------------------+
only showing top 20 rows



                                                                                

In [11]:
# Attach the per-book to-read total; the left join keeps every book in `df`
df = df.join(df_aggregated, on="isbn13", how="left")
df.show()

[Stage 18:>                                                         (0 + 1) / 1]

+-------------+--------------------+--------------+--------------------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+
|       isbn13|             authors|average_rating|              format|num_pages|     popular_shelves|publication_year|ratings_count|          series|text_reviews_count|total_to_read_count|
+-------------+--------------------+--------------+--------------------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+
|0000470534982|         [{16923, }]|          2.86|           Paperback|      273|[{5, to-read}, {1...|            2009|            7|              []|                 2|                  5|
|0008520919197|        [{123715, }]|          4.07|           Paperback|      288|[{2944, mystery},...|            2006|           21|[227086, 356197]|                 2|               1648|
|0009792204458|          [{6218, }]|         

                                                                                

In [12]:
# Flag series membership: 1 when the `series` array is non-empty, else 0
df = df.withColumn(
    "series_binary",
    when(size("series") > 0, 1).otherwise(0),
)
df.show()



+-------------+--------------------+--------------+--------------------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+-------------+
|       isbn13|             authors|average_rating|              format|num_pages|     popular_shelves|publication_year|ratings_count|          series|text_reviews_count|total_to_read_count|series_binary|
+-------------+--------------------+--------------+--------------------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+-------------+
|0000470534982|         [{16923, }]|          2.86|           Paperback|      273|[{5, to-read}, {1...|            2009|            7|              []|                 2|                  5|            0|
|0008520919197|        [{123715, }]|          4.07|           Paperback|      288|[{2944, mystery},...|            2006|           21|[227086, 356197]|                 2|          

                                                                                

In [13]:
# Author-level metadata (author_id, average_rating, name and counts - see output)
file2_path = "Resources/data/goodreads_book_authors.json.gz"

# Load the gzip-compressed JSON lines with schema inference
author_rating_df = spark.read.format("json").load(file2_path)

# Peek at the first rows to sanity-check the load
author_rating_df.show()

[Stage 24:>                                                         (0 + 1) / 1]

+---------+--------------+-------------------+-------------+------------------+
|author_id|average_rating|               name|ratings_count|text_reviews_count|
+---------+--------------+-------------------+-------------+------------------+
|   604031|          3.98|   Ronald J. Fields|           49|                 7|
|   626222|          4.08|      Anita Diamant|       546796|             28716|
|    10333|          3.92|     Barbara Hambly|       122118|              5075|
|     9212|          3.68|    Jennifer Weiner|       888522|             36262|
|   149918|          3.82|      Nigel Pennick|         1740|                96|
|  3041852|          3.89|   Alfred J. Church|          947|                85|
|   215594|          4.17| Michael Halberstam|           23|                 6|
|    19158|          4.18|     Rachel Roberts|        13677|               486|
|  5807700|          3.99|         V.L. Locey|         3130|               986|
|  2983296|          3.48|Anton Szandor 

                                                                                

In [14]:
# Only author_id and average_rating are needed for the per-book author rating
unused_author_columns = ['name', 'ratings_count', 'text_reviews_count']
author_rating_df = author_rating_df.drop(*unused_author_columns)

In [15]:
# Book-to-authors mapping: each book's ISBN-13 with its array of author structs
df_authors_isbn = df.select(col("authors"), col("isbn13"))

In [16]:
# One row per (book, author): explode the authors array, pull the author_id out
# of each struct, then discard the struct/array columns.
df_authors_exploded = (
    df_authors_isbn
    .withColumn("author", explode("authors"))
    .withColumn("unique_author_id", col("author.author_id"))
    .drop("author", "authors")
)
df_authors_exploded.show()



+-------------+----------------+
|       isbn13|unique_author_id|
+-------------+----------------+
|0000000001976|          180734|
|0000000067317|            1654|
|0000195118146|          224783|
|0000195118146|          363486|
|0000195397916|         1842587|
|0000262541785|          322903|
|0000340826681|           18174|
|0000415458250|           21233|
|0000470534982|           16923|
|0000521885841|         2549651|
|0000521885841|         1198502|
|0000785116583|           10297|
|0000785116583|           10294|
|0000814474233|          325296|
|0000815774141|         2020614|
|0000847691225|         1101012|
|0000847691225|          537802|
|0000977316351|         1375008|
|0001442206098|           65398|
|0001442206098|         4623567|
+-------------+----------------+
only showing top 20 rows



                                                                                

In [17]:
# Attach each author's overall average rating. The left join keeps authors that
# have no entry in author_rating_df (their average_rating is then null).
df_authors_with_ratings = df_authors_exploded.join(
    author_rating_df,
    on=df_authors_exploded["unique_author_id"] == author_rating_df["author_id"],
    how="left",
)
df_authors_with_ratings.show()



+-------------+----------------+---------+--------------+
|       isbn13|unique_author_id|author_id|average_rating|
+-------------+----------------+---------+--------------+
|0000000001976|          180734|   180734|          3.77|
|0000000067317|            1654|     1654|          4.14|
|0000195118146|          224783|   224783|          3.90|
|0000195118146|          363486|   363486|          3.83|
|0000195397916|         1842587|  1842587|          3.37|
|0000262541785|          322903|   322903|          4.06|
|0000340826681|           18174|    18174|          4.10|
|0000415458250|           21233|    21233|          3.51|
|0000470534982|           16923|    16923|          3.37|
|0000521885841|         2549651|  2549651|          4.50|
|0000521885841|         1198502|  1198502|          3.63|
|0000785116583|           10297|    10297|          3.81|
|0000785116583|           10294|    10294|          3.83|
|0000814474233|          325296|   325296|          3.62|
|0000815774141

                                                                                

In [18]:
# The author id columns are no longer needed once the rating is attached
df_authors_with_ratings = df_authors_with_ratings.drop("author_id", "unique_author_id")

# Mean author rating per book (multi-author books average their authors).
# average_rating is a string column here; Spark's avg casts it implicitly.
df_authors_agg = (
    df_authors_with_ratings
    .groupBy("isbn13")
    .agg({"average_rating": "avg"})
)
df_authors_agg.show()

[Stage 36:>                                                         (0 + 1) / 1]

+-------------+-------------------+
|       isbn13|avg(average_rating)|
+-------------+-------------------+
|0000000001976|               3.77|
|0000000067317|               4.14|
|0000195118146|              3.865|
|0000195397916|               3.37|
|0000262541785|               4.06|
|0000340826681|                4.1|
|0000415458250|               3.51|
|0000470534982|               3.37|
|0000521885841| 4.0649999999999995|
|0000785116583| 3.8200000000000003|
|0000814474233|               3.62|
|0000815774141|               3.99|
|0000847691225|               3.45|
|0000977316351|                3.6|
|0001442206098|              3.545|
|000155206591x|               3.77|
|0001603705341|               3.52|
|0001607061562|               4.07|
|0001843608081|               3.93|
|0008520919197|               3.97|
+-------------+-------------------+
only showing top 20 rows



                                                                                

In [19]:
# Attach the per-book mean author rating (column "avg(average_rating)")
df = df.join(df_authors_agg, on="isbn13", how="left")
df.show()



+-------------+--------------------+--------------+--------------------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+-------------+-------------------+
|       isbn13|             authors|average_rating|              format|num_pages|     popular_shelves|publication_year|ratings_count|          series|text_reviews_count|total_to_read_count|series_binary|avg(average_rating)|
+-------------+--------------------+--------------+--------------------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+-------------+-------------------+
|0000470534982|         [{16923, }]|          2.86|           Paperback|      273|[{5, to-read}, {1...|            2009|            7|              []|                 2|                  5|            0|               3.37|
|0008520919197|        [{123715, }]|          4.07|           Paperback|      288|[{2944, mystery},.

                                                                                

In [20]:
# Register the current DataFrame as the temporary view "books" so it can be
# queried with spark.sql(...) in the following cells. The view captures this
# DataFrame's plan, so later reassignments of `df` are not reflected in it.
df.createOrReplaceTempView("books")

In [21]:
# Top-10 formats by number of books
format_counts_sql = "SELECT format, count(*) FROM books group by format order by 2 desc limit 10"
spark.sql(format_counts_sql).show()



+--------------------+--------+
|              format|count(1)|
+--------------------+--------+
|           Paperback|  727551|
|           Hardcover|  295467|
|               ebook|   92761|
|Mass Market Paper...|   30394|
|            Audio CD|   12047|
|     Unknown Binding|    5416|
|      Kindle Edition|    4605|
|          Board Book|    3886|
|           Audiobook|    3435|
|               Audio|    1328|
+--------------------+--------+



                                                                                

In [22]:
# How many books fall outside the three dominant formats
minor_format_sql = "select count (*) from books where format != 'Paperback' and format != 'Hardcover' and format != 'ebook'"
spark.sql(minor_format_sql).show()



+--------+
|count(1)|
+--------+
|   72323|
+--------+



                                                                                

In [24]:
# Collapse every format other than the three most common ones into "other"
main_formats = ["Paperback", "Hardcover", "ebook"]
df = df.withColumn(
    "format",
    when(~col("format").isin(main_formats), "other").otherwise(col("format")),
)
df.show()



+-------------+--------------------+--------------+---------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+-------------+-------------------+
|       isbn13|             authors|average_rating|   format|num_pages|     popular_shelves|publication_year|ratings_count|          series|text_reviews_count|total_to_read_count|series_binary|avg(average_rating)|
+-------------+--------------------+--------------+---------+---------+--------------------+----------------+-------------+----------------+------------------+-------------------+-------------+-------------------+
|0000470534982|         [{16923, }]|          2.86|Paperback|      273|[{5, to-read}, {1...|            2009|            7|              []|                 2|                  5|            0|               3.37|
|0008520919197|        [{123715, }]|          4.07|Paperback|      288|[{2944, mystery},...|            2006|           21|[227086, 356197]|    

                                                                                

In [25]:
# Drop identifier and nested columns not needed in the final table, then give
# the aggregate columns readable names.
df = (
    df.drop("isbn13", "authors", "popular_shelves", "series")
    .withColumnRenamed("avg(average_rating)", "author_average_rating")
    .withColumnRenamed("total_to_read_count", "to_read_count")
)

df.show()

[Stage 72:>                                                         (0 + 1) / 1]

+--------------+---------+---------+----------------+-------------+------------------+-------------+-------------+---------------------+
|average_rating|   format|num_pages|publication_year|ratings_count|text_reviews_count|to_read_count|series_binary|author_average_rating|
+--------------+---------+---------+----------------+-------------+------------------+-------------+-------------+---------------------+
|           4.5|Paperback|      140|            1976|            1|                 1|            7|            0|                 3.77|
|          2.86|Paperback|      273|            2009|            7|                 2|            5|            0|                 3.37|
|           4.5|Hardcover|      385|            2008|            2|                 1|            7|            0|   4.0649999999999995|
|          3.62|Hardcover|      220|            2007|           97|                19|          154|            0|                 3.62|
|          4.07|Paperback|      288|     

                                                                                

In [26]:
# Transform the Spark DataFrame into a pandas DataFrame. toPandas() collects
# every row to the driver, so the full cleaned dataset must fit in driver memory.
pandas_df = df.toPandas()

                                                                                

In [28]:
# Save the cleaned pandas DataFrame to CSV (no index column).
# pandas' to_csv does not create missing parent directories and raises OSError
# on a fresh checkout, so make sure the output directory exists first.
import os

csv_path = "Resources/panda_df/books_cleaned.csv"
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
pandas_df.to_csv(csv_path, index=False)

In [30]:
# Save a reproducible ~5% random sample (Spark's fraction is approximate, not
# exact) as a smaller CSV. The output directory is created if missing, because
# pandas' to_csv does not create parent directories.
import os

sample_csv_path = "Resources/panda_df/books_cleaned_sample.csv"
df_sample = df.sample(withReplacement=False, fraction=0.05, seed=42)
df_sample_pandas = df_sample.toPandas()
os.makedirs(os.path.dirname(sample_csv_path), exist_ok=True)
df_sample_pandas.to_csv(sample_csv_path, index=False)

23/11/22 02:07:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

In [31]:
# Stop the Spark session and release its resources; `spark` and the DataFrames
# built from it cannot be used after this cell.
spark.stop()