# Amazon Reviews Feature Engineering and Modeling Script

For this project, I use only the 'amazon_reviews_us_Beauty_v1_00.tsv' data set. I downloaded this file individually with the AWS CLI, using the API command provided at the Kaggle link below. All files in this project are stored in their respective folders in my AWS S3 bucket. All scripts were produced using the Databricks Community Edition.

- **Link to Kaggle Data Set**: https://www.kaggle.com/datasets/cynthiarempel/amazon-us-customer-reviews-dataset
- **Link to Databricks Community Edition**: https://docs.databricks.com/en/getting-started/community-edition.html

**Note: The full dataset is 22 GB. Consider using the API command available at the link to download only the individual files you will be working with.**

# Import `os` and set up the environment variables for your AWS Access Keys
**Disclaimer: Do not share your AWS access/secret keys with anyone. Double-check your script before posting or sharing it publicly.**
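
One way to keep keys out of the notebook entirely is to read them from environment variables that are set outside the script. The following is a minimal sketch, not part of the original workflow: `load_aws_credentials` and `mask` are hypothetical helpers, and the sketch assumes the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` variable names are already defined in the environment.

```python
import os

def load_aws_credentials(env=None):
    """Return (access_key, secret_key) read from an environment mapping.

    Hypothetical helper: `env` defaults to os.environ, and a missing or
    empty variable raises instead of falling back to a placeholder value.
    """
    env = os.environ if env is None else env
    creds = []
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
        value = env.get(name)
        if not value:
            raise KeyError(f"{name} is not set")
        creds.append(value)
    return tuple(creds)

def mask(secret, visible=4):
    """Hide all but the last `visible` characters so a key can be printed safely."""
    cut = max(len(secret) - visible, 0)
    return "*" * cut + secret[cut:]
```

If a key ever needs to appear in a log or printout, printing `mask(access_key)` instead of the raw value keeps most of it hidden.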

In [0]:
import os
# To work with Amazon S3 storage, set the following variables using your AWS Access Key and Secret Key
# Set the Region to where your files are stored in S3.

# Replace 'your-access-key' and 'your-secret-key' with your actual AWS access/secret keys
access_key = 'your-access-key'
secret_key = 'your-secret-key'

# Set the environment variables so boto3 can pick them up later
os.environ['AWS_ACCESS_KEY_ID'] = access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = secret_key
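# URL-encode any '/' in the secret key; this is only needed if the key is embedded in an s3a:// URI, and it is not used below
encoded_secret_key = secret_key.replace("/", "%2F")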

# Note: You may need to change the aws_region depending on where your AWS S3 is located
aws_region = "us-east-2"

# Update the Spark options to work with our AWS Credentials
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

# Install all necessary libraries

In [0]:
# Pin the s3fs and fsspec versions
%pip install "s3fs==2023.12.1" "fsspec==2023.12.1"
%pip install textblob
%pip install boto3
%pip install --upgrade botocore

# Import all necessary libraries

In [0]:
import os
import io
import pandas as pd
import s3fs
import boto3
import matplotlib.pyplot as plt
import seaborn as sns
import json
from pyspark.sql.functions import col, isnan, when, count, udf
from IPython.display import HTML, display
from datetime import datetime
from pyspark.sql.functions import *


# Create File Path for your AWS S3 Bucket File(s)

In [0]:
# Set up the path to the Amazon reviews data stored on S3
# Replace 'bucket-name-xx' with your actual bucket name. Replace 'xx' with your initials.
bucket = 'bucket-name-xx/raw/'
filename = 'amazon_reviews_us_Beauty_v1_00_cleaned.parquet'
file_path = 's3a://' + bucket + filename

# Create a Spark Dataframe from the Parquet file from your AWS S3 Bucket

In [0]:
# Since this is the cleaned file saved as Parquet, it should only take about 2 seconds to load
# Parquet files store their own schema, so no inferSchema option is needed
sdf = spark.read.parquet(file_path)

# Take a small sample for testing

In [0]:
# Take a small sample (for now) while testing; remove this line to run on the whole dataset afterwards
sdf = sdf.sample(False, 0.01)
sdf.count()

# Check the Data Types of your Spark DataFrame
- **This step is crucial for understanding how you will want to proceed to feature engineering.**
- **Here is how you should consider mapping out your data type conversion in preparation for feature engineering:**
  - **String** --> String Indexer --> Encoder --> Vector Assembler
  - **Long/Int** --> Encoder --> Vector Assembler
  - **Float/Double** --> Vector Assembler
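
The mapping above can be written down as a small lookup, which is handy for planning the stages per column before building anything. This is only a sketch in plain Python: `stages_for` is a hypothetical helper, the type names follow what `printSchema()` prints, and the schema excerpt is assumed for illustration (`example_double_col` is not a real column in this data set).

```python
def stages_for(spark_type):
    """Map a Spark SQL type name to the ordered stages listed above."""
    t = spark_type.lower()
    if t == "string":
        return ["StringIndexer", "OneHotEncoder", "VectorAssembler"]
    if t in ("long", "integer", "int", "bigint"):
        return ["OneHotEncoder", "VectorAssembler"]
    if t in ("float", "double"):
        return ["VectorAssembler"]
    raise ValueError(f"no rule for type {spark_type!r}")

# Assumed schema excerpt, for illustration only
schema = {"vine": "string", "review_year": "integer", "example_double_col": "double"}
plan = {name: stages_for(dtype) for name, dtype in schema.items()}

for name, stages in plan.items():
    print(name, "-->", " --> ".join(stages))
```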

In [0]:
# Check the schema
sdf.printSchema()

# Import all necessary modules for Feature Engineering

In [0]:
# Importing functions and modules
from pyspark.sql.functions import *
from pyspark.ml.feature import *
# Import the evaluation module
from pyspark.ml.evaluation import *
# Import the model tuning module
from pyspark.ml.tuning import *
from pyspark.ml import Pipeline
# Import the logistic regression model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
import numpy as np
from pyspark.sql.types import DoubleType
from textblob import TextBlob

# 1. Create Indexer, Encoder, and MinMaxScaler

In [0]:
# This will take about 0.5 seconds to run on a 0.01 sample (compared to around 8-9 minutes on the full dataset)
# Add the Indexer, Encoder, and Scalers

# Note: since we are predicting star_rating, it cannot also be a feature; instead, binarize it into the label (1.0 if star_rating > 3, otherwise 0.0)
sdf = sdf.withColumn("label", when(sdf.star_rating > 3, 1.0).otherwise(0.0).cast(DoubleType()))

# Create an indexer for the string based columns
# We can omit product_title in this step
indexer_input_columns =  ["product_category", "review_yearmonth", "review_dayofweek", "vine", "verified_purchase"]
indexer_output_columns = ["product_category_Index", "review_yearmonth_Index", "review_dayofweek_Index", "vine_Index", "verified_purchase_Index"]
indexer = StringIndexer(inputCols=indexer_input_columns, outputCols=indexer_output_columns)

# Create an encoder 
encoder_input_columns = ["product_category_Index", "review_yearmonth_Index", "review_dayofweek_Index", "vine_Index", "verified_purchase_Index"]
encoder_output_columns = ["product_category_Vector", "review_yearmonth_Vector", "review_dayofweek_Vector", "vine_Vector", "verified_purchase_Vector"]
encoder = OneHotEncoder(inputCols=encoder_input_columns, outputCols=encoder_output_columns, dropLast=False)

# Explicitly cast the two integer columns: "review_year" and "review_month" to DoubleType to ensure consistency of data types
sdf = sdf.withColumn("review_year", col("review_year").cast(DoubleType()))
sdf = sdf.withColumn("review_month", col("review_month").cast(DoubleType()))

# Scale the helpful_votes and total_votes column
helpful_votes_assembler = VectorAssembler(inputCols=['helpful_votes'], outputCol='helpful_votes_Vector')
total_votes_assembler = VectorAssembler(inputCols=['total_votes'], outputCol='total_votes_Vector')
# Scaler
helpful_votes_scaler = MinMaxScaler(inputCol="helpful_votes_Vector", outputCol="helpful_votes_Scaled")
total_votes_scaler = MinMaxScaler(inputCol="total_votes_Vector", outputCol="total_votes_Scaled")
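
For intuition, the rescaling that `MinMaxScaler` applies to each feature can be reproduced by hand. The sketch below is plain Python rather than Spark, and `min_max_scale` is a hypothetical name. It assumes the default target range of [0, 1] and follows the documented behavior of mapping a constant column to the midpoint of the range.

```python
def min_max_scale(values, lo=0.0, hi=1.0):
    """Rescale a list of numbers linearly so min -> lo and max -> hi."""
    col_min, col_max = min(values), max(values)
    if col_max == col_min:
        # A constant column has no spread; use the midpoint of the target range
        return [0.5 * (lo + hi)] * len(values)
    span = col_max - col_min
    return [(v - col_min) / span * (hi - lo) + lo for v in values]

# Made-up vote counts, for illustration only
print(min_max_scale([0, 5, 10]))
```

Because the minimum and maximum are learned from the data the scaler is fitted on, a single very large vote count will compress all other values toward 0.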

# 2. Create Tokenizer, HashingTF, and IDF for text-based columns

In [0]:
# Note: We can use the same 'sdf' DataFrame each time
# Tokenizer
sdf = sdf.drop("review_words")
regexTokenizer = RegexTokenizer(inputCol="review_body", outputCol="review_words", pattern="\\w+", gaps=False)
sdf = regexTokenizer.transform(sdf)
sdf.select("review_body", "review_words").show(truncate=False)

# HashingTF
sdf = sdf.drop("review_word_features")
hashingTF = HashingTF(numFeatures=4096, inputCol="review_words", outputCol="review_word_features")
sdf = hashingTF.transform(sdf)
sdf.select(['review_words','review_word_features']).show(truncate=False)

# Inverse Document Frequency (IDF)
idf = IDF(inputCol='review_word_features', outputCol="review_body_features", minDocFreq=1)
sdf = idf.fit(sdf).transform(sdf)
sdf.select("review_date", "review_body_features").show(truncate=False)

In [0]:
# Tokenizer
#sdf = sdf.drop("review_words")
regexTokenizer = RegexTokenizer(inputCol="review_headline", outputCol="review_headline_words", pattern="\\w+", gaps=False)
sdf = regexTokenizer.transform(sdf)
sdf.select("review_headline", "review_headline_words").show(truncate=False)

# HashingTF
#sdf = sdf.drop("review_word_features")
hashingTF = HashingTF(numFeatures=4096, inputCol="review_headline_words", outputCol="review_headline_word_features")
sdf = hashingTF.transform(sdf)
sdf.select(['review_headline_words','review_headline_word_features']).show(truncate=False)

# Inverse Document Frequency (IDF)
idf = IDF(inputCol='review_headline_word_features', outputCol="review_headline_features", minDocFreq=1)
sdf = idf.fit(sdf).transform(sdf)
sdf.select("review_date", "review_headline_features").show(truncate=False)
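
To make the tokenize, hash, and weight steps above more concrete, here is a toy version in plain Python. It is a sketch under stated assumptions, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 function that `HashingTF` uses (so bucket indices will differ), `num_features` is shrunk to 16, the two documents are made up, and all function names are hypothetical. The IDF weight uses the smoothed formula from the Spark documentation, log((m + 1) / (df + 1)), where m is the number of documents and df is the number of documents containing the term.

```python
import math
import re
import zlib

def tokenize(text):
    """Lowercase the text and keep runs of word characters, like pattern \\w+ with gaps=False."""
    return re.findall(r"\w+", text.lower())

def hashing_tf(tokens, num_features=16):
    """Count tokens per hash bucket; different tokens may collide in one bucket."""
    counts = {}
    for tok in tokens:
        idx = zlib.crc32(tok.encode("utf-8")) % num_features
        counts[idx] = counts.get(idx, 0) + 1
    return counts

def idf_weights(docs_tf):
    """Compute log((m + 1) / (df + 1)) for every bucket seen in the corpus."""
    m = len(docs_tf)
    df = {}
    for tf in docs_tf:
        for idx in tf:
            df[idx] = df.get(idx, 0) + 1
    return {idx: math.log((m + 1) / (n + 1)) for idx, n in df.items()}

docs = ["Great lotion, great price", "Lotion arrived broken"]
tfs = [hashing_tf(tokenize(d)) for d in docs]
idf = idf_weights(tfs)
tfidf = [{i: c * idf[i] for i, c in tf.items()} for tf in tfs]
```

A bucket that appears in every document gets a weight of 0, which is why very common words contribute little to the final feature vector.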

# 3. Create an assembler for the individual feature vectors and the float/double columns

In [0]:
# Create an assembler
assembler = VectorAssembler(inputCols=["product_category_Vector", "review_yearmonth_Vector", "review_dayofweek_Vector",  "helpful_votes_Scaled", "total_votes_Scaled", "vine_Vector", "verified_purchase_Vector", "review_body_features", "review_headline_features", "review_weekend"], 
                            outputCol="features")

# 4. Create the Pipeline

In [0]:
# Create the Pipeline
# Note: the regexTokenizer, hashingTF, and idf steps were already applied to the 'sdf' DataFrame above, so they do not need to be stages in the pipeline
reviews_pipe = Pipeline(stages=[indexer, encoder, helpful_votes_assembler, total_votes_assembler, helpful_votes_scaler, total_votes_scaler, assembler])

# Fit the pipeline, then use it to transform the data
transformed_sdf = reviews_pipe.fit(sdf).transform(sdf)

# Review the transformed features
print("Transformed features")

transformed_sdf.select('features','label').show()

# 5. Plug new sdf into the Logistic Regression model

In [0]:
# At this point, we have a usable features vector and label
# Now we can plug them into the Logistic Regression model
# This will take about 9-11 minutes to run
trainingData, testData = transformed_sdf.randomSplit([0.7, 0.3], seed=42)

# Create a LogisticRegression Estimator
lr = LogisticRegression()

# Fit the model to the training data
model = lr.fit(trainingData)

In [0]:
# Save the model to the S3 bucket in the /models directory
# Replace 'bucket-name-xx' with your actual bucket name and replace 'xx' with your initials
# For the file name, replace 'yyyy-mm-dd' with today's date
model_path = "s3://bucket-name-xx/models/logistic_regression_model_yyyy-mm-dd"
model.write().overwrite().save(model_path)
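
Instead of typing the date by hand, the path can be assembled from today's date. This is a small sketch. `build_model_path` is a hypothetical helper, and the bucket name is the same placeholder used above.

```python
from datetime import date

def build_model_path(bucket, run_date=None, prefix="models", name="logistic_regression_model"):
    """Return an S3 path of the form s3://<bucket>/<prefix>/<name>_<yyyy-mm-dd>."""
    run_date = date.today() if run_date is None else run_date
    return f"s3://{bucket}/{prefix}/{name}_{run_date.isoformat()}"

# Passing an explicit date makes the result reproducible; omit it to use today's date
print(build_model_path("bucket-name-xx", date(2024, 1, 15)))
```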

In [0]:
# This will take about 2 minutes to run
# Show model coefficients and intercept
print("Coefficients: ", model.coefficients)
print("Intercept: ", model.intercept)

# Test the model on the testData
test_results = model.transform(testData)

# Show the test results
test_results.select('product_title','product_category', 'helpful_votes', 'total_votes', 'verified_purchase', 'review_body', 'review_date', 'probability', 'prediction').show()

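# Show the confusion matrix
test_results.groupby('label').pivot('prediction').count().sort('label').show()

# Collect the confusion matrix
# Sort by label so that row 0 is label 0.0 and row 1 is label 1.0, and list the pivot values so both prediction columns always exist
confusion_matrix = test_results.groupby('label').pivot('prediction', [0.0, 1.0]).count().fillna(0).sort('label').collect()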

def calculate_recall_precision(confusion_matrix):
    tn = confusion_matrix[0][1]  # True Negative
    fp = confusion_matrix[0][2]  # False Positive
    fn = confusion_matrix[1][1]  # False Negative
    tp = confusion_matrix[1][2]  # True Positive
    precision = tp / ( tp + fp )            
    recall = tp / ( tp + fn )
    accuracy = ( tp + tn ) / ( tp + tn + fp + fn )
    f1_score = 2 * ( ( precision * recall ) / ( precision + recall ) )
    return accuracy, precision, recall, f1_score

print("Accuracy, Precision, Recall, F1 Score")
print( calculate_recall_precision(confusion_matrix) )
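
On a small sample the model may predict only one class, in which case `tp + fp` or `tp + fn` can be zero and the divisions above raise `ZeroDivisionError`. The sketch below shows the same four formulas with a guard. `safe_div` and `classification_metrics` are hypothetical names, the counts in the example are made up, and returning 0.0 for an undefined ratio is a convention chosen here rather than the only reasonable one.

```python
def safe_div(num, den):
    """Divide, returning 0.0 when the denominator is zero."""
    return num / den if den else 0.0

def classification_metrics(tn, fp, fn, tp):
    """Return (accuracy, precision, recall, f1) from the four confusion-matrix counts."""
    precision = safe_div(tp, tp + fp)
    recall = safe_div(tp, tp + fn)
    accuracy = safe_div(tp + tn, tp + tn + fp + fn)
    f1 = safe_div(2 * precision * recall, precision + recall)
    return accuracy, precision, recall, f1

# Made-up counts, for illustration only
print(classification_metrics(tn=50, fp=10, fn=5, tp=35))
```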

# 6. Data Visualization

**1. Linear Regression Model based on helpful_votes_Scaled and total_votes_Scaled** 

In [0]:
# Linear Regression Model based on helpful_votes_Scaled and total_votes_Scaled
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

# Extract features and labels
X = np.array(transformed_sdf.select('total_votes_Scaled').collect()).reshape(-1, 1)
y = np.array(transformed_sdf.select('helpful_votes_Scaled').collect()).ravel()

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Create a linear regression model
model = LinearRegression()
model.fit(X_train, y_train)

# Predict the target variable
y_pred = model.predict(X_test)

# Plot the regression line
plt.figure(figsize=(10, 6))
plt.scatter(X_test, y_test, color='red', marker='o', label='Actual Data')
plt.plot(X_test, y_pred, label='Linear Regression Line', color='blue')
plt.xlabel('Total Votes (Scaled)')
plt.ylabel('Helpful Votes (Scaled)')
plt.title('Linear Regression')
plt.legend()
plt.show()

In [0]:
from sklearn.metrics import mean_squared_error, r2_score
# Calculate RMSE
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
print(f'Root Mean Squared Error (RMSE): {rmse}')

# Calculate R-squared
r2 = r2_score(y_test, y_pred)
print(f'R-squared (R2): {r2}')
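
For reference, both metrics are short formulas. The sketch below computes them in plain Python so that the definitions are visible. `rmse_manual` and `r2_manual` are hypothetical names, the numbers are made up, and the R-squared version assumes the true values are not all identical (otherwise the denominator is zero).

```python
import math

def rmse_manual(y_true, y_pred):
    """Square root of the mean squared difference between truth and prediction."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(y_true))

def r2_manual(y_true, y_pred):
    """1 - SS_res / SS_tot: the share of variance explained by the predictions."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot

# Made-up values: predicting the mean everywhere gives R-squared of 0
truth = [1.0, 2.0, 3.0, 4.0]
print(rmse_manual(truth, [2.5] * 4), r2_manual(truth, [2.5] * 4))
```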

**2. Logistic Regression Model based on total_votes_Scaled and label** 

In [0]:
# Logistic Regression Model based on total_votes_Scaled and 'label' (label = 1.0 when star_rating > 3, otherwise 0.0)
# Note: a label of 1.0 means the star rating is more than 3, which indicates a "good" rating
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Extract features and labels
X = np.array(transformed_sdf.select('total_votes_Scaled').collect()).reshape(-1, 1)
y = np.array(transformed_sdf.select('label').collect()).ravel()

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Create a logistic regression model
model = LogisticRegression()
model.fit(X_train, y_train)

# Plot the logistic regression curve
plt.figure(figsize=(10, 6))

# Generate a range of values for total_votes_Scaled
votes_range = np.linspace(X[:, 0].min(), X[:, 0].max(), 300).reshape(-1, 1)

# Calculate the predicted probabilities
probabilities = model.predict_proba(votes_range)[:, 1]

# Plot the logistic regression curve
plt.plot(votes_range, probabilities, label='Logistic Regression Curve', color='blue')

# Scatter plot of the data points
plt.scatter(X[:, 0], y, color='red', marker='o', label='Actual Data')

# Add labels and title
plt.xlabel('Total Votes (Scaled)')
plt.ylabel('Probability of Label 1')
plt.title('Logistic Regression Curve')

# Show the legend
plt.legend()

# Show the plot
plt.show()
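
With a single feature and two classes, the curve drawn above is a sigmoid applied to a straight line, `coef * x + intercept`. The sketch below shows that calculation in plain Python. The coefficient and intercept are made-up numbers, not values fitted on this data set, and the function names are hypothetical.

```python
import math

def sigmoid(z):
    """Map any real number to the open interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predicted_probability(x, coef, intercept):
    """Probability of label 1 for one feature value under a fitted logistic model."""
    return sigmoid(coef * x + intercept)

# Made-up parameters: a negative coefficient means more votes lowers the probability
curve = [predicted_probability(x / 10, coef=-2.0, intercept=1.0) for x in range(11)]
print([round(p, 3) for p in curve])
```

The point where the probability crosses 0.5 is where `coef * x + intercept` equals 0, which is x = 0.5 for the numbers used here.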


**3. Logistic Regression Model based on review_body_length and 'label'**

In [0]:
# Logistic Regression Model based on review_body_length and 'label'
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Extract features and labels
X = np.array(transformed_sdf.select('review_body_length').collect())
y = np.array(transformed_sdf.select('label').collect()).ravel()

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Create a logistic regression model
model = LogisticRegression()
model.fit(X_train, y_train)

# Plot the logistic regression curve
plt.figure(figsize=(10, 6))

# Generate a range of values for the review_body_length
length_range = np.linspace(X[:, 0].min(), X[:, 0].max(), 300).reshape(-1, 1)

# Calculate the predicted probabilities
probabilities = model.predict_proba(length_range)[:, 1]

# Plot the logistic regression curve
plt.plot(length_range, probabilities, label='Logistic Regression Curve', color='blue')

# Scatter plot of the data points
plt.scatter(X[:, 0], y, color='red', marker='o', label='Actual Data')

# Add labels and title
plt.xlabel('Review Body Length')
plt.ylabel('Probability of Label 1')
plt.title('Logistic Regression Curve')

# Show the legend
plt.legend()

# Show the plot
plt.show()


# Creating a Logistic Regression Model using Sentiment Analysis 

In [0]:
# Creating a Logistic Regression Model using sentiment analysis 
# Defining sentiment analysis function
from textblob import TextBlob
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

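# Create a function to perform sentiment analysis on some text
def sentiment_analysis(some_text):
    # Treat missing text as neutral (0.0) so TextBlob is never called on a null value
    if some_text is None:
        return 0.0
    # Polarity ranges from -1.0 (most negative) to 1.0 (most positive)
    return TextBlob(some_text).sentiment.polarity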

sentiment_analysis_udf = udf(sentiment_analysis, DoubleType())

# Apply sentiment analysis UDF
transformed_sdf = sdf.withColumn("sentiment_score_review", sentiment_analysis_udf(sdf['review_body']))


**4. Logistic Regression Model based on 'sentiment_score_review' and 'label'**

In [0]:
# Logistic Regression Model based on 'sentiment_score_review' and 'label'
import matplotlib.pyplot as plt
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Extract features and labels
X = np.array(transformed_sdf.select('sentiment_score_review').collect())
y = np.array(transformed_sdf.select('label').collect()).ravel()

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Create a logistic regression model
model = LogisticRegression()
model.fit(X_train, y_train)

# Plot the logistic regression curve
plt.figure(figsize=(10, 6))

# Generate a range of values for the sentiment_score_review
score_range = np.linspace(X[:, 0].min(), X[:, 0].max(), 300).reshape(-1, 1)

# Calculate the predicted probabilities
probabilities = model.predict_proba(score_range)[:, 1]

# Plot the logistic regression curve
plt.plot(score_range, probabilities, label='Logistic Regression Curve', color='blue')

# Scatter plot of the data points
plt.scatter(X[:, 0], y, color='red', marker='o', label='Actual Data')

# Add labels and title
plt.xlabel('Sentiment Score Review')
plt.ylabel('Probability of Label 1')
plt.title('Logistic Regression Curve')

# Show the legend
plt.legend()

# Save the figure
plt.savefig("label_by_sentiment_score_matplotlib.png")

# Show the plot
plt.show()

# Save Figure to your AWS S3 Bucket Folder '/models'

In [0]:
# Save Figure to your AWS S3 Bucket Folder '/models'
# Note: the figure was already rendered by plt.show() in the previous cell, so calling plt.savefig() again here may write a blank image
# Upload the PNG file that was saved in the previous cell instead
local_file = "label_by_sentiment_score_matplotlib.png"

# Your S3 bucket details
# Replace 'bucket-name-xx' with your actual bucket name. Replace 'xx' with your initials.
bucket_name = 'bucket-name-xx'
# The object key must include the file name, not only the folder
s3_key = 'models/' + local_file

# Upload the figure to S3 using boto3
# boto3 reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment variables set at the top of this script,
# so the keys do not need to be written out again here
s3 = boto3.client('s3')
s3.upload_file(local_file, bucket_name, s3_key)

# Print statement
print(f"Figure saved to S3: {bucket_name}/{s3_key}")