In [1]:
spark

In [2]:
# Set the logging level for ERRORs only.
sc.setLogLevel("ERROR")

In [3]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

schema = StructType([
    StructField("asin", StringType(), nullable=True),
    StructField("authors", ArrayType(
        StructType([
            StructField("author_id", IntegerType(), nullable=True),
            StructField("role", StringType(), nullable=True)
        ])
    ), nullable=True),
    StructField("average_rating", StringType(), nullable=True),
    StructField("book_id", StringType(), nullable=True),
    StructField("country_code", StringType(), nullable=True),
    StructField("description", StringType(), nullable=True),
    StructField("edition_information", StringType(), nullable=True),
    StructField("format", StringType(), nullable=True),
    StructField("image_url", StringType(), nullable=True),
    StructField("is_ebook", StringType(), nullable=True),
    StructField("isbn", StringType(), nullable=True),
    StructField("isbn13", StringType(), nullable=True),
    StructField("kindle_asin", StringType(), nullable=True),
    StructField("language_code", StringType(), nullable=True),
    StructField("link", StringType(), nullable=True),
    StructField("num_pages", StringType(), nullable=True),
    StructField("popular_shelves", ArrayType(
        StructType([
            StructField("count", IntegerType(), nullable=True),
            StructField("name", StringType(), nullable=True)
        ])
    ), nullable=True),
    StructField("publication_day", StringType(), nullable=True),
    StructField("publication_month", StringType(), nullable=True),
    StructField("publication_year", StringType(), nullable=True),
    StructField("publisher", StringType(), nullable=True),
    StructField("ratings_count", StringType(), nullable=True),
    StructField("series", ArrayType(StringType()), nullable=True),
    StructField("similar_books", ArrayType(StringType()), nullable=True),
    StructField("text_reviews_count", StringType(), nullable=True),
    StructField("title", StringType(), nullable=True),
    StructField("title_without_series", StringType(), nullable=True),
    StructField("url", StringType(), nullable=True),
    StructField("work_id", StringType(), nullable=True)
])

goodreads_books_path = 'gs://my-bucket-apb/landing/goodreads_books.json'
goodreads_books_sdf = spark.read.json(goodreads_books_path, schema=schema)


