# Load the Curated Dataset

In [0]:
# Define the path to your dataset in the 'lakehouse/gold' directory
gold_path = "abfss://lakehouse@goodreadsreviews60105179.dfs.core.windows.net/gold/curated_reviews"

# Load the data into a DataFrame using Spark
df_cleaned  = spark.read.format("delta").load(gold_path)

# Show schema and preview the data
df_cleaned .printSchema()
df_cleaned .show(5)

root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: string (nullable = true)
 |-- date_added: string (nullable = true)

+--------------------+--------+--------------------+---------+-----------------+--------------------+------+--------------------+-------------+-------+--------------------+
|           review_id| book_id|               title|author_id|      author_name|             user_id|rating|         review_text|language_code|n_votes|          date_added|
+--------------------+--------+--------------------+---------+-----------------+--------------------+------+--------------------+-------------+-------+--------------------+
|8cd7a5db2add9048d...|    9

# Clean the Data

### Correct Data Types

In [0]:
from pyspark.sql.functions import col

# Convert the necessary columns to correct data types
df_cleaned = df_cleaned.withColumn('rating', col('rating').cast('float')) \
                       .withColumn('n_votes', col('n_votes').cast('int')) \
                       .withColumn('date_added', col('date_added').cast('timestamp')) \
                       .withColumn('book_id', col('book_id').cast('string')) \
                       .withColumn('author_id', col('author_id').cast('string')) \
                       .withColumn('user_id', col('user_id').cast('string'))

# Verify the schema after applying data type conversions
df_cleaned.printSchema()



root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: timestamp (nullable = true)



### Remove Rows with Missing Values

In [0]:
# Remove rows where 'rating', 'review_text', 'book_id', or 'author_id' are null
df_cleaned = df.dropna(subset=['rating', 'review_text', 'book_id', 'author_id'])

# Verify the number of rows after dropping missing values
print(f"Rows after removing missing values: {df_cleaned.count()}")


Rows after removing missing values: 9653063


### Remove Duplicates

In [0]:
# Remove duplicates based on 'review_id'
df_cleaned = df_cleaned.dropDuplicates(['review_id'])

# Verify the number of rows after removing duplicates
print(f"Rows after removing duplicates by review_id: {df_cleaned.count()}")


Rows after removing duplicates by review_id: 9653063


### Normalize Text Fields

In [0]:
from pyspark.sql.functions import trim, lower, regexp_replace

# Normalize text fields: trim spaces, lowercase, and remove malformed characters
df_cleaned = df_cleaned.withColumn('review_text', trim(lower(col('review_text')))) \
                       .withColumn('author_name', trim(lower(col('author_name')))) \
                       .withColumn('title', trim(lower(col('title'))))

# Remove malformed characters (non-ASCII)
df_cleaned = df_cleaned.withColumn('review_text', regexp_replace(col('review_text'), '[^\\x00-\\x7F]', ''))

# Show a preview of the cleaned data
df_cleaned.select('review_text').show(5)


