In [0]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook via getOrCreate(), registered
# under the feature-preparation application name.
spark = (
    SparkSession.builder
    .appName("Goodreads_Feature_Preparation")
    .getOrCreate()
)

print("✅ Spark session started")


✅ Spark session started


In [0]:
# Configure access to the ADLS Gen2 storage account.
# The account key is read from the Databricks secret store instead of being
# embedded in the notebook, where it would leak through exports and version
# control.
# NOTE(review): the scope/key names below are placeholders - point them at the
# secret provisioned for this workspace, and rotate the key that was previously
# hardcoded here.
spark.conf.set(
    "fs.azure.account.key.goodreadsreviews60107031.dfs.core.windows.net",
    dbutils.secrets.get(scope="goodreads", key="storage-account-key"),
)

# Gold layer: features_v1 Delta table
gold_path = "abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/features_v1"

# Load the Delta table into a DataFrame
gold_df = spark.read.format("delta").load(gold_path)

print("✅ Loaded Gold features_v1 successfully")
# Show the schema and a few untruncated rows as a sanity check
gold_df.printSchema()
gold_df.show(5, truncate=False)




✅ Loaded Gold features_v1 successfully
root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: long (nullable = true)

+--------+--------------------------------+-------------------------------+-------+--------------------------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Step 4: Partition the Gold features into Train (70%), Validation (15%), Test (15%)

# The fixed seed makes the random assignment repeatable
split_weights = [0.7, 0.15, 0.15]
train_df, val_df, test_df = gold_df.randomSplit(split_weights, seed=42)

print("✅ Dataset split complete")

# Each count() is a separate Spark action over its split
train_rows = train_df.count()
print(f"Train count: {train_rows}")

val_rows = val_df.count()
print(f"Validation count: {val_rows}")

test_rows = test_df.count()
print(f"Test count: {test_rows}")


✅ Dataset split complete
Train count: 10482164
Validation count: 2243987
Test count: 2245220


In [0]:
# Root directory under which the three split tables are stored
output_base = "abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/feature_v2"

# Write each split as its own Delta table (train, validation, test - in that
# order), overwriting whatever was saved there before.
for split_name, split_df in (("train", train_df), ("validation", val_df), ("test", test_df)):
    split_df.write.mode("overwrite").format("delta").save(f"{output_base}/{split_name}")

print("✅ Saved splits successfully to gold/feature_v2/")



✅ Saved splits successfully to gold/feature_v2/


In [0]:
# List the entries under the feature_v2 root to confirm the split folders exist
feature_v2_root = "abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/feature_v2/"
display(dbutils.fs.ls(feature_v2_root))


path,name,size,modificationTime
abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/feature_v2/test/,test/,0,1762977439000
abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/feature_v2/train/,train/,0,1762977380000
abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/feature_v2/validation/,validation/,0,1762977420000


In [0]:
# Step 1: Read the training split back from the Gold layer

train_path = "abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/feature_v2/train"

# Rebinds train_df to the Delta table stored at train_path
delta_reader = spark.read.format("delta")
train_df = delta_reader.load(train_path)

print("✅ Loaded feature_v2/train successfully")
# Schema plus the first five rows, untruncated
train_df.printSchema()
train_df.show(5, truncate=False)


✅ Loaded feature_v2/train successfully
root
 |-- book_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: long (nullable = true)

+--------+--------------------------------+--------------------------------+-------+--------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
%pip install emoji


Collecting emoji
  Downloading emoji-2.15.0-py3-none-any.whl.metadata (5.7 kB)
