# Importing Necessary Libraries 

In [37]:
from pyspark.sql.functions import  when, col, round, row_number, to_date, to_timestamp, regexp_replace, lit, udf, explode, split, count, regexp_extract
from pyspark.sql.types import LongType, IntegerType, ArrayType, BooleanType, StructType, StructField, TimestampType, DoubleType, DateType, StringType, FloatType
from pyspark.sql.functions import countDistinct, count, sum, lower
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from nltk import pos_tag, word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 39, Finished, Available)

# Formatting & Cleaning `Consumer Tweet`

In [26]:
schema = StructType([
    StructField("tweet_id", StringType(), True),
    StructField("author_id", LongType(), True),
    StructField("screen_name", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("text", StringType(), True),
    StructField("brand_name", StringType(), True),
    StructField("in_reply_to_user_id", LongType(), True),
    StructField("lang", StringType(), True),
    StructField("impression_count", IntegerType(), True),
    StructField("like_count", IntegerType(), True),
    StructField("reply_count", IntegerType(), True),
    StructField("repost_count", IntegerType(), True),
    StructField("quote_count", IntegerType(), True),
    StructField("hashtags", StringType(), True),
    StructField("user_followers_count", IntegerType(), True),
    StructField("user_following_count", IntegerType(), True),
    StructField("verified", BooleanType(), False)
])

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 28, Finished, Available)

# Loading the Data

In [27]:
df_consumer = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("Files/external_data/consumer_tweets.csv")

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 29, Finished, Available)

In [28]:
display(df_consumer)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 30, Finished, Available)

SynapseWidget(Synapse.DataFrame, b702ae27-3ce3-4f02-b318-1559bf1455fa)

In [29]:
df_consumer.printSchema()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 31, Finished, Available)

root
 |-- tweet_id: string (nullable = true)
 |-- author_id: long (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- text: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- impression_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- repost_count: integer (nullable = true)
 |-- quote_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- user_followers_count: integer (nullable = true)
 |-- user_following_count: integer (nullable = true)
 |-- verified: boolean (nullable = true)



In [30]:
df_consumer = df_consumer.withColumn(
    "timestamp", regexp_replace(col("timestamp"), "Z$", "")
)

df_consumer = df_consumer.withColumn(
    "timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ss")
)


StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 32, Finished, Available)

In [31]:
df_consumer.printSchema()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 33, Finished, Available)

root
 |-- tweet_id: string (nullable = true)
 |-- author_id: long (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- impression_count: integer (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- reply_count: integer (nullable = true)
 |-- repost_count: integer (nullable = true)
 |-- quote_count: integer (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- user_followers_count: integer (nullable = true)
 |-- user_following_count: integer (nullable = true)
 |-- verified: boolean (nullable = true)



# Checking Duplicates and NULL values

In [32]:
total_rows = df_consumer.count()

distinct_rows = df_consumer.distinct().count()

# Check for duplicates
duplicate_rows = total_rows - distinct_rows

print(f"Total Rows: {total_rows}")
print(f"Distinct Rows: {distinct_rows}")
print(f"Duplicate Rows: {duplicate_rows}")


# Count the number of null values in each column
null_counts = df_consumer.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_consumer.columns])

display(null_counts)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 34, Finished, Available)

Total Rows: 10000
Distinct Rows: 10000
Duplicate Rows: 0


SynapseWidget(Synapse.DataFrame, 427d0290-865f-49a6-ab91-ec90220ed8d4)

# Remove Random Noise

Identify and remove common random noise patterns, such as " #RandomTweet", " lol", etc. This can be done using regular expressions.

In [33]:
# Define the noise patterns to remove
noise_patterns = ["#RandomTweet", "#JustSaying", "lol", "😂", "#Life", "#FoodForThought"]

# Function to remove noisy hashtags
def remove_noise(text):
    for pattern in noise_patterns:
        text = text.replace(pattern, "")
    return text.strip()

