## Connect to ADLS Gen2 (Storage Account)

In [0]:
# Replace with your actual storage account name and key
spark.conf.set(
"fs.azure.account.key.goodreadsreviews60300294.dfs.core.windows.net",
"SdrUSgCnzVYmEhQn9mzu3HtSdzHfZLLnQ+2ofOm7fq4GktiUUs3bZw7qJoD8BXFqtyfzCkDbfKZI+ASt5tp6qQ=="
)

## Load Silver Parquet (curated_reviews)

In [0]:
# Load the curated reviews dataset from the silver layer
gold_path = "abfss://lakehouse@goodreadsreviews60300294.dfs.core.windows.net/gold/curated_reviews/"
curated_reviews = spark.read.format("delta").load(gold_path)

print(f"✓ Loaded gold table: {curated_reviews.count()} records")
print(f"Available columns: {curated_reviews.columns}")
print(f"\nSchema: {curated_reviews.printSchema()}")
print(f"\nSample data:{curated_reviews.show(3, truncate=False)}")

df = curated_reviews

# Clean the date 


In [0]:
# import libraries
from pyspark.sql.functions import col, trim, length, lower, regexp_replace, when, to_date, to_timestamp, coalesce, unix_timestamp, from_unixtime, round, avg, count, min, max, percentile_approx, stddev, size, split
from pyspark.sql import functions as F

In [0]:
# count the total number of rows and the number of null values in each column
total_rows = df.count()
null_review_id = df.filter(col("review_id").isNull()).count()
null_book_id = df.filter(col("book_id").isNull()).count()
null_user_id = df.filter(col("user_id").isNull()).count()
null_rating = df.filter(col("rating").isNull()).count()
empty_text = df.filter( (col("review_text").isNull()) | (trim(col("review_text")) == "")).count()
print(f"Total rows: {total_rows}")
print(f"NULL review_id: {null_review_id}, NULL book_id: {null_book_id}, NULL user_id {null_user_id}, NULL rating: {null_rating}")
print(f"Empty/NULL review_text: {empty_text}")



#### from the previous cell number 7, we can say that the there is no null values

## remove missing values and remove duplicates

In [0]:
# 1) Drop rows missing critical keys
# -- thw question from the lab 3 paper, page 18, 
# -- Remove rows where rating, review_text, book_id, or author_id are missing
df = df.filter(
col("review_id").isNotNull() &
col("book_id").isNotNull() &
col("user_id").isNotNull() & 
col("rating").isNotNull() 
)

# 2) De-duplicate by review_id 
# -- Remove duplicates by review_id or by (user_id, book_id)
# -- for me i remove it just from review_id
df = df.dropDuplicates(["review_id"])



## Normalize Text 


In [0]:
# Normalize text-based columns
df = (df
    # remove control / malformed characters (like \n, \r, unicode junk)
    .withColumn("title", trim(regexp_replace(col("title"), r"\p{C}", "")))
    .withColumn("name", trim(regexp_replace(col("name"), r"\p{C}", "")))
    .withColumn("review_text", trim(regexp_replace(col("review_text"), r"\p{C}", "")))

    # make 'language' lowercase and trimmed
    .withColumn("language", lower(trim(col("language"))))
)

df = df.withColumn(
    "language",
    when(col("language").isNull() | (col("language") == ""), "unknown")
    .otherwise(col("language"))
)

# ✅ Quick check on first few rows
df.select("title", "name", "language", "review_text").show(5, truncate=False)

## Drop reviews with very short text.

In [0]:
# Count total before
before = df.count()

# Drop reviews where the text length (after trimming) is too short
df = df.filter(length(col("review_text")) >= 10)

# Count total after
after = df.count()

print(f"Rows before: {before:,}  |  after: {after:,}  |  removed: {before - after:,}")


## Correct data types
- rating are numeric
- ID's are text
- dates are properly formatted

In [0]:
# convert the data types
df = (
    df.withColumn("review_id", col("review_id").cast("string"))   # review_id → string
      .withColumn("user_id",   col("user_id").cast("string"))     # user_id → string
      .withColumn("book_id",   col("book_id").cast("integer"))    # book_id → integer
      .withColumn("author_id", col("author_id").cast("integer"))  # author_id → integer
)
# --- rating as int in [1..5] ---
df = df.withColumn("rating", col("rating").cast("int")) \
       .filter(col("rating").between(1, 5))

# --- n_votes as non-negative int; null/negative → 0 ---
if "n_votes" in df.columns:
    df = (df.withColumn("n_votes", col("n_votes").cast("int"))
            .withColumn("n_votes",
                        when(col("n_votes").isNull() | (col("n_votes") < 0), 0)
                        .otherwise(col("n_votes"))))
    
# --- date_added: parse "EEE MMM dd HH:mm:ss Z yyyy" → DATE ---
# Example: "Wed Dec 15 08:57:20 -0800 2010"
df = df.withColumn("date_added_str", trim(col("date_added").cast("string")))

date_str = col("date_added_str")
ts1 = to_timestamp(date_str, "EEE MMM dd HH:mm:ss Z yyyy")   # e.g., Wed Dec 15 08:57:20 -0800 2010
ts2 = to_timestamp(date_str, "EEE MMM dd HH:mm:ss xx yyyy")  # alternate offset
ts3 = to_timestamp(date_str, "EEE MMM dd HH:mm:ss XX yyyy")  # alternate offset

