# Connect to ADLS Gen2 (Storage Account)

In [0]:
# Authenticate Spark against the goodreadsreviewsgen2 ADLS Gen2 account.
#
# SECURITY: the account key must never live in the notebook source. It is read
# from the GOODREADS_ADLS_ACCOUNT_KEY environment variable instead (on a managed
# platform, populate that variable from the workspace's secret store).
# NOTE(review): the key that was previously hardcoded here has been exposed in
# the notebook history and should be rotated in the Azure portal.
import os

_adls_account_key = os.environ.get("GOODREADS_ADLS_ACCOUNT_KEY")
if not _adls_account_key:
    # Fail fast with an actionable message rather than a late 403 from storage.
    raise RuntimeError(
        "Set the GOODREADS_ADLS_ACCOUNT_KEY environment variable to the access key "
        "of the goodreadsreviewsgen2 storage account before running this notebook."
    )

spark.conf.set(
    "fs.azure.account.key.goodreadsreviewsgen2.dfs.core.windows.net",
    _adls_account_key,
)


# Load Silver Parquet (Books & Authors)

In [0]:
# Silver-layer inputs: read the books and authors Parquet datasets
books = spark.read.parquet(
    "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/books/"
)
authors = spark.read.parquet(
    "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/authors/"
)

# Preview the first 5 rows of books, then of authors
books.show(n=5)
authors.show(n=5)

# Print both schemas to confirm the column names and types
books.printSchema()
authors.printSchema()


+----------+------------------+------------+-------------+----------+--------+--------------+-----------+--------------------+---------+--------------------+---------+--------------------+---------+---------------+-------------+-----------------+-------------------+----------------+--------------------+--------------------+-------+-------------+-------+--------------------+--------------------+
|      isbn|text_reviews_count|country_code|language_code|      asin|is_ebook|average_rating|kindle_asin|         description|   format|                link|author_id|           publisher|num_pages|publication_day|       isbn13|publication_month|edition_information|publication_year|                 url|           image_url|book_id|ratings_count|work_id|               title|title_without_series|
+----------+------------------+------------+-------------+----------+--------+--------------+-----------+--------------------+---------+--------------------+---------+--------------------+---------+------

In [0]:
from pyspark.sql.functions import col, length, trim, count, when

# Read raw (uncleaned) reviews from the silver layer
reviews = spark.read.parquet("abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/reviews/")

# Peek at rows and schema
reviews.show(5, truncate=False)
reviews.printSchema()

# Basic profiling: total row count and potential data-quality issues.
# All metrics are computed in ONE aggregation so the dataset is scanned once,
# instead of once per metric. count(when(cond, 1)) counts only the rows where
# `cond` is true, because when() without otherwise() yields NULL for the rest
# and count() skips NULLs.
profile = reviews.agg(
    count("*").alias("total_rows"),
    count(when(col("review_id").isNull(), 1)).alias("null_review_id"),
    count(when(col("book_id").isNull(), 1)).alias("null_book_id"),
    count(when(col("user_id").isNull(), 1)).alias("null_user_id"),
    count(when(col("rating").isNull(), 1)).alias("null_rating"),
    count(
        when((col("review_text").isNull()) | (trim(col("review_text")) == ""), 1)
    ).alias("empty_text"),
).first()

# Keep the individual names bound exactly as before for any later cell.
total_rows = profile["total_rows"]
null_review_id = profile["null_review_id"]
null_book_id = profile["null_book_id"]
null_user_id = profile["null_user_id"]
null_rating = profile["null_rating"]
empty_text = profile["empty_text"]

print(f"Total rows: {total_rows}")
print(f"Null review_id: {null_review_id}, Null book_id: {null_book_id}, Null user_id: {null_user_id}, Null rating: {null_rating}")
print(f"Empty/Null review_text: {empty_text}")


+--------------------------------+--------+--------------------------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import col, trim, length 
 
# Cleaning pipeline for the reviews DataFrame loaded from Parquet in an earlier
# cell (expects `reviews` to already be defined).
#
# Stages, in order:
#   1) keep only rows that have review_id, book_id and user_id
#   2) cast rating to int (rating_int) and keep values in [1..5]
#   3) trim review_text and keep texts of at least 10 characters
#   4) de-duplicate on review_id (which duplicate survives is arbitrary;
#      refine if you have timestamps)
df = (
    reviews
    # 1) critical keys must be present
    .filter(col("review_id").isNotNull())
    .filter(col("book_id").isNotNull())
    .filter(col("user_id").isNotNull())
    # 2) integer rating within the inclusive range 1..5
    .withColumn("rating_int", col("rating").cast("int"))
    .filter(col("rating_int").isNotNull())
    .filter(col("rating_int").between(1, 5))
    # 3) normalized, non-trivial review text
    .withColumn("review_text", trim(col("review_text")))
    .filter(col("review_text").isNotNull())
    .filter(length(col("review_text")) >= 10)
    # 4) one row per review_id
    .dropDuplicates(["review_id"])
)

# 5) Final shape: the integer rating replaces the original rating column
reviews_clean = df.select(
    "review_id",
    "book_id",
    "user_id",
    col("rating_int").alias("rating"),
    "review_text",
    "date_updated",
)

