In [0]:
# Access key replaced with placeholder inorder to publish this notebook into github
spark.conf.set(
"fs.azure.account.key.goodreadsreviews60104758.dfs.core.windows.net",
"<access key>"
)

In [0]:
# Path to your curated Gold dataset
gold_path = "abfss://lakehouse@goodreadsreviews60104758.dfs.core.windows.net/lakehouse/gold/curated_reviews"

# Load the Delta table into a Spark DataFrame
df = spark.read.table("gold.curated_reviews")

# Preview the data
df.show(5)

+--------------------+--------+--------------------+---------+---------------+--------------------+------+--------------------+-------------+-------+--------------------+
|           review_id| book_id|               title|author_id|           name|             user_id|rating|         review_text|language_code|n_votes|          date_added|
+--------------------+--------+--------------------+---------+---------------+--------------------+------+--------------------+-------------+-------+--------------------+
|3818908d5a98733ba...|17282103|    هكذا تكلم زرادشت|   196327|     Jody Rosen|b72b9ef6b43e415ee...|     3|hkdh tklm zrdsht ...|          ara|      3|Sat Feb 09 11:01:...|
|a04023de2a39a2cab...|   35350|What the Body Rem...|    35285|Jimmy McDonough|b9c1edf6bcc9b1819...|     3|Read this for a c...|          eng|      0|Mon Sep 23 10:23:...|
|092beaa2de4ea5a77...|   92146|Carved in Bone (B...|    88874|    Cris Freddi|6e1d7ac0a6e738b23...|     5|It is two years s...|        en-US|    

In [0]:
#checking the data types
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: long (nullable = true)
 |-- date_added: string (nullable = true)



In [0]:
#fixing the data types
from pyspark.sql.types import IntegerType

df = df.withColumn("rating", df["rating"].cast(IntegerType()))
df = df.withColumn("n_votes", df["n_votes"].cast(IntegerType()))


In [0]:
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)



In [0]:
# Show a sample of the date_added column
df.select("date_added").show(5, truncate=False)


+------------------------------+
|date_added                    |
+------------------------------+
|Sat Feb 09 11:01:35 -0800 2013|
|Mon Sep 23 10:23:54 -0700 2013|
|Mon Nov 10 11:54:40 -0800 2008|
|Tue Dec 30 20:03:15 -0800 2008|
|Tue Sep 24 06:45:34 -0700 2013|
+------------------------------+
only showing top 5 rows


In [0]:
# covnert date_added to iso format and name it date_added_iso and then change its data type to date
from pyspark.sql.functions import regexp_extract, concat_ws, col

# Extract year, month, day using regex
df = df.withColumn("year", regexp_extract(col("date_added"), r"\d{4}$", 0)) \
       .withColumn("month_str", regexp_extract(col("date_added"), r"\b(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\b", 0)) \
       .withColumn("day", regexp_extract(col("date_added"), r"\b\d{2}\b", 0))

# Map month names to numbers
month_dict = {
    "Jan":"01","Feb":"02","Mar":"03","Apr":"04","May":"05","Jun":"06",
    "Jul":"07","Aug":"08","Sep":"09","Oct":"10","Nov":"11","Dec":"12"
}

In [0]:
from pyspark.sql.functions import create_map, lit
from itertools import chain

mapping_expr = create_map([lit(x) for x in chain(*month_dict.items())])
df = df.withColumn("month", mapping_expr[col("month_str")])

# Combine into yyyy-mm-dd and cast to date
df = df.withColumn("date_added_iso", concat_ws("-", col("year"), col("month"), col("day")).cast("date"))

# Drop intermediate columns if you want
df = df.drop("year", "month_str", "day", "month")

# Show result
df.select("date_added", "date_added_iso").show(5, truncate=False)
df.printSchema()

+------------------------------+--------------+
|date_added                    |date_added_iso|
+------------------------------+--------------+
|Sat Feb 09 11:01:35 -0800 2013|2013-02-09    |
|Mon Sep 23 10:23:54 -0700 2013|2013-09-23    |
|Mon Nov 10 11:54:40 -0800 2008|2008-11-10    |
|Tue Dec 30 20:03:15 -0800 2008|2008-12-30    |
|Tue Sep 24 06:45:34 -0700 2013|2013-09-24    |
+------------------------------+--------------+
only showing top 5 rows
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)
 |-- date_added_iso: date (nullable = true)



In [0]:
before_count = df.count()
print(f"BEFORE cleaning: {before_count} rows")

BEFORE cleaning: 788770 rows


In [0]:
from pyspark.sql.functions import col, trim
# Remove nulls
df = df.dropna(subset=["rating", "book_id", "review_text", "author_id"])
after_null = df.count()
print(f"AFTER removing NULLs: {after_null} rows")

# Remove EMPTY STRINGS
df = df.filter(
    (col("rating").isNotNull()) &
    (trim(col("book_id")) != "") &
    (trim(col("review_text")) != "") &
    (trim(col("author_id")) != "")
)
after_empty = df.count()
print(f"AFTER removing empty strings: {after_empty} rows")


