In [11]:

from pyspark.sql import SparkSession, functions as F, types as T, Window


def get_spark(app_name: str = "LocalSparkApp", memory: str = "4g") -> SparkSession:
    """Create (or reuse) a local SparkSession configured for this notebook.

    Args:
        app_name: Application name handed to the session builder.
        memory: Value used for ``spark.driver.memory`` (for example ``"4g"``).

    Returns:
        The session returned by ``getOrCreate()``, with the SparkContext log
        level set to ``WARN``.
    """
    session_options = {
        "spark.sql.shuffle.partitions": "4",
        "spark.driver.memory": memory,
        "spark.sql.execution.arrow.pyspark.enabled": "true",
    }

    # "local[*]": run locally on all cores.
    builder = SparkSession.builder.appName(app_name).master("local[*]")
    for option, value in session_options.items():
        builder = builder.config(option, value)

    session = builder.getOrCreate()
    session.sparkContext.setLogLevel("WARN")
    return session

In [12]:
# --- Paths -------------------------------------------------------------------
# <-- replace with your silver dataset path
SILVER_PATH = "../data/silver/amazon_reviews_furniture"
# Directory that receives the Gold parquet output and its metadata JSON.
GOLD_OUT_PATH = "../data/gold/sentiment/amazon_reviews_furniture"

# --- MLflow tracking ---------------------------------------------------------
EXPERIMENT_NAME = "gold_sentiment_analysis"
MLFLOW_URI = "file:../mlruns"

# --- Schema contract ---------------------------------------------------------
# Columns that must be present in the Silver dataset.
REQUIRED_FIELDS = [
    "review_id",
    "product_id",
    "customer_id",
    "star_rating",
    "review_date",
    "review_body",
]

# --- Labelling ---------------------------------------------------------------
# Star ratings that belong to each sentiment label.
SENTIMENT_MAP = dict(
    negative=[1, 2],
    neutral=[3],
    positive=[4, 5],
)

In [13]:
# Start the Spark session and read the Silver-layer parquet dataset.
spark = get_spark('Gold-sentiment-analysis')
df = spark.read.parquet(SILVER_PATH)

# Report the dataset size before inspecting it.
silver_row_count = df.count()
silver_col_count = len(df.columns)
print(
    f"Loaded Silver dataset with {silver_row_count} rows and {silver_col_count} columns.")

# Schema plus a small sample of rows for a visual sanity check.
df.printSchema()
df.show(5)

