In [0]:
# Set up storage authentication (use your actual storage account and key)
spark.conf.set(
    "fs.azure.account.key.goodreadsreviews60301511.dfs.core.windows.net",
    "QZzPMlZcQvM/LeucwJ67H1zRkEhbWCH9+uxdJaTWALJU/QN8ArtpEhMHmVb7vT2DaXAgMY52PkPH+AStsA7+fw=="
)

# Load the curated gold dataset
curated_reviews_gold = spark.read.format("delta").load(
    "abfss://lakehouse@goodreadsreviews60301511.dfs.core.windows.net/gold/curated_reviews/"
)

print(f"Loaded {curated_reviews_gold.count()} records")
curated_reviews_gold.show(5)

Loaded 19688803 records
+--------------------+--------+--------------------+---------+---------------+--------------------+------+--------------------+
|           review_id| book_id|               title|author_id|           name|             user_id|rating|         review_text|
+--------------------+--------+--------------------+---------+---------------+--------------------+------+--------------------+
|165cf8dcbcf493129...|20172134|Nightmares! (Nigh...|   109354| Kirsten Miller|5ef3b7a0f64ae79d0...|     4|I was pleasantly ...|
|2cd8eec721eaf9bee...| 7937843|                Room|    23613|  Emma Donoghue|5ccf302a3b317983e...|     2|I tend to have ha...|
|b9fc663c5c884bdb7...|  341336|To the Edge (The ...|   195778|   Cindy Gerard|58f7d3af14dfa25ac...|     3|3 STARS \n Jillia...|
|0efcd32371b30497c...|18304774|     Một cuộc gặp gỡ|     6343|  Milan Kundera|5ae5183d9cd8ca0d6...|     3|Khong hieu do ngu...|
+--------------------+--------+--------------------+---------+---------------+--

In [0]:
from pyspark.sql.functions import rand

# Split the data into train, validation, and test sets
splits = curated_reviews_gold.randomSplit([0.7, 0.15, 0.15], seed=42)
train_df, val_df, test_df = splits

print(f"Train: {train_df.count()} records")
print(f"Validation: {val_df.count()} records")
print(f"Test: {test_df.count()} records")

# Save the splits to the gold layer
train_df.write.mode("overwrite").format("delta").save("abfss://lakehouse@goodreadsreviews60301511.dfs.core.windows.net/gold/feature_v2/train")
val_df.write.mode("overwrite").format("delta").save("abfss://lakehouse@goodreadsreviews60301511.dfs.core.windows.net/gold/feature_v2/validation")
test_df.write.mode("overwrite").format("delta").save("abfss://lakehouse@goodreadsreviews60301511.dfs.core.windows.net/gold/feature_v2/test")

Train: 13783384 records
Validation: 2951447 records
Test: 2953972 records


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

def clean_text(df, text_column="review_text"):
    """
    Comprehensive text cleaning function
    """
    # Convert to lowercase
    df = df.withColumn(text_column, lower(col(text_column)))
    
    # Remove extra whitespace
    df = df.withColumn(text_column, regexp_replace(col(text_column), "\\s+", " "))
    
    # Remove punctuation (keep basic sentence structure)
    df = df.withColumn(text_column, regexp_replace(col(text_column), "[^a-zA-Z0-9\\s.!?]", ""))
    
    # Trim whitespace
    df = df.withColumn(text_column, trim(col(text_column)))
    
    # Replace URLs with placeholder
    df = df.withColumn(text_column, regexp_replace(col(text_column), "http\\S+", "<URL>"))
    
    # Replace numbers with placeholder (optional - depends on your use case)
    # df = df.withColumn(text_column, regexp_replace(col(text_column), "\\d+", "<NUM>"))
    
    return df

# Apply cleaning to train, validation, and test sets
train_df_clean = clean_text(train_df)
val_df_clean = clean_text(val_df)
test_df_clean = clean_text(test_df)

In [0]:
from pyspark.sql.functions import size, split, length

def extract_basic_features(df, text_column="review_text"):
    """
    Extract basic text features
    """
    # Word count
    df = df.withColumn("review_length_words", size(split(col(text_column), " ")))
    
    # Character count
    df = df.withColumn("review_length_chars", length(col(text_column)))
    
    # Sentence count (approximate)
    df = df.withColumn("sentence_count", 
                      size(split(col(text_column), "[.!?]+")) - 1)
    
    # Average word length
    df = df.withColumn("avg_word_length", 
                      col("review_length_chars") / col("review_length_words"))
    
    return df

# Apply to all datasets
train_df_features = extract_basic_features(train_df_clean)
val_df_features = extract_basic_features(val_df_clean)
test_df_features = extract_basic_features(test_df_clean)

print("Basic features added:")
train_df_features.select("review_length_words", "review_length_chars", "sentence_count", "avg_word_length").show(5)

Basic features added:
+-------------------+-------------------+--------------+-----------------+
|review_length_words|review_length_chars|sentence_count|  avg_word_length|
+-------------------+-------------------+--------------+-----------------+
|                 56|                303|             5|5.410714285714286|
|                 22|                119|             5|5.409090909090909|
|                120|                654|             7|             5.45|
|                 84|                443|             3|5.273809523809524|
|                  3|                 28|             0|9.333333333333334|
+-------------------+-------------------+--------------+-----------------+
only showing top 5 rows


In [0]:
# Install necessary Python libraries
%pip install nltk textblob textstat

import nltk
nltk.download('vader_lexicon')
nltk.download('punkt')

Collecting textblob
  Downloading textblob-0.19.0-py3-none-any.whl.metadata (4.4 kB)
Collecting textstat
  Downloading textstat-0.7.11-py3-none-any.whl.metadata (15 kB)
Collecting nltk
  Downloading nltk-3.9.2-py3-none-any.whl.metadata (3.2 kB)
Collecting pyphen (from textstat)
  Downloading pyphen-0.17.2-py3-none-any.whl.metadata (3.2 kB)
Downloading textblob-0.19.0-py3-none-any.whl (624 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/624.3 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/624.3 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/624.3 kB[0m [31m958.0 kB/s[0m eta [36m0:00:01[0m
[2K   [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.0/624.3 kB[0m [31m382.3 kB/s[0m eta [36m0:00:02[0m
[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/624.3 k

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from nltk.sentiment import SentimentIntensityAnalyzer
from textblob import TextBlob

# Initialize sentiment analyzer
sia = SentimentIntensityAnalyzer()

# Define UDFs for sentiment analysis
def get_vader_sentiment(text):
    if not text or text.strip() == "":
        return (0.0, 0.0, 0.0, 0.0)
    scores = sia.polarity_scores(text)
    return (float(scores['pos']), float(scores['neg']), float(scores['neu']), float(scores['compound']))

def get_textblob_sentiment(text):
    if not text or text.strip() == "":
        return (0.0, 0.0)
    blob = TextBlob(text)
    return (float(blob.sentiment.polarity), float(blob.sentiment.subjectivity))

# Register UDFs
vader_udf = udf(get_vader_sentiment, 
                StructType([
                    StructField("vader_pos", FloatType()),
                    StructField("vader_neg", FloatType()),
                    StructField("vader_neu", FloatType()),
                    StructField("vader_compound", FloatType())
                ]))

textblob_udf = udf(get_textblob_sentiment,
                   StructType([
                       StructField("blob_polarity", FloatType()),
                       StructField("blob_subjectivity", FloatType())
                   ]))

def add_sentiment_features(df, text_column="review_text"):
    df = df.withColumn("vader_sentiment", vader_udf(col(text_column)))
    df = df.withColumn("textblob_sentiment", textblob_udf(col(text_column)))
    
    # Extract individual sentiment columns
    df = df.withColumn("vader_pos", col("vader_sentiment.vader_pos"))
    df = df.withColumn("vader_neg", col("vader_sentiment.vader_neg"))
    df = df.withColumn("vader_neu", col("vader_sentiment.vader_neu"))
    df = df.withColumn("vader_compound", col("vader_sentiment.vader_compound"))
    df = df.withColumn("blob_polarity", col("textblob_sentiment.blob_polarity"))
    df = df.withColumn("blob_subjectivity", col("textblob_sentiment.blob_subjectivity"))
    
    # Drop the struct columns
    df = df.drop("vader_sentiment", "textblob_sentiment")
    
    return df

# Apply sentiment analysis (only fit on training data)
train_df_sentiment = add_sentiment_features(train_df_features)
val_df_sentiment = add_sentiment_features(val_df_features)
test_df_sentiment = add_sentiment_features(test_df_features)

print("Sentiment features added:")
train_df_sentiment.select("vader_pos", "vader_neg", "vader_compound", "blob_polarity").show(5)

Sentiment features added:
+---------+---------+--------------+-------------+
|vader_pos|vader_neg|vader_compound|blob_polarity|
+---------+---------+--------------+-------------+
|    0.215|    0.122|        0.7446|      0.58125|
|      0.0|      0.0|           0.0|          0.0|
|    0.231|    0.096|        0.9701|    0.1516567|
|    0.253|    0.024|        0.9701|    0.3064394|
|    0.672|      0.0|        0.6249|          1.0|
+---------+---------+--------------+-------------+
only showing top 5 rows


In [0]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml import Pipeline

# Prepare text for TF-IDF
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=1000)
idf = IDF(inputCol="raw_features", outputCol="tfidf_features")

# Create pipeline
tfidf_pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing_tf, idf])

# Fit ONLY on training data
tfidf_model = tfidf_pipeline.fit(train_df_sentiment)

# Transform all datasets
train_df_tfidf = tfidf_model.transform(train_df_sentiment)
val_df_tfidf = tfidf_model.transform(val_df_sentiment)
test_df_tfidf = tfidf_model.transform(test_df_sentiment)

print("TF-IDF features added")
train_df_tfidf.select("words", "filtered_words", "tfidf_features").show(3, truncate=50)

Downloading artifacts:   0%|          | 0/30 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

TF-IDF features added
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|                                             words|                                    filtered_words|                                    tfidf_features|
+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|[so, i, really, enjoyed, this, story., i, found...|[really, enjoyed, story., found, three, narrato...|(1000,[31,56,61,85,112,189,263,277,365,373,385,...|
|[2.5, stars., very, hohum., slightly, predictab...|[2.5, stars., hohum., slightly, predictable., s...|(1000,[1,16,237,345,406,426,578,616,640,707],[2...|
|[4.5, stars., being, proclaimed, as, the, new, ...|[4.5, stars., proclaimed, new, gone, girl, big,...|(1000,[0,1,10,21,29,38,48,56,65,104,112,115,125...|
+-----------------------------------------------

In [0]:
from pyspark.ml.feature import NGram, CountVectorizer

# Create bigrams
ngram = NGram(n=2, inputCol="filtered_words", outputCol="bigrams")
bigram_pipeline = Pipeline(stages=[ngram])

# Fit on training data
bigram_model = bigram_pipeline.fit(train_df_tfidf)

# Transform all datasets
train_df_ngrams = bigram_model.transform(train_df_tfidf)
val_df_ngrams = bigram_model.transform(val_df_tfidf)
test_df_ngrams = bigram_model.transform(test_df_tfidf)

print("Bigram features added:")
train_df_ngrams.select("bigrams").show(3, truncate=50)

Downloading artifacts:   0%|          | 0/10 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Bigram features added:
+--------------------------------------------------+
|                                           bigrams|
+--------------------------------------------------+
|[really enjoyed, enjoyed story., story. found, ...|
|[2.5 stars., stars. hohum., hohum. slightly, sl...|
|[4.5 stars., stars. proclaimed, proclaimed new,...|
+--------------------------------------------------+
only showing top 3 rows


In [0]:
import textstat
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Define readability UDFs
def flesch_reading_ease_udf(text):
    if not text or text.strip() == "":
        return 0.0
    return float(textstat.flesch_reading_ease(text))

def flesch_kincaid_grade_udf(text):
    if not text or text.strip() == "":
        return 0.0
    return float(textstat.flesch_kincaid_grade(text))

def smog_index_udf(text):
    if not text or text.strip() == "":
        return 0.0
    return float(textstat.smog_index(text))

# Register UDFs
flesch_reading_ease_udf = udf(flesch_reading_ease_udf, FloatType())
flesch_kincaid_grade_udf = udf(flesch_kincaid_grade_udf, FloatType())
smog_index_udf = udf(smog_index_udf, FloatType())

def add_readability_features(df, text_column="review_text"):
    df = df.withColumn("flesch_reading_ease", flesch_reading_ease_udf(col(text_column)))
    df = df.withColumn("flesch_kincaid_grade", flesch_kincaid_grade_udf(col(text_column)))
    df = df.withColumn("smog_index", smog_index_udf(col(text_column)))
    return df

# Apply readability features
train_df_readability = add_readability_features(train_df_ngrams)
val_df_readability = add_readability_features(val_df_ngrams)
test_df_readability = add_readability_features(test_df_ngrams)

print("Readability features added:")
train_df_readability.select("flesch_reading_ease", "flesch_kincaid_grade", "smog_index").show(5)

Readability features added:
+-------------------+--------------------+----------+
|flesch_reading_ease|flesch_kincaid_grade|smog_index|
+-------------------+--------------------+----------+
|          73.099144|            5.845857|  8.841846|
|          61.450455|           10.153636|  8.841846|
|          57.268616|           11.084718| 11.208143|
|          65.499054|            9.278403| 11.208143|
|              90.99|           1.3133334|    3.1291|
+-------------------+--------------------+----------+
only showing top 5 rows


In [0]:
# Note: This step requires significant computational resources
# Consider running on a GPU-enabled cluster for better performance

%pip install transformers torch

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModel
import torch

# Load pre-trained model and tokenizer
model_name = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

def get_embeddings(texts):
    """Generate embeddings for a batch of texts"""
    inputs = tokenizer(texts.tolist(), padding=True, truncation=True, 
                      max_length=128, return_tensors="pt")
    
    with torch.no_grad():
        outputs = model(**inputs)
    
    # Use [CLS] token embedding as document representation
    embeddings = outputs.last_hidden_state[:, 0, :].numpy()
    return pd.Series([embeddings[i] for i in range(len(embeddings))])

# Define pandas UDF for embeddings
@pandas_udf(ArrayType(FloatType()))
def get_embeddings_udf(texts: pd.Series) -> pd.Series:
    return get_embeddings(texts)

# Apply to a sample of data (due to computational constraints)
sample_train = train_df_readability.limit(1000)
sample_train = sample_train.withColumn("bert_embedding", get_embeddings_udf(col("review_text")))

print("BERT embeddings generated for sample")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


2025-11-14 00:35:16.573269: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-11-14 00:35:16.693613: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-11-14 00:35:16.815915: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-11-14 00:35:16.913442: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-11-14 00:35:16.939630: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-11-14 00:35:17.138341: I tensorflow/core/platform/cpu_feature_gu

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/483 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

BERT embeddings generated for sample


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Select numerical features for final feature vector
feature_columns = [
    "review_length_words", "review_length_chars", "sentence_count", "avg_word_length",
    "vader_pos", "vader_neg", "vader_neu", "vader_compound",
    "blob_polarity", "blob_subjectivity",
    "flesch_reading_ease", "flesch_kincaid_grade", "smog_index"
]

# Create final feature vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform all datasets
final_train = assembler.transform(train_df_readability)
final_val = assembler.transform(val_df_readability)
final_test = assembler.transform(test_df_readability)

# Add metadata columns for identification
final_columns = ["review_id", "book_id", "user_id", "rating", "review_text", "features"]

final_train_selected = final_train.select(final_columns)
final_val_selected = final_val.select(final_columns)
final_test_selected = final_test.select(final_columns)

print("Final feature engineering complete:")
final_train_selected.show(3)

Final feature engineering complete:
+--------------------+--------+--------------------+------+--------------------+--------------------+
|           review_id| book_id|             user_id|rating|         review_text|            features|
+--------------------+--------+--------------------+------+--------------------+--------------------+
|000146735f713ef96...|23598478|bb421cc1b9862e84a...|     5|received through ...|[112.0,588.0,10.0...|
|0001c26c917d676c6...| 7817785|20d2b5cbac006cad9...|     4|i liked it but fe...|[77.0,402.0,6.0,5...|
|0002b1b4e05010d33...|13542832|377f543f3690c75a5...|     5|a very entertaini...|[108.0,605.0,4.0,...|
+--------------------+--------+--------------------+------+--------------------+--------------------+
only showing top 3 rows


In [0]:
# Save the final feature datasets
final_train_selected.write.mode("overwrite").format("delta").save(
    "abfss://lakehouse@goodreadsreviews60301511.dfs.core.windows.net/gold/features_v2/train_final"
)

final_val_selected.write.mode("overwrite").format("delta").save(
    "abfss://lakehouse@goodreadsreviews60301511.dfs.core.windows.net/gold/features_v2/validation_final"
)

final_test_selected.write.mode("overwrite").format("delta").save(
    "abfss://lakehouse@goodreadsreviews60301511.dfs.core.windows.net/gold/features_v2/test_final"
)

print("All feature datasets saved successfully!")

All feature datasets saved successfully!


In [0]:
# Generate feature summary
print("=== FEATURE ENGINEERING SUMMARY ===")
print(f"Training set: {final_train_selected.count()} records")
print(f"Validation set: {final_val_selected.count()} records")
print(f"Test set: {final_test_selected.count()} records")

# Show what columns are available
print("\nAvailable columns:")
for col in final_train_selected.columns:
    print(f"  - {col}")

# Show feature vector sample
print("\nSample feature vector:")
final_train_selected.select("review_id", "rating", "features").show(3, truncate=50)

# Verify no data leakage
print("\n=== DATA LEAKAGE CHECK ===")
print("✓ All transformers fitted ONLY on training data")
print("✓ Validation and test sets transformed using fitted objects only")
print("✓ Proper train/validation/test split before feature engineering")

# Get feature statistics from the intermediate DataFrame
print("\n=== FEATURE STATISTICS (From Training Data) ===")

# Check if we have the intermediate DataFrame with individual features
if 'train_df_readability' in locals():
    train_stats = train_df_readability.select(
        mean("review_length_words").alias("avg_words"),
        mean("review_length_chars").alias("avg_chars"),
        mean("vader_compound").alias("avg_sentiment"),
        mean("blob_polarity").alias("avg_blob_polarity"),
        mean("flesch_reading_ease").alias("avg_readability")
    ).collect()[0]
    
    print(f"Average words per review: {train_stats['avg_words']:.2f}")
    print(f"Average characters per review: {train_stats['avg_chars']:.2f}")
    print(f"Average VADER sentiment: {train_stats['avg_sentiment']:.4f}")
    print(f"Average TextBlob polarity: {train_stats['avg_blob_polarity']:.4f}")
    print(f"Average readability score: {train_stats['avg_readability']:.2f}")
else:
    # Alternative: Show basic stats from the original data
    basic_stats = train_df.select(
        mean(length("review_text")).alias("avg_chars"),
        mean(size(split("review_text", " "))).alias("avg_words")
    ).collect()[0]
    print(f"Average characters per review: {basic_stats['avg_chars']:.2f}")
    print(f"Average words per review: {basic_stats['avg_words']:.2f}")

# Show feature vector dimensions
print(f"\nFeature vector dimensions:")
feature_sample = final_train_selected.select("features").first()[0]
print(f"  - Number of features: {len(feature_sample)}")
print(f"  - Feature vector type: {type(feature_sample).__name__}")

print("\n=== FEATURE ENGINEERING COMPLETED SUCCESSFULLY ===")

=== FEATURE ENGINEERING SUMMARY ===
Training set: 13783384 records
Validation set: 2951447 records
Test set: 2953972 records

Available columns:
  - review_id
  - book_id
  - user_id
  - rating
  - review_text
  - features

Sample feature vector:
+--------------------------------+------+--------------------------------------------------+
|                       review_id|rating|                                          features|
+--------------------------------+------+--------------------------------------------------+
|00019f560216496370f244e1a58065d7|     4|[56.0,303.0,5.0,5.410714285714286,0.21500000357...|
|00020ca0a1d0820b97bfaccbb386f749|     3|[22.0,119.0,5.0,5.409090909090909,0.0,0.0,1.0,0...|
|0004c2104830760e15fba28d5a4380b6|     4|[120.0,654.0,7.0,5.45,0.23100000619888306,0.096...|
+--------------------------------+------+--------------------------------------------------+
only showing top 3 rows

=== DATA LEAKAGE CHECK ===
✓ All transformers fitted ONLY on training data
✓ 