## Installing Transformer Package:

In [69]:
!pip install -q transformers

StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 73, Finished, Available, Finished)

## Importing Required Libraries:

In [70]:
from pyspark.sql.functions import * 
import pandas as pd 

StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 74, Finished, Available, Finished)

## Ingesting Amazon Reviews Data From Lakehouse File:

In [71]:

reviews_df = spark.read.csv("Files/amazon_reviews_products.csv",header = True,inferSchema = True)

reviews_df = reviews_df.drop('asins','categories','imageURLs','keys','reviews.didPurchase','reviews.doRecommend','reviews.sourceURLs','reviews.id','reviews.title','sourceURLs')


StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 75, Finished, Available, Finished)

### Standardizing Column Names:

In [72]:

cols_lst = reviews_df.columns

for col in cols_lst : 
    old_col = col 
    if '.' in old_col : 
        new_col = old_col.replace('.','').lower() 
    else: 
        new_col = old_col.lower() 
    reviews_df = reviews_df.withColumnRenamed(old_col,new_col) 


StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 76, Finished, Available, Finished)

## Removing "By" text from username column (Data Cleaning) 

In [73]:
reviews_df = reviews_df.withColumn('reviewsusername',initcap(expr("case when reviewsusername like 'By%' then replace(reviewsusername,'By','') else reviewsusername end")) )
reviews_df = reviews_df.withColumn('brand',initcap('brand'))

StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 77, Finished, Available, Finished)

## Importing Hugging Face Sentiment Analysis Transformer Model: 

In [74]:
from transformers import pipeline

# Load Hugging Face sentiment analysis pipeline
sentiment_pipeline = pipeline("sentiment-analysis")

StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 78, Finished, Available, Finished)

No model was supplied, defaulted to distilbert-base-uncased-finetuned-sst-2-english and revision af0f99b (https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.


### Deriving Sentiments From Customer Reviews :

In [75]:
reviews_txt_lst = reviews_df.select("reviewstext").rdd.flatMap(lambda x: x).collect()
results = sentiment_pipeline(reviews_txt_lst)
sentiments_df = spark.createDataFrame(results)

StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 79, Finished, Available, Finished)

### Merging Derived Sentiment Dataframe With Main Reviews Dataframe:

In [76]:

sent_df = sentiments_df.toPandas()
rev_df = reviews_df.toPandas() 

res_df = pd.concat([rev_df,sent_df],axis = 1)

final_df = spark.createDataFrame(res_df) 
final_df = final_df.withColumn('postive_reviews',expr("case when label = 'POSITIVE' THEN 1 ELSE 0 END")) \
.withColumn('negative_reviews',expr("case when label = 'NEGATIVE' then 1 else 0 end"))

# Displaying Data For Troubleshooting: 
display(final_df)

StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 80, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5734f0ed-89b5-4c72-9187-09a0e5daecfa)

## Loading Sentiments By Product Categories Data To Delta Table:

In [81]:
report_df = final_df.groupBy('primarycategories','manufacturer') \
.agg(sum('postive_reviews').alias('total_positive_reviews'),sum('negative_reviews').alias('total_negative_reviews')  ,
max('reviewsrating').alias('max_product_rating'),
min('reviewsrating').alias('min_product_rating')  ) \
.orderBy(desc('total_positive_reviews'))

# Creating Delta Table : 
report_df.write.mode("overwrite").saveAsTable("amazon_report_t") 

StatementMeta(, c67cfaa1-6b52-4905-8b2f-4cd934d5922a, 85, Finished, Available, Finished)