Downloading emoji-2.15.0-py3-none-any.whl (608 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/608.4 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/608.4 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m30.7/608.4 kB[0m [31m923.2 kB/s[0m eta [36m0:00:01[0m
[2K   [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.0/608.4 kB[0m [31m367.9 kB/s[0m eta [36m0:00:02[0m
[2K   [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/608.4 kB[0m [31m681.0 kB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/608.4 kB[0m [31m681.0 kB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1

In [0]:
import re
import emoji
from pyspark.sql.functions import udf, col, length, trim
from pyspark.sql.types import StringType

# Define a cleaning function
def clean_text(text):
    """Normalize a raw review string for feature extraction.

    Steps, in order: lowercase the text; replace URLs with `` URL ``; replace
    emojis with `` EMOJI ``; replace digit runs with `` NUM ``; drop remaining
    punctuation; collapse whitespace and strip the ends.

    Args:
        text: Raw review text, or None.

    Returns:
        The cleaned string, or "" when ``text`` is None.
    """
    if text is None:
        return ""
    # Convert to lowercase (the placeholder tokens inserted below stay uppercase)
    text = text.lower()
    # Replace URLs
    text = re.sub(r'http\S+|www\S+', ' URL ', text)
    # Replace emojis BEFORE stripping punctuation: emoji symbols are neither
    # word characters nor whitespace, so the punctuation filter would delete
    # them first and the EMOJI placeholder would never be emitted.
    text = emoji.replace_emoji(text, replace=' EMOJI ')
    # Replace numbers
    text = re.sub(r'\d+', ' NUM ', text)
    # Remove punctuation (anything that is not a word character or whitespace)
    text = re.sub(r'[^\w\s]', '', text)
    # Remove extra spaces
    text = re.sub(r'\s+', ' ', text).strip()
    return text

# Wrap the Python cleaner as a Spark UDF that returns a string
clean_text_udf = udf(clean_text, StringType())

# Length of the cleaned text once surrounding whitespace is trimmed
clean_review_len = length(trim(col("clean_review")))

# Add the cleaned text column, then keep only reviews whose cleaned text is
# at least 10 characters long (drops empty or too-short reviews)
cleaned_df = (
    train_df
    .withColumn("clean_review", clean_text_udf(col("review_text")))
    .filter(clean_review_len >= 10)
)

print("✅ Text cleaning and normalization complete")
# Raw vs. cleaned text side by side for the first five rows
cleaned_df.select("review_text", "clean_review").show(5, truncate=False)


✅ Text cleaning and normalization complete
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|review_text                                                                                                                                                            

In [0]:
from pyspark.sql.functions import length, split, size, col

# Length features computed from the cleaned text
clean_review_col = col("clean_review")
char_count = length(clean_review_col)
word_count = size(split(clean_review_col, " "))  # pieces produced by splitting on a single space

# review_length_words is already a column in the loaded table, so the second
# withColumn replaces it with the count based on clean_review.
features_basic = (
    cleaned_df
    .withColumn("review_length_chars", char_count)
    .withColumn("review_length_words", word_count)
)

print("✅ Added basic text features: review_length_words, review_length_chars")
features_basic.select("clean_review", "review_length_words", "review_length_chars").show(5, truncate=False)


✅ Added basic text features: review_length_words, review_length_chars
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+-------------------+
|clean_review                                                                                                                                                                                                                                                                                                                                                                                |review_length_words|review_length_chars|
+-----------------------------------------------------------------------------------

In [0]:
%pip install nltk


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import nltk
nltk.download('vader_lexicon')

from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Single VADER analyzer instance shared by all scoring functions below
analyzer = SentimentIntensityAnalyzer()


def _polarity(text, key):
    """Return the VADER score stored under ``key`` for ``text``; 0.0 when ``text`` is empty or None."""
    if not text:
        return 0.0
    return float(analyzer.polarity_scores(text)[key])


# One scoring function per VADER output key
def get_sentiment_compound(text):
    """VADER 'compound' score of ``text`` (0.0 for empty/None input)."""
    return _polarity(text, 'compound')


def get_sentiment_pos(text):
    """VADER 'pos' score of ``text`` (0.0 for empty/None input)."""
    return _polarity(text, 'pos')


def get_sentiment_neg(text):
    """VADER 'neg' score of ``text`` (0.0 for empty/None input)."""
    return _polarity(text, 'neg')


def get_sentiment_neu(text):
    """VADER 'neu' score of ``text`` (0.0 for empty/None input)."""
    return _polarity(text, 'neu')


# Spark UDF wrappers, each returning a float column
sentiment_compound_udf = udf(get_sentiment_compound, FloatType())
sentiment_pos_udf = udf(get_sentiment_pos, FloatType())
sentiment_neg_udf = udf(get_sentiment_neg, FloatType())
sentiment_neu_udf = udf(get_sentiment_neu, FloatType())

# Sentiment columns to add, in output order, each paired with its UDF
sentiment_udfs = [
    ("sentiment_compound", sentiment_compound_udf),
    ("sentiment_pos", sentiment_pos_udf),
    ("sentiment_neg", sentiment_neg_udf),
    ("sentiment_neu", sentiment_neu_udf),
]

# Start from the basic features and append one score column at a time;
# every score is computed from the cleaned review text.
features_sentiment = features_basic
for score_column, score_udf in sentiment_udfs:
    features_sentiment = features_sentiment.withColumn(score_column, score_udf(col("clean_review")))

print("✅ Added sentiment features (compound, pos, neg, neu)")
score_columns = [score_column for score_column, _ in sentiment_udfs]
features_sentiment.select("clean_review", *score_columns).show(5, truncate=False)


[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


✅ Added sentiment features (compound, pos, neg, neu)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------------+-------------+-------------+
|clean_review                                                                                                                                                                                                                                                                                                                                                                                |sentiment_compound|sentiment_pos|sentiment_neg|sentiment_neu|
+----------------------------------------------------------

In [0]:
%pip install scikit-learn


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from sklearn.feature_extraction.text import TfidfVectorizer
import pandas as pd

# Bring at most 5000 cleaned reviews to the driver as a pandas frame (capped for efficiency)
sample_reviews = features_sentiment.select("clean_review").limit(5000)
pdf = sample_reviews.toPandas()

# TF-IDF over unigrams and bigrams, English stop words removed,
# vocabulary capped at 1000 features
vectorizer = TfidfVectorizer(
    ngram_range=(1, 2),
    stop_words='english',
    max_features=1000,
)
tfidf_matrix = vectorizer.fit_transform(pdf["clean_review"])

# Densify into a DataFrame with one column per vocabulary term
vocabulary = vectorizer.get_feature_names_out()
tfidf_df = pd.DataFrame(tfidf_matrix.toarray(), columns=vocabulary)

print("✅ TF-IDF feature matrix created:", tfidf_df.shape)
tfidf_df.head()


✅ TF-IDF feature matrix created: (5000, 1000)


Unnamed: 0,abby,ability,able,absolutely,account,action,actions,actual,actually,add,added,addition,adlerolsen,admit,adult,adults,adventure,age,ago,al,alert,alex,alive,amazing,america,american,anche,angel,angels,animals,appreciate,arent,art,aspect,assad,assistant,attention,audio,audiobook,author,...,wish,wl,woman,women,wonder,wonderful,wondering,wont,word,words,work,worked,working,works,world,worse,worth,wouldnt,wow,write,writer,writers,writes,writing,writing style,written,wrong,wrote,ya,yang,year,years,years ago,yes,york,youll,young,younger,youre,youve
0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.149277,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
3,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,0.0,0.0,0.0,0.0,0.142087,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.113328,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.143177,0.0,0.164521,...,0.0,0.0,0.0,0.0,0.0,0.111531,0.0,0.0,0.0,0.0,0.090973,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.083934,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [0]:
%pip install sentence-transformers


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from sentence_transformers import SentenceTransformer
import numpy as np

# Load the pretrained sentence-transformers model by name
model = SentenceTransformer('all-MiniLM-L6-v2')

# Encode the sampled cleaned reviews (same pandas sample used for TF-IDF)
review_texts = pdf["clean_review"].tolist()
embeddings = model.encode(review_texts, show_progress_bar=True)

embedding_shape = np.array(embeddings).shape
print("✅ BERT embeddings created:", embedding_shape)


2025-11-12 20:07:37.559292: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-11-12 20:07:37.649100: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-11-12 20:07:37.786645: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-11-12 20:07:37.892145: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-11-12 20:07:37.931609: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-11-12 20:07:38.186604: I tensorflow/core/platform/cpu_feature_gu

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Batches:   0%|          | 0/157 [00:00<?, ?it/s]

✅ BERT embeddings created: (5000, 384)


In [0]:
# Destination Delta table for the engineered features
gold_features_path = "abfss://lakehouse@goodreadsreviews60107031.dfs.core.windows.net/gold/features_v2"

# Overwrite any previous contents with the current feature set.
# features_sentiment derives from the training split only.
features_sentiment.write.format("delta").mode("overwrite").save(gold_features_path)

# Name the directory actually written: gold/features_v2 is distinct from
# gold/feature_v2, which holds the train/validation/test splits.
print("✅ Saved features_v2 dataset to Gold layer")


✅ Saved feature_v2 dataset to Gold layer