AFTER removing NULLs: 788770 rows
AFTER removing empty strings: 788770 rows


In [0]:
# Removing duplicates
df = df.dropDuplicates(["review_id"])
after_review_id = df.count()
print(f"AFTER removing review_id duplicates: {after_review_id} rows")

df = df.dropDuplicates(["user_id", "book_id"])
after_user_book = df.count()
print(f"AFTER removing (user_id, book_id) duplicates: {after_user_book} rows")


AFTER removing review_id duplicates: 788770 rows
AFTER removing (user_id, book_id) duplicates: 788770 rows


In [0]:
# Trim text fields
df = df.withColumn("title", trim(col("title"))) \
       .withColumn("name", trim(col("name"))) \
       .withColumn("review_text", trim(col("review_text")))

In [0]:
from pyspark.sql.functions import col, lower, regexp_replace
# Normalize review_text

df = df.withColumn("review_text", lower(col("review_text"))) \
       .withColumn("review_text", regexp_replace(col("review_text"), "[^\\x20-\\x7E]", ""))

# Preview result
df.select("review_text").show(5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import initcap, col

# Capitalize each word in title and name
df = df.withColumn("title", initcap(col("title"))) \
       .withColumn("name", initcap(col("name")))

# Preview result
df.select("title", "name").show(5, truncate=False)


+---------------------------------------------+----------------------+
|title                                        |name                  |
+---------------------------------------------+----------------------+
|To Kill A Mockingbird                        |Lito Fernandez        |
|Snow Crash                                   |Deborah Woodward      |
|Lucy                                         |Jeff Slayton          |
|Twelve Years A Slave                         |Susan Schindehette    |
|The Way Of Kings (the Stormlight Archive, #1)|Matthew Ryan Defibaugh|
+---------------------------------------------+----------------------+
only showing top 5 rows


In [0]:
# Create review lenght column to count number of characters in review
from pyspark.sql.functions import length, col
df = df.withColumn("review_length", length(col("review_text")))

df.select("review_length").show(5, truncate=False)

+-------------+
|review_length|
+-------------+
|586          |
|2053         |
|91           |
|485          |
|37           |
+-------------+
only showing top 5 rows


In [0]:
before_count = df.count()
df = df.filter(col("review_length") >= 10)
after_count = df.count()
print(f"Number of rows after filtering: {after_count}")

Number of rows after filtering: 788769


In [0]:
from pyspark.sql.functions import lit, current_date
# Remove rows with invalid or future dates
df = df.filter(
    (col("date_added_iso").isNotNull()) &
    (col("date_added_iso") <= current_date())
)

In [0]:
from pyspark.sql.functions import when
# Replacing missing n_votes with value 0
df = df.fillna({"n_votes": 0})

# Replace null or empty language_code with "Unknown"
df = df.withColumn(
    "language_code",
    when((col("language_code").isNull()) | (col("language_code") == ""), "Unknown")
    .otherwise(col("language_code"))
)

In [0]:
# Loop through all columns and count nulls and Verify that all columns contain valid values
for c in df.columns:
    null_count = df.filter(col(c).isNull()).count()
    print(f"Column '{c}' has {null_count} null values")

Column 'review_id' has 0 null values
Column 'book_id' has 0 null values
Column 'title' has 0 null values
Column 'author_id' has 0 null values
Column 'name' has 0 null values
Column 'user_id' has 0 null values
Column 'rating' has 0 null values
Column 'review_text' has 0 null values
Column 'language_code' has 0 null values
Column 'n_votes' has 0 null values
Column 'date_added' has 0 null values
Column 'date_added_iso' has 0 null values
Column 'review_length' has 0 null values


In [0]:
text_cols = ["title", "name", "review_text", "language_code"]
for c in text_cols:
    empty_count = df.filter(col(c) == "").count()
    print(f"Column '{c}' has {empty_count} empty strings")

Column 'title' has 0 empty strings
Column 'name' has 0 empty strings
Column 'review_text' has 0 empty strings
Column 'language_code' has 0 empty strings


In [0]:
# Show schema to verify types and check column names
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = false)
 |-- date_added: string (nullable = true)
 |-- date_added_iso: date (nullable = true)
 |-- review_length: integer (nullable = true)



In [0]:
# Drop the unnecessary columns
df = df.drop("date_added", "review_length")

# Check remaining columns
df.columns

['review_id',
 'book_id',
 'title',
 'author_id',
 'name',
 'user_id',
 'rating',
 'review_text',
 'language_code',
 'n_votes',
 'date_added_iso']

In [0]:
#Verify that all numeric columns contain valid values and within expected ranges
# Check RATING (1-5)
invalid_rating = df.filter((col("rating") < 1) | (col("rating") > 5)).count()
print(f"Invalid ratings: {invalid_rating}")

# ============================================================
# N_VOTES: Can be negative (downvotes/dislikes)
# Negative votes might represent downvotes or "not helpful" votes
# ============================================================
invalid_votes = df.filter(col("n_votes") < 0).count()
print(f"Negative votes: {invalid_votes}")

Invalid ratings: 0
Negative votes: 57


In [0]:
# Add Aggregate columns
# Compute review length in words
from pyspark.sql.functions import length, split, size, col
df = df.withColumn('review_word_count', size(split(col('review_text'), ' ')))
df.select('review_text', 'review_word_count').show(5, truncate=True)

+--------------------+-----------------+
|         review_text|review_word_count|
+--------------------+-----------------+
|more of an academ...|               10|
|          230 - 2015|                3|
|i liked this. it ...|               14|
|intrigado com ess...|              140|
|what's to say? an...|               38|
+--------------------+-----------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import avg, count, round
# Group by book_id and calculate aggregates
book_features = (
    df.groupBy("book_id")
      .agg(
          round(avg("rating"), 2).alias("avg_book_rating"),
          count("review_id").alias("num_reviews_per_book")
      )
)
# Show first few rows
book_features.show(5)

+--------+---------------+--------------------+
| book_id|avg_book_rating|num_reviews_per_book|
+--------+---------------+--------------------+
|    6194|            4.0|                 131|
|15772440|            4.0|                   1|
|  133241|           4.26|                 123|
|  169338|           4.57|                  28|
|17160066|           3.83|                  23|
+--------+---------------+--------------------+
only showing top 5 rows


In [0]:
# Average rating per Author
author_avg = (
    df.groupBy("name")
      .agg(
          round(avg("rating"), 2).alias("author_avg_rating")
      )
)
author_avg.show(5)


+--------------------+-----------------+
|                name|author_avg_rating|
+--------------------+-----------------+
|         Iola Fuller|             3.76|
|Richard Roger Van...|             4.06|
|    Robert R. Mccrae|             3.69|
|      Robert Hampson|             3.15|
|     Edwin M. Curley|              4.0|
+--------------------+-----------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import avg, round, min as spark_min, max as spark_max, col, size, split

word_stats_per_book = df.groupBy('book_id').agg(
    spark_min('review_word_count').alias('min_words_per_book'),
    spark_max('review_word_count').alias('max_words_per_book'),
    round(avg('review_word_count'), 2).alias('avg_words_per_book')
)

word_stats_per_book.show(5)

+--------+------------------+------------------+------------------+
| book_id|min_words_per_book|max_words_per_book|avg_words_per_book|
+--------+------------------+------------------+------------------+
|    6194|                 2|              1231|            150.32|
|15772440|                46|                46|              46.0|
|  133241|                 2|               841|              88.0|
|  169338|                 4|               704|            142.64|
|17160066|                 9|              1089|            152.57|
+--------+------------------+------------------+------------------+
only showing top 5 rows


In [0]:
# Join book-level features (avg rating & num reviews)
df = df.join(book_features, on='book_id', how='left')

# Join author-level average rating
df = df.join(author_avg, on='name', how='left')

# Join word count statistics per book
df = df.join(word_stats_per_book, on='book_id', how='left')


In [0]:
# drop helper column
df = df.drop('review_word_count')


In [0]:
df = df.withColumn("num_reviews_per_book", df["num_reviews_per_book"].cast(IntegerType()))

In [0]:
print(f"Final columns: {len(df.columns)}")
df.printSchema()

Final columns: 17
root
 |-- book_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = false)
 |-- date_added_iso: date (nullable = true)
 |-- avg_book_rating: double (nullable = true)
 |-- num_reviews_per_book: integer (nullable = true)
 |-- author_avg_rating: double (nullable = true)
 |-- min_words_per_book: integer (nullable = true)
 |-- max_words_per_book: integer (nullable = true)
 |-- avg_words_per_book: double (nullable = true)