# Register UDF
remove_noise_udf = udf(remove_noise, StringType())
# Apply the noise removal UDF to the 'text' column
df_consumer = df_consumer.withColumn("text", remove_noise_udf(col("text")))
# Show the DataFrame with the cleaned text
df_consumer.select("text").show(truncate=False)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 35, Finished, Available)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|¿Alguien más piensa que los auriculares Sony tienen el mejor bajo? Es como estar en un concierto en vivo. #BassHeads #MusicLover  #soundquality                                 |
|Just got a pair of Skullcandy for my gaming setup. The spatial audio is unreal! #gaming #tech #Bluetooth #noiseCancellation                                                     |
|Considering switching to Anker for their waterproof features. Anyone with experience? #waterproof #tech 

# Removing Hashtags and checking if there's any noise in the `text` column

The `text` column doesn't contain any noise, if there were noise, we could have apply lemmatinzation to remove the noise

In [34]:
# Define a regular expression to match hashtags
hashtag_regex = r'#\w+'

# Remove hashtags from the text column
df_consumer = df_consumer.withColumn("text_cleaned", regexp_replace(col("text"), hashtag_regex, ''))

df_consumer.select("text", "text_cleaned").show(10, truncate=False)

df_consumer = df_consumer.drop("text").withColumnRenamed("text_cleaned", "text")

df_consumer.printSchema()
df_consumer.show(10, truncate=False)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 36, Finished, Available)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                       |text_cleaned                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------+
|¿Alguien más piensa que los auriculares Sony tienen el mejor bajo? Es como estar en un concierto en vivo. #BassHeads #MusicLover  #soundquality            |¿Alguien más pi

In [35]:
# Convert NULL values to -1 and create 'is_reply' column in a single transformation
df_consumer = df_consumer.withColumn(
    'in_reply_to_user_id',
    when(col('in_reply_to_user_id').isNull(), -1).otherwise(col('in_reply_to_user_id'))
).withColumn(
    'is_reply',
    when(col('in_reply_to_user_id') != -1, True).otherwise(False)
)

df_consumer.show()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 37, Finished, Available)

+--------------------+----------+--------------+-------------------+--------------+-------------------+----+----------------+----------+-----------+------------+-----------+--------------------+--------------------+--------------------+--------+--------------------+--------+
|            tweet_id| author_id|   screen_name|          timestamp|    brand_name|in_reply_to_user_id|lang|impression_count|like_count|reply_count|repost_count|quote_count|            hashtags|user_followers_count|user_following_count|verified|                text|is_reply|
+--------------------+----------+--------------+-------------------+--------------+-------------------+----+----------------+----------+-----------+------------+-----------+--------------------+--------------------+--------------------+--------+--------------------+--------+
|5bb2f92e-d03d-441...|4098528015|  bennettdylan|2023-03-24 12:38:27|          Sony|         4098528015|  en|            7891|       162|         51|         100|         33

In [38]:
# Convert the brand_name column to lowercase
df_consumer = df_consumer.withColumn("brand_name", lower(df_consumer["brand_name"]))

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 40, Finished, Available)

In [39]:
display(df_consumer)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 41, Finished, Available)

SynapseWidget(Synapse.DataFrame, 359c5a83-8616-45a9-bc96-95585c2523d1)

In [36]:
df_consumer.write.format("delta").mode("overwrite").saveAsTable("consumer_tweet")

StatementMeta(, 93038063-3c16-42f5-aac3-0eb8c3b2607f, 38, Finished, Available)

# Formatting `User Tweet`

In [40]:
user_tweet_schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("created_at", StringType(), True),  
    StructField("url", StringType(), True),
    StructField("followers_count", IntegerType(), True),
    StructField("record_date", StringType(), True), 
    StructField("friends_count", IntegerType(), True),
    StructField("verified", BooleanType(), False)
])

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 42, Finished, Available)

In [41]:
df_user_tweet = spark.read.format("csv") \
    .option("header", "true") \
    .schema(user_tweet_schema) \
    .load("Files/external_data/user_data.csv")

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 43, Finished, Available)

In [42]:
display(df_user_tweet)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 44, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5be34f62-2094-4569-98a7-430b7050dbd5)

In [43]:
df_user_tweet.printSchema()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 45, Finished, Available)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- url: string (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- record_date: string (nullable = true)
 |-- friends_count: integer (nullable = true)
 |-- verified: boolean (nullable = true)



# Checking Duplicate values

In [44]:
total_rows = df_user_tweet.count()

distinct_rows = df_user_tweet.distinct().count()

# Check for duplicates
duplicate_rows = total_rows - distinct_rows

print(f"Total Rows: {total_rows}")
print(f"Distinct Rows: {distinct_rows}")
print(f"Duplicate Rows: {duplicate_rows}")

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 46, Finished, Available)

