In [0]:
# Loading Amazon Reviews Data
from datasets import load_dataset

# Load user reviews data
reviews_data = load_dataset("McAuley-Lab/Amazon-Reviews-2023", "raw_review_All_Beauty", trust_remote_code=True)
print(reviews_data["full"][0])  # Printing the first review

# Load product metadata
metadata = load_dataset("McAuley-Lab/Amazon-Reviews-2023", "raw_meta_All_Beauty", split="full", trust_remote_code=True)
print(metadata[0])  # Printing the first metadata entry




README.md:   0%|          | 0.00/19.7k [00:00<?, ?B/s]

Amazon-Reviews-2023.py:   0%|          | 0.00/39.6k [00:00<?, ?B/s]



All_Beauty.jsonl:   0%|          | 0.00/327M [00:00<?, ?B/s]

Generating full split: 0 examples [00:00, ? examples/s]

{'rating': 5.0, 'title': 'Such a lovely scent but not overpowering.', 'text': "This spray is really nice. It smells really good, goes on really fine, and does the trick. I will say it feels like you need a lot of it though to get the texture I want. I have a lot of hair, medium thickness. I am comparing to other brands with yucky chemicals so I'm gonna stick with this. Try it!", 'images': [], 'asin': 'B00YQ6X8EO', 'parent_asin': 'B00YQ6X8EO', 'user_id': 'AGKHLEW2SOWHNMFQIJGBECAF7INQ', 'timestamp': 1588687728923, 'helpful_vote': 0, 'verified_purchase': True}




meta_All_Beauty.jsonl:   0%|          | 0.00/213M [00:00<?, ?B/s]

Generating full split: 0 examples [00:00, ? examples/s]

{'main_category': 'All Beauty', 'title': 'Howard LC0008 Leather Conditioner, 8-Ounce (4-Pack)', 'average_rating': 4.8, 'rating_number': 10, 'features': [], 'description': [], 'price': 'None', 'images': {'hi_res': [None, 'https://m.media-amazon.com/images/I/71i77AuI9xL._SL1500_.jpg'], 'large': ['https://m.media-amazon.com/images/I/41qfjSfqNyL.jpg', 'https://m.media-amazon.com/images/I/41w2yznfuZL.jpg'], 'thumb': ['https://m.media-amazon.com/images/I/41qfjSfqNyL._SS40_.jpg', 'https://m.media-amazon.com/images/I/41w2yznfuZL._SS40_.jpg'], 'variant': ['MAIN', 'PT01']}, 'videos': {'title': [], 'url': [], 'user_id': []}, 'store': 'Howard Products', 'categories': [], 'details': '{"Package Dimensions": "7.1 x 5.5 x 3 inches; 2.38 Pounds", "UPC": "617390882781"}', 'parent_asin': 'B01CUPMQZE', 'bought_together': None, 'subtitle': None, 'author': None}


In [0]:
# Converting Data to Spark Dataframes
import pandas as pd

# Converting to Pandas DataFrame
reviews_df = pd.DataFrame(reviews_data["full"])
metadata_df = pd.DataFrame(metadata)

# Converting Pandas DataFrame to Spark DataFrame
reviews_spark_df = spark.createDataFrame(reviews_df)
metadata_spark_df = spark.createDataFrame(metadata_df)

# Displays schemas
reviews_spark_df.printSchema()
metadata_spark_df.printSchema()