In [0]:
# Define the Gold layer path where the Delta table will be stored
gold_path_new  = "abfss://lakehouse@goodreadsreviews60104758.dfs.core.windows.net/lakehouse/gold/features_v1"


In [0]:
# Save the DataFrame as a Delta table in the Gold path
# mode='overwrite' ensures that if the table already exists, it will be replaced
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(gold_path_new)

In [0]:
# Register the Delta table in the Databricks metastore for SQL queries
spark.sql(f"""
CREATE TABLE IF NOT EXISTS gold.features_v1
USING DELTA
LOCATION '{gold_path_new}'
""")

# Show a few rows to verify the saved dataset
df.show(5)

+--------+--------------------+--------------------+--------------------+---------+--------------------+------+--------------------+-------------+-------+--------------+---------------+--------------------+-----------------+------------------+------------------+------------------+
| book_id|                name|           review_id|               title|author_id|             user_id|rating|         review_text|language_code|n_votes|date_added_iso|avg_book_rating|num_reviews_per_book|author_avg_rating|min_words_per_book|max_words_per_book|avg_words_per_book|
+--------+--------------------+--------------------+--------------------+---------+--------------------+------+--------------------+-------------+-------+--------------+---------------+--------------------+-----------------+------------------+------------------+------------------+
|   12067|Catherine Van Moppes|20950466ccbdaa041...|Good Omens: The N...|  4110990|118bfe1cc97da3ea6...|     5|i've already read...|          eng|      0|