In [0]:
# --- Azure storage key setup ---
spark.conf.set(
    "fs.azure.account.key.goodreadsreviews60302712.dfs.core.windows.net",
    "00u5OPJ3GYKrSACO9uvWxs8ZikHiJ7FosLxuXngxN1GqYBs5f2SLr/V0yESwhmfXXSovGYabn6Ai+AStqwR5Tg=="
)

# --- Load the features_v1 (from Lab 3) ---
spark.sql("USE SCHEMA default")
df = spark.table("features_v1")

print("Total rows:", df.count())
df.printSchema()
df.show(5)


Total rows: 14926784
root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- n_votes: long (nullable = true)
 |-- date_added: string (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: long (nullable = true)

+--------+--------------------+--------------------+---------+--------------------+--------------------+------+--------------------+-------------+-------+--------------------+-------------------+------------------+-----------+
| book_id|           review_id|               title|author_id|                name|             user_id|rating|         review_text|language_code|n_votes|          date_add

In [0]:
from pyspark.sql import functions as F

# Basic sanity cleaning
df = (
    df.filter((F.col("rating") >= 1) & (F.col("rating") <= 5))
      .filter(F.length("review_text") >= 10)
      .dropna(subset=["review_text", "rating"])
)

# Split the dataset
train_df, val_df, test_df = df.randomSplit([0.7, 0.15, 0.15], seed=42)

# Save the splits to Gold/features_v2
base = "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2"
(train_df.write.mode("overwrite").format("delta").save(f"{base}/train"))
(val_df.write.mode("overwrite").format("delta").save(f"{base}/val"))
(test_df.write.mode("overwrite").format("delta").save(f"{base}/test"))

print("Train:", train_df.count(), "Val:", val_df.count(), "Test:", test_df.count())


Train: 10450634 Val: 2237780 Test: 2238370


In [0]:
from pyspark.sql import functions as F

# Load train split
train_df = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/train"
)

# Normalize text: trim, lowercase, remove control characters
def normalize_text(colname):
    return F.lower(
        F.regexp_replace(
            F.trim(F.col(colname)),
            r"[\x00-\x1F\x7F]", " "
        )
    )

train_df = (
    train_df
    .withColumn("review_text", normalize_text("review_text"))
    .withColumn("title", F.trim(F.col("title")))
    .withColumn("name", F.trim(F.col("name")))
    .filter(F.length("review_text") > 10)
)

# --- Basic features ---
train_df = (
    train_df
    .withColumn("review_length_words", F.size(F.split(F.col("review_text"), r"\s+")))
    .withColumn("review_length_chars", F.length(F.col("review_text")))
    .withColumn("review_date", F.to_date("date_added"))
    .withColumn("review_age_days", F.datediff(F.current_date(), F.col("review_date")))
)

# Preview
train_df.select(
    "review_id", "rating", "review_length_words",
    "review_length_chars", "review_age_days"
).show(5)


+--------------------+------+-------------------+-------------------+---------------+
|           review_id|rating|review_length_words|review_length_chars|review_age_days|
+--------------------+------+-------------------+-------------------+---------------+
|16b1077b60e3674e4...|     4|                 65|                357|           NULL|
|3c604adc11027e462...|     4|                179|                966|           NULL|
|41ee27142256927ba...|     3|                 16|                 98|           NULL|
|6589cda6675f6b52b...|     2|                130|                781|           NULL|
|db7dcf73787f66199...|     4|                 89|                515|           NULL|
+--------------------+------+-------------------+-------------------+---------------+
only showing top 5 rows


In [0]:
train_df.write.mode("overwrite").format("delta").save(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/train_clean"
)


In [0]:
# Install once (on cluster)
%pip install nltk
import nltk
nltk.download('vader_lexicon')

from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Initialize analyzer
sid = SentimentIntensityAnalyzer()

# Define UDFs
def get_sentiment_score(text, key):
    if text is None:
        return 0.0
    s = sid.polarity_scores(text)
    return float(s[key])

get_compound = udf(lambda x: get_sentiment_score(x, "compound"), DoubleType())
get_pos = udf(lambda x: get_sentiment_score(x, "pos"), DoubleType())
get_neg = udf(lambda x: get_sentiment_score(x, "neg"), DoubleType())
get_neu = udf(lambda x: get_sentiment_score(x, "neu"), DoubleType())

# Load train split again
train_df = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/train_clean"
)

# Apply sentiment columns
sentiment_df = (
    train_df
    .withColumn("sent_pos", get_pos(F.col("review_text")))
    .withColumn("sent_neg", get_neg(F.col("review_text")))
    .withColumn("sent_neu", get_neu(F.col("review_text")))
    .withColumn("sent_compound", get_compound(F.col("review_text")))
)

sentiment_df.select("review_id","sent_pos","sent_neg","sent_compound").show(5)


Collecting nltk
  Downloading nltk-3.9.2-py3-none-any.whl.metadata (3.2 kB)
