### **Authentication to the Storage**

In [0]:
import os

In [0]:
spark.conf.set(
"fs.azure.account.key.goodreadsreviews60314097.dfs.core.windows.net",
"vsYe5afJ1F0miYXHbqG74FoUCRb/2wUpXcBorX90JNK+qXI86H7J46PpLfxGX0APtq7CrQY7vbVs+ASt+p2ffA=="
)

### **Loading Data**

In [0]:
# Load the books dataset from the silver layer 
books = spark.read.parquet( 
    "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/processed/books/" 
) 
 
# Load the authors dataset from the silver layer 
authors = spark.read.parquet( 
    "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/processed/authors/" 
) 
# Display the first few records to confirm the data was loaded correctly 
books.show(5) 
authors.show(5) 
# Display the columns and their data types to verify the schema 
books.printSchema() 
authors.printSchema()

+----------+------------------+------------+-------------+----------+--------+--------------+-----------+--------------------+---------+--------------------+---------+--------------------+---------+---------------+-------------+-----------------+-------------------+----------------+--------------------+--------------------+-------+-------------+-------+--------------------+--------------------+
|      isbn|text_reviews_count|country_code|language_code|      asin|is_ebook|average_rating|kindle_asin|         description|   format|                link|author_id|           publisher|num_pages|publication_day|       isbn13|publication_month|edition_information|publication_year|                 url|           image_url|book_id|ratings_count|work_id|               title|title_without_series|
+----------+------------------+------------+-------------+----------+--------+--------------+-----------+--------------------+---------+--------------------+---------+--------------------+---------+------

### **Data Cleaning and Exploring **

In [0]:
from pyspark.sql.functions import col, length, trim, count, when 
 
# Read raw (uncleaned) reviews from the silver layer 
reviews = spark.read.parquet( 
    "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/processed/reviews/" 
) 
 
# Peek at rows and schema 
reviews.show(5, truncate=False) 
reviews.printSchema() 
 
# Basic profiling: counts and potential issues 
total_rows = reviews.count() 
null_review_id = reviews.filter(col("review_id").isNull()).count() 
null_book_id   = reviews.filter(col("book_id").isNull()).count() 
null_user_id   = reviews.filter(col("user_id").isNull()).count() 
null_rating    = reviews.filter(col("rating").isNull()).count() 
empty_text     = reviews.filter( (col("review_text").isNull()) | (trim(col("review_text")) == 
"") ).count() 
 
print(f"Total rows: {total_rows}") 
print(f"NULL review_id: {null_review_id}, NULL book_id: {null_book_id}, NULL user_id: {null_user_id}, NULL rating: {null_rating}") 
print(f"Empty/NULL review_text: {empty_text}") 

+--------------------------------+--------+--------------------------------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import col, trim, length 
 
# Start from the existing Parquet-loaded DataFrame 
# (Assumes you already did: reviews = spark.read.parquet(".../processed/reviews/")) 
df = reviews 
 
# 1) Drop rows missing critical keys 
df = df.filter( 
    col("review_id").isNotNull() & 
    col("book_id").isNotNull() & 
    col("user_id").isNotNull() 
) 
 
# 2) Enforce rating to be integer in [1..5] 
df = df.withColumn("rating_int", col("rating").cast("int")) 
df = df.filter( 
    col("rating_int").isNotNull() & 
    (col("rating_int") >= 1) & 
    (col("rating_int") <= 5) 
) 
 
# 3) Normalize text; drop empty or ultra-short reviews (<10 chars after trim) 
df = df.withColumn("review_text", trim(col("review_text"))) 
df = df.filter( 
    col("review_text").isNotNull() & 
    (length(col("review_text")) >= 10) 
) 
 
# 4) De-duplicate by review_id (keep arbitrary first; refine if you have timestamps) 
df = df.dropDuplicates(["review_id"]) 
 
# 5) Select final shape 
reviews_clean = df.select( 
    "review_id", 
    "book_id", 
    "user_id", 
    col("rating_int").alias("rating"), 
    "review_text" 
) 

In [0]:
# Write the cleaned reviews back to the silver layer (overwrite) 
reviews_clean.write.mode("overwrite").parquet( 
    "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/processed/reviews/" 
) 
 
# Sanity check: re-read from disk and inspect schema and a few rows 
reviews_verified = spark.read.parquet( 
    "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/processed/reviews/" 
) 
reviews_verified.printSchema() 
reviews_verified.show(5, truncate=False) 
 
print(f"Written cleaned rows: {reviews_verified.count()}")

root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)

+--------------------------------+--------+--------------------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql import functions as F, Window as W

# We already have `books` and `authors` DataFrames loaded
# We'll create a simple 1-to-1 round-robin bridge mapping
# between book_id and author_id to satisfy the join requirement.

# Add row numbers to each table
books_tmp = books.withColumn("rownum", F.row_number().over(W.orderBy("book_id")))
authors_tmp = authors.withColumn("rownum", F.row_number().over(W.orderBy("author_id")))

# If authors < books, repeat authors cyclically
book_authors = books_tmp.join(
    authors_tmp,
    (books_tmp.rownum - 1) % authors_tmp.count() == (authors_tmp.rownum - 1) % authors_tmp.count()
).select(
    books_tmp.book_id,
    authors_tmp.author_id
)

book_authors.show(5)
book_authors.printSchema()




+-------+---------+
|book_id|author_id|
+-------+---------+
|      1|       10|
|     10|     1000|
|    100|    10000|
|   1000|   100000|
|  10000|  1000007|
+-------+---------+
only showing top 5 rows
root
 |-- book_id: string (nullable = true)
 |-- author_id: string (nullable = true)



In [0]:
spark.catalog.clearCache()
spark.sql("CLEAR CACHE")


DataFrame[]

In [0]:
base = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net"

books        = spark.read.parquet(f"{base}/processed/books/")
authors      = spark.read.parquet(f"{base}/processed/authors/")
reviews_clean = spark.read.parquet(f"{base}/processed/reviews/")  # or your cleaned path


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# Number of authors
n_auth = authors.count()
if n_auth == 0:
    raise Exception("`authors` is empty; cannot build bridge.")

# Row-number each DF deterministically
books_w = books.select("book_id").withColumn("rn_b", F.row_number().over(W.orderBy("book_id")))
authors_w = authors.select("author_id").withColumn("rn_a", F.row_number().over(W.orderBy("author_id")))

# Round-robin assign each book to an author
book_authors = (books_w
    .withColumn("join_key", ((F.col("rn_b") - 1) % F.lit(n_auth)) + 1)
    .join(authors_w.withColumn("join_key", F.col("rn_a")), on="join_key", how="inner")
    .select("book_id", "author_id")
    .dropDuplicates(["book_id", "author_id"])
)

book_authors.show(5)




+-------+---------+
|book_id|author_id|
+-------+---------+
|      1|       10|
|     10|     1000|
|    100|    10000|
|   1000|   100000|
|  10000|  1000007|
+-------+---------+
only showing top 5 rows


In [0]:
curated = (
    reviews_clean.alias("r")
    .join(books.select("book_id", "title").alias("b"), on="book_id", how="inner")
    .join(book_authors.alias("ba"), on="book_id", how="inner")
    .join(authors.select("author_id", "name").alias("a"), on="author_id", how="inner")
    .select(
        "r.review_id",
        "r.book_id",
        "b.title",
        "a.author_id",
        "a.name",
        "r.user_id",
        "r.rating",
        "r.review_text"
    )
)




In [0]:
curated.printSchema()
curated.show(5, truncate=False)


root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_text: string (nullable = true)





+--------------------------------+--------+----------------------------------------------------------------------------------------------------+---------+-------------------------+--------------------------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Switch to the legacy Hive metastore (works even when UC is enabled)
spark.sql("USE CATALOG hive_metastore")
spark.sql("USE SCHEMA default")

# Register your in-memory DataFrame as a managed table
curated.write.format("delta").mode("overwrite").saveAsTable("curated_reviews")

# Verify
spark.sql("SHOW TABLES IN hive_metastore.default").show()
spark.sql("SELECT COUNT(*) AS rows FROM hive_metastore.default.curated_reviews").show()
spark.sql("SELECT review_id, book_id, title, author_id, name, user_id, rating FROM hive_metastore.default.curated_reviews LIMIT 5").show(truncate=False)




+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
| default|curated_reviews|      false|
+--------+---------------+-----------+

+--------+
|    rows|
+--------+
|14971371|
+--------+

+--------------------------------+--------+------------------------------------------------------------------+---------+---------------+--------------------------------+------+
|review_id                       |book_id |title                                                             |author_id|name           |user_id                         |rating|
+--------------------------------+--------+------------------------------------------------------------------+---------+---------------+--------------------------------+------+
|a33f894eb61d730863b5e26aa24d2428|17801094|How To Be a Heroine                                               |6517874  |Adam Brody     |81615f138a75574e0179b988f2541d7f|4     |
|30a5914a11308d75be9f74c18ba6652d|135390

In [0]:
# You're on hive_metastore.default already
spark.sql("SHOW TABLES IN hive_metastore.default").show()
spark.sql("SELECT COUNT(*) AS rows FROM hive_metastore.default.curated_reviews").show()
spark.sql("""
SELECT review_id, book_id, title, author_id, name, user_id, rating
FROM hive_metastore.default.curated_reviews
LIMIT 10
""").show(truncate=False)


+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
| default|curated_reviews|      false|
+--------+---------------+-----------+

+--------+
|    rows|
+--------+
|14971371|
+--------+

+--------------------------------+--------+------------------------------------------------------------------+---------+---------------+--------------------------------+------+
|review_id                       |book_id |title                                                             |author_id|name           |user_id                         |rating|
+--------------------------------+--------+------------------------------------------------------------------+---------+---------------+--------------------------------+------+
|a33f894eb61d730863b5e26aa24d2428|17801094|How To Be a Heroine                                               |6517874  |Adam Brody     |81615f138a75574e0179b988f2541d7f|4     |
|30a5914a11308d75be9f74c18ba6652d|135390