Loaded Silver dataset with 791971 rows and 17 columns.
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: float (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: boolean (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_file: string (nullable = true)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+-----+-----------------+------------------

In [14]:
# Fail fast if the Silver schema lacks a column this notebook relies on.
missing = [field for field in REQUIRED_FIELDS if field not in df.columns]
if missing:
    raise ValueError(f"Missing required fields: {missing}")

# Filter invalid or incomplete rows:
#   * review text must be present and longer than 5 characters once trimmed,
#   * the rating must be present and, after the integer cast, lie in 1..5.
df = (
    df
    .filter(F.col("review_body").isNotNull())
    .filter(F.length(F.trim(F.col("review_body"))) > 5)
    .filter(F.col("star_rating").isNotNull())
    .withColumn("star_rating", F.col("star_rating").cast(T.IntegerType()))
    .filter(F.col("star_rating").between(1, 5))
)

print(f"After filtering: {df.count()} rows remain.")

After filtering: 786952 rows remain.


In [28]:
def basic_text_cleaning(df, text_col="review_body", out_col="clean_text"):
    """Normalise review text and add simple length features.

    Cleaning steps, in order:
      1. HTML-like tags (``<...>``) are replaced by a single space, so text on
         either side of a tag such as ``<br />`` is not fused into one word.
      2. Typographic double quotes are normalised to a plain ``"``.
      3. The text is lower-cased, runs of whitespace are collapsed to one
         space, and leading/trailing spaces are trimmed.

    Whitespace is collapsed *after* tag removal and the result is trimmed, so
    splitting on a single space never yields empty tokens.

    Args:
        df: Input Spark DataFrame.
        text_col: Name of the column holding the raw text.
        out_col: Name of the column that receives the cleaned text.

    Returns:
        ``df`` with three added (or replaced) columns: ``out_col``,
        ``n_chars`` (length of the cleaned text) and ``n_words`` (number of
        space-separated tokens; 0 for empty text, null for null text).
    """
    cleaned = F.col(text_col)
    # Replace tags with a space rather than nothing: "great<br />product"
    # must become "great product", not "greatproduct".
    cleaned = F.regexp_replace(cleaned, r"<[^>]+>", " ")
    cleaned = F.regexp_replace(cleaned, r"[“”«»„‟]", '"')
    # Collapse whitespace last so spaces introduced by tag removal are merged,
    # then trim so edge spaces do not create empty tokens.
    cleaned = F.trim(F.regexp_replace(F.lower(cleaned), r"\s+", " "))

    df = df.withColumn(out_col, cleaned)
    text = F.col(out_col)
    df = df.withColumn("n_chars", F.length(text))

    # split("") returns [""], which would count as one word; report 0 instead.
    # Null text matches neither branch and therefore stays null.
    n_words = (
        F.when(F.length(text) > 0, F.size(F.split(text, " ")))
        .when(F.length(text) == 0, F.lit(0))
    )
    df = df.withColumn("n_words", n_words)
    return df


# Apply the text cleaning step to the filtered reviews.
df = basic_text_cleaning(df)

# Preview a handful of cleaned reviews next to their word counts.
(
    df
    .select("clean_text", "n_words")
    .show(5, truncate=80)
)

+--------------------------------------------------------------------------------+-------+
|                                                                      clean_text|n_words|
+--------------------------------------------------------------------------------+-------+
|we were skeptical about ordering this online but it arrived within 2 days and...|     36|
|this worked perfectly with my queen mattress and frame and most importantly f...|     49|
|i love everything about the rug. the colors are perfect and it is nice qualit...|     25|
|would have gave it a 5 star, but had hardware necessary for assembly issues.....|     89|
|                                       great dresser and very easy to assemble .|      8|
+--------------------------------------------------------------------------------+-------+
only showing top 5 rows



In [29]:
def map_ratings_to_labels(df, rating_col="star_rating", out_col="sentiment_label"):
    """Attach a sentiment label derived from the star rating.

    Ratings are bucketed according to the module-level ``SENTIMENT_MAP``
    (keys ``"negative"``, ``"neutral"``, ``"positive"``). A rating that
    matches none of the buckets, including a null rating, falls back to
    ``"neutral"``.

    Args:
        df: Input Spark DataFrame.
        rating_col: Name of the column holding the rating.
        out_col: Name of the label column to add (or replace).

    Returns:
        ``df`` with ``out_col`` added.
    """
    rating = F.col(rating_col)
    label_expr = (
        F.when(rating.isin(SENTIMENT_MAP["negative"]), F.lit("negative"))
        .when(rating.isin(SENTIMENT_MAP["neutral"]), F.lit("neutral"))
        .when(rating.isin(SENTIMENT_MAP["positive"]), F.lit("positive"))
        # Anything unmatched is labelled neutral rather than left null.
        .otherwise(F.lit("neutral"))
    )
    return df.withColumn(out_col, label_expr)


# Derive the sentiment label from the star rating.
df = map_ratings_to_labels(df)

# Quick check: number of rows per label.
(
    df
    .groupBy("sentiment_label")
    .count()
    .show()
)

+---------------+------+
|sentiment_label| count|
+---------------+------+
|       positive|596840|
|       negative|116892|
|        neutral| 73220|
+---------------+------+



In [30]:
# Reviews of the same product, most recent first.
product_window = Window.partitionBy(
    "product_id").orderBy(F.col("review_date").desc())
# Reviews by the same reviewer, oldest first. The loaded schema identifies the
# reviewer as `customer_id`; it has no `user_id` column.
user_window = Window.partitionBy("customer_id").orderBy("review_date")

df = df.withColumn("review_rank", F.rank().over(product_window))

# `sentiment_label` holds strings, so averaging it directly cannot produce a
# meaningful number. Map the labels to a numeric score first
# (negative = -1, neutral = 0, positive = +1); any other value stays null and
# is ignored by avg().
sentiment_score = (
    F.when(F.col("sentiment_label") == "positive", F.lit(1.0))
    .when(F.col("sentiment_label") == "negative", F.lit(-1.0))
    .when(F.col("sentiment_label") == "neutral", F.lit(0.0))
)

# Average score over the current row and the 5 rows preceding it in
# `product_window` order (i.e. up to 6 reviews of the same product).
df = df.withColumn(
    "avg_sentiment_product",
    F.avg(sentiment_score).over(
        product_window.rowsBetween(-5, Window.currentRow)),
)

In [31]:
# Columns wanted in the Gold dataset; any that are absent from `df` are
# skipped instead of raising.
selected_cols = [
    name
    for name in (
        "review_id", "product_id", "customer_id",
        "clean_text", "sentiment_label", "star_rating",
        "review_date", "helpful_votes", "total_votes", "verified_purchase",
    )
    if name in df.columns
]

df_gold = df.select(*selected_cols)

# Preview the Gold rows.
df_gold.show(5, truncate=100)

+--------------+----------+-----------+----------------------------------------------------------------------------------------------------+---------------+-----------+-----------+-------------+-----------+-----------------+
|     review_id|product_id|customer_id|                                                                                          clean_text|sentiment_label|star_rating|review_date|helpful_votes|total_votes|verified_purchase|
+--------------+----------+-----------+----------------------------------------------------------------------------------------------------+---------------+-----------+-----------+-------------+-----------+-----------------+
|R1000XRMHJXTEZ|B006ZP2926|   35415428|we were skeptical about ordering this online but it arrived within 2 days and my husband didn't h...|       positive|          5| 2012-03-07|            0|          0|             true|
| R10011PNF3T89|B00OUWHUCU|   19786430|this worked perfectly with my queen mattress and frame and mo

In [32]:
import os
import json
from datetime import datetime

os.makedirs(GOLD_OUT_PATH, exist_ok=True)

# Write the Gold dataset, replacing any previous output at this path.
out_path = os.path.join(GOLD_OUT_PATH, "sentiment.parquet")
df_gold.write.mode("overwrite").parquet(out_path)

# Metadata
# Take the timestamp on the driver instead of launching a Spark job just to
# read the clock. This is a naive local-time datetime, so its string form
# keeps the "YYYY-MM-DD HH:MM:SS[.ffffff]" shape.
created_at = datetime.now()

# A single aggregation provides both the per-label counts and, by summing
# them, the total row count, so the dataset is not recomputed a second time
# for a separate count().
label_distribution = {
    r["sentiment_label"]: r["count"]
    for r in df_gold.groupBy("sentiment_label").count().collect()
}

meta = {
    # datetime is not JSON-serialisable; store it as a string
    "created_at": str(created_at),
    "total_rows": sum(label_distribution.values()),
    "label_distribution": label_distribution,
}
with open(os.path.join(GOLD_OUT_PATH, "sentiment_meta.json"), "w", encoding="utf-8") as f:
    json.dump(meta, f, indent=2)

print(f"✅ Gold sentiment dataset saved to {out_path}")

                                                                                

✅ Gold sentiment dataset saved to ../data/gold/sentiment/amazon_reviews_furniture/sentiment.parquet


In [33]:
import mlflow

mlflow.set_tracking_uri(MLFLOW_URI)
mlflow.set_experiment(EXPERIMENT_NAME)

gold_columns = df_gold.columns

# Label distribution; the total row count is the sum of the per-label counts,
# so no separate count() job is needed.
label_dist = df_gold.groupBy("sentiment_label").count().collect()
total = sum(r["count"] for r in label_dist)
if total == 0:
    # Every ratio below divides by `total`; fail with a clear message instead
    # of a ZeroDivisionError part-way through a run.
    raise ValueError(
        "Gold dataset is empty; no quality metrics can be computed.")

# One aggregation pass yields the average text length and the null count of
# every column, instead of one Spark job per column.
# Position 0 is the average length; position i + 1 belongs to gold_columns[i].
summary = df_gold.agg(
    F.avg(F.length("clean_text")),
    *[F.sum(F.col(c).isNull().cast("int")) for c in gold_columns],
).first()

avg_length = summary[0]
# Missing data ratio (quick data health check)
missing_ratios = {
    c: summary[i + 1] / total
    for i, c in enumerate(gold_columns)
}

# Mean share of non-null cells across columns.
completeness_score = 1 - \
    sum(missing_ratios.values()) / len(gold_columns)
# 1 minus the largest deviation of a label's count from a perfectly even
# split, expressed as a fraction of all rows.
balance_score = 1 - \
    max(abs(r["count"] - total / len(label_dist)) /
        total for r in label_dist)

# All Spark work is done above; the run itself only records values.
with mlflow.start_run():
    mlflow.log_param("total_rows", total)
    mlflow.log_param("n_columns", len(gold_columns))
    mlflow.log_param("columns", ", ".join(gold_columns))

    for r in label_dist:
        label = r["sentiment_label"]
        mlflow.log_metric(f"label_ratio_{label}", r["count"] / total)

    # Average review length; avg() is null when every clean_text is null,
    # in which case the metric is skipped rather than crashing on float(None).
    if avg_length is not None:
        mlflow.log_metric("avg_review_length", float(avg_length))

    for c, ratio in missing_ratios.items():
        mlflow.log_metric(f"missing_ratio_{c}", ratio)

    mlflow.log_metric("completeness_score", completeness_score)
    mlflow.log_metric("balance_score", balance_score)

                                                                                