root
 |-- rating: double (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- attachment_type: string (nullable = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- verified_purchase: boolean (nullable = true)

root
 |-- main_category: string (nullable = true)
 |-- title: string (nullable = true)
 |-- average_rating: double (nullable = true)
 |-- rating_number: long (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: array (nullable = true)
 |    |-- eleme

In [0]:
# Data Exploration
# Shows data sample
reviews_spark_df.show(5)

# Checks for missing values
from pyspark.sql.functions import col, when, count

reviews_spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in reviews_spark_df.columns]).show()


+------+--------------------+--------------------+------+----------+-----------+--------------------+-------------+------------+-----------------+
|rating|               title|                text|images|      asin|parent_asin|             user_id|    timestamp|helpful_vote|verified_purchase|
+------+--------------------+--------------------+------+----------+-----------+--------------------+-------------+------------+-----------------+
|   5.0|Such a lovely sce...|This spray is rea...|    []|B00YQ6X8EO| B00YQ6X8EO|AGKHLEW2SOWHNMFQI...|1588687728923|           0|             true|
|   4.0|Works great but s...|This product does...|    []|B081TJ8YS3| B081TJ8YS3|AGKHLEW2SOWHNMFQI...|1588615855070|           1|             true|
|   5.0|                Yes!|Smells good, feel...|    []|B07PNNCSP9| B097R46CSY|AE74DYR3QUGVPZJ3P...|1589665266052|           2|             true|
|   1.0|   Synthetic feeling|      Felt synthetic|    []|B09JS339BZ| B09JS339BZ|AFQLNQNQYFWQZPJQZ...|1643393630220|   

In [0]:
# Data Cleaning
from pyspark.sql.functions import lower, regexp_replace, col

# Clean review text
reviews_cleaned = reviews_spark_df.withColumn(
    "cleaned_text",
    lower(regexp_replace(col("text"), "[^a-zA-Z0-9\s]", ""))  # Replace "reviewText" with "text"
)
reviews_cleaned.show(5)



+------+--------------------+--------------------+------+----------+-----------+--------------------+-------------+------------+-----------------+--------------------+
|rating|               title|                text|images|      asin|parent_asin|             user_id|    timestamp|helpful_vote|verified_purchase|        cleaned_text|
+------+--------------------+--------------------+------+----------+-----------+--------------------+-------------+------------+-----------------+--------------------+
|   5.0|Such a lovely sce...|This spray is rea...|    []|B00YQ6X8EO| B00YQ6X8EO|AGKHLEW2SOWHNMFQI...|1588687728923|           0|             true|this spray is rea...|
|   4.0|Works great but s...|This product does...|    []|B081TJ8YS3| B081TJ8YS3|AGKHLEW2SOWHNMFQI...|1588615855070|           1|             true|this product does...|
|   5.0|                Yes!|Smells good, feel...|    []|B07PNNCSP9| B097R46CSY|AE74DYR3QUGVPZJ3P...|1589665266052|           2|             true|smells good fe

In [0]:
# Alligning Reviews with Metadata
# Merge the reviews and metadata datasets using the ASIN (product ID) column to link reviews to product information
aligned_data = reviews_cleaned.join(metadata_spark_df, reviews_cleaned["asin"] == metadata_spark_df["parent_asin"], "inner")
aligned_data.show(5)


+------+--------------------+--------------------+------+----------+-----------+--------------------+-------------+------------+-----------------+--------------------+-------------+--------------------+--------------+-------------+--------+-----------+-----+--------------------+------------+--------------------+----------+--------------------+-----------+---------------+--------+------+
|rating|               title|                text|images|      asin|parent_asin|             user_id|    timestamp|helpful_vote|verified_purchase|        cleaned_text|main_category|               title|average_rating|rating_number|features|description|price|              images|      videos|               store|categories|             details|parent_asin|bought_together|subtitle|author|
+------+--------------------+--------------------+------+----------+-----------+--------------------+-------------+------------+-----------------+--------------------+-------------+--------------------+--------------+---

In [0]:
# Tokenization and Stop Word Removal
# Using Spark MLlib to tokenize the text and remove common stop words

from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Tokenize the cleaned text
tokenizer = Tokenizer(inputCol="cleaned_text", outputCol="tokens")
tokenized_data = tokenizer.transform(aligned_data)

# Remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
final_data = remover.transform(tokenized_data)
final_data.select("filtered_tokens").show(5)


+--------------------+
|     filtered_tokens|
+--------------------+
|[spray, really, n...|
|[product, need, w...|
|   [felt, synthetic]|
|              [love]|
|[polish, quiet, t...|
+--------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import when

# Map ratings to binary sentiment (polarity)
tfidf_data = tfidf_data.withColumn(
    "polarity",
    when(col("rating") >= 4, 1).when(col("rating") <= 2, 0).otherwise(None)  # 1 for positive, 0 for negative
)

# Drop rows with neutral ratings (if desired)
tfidf_data = tfidf_data.filter(col("polarity").isNotNull())

# Verify the polarity column
tfidf_data.select("rating", "polarity").show(10)


+------+--------+
|rating|polarity|
+------+--------+
|   5.0|       1|
|   4.0|       1|
|   1.0|       0|
|   5.0|       1|
|   4.0|       1|
|   5.0|       1|
|   5.0|       1|
|   5.0|       1|
|   5.0|       1|
|   5.0|       1|
+------+--------+
only showing top 10 rows



In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train-test split
(training_data, test_data) = tfidf_data.randomSplit([0.8, 0.2])

# Logistic Regression
lr = LogisticRegression(featuresCol="tfidf_features", labelCol="polarity")
lr_model = lr.fit(training_data)

# Evaluate Model
predictions = lr_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="polarity", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")


Model Accuracy: 0.9056183973640409


In [0]:
# Display a sample of predictions
predictions.select("rating", "polarity", "prediction", "probability").show(5)


+------+--------+----------+--------------------+
|rating|polarity|prediction|         probability|
+------+--------+----------+--------------------+
|   1.0|       0|       1.0|[0.46213456752878...|
|   1.0|       0|       0.0|[0.99999958647872...|
|   1.0|       0|       0.0|[0.99838759547934...|
|   1.0|       0|       0.0|[0.62083503778393...|
|   1.0|       0|       0.0|[0.99998796810872...|
+------+--------+----------+--------------------+
only showing top 5 rows



In [0]:
print(training_data.columns)


['rating', 'title', 'text', 'images', 'asin', 'parent_asin', 'user_id', 'timestamp', 'helpful_vote', 'verified_purchase', 'cleaned_text', 'main_category', 'title', 'average_rating', 'rating_number', 'features', 'description', 'price', 'images', 'videos', 'store', 'categories', 'details', 'parent_asin', 'bought_together', 'subtitle', 'author', 'tokens', 'filtered_tokens', 'raw_features', 'tfidf_features', 'polarity']


In [0]:
from pyspark.ml.feature import StringIndexer

# Index user_id
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_id_numeric")
training_data = user_indexer.fit(training_data).transform(training_data)

# Index asin
item_indexer = StringIndexer(inputCol="asin", outputCol="asin_numeric")
training_data = item_indexer.fit(training_data).transform(training_data)

# Verify the transformation
training_data.select("user_id", "user_id_numeric", "asin", "asin_numeric").show(5)


+--------------------+---------------+----------+------------+
|             user_id|user_id_numeric|      asin|asin_numeric|
+--------------------+---------------+----------+------------+
|AHHSVWZYRMXDXFEB6...|       376362.0|B07CZ5XJT8|      7671.0|
|AFHB6ANU67B6ELEDN...|       170119.0|B09LH6BDLR|      5510.0|
|AHN5FZLRA4UAYEHWT...|        23591.0|B07CXTNVRJ|     22629.0|
|AHN5FZLRA4UAYEHWT...|        23591.0|B07CXTNVRJ|     22629.0|
|AF7BEQFILV6WB7QYF...|       144430.0|B08WJPNZPQ|        74.0|
+--------------------+---------------+----------+------------+
only showing top 5 rows



In [0]:
from pyspark.ml.recommendation import ALS

# Build ALS Model
als = ALS(
    userCol="user_id_numeric",
    itemCol="asin_numeric",
    ratingCol="rating",
    coldStartStrategy="drop"
)
als_model = als.fit(training_data)

# Generate Recommendations
recommendations = als_model.recommendForAllUsers(10)
recommendations.show(5)


+---------------+--------------------+
|user_id_numeric|     recommendations|
+---------------+--------------------+
|             26|[{48169, 6.348346...|
|             27|[{20991, 6.522863...|
|             28|[{12755, 5.985104...|
|             31|[{25385, 6.845991...|
|             34|[{48583, 6.249381...|
+---------------+--------------------+
only showing top 5 rows



In [0]:
# Show recommendations
recommendations.select("user_id_numeric", "recommendations").show(5)


+---------------+--------------------+
|user_id_numeric|     recommendations|
+---------------+--------------------+
|             26|[{48169, 6.348346...|
|             27|[{20991, 6.522863...|
|             28|[{12755, 5.985104...|
|             31|[{25385, 6.845991...|
|             34|[{48583, 6.249381...|
+---------------+--------------------+
only showing top 5 rows



In [0]:
# Sentiment Distribution Visualization
# Sentiment distribution
sentiment_dist = tfidf_data.groupBy("polarity").count()

# Display bar chart
display(sentiment_dist)


polarity,count
1,453938
0,134682


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import explode

# Flatten recommendations
recommendations_exploded = recommendations.withColumn("recommendation", explode("recommendations"))
recommendations_flattened = recommendations_exploded.select(
    "user_id_numeric", 
    "recommendation.asin_numeric",  # Replace "item" with the correct field
    "recommendation.rating"
)

# Show the top 5 recommendations for the first 5 users
display(recommendations_flattened.limit(25))


user_id_numeric,asin_numeric,rating
26,48169,6.3483467
26,9509,6.2361403
26,41746,6.14402
26,43193,6.1281834
26,18124,6.1212263
26,63443,6.0537176
26,38424,6.0535426
26,19856,6.0508385
26,46705,6.0428867
26,23071,6.040094


Databricks visualization. Run in Databricks to view.