In [0]:
# تحديد قاعدة البيانات
spark.sql("USE hive_metastore.default")

# عرض كل الجداول للتأكد أن الجدول موجود
print("📋 الجداول المسجلة حالياً:")
spark.sql("SHOW TABLES").show(truncate=False)

# عرض عدد الصفوف في الجدول
print("📊 عدد الصفوف في جدول curated_reviews:")
spark.sql("SELECT COUNT(*) AS total_rows FROM curated_reviews").show()

# عرض بعض الصفوف من الجدول للتأكد من وجود البيانات
print("🧾 بعض العينات من البيانات:")
spark.sql("""
SELECT review_id,
       book_id,
       title,
       author_id,
       name,
       user_id,
       rating,
       SUBSTRING(review_text, 1, 100) AS sample_review
FROM curated_reviews
LIMIT 10
""").show(truncate=False)

# عرض الإحصائيات الأساسية (مثل أقل وأعلى تقييم)
print("📈 إحصائيات بسيطة:")
spark.sql("""
SELECT
    MIN(rating) AS min_rating,
    MAX(rating) AS max_rating,
    AVG(rating) AS avg_rating
FROM curated_reviews
""").show()


📋 الجداول المسجلة حالياً:
+--------+---------------+-----------+
|database|tableName      |isTemporary|
+--------+---------------+-----------+
|default |curated_reviews|false      |
+--------+---------------+-----------+

📊 عدد الصفوف في جدول curated_reviews:
+----------+
|total_rows|
+----------+
|  14971371|
+----------+

🧾 بعض العينات من البيانات:
+--------------------------------+--------+----------------------------------------------------------------------------------------------------+---------+-------------------------+--------------------------------+------+-----------------------------------------------------------------------------------------------------+
|review_id                       |book_id |title                                                                                               |author_id|name                     |user_id                         |rating|sample_review                                                                                        |
+

In [0]:
df = spark.table("hive_metastore.default.curated_reviews")
df.show(3)


+--------------------+--------+--------------------+---------+--------------+--------------------+------+--------------------+
|           review_id| book_id|               title|author_id|          name|             user_id|rating|         review_text|
+--------------------+--------+--------------------+---------+--------------+--------------------+------+--------------------+
|a33f894eb61d73086...|17801094| How To Be a Heroine|  6517874|    Adam Brody|81615f138a75574e0...|     4|I loved this, exa...|
|30a5914a11308d75b...|13539044|The Silver Lining...|  2824686|   Malko Ebers|5760b79c65c3e5c73...|     5|I saw the movie f...|
|78a3dd083d97f69e3...|33916257|         Infinite Us| 13704514|Ban Teng Yi Li|862c74c57a2c665a4...|     4|I love stories th...|
+--------------------+--------+--------------------+---------+--------------+--------------------+------+--------------------+
only showing top 3 rows


### **Feature Engineering**

In [0]:
train_df, val_df, test_df = df.randomSplit([0.7, 0.15, 0.15], seed=42)

print("Train:", train_df.count())
print("Validation:", val_df.count())
print("Test:", test_df.count())


Train: 10481228
Validation: 2244507
Test: 2245636


In [0]:
base_path = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2"


In [0]:
train_df.write.format("delta").mode("overwrite").save(f"{base_path}/train")
val_df.write.format("delta").mode("overwrite").save(f"{base_path}/val")
test_df.write.format("delta").mode("overwrite").save(f"{base_path}/test")


In [0]:
display(dbutils.fs.ls(f"{base_path}/train"))
display(dbutils.fs.ls(f"{base_path}/val"))
display(dbutils.fs.ls(f"{base_path}/test"))


path,name,size,modificationTime
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/_delta_log/,_delta_log/,0,1762868886000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00000-398c7bbd-3e7c-419e-9675-777822421c4e.c000.snappy.parquet,part-00000-398c7bbd-3e7c-419e-9675-777822421c4e.c000.snappy.parquet,83573045,1762872771000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00000-69eae405-7acd-4895-9407-73b05dee50d2.c000.snappy.parquet,part-00000-69eae405-7acd-4895-9407-73b05dee50d2.c000.snappy.parquet,83573045,1762882077000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00000-c59d430d-5469-4eaf-85d0-5a874d2c11d0.c000.snappy.parquet,part-00000-c59d430d-5469-4eaf-85d0-5a874d2c11d0.c000.snappy.parquet,83573045,1762868892000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00001-32a6ea31-1965-4960-afbb-ee17c942293d.c000.snappy.parquet,part-00001-32a6ea31-1965-4960-afbb-ee17c942293d.c000.snappy.parquet,86842587,1762882077000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00001-3e004565-efaf-4537-89f7-c753189e8fff.c000.snappy.parquet,part-00001-3e004565-efaf-4537-89f7-c753189e8fff.c000.snappy.parquet,86842587,1762872770000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00001-c3126449-3fbc-4073-b6c7-099827e98105.c000.snappy.parquet,part-00001-c3126449-3fbc-4073-b6c7-099827e98105.c000.snappy.parquet,86842587,1762868893000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00002-6425b16c-823b-40eb-834d-54b329a59bcd.c000.snappy.parquet,part-00002-6425b16c-823b-40eb-834d-54b329a59bcd.c000.snappy.parquet,85551174,1762868893000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00002-808e3741-8e0d-4bc9-92e7-514fb56d42ac.c000.snappy.parquet,part-00002-808e3741-8e0d-4bc9-92e7-514fb56d42ac.c000.snappy.parquet,85551174,1762882077000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/train/part-00002-e07dd7f9-1270-4b05-8a65-2e26de763794.c000.snappy.parquet,part-00002-e07dd7f9-1270-4b05-8a65-2e26de763794.c000.snappy.parquet,85551174,1762872770000


path,name,size,modificationTime
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/_delta_log/,_delta_log/,0,1762868933000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00000-391af8ca-28af-4786-b33e-29373d179860.c000.snappy.parquet,part-00000-391af8ca-28af-4786-b33e-29373d179860.c000.snappy.parquet,18074867,1762882099000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00000-7e7985a1-b327-4e97-9ecd-614148690512.c000.snappy.parquet,part-00000-7e7985a1-b327-4e97-9ecd-614148690512.c000.snappy.parquet,18074867,1762868936000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00000-f74a2122-3d24-448a-884d-7b76cce72795.c000.snappy.parquet,part-00000-f74a2122-3d24-448a-884d-7b76cce72795.c000.snappy.parquet,18074867,1762872797000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00001-79468012-8a17-4093-bd06-ea65a8e4c73d.c000.snappy.parquet,part-00001-79468012-8a17-4093-bd06-ea65a8e4c73d.c000.snappy.parquet,18801822,1762868936000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00001-b495dfb1-8d9b-45c0-b522-f3d67af920f7.c000.snappy.parquet,part-00001-b495dfb1-8d9b-45c0-b522-f3d67af920f7.c000.snappy.parquet,18801822,1762882099000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00001-eb6501a2-f11f-4ef8-bd24-c90ae4d0bee5.c000.snappy.parquet,part-00001-eb6501a2-f11f-4ef8-bd24-c90ae4d0bee5.c000.snappy.parquet,18801822,1762872794000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00002-54501421-541c-4458-bb13-28ea05c56c50.c000.snappy.parquet,part-00002-54501421-541c-4458-bb13-28ea05c56c50.c000.snappy.parquet,18411859,1762882099000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00002-83f2bc05-4d68-4f49-9d94-3ad262c0745e.c000.snappy.parquet,part-00002-83f2bc05-4d68-4f49-9d94-3ad262c0745e.c000.snappy.parquet,18411859,1762868935000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/val/part-00002-c12847ec-8f07-475c-9b4c-427a0c64eed3.c000.snappy.parquet,part-00002-c12847ec-8f07-475c-9b4c-427a0c64eed3.c000.snappy.parquet,18411859,1762872795000


path,name,size,modificationTime
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/_delta_log/,_delta_log/,0,1762868960000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00000-16b4988e-27ef-4574-b8de-3908d4515242.c000.snappy.parquet,part-00000-16b4988e-27ef-4574-b8de-3908d4515242.c000.snappy.parquet,17958643,1762868964000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00000-58e13caf-739d-42cf-bf5e-ba5694e0258a.c000.snappy.parquet,part-00000-58e13caf-739d-42cf-bf5e-ba5694e0258a.c000.snappy.parquet,17958643,1762872810000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00000-ef1bd774-ccfb-4569-b9d1-85cd0d45fab8.c000.snappy.parquet,part-00000-ef1bd774-ccfb-4569-b9d1-85cd0d45fab8.c000.snappy.parquet,17958643,1762882114000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00001-70a97f07-7073-422c-bbc1-0187fd51ad93.c000.snappy.parquet,part-00001-70a97f07-7073-422c-bbc1-0187fd51ad93.c000.snappy.parquet,18840382,1762868963000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00001-80c104bd-e14f-42ec-a92e-0ad8989f1d23.c000.snappy.parquet,part-00001-80c104bd-e14f-42ec-a92e-0ad8989f1d23.c000.snappy.parquet,18840382,1762882114000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00001-8f89813d-9f31-48db-8be9-63cd5df5c59f.c000.snappy.parquet,part-00001-8f89813d-9f31-48db-8be9-63cd5df5c59f.c000.snappy.parquet,18840382,1762872810000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00002-0eecf227-102e-44dc-a6ff-2f1181a8bd4b.c000.snappy.parquet,part-00002-0eecf227-102e-44dc-a6ff-2f1181a8bd4b.c000.snappy.parquet,18737840,1762872811000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00002-47ff746d-6587-4470-85ba-76aec19e4ef7.c000.snappy.parquet,part-00002-47ff746d-6587-4470-85ba-76aec19e4ef7.c000.snappy.parquet,18737840,1762882115000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2/test/part-00002-accf90dc-af68-444f-b3b8-b51f6d0d8213.c000.snappy.parquet,part-00002-accf90dc-af68-444f-b3b8-b51f6d0d8213.c000.snappy.parquet,18737840,1762868963000