+--------------------+
|         review_text|
+--------------------+
|already know this...|
|mml@ \n llg@ d`yf...|
|text: 5 stars \n ...|
|as a book, i had ...|
|i've never read a...|
+--------------------+
only showing top 5 rows


### Drop Reviews with Very Short Text

In [0]:
from pyspark.sql.functions import length

# Create a new column for review length (in characters)
df_cleaned = df_cleaned.withColumn('review_length', length(col('review_text')))

# Filter out reviews with less than 10 characters
df_cleaned = df_cleaned.filter(col('review_length') >= 10)

# Show the number of rows after filtering short reviews
print(f"Rows after removing short reviews: {df_cleaned.count()}")


Rows after removing short reviews: 9653063


### Replace missing 'n_votes' with 0 and 'language_code' with "Unknown"

In [0]:
# Replace missing 'n_votes' with 0 and 'language_code' with "Unknown"
df_cleaned = df_cleaned.withColumn('n_votes', when(col('n_votes').isNull(), 0).otherwise(col('n_votes')))
df_cleaned = df_cleaned.withColumn('language_code', when(col('language_code').isNull(), 'Unknown').otherwise(col('language_code')))

### Verify Numeric Columns

In [0]:
from pyspark.sql.functions import col, current_date, count, when

# Step 1: Check for missing values in numeric columns
missing_values = df_cleaned.select([count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns])
missing_values.show()

# Step 2: Check if the numeric columns have valid values and fall within expected ranges

# Validate 'rating' (ensure it's between 0 and 5)
invalid_ratings = df_cleaned.filter((col('rating') < 0) | (col('rating') > 5))
invalid_ratings.show()

# Validate 'n_votes' (ensure it's positive, greater than or equal to 0)
invalid_votes = df_cleaned.filter(col('n_votes') < 0)
invalid_votes.show()

# Validate 'date_added' (ensure it's not null and not in the future)
invalid_dates = df_cleaned.filter((col('date_added').isNull()) | (col('date_added') > current_date()))
invalid_dates.show()

# Step 3: Make sure no unexpected non-numeric values exist in numeric columns
# For example, check if 'n_votes' contains only numeric values
invalid_n_votes = df_cleaned.filter(~col('n_votes').rlike('^\d+$'))
invalid_n_votes.show()

# If any of the above steps returns any rows, i need to clean or handle those rows. 

  invalid_n_votes = df_cleaned.filter(~col('n_votes').rlike('^\d+$'))


+---------+-------+-----+---------+-----------+-------+------+-----------+-------------+-------+----------+-------------+
|review_id|book_id|title|author_id|author_name|user_id|rating|review_text|language_code|n_votes|date_added|review_length|
+---------+-------+-----+---------+-----------+-------+------+-----------+-------------+-------+----------+-------------+
|        0|      0|    0|        0|          0|      0|     0|          0|            0|      0|         0|            0|
+---------+-------+-----+---------+-----------+-------+------+-----------+-------------+-------+----------+-------------+

+---------+-------+-----+---------+-----------+-------+------+-----------+-------------+-------+----------+-------------+
|review_id|book_id|title|author_id|author_name|user_id|rating|review_text|language_code|n_votes|date_added|review_length|
+---------+-------+-----+---------+-----------+-------+------+-----------+-------------+-------+----------+-------------+
+---------+-------+----



In [0]:
# Convert the necessary columns to correct data types
df_cleaned = df_cleaned.withColumn('rating', col('rating').cast('float')) \
                       .withColumn('n_votes', col('n_votes').cast('int')) \
                       .withColumn('date_added', col('date_added').cast('timestamp')) \
                       .withColumn('book_id', col('book_id').cast('string')) \
                       .withColumn('author_id', col('author_id').cast('string')) \
                       .withColumn('user_id', col('user_id').cast('string'))

### Final check on the schema and data preview

In [0]:

df_cleaned.printSchema()

# Show a preview of the cleaned data
df_cleaned.show(5)

root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: timestamp (nullable = true)
 |-- review_length: integer (nullable = true)

+--------------------+--------+--------------------+---------+--------------------+--------------------+------+--------------------+-------------+-------+----------+-------------+
|           review_id| book_id|               title|author_id|         author_name|             user_id|rating|         review_text|language_code|n_votes|date_added|review_length|
+--------------------+--------+--------------------+---------+--------------------+--------------------+------+--------------------+------

In [0]:
print(f"Row count after cleaning: {df_cleaned.count()}")



Row count after cleaning: 9653063


# Feature preparation

### Compute Review Length in Words

In [0]:
from pyspark.sql.functions import length, split

# Calculate review length in words by splitting the review text and counting them
df_cleaned = df_cleaned.withColumn(
    'review_length_in_words', 
    size(split(col('review_text'), '\s+'))
)

# Show the new feature to confirm
df_cleaned.select('review_text', 'review_length_in_words').show(5)


  size(split(col('review_text'), '\s+'))


+--------------------+----------------------+
|         review_text|review_length_in_words|
+--------------------+----------------------+
|already know this...|                    99|
|mml@ \n llg@ d`yf...|                    34|
|text: 5 stars \n ...|                    87|
|as a book, i had ...|                    29|
|i've never read a...|                   159|
+--------------------+----------------------+
only showing top 5 rows




### Aggregate by book_id to Calculate Average Rating and Number of Reviews

In [0]:
from pyspark.sql.functions import avg, count

# Group by 'book_id' and calculate average rating and number of reviews
df_aggregated = df_cleaned.groupBy('book_id') \
    .agg(
        avg('rating').alias('average_rating'),
        count('review_id').alias('number_of_reviews')
    )

# Show a preview of the aggregated data
df_aggregated.show(5)


+--------+------------------+-----------------+
| book_id|    average_rating|number_of_reviews|
+--------+------------------+-----------------+
| 7663760|             3.125|                8|
|22387890|               3.5|                2|
| 1096390|3.9038031319910513|              447|
|13166894|               4.0|                1|
|25430624| 4.545454545454546|               11|
+--------+------------------+-----------------+
only showing top 5 rows


### Save Results to the Gold Layer

In [0]:
# Join the aggregated data with the original dataframe
df_final = df_cleaned.join(df_aggregated, on='book_id', how='left')

# Show the final dataset with aggregated features
df_final.show(5)




+--------+--------------------+--------------------+---------+--------------------+--------------------+------+--------------------+-------------+-------+----------+-------------+----------------------+------------------+-----------------+
| book_id|           review_id|               title|author_id|         author_name|             user_id|rating|         review_text|language_code|n_votes|date_added|review_length|review_length_in_words|    average_rating|number_of_reviews|
+--------+--------------------+--------------------+---------+--------------------+--------------------+------+--------------------+-------------+-------+----------+-------------+----------------------+------------------+-----------------+
|16096824|e51b73ff64cbb7127...|a court of thorns...|  3433047|       sarah j. maas|6ed429d912d646e64...|   5.0|this book proves ...|          eng| 182581|      NULL|          228|                    43| 4.232232920678587|             4362|
|18250100|c763b80f325e4af49...|         

In [0]:
# Define the path for the Gold layer (features_v1)
gold_path = "abfss://lakehouse@goodreadsreviews60105179.dfs.core.windows.net/gold/features_v1"

# Save the dataframe to Delta format in the Gold layer
df_final.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(gold_path)


### Verify


In [0]:
# Reload the saved dataset from the Gold layer
df_verified = spark.read.format("delta").load(gold_path)

# Verify the schema and preview the data
df_verified.printSchema()
# Check the number of records to verify data size after saving
record_count = df_verified.count()
print(f"Record count after saving: {record_count}")

df_verified.show(5)


root
 |-- book_id: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- number_of_reviews: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: integer (nullable = true)
 |-- date_added: timestamp (nullable = true)
 |-- review_length: integer (nullable = true)
 |-- review_length_in_words: integer (nullable = true)

Record count after saving: 9653063
+--------+------------------+-----------------+--------------------+--------------------+---------+--------------------+--------------------+------+--------------------+-------------+-------+----------+-------------+----------------------+
| book_id|    average_rating|number_of_reviews|           review_id|       