In [0]:
# Minimal installs – do NOT touch numpy/pyarrow

# emojis for cleaning
%pip install emoji==2.12.1

# transformers for BERT (no deps so we don't break numpy/pyarrow)
%pip install "transformers==4.57.1" --no-deps

# CPU-only torch (no CUDA, no libcusparseLt.so.0)
%pip install "torch==2.2.2+cpu" \
    --index-url https://download.pytorch.org/whl/cpu \
    --no-deps


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Looking in indexes: https://download.pytorch.org/whl/cpu
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
dbutils.library.restartPython()

In [0]:
%pip install "huggingface_hub==0.36.0"


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import torch
from transformers import AutoTokenizer, AutoModel
import huggingface_hub

print("Torch:", torch.__version__)
print("HF Hub:", huggingface_hub.__version__)

model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
bert_model = AutoModel.from_pretrained(model_name)

print("Model loaded OK")


Torch: 2.2.2+cpu
HF Hub: 0.36.0
Model loaded OK


In [0]:
# Imports
import re
import emoji
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf, col, length, size, split
from pyspark.sql.types import StringType, IntegerType, FloatType, ArrayType, StructType, StructField
from pyspark.sql import SparkSession
from sklearn.feature_extraction.text import TfidfVectorizer
from pyspark.sql.functions import pandas_udf
import pandas as pd

# use transformers + torch instead of sentence_transformers
from transformers import AutoTokenizer, AutoModel
import torch

# Spark session
spark = SparkSession.builder.getOrCreate()

# Set storage key
spark.conf.set(
    "fs.azure.account.key.goodreadsreviews60104384.dfs.core.windows.net",
    "6GsPhkGZNxjNX3ph9pR77Kb5jZVywf/ZnBgwaKQPBtToe+sBA9pAoHqA5w7Ls5atRqGPbG8CyhNu+ASts8Yhzw=="
)
print("Key set: OK")


Key set: OK


In [0]:
# Delete old
dbutils.fs.rm("abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/feature_v2/", recurse=True)
dbutils.fs.rm("abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/features_v2/", recurse=True)
print("Old deleted: OK")

Old deleted: OK


In [0]:
# Load curated from Lab 3
gold_df = spark.read.format("delta").load("abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/curated_reviews")
gold_df = gold_df.dropDuplicates().na.drop(subset=["review_id", "book_id", "rating", "review_text"])
print("Curated rows:", gold_df.count())
gold_df.show(5)

Curated rows: 15739524
+--------------------+--------+--------------------+---------+---------------+--------------------+------+--------------------+--------+-------+----------+
|           review_id| book_id|               title|author_id|    author_name|             user_id|rating|         review_text|language|n_votes|date_added|
+--------------------+--------+--------------------+---------+---------------+--------------------+------+--------------------+--------+-------+----------+
|acf387f7c35f22a39...|24779471|Discovering Delil...|  3023973| Melissa Foster|a830edac1d0f3071a...|     4|Find my reviews h...|     eng|      1|2015-08-16|
|45af3578aaa752d42...|18599572|The Intern (The I...|  7307421|Gabrielle Tozer|4514a6d26dbc55338...|     1|A very flat read,...|     eng|      2|2015-02-20|
|9a9c2ccd50c0c2c84...|   19063|      The Book Thief|    11466|   Markus Zusak|8fe69901571244c1f...|     4|This was our offi...|     eng|      0|2011-01-21|
|d87cf3533507428e2...| 6605685| She Walks

In [0]:
# II. Split (70/15/15 stratified)
fractions = {rating: 0.7 for rating in range(6)}
train_df = gold_df.sampleBy("rating", fractions=fractions, seed=42)
remaining_df = gold_df.exceptAll(train_df)
val_df = remaining_df.sample(fraction=0.5, seed=42)
test_df = remaining_df.exceptAll(val_df)

train_df.write.mode("overwrite").format("delta").save("abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/feature_v2/train/")
val_df.write.mode("overwrite").format("delta").save("abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/feature_v2/validation/")
test_df.write.mode("overwrite").format("delta").save("abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/feature_v2/test/")
print("Splits: Train", train_df.count(), "Val", val_df.count(), "Test", test_df.count())

Splits: Train 11017216 Val 2360634 Test 2361674


In [0]:
# III.2 Load train
train_df = spark.read.format("delta").load("abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/feature_v2/train/")
df = train_df.select("review_id", "book_id", "rating", "review_text")

In [0]:
# III.3 Cleaning
def clean_text(text):
    if text is None:
        return ""
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'\s+', ' ', text)
    text = re.sub(r'http\S+|www\S+|https\S+', '<URL>', text)
    text = re.sub(r'\d+', '<NUM>', text)
    text = emoji.replace_emoji(text, '<EMOJI>')
    text = text.strip()
    if len(text) < 10:
        return ""
    return text