legacy_ts = from_unixtime(unix_timestamp(date_str, "EEE MMM dd HH:mm:ss Z yyyy")).cast("timestamp")

df = (df
      .withColumn("date_added_ts", coalesce(ts1, ts2, ts3, legacy_ts))
      .withColumn("date_added_date", to_date(col("date_added_ts")))
      .drop("date_added_str")
     )

after = df.count()
print(f"Rows before: {before:,}  |  after: {after:,}  |  removed: {before - after:,}")

# ✅ Quick check
df.printSchema()
df.select("review_id","book_id","author_id","user_id","rating","n_votes","date_added").show(10, truncate=False)

## Drop unused or irrelevant columns

In [0]:
# List of required final columns for Gold
required_cols = [
    "review_id", "book_id", "title", "author_id", "name",
    "user_id", "rating", "review_text", "language", "n_votes", "date_added"
]

# Select only the required columns (automatically drops the rest)
curated_reviews_gold = df.select(*required_cols)

# Verify the schema
curated_reviews_gold.printSchema()
curated_reviews_gold.show(5, truncate=False)


## Compute review length in words 

In [0]:
# Compute review length in words
curated_reviews_gold = curated_reviews_gold.withColumn(
    "review_len_words",
    when(
        col("review_text").isNotNull(),
        size(
            split(
                trim(regexp_replace(col("review_text"), r"\s+", " ")),  # normalize spaces
                r"\s+"  # split on whitespace
            )
        )
    ).otherwise(0)  # handle null review_text
)

# Verify
curated_reviews_gold.select("review_id", "review_len_words").show(10, truncate=False)


## Check the review_text if it has charachers less than 10

In [0]:
# Show short reviews (less than 10 characters)
curated_reviews_gold.filter(length(col("review_text")) < 10) \
    .select("review_id", "review_text", "review_len_words") \
    .show(20, truncate=False)


## Redo the Aggregations and Enrichment, in Group by in Data Fabric task into Data bricks

### Calculate Average rating per BookID and Number of reviews per BookID

In [0]:
book_stats = (curated_reviews_gold
    .groupBy("book_id")
    .agg(
        round(avg("rating"), 2).alias("avg_rating"),
        count("*").alias("num_reviews")
    )
)

display(book_stats)

### Calculate number of Average rating per author name, with number of reviews

In [0]:
author_stats = (curated_reviews_gold
    .groupBy("name")
    .agg(
        round(avg("rating"), 2).alias("avg_rating"),
        count("*").alias("num_reviews")
    )
    .withColumnRenamed("name", "author_name")
)

display(author_stats)

### Word count statistics on reviews, I calculated the followings:
- min
- max
- average,
- standard deviation (std)
- percentile:
  - 0.25
  - 0.5 or median
  - 0.75

In [0]:
word_stats = (curated_reviews_gold
    .select("review_len_words")
    .agg(
        min("review_len_words").alias("min_words"),
        max("review_len_words").alias("max_words"),
        round(avg("review_len_words"), 2).alias("avg_words"),
        round(stddev("review_len_words"), 2).alias("std_words"),
        percentile_approx("review_len_words", 0.25).alias("p25_words"),
        percentile_approx("review_len_words", 0.50).alias("median_words"),
        percentile_approx("review_len_words", 0.75).alias("p75_words"),
    )
)

display(word_stats)

## Save results to the Gold layer

In [0]:
# Use the schema (database)
spark.sql("USE SCHEMA default")

# Save as a managed Delta table
curated_reviews_gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("default.features_v1")

# Verification: 
# Check total number of records in the saved table
spark.sql("SELECT COUNT(*) AS rows FROM default.features_v1").show()

# Preview a few sample rows to confirm data looks correct
spark.sql("""
  SELECT review_id, title, rating, review_len_words, language, date_added
  FROM default.features_v1
  ORDER BY date_added DESC
  LIMIT 10
""").show(truncate=False)


In [0]:
# 1️⃣ Reload the saved Delta table
features_v1 = spark.read.table("default.features_v1")

# 2️⃣ Inspect schema
print("=== Schema ===")
features_v1.printSchema()

# 3️⃣ Record count
print("=== Record Count ===")
features_v1_count = features_v1.count()
print(f"Total records in features_v1: {features_v1_count}")

# 4️⃣ Show sample rows
print("=== Sample Rows ===")
features_v1.select(
    "review_id", "title", "rating", "review_len_words", "language", "date_added"
).show(10, truncate=False)

# 5️⃣ Quick summary stats
print("=== Summary Statistics ===")
features_v1.selectExpr(
    "round(avg(rating),2) as avg_rating",
    "round(avg(review_len_words),2) as avg_review_length",
    "min(review_len_words) as min_review_length",
    "max(review_len_words) as max_review_length"
).show()

# 6️⃣ Optional SQL check (sanity)
spark.sql("""
  SELECT COUNT(*) AS rows,
         ROUND(AVG(rating),2) AS avg_rating,
         ROUND(AVG(review_len_words),2) AS avg_review_length
  FROM default.features_v1
""").show()