Collecting regex>=2021.8.3 (from nltk)
  Downloading regex-2025.11.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/40.5 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [91m━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━[0m [32m20.5/40.5 kB[0m [31m653.4 kB/s[0m eta [36m0:00:01[0m
[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━[0m [32m30.7/40.5 kB[0m [31m651.4 kB/s[0m eta [36m0:00:01[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.5/40.5 kB[0m [31m338.0 kB/s[0m eta [36m0:00:00[0m
[?25hCollecting tqdm (from nltk)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.7 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [91m━━━━━━━━━━━━━━━━━━━━━━

[nltk_data] Downloading package vader_lexicon to /home/spark-
[nltk_data]     cd8ff534-1416-4f9a-9882-2f/nltk_data...


+--------------------+--------+--------+-------------+
|           review_id|sent_pos|sent_neg|sent_compound|
+--------------------+--------+--------+-------------+
|4f761257f88a8ed51...|   0.162|   0.047|       0.8196|
|78937bbd2d1bc4d68...|   0.132|     0.0|       0.4927|
|a2acca81bc0462006...|   0.124|   0.097|       0.9509|
|304c3f0317bdaa18a...|   0.249|   0.031|       0.9783|
|796ecd157de6b82da...|   0.311|   0.039|       0.9942|
+--------------------+--------+--------+-------------+
only showing top 5 rows


In [0]:
# Load your cleaned training split
train_df = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/train_clean"
)

train_df.select("review_id", "review_text").show(3)


+--------------------+--------------------+
|           review_id|         review_text|
+--------------------+--------------------+
|4f761257f88a8ed51...|...don't really r...|
|78937bbd2d1bc4d68...|a very good story...|
|a2acca81bc0462006...|i discovered a co...|
+--------------------+--------------------+
only showing top 3 rows


In [0]:
# ==============================================================
# STEP 3 – FULL DATASET TF-IDF (SAFE + OPTIMIZED VERSION)
# ==============================================================

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
import re

# --- Optimize Spark for large workloads ---
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.default.parallelism", "200")

# --- 1️⃣ Load your full cleaned training dataset ---
train_df = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/train_clean"
)

print("Training rows:", train_df.count())

# --- 2️⃣ Tokenize text manually (no blocked constructors) ---
def simple_tokenize(text):
    if text is None:
        return []
    # find all word tokens (alphanumeric)
    return re.findall(r"\b\w+\b", text.lower())

tokenize_udf = F.udf(simple_tokenize, ArrayType(StringType()))
tokens_df = train_df.withColumn("tokens", tokenize_udf(F.col("review_text")))

# --- 3️⃣ Compute term frequency (TF) per review_id + token ---
tf_df = (
    tokens_df
    .withColumn("token", F.explode("tokens"))
    .groupBy("review_id", "token")
    .agg(F.count("*").alias("tf"))
)

# --- 4️⃣ Compute document frequency (DF) per token ---
df_counts = (
    tf_df.select("token", "review_id")
         .distinct()
         .groupBy("token")
         .agg(F.count("*").alias("df"))
)

# --- 5️⃣ Compute inverse document frequency (IDF) ---
total_docs = train_df.count()
idf_df = df_counts.withColumn("idf", F.log(F.lit(total_docs) / (F.col("df") + 1)))

# --- 6️⃣ Join TF + IDF to get TF-IDF per (review_id, token) ---
tfidf_df = (
    tf_df.join(idf_df, "token", "left")
         .withColumn("tfidf", F.col("tf") * F.col("idf"))
)

# --- 7️⃣ Aggregate average TF-IDF per review ---
agg_tfidf_df = (
    tfidf_df.groupBy("review_id")
            .agg(F.avg("tfidf").alias("avg_tfidf"))
)

# --- 8️⃣ Join aggregated TF-IDF back to main training DataFrame ---
final_features = (
    train_df.join(agg_tfidf_df, "review_id", "left")
            .fillna({"avg_tfidf": 0.0})
)

# --- 9️⃣ Save the new dataset to the Gold layer ---
(
    final_features
    .write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .save("abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/train_tfidf_full")
)

print("✅ Full TF-IDF features created and saved successfully!")


Training rows: 10450382
✅ Full TF-IDF features created and saved successfully!


In [0]:
final_features = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/train_tfidf_full"
)
final_features.select("review_id", "avg_tfidf").show(10, truncate=False)
print("Total rows:", final_features.count())


+--------------------------------+------------------+
|review_id                       |avg_tfidf         |
+--------------------------------+------------------+
|000250b46f177fe76441bbd178a64671|2.571978419582607 |
|00027dd587d48a1ea0799f140b0fef60|6.607502762016232 |
|00064c6ed6be9052c383a5f8a26d1cf9|5.735628913909095 |
|000671601b00cc04bd8eaf95de8f1ab4|5.2709950740302265|
|00076d5127e698fa6386a108bf6b3b49|3.7015110192835037|
|000840728ece4129d2c437e7dbdc4148|4.483717371363774 |
|0008423c9faf30b7956c58987e4e49b9|5.94937770943427  |
|000848b3aa2132509689cfbb3f0ccc9a|2.62155774064224  |
|0008ad99cab8191f3eb09476c74d6e10|4.262934645461822 |
|0008c54e6ff69a02f01ddc405c1fd8e4|4.154119287532405 |
+--------------------------------+------------------+
only showing top 10 rows
Total rows: 10450382


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
import re

# Load your saved IDF data from Step 3
idf_df = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/train_tfidf_full"
).select("review_id", "avg_tfidf")  # sanity check read works

print("Loaded IDF base OK:", idf_df.count())

# Function to tokenize safely (same as before)
def simple_tokenize(text):
    if text is None:
        return []
    return re.findall(r"\b\w+\b", text.lower())

tokenize_udf = F.udf(simple_tokenize, ArrayType(StringType()))


Loaded IDF base OK: 10450382


In [0]:
# =====================  VALIDATION  =====================
val_df = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/val"
)

tokens_val = val_df.withColumn("tokens", tokenize_udf(F.col("review_text")))

# Compute TF for validation
tf_val = (
    tokens_val
    .withColumn("token", F.explode("tokens"))
    .groupBy("review_id", "token")
    .agg(F.count("*").alias("tf"))
)

# Reuse training IDF weights (computed earlier)
total_docs_train = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/train_clean"
).count()

df_counts_val = (
    tf_val.select("token", "review_id")
          .distinct()
          .groupBy("token")
          .agg(F.count("*").alias("df"))
)

idf_val = df_counts_val.withColumn(
    "idf",
    F.log(F.lit(total_docs_train) / (F.col("df") + 1))
)

tfidf_val = (
    tf_val.join(idf_val, "token", "left")
          .withColumn("tfidf", F.col("tf") * F.col("idf"))
)

agg_val = tfidf_val.groupBy("review_id").agg(F.avg("tfidf").alias("avg_tfidf"))
final_val = val_df.join(agg_val, "review_id", "left").fillna({"avg_tfidf": 0.0})

final_val.write.mode("overwrite").format("delta").save(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/val_tfidf_full"
)

# =====================  TEST  =====================
test_df = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v2/test"
)

tokens_test = test_df.withColumn("tokens", tokenize_udf(F.col("review_text")))

tf_test = (
    tokens_test
    .withColumn("token", F.explode("tokens"))
    .groupBy("review_id", "token")
    .agg(F.count("*").alias("tf"))
)

df_counts_test = (
    tf_test.select("token", "review_id")
           .distinct()
           .groupBy("token")
           .agg(F.count("*").alias("df"))
)

idf_test = df_counts_test.withColumn(
    "idf",
    F.log(F.lit(total_docs_train) / (F.col("df") + 1))
)

tfidf_test = (
    tf_test.join(idf_test, "token", "left")
           .withColumn("tfidf", F.col("tf") * F.col("idf"))
)

agg_test = tfidf_test.groupBy("review_id").agg(F.avg("tfidf").alias("avg_tfidf"))
final_test = test_df.join(agg_test, "review_id", "left").fillna({"avg_tfidf": 0.0})

final_test.write.mode("overwrite").format("delta").save(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/test_tfidf_full"
)

print("Validation and Test TF-IDF features created successfully!")


Validation and Test TF-IDF features created successfully!


In [0]:
check_val = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/val_tfidf_full"
)
check_test = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/test_tfidf_full"
)

print("Val rows:", check_val.count(), "Test rows:", check_test.count())
check_val.select("review_id","avg_tfidf").show(5,truncate=False)



Val rows: 2237751 Test rows: 2238651
+--------------------------------+------------------+
|review_id                       |avg_tfidf         |
+--------------------------------+------------------+
|000f2a9874877c0bf5d79dbc609fd274|3.667228984818703 |
|0014695adffdb153acb7ae1e92f1fe68|3.5939799350433574|
|001670bf56a8c149a50ba794f5df59d6|3.3185408144825224|
|001aacf065d1f8bd067b0bf511768efa|7.512714756906683 |
|0023471454a739b777ee495dde3fa018|6.562922043429661 |
+--------------------------------+------------------+
only showing top 5 rows


In [0]:
from pyspark.sql import functions as F

train_final = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60302712.dfs.core.windows.net/gold/features_v3/train_tfidf_full"
)

# Optional sanity checks
train_final.select(
    "review_id","rating","review_length_words","avg_tfidf"
).show(5, truncate=False)

print("Rows:", train_final.count())


+--------------------------------+------+-------------------+------------------+
|review_id                       |rating|review_length_words|avg_tfidf         |
+--------------------------------+------+-------------------+------------------+
|000250b46f177fe76441bbd178a64671|4     |32                 |2.571978419582607 |
|00027dd587d48a1ea0799f140b0fef60|3     |40                 |6.607502762016232 |
|00064c6ed6be9052c383a5f8a26d1cf9|5     |360                |5.735628913909095 |
|000671601b00cc04bd8eaf95de8f1ab4|4     |475                |5.2709950740302265|
|00076d5127e698fa6386a108bf6b3b49|2     |192                |3.7015110192835037|
+--------------------------------+------+-------------------+------------------+
only showing top 5 rows
Rows: 10450382