Total Rows: 3393
Distinct Rows: 3393
Duplicate Rows: 0


- Converting id to long dtype and removing -, name, username, url to string, followers_count, friends_count to integer, created_at to datetime, and verified to boolean

In [45]:
df_user_tweet = df_user_tweet.withColumn("id", regexp_replace(col("id"), "^-", ""))
df_user_tweet = df_user_tweet.withColumn(
    "created_at", to_date(col("created_at"), "yyyy-MM-dd")
)

df_user_tweet = df_user_tweet.withColumn(
    "record_date", to_date(col("record_date"), "yyyy-MM-dd")
)

df_user_tweet.printSchema()


StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 47, Finished, Available)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- created_at: date (nullable = true)
 |-- url: string (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- record_date: date (nullable = true)
 |-- friends_count: integer (nullable = true)
 |-- verified: boolean (nullable = true)



In [46]:
# Validate data before writing
print("Data before writing:")
df_user_tweet.filter(df_user_tweet.verified == True).show()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 48, Finished, Available)

Data before writing:
+-------------------+----+--------+----------+--------------------+---------------+-----------+-------------+--------+
|                 id|name|username|created_at|                 url|followers_count|record_date|friends_count|verified|
+-------------------+----+--------+----------+--------------------+---------------+-----------+-------------+--------+
|3287742300390049885| JBL|JBLaudio|2013-05-30|https://twitter.c...|         784877| 2020-01-05|           63|    true|
|3287742300390049885| JBL|JBLaudio|2013-05-30|https://twitter.c...|         786888| 2020-01-12|           63|    true|
|3287742300390049885| JBL|JBLaudio|2013-05-30|https://twitter.c...|         789965| 2020-01-19|           63|    true|
|3287742300390049885| JBL|JBLaudio|2013-05-30|https://twitter.c...|         790816| 2020-01-26|           63|    true|
|3287742300390049885| JBL|JBLaudio|2013-05-30|https://twitter.c...|         794731| 2020-02-02|           63|    true|
|3287742300390049885| JBL|J

In [50]:
# Convert the brand_name column to lowercase
df_user_tweet = df_user_tweet.withColumn("name", lower(df_user_tweet["name"]))
df_user_tweet = df_user_tweet.withColumn("username", lower(df_user_tweet["username"]))

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 52, Finished, Available)

In [51]:
display(df_user_tweet)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 53, Finished, Available)

SynapseWidget(Synapse.DataFrame, a8a69fcf-bfee-4270-acf3-8530ee8ff42c)

In [46]:
# Write the DataFrame to the Delta table
df_user_tweet.write.format("delta").mode("overwrite").saveAsTable("user_tweet_data")

StatementMeta(, 93038063-3c16-42f5-aac3-0eb8c3b2607f, 48, Finished, Available)

In [48]:
print("Data after writing:")
spark.sql("SELECT * FROM user_tweet_data WHERE verified = true").show()

StatementMeta(, 93038063-3c16-42f5-aac3-0eb8c3b2607f, 50, Finished, Available)

Data after writing:
+-------------------+--------+-------------+----------+--------------------+---------------+-----------+-------------+--------+
|                 id|    name|     username|created_at|                 url|followers_count|record_date|friends_count|verified|
+-------------------+--------+-------------+----------+--------------------+---------------+-----------+-------------+--------+
|5258022259168678051|Logitech|Logitechaudio|2012-05-11|https://twitter.c...|         168214| 2020-01-05|           81|    true|
|5258022259168678051|Logitech|Logitechaudio|2012-05-11|https://twitter.c...|         170847| 2020-01-12|           81|    true|
|5258022259168678051|Logitech|Logitechaudio|2012-05-11|https://twitter.c...|         171290| 2020-01-19|           81|    true|
|5258022259168678051|Logitech|Logitechaudio|2012-05-11|https://twitter.c...|         173419| 2020-01-26|           81|    true|
|5258022259168678051|Logitech|Logitechaudio|2012-05-11|https://twitter.c...|        