In [0]:
# Write the cleaned reviews back to the silver layer (overwrite mode).
#
# `reviews_clean` is lazily derived from the files under processed/reviews/, so
# overwriting that directory directly asks Spark to replace the very files it is
# still reading from. Depending on the runtime that is either rejected or can
# remove input files before they have been read. To make the overwrite safe the
# result is first materialized in a staging directory, and the final location
# is then overwritten from that independent copy.
reviews_silver_path = "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/reviews/"
reviews_staging_path = "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/_staging_reviews_clean/"

# Step 1: materialize the cleaned data away from its own input files.
reviews_clean.write.mode("overwrite").parquet(reviews_staging_path)

# Step 2: replace the silver dataset from the staged copy (no shared lineage).
# The staging directory is left in place; it is overwritten on the next run.
spark.read.parquet(reviews_staging_path).write.mode("overwrite").parquet(reviews_silver_path)

# Sanity check: read from disk and inspect schema + few rows
reviews_verified = spark.read.parquet(reviews_silver_path)

reviews_verified.printSchema()
reviews_verified.show(5, truncate=False)

print("Verified cleaned rows:", reviews_verified.count())


root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- date_updated: string (nullable = true)

+--------------------------------+--------+--------------------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Homework 1

In [0]:
# Reload the three datasets from the processed (silver) zone
books = spark.read.parquet(
    "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/books/"
)
authors = spark.read.parquet(
    "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/authors/"
)
reviews_clean = spark.read.parquet(
    "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/processed/reviews/"
)


In [0]:
# Bridge table (book_authors): the unique (book_id, author_id) pairs in books
book_author_pairs = books.select(books["book_id"], books["author_id"])
book_authors = book_author_pairs.dropDuplicates()

# Inspect the schema and a 5-row sample without truncating values
book_authors.printSchema()
book_authors.show(n=5, truncate=False)


root
 |-- book_id: string (nullable = true)
 |-- author_id: string (nullable = true)

+--------+---------+
|book_id |author_id|
+--------+---------+
|11964338|5019771  |
|884322  |211728   |
|19539455|90825    |
|611637  |696805   |
|30759346|5772357  |
+--------+---------+
only showing top 5 rows


In [0]:
# Print the schema of reviews_clean to inspect its column names and types
reviews_clean.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- date_updated: string (nullable = true)



In [0]:
from pyspark.sql import functions as F

# Curated gold table: one row per cleaned review, enriched with the book's
# title/language and the author's id/name. Both joins are inner joins, so a
# review is kept only when its book_id matches a book and that book's author_id
# matches an author.
# NOTE(review): n_votes is populated from books.ratings_count and is therefore
# a string column holding a per-book value — confirm this is the intended source.
curated_gold_table = (
    reviews_clean
    .join(books, reviews_clean["book_id"] == books["book_id"], "inner")
    .join(authors, books["author_id"] == authors["author_id"], "inner")
    .select(
        reviews_clean["review_id"],
        reviews_clean["book_id"],
        books["title"],
        authors["author_id"],
        authors["name"].alias("author_name"),
        reviews_clean["user_id"],
        reviews_clean["rating"],
        reviews_clean["review_text"],
        books["language_code"],
        books["ratings_count"].alias("n_votes"),
        reviews_clean["date_updated"],
    )
)

# Inspect the resulting schema and a 5-row preview (values not truncated)
curated_gold_table.printSchema()
curated_gold_table.show(n=5, truncate=False)



root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: string (nullable = true)
 |-- date_updated: string (nullable = true)

+--------------------------------+--------+-------------------------------------------------------+---------+---------------------+--------------------------------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Persist the curated table to the Gold zone in Delta format, replacing any
# previous contents at that location
gold_writer = curated_gold_table.write.format("delta").mode("overwrite")
gold_writer.save(
    "abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/gold/curated_reviews/"
)

# Register the data as the table `curated_reviews` (no external location
# required): drop any existing table, then create it from a SELECT over the
# Delta files written above
spark.sql("DROP TABLE IF EXISTS curated_reviews")

spark.sql("""
CREATE TABLE curated_reviews
USING DELTA
AS SELECT * FROM delta.`abfss://lakehouse@goodreadsreviewsgen2.dfs.core.windows.net/gold/curated_reviews/`
""")

# List the tables to confirm the registration
registered_tables = spark.sql("SHOW TABLES")
registered_tables.show(truncate=False)

# Spot-check the content: five rows ordered by rating, highest first
top_rated_preview = spark.sql("""
SELECT author_name, title, rating, language_code
FROM curated_reviews
ORDER BY rating DESC
LIMIT 5
""")
top_rated_preview.show(truncate=False)


+--------+---------------+-----------+
|database|tableName      |isTemporary|
+--------+---------------+-----------+
|default |curated_reviews|false      |
+--------+---------------+-----------+

+-------------+------------------------------------------+------+-------------+
|author_name  |title                                     |rating|language_code|
+-------------+------------------------------------------+------+-------------+
|Avelyn Paige |Angels and Ashes (Heaven's Rejects MC, #2)|5     |en-US        |
|Shannon Mayer|Rylee (The Rylee Adamson Epilogues, #1)   |5     |eng          |
|Toni Aleo    |Taking Shots (Assassins, #1)              |5     |eng          |
|Damon Suede  |Hot Head (Head #1)                        |5     |             |
|Whitney G.   |Turbulence (Turbulence, #1)               |5     |             |
+-------------+------------------------------------------+------+-------------+

