In [0]:
"""
Configure Spark Session for Azure Access and Datetime Compatibility

This cell applies two Spark configuration settings that the rest of the
notebook relies on.

1. Azure Data Lake Access:
   - Registers the access key for the Azure Data Lake Storage Gen2 account
     used by the Goodreads project, enabling authenticated reads and writes
     over the ABFSS protocol.
   - The key is fetched at run time from a Databricks secret scope instead of
     being embedded in the notebook source (and therefore in version control
     and shared exports).

2. Legacy Datetime Parser Policy:
   - Switches Spark back to the datetime parser used before Spark 3.0.
     A later cell parses 'date_added' with the pattern
     "EEE MMM dd HH:mm:ss Z yyyy". The newer parser does not accept the
     day-of-week symbol 'E' for parsing, so this setting must run before
     that cell.

Security note:
An earlier revision of this cell contained the storage account key in plain
text. Treat that key as compromised: rotate it in the Azure portal and store
the new value only in the secret scope referenced below.
"""

STORAGE_ACCOUNT_NAME = "goodreadsreviews60302363"

# TODO(review): confirm the secret scope / key names configured in this workspace.
STORAGE_KEY_SECRET_SCOPE = "goodreads"
STORAGE_KEY_SECRET_NAME = "storage-account-key"

# `spark` and `dbutils` are globals provided by the Databricks runtime.
spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT_NAME}.dfs.core.windows.net",
    dbutils.secrets.get(scope=STORAGE_KEY_SECRET_SCOPE, key=STORAGE_KEY_SECRET_NAME),
)

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


In [0]:
"""
Load the Curated Reviews Dataset for Verification

Reads the 'curated_reviews' Delta table, stored under the 'gold/' prefix of the
'lakehouse' container, into a Spark DataFrame. The DataFrame is bound to the
name 'silver' because every later cell in this notebook transforms and
reassigns that variable. Despite that name, the source is the gold-layer table.

Purpose:
    - Confirm that the previously written Delta dataset can be read.
    - Print a small sample of records as a quick readability check before the
      cleaning and transformation steps that follow.
"""

# ABFSS location of the curated reviews Delta table.
curated_reviews_path = (
    "abfss://lakehouse@goodreadsreviews60302363.dfs.core.windows.net"
    "/gold/curated_reviews/"
)

silver = spark.read.format("delta").load(curated_reviews_path)

# Print the first five rows as a sanity check.
silver.show(5)

+--------------------+--------+--------------------+---------+----------------+--------------------+------+--------------------+--------+-------+--------------------+
|           review_id| book_id|               title|author_id|            name|             user_id|rating|         review_text|language|n_votes|          date_added|
+--------------------+--------+--------------------+---------+----------------+--------------------+------+--------------------+--------+-------+--------------------+
|1b5575e93fafb5a6f...|30739547|Preppy: The Life ...|  5769238|    T.M. Frazier|68f9915717ccc347b...|     5|4 . 5 Bow tie wea...|   en-US| 115329|Sat Jul 30 14:26:...|
|c8fb6d2d662d6d498...|13539044|The Silver Lining...|  1251730|   Matthew Quick|b48aade607aa17b3d...|     5|The book was very...|     eng| 201333|Thu Mar 28 07:45:...|
|7f7c14c976eb3d645...|29074768|        If I Fix You| 14981314|Abigail  Johnson|d76881f6f75216d6f...|     4|Ah, If I Fix You....|     eng|   1192|Sat Jun 11 20:02:...

In [0]:
"""
Standardize Column Data Types in the Curated Dataset

Pins explicit Spark types on the columns of the 'silver' DataFrame listed in
COLUMN_TYPES, so that later transformations and aggregations operate on
predictable types.

Purpose:
    - Identifier fields (review_id, book_id, author_id, user_id) -> StringType.
    - Numeric attributes ('rating', 'n_votes') -> IntegerType.
    - Text fields (title, name, review_text, language) -> StringType.

'date_added' is deliberately not cast here. It is still a raw string at this
point and is parsed into a DateType by the next cell.
"""

from pyspark.sql.types import StringType, IntegerType, DateType
from pyspark.sql.functions import to_date, col