# Formatting `Facebook Data`


In [10]:
facebook_data_schema = StructType([
    StructField("id", StringType(), True),
    StructField("contact_address", StringType(), True),
    StructField("current_location", StringType(), True),
    StructField("emails", StringType(), True),
    StructField("fan_count", IntegerType(), True),
    StructField("followers_count", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("post_id", StringType(), True),
    StructField("created_time9", StringType(), True),
    StructField("message", StringType(), True),
    StructField("shares", IntegerType(), True),
    StructField("likes", IntegerType(), True),
    StructField("reactions", IntegerType(), True),
    StructField("comment_id", StringType(), True),
    StructField("created_time15", StringType(), True), 
    StructField("from", StringType(), True),
    StructField("like_count", IntegerType(), True),
    StructField("comment", StringType(), True),
    StructField("verified", BooleanType(), True)
])


StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 12, Finished, Available)

In [11]:
df_facebook_data = spark.read.format("csv") \
    .option("header", "true") \
    .schema(facebook_data_schema) \
    .load("Files/external_data/facebook_data.csv")

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 13, Finished, Available)

In [12]:
df_facebook_data.printSchema()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 14, Finished, Available)

root
 |-- id: string (nullable = true)
 |-- contact_address: string (nullable = true)
 |-- current_location: string (nullable = true)
 |-- emails: string (nullable = true)
 |-- fan_count: integer (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- post_id: string (nullable = true)
 |-- created_time9: string (nullable = true)
 |-- message: string (nullable = true)
 |-- shares: integer (nullable = true)
 |-- likes: integer (nullable = true)
 |-- reactions: integer (nullable = true)
 |-- comment_id: string (nullable = true)
 |-- created_time15: string (nullable = true)
 |-- from: string (nullable = true)
 |-- like_count: integer (nullable = true)
 |-- comment: string (nullable = true)
 |-- verified: boolean (nullable = true)



# Converting the features to right format

In [13]:
# Convert emails from string to array<string>
df_facebook_data = df_facebook_data.withColumn(
    "emails", split(col("emails"), ",")
)

# Remove trailing 'Z' and convert to timestamp for created_time9 and created_time15
df_facebook_data = df_facebook_data.withColumn(
    "created_time9", regexp_replace(col("created_time9"), "Z$", "")
).withColumn(
    "created_time9", to_timestamp(col("created_time9"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
)

df_facebook_data = df_facebook_data.withColumn(
    "created_time15", regexp_replace(col("created_time15"), "Z$", "")
).withColumn(
    "created_time15", to_timestamp(col("created_time15"), "yyyy-MM-dd'T'HH:mm:ss.SSS")
)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 15, Finished, Available)

In [14]:
display(df_facebook_data)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, 9af11050-dda9-4733-aacb-61fb14bb1caf)

In [96]:
# Write the DataFrame to the Delta table
df_facebook_data.write.format("delta").mode("overwrite").saveAsTable("facebook_data")

StatementMeta(, 93038063-3c16-42f5-aac3-0eb8c3b2607f, 98, Finished, Available)

# Cleaning CNET `Expert Review` Data

In [None]:
df_expert_review = spark.read.option("multiline", "true").json("Files/external_data/expert_reviews.json")

display(df_expert_review)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, da861c7c-5134-4d03-a8bf-9710c30719c2)

In [None]:
df_expert_review.printSchema()

StatementMeta(, , , Waiting, )

root
 |-- Cons: string (nullable = true)
 |-- Product Description: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Pros: string (nullable = true)
 |-- Rating: string (nullable = true)



Converting Rating feature to integer

In [None]:
df_expert_review = df_expert_review.withColumn("Rating", col("Rating").cast("float"))

StatementMeta(, , , Waiting, )

In [None]:
all_columns = ["Cons", "Product Description", "Product Name", "Pros", "Rating"]


duplicates = df_expert_review.groupBy(all_columns).agg(count("*").alias("count")).filter("count > 1")
duplicates.show(50)

StatementMeta(, , , Waiting, )