In [0]:
base_path = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2"   

train_df = spark.read.format("delta").load(f"{base_path}/train")
val_df   = spark.read.format("delta").load(f"{base_path}/val")
test_df  = spark.read.format("delta").load(f"{base_path}/test")


In [0]:
import re
from pyspark.sql import functions as F, types as T

@F.udf("string")
def clean_text(s):
    if not s: return None
    s = s.lower()
    s = re.sub(r"http[s]?://\S+|www\.\S+", " <url> ", s)
    s = re.sub(r"\d+", " <num> ", s)
    s = re.sub(r"[^\w\s<>]", " ", s)     # remove punctuation, keep placeholders
    s = re.sub(r"\s+", " ", s).strip()
    return s

def with_clean(df):
    return (df.withColumn("review_text_clean", clean_text(F.col("review_text")))
              .filter(F.length("review_text_clean") >= 10))
    
train_c = with_clean(train_df)
val_c   = with_clean(val_df)
test_c  = with_clean(test_df)


In [0]:
train_c = (train_c
  .withColumn("review_length_chars", F.length("review_text_clean"))
  .withColumn("review_length_words", F.size(F.split(F.col("review_text_clean"), r"\s+")))
)
val_c  = (val_c
  .withColumn("review_length_chars", F.length("review_text_clean"))
  .withColumn("review_length_words", F.size(F.split(F.col("review_text_clean"), r"\s+")))
)
test_c = (test_c
  .withColumn("review_length_chars", F.length("review_text_clean"))
  .withColumn("review_length_words", F.size(F.split(F.col("review_text_clean"), r"\s+")))
)


In [0]:
train_clean = train_c
STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"
OUT   = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"



In [0]:
from pyspark.sql import functions as F

train_tok = train_clean.withColumn(
    "tokens_raw",
    F.array_remove(F.split(F.regexp_replace(F.col("review_text_clean"), r"[^\w]+", " "), r"\s+"), "")
)

train_tok.write.format("delta").mode("overwrite").save(f"{STAGE}/tokens/train")


In [0]:
from pyspark.sql import functions as F

# Load tokenized data
train_tok = spark.read.format("delta").load(f"{STAGE}/tokens/train")

# Load a small English stopword list (you can expand as needed)
stopwords = set([
    "a","an","and","are","as","at","be","by","for","from","has",
    "he","in","is","it","its","of","on","that","the","to","was","were","will","with"
])

@F.udf("array<string>")
def remove_stopwords(tokens):
    return [t for t in tokens if t.lower() not in stopwords]

# Apply UDF to remove stopwords
train_sw = train_tok.withColumn("tokens_ns", remove_stopwords(F.col("tokens_raw")))

# Write clean output
train_sw.write.format("delta").mode("overwrite").save(f"{STAGE}/sw/train")



In [0]:
from pyspark.sql import functions as F

# read the stopword-clean tokens we just saved
train_sw = spark.read.format("delta").load(f"{STAGE}/sw/train")

# make bigrams without Spark-ML NGram (avoids Py4J whitelist)
@F.udf("array<string>")
def make_bigrams(tokens):
    if tokens is None:
        return None
    n = len(tokens)
    if n < 2:
        return []
    return [tokens[i] + " " + tokens[i+1] for i in range(n-1)]

train_bg = train_sw.withColumn("bigrams", make_bigrams(F.col("tokens_ns")))

# persist stage
train_bg.write.format("delta").mode("overwrite").save(f"{STAGE}/bigrams/train")


In [0]:
from pyspark.sql import functions as F

train_bg = spark.read.format("delta").load(f"{STAGE}/bigrams/train")

# Concatenate unigrams + bigrams (keeps duplicates)
train_m = train_bg.withColumn("tokens_merged", F.concat(F.col("tokens_ns"), F.col("bigrams")))

# If you prefer unique tokens only, use this instead:
# train_m = train_bg.withColumn("tokens_merged", F.array_distinct(F.concat(F.col("tokens_ns"), F.col("bigrams"))))

train_m.write.format("delta").mode("overwrite").save(f"{STAGE}/merged/train")


In [0]:
from pyspark.sql import functions as F

# merged unigrams+bigrams from the last step
train_m = spark.read.format("delta").load(f"{STAGE}/merged/train")

# total tokens per doc (for normalization)
train_len = train_m.select(
    "review_id",
    F.size(F.col("tokens_merged")).alias("tok_total")
)

# explode tokens -> per (doc, token) counts
train_tok_counts = (
    train_m
    .select("review_id", F.explode(F.col("tokens_merged")).alias("token"))
    .groupBy("review_id", "token")
    .agg(F.count("*").alias("tok_cnt"))
)

# TF = count / total
train_tf = (
    train_tok_counts
    .join(train_len, on="review_id", how="inner")
    .withColumn("tf", F.col("tok_cnt") / F.col("tok_total"))
    .select("review_id", "token", "tf")
)

train_tf.write.format("delta").mode("overwrite").save(f"{STAGE}/tf_sql/train")


In [0]:
from pyspark.sql import functions as F

# distinct doc count (N)
N = spark.read.format("delta").load(f"{STAGE}/merged/train") \
    .select("review_id").distinct().count()

# document frequency per token
train_dfreq = (
    spark.read.format("delta").load(f"{STAGE}/merged/train")
    .select("review_id", F.explode(F.col("tokens_merged")).alias("token"))
    .distinct()
    .groupBy("token")
    .agg(F.count("*").alias("df"))
)

# IDF = log((N+1)/(df+1)) + 1  (smoothing)
train_idf = train_dfreq.withColumn("idf", F.log((F.lit(N)+1) / (F.col("df")+1)) + F.lit(1.0)) \
                       .select("token","idf")

train_idf.write.format("delta").mode("overwrite").save(f"{STAGE}/idf_sql/train")


In [0]:
from pyspark.sql import functions as F

train_tf   = spark.read.format("delta").load(f"{STAGE}/tf_sql/train")
train_idf  = spark.read.format("delta").load(f"{STAGE}/idf_sql/train")
train_base = spark.read.format("delta").load(f"{STAGE}/merged/train") \
                       .select("review_id","book_id","rating",
                               "review_length_words","review_length_chars")

# join TF with IDF -> weight
train_w = (train_tf.join(train_idf, on="token", how="inner")
                    .withColumn("weight", F.col("tf") * F.col("idf"))
                    .select("review_id","token","weight"))

# collapse tokens to a map<string,double> per doc
train_tfidf_map = (
    train_w
    .groupBy("review_id")
    .agg(F.map_from_entries(F.collect_list(F.struct(F.col("token"), F.col("weight")))).alias("tfidf_map"))
    .join(train_base, on="review_id", how="left")
)

# save (we’ll keep features as a map for now; it’s numeric and compact)
train_tfidf_map.write.format("delta").mode("overwrite").save(f"{STAGE}/tfidf_map/train")


In [0]:
from pyspark.sql import functions as F

def ensure_basic_features(df):
    # create review_text_clean if missing
    if "review_text_clean" not in df.columns:
        rt = F.lower(F.col("review_text"))
        rt = F.regexp_replace(rt, r"(http|https)://\S+|www\.\S+", " <url> ")
        rt = F.regexp_replace(rt, r"\d+", " <num> ")
        rt = F.regexp_replace(rt, r"[^\w\s<>]", " ")
        rt = F.regexp_replace(rt, r"\s+", " ")
        rt = F.trim(rt)
        df = df.withColumn("review_text_clean", rt)

    # III.4.a — basic text features
    df = (df
          .withColumn("review_length_chars", F.length("review_text_clean"))
          .withColumn("review_length_words", F.size(F.split(F.col("review_text_clean"), r"\s+")))
          .filter(F.col("review_length_chars") >= 10))  # drop very short reviews (lab rule)
    return df

# apply to your splits (train_c / val_c / test_c)
train_c = ensure_basic_features(train_c)
val_c   = ensure_basic_features(val_c)
test_c  = ensure_basic_features(test_c)

# quick peek (no heavy action)
train_c.select("review_text_clean","review_length_words","review_length_chars").limit(5).toPandas()


Unnamed: 0,review_text_clean,review_length_words,review_length_chars
0,i love bill bryson he reminds me why i was a h...,37,186
1,this is going to be a difficult review to writ...,509,2690
2,a very satisfying conclusion to this entertain...,8,56
3,this book was good lol i think it is getting a...,63,284
4,mostly just loved this for helping me re live ...,17,87


In [0]:
# only first time — installs VADER in your cluster
%pip install nltk


Collecting nltk
  Downloading nltk-3.9.2-py3-none-any.whl.metadata (3.2 kB)