# Target Spark type for each column whose type is pinned in this cell.
COLUMN_TYPES = {
    "review_id": StringType(),
    "book_id": StringType(),
    "author_id": StringType(),
    "user_id": StringType(),
    "rating": IntegerType(),
    "n_votes": IntegerType(),
    "title": StringType(),
    "name": StringType(),
    "review_text": StringType(),
    "language": StringType(),
}

# Apply one cast per column. A plain loop avoids a backslash-continued chain,
# where a trailing '\' would silently join whatever is written on the next line.
for column_name, target_type in COLUMN_TYPES.items():
    silver = silver.withColumn(column_name, col(column_name).cast(target_type))

silver.printSchema()
silver.show(5)


root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: string (nullable = true)

+--------------------+--------+--------------------+---------+----------------+--------------------+------+--------------------+--------+-------+--------------------+
|           review_id| book_id|               title|author_id|            name|             user_id|rating|         review_text|language|n_votes|          date_added|
+--------------------+--------+--------------------+---------+----------------+--------------------+------+--------------------+--------+-------+--------------------+
|1b5575e93fafb5a6f...|30739547|Preppy: The Life ...|  57

In [0]:
"""
Convert 'date_added' to a Date

Parses the raw string timestamps in 'date_added'
(e.g. "Tue May 09 14:22:31 +0000 2017") and keeps only the calendar date,
stored as a Spark DateType.

Process:
    - to_timestamp() parses each string with the pattern held in
      'date_added_format'. The pattern uses the day-of-week symbol 'E', so it
      depends on the LEGACY time parser policy set in the first cell. In
      non-ANSI mode, strings that do not match are expected to become null.
    - to_date() then truncates the parsed timestamp to a date.

Note: the date is taken in the Spark session time zone. It can therefore differ
from the calendar date written in the raw string when that string carries a
non-zero offset. In the sample output, "Sat Jun 11 20:02:..." becomes
2016-06-12.

Rows whose date could not be parsed are removed by a later filtering step.
"""

from pyspark.sql.functions import to_date, to_timestamp, col

# Raw layout of 'date_added', e.g. "Tue May 09 14:22:31 +0000 2017".
date_added_format = "EEE MMM dd HH:mm:ss Z yyyy"

parsed_date_added = to_date(to_timestamp(col("date_added"), date_added_format))
silver = silver.withColumn("date_added", parsed_date_added)

silver.show(5)

+--------------------+--------+--------------------+---------+----------------+--------------------+------+--------------------+--------+-------+----------+
|           review_id| book_id|               title|author_id|            name|             user_id|rating|         review_text|language|n_votes|date_added|
+--------------------+--------+--------------------+---------+----------------+--------------------+------+--------------------+--------+-------+----------+
|1b5575e93fafb5a6f...|30739547|Preppy: The Life ...|  5769238|    T.M. Frazier|68f9915717ccc347b...|     5|4 . 5 Bow tie wea...|   en-US| 115329|2016-07-30|
|c8fb6d2d662d6d498...|13539044|The Silver Lining...|  1251730|   Matthew Quick|b48aade607aa17b3d...|     5|The book was very...|     eng| 201333|2013-03-28|
|7f7c14c976eb3d645...|29074768|        If I Fix You| 14981314|Abigail  Johnson|d76881f6f75216d6f...|     4|Ah, If I Fix You....|     eng|   1192|2016-06-12|
|1114d9b34b67d629e...|13790759|Sarah Gives Thank...|  5861

In [0]:
"""
Handle Missing Values in Key Columns

Cleans null values in the columns of the 'silver' DataFrame that later analysis
depends on.

Cleaning Steps:
    - Rows with a null 'rating' are dropped, since they cannot contribute to
      rating-based analysis.
    - A null 'n_votes' is replaced with 0, i.e. a missing vote count is treated
      as no recorded votes.
    - A null 'language' is replaced with the label "Unknown".

Only true nulls are replaced. Empty-string values, if any, are left unchanged.
"""

from pyspark.sql.functions import when, lit

# Unrated rows carry no usable signal for rating analysis.
silver = silver.filter(col("rating").isNotNull())