+--------------------+--------------------+--------------------+--------------------+------+-----+
|                Cons| Product Description|        Product Name|                Pros|Rating|count|
+--------------------+--------------------+--------------------+--------------------+------+-----+
|                    |Take one look at ...|Apple AirPods 3rd...|                    |   8.4|    2|
|Voice-calling suf...|The well-designed...|      JBL Live 770NC|Well-designed and...|  null|    2|
|Missing: wireless...|Alas, for those o...|Beats Studio Buds...|Lightweight and d...|   8.2|    2|
|                    |1More has a sligh...|   1More ComfoBuds 2|                    |  null|    2|
|Higher price tag ...|When you have a p...|     Sony WH-1000XM5|More refined soun...|   9.2|    2|
|Expensive, Look m...|The Bose Ultra Op...|Bose Ultra Open E...|Innovative clip-o...|   8.0|    2|
|                    |The LinkBuds are,...|       Sony LinkBuds|                    |  null|    3|
|         

Changing the feature name

In [None]:
df_expert_review = df_expert_review.withColumnRenamed("Product Description", "Review")
df_expert_review = df_expert_review.withColumnRenamed("Product Name", "product_name")
df_expert_review = df_expert_review.withColumnRenamed("Rating", "rating")
df_expert_review = df_expert_review.withColumnRenamed("Pros", "pros")
df_expert_review = df_expert_review.withColumnRenamed("Cons", "cons")
df_expert_review = df_expert_review.select("product_name", "review", "rating", "pros", "cons")

StatementMeta(, , , Waiting, )

In [None]:
window = Window.partitionBy("product_name").orderBy(col("product_name"))
df_with_row_num = df_expert_review.withColumn("row_num", row_number().over(window))
df_expert_review = df_with_row_num.filter(col("row_num") == 1).drop("row_num")

display(df_expert_review)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 312487fb-3e0e-4a59-8f6b-e6f2d2dbf33a)

# Applying Regression model to impute the missing values for `Rating` feature

### Feature Extraction from Text
To enhance our dataset and make it more informative for modeling, we extracted several features from the product reviews:

1. Sentiment Scores: We calculated the overall sentiment of each review to understand the emotional tone, which can often correlate with the numerical rating. A positive sentiment might indicate a higher rating and vice versa.

2. TF-IDF Values: We transformed the text into a numerical representation using TF-IDF (Term Frequency-Inverse Document Frequency), which helps highlight the most relevant words in each review. This technique diminishes the weight of commonly used words and boosts the importance of words that are significant to specific reviews.

3. Review Length: We also considered the length of each review, as more detailed reviews might provide deeper insights into the user’s experience and possibly their rating.

In [None]:
from pyspark.sql.functions import col, udf, size, split, round
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, VectorAssembler
from pyspark.ml.linalg import DenseVector
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import nltk
from pyspark.ml.linalg import DenseVector, VectorUDT
nltk.download('vader_lexicon')

StatementMeta(, , , Waiting, )

[nltk_data] Downloading package vader_lexicon to /home/trusted-
[nltk_data]     service-user/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

In [None]:
vader_analyzer = SentimentIntensityAnalyzer()

def get_vader_sentiment(review):
    return vader_analyzer.polarity_scores(review)['compound']

# Register UDF for sentiment polarity
sentiment_udf = udf(get_vader_sentiment, DoubleType())

# Add sentiment polarity column
df_expert_review = df_expert_review.withColumn("SentimentPolarity", sentiment_udf(col("review")))

# Add review length column
df_expert_review = df_expert_review.withColumn("ReviewLength", size(split(col("review"), " ")))

# Tokenize text
tokenizer = Tokenizer(inputCol="review", outputCol="Words")
df_expert_review = tokenizer.transform(df_expert_review)

# Remove stop words
remover = StopWordsRemover(inputCol="Words", outputCol="FilteredWords")
df_expert_review = remover.transform(df_expert_review)

# Compute TF-IDF
hashingTF = HashingTF(inputCol="FilteredWords", outputCol="RawFeatures", numFeatures=100)
tf_df = hashingTF.transform(df_expert_review)

idf = IDF(inputCol="RawFeatures", outputCol="TFIDFFeatures")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)

def sparse_to_dense(vector):
    return DenseVector(vector.toArray())

