# Sentiment Analysis with Amazon Reviews - Classic Compute

+ Use the open-sourced Amazon Review data-set to experiment with Sentiment Analysis on customer reviews.  
+ Use only Databricks Classic Compute with ML DBR

Tested with DBR 15.4 ML (non-GPU). Also tested with DBR 15.4 ML GPU to speed up embeddings generation.

**Feature Data**.   
The *Features Table* has been pre-generated with the first section of the notebook "*Amazon Reviews Sentiment*" (sections "Load Sample Dataset to {catalog}.{database}.{table}" and "Create a Features Table with RowIDs")




**SiEBERT**.   
SiEBERT is a sophisticated sentiment analysis model built on RoBERTa-large architecture, specifically designed for binary sentiment classification in English text. Fine-tuned on 15 diverse datasets, it demonstrates exceptional generalization capabilities across various text types, from product reviews to social media content.


Examples: 
+ Cardiff NLP Twitter-roBERTa-base: https://huggingface.co/cardiffnlp/twitter-roberta-base-sentiment   
+ Kaggle with RoBERTa transformer: https://www.kaggle.com/code/stefancomanita/sentiment-analysis-with-hugging-face-transformers
+ SiEBERT: https://huggingface.co/siebert/sentiment-roberta-large-english 

SiEBERT is a binary classifier (positive or negative), whereas the Cardiff NLP model predicts three classes (negative, neutral, positive).
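As a rough illustration of the difference between the two output heads, the sketch below maps raw logits to a label for a two-class and a three-class model. The logits are made up, and the label orderings are assumptions taken from the model cards; check each model's `config.id2label` before relying on them.

```python
import math

# Assumed label orderings (verify against each model's config.id2label)
SIEBERT_LABELS = ["NEGATIVE", "POSITIVE"]
CARDIFF_LABELS = ["negative", "neutral", "positive"]


def softmax(logits):
    """Convert raw logits to probabilities that sum to 1."""
    peak = max(logits)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def top_label(logits, labels):
    """Return the most probable label and its probability."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=probs.__getitem__)
    return labels[best], probs[best]


# Made-up logits for illustration only
print(top_label([-2.1, 3.4], SIEBERT_LABELS))
print(top_label([0.2, 1.5, 0.3], CARDIFF_LABELS))
```

A binary head can never express "neutral", which is one reason this notebook uses the embeddings rather than the classifier output to predict a 1 to 5 rating.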

Target approach:
- Use SiEBERT (or any transformer model) to extract [CLS] embeddings.
- These embeddings capture sentiment, semantic structure, and topical signals.
- Train multiple downstream models:
  - One for star rating prediction (classification or regression).
  - One for topic classification (multi-class or multi-label).
  - Possibly one for spam detection, toxicity, etc.
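The approach above can be sketched in plain Python: take the first-token ([CLS]) vector from each sequence, then fit separate downstream "heads" on the same embeddings. The toy hidden states, the nearest-centroid heads, and the labels are all hypothetical stand-ins; the notebook itself uses SiEBERT embeddings and gradient-boosted trees.

```python
# Toy hidden states shaped (batch, seq_len, hidden).
# Real RoBERTa-large output would be (batch, seq_len, 1024).
hidden_states = [
    [[0.9, 0.1], [0.0, 0.0], [0.5, 0.5]],  # review 0: 3 tokens, hidden size 2
    [[0.1, 0.8], [0.3, 0.3], [0.2, 0.2]],  # review 1
]


def cls_embeddings(batch):
    """Plain-Python equivalent of last_hidden_state[:, 0, :]."""
    return [tokens[0] for tokens in batch]


def fit_centroids(embeddings, labels):
    """Hypothetical downstream head: the mean embedding per label."""
    sums, counts = {}, {}
    for vec, label in zip(embeddings, labels):
        acc = sums.setdefault(label, [0.0] * len(vec))
        for i, value in enumerate(vec):
            acc[i] += value
        counts[label] = counts.get(label, 0) + 1
    return {label: [s / counts[label] for s in acc] for label, acc in sums.items()}


def predict(centroids, vec):
    """Return the label whose centroid is closest to vec."""
    def sq_dist(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))
    return min(centroids, key=lambda label: sq_dist(centroids[label], vec))


embeddings = cls_embeddings(hidden_states)

# The same embeddings feed two independent heads
rating_head = fit_centroids(embeddings, [5, 1])
topic_head = fit_centroids(embeddings, ["books", "electronics"])

new_review = [0.8, 0.2]
print(predict(rating_head, new_review), predict(topic_head, new_review))
```

The point of the design is that the expensive step (embedding extraction) runs once, while each cheap downstream model can be trained and replaced independently.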


# PART 1 - GENERAL Environment Setup




## Parameterize the Catalog and Schema Location

In [0]:
dbutils.widgets.text("catalog", "users")
dbutils.widgets.text("database", "")
dbutils.widgets.text("features_table", "review_features")  # this table should be pre-created and populated with data
dbutils.widgets.text("embeddings_table", "review_embeddings")   # this gets created and populated with this notebook
dbutils.widgets.text("predictions_table", "review_cl_predictions")  # CLASSIC predictions table created and populated with this notebook - GIVE IT A DIFFERENT NAME to GenAI example notebook  
dbutils.widgets.text("experiment_name", "/Users/ed.bullen@databricks.com/rating_model_experiment")  # MLFlow model training experiment name

In [0]:
print("Table location params set to", dbutils.widgets.getAll())
catalog = dbutils.widgets.get("catalog")
database = dbutils.widgets.get("database")
features_table = dbutils.widgets.get("features_table")  # Amazon reviews with a surrogate ID key
embeddings_table = dbutils.widgets.get("embeddings_table")  
predictions_table = dbutils.widgets.get("predictions_table")
experiment_name = dbutils.widgets.get("experiment_name")

In [0]:
%sql
SELECT * FROM `${catalog}`.`${database}`.`${features_table}`
LIMIT 10;

## Helper Functions and Libraries

In [0]:
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModel
import torch
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import classification_report

In [0]:
# This should already be in place for DBR 15.4 ML
#%pip install -U transformers scikit-learn pandas


**Hugging Face Model Download**.   
This step downloads the `sentiment-roberta-large-english` model from Hugging Face.

In [0]:
tokenizer = AutoTokenizer.from_pretrained("siebert/sentiment-roberta-large-english")
model = AutoModel.from_pretrained("siebert/sentiment-roberta-large-english")
model.eval()
model = model.to("cuda" if torch.cuda.is_available() else "cpu")

# Simple non-parallelized get_embedding() function for small data-sets only. See the generate_embeddings() UDF (defined below, used in PART 2) for bulk operations
#def get_embedding(text):
#    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
#    inputs = {k: v.to(model.device) for k, v in inputs.items()}
#    with torch.no_grad():
#        outputs = model(**inputs)
#    return outputs.last_hidden_state[:, 0, :].squeeze().cpu().numpy()

In [0]:
# Batch embedding function - process in batches to cut down tokenization overhead and improve throughput
#def get_batch_embeddings(texts, batch_size=16):
#    all_embeddings = []
#    for i in range(0, len(texts), batch_size):
#        batch_texts = texts[i:i+batch_size]
#        inputs = tokenizer(batch_texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
#        inputs = {k: v.to(model.device) for k, v in inputs.items()}
#        with torch.no_grad():
#            outputs = model(**inputs)
#        batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
#        all_embeddings.append(batch_embeddings)
#    return np.vstack(all_embeddings)

In [0]:
# Spark parallel cluster exec - get embeddings
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, FloatType
from transformers import AutoTokenizer, AutoModel
import torch
import pandas as pd

# Define the scalar iterator UDF for batch processing
@pandas_udf(ArrayType(FloatType()), functionType=PandasUDFType.SCALAR_ITER)
def generate_embeddings(reviews_iter):
    # Load model/tokenizer once per partition
    tokenizer = AutoTokenizer.from_pretrained("siebert/sentiment-roberta-large-english")
    model = AutoModel.from_pretrained("siebert/sentiment-roberta-large-english")
    model.eval()
    model.to("cuda" if torch.cuda.is_available() else "cpu")

    for reviews in reviews_iter:
        embeddings = []
        for text in reviews:
            inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
            inputs = {k: v.to(model.device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = model(**inputs)
                vector = outputs.last_hidden_state[:, 0, :].squeeze().cpu().numpy()
            embeddings.append(vector.astype(float).tolist())
        yield pd.Series(embeddings)


In [0]:
torch.cuda.is_available()

In [0]:
# Simple non-parallelized get_embedding() function that returns the same pd.Series(embeddings) format as the UDF
#def get_embeddings_series(text):
#    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)
#    inputs = {k: v.to(model.device) for k, v in inputs.items()}
#    with torch.no_grad():
#        outputs = model(**inputs)
#    vector = outputs.last_hidden_state[:, 0, :].squeeze().cpu().numpy()
#    return pd.Series([vector.astype(float).tolist()])

# PART 2 - Embeddings Setup

This takes a long time without a GPU. If the model has already been trained, skip forward to "PART 4 - Use the Model to predict Rating".

## Load SiEBERT and Extract Embeddings

Take only the first 18859 rows of the features table and hold the remaining 1000 rows back as a "holdout data set", for a final test of the model trained on the selected rows.
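A minimal sketch of the split, assuming the surrogate `id` column is a contiguous sequence starting at 1 (the helper name `split_predicates` is hypothetical). It only builds the two SQL predicates, so that the training and holdout queries cannot drift apart.

```python
def split_predicates(train_rows, id_column="id"):
    """Return (train_filter, holdout_filter) SQL predicates split on a surrogate ID."""
    if train_rows < 1:
        raise ValueError("train_rows must be a positive row count")
    return f"{id_column} <= {train_rows}", f"{id_column} > {train_rows}"


train_filter, holdout_filter = split_predicates(18859)
print(train_filter)    # id <= 18859
print(holdout_filter)  # id > 18859
```

The two predicates are complementary, so every row lands in exactly one of the two sets.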

Extracting embeddings takes a long time without a GPU. However, the work can be parallelised on a Spark cluster with regular CPUs. Although the data-set is small, partition it into (say) 64 partitions so that the embeddings extraction is split across cluster nodes and CPU cores. In this example, 64 partitions were distributed across a 4-node cluster with 4 cores on each node, which reduced processing time from nearly 2 hours to about 20 minutes. Add more nodes for faster processing.

A Pandas UDF called `generate_embeddings()` (defined in PART 1) wraps the embeddings generation so that it runs on the Spark worker nodes, distributed in parallel across the cluster.
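To reason about the partition count, a back-of-the-envelope calculation helps. The sketch below assumes each core runs one partition (task) at a time, which is the Spark default; the helper name `task_waves` is hypothetical.

```python
import math


def task_waves(partitions, nodes, cores_per_node):
    """Number of sequential waves needed when each core runs one partition at a time."""
    slots = nodes * cores_per_node
    return math.ceil(partitions / slots)


print(task_waves(64, 4, 4))  # 4 waves across 16 cores
print(task_waves(1, 4, 4))   # 1 wave, but only one core does any work
```

With a single partition the whole job runs on one core regardless of cluster size, which is why a small table still needs an explicit `repartition()`. Using a multiple of the total core count keeps every core busy in each wave.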

In [0]:
# load data into a spark DF for cluster processing and force it to be partitioned (despite it being small)
spark_df = spark.sql(f"SELECT * FROM {catalog}.{database}.{features_table} WHERE id <= 18859").repartition(32)

### Extract Embeddings with SiEBERT

In [0]:
# check the number of partitions to see that the data-frame is distributed in the cluster and suitable for multiple worker processes 
print(spark_df.rdd.getNumPartitions())  

In [0]:
# Add the embeddings as a new column to spark_df, using the embedding UDF
spark_df = spark_df.withColumn("embeddings", generate_embeddings(spark_df["review"]))

In [0]:
display(spark_df)

In [0]:
spark_df.printSchema()

# PART 3 - Use the Embeddings and Train a Model


## Train a Classifier - Rating 1 to 5

In [0]:
import mlflow
#import mlflow.sklearn
from mlflow.exceptions import RestException

# Set the MLflow registry URI to Databricks Unity Catalog
mlflow.set_registry_uri("databricks-uc")
model_registry_name = f"{catalog}.{database}.rating_model"

mlflow.set_experiment(experiment_name)

In [0]:
# spark_df copied to pandas_df (now running single threaded in the driver node)
#pandas_df = spark_df.toPandas()

## Training with UDF VERSION 1

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors

# Define the UDF: cast the float embeddings to array<double> (converted to an ML Vector in the next cell)
vector_udf = F.udf(lambda arr: [float(x) for x in arr], ArrayType(DoubleType()))

# Apply the UDF
df_vec = spark_df.withColumn("features", vector_udf("embeddings"))

gbt   = GBTRegressor(labelCol="rating", featuresCol="features",
                     maxIter=100, stepSize=0.1, maxDepth=5, seed=42)

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

# Convert array<double> to Vector
array_to_vector_udf = udf(lambda array: Vectors.dense(array), VectorUDT())
df_vec = df_vec.withColumn('features_vec', array_to_vector_udf(df_vec['features']))

assembler = VectorAssembler(inputCols=['features_vec'], outputCol='features_vec_assembled')
df_vec = assembler.transform(df_vec).drop('features').drop('features_vec').withColumnRenamed('features_vec_assembled', 'features')

mlflow.set_experiment(experiment_name)
import mlflow.spark
with mlflow.start_run():
    mlflow.spark.autolog()
    model = gbt.fit(df_vec)       
experiment_metadata = dict(mlflow.get_experiment_by_name(experiment_name))

## Training with UDF and VECTOR ASSEMBLER VERSION 2


Notes on training and logging:
https://docs.databricks.com/aws/en/machine-learning/train-model/xgboost-spark

In [0]:
# Check PYSPARK_PIN_THREAD (this has to be set in the cluster configuration and needs to be false for autologging to work)
#
# Using Databricks UI: 
# Go to Edit cluster settings.
# Scroll to Advanced options and open the Spark tab.
# In the **Environment variables** field, enter (or correct): PYSPARK_PIN_THREAD=false

import os
print("PYSPARK_PIN_THREAD =", os.environ.get("PYSPARK_PIN_THREAD"))

In [0]:
from pyspark.ml.functions import array_to_vector
from pyspark.ml.regression import GBTRegressor
import mlflow, mlflow.spark

mlflow.set_experiment(experiment_name)

# 1. DataFrame with a native vector column (no UDF)
df = (
    spark_df
      .select("rating", array_to_vector("embeddings").alias("features"))
      .cache()
)
df.count()   # materialise cache

gbt = GBTRegressor(
        labelCol="rating",
        featuresCol="features",
        maxIter=80,
        stepSize=0.1,
        maxDepth=4,
        seed=42
     )

with mlflow.start_run() as run:
    mlflow.spark.autolog()
    #mlflow.pyspark.ml.autolog()
    model = gbt.fit(df)
    experiment_metadata = dict(mlflow.get_experiment_by_name(experiment_name))



In [0]:
# find the best model - refer to a specific experiment
import json
experiment_metadata=dict(mlflow.get_experiment_by_name(experiment_name))
print(json.dumps(experiment_metadata,indent=4))

# find the best model - metadata unique ID
experiment_id=experiment_metadata['experiment_id']

# find the best model - search runs in the experiment by various metrics
mlflow_model = mlflow.search_runs([experiment_id]
                                #, filter_string='tags.project = "my_tag"'
                                #, order_by = ['metrics.f1_score DESC']).iloc[0] # best F1
                                , order_by = ['attributes.start_time DESC']).iloc[0] # just get the latest run

# show the model details
mlflow_model

In [0]:
registered_model = mlflow.register_model(
  f"runs:/{mlflow_model.run_id}/model", model_registry_name
)

## RDD VERSION

In [0]:
from pyspark.mllib.tree import GradientBoostedTrees

In [0]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
#from pyspark.ml.classification import BoostingStrategy

# Convert the DataFrame to an RDD of LabeledPoint
data = spark_df.rdd.map(lambda row: LabeledPoint(row['rating'], Vectors.dense(row['embeddings'])))
print("Training Partitions:", data.getNumPartitions())

# Set the boosting strategy
#boosting_strategy = BoostingStrategy.defaultParams('classification')
#boosting_strategy.numIterations = 10  # Number of iterations

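# Train the Gradient Boosted Trees model
# MLlib GBT classification is binary-only (labels 0/1), so the 1-5 rating is fitted as a regression target
#rating_model = GradientBoostedTrees.train(data, boosting_strategy)
rating_model = GradientBoostedTrees.trainRegressor(data, categoricalFeaturesInfo={}, numIterations=10)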

In [0]:

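# Not runnable as written: trainClassifier() expects an RDD of LabeledPoint plus categoricalFeaturesInfo, not a DataFrame with column names
#rating_model = GradientBoostedTrees.trainClassifier(spark_df, "rating", "embeddings", {}, numIterations=10)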

In [0]:
# Takes up to 1 hour to run training against 18859 records
#X = pandas_df['embeddings'].apply(lambda x: np.array(x)).tolist()
#y = pandas_df['rating']

#X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y)

# Log the model
#with mlflow.start_run():
#    mlflow.sklearn.autolog()  # use auto-log to ensure signature gets logged
#    #mlflow.sklearn.log_model(rating_model, "rating_model")
#    rating_model = GradientBoostingClassifier()
#    rating_model.fit(X_train, y_train)

#print("Rating prediction results:")
#print(classification_report(y_test, rating_model.predict(X_test)))

## Get the MLflow model ID and register it in Unity Catalog

In [0]:
# find the best model - refer to a specific experiment
import json
experiment_metadata=dict(mlflow.get_experiment_by_name(experiment_name))
print(json.dumps(experiment_metadata,indent=4))

In [0]:
# find the best model - metadata unique ID
experiment_id=experiment_metadata['experiment_id']

In [0]:
# find the best model - search runs in the experiment by various metrics
mlflow_model = mlflow.search_runs([experiment_id]
                                #, filter_string='tags.project = "my_tag"'
                                #, order_by = ['metrics.f1_score DESC']).iloc[0] # best F1
                                , order_by = ['attributes.start_time DESC']).iloc[0] # just get the latest run

# show the model details
mlflow_model

In [0]:
registered_model = mlflow.register_model(
  f"runs:/{mlflow_model.run_id}/model", model_registry_name
)

In [0]:
from mlflow import MlflowClient

client = MlflowClient()

# Set an alias for a specific model version
registered_model_name = registered_model.name
alias_name = "live"

client.set_registered_model_alias(registered_model_name, alias_name, registered_model.version)

# PART 4 - Use the Model to predict Rating

This part uses the alias `live` to determine which model version is picked up.

In [0]:
# find the model to load

model_version_uri = f"models:/{catalog}.{database}.rating_model@live"

# load the model
model = mlflow.sklearn.load_model(model_uri=model_version_uri)

In [0]:
import mlflow.pyfunc
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_version_uri)

In [0]:
# load data into a spark DF for cluster processing - single node
spark_df = spark.sql(f"SELECT * FROM {catalog}.{database}.{features_table} WHERE id > 18859").repartition(4)

In [0]:
# Add the embeddings as a new column to spark_df, using the embedding UDF
spark_df = spark_df.withColumn("embeddings", generate_embeddings(spark_df["review"]))
display(spark_df)

In [0]:
#convert to Pandas DF, predict against X, use y to check accuracy
pandas_df = spark_df.toPandas()
X = pandas_df['embeddings'].apply(lambda x: np.array(x)).tolist()
y = pandas_df['rating']

In [0]:
predictions = model.predict(X)

In [0]:
review_cl_predictions = pd.DataFrame({
    'id': pandas_df['id'],
    'predictions': predictions,
    'rating': pandas_df['rating'],
    'review': pandas_df['review']
})
display(review_cl_predictions)

In [0]:
review_cl_predictions_spark = spark.createDataFrame(review_cl_predictions)
review_cl_predictions_spark.write.format("delta").saveAsTable(f"{catalog}.{database}.{predictions_table}")

In [0]:
%sql
SELECT COUNT(*) FROM `${catalog}`.`${database}`.`${predictions_table}`;

In [0]:
%sql
SELECT 
  p.predictions - f.rating AS diff,
  COUNT(*) AS count,
  ROUND(100 * COUNT(*) / SUM(COUNT(*)) OVER ()) AS percent  -- share of all scored rows
FROM 
  `${catalog}`.`${database}`.`${predictions_table}` p 
JOIN 
  `${catalog}`.`${database}`.`${features_table}` f 
ON 
  p.id = f.id
GROUP BY 
  p.predictions - f.rating
ORDER BY diff;

## Conclusion

Results for XGBoost + HuggingFace SiEBERT with `sentiment-roberta-large-english`:

  
+ 71% of the predicted ratings match the rating given by the Amazon customer.
+ 93% are within +/- 1 of the rating given by the Amazon customer.
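The two figures above are exact-match and within-one agreement rates. A minimal sketch of how such rates can be computed is shown below; the function name `rating_agreement` and the sample values are hypothetical and are not the notebook's data.

```python
def rating_agreement(predictions, ratings, tolerance=0):
    """Percentage of predictions within `tolerance` stars of the actual rating."""
    if len(predictions) != len(ratings) or not ratings:
        raise ValueError("predictions and ratings must be non-empty and the same length")
    hits = sum(1 for p, r in zip(predictions, ratings) if abs(p - r) <= tolerance)
    return 100.0 * hits / len(ratings)


# Made-up sample values for illustration only
preds = [5, 4, 3, 1, 2]
actual = [5, 5, 1, 1, 2]

print(rating_agreement(preds, actual))               # 60.0 (exact match)
print(rating_agreement(preds, actual, tolerance=1))  # 80.0 (within +/- 1)
```

Reporting both rates is useful for ordinal targets such as star ratings, where being one star off is a much smaller error than being four stars off.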