In [4]:
goodreads_books_sdf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- edition_information: string (nullable = true)
 |-- format: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- is_ebook: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: string (nullable = true)
 |-- kindle_asin: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- link: string (nullable = true)
 |-- num_pages: string (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- p

In [5]:
from pyspark.sql.functions import concat, col, lit

# Create a new column called publication date by concatenating the information stored in year, month, day.  
goodreads_books_sdf = goodreads_books_sdf.withColumn(
    "publication_date",
    concat(
        col("publication_year"),
        lit("-"),
        col("publication_month"),
        lit("-"),
        col("publication_day")
    )
)

# Drop the publication_day, publication_month, and publication_year columns
goodreads_books_sdf = goodreads_books_sdf.drop("publication_day", "publication_month", "publication_year")


In [6]:
goodreads_books_sdf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- description: string (nullable = true)
 |-- edition_information: string (nullable = true)
 |-- format: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- is_ebook: string (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: string (nullable = true)
 |-- kindle_asin: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- link: string (nullable = true)
 |-- num_pages: string (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- p

In [7]:
# Show the new column and some of its entries
goodreads_books_sdf.select("publication_date").show(5, truncate=False)

                                                                                

+----------------+
|publication_date|
+----------------+
|1984-9-1        |
|2001-10-1       |
|1987--          |
|2009-7-14       |
|--              |
+----------------+
only showing top 5 rows



In [7]:
#Dropping unnecessary columns
goodreads_books_sdf = goodreads_books_sdf.drop("isbn13", "title_without_series", "work_id","image_url","link","url","asin", "country_code","edition_information","publisher","similar_books","is_ebook","isbn","kindle_asin","series")

In [8]:
goodreads_books_sdf.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- num_pages: string (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- text_reviews_count: string (nullable = true)
 |-- title: string (nullable = true)
 |-- publication_date: string (nullable = true)



In [9]:
from pyspark.sql.functions import col, explode

# Explode the authors array to separate rows for each author
exploded_authors = goodreads_books_sdf.withColumn("author", explode("authors"))

# Select distinct author names
distinct_authors = exploded_authors.select("author.author_id", "author.role").distinct()

# Show distinct author names
distinct_authors.show(20,truncate=False)


                                                                                

+---------+---------------------------+
|author_id|role                       |
+---------+---------------------------+
|NULL     |guion                      |
|NULL     |Traduction                 |
|NULL     |ldktwr                     |
|NULL     |muHwr                      |
|NULL     |as Alex Dumas              |
|NULL     |introduction               |
|NULL     |Co-Author                  |
|NULL     |Contriutor                 |
|NULL     |Prologue                   |
|NULL     |Editor, Introduction, Notes|
|NULL     |co-author                  |
|NULL     |adapte du roman de         |
|NULL     |-Illustrator               |
|NULL     |Foreword, Photographer     |
|NULL     |Arranged by                |
|NULL     |Colourist                  |
|NULL     |Art                        |
|NULL     |Narrador                   |
|NULL     |Curator                    |
|NULL     |Colored                    |
+---------+---------------------------+
only showing top 20 rows



In [10]:
# Drop records where the specified columns are empty (null or nan)
goodreads_books_sdf = goodreads_books_sdf.na.drop(subset=["authors.author_id"])

In [27]:
# Count the number of null values in the authors.author_id column
null_count = goodreads_books_sdf.where(col("authors.author_id").isNull()).count()

# Print the number of null values
print("Number of null values in authors.author_id:", null_count)

# Show some sample rows where authors.author_id is null
goodreads_books_sdf.filter(col("authors.author_id").isNull()).show(10)


                                                                                

Number of null values in authors.author_id: 0




+----+-------+--------------+-------+------------+-----------+-------------------+------+--------+----+-----------+-------------+---------+---------------+---------+-------------+------+-------------+------------------+-----+----------------+
|asin|authors|average_rating|book_id|country_code|description|edition_information|format|is_ebook|isbn|kindle_asin|language_code|num_pages|popular_shelves|publisher|ratings_count|series|similar_books|text_reviews_count|title|publication_date|
+----+-------+--------------+-------+------------+-----------+-------------------+------+--------+----+-----------+-------------+---------+---------------+---------+-------------+------+-------------+------------------+-----+----------------+
+----+-------+--------------+-------+------------+-----------+-------------------+------+--------+----+-----------+-------------+---------+---------------+---------+-------------+------+-------------+------------------+-----+----------------+



                                                                                

In [11]:
from pyspark.sql.functions import col

distinct_dates = goodreads_books_sdf.select("publication_date").distinct()

# Show distinct publication dates
distinct_dates.show(20,truncate=False)



+----------------+
|publication_date|
+----------------+
|2013-11-5       |
|2017-2-20       |
|2017-9-29       |
|1986-11-5       |
|2012-12-5       |
|416--           |
|2014-12-9       |
|2010-4-24       |
|2004-1-13       |
|1961-12-1       |
|2002-2-19       |
|1389--          |
|1997-11-1       |
|2015-11-20      |
|1982-1-2        |
|2005-1-21       |
|2010-12-7       |
|2016-3-12       |
|1998-3-21       |
|2010-11-3       |
+----------------+
only showing top 20 rows



                                                                                

In [11]:
from pyspark.sql.functions import col, to_date

# Filter out rows with invalid publication dates
cleaned_goodreads_books_sdf = goodreads_books_sdf.filter(
    col("publication_date").rlike("^\\d{4}-\\d{1,2}-\\d{1,2}$")
)

# Update the original DataFrame with the cleaned data
goodreads_books_sdf = cleaned_goodreads_books_sdf

In [13]:
from pyspark.sql.functions import col

# Find distinct values in the num_pages column with a limit of 20
distinct_num_pages = goodreads_books_sdf.select("num_pages").distinct().limit(20)

distinct_num_pages.show()



+---------+
|num_pages|
+---------+
|      296|
|      451|
|     1280|
|        7|
|      475|
|      307|
|      383|
|     1008|
|      700|
|      886|
|      154|
|      714|
|      428|
|      854|
|      422|
|      595|
|     1856|
|      323|
|      424|
|      586|
+---------+



[Stage 9:>                                                          (0 + 1) / 1]                                                                                

In [12]:
# Filter out rows with missing or null values in the 'format' column
goodreads_books_sdf = goodreads_books_sdf.filter(goodreads_books_sdf["format"].isNotNull() & (goodreads_books_sdf["format"] != ""))

# Check the number of rows after filtering
print("Number of rows after removing null or empty values in the format column:", goodreads_books_sdf.count())



Number of rows after removing null or empty values in the format column: 1279368


                                                                                

In [13]:
from pyspark.sql.functions import col

# Count the number of null or empty values in the 'format' column
null_format_count = goodreads_books_sdf.filter(col("format").isNull() | (col("format") == "")).count()

# Show the count of null or empty values in the 'format' column
print("Number of null or empty values in the 'format' column:", null_format_count)



Number of null or empty values in the 'format' column: 0


                                                                                

In [14]:
from pyspark.sql.types import IntegerType

# Convert the num_pages column to an integer type
goodreads_books_sdf = goodreads_books_sdf.withColumn("num_pages", goodreads_books_sdf["num_pages"].cast(IntegerType()))


In [15]:
goodreads_books_sdf = goodreads_books_sdf.drop("language_code")

In [16]:
# Remove rows with no title column
goodreads_books_sdf = goodreads_books_sdf.filter(col("title").isNotNull())

# Count the number of null values in the title column
null_titles_count = goodreads_books_sdf.filter(col("title").isNull()).count()

print("Number of null values in title column:", null_titles_count)




Number of null values in title column: 0


                                                                                

In [17]:
from pyspark.sql.functions import col

# Filter rows where publisher is null
null_publishers_df = goodreads_books_sdf.filter(col("publisher").isNull())

# Show description and title for publishers with null values
null_publishers_df.select("description", "title").show(10, truncate=False)



+-----------+-----+
|description|title|
+-----------+-----+
+-----------+-----+





In [18]:
from pyspark.sql.functions import col, trim

# Remove leading and trailing spaces from the description column
cleaned_description_df = goodreads_books_sdf.withColumn("description", trim(col("description")))

# Update the DataFrame
goodreads_books_sdf = cleaned_description_df

In [20]:
from pyspark.sql.functions import col

# Convert columns to integer type
goodreads_books_sdf = goodreads_books_sdf.withColumn("average_rating", col("average_rating").cast("float")) \
    .withColumn("book_id", col("book_id").cast("integer")) \
    .withColumn("ratings_count", col("ratings_count").cast("integer")) \
    .withColumn("text_reviews_count", col("text_reviews_count").cast("integer"))

#check to see if change is reflected
goodreads_books_sdf.printSchema()

root
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- publication_date: string (nullable = true)



In [21]:
#read in goodreads_interactions_csv file
interactions_file_path='gs://my-bucket-apb/landing/goodreads_interactions.csv'
interactions_sdf = spark.read.csv(interactions_file_path, sep='\t', header=True, inferSchema=True)

                                                                                

In [33]:
# Print the schema of interactions_sdf
interactions_sdf.printSchema()

# Show a couple of rows from interactions_sdf
interactions_sdf.show(5)


root
 |-- user_id,book_id,is_read,rating,is_reviewed: string (nullable = true)

+------------------------------------------+
|user_id,book_id,is_read,rating,is_reviewed|
+------------------------------------------+
|                               0,948,1,5,0|
|                               0,947,1,5,1|
|                               0,946,1,5,0|
|                               0,945,1,5,0|
|                               0,944,1,5,0|
+------------------------------------------+
only showing top 5 rows



In [22]:
#split the single interactions file into 4 corresponding columns

from pyspark.sql.functions import split

# Split the column into multiple columns using the split function
split_col = split(interactions_sdf["user_id,book_id,is_read,rating,is_reviewed"], ",")

# Update the existing DataFrame with the new columns
interactions_sdf = interactions_sdf.withColumn("user_id", split_col.getItem(0)) \
    .withColumn("book_id", split_col.getItem(1)) \
    .withColumn("is_read", split_col.getItem(2)) \
    .withColumn("rating", split_col.getItem(3)) \
    .withColumn("is_reviewed", split_col.getItem(4))

interactions_sdf = interactions_sdf.drop("user_id,book_id,is_read,rating,is_reviewed")

interactions_sdf.printSchema()
interactions_sdf.show(5)


root
 |-- user_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- is_read: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- is_reviewed: string (nullable = true)

+-------+-------+-------+------+-----------+
|user_id|book_id|is_read|rating|is_reviewed|
+-------+-------+-------+------+-----------+
|      0|    948|      1|     5|          0|
|      0|    947|      1|     5|          1|
|      0|    946|      1|     5|          0|
|      0|    945|      1|     5|          0|
|      0|    944|      1|     5|          0|
+-------+-------+-------+------+-----------+
only showing top 5 rows



In [27]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Convert user_id and book_id columns to integers
interactions_sdf = interactions_sdf.withColumn("rating", col("rating").cast(IntegerType())) \
    .withColumn("book_id", col("book_id").cast(IntegerType())) \
    .withColumn("is_read", col("is_read").cast(IntegerType())) \
    .withColumn("is_reviewed", col("is_reviewed").cast(IntegerType())) \
    .withColumn("user_id", col("user_id").cast(StringType())) \

interactions_sdf.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- is_read: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- is_reviewed: integer (nullable = true)



In [28]:
# Join two dateframes based on book_id column
new_joined_sdf = interactions_sdf.join(goodreads_books_sdf, "book_id", "inner")

# Show the joined DataFrame
new_joined_sdf.show(10)




+-------+-------+-------+------+-----------+----------+--------------+--------------------+--------------------+---------+--------------------+-------------+------------------+--------------------+----------------+
|book_id|user_id|is_read|rating|is_reviewed|   authors|average_rating|         description|              format|num_pages|     popular_shelves|ratings_count|text_reviews_count|               title|publication_date|
+-------+-------+-------+------+-----------+----------+--------------+--------------------+--------------------+---------+--------------------+-------------+------------------+--------------------+----------------+
|     65|      0|      0|     0|          0|[{NULL, }]|          3.68|A ten year vetera...|Mass Market Paper...|      288|[{NULL, to-read},...|           29|                 6|Shadows in the St...|        2007-2-6|
|     65|   3142|      0|     0|          0|[{NULL, }]|          3.68|A ten year vetera...|Mass Market Paper...|      288|[{NULL, to-read},.

[Stage 28:>                                                         (0 + 1) / 1]                                                                                

In [29]:
new_joined_sdf.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- is_read: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- is_reviewed: integer (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- publication_date: string (nullable = true)



In [32]:
top_rated_with_most_reads = new_joined_sdf.filter(col("is_read") == 1) \
    .groupBy("book_id", "title") \
    .agg({"rating": "avg", "is_read": "sum"}) \
    .withColumnRenamed("avg(rating)", "average_rating") \
    .withColumnRenamed("sum(is_read)", "read_count") \
    .orderBy(col("read_count").desc(), col("average_rating").desc()) \
    .limit(10)  # Limit the results to top 10

# Show the top 10 rated books with most reads and highest average rating
top_rated_with_most_reads.show()



+-------+--------------------+----------+------------------+
|book_id|               title|read_count|    average_rating|
+-------+--------------------+----------+------------------+
|    943|Marriage Special ...|    285698| 4.431984123095017|
|    536|    The Lovely Bones|    277345| 4.302388721628297|
|   1000|Millionaire Women...|    231952| 3.329348313444161|
|    941|Love As A Foreign...|    176261| 4.455392854914019|
|    968|The Da Vinci Code...|    176099| 4.278934008711009|
|   1387|         The Odyssey|    170713| 4.180244035310726|
|    938|How to Make Anyon...|    170678| 4.427834870340641|
|   1473|Medea and Other P...|    168909| 4.133746573598802|
|   1386|Cliffs Notes on H...|    161905|3.8753404774404743|
|    461|The Inner Life of...|    136279|  3.82671578159511|
+-------+--------------------+----------+------------------+



                                                                                

In [47]:
# Check for duplicate book_id and user_id rows. Ensure there is none. 
duplicate_count = new_joined_sdf.groupBy("book_id", "user_id").count().where("count > 1").count()
if duplicate_count > 0:
    print(f"Duplicate rows found. Total duplicates: {duplicate_count}")
else:
    print("No duplicate rows.")



No duplicate rows.


                                                                                

In [30]:
import pandas as pd
from google.cloud import storage

client = storage.Client()

x = 1000000  # Number of reviews to process

file_path = "gs://my-bucket-apb/landing/goodreads_reviews_dedup.json"
print("Reading from", file_path)

# Read JSON file in chunks
chunks = pd.read_json(file_path, orient="records", lines=True, chunksize=x)

# Initialize DataFrame and counter
df = pd.DataFrame()
chunk_count = 0

# Loop through chunks (should be only one chunk with 1 million reviews)
for chunk in chunks:
    df = pd.concat([df, chunk], ignore_index=True)
    chunk_count += 1  

    # Check if the desired number of chunks has been processed (should be only one chunk)
    if chunk_count >= 1:
        break  

# Display the DataFrame
print(df)

Reading from gs://my-bucket-apb/landing/goodreads_reviews_dedup.json
                                 user_id   book_id  \
0       8842281e1d1347389f2ab93d60773d4d  24375664   
1       8842281e1d1347389f2ab93d60773d4d  18245960   
2       8842281e1d1347389f2ab93d60773d4d   6392944   
3       8842281e1d1347389f2ab93d60773d4d  22078596   
4       8842281e1d1347389f2ab93d60773d4d   6644782   
...                                  ...       ...   
999995  f131126e97b09f87010f4d419391ee9f  13331204   
999996  f131126e97b09f87010f4d419391ee9f  15843480   
999997  f131126e97b09f87010f4d419391ee9f  14291982   
999998  f131126e97b09f87010f4d419391ee9f  13093165   
999999  f131126e97b09f87010f4d419391ee9f  12975883   

                               review_id  rating  \
0       5cd416f3efc3f944fce4ce2db2290d5e       5   
1       dfdbb7b0eb5a7e4c26d59a937e2e5feb       5   
2       5e212a62bced17b4dbe41150e5bb9037       3   
3       fdd13cad0695656be99828cd75d6eb73       4   
4       bd0df91c9d918c

In [31]:
sdf_reviews = spark.createDataFrame(df)

sdf_reviews.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- review_text: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- date_updated: string (nullable = true)
 |-- read_at: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- n_votes: long (nullable = true)
 |-- n_comments: long (nullable = true)



In [33]:
# Assuming sdf_reviews is already created

# Drop columns n_votes and n_comments
sdf_reviews = sdf_reviews.drop("n_votes", "n_comments","date_updated")

# Print the updated schema
sdf_reviews.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- book_id: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- review_text: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- read_at: string (nullable = true)
 |-- started_at: string (nullable = true)



In [34]:
from pyspark.sql.functions import col

# Convert book_id to integer
sdf_reviews = sdf_reviews.withColumn("book_id", sdf_reviews["book_id"].cast("integer"))

# Convert rating to integer
sdf_reviews = sdf_reviews.withColumn("rating", sdf_reviews["rating"].cast("integer"))

# Print the updated schema to confirm the changes
sdf_reviews.printSchema()


root
 |-- user_id: string (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- read_at: string (nullable = true)
 |-- started_at: string (nullable = true)



In [35]:
from pyspark.sql.functions import col, to_date, date_format

# Define a function to parse and format the date strings
def format_date_string(date_str):
    return date_format(to_date(date_str, "EEE MMM dd HH:mm:ss Z yyyy"), "yy-MM-dd")

# Apply the function using withColumn to create new formatted columns
sdf_reviews_formatted = sdf_reviews.withColumn(
    "date_added_formatted",
    format_date_string("date_added")
).withColumn(
    "read_at_formatted",
    format_date_string("read_at")
).withColumn(
    "started_at_formatted",
    format_date_string("started_at")
)

# Optionally drop the original columns if needed
sdf_reviews_formatted = sdf_reviews_formatted.drop("date_added", "read_at", "started_at")

# Show the DataFrame schema to confirm the changes
sdf_reviews_formatted.printSchema()



root
 |-- user_id: string (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- date_added_formatted: string (nullable = true)
 |-- read_at_formatted: string (nullable = true)
 |-- started_at_formatted: string (nullable = true)



In [36]:
from pyspark.sql.functions import col

# Drop rows with empty book_id in sdf_reviews
sdf_reviews = sdf_reviews.na.drop(subset=["book_id"])


In [38]:
# Set the legacy time parser policy directly on the existing SparkSession
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [39]:

# Assuming you have two DataFrames: new_joined_sdf and sdf_reviews_formatted

# Perform left outer join
goodreads_combined_sdf = new_joined_sdf.join(
    sdf_reviews_formatted,
    new_joined_sdf.book_id == sdf_reviews_formatted.book_id,
    how='left_outer'
)

# Show the result or perform further operations
goodreads_combined_sdf.show(10)


24/04/21 05:16:43 WARN TaskSetManager: Stage 39 contains a task of very large size (222887 KiB). The maximum recommended task size is 1000 KiB.
24/04/21 05:16:45 WARN YarnAllocator: Container from a bad node: container_1713663873898_0007_01_000001 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:16:45.429]Container killed on request. Exit code is 137
[2024-04-21 05:16:45.430]Container exited with a non-zero exit code 137. 
[2024-04-21 05:16:45.430]Killed by external signal
.
24/04/21 05:16:45 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container from a bad node: container_1713663873898_0007_01_000001 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:16:45.429]Container killed on request. Exit code is 137
[2024-04-21 05:16:45.430]Container exited with a non-zero exit code 137. 
[2024-04-21 05:16:45

+-------+-------+-------+------+-----------+----------+--------------+--------------------+--------------------+---------+--------------------+-------------+------------------+--------------------+----------------+-------+-------+---------+------+-----------+--------------------+-----------------+--------------------+
|book_id|user_id|is_read|rating|is_reviewed|   authors|average_rating|         description|              format|num_pages|     popular_shelves|ratings_count|text_reviews_count|               title|publication_date|user_id|book_id|review_id|rating|review_text|date_added_formatted|read_at_formatted|started_at_formatted|
+-------+-------+-------+------+-----------+----------+--------------+--------------------+--------------------+---------+--------------------+-------------+------------------+--------------------+----------------+-------+-------+---------+------+-----------+--------------------+-----------------+--------------------+
|     65|      0|      0|     0|        

                                                                                

In [41]:
goodreads_combined_sdf.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- is_read: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- is_reviewed: integer (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- publication_date: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- book_id: integer (nullable 

In [42]:
# Perform inner join
goodreads_combined2_sdf = new_joined_sdf.join(
    sdf_reviews_formatted,
    new_joined_sdf.book_id == sdf_reviews_formatted.book_id,
    how='inner'
)

# Show the result or perform further operations
goodreads_combined2_sdf.show(8)


24/04/21 05:25:10 WARN TaskSetManager: Stage 46 contains a task of very large size (222887 KiB). The maximum recommended task size is 1000 KiB.
24/04/21 05:25:11 WARN YarnAllocator: Container from a bad node: container_1713663873898_0007_01_000002 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 143. Diagnostics: [2024-04-21 05:25:11.176]Container killed on request. Exit code is 143
[2024-04-21 05:25:11.176]Container exited with a non-zero exit code 143. 
[2024-04-21 05:25:11.177]Killed by external signal
.
24/04/21 05:25:11 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container from a bad node: container_1713663873898_0007_01_000002 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 143. Diagnostics: [2024-04-21 05:25:11.176]Container killed on request. Exit code is 143
[2024-04-21 05:25:11.176]Container exited with a non-zero exit code 143. 
[2024-04-21 05:25:11

+-------+-------+-------+------+-----------+----------+--------------+--------------------+---------+---------+--------------------+-------------+------------------+--------------------+----------------+--------------------+-------+--------------------+------+--------------------+--------------------+-----------------+--------------------+
|book_id|user_id|is_read|rating|is_reviewed|   authors|average_rating|         description|   format|num_pages|     popular_shelves|ratings_count|text_reviews_count|               title|publication_date|             user_id|book_id|           review_id|rating|         review_text|date_added_formatted|read_at_formatted|started_at_formatted|
+-------+-------+-------+------+-----------+----------+--------------+--------------------+---------+---------+--------------------+-------------+------------------+--------------------+----------------+--------------------+-------+--------------------+------+--------------------+--------------------+--------------

                                                                                

In [43]:
goodreads_combined2_sdf.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- is_read: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- is_reviewed: integer (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- publication_date: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- book_id: integer (nullable 

In [44]:
from pyspark.sql.functions import col

# Perform inner join with column aliasing
goodreads_combined3_sdf = new_joined_sdf.alias("new_joined").join(
    sdf_reviews_formatted.alias("sdf_reviews"),
    col("new_joined.book_id") == col("sdf_reviews.book_id"),
    how='inner'
)

# Show the result or perform further operations
goodreads_combined3_sdf.show(5)


24/04/21 05:33:33 WARN TaskSetManager: Stage 53 contains a task of very large size (222887 KiB). The maximum recommended task size is 1000 KiB.
24/04/21 05:33:34 WARN YarnAllocator: Container from a bad node: container_1713663873898_0007_01_000004 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:33:34.308]Container killed on request. Exit code is 137
[2024-04-21 05:33:34.309]Container exited with a non-zero exit code 137. 
[2024-04-21 05:33:34.309]Killed by external signal
.
24/04/21 05:33:34 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 4 for reason Container from a bad node: container_1713663873898_0007_01_000004 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:33:34.308]Container killed on request. Exit code is 137
[2024-04-21 05:33:34.309]Container exited with a non-zero exit code 137. 
[2024-04-21 05:33:34

+-------+-------+-------+------+-----------+----------+--------------+--------------------+---------+---------+--------------------+-------------+------------------+--------------------+----------------+--------------------+-------+--------------------+------+--------------------+--------------------+-----------------+--------------------+
|book_id|user_id|is_read|rating|is_reviewed|   authors|average_rating|         description|   format|num_pages|     popular_shelves|ratings_count|text_reviews_count|               title|publication_date|             user_id|book_id|           review_id|rating|         review_text|date_added_formatted|read_at_formatted|started_at_formatted|
+-------+-------+-------+------+-----------+----------+--------------+--------------------+---------+---------+--------------------+-------------+------------------+--------------------+----------------+--------------------+-------+--------------------+------+--------------------+--------------------+--------------

                                                                                

In [45]:
goodreads_combined3_sdf.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- is_read: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- is_reviewed: integer (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- publication_date: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- book_id: integer (nullable 

In [46]:
from pyspark.sql.functions import col

# Perform inner join with column aliasing and select desired columns
goodreads_combined4_sdf = new_joined_sdf.alias("new_joined").join(
    sdf_reviews_formatted.alias("sdf_reviews"),
    col("new_joined.book_id") == col("sdf_reviews.book_id"),
    how='inner'
).select(
    col("new_joined.book_id").alias("book_id"),
    col("new_joined.user_id").alias("user_id"),
    col("new_joined.is_read"),
    col("new_joined.rating"),
    col("new_joined.is_reviewed"),
    col("new_joined.authors"),
    col("new_joined.average_rating"),
    col("new_joined.description"),
    col("new_joined.format"),
    col("new_joined.num_pages"),
    col("new_joined.popular_shelves"),
    col("new_joined.ratings_count"),
    col("new_joined.text_reviews_count"),
    col("new_joined.title"),
    col("new_joined.publication_date"),
    col("sdf_reviews.user_id").alias("review_user_id"),
    col("sdf_reviews.review_id"),
    col("sdf_reviews.rating").alias("review_rating"),
    col("sdf_reviews.review_text"),
    col("sdf_reviews.date_added_formatted"),
    col("sdf_reviews.read_at_formatted"),
    col("sdf_reviews.started_at_formatted")
)

# Show the result or perform further operations
goodreads_combined4_sdf.show(5)


24/04/21 05:40:34 WARN TaskSetManager: Stage 60 contains a task of very large size (222887 KiB). The maximum recommended task size is 1000 KiB.
24/04/21 05:40:35 WARN YarnAllocator: Container from a bad node: container_1713663873898_0007_01_000005 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:40:35.862]Container killed on request. Exit code is 137
[2024-04-21 05:40:35.862]Container exited with a non-zero exit code 137. 
[2024-04-21 05:40:35.862]Killed by external signal
.
24/04/21 05:40:35 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 5 for reason Container from a bad node: container_1713663873898_0007_01_000005 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:40:35.862]Container killed on request. Exit code is 137
[2024-04-21 05:40:35.862]Container exited with a non-zero exit code 137. 
[2024-04-21 05:40:35

[Stage 64:>                                                         (0 + 1) / 1]

+-------+-------+-------+------+-----------+----------+--------------+--------------------+---------+---------+--------------------+-------------+------------------+--------------------+----------------+--------------------+--------------------+-------------+--------------------+--------------------+-----------------+--------------------+
|book_id|user_id|is_read|rating|is_reviewed|   authors|average_rating|         description|   format|num_pages|     popular_shelves|ratings_count|text_reviews_count|               title|publication_date|      review_user_id|           review_id|review_rating|         review_text|date_added_formatted|read_at_formatted|started_at_formatted|
+-------+-------+-------+------+-----------+----------+--------------+--------------------+---------+---------+--------------------+-------------+------------------+--------------------+----------------+--------------------+--------------------+-------------+--------------------+--------------------+-----------------

                                                                                

In [47]:
goodreads_combined4_sdf.printSchema()

root
 |-- book_id: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- is_read: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- is_reviewed: integer (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author_id: integer (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- average_rating: float (nullable = true)
 |-- description: string (nullable = true)
 |-- format: string (nullable = true)
 |-- num_pages: integer (nullable = true)
 |-- popular_shelves: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- count: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- ratings_count: integer (nullable = true)
 |-- text_reviews_count: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- publication_date: string (nullable = true)
 |-- review_user_id: string (nullable = true)
 |-- review_id: string (n

In [48]:
# Save the cleaned DataFrame as a Parquet file to /cleaned2
output_path = "gs://my-bucket-apb/cleaned2"

new_joined_sdf.write.mode("overwrite").parquet(output_path)

24/04/21 05:55:31 WARN YarnAllocator: Container from a bad node: container_1713663873898_0007_01_000008 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:55:31.188]Container killed on request. Exit code is 137
[2024-04-21 05:55:31.188]Container exited with a non-zero exit code 137. 
[2024-04-21 05:55:31.189]Killed by external signal
.
24/04/21 05:55:31 ERROR YarnScheduler: Lost executor 8 on cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal: Container from a bad node: container_1713663873898_0007_01_000008 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:55:31.188]Container killed on request. Exit code is 137
[2024-04-21 05:55:31.188]Container exited with a non-zero exit code 137. 
[2024-04-21 05:55:31.189]Killed by external signal
.
24/04/21 05:55:31 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 8 f

24/04/21 05:58:33 WARN YarnAllocator: Container from a bad node: container_1713663873898_0007_01_000011 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:58:33.600]Container killed on request. Exit code is 137
[2024-04-21 05:58:33.600]Container exited with a non-zero exit code 137. 
[2024-04-21 05:58:33.601]Killed by external signal
.
24/04/21 05:58:33 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 11 for reason Container from a bad node: container_1713663873898_0007_01_000011 on host: cluster-e6af-m.us-central1-c.c.advance-avatar-413816.internal. Exit status: 137. Diagnostics: [2024-04-21 05:58:33.600]Container killed on request. Exit code is 137
[2024-04-21 05:58:33.600]Container exited with a non-zero exit code 137. 
[2024-04-21 05:58:33.601]Killed by external signal
.
24/04/21 05:58:33 ERROR YarnScheduler: Lost executor 11 on cluster-e6af-m.us-central1-c.c.advance-avatar-41381