# Value substituted for a null in each column.
null_defaults = {
    "n_votes": lit(0),
    "language": lit("Unknown"),
}

for column_name, default_value in null_defaults.items():
    silver = silver.withColumn(
        column_name,
        when(col(column_name).isNull(), default_value).otherwise(col(column_name)),
    )


In [0]:
"""
Filter Invalid Reviews and Compute Review Length

Adds a derived length feature and removes unreliable records from the 'silver'
DataFrame.

Processing Steps:
    - Add 'review_length': the number of characters in 'review_text' after
      leading and trailing spaces are trimmed.
    - Drop reviews shorter than MIN_REVIEW_LENGTH characters (after trimming)
      as non-informative. Rows with null text have a null length and are
      dropped as well.
    - Drop rows whose 'date_added' is null (unparseable) or later than the
      current date.

Only meaningful, time-valid reviews are retained for downstream analysis.
"""

from pyspark.sql.functions import col, current_date, length, trim

# Minimum review length, in characters after trimming surrounding spaces.
MIN_REVIEW_LENGTH = 10

# Measure the trimmed text. Otherwise space padding would inflate the length
# and let near-empty reviews through the filter. The next cell also trims
# 'review_text', so this value matches the text that is finally written out.
silver = silver.withColumn("review_length", length(trim(col("review_text"))))

# Drop reviews that are too short to be informative.
silver = silver.filter(col("review_length") >= MIN_REVIEW_LENGTH)

# Drop unparseable dates and dates in the future.
silver = silver.filter(col("date_added").isNotNull() & (col("date_added") <= current_date()))

In [0]:
"""
Normalize Text Fields for Consistency

Standardizes the text columns of the 'silver' DataFrame so that equivalent
values format identically, which helps matching, grouping and presentation.

Transformations Applied:
    - In 'title' and 'name', every run of whitespace is collapsed to a single
      space, and leading/trailing whitespace is removed
      (e.g. "Abigail  Johnson" -> "Abigail Johnson").
    - 'title' and 'name' are title-cased with initcap() only when the value is
      entirely lower case or entirely upper case. initcap() lowercases every
      letter after the first in each word, so applying it to mixed-case values
      would corrupt them (e.g. "T.M. Frazier" -> "T.m. Frazier").
    - 'review_text' is trimmed of leading/trailing spaces. Its case and inner
      spacing are not changed.
"""

from pyspark.sql.functions import col, initcap, lower, regexp_replace, trim, upper, when


def _normalize_display_text(column_name):
    """Collapse whitespace in a text column, then title-case it only if it is single-case.

    Args:
        column_name: Name of a string column in the DataFrame.

    Returns:
        A Column expression with the normalized text. Null inputs remain null.
    """
    # Collapse first, so that tabs/newlines at the edges become spaces that trim() removes.
    collapsed = trim(regexp_replace(col(column_name), r"\s+", " "))
    # All-lower or all-upper values carry no casing worth preserving.
    is_single_case = (collapsed == lower(collapsed)) | (collapsed == upper(collapsed))
    return when(is_single_case, initcap(collapsed)).otherwise(collapsed)


silver = (
    silver
    .withColumn("title", _normalize_display_text("title"))
    .withColumn("name", _normalize_display_text("name"))
    .withColumn("review_text", trim(col("review_text")))
)


In [0]:
"""
Save the Cleaned Dataset to the Gold Layer

Writes the cleaned and standardized 'silver' DataFrame as a Delta table under
the 'gold/features_v1/' prefix of the lakehouse container. This is the final
output of this preparation notebook.

Write settings:
    - format("delta"): stores the data as a Delta Lake table.
    - mode("overwrite"): replaces whatever data currently exists at the target path.
    - option("overwriteSchema", "true"): lets the overwrite also replace the
      stored table schema with the DataFrame's current schema.

Destination:
    abfss://lakehouse@goodreadsreviews60302363.dfs.core.windows.net/gold/features_v1/
"""

# ABFSS location of the output feature table.
features_v1_path = (
    "abfss://lakehouse@goodreadsreviews60302363.dfs.core.windows.net"
    "/gold/features_v1/"
)

(
    silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(features_v1_path)
)