Downloading nltk-3.9.2-py3-none-any.whl (1.5 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.5 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.5 MB[0m [31m2.7 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.4/1.5 MB[0m [31m6.1 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━[0m [32m1.5/1.5 MB[0m [31m14.4 MB/s[0m eta [36m0:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: nltk
Successfully installed nltk-3.9.2
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
!pip install nltk
import nltk

nltk.download("vader_lexicon")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/spark-83af3437-b05f-4028-bdaa-fd/nltk_data...


True

In [0]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.sql import functions as F, types as T

# Initialize VADER sentiment analyzer
sia = SentimentIntensityAnalyzer()

# Define UDF to compute sentiment scores
@F.udf(returnType=T.StructType([
    T.StructField("pos", T.DoubleType()),
    T.StructField("neg", T.DoubleType()),
    T.StructField("neu", T.DoubleType()),
    T.StructField("compound", T.DoubleType())
]))
def get_sentiment(text):
    if text is None or text.strip() == "":
        return {"pos": 0.0, "neg": 0.0, "neu": 0.0, "compound": 0.0}
    return sia.polarity_scores(text)

# Apply on all splits
train_sent = train_c.withColumn("sentiment", get_sentiment(F.col("review_text_clean")))
val_sent   = val_c.withColumn("sentiment", get_sentiment(F.col("review_text_clean")))
test_sent  = test_c.withColumn("sentiment", get_sentiment(F.col("review_text_clean")))

# Split the struct into separate columns
def expand_sent(df):
    return (df
            .withColumn("sentiment_pos", F.col("sentiment.pos"))
            .withColumn("sentiment_neg", F.col("sentiment.neg"))
            .withColumn("sentiment_neu", F.col("sentiment.neu"))
            .withColumn("sentiment_compound", F.col("sentiment.compound"))
            .drop("sentiment"))

train_sent = expand_sent(train_sent)
val_sent   = expand_sent(val_sent)
test_sent  = expand_sent(test_sent)

# ✅ quick check
train_sent.select("review_text_clean", "sentiment_pos", "sentiment_neg", "sentiment_compound").show(5, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
display(dbutils.fs.ls("abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"))


path,name,size,modificationTime
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/bigrams/,bigrams/,0,1762873843000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/idf_sql/,idf_sql/,0,1762874694000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/merged/,merged/,0,1762874113000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/sw/,sw/,0,1762873360000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/tf_sql/,tf_sql/,0,1762874322000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/tfidf_map/,tfidf_map/,0,1762874921000
abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/tokens/,tokens/,0,1762872874000


In [0]:
from pyspark.sql import functions as F

# paths
STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"
OUT   = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

# use sentiment-enriched frames if present; else the cleaned ones
val_base_df  = val_sent  if 'val_sent'  in globals() else val_c
test_base_df = test_sent if 'test_sent' in globals() else test_c

# helpers
def fs_exists(p):
    try:
        dbutils.fs.ls(p)
        return True
    except Exception:
        return False

# stopwords + bigrams UDFs (redefine if needed)
stop = set("""a an and are as at be by for from has he in is it its of on that the to was were will with""".split())
@F.udf("array<string>")
def rm_stop(tokens):
    return [t for t in (tokens or []) if t.lower() not in stop]

@F.udf("array<string>")
def bigrams(tokens):
    if not tokens or len(tokens) < 2: 
        return []
    return [tokens[i] + " " + tokens[i+1] for i in range(len(tokens)-1)]

def ensure_sw_and_merged(split, base_df):
    # sw/{split}
    sw_path = f"{STAGE}/sw/{split}"
    if not fs_exists(sw_path):
        df = base_df.withColumn(
            "tokens_raw",
            F.array_remove(F.split(F.regexp_replace(F.col("review_text_clean"), r"[^\w]+", " "), r"\s+"), "")
        ).withColumn("tokens_ns", rm_stop(F.col("tokens_raw")))
        df.select("review_id","book_id","rating","review_length_words","review_length_chars","tokens_ns") \
          .write.format("delta").mode("overwrite").save(sw_path)

    # merged/{split}
    merged_path = f"{STAGE}/merged/{split}"
    if not fs_exists(merged_path):
        df = spark.read.format("delta").load(sw_path)
        df = df.withColumn("bigrams", bigrams(F.col("tokens_ns")))
        df = df.withColumn("tokens_merged", F.concat(F.col("tokens_ns"), F.col("bigrams")))
        df.write.format("delta").mode("overwrite").save(merged_path)

# 1) ensure inputs exist
ensure_sw_and_merged("val",  val_base_df)
ensure_sw_and_merged("test", test_base_df)

# 2) TF for split + join train IDF -> tfidf_map
train_idf = spark.read.format("delta").load(f"{STAGE}/idf_sql/train")  # token,idf

def build_tfidf_map(split):
    m = spark.read.format("delta").load(f"{STAGE}/merged/{split}")
    tot = m.select("review_id", F.size("tokens_merged").alias("tot"))
    cnt = m.select("review_id", F.explode("tokens_merged").alias("token")).groupBy("review_id","token").count()
    tf  = cnt.join(tot, "review_id").withColumn("tf", F.col("count")/F.col("tot")).select("review_id","token","tf")
    w   = tf.join(train_idf, "token").withColumn("weight", F.col("tf")*F.col("idf"))
    tfidf_map = (w.groupBy("review_id")
                   .agg(F.map_from_entries(F.collect_list(F.struct("token","weight"))).alias("tfidf_map"))
                   .join(m.select("review_id","book_id","rating","review_length_words","review_length_chars"),
                         "review_id","left"))
    tfidf_map.write.format("delta").mode("overwrite").save(f"{STAGE}/tfidf_map/{split}")

build_tfidf_map("val")
build_tfidf_map("test")

# 3) finalize (add sentiment if present) -> save to GOLD
def finalize(split, base_df):
    tfidf = spark.read.format("delta").load(f"{STAGE}/tfidf_map/{split}")
    # add sentiment columns if they exist
    cols = ["review_id","sentiment_pos","sentiment_neg","sentiment_neu","sentiment_compound"]
    base_sel = [c for c in cols if c in base_df.columns]
    df = tfidf.join(base_df.select(*base_sel), "review_id", "left")
    df_final = df.select(
        "review_id","book_id","rating",
        F.struct(
            "tfidf_map","review_length_words","review_length_chars",
            *[c for c in ["sentiment_pos","sentiment_neg","sentiment_neu","sentiment_compound"] if c in df.columns]
        ).alias("features")
    )
    df_final.write.format("delta").mode("overwrite").save(f"{OUT}/{split}")

finalize("val",  val_base_df)
finalize("test", test_base_df)


In [0]:
from pyspark.sql import functions as F

# paths (set if not already)
STAGE = STAGE if 'STAGE' in globals() else "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"

# load merged tokens + train IDF
val_m    = spark.read.format("delta").load(f"{STAGE}/merged/val")
train_idf = spark.read.format("delta").load(f"{STAGE}/idf_sql/train")   # cols: token,idf

# TF only on VAL
tot_v = val_m.select("review_id", F.size("tokens_merged").alias("tot"))
cnt_v = val_m.select("review_id", F.explode("tokens_merged").alias("token")).groupBy("review_id","token").count()
val_tf = cnt_v.join(tot_v, "review_id").withColumn("tf", F.col("count")/F.col("tot")).select("review_id","token","tf")

# join TF×IDF → weight → collapse to map + keep basic columns
val_w = val_tf.join(train_idf, "token").withColumn("weight", F.col("tf")*F.col("idf"))
val_tfidf_map = (val_w.groupBy("review_id")
    .agg(F.map_from_entries(F.collect_list(F.struct("token","weight"))).alias("tfidf_map"))
    .join(val_m.select("review_id","book_id","rating","review_length_words","review_length_chars"),
          "review_id","left"))

val_tfidf_map.write.format("delta").mode("overwrite").save(f"{STAGE}/tfidf_map/val")


In [0]:
# Databricks/Fabric: install once, then Restart Python
%pip install -q sentence-transformers==2.7.0


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%pip install sentence-transformers==2.7.0


Collecting sentence-transformers==2.7.0
  Downloading sentence_transformers-2.7.0-py3-none-any.whl.metadata (11 kB)
Collecting transformers<5.0.0,>=4.34.0 (from sentence-transformers==2.7.0)
  Downloading transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/44.0 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting tqdm (from sentence-transformers==2.7.0)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/57.7 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.7/57.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting torch>=1.11.0 (from sentence-transformers==2.7.0)
  Downloading torch-2.9.0-cp312-cp312-manylinux_2_28_x86_64.whl.metadata (30 kB)
Collect

In [0]:
%pip install transformers


Collecting transformers
  Downloading transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/44.0 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Downloading huggingface_hub-0.36.0-py3-none-any.whl.metadata (14 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2025.11.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/40.5 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.5/40.5 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers<=0.23.0,>=0.22.0 (from transformers)
  Downloading tokenizers-0.22.1-cp39-abi3-manylinux_2_17_x86_64.manylinux

In [0]:
%restart_python


In [0]:
%pip install --upgrade transformers sentence-transformers
dbutils.library.restartPython()


Collecting transformers
  Downloading transformers-4.57.1-py3-none-any.whl.metadata (43 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/44.0 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting sentence-transformers
  Downloading sentence_transformers-5.1.2-py3-none-any.whl.metadata (16 kB)
Collecting huggingface-hub<1.0,>=0.34.0 (from transformers)
  Downloading huggingface_hub-0.36.0-py3-none-any.whl.metadata (14 kB)
Collecting regex!=2019.12.17 (from transformers)
  Downloading regex-2025.11.3-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl.metadata (40 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/40.5 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.5/40.5 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
Collecting tokeni

In [0]:
import transformers

None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
Unexpected internal error when monkey patching `PreTrainedModel.from_pretrained`: 
PreTrainedModel requires the PyTorch library but it was not found in your environment. Check out the instructions on the
installation page: https://pytorch.org/get-started/locally/ and follow the ones that match your environment.
Please note that you may need to restart your runtime after installation.

Unexpected internal error when monkey patching `Trainer.train`: 
Trainer requires the PyTorch library but it was not found in your environment. Check out the instructions on the
installation page: https://pytorch.org/get-started/locally/ and follow the ones that match your environment.
Please note that you may need to restart your runtime after installation.



In [0]:
from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf

# paths
STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"
OUT   = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

# pick your base frames (sentiment-enriched if present)
train_base_df = train_sent if 'train_sent' in globals() else train_c
val_base_df   = val_sent   if 'val_sent'   in globals() else val_c
test_base_df  = test_sent  if 'test_sent'  in globals() else test_c

# Load the SAME model as SBERT uses, but from HF directly
MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"   # 384-dim
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model     = AutoModel.from_pretrained(MODEL_NAME)
model.eval()
torch.set_grad_enabled(False)


tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

torch.autograd.grad_mode.set_grad_enabled(mode=False)

In [0]:
@pandas_udf("array<float>")
def hf_embed_udf(texts: pd.Series) -> pd.Series:
    # tokenize batch
    toks = tokenizer(
        list(texts.fillna("")),
        padding=True, truncation=True, max_length=256,
        return_tensors="pt"
    )
    # forward pass
    out = model(**toks)  # last_hidden_state: [B, T, H]
    # mean-pool over tokens
    emb = out.last_hidden_state.mean(dim=1)               # [B, H]
    # L2-normalize (cosine ready)
    emb = torch.nn.functional.normalize(emb, p=2, dim=1)  # [B, H]
    # to float32 lists
    return pd.Series([list(v.cpu().numpy().astype("float32")) for v in emb])


In [0]:
from pyspark.sql import functions as F

# 0) Tuning knobs — small batches & more tasks
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "128")   # default ~10k; too big for models
spark.conf.set("spark.sql.shuffle.partitions", "200")                   # spread work

STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"

# if path already exists from a failed attempt, clean it once
for split in ["train","val","test"]:
    try:
        dbutils.fs.rm(f"{STAGE}/embeddings/{split}", recurse=True)
    except:
        pass


In [0]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

@pandas_udf("array<float>")
def hf_embed_udf(texts: pd.Series) -> pd.Series:
    import torch
    import numpy as np
    from transformers import AutoTokenizer, AutoModel

    # cache per worker
    global _hf_tok, _hf_mdl
    if "_hf_tok" not in globals():
        _hf_tok = AutoTokenizer.from_pretrained(MODEL_NAME)
    if "_hf_mdl" not in globals():
        _hf_mdl = AutoModel.from_pretrained(MODEL_NAME)
        _hf_mdl.eval()

    # inputs
    batch = list(texts.fillna(""))

    with torch.no_grad():
        toks = _hf_tok(
            batch, padding=True, truncation=True, max_length=256, return_tensors="pt"
        )
        # mean-pool then L2-normalize
        emb = _hf_mdl(**toks).last_hidden_state.mean(dim=1)
        emb = torch.nn.functional.normalize(emb, p=2, dim=1)
        arr = emb.detach().cpu().numpy().astype("float32")

    return pd.Series([list(v) for v in arr])


In [0]:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "512")  # fine for mapInPandas
spark.conf.set("spark.sql.shuffle.partitions", "400")                  # more parallelism


In [0]:
# Spark tuning (keep)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "512")
spark.conf.set("spark.sql.shuffle.partitions", "400")

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"

schema = StructType([
    StructField("review_id", StringType(), False),
    StructField("bert_embedding", ArrayType(FloatType()), False),
])

MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"   # SBERT MiniLM (384-dim)

def embed_map(pdf_iter):
    import os
    import pandas as pd
    from transformers import AutoTokenizer, AutoModel

    # set these BEFORE importing torch to limit CPU threads
    os.environ.setdefault("OMP_NUM_THREADS", "1")
    os.environ.setdefault("MKL_NUM_THREADS", "1")

    import torch

    HF_CACHE = "/tmp/hf_cache"
    os.makedirs(HF_CACHE, exist_ok=True)
    os.environ["TRANSFORMERS_CACHE"] = HF_CACHE
    os.environ["HF_HOME"] = HF_CACHE
    os.environ["HUGGINGFACE_HUB_CACHE"] = HF_CACHE
    os.environ["TORCH_HOME"] = HF_CACHE

    tok = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2",
                                        cache_dir=HF_CACHE)
    mdl = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2",
                                    cache_dir=HF_CACHE)
    mdl.eval(); torch.set_grad_enabled(False)

    # ✅ set torch threads at most once per worker, swallow errors if already set
    if not globals().get("_TORCH_THREADS_SET", False):
        try:
            torch.set_num_threads(1)
            torch.set_num_interop_threads(1)
        except Exception:
            pass
        globals()["_TORCH_THREADS_SET"] = True

    BATCH, MAXLEN = 256, 128

    for pdf in pdf_iter:
        if pdf.empty:
            yield pd.DataFrame({"review_id": [], "bert_embedding": []}); continue

        texts = pdf["review_text_clean"].fillna("").tolist()
        rids  = pdf["review_id"].astype(str).tolist()

        out = []
        for i in range(0, len(texts), BATCH):
            toks = tok(texts[i:i+BATCH], padding=True, truncation=True,
                       max_length=MAXLEN, return_tensors="pt")
            with torch.no_grad():
                x = mdl(**toks).last_hidden_state.mean(dim=1)
                x = torch.nn.functional.normalize(x, p=2, dim=1)
                out += x.detach().cpu().numpy().astype("float32").tolist()

        yield pd.DataFrame({"review_id": rids, "bert_embedding": out})

def write_fast(df, split, parts=600):
    (df.select("review_id","review_text_clean")
       .repartition(parts)
       .mapInPandas(embed_map, schema)
       .write.format("delta").mode("overwrite")
       .save(f"{STAGE}/embeddings/{split}"))


In [0]:
# to reduce overhead, run one split now; you can lower parts too
write_fast(train_base_df, "train", parts=200)


In [0]:
write_fast(val_base_df,   "val",   parts=100)

In [0]:
write_fast(test_base_df,  "test",  parts=100)

In [0]:
STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"
OUT   = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

def check_split(split):
    ok = True
    print(f"\n=== Checking {split.upper()} ===")
    for name in ["merged", "tfidf_map", "embeddings"]:
        path = f"{STAGE}/{name}/{split}"
        try:
            df = spark.read.format("delta").load(path)
            c = df.count()
            print(f"{name:11s}: OK  rows={c}")
        except Exception as e:
            print(f"{name:11s}: MISSING")
            ok = False
    # optional sentiment
    try:
        spark.read.format("delta").load(f"{STAGE}/sentiment/{split}").limit(1).count()
        print("sentiment   : OK (optional)")
    except:
        print("sentiment   : not present (optional)")
    return ok

ready = {s: check_split(s) for s in ["train","val","test"]}
ready



=== Checking TRAIN ===
merged     : OK  rows=10434620
tfidf_map  : OK  rows=10434610
embeddings : OK  rows=10434620
sentiment   : not present (optional)

=== Checking VAL ===
merged     : OK  rows=2235157
tfidf_map  : OK  rows=2234912
embeddings : OK  rows=2235157
sentiment   : not present (optional)

=== Checking TEST ===
merged     : OK  rows=2236333
tfidf_map  : OK  rows=2236057
embeddings : OK  rows=2236333
sentiment   : not present (optional)


{'train': True, 'val': True, 'test': True}

In [0]:
from pyspark.sql import functions as F, types as T

STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"
OUT   = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

def path_exists(path: str) -> bool:
    try:
        dbutils.fs.ls(path); return True
    except:
        return False

empty_map = F.map_from_arrays(
    F.array().cast(T.ArrayType(T.StringType())),
    F.array().cast(T.ArrayType(T.DoubleType()))
)

def finalize(split):
    base = spark.read.format("delta").load(f"{STAGE}/merged/{split}") \
            .select("review_id","book_id","rating","review_length_words","review_length_chars") \
            .dropDuplicates(["review_id"])

    tfidf = spark.read.format("delta").load(f"{STAGE}/tfidf_map/{split}") \
            .select("review_id","tfidf_map") \
            .dropDuplicates(["review_id"])

    emb = spark.read.format("delta").load(f"{STAGE}/embeddings/{split}") \
          .select("review_id","bert_embedding") \
          .dropDuplicates(["review_id"])

    df = (base.join(tfidf, "review_id", "left")
             .withColumn("tfidf_map", F.coalesce(F.col("tfidf_map"), empty_map))
             .join(emb, "review_id", "left"))

    sent_path = f"{STAGE}/sentiment/{split}"
    if path_exists(sent_path):
        sent = spark.read.format("delta").load(sent_path)
        sent_cols = [c for c in sent.columns if c.startswith("sentiment_")]
        if "review_id" not in sent_cols:
            sent_cols = ["review_id"] + sent_cols
        sent = sent.select(*sent_cols).dropDuplicates(["review_id"])
        df = df.join(sent, "review_id", "left")

    df.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(f"{OUT}/{split}")

    print(f"✔ wrote {split} → {OUT}/{split}")

for s in ["train","val","test"]:
    finalize(s)


✔ wrote train → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2/train
✔ wrote val → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2/val
✔ wrote test → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2/test


In [0]:
for s in ["train","val","test"]:
    df = spark.read.format("delta").load(f"{OUT}/{s}")
    print(s, "→ rows:", df.count())
    df.printSchema()



train → rows: 10434620
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)

val → rows: 2235157
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)

test → rows: 2236333
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (null

In [0]:
from pyspark.sql import functions as F, types as T

STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"
OUT   = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

def path_exists(path: str) -> bool:
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

empty_map = F.map_from_arrays(
    F.array().cast(T.ArrayType(T.StringType())),
    F.array().cast(T.ArrayType(T.DoubleType()))
)

def finalize(split):
    emb   = spark.read.format("delta").load(f"{STAGE}/embeddings/{split}").dropDuplicates(["review_id"])
    tfidf = spark.read.format("delta").load(f"{STAGE}/tfidf_map/{split}").dropDuplicates(["review_id"])
    base  = spark.read.format("delta").load(f"{STAGE}/merged/{split}")

    df = (base.select("review_id","book_id","rating","review_length_words","review_length_chars")
              .join(tfidf, "review_id", "left")
              .withColumn("tfidf_map", F.coalesce(F.col("tfidf_map"), empty_map))
              .join(emb,   "review_id", "left"))

    sent_path = f"{STAGE}/sentiment/{split}"
    if path_exists(sent_path):
        sent = spark.read.format("delta").load(sent_path).dropDuplicates(["review_id"])
        df = df.join(sent, "review_id", "left")

    df.write.format("delta").mode("overwrite").save(f"{OUT}/{split}")
    print(f"✔ wrote {split} → {OUT}/{split}")


In [0]:
# paths
OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

from pyspark.sql import functions as F

# show summary for all splits
for s in ["train","val","test"]:
    print(f"\n=== {s.upper()} ===")
    try:
        df = spark.read.format("delta").load(f"{OUT}/{s}")
        print("Rows:", df.count())
        df.printSchema()

        # check key columns
        must_cols = ["review_id","book_id","rating",
                     "review_length_words","review_length_chars",
                     "tfidf_map","bert_embedding"]
        missing = [c for c in must_cols if c not in df.columns]
        print("Missing required columns:", missing)

        # show a few samples
        df.select("review_id","book_id","rating",
                  F.size("bert_embedding").alias("emb_dim"),
                  F.size(F.map_keys("tfidf_map")).alias("tfidf_terms")
                 ).show(5, truncate=False)
    except Exception as e:
        print(f"{s}: not found or unreadable → {e}")



=== TRAIN ===
Rows: 10434620
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)

Missing required columns: []
+--------------------------------+--------+------+-------+-----------+
|review_id                       |book_id |rating|emb_dim|tfidf_terms|
+--------------------------------+--------+------+-------+-----------+
|007ada296a8fa02bcc277450c17a2358|5129    |4     |384    |182        |
|0181b6003243e79333fe5ce720ab3418|18693115|2     |384    |4          |
|024382517d12f0826f9db23364adbf63|25514002|4     |384    |97         |
|02566064c846f151641482bca49294f2|18721666|5     |384    |160        |
|02579708c27

In [0]:
%pip install textblob
dbutils.library.restartPython()


Collecting textblob
  Downloading textblob-0.19.0-py3-none-any.whl.metadata (4.4 kB)
Downloading textblob-0.19.0-py3-none-any.whl (624 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/624.3 kB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m92.2/624.3 kB[0m [31m2.7 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━[0m [32m450.6/624.3 kB[0m [31m6.6 MB/s[0m eta [36m0:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m624.3/624.3 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: textblob
Successfully installed textblob-0.19.0
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from pyspark.sql import functions as F, types as T

STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"

# pandas UDF ترجع struct فيه الأربعة قيم
@F.pandas_udf("struct<sentiment_pos:double, sentiment_neg:double, sentiment_neu:double, sentiment_compound:double>")
def textblob_sentiment_udf(texts):
    from textblob import TextBlob
    import pandas as pd

    def score_one(t):
        if t is None:
            return (0.0, 0.0, 1.0, 0.0)   # كلّه neutral
        t = str(t)
        blob = TextBlob(t)
        comp = float(blob.sentiment.polarity)  # من -1 إلى 1
        # نشتق pos/neg/neu بصورة بسيطة
        pos = max(comp, 0.0)
        neg = max(-comp, 0.0)
        neu = max(0.0, 1.0 - pos - neg)
        return (pos, neg, neu, comp)

    vals = [score_one(t) for t in texts]
    pdf = pd.DataFrame(vals, columns=["sentiment_pos","sentiment_neg","sentiment_neu","sentiment_compound"])
    return pdf

def build_sentiment(split):
    print(f"=== sentiment for {split} ===")

    # نقرأ النص المنظّف من merged/{split}
    df = (spark.read.format("delta")
                  .load(f"{STAGE}/merged/{split}")
                  .select("review_id", "review_text_clean"))

    # نضيف عمود struct فيه الأربعة قيم
    df = df.withColumn("sent_struct", textblob_sentiment_udf("review_text_clean"))

    # نفصل الأعمدة لأربعة أعمدة مستقلة
    df = (df
          .withColumn("sentiment_pos",       F.col("sent_struct.sentiment_pos"))
          .withColumn("sentiment_neg",       F.col("sent_struct.sentiment_neg"))
          .withColumn("sentiment_neu",       F.col("sent_struct.sentiment_neu"))
          .withColumn("sentiment_compound",  F.col("sent_struct.sentiment_compound"))
          .drop("sent_struct", "review_text_clean")
          .dropDuplicates(["review_id"]))

    (df.write
        .format("delta")
        .mode("overwrite")
        .save(f"{STAGE}/sentiment/{split}"))

    print(f"✔ wrote sentiment → {STAGE}/sentiment/{split}")


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

BASE_GOLD = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/feature_v2"
STAGE     = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"

@F.pandas_udf(
    "struct<sentiment_pos:double, sentiment_neg:double, sentiment_neu:double, sentiment_compound:double>"
)
def textblob_sentiment_udf(texts):
    from textblob import TextBlob
    import pandas as pd

    def score_one(t):
        if t is None:
            return (0.0, 0.0, 1.0, 0.0)   # neutral
        t = str(t)
        blob = TextBlob(t)
        comp = float(blob.sentiment.polarity)  # من -1 إلى 1
        pos = max(comp, 0.0)
        neg = max(-comp, 0.0)
        neu = max(0.0, 1.0 - pos - neg)
        return (pos, neg, neu, comp)

    vals = [score_one(t) for t in texts]
    pdf = pd.DataFrame(
        vals,
        columns=["sentiment_pos","sentiment_neg","sentiment_neu","sentiment_compound"]
    )
    return pdf


In [0]:
def build_sentiment(split):
    print(f"=== sentiment for {split} ===")

    # نقرأ الـ split الأصلي من feature_v2 (فيه review_text)
    text_df = (spark.read.format("delta")
                     .load(f"{BASE_GOLD}/{split}")
                     .select("review_id", "review_text")
                     .dropna(subset=["review_text"]))

    # نضيف struct بالدرجات
    sent = text_df.withColumn("sent_struct", textblob_sentiment_udf("review_text"))

    # نفصل الأعمدة
    sent = (sent
            .withColumn("sentiment_pos",      F.col("sent_struct.sentiment_pos"))
            .withColumn("sentiment_neg",      F.col("sent_struct.sentiment_neg"))
            .withColumn("sentiment_neu",      F.col("sent_struct.sentiment_neu"))
            .withColumn("sentiment_compound", F.col("sent_struct.sentiment_compound"))
            .drop("sent_struct", "review_text")
            .dropDuplicates(["review_id"]))

    # نحفظها تحت tmp_features_v2/sentiment/{split}
    (sent.write
         .format("delta")
         .mode("overwrite")
         .save(f"{STAGE}/sentiment/{split}"))

    print(f"✔ wrote sentiment → {STAGE}/sentiment/{split}")


In [0]:
for s in ["val", "test", "train"]:
    build_sentiment(s)


=== sentiment for val ===
✔ wrote sentiment → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/sentiment/val
=== sentiment for test ===
✔ wrote sentiment → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/sentiment/test
=== sentiment for train ===
✔ wrote sentiment → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2/sentiment/train


In [0]:
from pyspark.sql import functions as F, types as T

STAGE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/tmp_features_v2"
OUT   = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

def path_exists(path: str) -> bool:
    try:
        dbutils.fs.ls(path)
        return True
    except:
        return False

# map فارغ احتياطاً لو tfidf_map يكون null
empty_map = F.map_from_arrays(
    F.array().cast(T.ArrayType(T.StringType())),
    F.array().cast(T.ArrayType(T.DoubleType()))
)

def finalize(split):
    # base = طول النص + الميتاداتا (من tmp_features_v2/merged/{split})
    base = (spark.read.format("delta")
                   .load(f"{STAGE}/merged/{split}")
                   .select("review_id","book_id","rating",
                           "review_length_words","review_length_chars")
                   .dropDuplicates(["review_id"]))

    # tfidf_map
    tfidf = (spark.read.format("delta")
                   .load(f"{STAGE}/tfidf_map/{split}")
                   .select("review_id","tfidf_map")
                   .dropDuplicates(["review_id"]))

    # bert_embedding
    emb = (spark.read.format("delta")
                  .load(f"{STAGE}/embeddings/{split}")
                  .select("review_id","bert_embedding")
                  .dropDuplicates(["review_id"]))

    # دمج base + tfidf + embeddings
    df = (base
          .join(tfidf, "review_id", "left")
          .withColumn("tfidf_map", F.coalesce(F.col("tfidf_map"), empty_map))
          .join(emb, "review_id", "left"))

    # دمج sentiment لو الجدول موجود
    sent_path = f"{STAGE}/sentiment/{split}"
    if path_exists(sent_path):
        sent = spark.read.format("delta").load(sent_path)
        sent_cols = [c for c in sent.columns if c.startswith("sentiment_")]
        if "review_id" not in sent_cols:
            sent_cols = ["review_id"] + sent_cols
        sent = sent.select(*sent_cols).dropDuplicates(["review_id"])
        df = df.join(sent, "review_id", "left")

    # حفظ النسخة النهائية في gold/features_v2/{split}
    (df.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema", "true")   # مهم
   .save(f"{OUT}/{split}"))

    print(f"✔ wrote {split} → {OUT}/{split}")


for s in ["train","val","test"]:
    finalize(s)


✔ wrote train → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2/train
✔ wrote val → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2/val
✔ wrote test → abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2/test


In [0]:
OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

for split in ["train", "val", "test"]:
    print(f"=== {split.upper()} ===")
    df = spark.read.format("delta").load(f"{OUT}/{split}")
    print("Rows:", df.count())
    df.printSchema()
    df.select("review_id", "rating", "review_length_words", "tfidf_map", "bert_embedding").show(3, truncate=False)


=== TRAIN ===
Rows: 10434620
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- sentiment_pos: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_compound: double (nullable = true)

+--------------------------------+------+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql import functions as F

# نفس مسار الـ Gold اللي استخدمناه قبل
OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

for split in ["train", "val", "test"]:
    print("\n" + "="*15, split.upper(), "="*15)
    df = spark.read.format("delta").load(f"{OUT}/{split}")
    
    # عدد الصفوف
    print("Rows:", df.count())
    
    # شكل الأعمدة والأنواع
    df.printSchema()
    
    # نعرض أول 5 صفوف بدون قطع للنص
    df.select(
        "review_id",
        "book_id",
        "rating",
        "review_length_words",
        "review_length_chars",
        "tfidf_map",
        "bert_embedding",
        "sentiment_pos",
        "sentiment_neg",
        "sentiment_neu",
        "sentiment_compound"
    ).show(5, truncate=False)



Rows: 10434620
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- sentiment_pos: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_compound: double (nullable = true)

+--------------------------------+--------+------+-------------------+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql import functions as F

OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

for split in ["train", "val", "test"]:
    print("\n" + "="*15, split.upper(), "="*15)
    try:
        df = spark.read.format("delta").load(f"{OUT}/{split}")
        
        # لا تطبع count الآن عشان تقلل اللوج
        # print("Rows:", df.count())
        
        df.printSchema()

        # نستخدم limit عشان الجوب يكون خفيف
        sample = (
            df.select(
                "review_id",
                "book_id",
                "rating",
                "review_length_words",
                "review_length_chars",
                "tfidf_map",
                "bert_embedding",
                "sentiment_pos",
                "sentiment_neg",
                "sentiment_neu",
                "sentiment_compound"
            )
            .limit(5)
        )

        sample.show(truncate=False)

    except Exception as e:
        print(f"{split}: not found or unreadable → {e}")



root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- sentiment_pos: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_compound: double (nullable = true)

+--------------------------------+--------+------+-------------------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql import functions as F

OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

# خلينا نشتغل بس على train كبداية
split = "train"
df = spark.read.format("delta").load(f"{OUT}/{split}")

df.select(
    "review_id",
    "book_id",
    "rating",
    "review_length_words",
    "review_length_chars",
    "tfidf_map",
    "bert_embedding",
    "sentiment_pos",
    "sentiment_neg",
    "sentiment_neu",
    "sentiment_compound"
).show(5, truncate=False)


+--------------------------------+--------+------+-------------------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql import functions as F

OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

for split in ["train", "val", "test"]:
    print("\n" + "="*15, split.upper(), "="*15)
    
    df = spark.read.format("delta").load(f"{OUT}/{split}")
    
    # لا تستخدم count هنا عشان اللوج
    # print("Rows:", df.count())
    
    df.select(
        "review_id",
        "book_id",
        "rating",
        "review_length_words",
        "review_length_chars",
        "sentiment_pos",
        "sentiment_neg",
        "sentiment_neu",
        "sentiment_compound"
    ).limit(5).show(truncate=False)



+--------------------------------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+--------------------+
|review_id                       |book_id |rating|review_length_words|review_length_chars|sentiment_pos      |sentiment_neg      |sentiment_neu     |sentiment_compound  |
+--------------------------------+--------+------+-------------------+-------------------+-------------------+-------------------+------------------+--------------------+
|001cae58709fffae988fabec43287177|13477676|3     |126                |614                |0.0                |0.03269688644688644|0.9673031135531136|-0.03269688644688644|
|007ada296a8fa02bcc277450c17a2358|5129    |4     |157                |813                |0.21947601010101012|0.0                |0.7805239898989899|0.21947601010101012 |
|00e6a53809180b8435275328d8783fed|7728889 |5     |7                  |42                 |0.75               |0.0                |0.25          

In [0]:
display(df.select(
    "review_id", "book_id", "rating",
    "review_length_words", "review_length_chars",
    "sentiment_pos", "sentiment_neg", "sentiment_neu", "sentiment_compound"
).limit(10))


review_id,book_id,rating,review_length_words,review_length_chars,sentiment_pos,sentiment_neg,sentiment_neu,sentiment_compound
0135dded366343d3df2d952c0399589f,18655866,5,15,58,0.675,0.0,0.3249999999999999,0.675
046018168f318a200b0696c0602d64fc,15703337,2,514,2721,0.0038293650793651,0.0,0.9961706349206348,0.0038293650793651
04eb2b620033664780aa494f962b1e6f,15847315,3,249,1264,0.2359375,0.0,0.7640625,0.2359375
06faafd8446f994c00b9edd82300bf1e,15541561,5,34,170,0.44,0.0,0.56,0.44
070305fb0b4f1ac43f1d39b8bee20429,30971715,5,149,801,0.0307291666666666,0.0,0.9692708333333332,0.0307291666666666
08b8db54af9499a89ae02cdb3e9cc255,24378015,3,3,17,0.0,-0.0,1.0,0.0
09d48fc82584a6a060b33c6d0e17be59,6900,5,172,881,0.2449194324194324,0.0,0.7550805675805676,0.2449194324194324
0a3273945fa0f5647140fde275b6a225,36298130,3,40,187,0.0,0.0774999999999999,0.9225,-0.0774999999999999
0acb9bca6a005ae2cf49b05780b16be3,8487363,5,151,702,0.2866666666666667,0.0,0.7133333333333333,0.2866666666666667
0b248bb9c9fa8412bba670269ce286a4,20881537,4,6,36,1.0,0.0,0.0,1.0


In [0]:
df = spark.read.format("delta").load("abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2/train")

# extract one tfidf map and count its keys
row = df.select("tfidf_map").limit(1).collect()[0]
vocab_size = len(row["tfidf_map"])

print("TF-IDF features:", vocab_size)
print("BERT embedding features:", 384)
print("Sentiment features:", 4)
print("Length features:", 2)
print("Metadata:", 3)
print("Estimated total:", vocab_size + 384 + 4 + 2 + 3)


TF-IDF features: 149
BERT embedding features: 384
Sentiment features: 4
Length features: 2
Metadata: 3
Estimated total: 542


In [0]:
df = spark.read.format("delta").load(OUT + "/train")
print("Columns:", len(df.columns))
df.printSchema()

Columns: 11
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- sentiment_pos: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_compound: double (nullable = true)



### **Sampling and Training**

In [0]:
from pyspark.sql import functions as F

BASE = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

# حمل الداتا
train_df = spark.read.format("delta").load(f"{BASE}/train")
val_df   = spark.read.format("delta").load(f"{BASE}/val")
test_df  = spark.read.format("delta").load(f"{BASE}/test")

print("Train rows:", train_df.count())
print("Val rows  :", val_df.count())
print("Test rows :", test_df.count())

# نختار تقريباً 400K من الـ train
target_n = 400_000
train_n  = train_df.count()
frac     = min(1.0, float(target_n) / float(train_n))

train_sample = train_df.sample(withReplacement=False, fraction=frac, seed=42)
print("Sampled train rows:", train_sample.count())


Train rows: 10434620
Val rows  : 2235157
Test rows : 2236333
Sampled train rows: 400087


In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, VectorUDT, Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

FEATURES_PATH = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"


In [0]:
# اقرأ الداتا النهائية من gold/features_v2
train_full = spark.read.format("delta").load(f"{FEATURES_PATH}/train")
val_full   = spark.read.format("delta").load(f"{FEATURES_PATH}/val")

# خذ 400 ألف سامبل من train (بدون replacement)
train_df = train_full.sample(withReplacement=False, fraction=400000 / train_full.count(), seed=42)
train_df = train_df.limit(400000)

# نخلي val كامل (غالباً 2.2M) أو إذا حاب تصغره ممكن تعمل limit
val_df = val_full

print("Train rows:", train_df.count())
print("Val rows:", val_df.count())
train_df.select("review_id", "rating", "tfidf_map", "bert_embedding").show(3, truncate=False)


Train rows: 400000
Val rows: 2235157
+--------------------------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
tokens_df = (
    train_df.select(F.explode(F.map_keys("tfidf_map")).alias("token"))
    .unionByName(val_df.select(F.explode(F.map_keys("tfidf_map")).alias("token")))
    .groupBy("token").count()
    .orderBy(F.desc("count"))
    .limit(149)
)

tokens = [r["token"] for r in tokens_df.collect()]
token2idx = {t: i for i, t in enumerate(tokens)}
dim = len(tokens)
print("TF-IDF vocab size:", dim)


TF-IDF vocab size: 149


In [0]:
from pyspark.ml.linalg import SparseVector

def map_tfidf_partitions(rows, token2idx, dim):
    for r in rows:
        m = r["tfidf_map"]
        if m is None:
            yield (r["review_id"], SparseVector(dim, [], []))
            continue
        
        idxs = []
        vals = []
        for token, weight in m.items():
            idx = token2idx.get(token)
            if idx is not None:
                idxs.append(idx)
                vals.append(float(weight))
        
        yield (r["review_id"], SparseVector(dim, idxs, vals))

def tfidf_to_sparse(df, token2idx):
    dim = len(token2idx)

    rdd = df.rdd.mapPartitions(
        lambda part: map_tfidf_partitions(part, token2idx, dim)
    )

    return spark.createDataFrame(rdd, ["review_id", "tfidf_sparse"])



In [0]:
from pyspark.sql import functions as F
from pyspark.ml.linalg import SparseVector, VectorUDT

OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

# نقرأ أي split عشان نعرف حجم الـ vocab (كلها نفس الشيء)
train_df_full = spark.read.format("delta").load(f"{OUT}/train")

# نحتاج فقط الـ tfidf_map لمرة واحدة لعمل mapping من token -> idx
# نأخذ أول عدة صفوف فيها بيانات
some_maps = (train_df_full
             .select("tfidf_map")
             .where(F.col("tfidf_map").isNotNull())
             .limit(1000)
             .collect())

# نبني set بكل التوكنات الموجودة في الـ maps
tokens = set()
for row in some_maps:
    for k in row["tfidf_map"].keys():
        tokens.add(k)

# نرتبهم ونعطي كل token index
tokens_sorted = sorted(tokens)
token2idx = {tok: i for i, tok in enumerate(tokens_sorted)}
dim_tfidf = len(token2idx)

print("TF-IDF vocab size (from maps):", dim_tfidf)


TF-IDF vocab size (from maps): 82209


In [0]:
from pyspark.sql import functions as F

OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

# نقرأ train النهائي
train_full = spark.read.format("delta").load(f"{OUT}/train")

# نشتغل على عينة أصغر لو حاب (اختياري لتسريع العد)
train_for_vocab = train_full.limit(400000)   # تقدر تخليها نفس 400k اللي حنعمل عليها تدريب

# نفجر الـ tfidf_map إلى token + value
tokens_df = (
    train_for_vocab
      .select(F.explode("tfidf_map").alias("token", "weight"))
      .groupBy("token")
      .agg(F.sum("weight").alias("tf_sum"))
      .orderBy(F.desc("tf_sum"))
      .limit(149)      # <-- هنا نثبت 149 بالضبط
)

tokens_df.show(10, truncate=False)

# نجمعهم في driver كـ list
top_tokens = [r["token"] for r in tokens_df.collect()]

len(top_tokens), top_tokens[:10]


+---------+------------------+
|token    |tf_sum            |
+---------+------------------+
|i        |11291.100945596543|
|this     |6813.482971726549 |
|book     |6515.9765079910185|
|num      |5739.432946924137 |
|read     |4816.868972889467 |
|but      |4116.053054151503 |
|s        |4082.166663514707 |
|t        |3551.3369806466017|
|story    |3540.288434567663 |
|this book|3455.370557239999 |
+---------+------------------+
only showing top 10 rows


(149,
 ['i', 'this', 'book', 'num', 'read', 'but', 's', 't', 'story', 'this book'])

In [0]:
# mapping من token -> index
token2idx = {tok: i for i, tok in enumerate(top_tokens)}
dim_tfidf = len(token2idx)

print("TF-IDF feature dimension:", dim_tfidf)


TF-IDF feature dimension: 149


In [0]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql import functions as F

@udf(VectorUDT())
def tfidf_map_to_vec(m):
    if m is None:
        return SparseVector(dim_tfidf, [], [])
    
    idxs = []
    vals = []
    for k, v in m.items():
        idx = token2idx.get(k)   # فقط الكلمات الـ 149
        if idx is not None:
            idxs.append(idx)
            vals.append(float(v))
    
    if not idxs:
        return SparseVector(dim_tfidf, [], [])
    
    return SparseVector(dim_tfidf, idxs, vals)


In [0]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql import functions as F

@udf(VectorUDT())
def tfidf_map_to_vec(m):
    if m is None:
        return SparseVector(dim_tfidf, [], [])

    idx_val = {}  # نجمع هنا
    for k, v in m.items():
        idx = token2idx.get(k)
        if idx is not None:
            idx_val[idx] = idx_val.get(idx, 0.0) + float(v)

    if not idx_val:
        return SparseVector(dim_tfidf, [], [])

    # نرتّب الـ indices
    idxs = sorted(idx_val.keys())
    vals = [idx_val[i] for i in idxs]

    return SparseVector(dim_tfidf, idxs, vals)



In [0]:
# 400k من train
train_400k = train_full.limit(400000)

# إضافة tfidf_vec بالـ UDF الجديد
train_400k_vec = train_400k.withColumn("tfidf_vec", tfidf_map_to_vec(F.col("tfidf_map")))

# تأكد
train_400k_vec.select("tfidf_vec").printSchema()
train_400k_vec.select("tfidf_vec").show(1, truncate=False)


root
 |-- tfidf_vec: vectorudt (nullable = true)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tfidf_vec                                                                                                                                   

In [0]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

@udf(VectorUDT())
def bert_arr_to_vec(a):
    if a is None:
        return Vectors.dense([])
    return Vectors.dense(a)


In [0]:
train_400k_all = (
    train_400k_vec
      .withColumn("bert_vec", bert_arr_to_vec("bert_embedding"))
)


In [0]:
import pandas as pd

cols = [
    "tfidf_vec",
    "bert_vec",
    "sentiment_pos", "sentiment_neg", "sentiment_neu", "sentiment_compound",
    "review_length_words", "review_length_chars",
    "rating"
]

pdf = train_400k_all.select(cols).toPandas()
print(pdf.shape)
pdf.head()


(400000, 9)


Unnamed: 0,tfidf_vec,bert_vec,sentiment_pos,sentiment_neg,sentiment_neu,sentiment_compound,review_length_words,review_length_chars,rating
0,"(0.03711862461898635, 0.0, 0.0, 0.0, 0.0, 0.02...","[-0.028768088668584824, 0.007945484481751919, ...",0.159062,0.0,0.840938,0.159062,56,310,3
1,"(0.035239200587645264, 0.03728069545034738, 0....","[-0.08673273772001266, -0.018838806077837944, ...",0.199116,0.0,0.800884,0.199116,51,288,5
2,"(0.0, 0.0775046036994064, 0.0, 0.0, 0.0, 0.093...","[-0.025239789858460426, -0.011545519344508648,...",0.0,0.133333,0.866667,-0.133333,14,94,2
3,"(0.06186437436497725, 0.03272416600641604, 0.0...","[-0.019573692232370377, -0.057219814509153366,...",0.38,0.0,0.62,0.38,36,189,4
4,"(0.019604907369182932, 0.0, 0.0230007000647465...","[-0.07872558385133743, -0.03259136900305748, 0...",0.175,0.0,0.825,0.175,61,289,4


In [0]:
import numpy as np

num_cols = [
    "sentiment_pos", "sentiment_neg", "sentiment_neu", "sentiment_compound",
    "review_length_words", "review_length_chars"
]

def row_to_np(row):
    # TF-IDF كـ SparseVector -> array
    tf = row["tfidf_vec"].toArray()
    
    # BERT ممكن يكون list أو array
    bert = np.array(row["bert_vec"])
    
    # باقي الأرقام
    extra = row[num_cols].values.astype("float32")
    
    return np.concatenate([tf, bert, extra])

X = np.vstack(pdf.apply(row_to_np, axis=1))
y = pdf["rating"].astype(int).values

print("X shape:", X.shape)   
print("y shape:", y.shape)


X shape: (400000, 539)
y shape: (400000,)


In [0]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

clf = RandomForestClassifier(
    n_estimators=200,
    max_depth=None,
    n_jobs=-1,
    random_state=42
)

clf.fit(X_train, y_train)

y_pred = clf.predict(X_val)
print(classification_report(y_val, y_pred))


Uploading artifacts:   0%|          | 0/1 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading /local_disk0/user_tmp_data/spark-65a3a3f7-fe22-4ca9-bdb3-8d/tmp_w4vpdm5/model/model.pkl:   0%|      …

🏃 View run sassy-grub-659 at: https://adb-3098290281736649.9.azuredatabricks.net/ml/experiments/2348951171668039/runs/8caa298a243d4397affea47294f6ef95
🧪 View experiment at: https://adb-3098290281736649.9.azuredatabricks.net/ml/experiments/2348951171668039
              precision    recall  f1-score   support

           1       0.59      0.05      0.08      2372
           2       0.42      0.03      0.06      5820
           3       0.40      0.19      0.25     16370
           4       0.40      0.60      0.48     27634
           5       0.57      0.62      0.59     27804

    accuracy                           0.47     80000
   macro avg       0.48      0.30      0.30     80000
weighted avg       0.47      0.47      0.43     80000



### **Test Dataset**

In [0]:
import mlflow
from mlflow.pyfunc import load_model

model_uri = "runs:/2073a2e98e734385ba120d9b38d5a116/model"
clf = load_model(model_uri)


Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Downloading /local_disk0/user_tmp_data/spark-65a3a3f7-fe22-4ca9-bdb3-8d/tmpluba8b0w/model/model.pkl:   0%|    …

In [0]:
test_full = spark.read.format("delta").load(f"{OUT}/test")
print(test_full.count(), "rows in TEST")
test_full.printSchema()


2236333 rows in TEST
root
 |-- review_id: string (nullable = true)
 |-- book_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_length_words: integer (nullable = true)
 |-- review_length_chars: integer (nullable = true)
 |-- tfidf_map: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- bert_embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- sentiment_pos: double (nullable = true)
 |-- sentiment_neg: double (nullable = true)
 |-- sentiment_neu: double (nullable = true)
 |-- sentiment_compound: double (nullable = true)



In [0]:
test_vec = test_full.withColumn(
    "tfidf_vec",
    tfidf_map_to_vec(F.col("tfidf_map"))
)


In [0]:
test_vec = test_vec.withColumn(
    "bert_vec",
    F.array(*[F.col("bert_embedding")[i] for i in range(384)])
)


In [0]:
from pyspark.sql import functions as F
import numpy as np
from sklearn.metrics import classification_report

# نفس المسار اللي استخدمناه قبل
OUT = "abfss://lakehouse@goodreadsreviews60314097.dfs.core.windows.net/gold/features_v2"

# --- 1) تحميل test من gold/features_v2 ---
test_full = spark.read.format("delta").load(f"{OUT}/test")

# خذ عيّنة 400k زي train (تقدر تغيّر الرقم لو حابب)
test_400k = test_full.limit(400000)

# --- 2) إضافة عمود tfidf_vec باستخدام نفس UDF اللي استخدمناه في التدريب ---
# مهم: لازم يكون tfidf_map_to_vec, token2idx_bc, dim_bc موجودين من قبل
test_400k_vec = test_400k.withColumn("tfidf_vec", tfidf_map_to_vec(F.col("tfidf_map")))


In [0]:
# نختار نفس الأعمدة اللي استخدمناها في التدريب
test_pdf = (
    test_400k_vec
      .select(
          "tfidf_vec",
          "bert_embedding",
          "sentiment_pos", "sentiment_neg", "sentiment_neu", "sentiment_compound",
          "review_length_words", "review_length_chars",
          F.col("rating").alias("label")
      )
      .toPandas()
)


In [0]:
# TF-IDF من SparseVector -> numpy
tfidf_mat = np.stack([np.array(v.toArray()) for v in test_pdf["tfidf_vec"]])

# BERT embeddings: already array<float>
bert_mat = np.stack(test_pdf["bert_embedding"].to_list())

# الميزات الاسكالرية: sentiment + lengths
other_feats = test_pdf[
    ["sentiment_pos", "sentiment_neg", "sentiment_neu", "sentiment_compound",
     "review_length_words", "review_length_chars"]
].to_numpy(dtype="float32")

# دمج كل شيء في مصفوفة واحدة بالترتيب نفسه اللي استخدمناه في التدريب
X_test = np.hstack([tfidf_mat, bert_mat, other_feats])

y_test = test_pdf["label"].to_numpy()


In [0]:
y_pred_test = clf.predict(X_test)
print(classification_report(y_test, y_pred_test))


              precision    recall  f1-score   support

           1       0.65      0.04      0.08     11640
           2       0.40      0.02      0.04     29372
           3       0.39      0.16      0.23     81678
           4       0.39      0.61      0.48    138661
           5       0.55      0.58      0.57    138649

    accuracy                           0.45    400000
   macro avg       0.48      0.28      0.28    400000
weighted avg       0.46      0.45      0.41    400000