dense_udf = udf(sparse_to_dense, VectorUDT())
tfidf_df = tfidf_df.withColumn("TFIDFFeaturesDense", dense_udf(col("TFIDFFeatures")))

result_df = tfidf_df.select("product_name", "review", "rating", "ReviewLength", "SentimentPolarity", "TFIDFFeaturesDense")

display(result_df)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 27a173d5-1b67-4ad5-814c-61419ec1daa6)

### Preparing the Dataset

1. We separated the data into two groups: one with known ratings (which we used for training our model) and another with missing ratings (which we aimed to predict).

2. This separation allowed us to cleanly apply predictive modeling techniques without the complication of missing target values during training.

In [None]:
# Separate the data into two groups
known_ratings_df = result_df.filter(result_df["rating"].isNotNull())
missing_ratings_df = result_df.filter(result_df["rating"].isNull())

print("Count of known ratings: ", known_ratings_df.count())
print("Count of missing ratings: ", missing_ratings_df.count())

StatementMeta(, , , Waiting, )

Count of known ratings:  22
Count of missing ratings:  119


In [None]:
feature_columns = ["ReviewLength", "SentimentPolarity", "TFIDFFeaturesDense"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
known_ratings_df = assembler.transform(known_ratings_df)

known_ratings_df = known_ratings_df.withColumnRenamed("rating", "label")

# Split the data into training and testing sets
train_df, test_df = known_ratings_df.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_df)

# Predict on the test set
predictions = lr_model.transform(test_df)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})

# Cross-validation
paramGrid = ParamGridBuilder().build()
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cv_model = crossval.fit(known_ratings_df)
cv_predictions = cv_model.transform(known_ratings_df)

# Evaluate cross-validation model
cv_rmse = evaluator.evaluate(cv_predictions)
cv_mse = evaluator.evaluate(cv_predictions, {evaluator.metricName: "mse"})

print(f"Mean Squared Error (MSE) on test data: {mse}")
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")
print(f"Cross-Validated RMSE: {cv_rmse}")

# Make predictions on the dataset with missing ratings
missing_ratings_df = assembler.transform(missing_ratings_df)
missing_predictions = lr_model.transform(missing_ratings_df)

missing_predictions.select("product_name", "review", "prediction").show(truncate=False)


StatementMeta(, , , Waiting, )











Mean Squared Error (MSE) on test data: 4.225367792329095e-15
Root Mean Squared Error (RMSE) on test data: 6.500282911019408e-08
Cross-Validated RMSE: 1.773154201254615e-08


+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
display(missing_predictions)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, 5402d7da-8667-4fa5-89b4-a1272fef46c1)

In [None]:
from pyspark.sql.functions import coalesce

# Round the predictions to one decimal place
missing_predictions = missing_predictions.withColumn("prediction", round(col("prediction"), 1))

missing_predictions = missing_predictions.select("product_name", "review", "prediction")

# Join the predicted ratings back to the original dataframe
final_df = df_expert_review.join(missing_predictions, on=["product_name", "review"], how="left")

# Impute the missing ratings with predictions
final_df = final_df.withColumn("rating", coalesce(col("rating"), col("prediction")))

# Round the final rating column to one decimal place
final_df = final_df.withColumn("rating", round(col("rating"), 1))

final_df = final_df.select("product_name", "review", "pros", "cons", "rating")

display(final_df)


StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, d50b3150-2836-46b8-8406-ca25a2d2ba95)

In [None]:
from pyspark.sql.functions import lit, when, col

# Impute missing or empty values in Pros and Cons with placeholders
final_df = final_df.withColumn("pros", when((col("pros").isNull()) | (col("pros") == ""), lit("No Data")).otherwise(col("pros")))
final_df = final_df.withColumn("cons", when((col("cons").isNull()) | (col("cons") == ""), lit("No Data")).otherwise(col("cons")))

# Show the final dataframe after imputation
display(final_df)

StatementMeta(, , , Waiting, )

SynapseWidget(Synapse.DataFrame, af72d912-cf17-47cf-bc31-3d87defbb852)

In [None]:
# Function to calculate null values for each column
def count_nulls(df):
    null_counts = df.select([sum(col(c).isNull().cast("float")).alias(c) for c in df.columns])
    return null_counts