clean_udf = udf(clean_text, StringType())
df = df.withColumn("cleaned_text", clean_udf(col("review_text")))
df = df.filter(col("cleaned_text") != "")

In [0]:
# III.4.a Basic
df = df.withColumn("review_length_chars", length(col("cleaned_text")))
df = df.withColumn("words", split(col("cleaned_text"), " "))
df = df.withColumn("review_length_words", size(col("words")))

def avg_word_length(words):
    if not words:
        return 0.0
    return sum(len(w) for w in words) / len(words)

avg_udf = udf(avg_word_length, FloatType())
df = df.withColumn("avg_word_length", avg_udf(col("words")))

In [0]:
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

nltk.download("vader_lexicon")
sia = SentimentIntensityAnalyzer()


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/spark-6d61b057-b85d-430f-92f2-b0/nltk_data...


In [0]:
# III.4.b Sentiment
def get_sentiment(text):
    scores = sia.polarity_scores(text)
    return scores['pos'], scores['neg'], scores['neu'], scores['compound']

sent_schema = StructType([StructField(f"sentiment_{k}", FloatType(), True) for k in ['pos', 'neg', 'neu', 'compound']])
sent_udf = udf(get_sentiment, sent_schema)
df = df.withColumn("sentiment", sent_udf(col("cleaned_text")))
df = df.select("*", "sentiment.*").drop("sentiment")

In [0]:
from sklearn.feature_extraction.text import TfidfVectorizer
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import ArrayType, FloatType
import pandas as pd

# III.4.c TF-IDF with scikit-learn (fit on sample, transform ALL rows)

# 1) Fit vocabulary on a manageable SAMPLE (to avoid >4 GiB driver result)
#    This does NOT sample the training data for the final model – only for learning the vocabulary.
sample_fraction = 0.05  # 5% is usually fine; reduce to 0.02 if still heavy

sample_pd = (
    df.sample(fraction=sample_fraction, seed=42)
      .select("cleaned_text")
      .toPandas()
)

vectorizer = TfidfVectorizer(
    max_features=500,           # 500 features as in your lab
    stop_words="english",
    ngram_range=(1, 2)
)
vectorizer.fit(sample_pd["cleaned_text"])

# 2) Apply TF-IDF to ALL rows via pandas_udf
@pandas_udf(ArrayType(FloatType()))
def tfidf_udf(texts: pd.Series) -> pd.Series:
    # texts is a pandas Series of cleaned_text strings
    tfidf = vectorizer.transform(texts.fillna("").tolist())
    return pd.Series([
        row.toarray().ravel().astype("float32").tolist()
        for row in tfidf
    ])

df = df.withColumn("tfidf_features", tfidf_udf(col("cleaned_text")))

# 3) Explode to tfidf_0 ... tfidf_499
for i in range(500):
    df = df.withColumn(f"tfidf_{i}", col("tfidf_features")[i])

df = df.drop("tfidf_features")