# Count null values in each column
null_counts_df = count_nulls(final_df)

# Show the null counts
null_counts_df.show()

StatementMeta(, , , Waiting, )

+------------+------+----+----+------+
|product_name|review|pros|cons|rating|
+------------+------+----+----+------+
|         0.0|   0.0| 0.0| 0.0|   0.0|
+------------+------+----+----+------+



Now, we are going to manually download the csv file from fabric notebook and run Llama and Grok in this dataset to find out the strengths and weaknesses of a product, brand, and categorize the product its type. 

The attached csv file contains the updated dataset and it includes additional features like strengths, weaknesses, brand_name, product_type

In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("cnet_reviews_llm").getOrCreate()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 17, Finished, Available)

In [16]:
schema = StructType([
    StructField("product_name", StringType(), True),
    StructField("review", StringType(), True),
    StructField("pros", StringType(), True),
    StructField("cons", StringType(), True),
    StructField("rating", FloatType(), True),
    StructField("strengths", StringType(), True),
    StructField("weaknesses", StringType(), True),
    StructField("product_type", StringType(), True),
    StructField("brand_name", StringType(), True)
])

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 18, Finished, Available)

In [17]:
df_cnet_llm = spark.read.format("csv").option("header","true").load("Files/external_data/expert_review_cnet.csv")
display(df_cnet_llm)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 19, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1924217b-dbd0-4ade-8430-b549378a6164)

In [18]:
df_cnet_llm.printSchema()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 20, Finished, Available)

root
 |-- product_name: string (nullable = true)
 |-- review: string (nullable = true)
 |-- pros: string (nullable = true)
 |-- cons: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- strengths: string (nullable = true)
 |-- weaknesses: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- brand_name: string (nullable = true)



In [19]:
df_cnet_llm = df_cnet_llm.withColumn("rating", col("rating").cast("float"))

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 21, Finished, Available)

In [20]:
df_cnet_llm.printSchema()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 22, Finished, Available)

root
 |-- product_name: string (nullable = true)
 |-- review: string (nullable = true)
 |-- pros: string (nullable = true)
 |-- cons: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- strengths: string (nullable = true)
 |-- weaknesses: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- brand_name: string (nullable = true)



In [22]:
# Function to calculate null values for each column
def count_nulls(df):
    null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
    return null_counts

# Count null values in each column
null_counts_df = count_nulls(df_cnet_llm)

# Show the null counts
null_counts_df.show()

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 24, Finished, Available)

+------------+------+----+----+------+---------+----------+------------+----------+
|product_name|review|pros|cons|rating|strengths|weaknesses|product_type|brand_name|
+------------+------+----+----+------+---------+----------+------------+----------+
|           0|     0|   0|   0|    22|        0|         0|           0|         0|
+------------+------+----+----+------+---------+----------+------------+----------+



In [23]:
# Remove rows with null values in the rating column
df_cnet_llm_cleaned = df_cnet_llm.filter(col("rating").isNotNull())

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 25, Finished, Available)

In [24]:
display(df_cnet_llm_cleaned)

StatementMeta(, dd71df91-8cb1-477a-9b36-5092e0f5fafc, 26, Finished, Available)

SynapseWidget(Synapse.DataFrame, 6aaec58a-2c0e-4d94-b328-2db0c0194587)

In [None]:
df_cnet_llm_cleaned.write.format("delta").saveAsTable("cnet_expert_review")

StatementMeta(, fd3410f0-8900-49b3-b9d4-342f7cdd07e1, 183, Finished, Available)

In [None]:
# Function to calculate null values for each column
def count_nulls(df):
    null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
    return null_counts

# Count null values in each column
null_counts_df = count_nulls(df_cnet_llm_cleaned)

# Show the null counts
null_counts_df.show()

StatementMeta(, fd3410f0-8900-49b3-b9d4-342f7cdd07e1, 182, Finished, Available)

+------------+------+----+----+------+---------+----------+------------+----------+
|product_name|review|pros|cons|rating|strengths|weaknesses|product_type|brand_name|
+------------+------+----+----+------+---------+----------+------------+----------+
|           0|     0|   0|   0|     0|        0|         0|           0|         0|
+------------+------+----+----+------+---------+----------+------------+----------+