In [0]:
# III.4.d Embeddings (distributed with pandas_udf, using transformers + torch)

from transformers import AutoTokenizer, AutoModel
import torch
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import ArrayType, FloatType
import pandas as pd

# Load BERT model and tokenizer once
model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
bert_model = AutoModel.from_pretrained(model_name)
bert_model = bert_model.to("cpu")
bert_model.eval()

# pandas UDF for distributed embedding generation
@pandas_udf(ArrayType(FloatType()))
def bert_embed_udf(texts: pd.Series) -> pd.Series:
    encoded = tokenizer(
        texts.fillna("").tolist(),
        padding=True,
        truncation=True,
        max_length=128,
        return_tensors="pt"
    )
    with torch.no_grad():
        outputs = bert_model(**encoded)
        # mean pooling → one embedding vector per sentence
        embeddings = outputs.last_hidden_state.mean(dim=1)
    return pd.Series(
        [vec.cpu().numpy().astype("float32").tolist() for vec in embeddings]
    )

# Apply BERT embeddings to ALL rows (no sampling)
df = df.withColumn("bert_embedding", bert_embed_udf(col("cleaned_text")))

# Explode 384 dimensions to bert_emb_0 ... bert_emb_383
for i in range(384):
    df = df.withColumn(f"bert_emb_{i}", col("bert_embedding")[i])

df = df.drop("bert_embedding")

# Drop intermediate text columns
df = df.drop("review_text", "cleaned_text", "words")


In [0]:
# IV. Combined Feature Set - FINAL WORKING VERSION (NO COUNT/DISPLAY)

from pyspark.sql.functions import col

print("Step 1: Creating combined feature matrix...")

# List all feature columns
all_feature_columns = (
    [f"tfidf_{i}" for i in range(500)] + 
    [f"bert_emb_{i}" for i in range(384)] +
    ["review_length_chars", "review_length_words", "avg_word_length",
     "sentiment_pos", "sentiment_neg", "sentiment_neu", "sentiment_compound"]
)

print(f" Total features: {len(all_feature_columns)}")
print("   - 500 TF-IDF features")
print("   - 384 BERT embedding features") 
print("   - 7 basic text features")

# Select metadata + all features - THIS IS YOUR FEATURE MATRIX
final_df = df.select(["review_id", "book_id", "rating"] + all_feature_columns)

print(" FEATURE MATRIX CREATED SUCCESSFULLY")


Step 1: Creating combined feature matrix...
 Total features: 891
   - 500 TF-IDF features
   - 384 BERT embedding features
   - 7 basic text features
 FEATURE MATRIX CREATED SUCCESSFULLY


In [0]:
# Step 2: CORRECT SOLUTION - Save what you have
print("Step 2: Saving feature matrix to Gold layer...")
features_path = "abfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/features_v2/train/"

# Just save what you've created - this meets Lab 4 requirements
final_df.write.mode("overwrite").format("delta").save(features_path)

print(" SUCCESS: Feature matrix saved to features_v2/train/")


Step 2: Saving feature matrix to Gold layer...


IOStream.flush timed out


[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkConnectGrpcException[0m                 Traceback (most recent call last)
File [0;32m<command-7777873038985556>, line 6[0m
[1;32m      3[0m features_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124mabfss://lakehouse@goodreadsreviews60104384.dfs.core.windows.net/gold/features_v2/train/[39m[38;5;124m"[39m
[1;32m      5[0m [38;5;66;03m# Just save what you've created - this meets Lab 4 requirements[39;00m
[0;32m----> 6[0m final_df[38;5;241m.[39mwrite[38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m)[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)[38;5;241m.[39msave(features_path)
[1;32m      8[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;124m SUCCESS: Feature matrix saved to features_v2/train/[39m[38;5;124m"[39m)

File [0;32m/databricks/spark/python/pyspark/sql/connect/readwriter.py:679[0m, in [0;36