## Data Loading

To analyze the Amazon reviews data, a Spark session was initialized and the dataset was loaded from an S3 bucket. The `SparkSession` was created with the application name "Amazon Reviews Project". The data was read from the `Reviews.csv` file in the S3 bucket with the `header` option enabled and `inferSchema` set to `True`, so that Spark attempts to detect column data types automatically.

Reading the file through PySpark lets the load and all later transformations run on the cluster rather than on a single machine.
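The schema printed further down in this notebook lists `HelpfulnessNumerator`, `HelpfulnessDenominator`, `Score`, and `Time` as `string`, even though `inferSchema` was enabled. One plausible explanation, which the notebook does not confirm, is that at least one value in each of those columns did not parse as a number, so the inferred type fell back to string. The pure-Python sketch below imitates that fallback idea only; `infer_column_type` is a hypothetical helper and not Spark's actual inference code.

```python
def infer_column_type(values):
    """Return 'integer', 'double', or 'string' for a list of raw CSV strings.

    Hypothetical helper that mimics the widening idea behind schema
    inference: a single unparseable value forces the whole column to string.
    """
    def parses_as(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    # Empty cells are treated as nulls and do not influence the type.
    non_null = [v for v in values if v not in (None, "")]
    if all(parses_as(int, v) for v in non_null):
        return "integer"
    if all(parses_as(float, v) for v in non_null):
        return "double"
    return "string"


clean_scores = ["5", "1", "4"]
# A stray text fragment, e.g. from a review that spilled into the wrong column.
dirty_scores = ["5", "1", "great product"]

print(infer_column_type(clean_scores))  # integer
print(infer_column_type(dirty_scores))  # string
```

If this is what happened, passing an explicit schema through the `schema` argument of `spark.read.csv` would be one way to avoid relying on inference.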


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Amazon Reviews Project") \
    .getOrCreate()

# Load data
df = spark.read.csv("s3://amzn-review-project/Reviews.csv", header=True, inferSchema=True)


Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1732060288144_0001,pyspark,idle,Link,Link,,✔

SparkSession available as 'spark'.

## Data Cleaning

To ensure the dataset's integrity, rows with missing values in critical columns such as `Text`, `Score`, `HelpfulnessNumerator`, and `HelpfulnessDenominator` were removed. This step ensures that the analysis and model training processes are based on complete and reliable data, avoiding errors or inconsistencies caused by null values.

This cleaning step enhances data quality, which is crucial for building a robust predictive model.
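As a minimal illustration of what dropping on a subset of columns means, the plain-Python sketch below keeps a row only when every critical column is non-null. The sample rows and the `drop_incomplete` helper are made up for this example.

```python
CRITICAL_COLUMNS = ["Text", "Score", "HelpfulnessNumerator", "HelpfulnessDenominator"]


def drop_incomplete(rows, subset=CRITICAL_COLUMNS):
    """Keep only rows where every column in `subset` is present and not None."""
    return [row for row in rows if all(row.get(name) is not None for name in subset)]


rows = [
    # Null Summary: not a critical column, so the row is kept.
    {"Text": "Great treats", "Score": 5, "HelpfulnessNumerator": 1,
     "HelpfulnessDenominator": 1, "Summary": None},
    # Null Text: a critical column, so the row is dropped.
    {"Text": None, "Score": 4, "HelpfulnessNumerator": 0,
     "HelpfulnessDenominator": 0, "Summary": "ok"},
]

kept = drop_incomplete(rows)
print(len(kept))  # 1
```

Note that only nulls are removed this way; an empty string in `Text` would still pass the check.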


In [3]:
df = df.dropna(subset=["Text", "Score", "HelpfulnessNumerator", "HelpfulnessDenominator"])




## Feature Engineering: Word Count

To enrich the dataset, a `word_count` column was added. In this first cell the column is computed with the `length` function, which returns the number of characters in `Text` rather than the number of words (263 for the first review in the output below). The column is recomputed as an actual word count in cell In [6]. Review length is a simple proxy for verbosity, which may influence the classification outcome.

This feature is used later during model training.


In [4]:
from pyspark.sql.functions import length

# Note: length() returns the character count of Text, not the word count.
df = df.withColumn("word_count", length("Text"))
df.show()



+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|word_count|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|       263|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|       194|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|       386

## Feature Engineering: Helpfulness Ratio

The `helpfulness_ratio` column was created to capture the ratio of helpful votes to the total number of votes (`HelpfulnessNumerator` / `HelpfulnessDenominator`). This feature provides a quantifiable measure of how helpful users found a review.

To ensure meaningful ratios, the dataset was filtered to exclude entries with a `HelpfulnessDenominator` of 0, as such entries cannot contribute valid ratios.

By incorporating this feature, the model can leverage the perceived helpfulness of reviews, which is a critical factor in many recommendation and classification tasks.
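The ratio and the zero-denominator case can be sketched in plain Python as follows. The `helpfulness_ratio` function and the vote pairs are illustrative assumptions, not code from the notebook.

```python
def helpfulness_ratio(numerator, denominator):
    """Return helpful votes / total votes, or None when nobody voted."""
    if numerator is None or denominator is None or denominator <= 0:
        return None
    return numerator / denominator


# (helpful votes, total votes) pairs; made-up sample values.
votes = [(1, 1), (0, 0), (1, 3)]
ratios = [helpfulness_ratio(n, d) for n, d in votes]
print(ratios)  # [1.0, None, 0.3333333333333333]

# Dropping the undefined ratios mirrors the filter on HelpfulnessDenominator > 0.
valid = [r for r in ratios if r is not None]
print(valid)  # [1.0, 0.3333333333333333]
```

Treating "no votes" as undefined keeps it distinct from "votes, but none helpful". A later cell (In [15]) instead substitutes 0 for reviews without votes, which merges the two cases.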


In [5]:
from pyspark.sql.functions import col, size, split, when
df = df.withColumn("helpfulness_ratio", col("HelpfulnessNumerator") / col("HelpfulnessDenominator"))
df = df.filter(col("HelpfulnessDenominator") > 0)


A word count was used rather than a character count because words are the meaningful units for NLP models. A character count does not capture this, since it is inflated or deflated by the length of individual words, which carries little meaning on its own.
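The difference between the two measures can be seen in a small plain-Python sketch. The three helper functions are hypothetical and only roughly analogous to Spark's `length` and `size(split(...))`; the last variant is a suggested alternative that splits on runs of whitespace.

```python
import re


def char_count(text):
    """Number of characters, roughly analogous to Spark's length()."""
    return len(text)


def word_count_single_space(text):
    """Split on a single space, roughly analogous to size(split(col, " "))."""
    return len(text.split(" "))


def word_count_whitespace(text):
    """Split on runs of whitespace so repeated spaces do not add empty tokens."""
    stripped = text.strip()
    return len(re.split(r"\s+", stripped)) if stripped else 0


review = "Good  quality dog food"  # note the double space

print(char_count(review))               # 22
print(word_count_single_space(review))  # 5 (one empty token from the double space)
print(word_count_whitespace(review))    # 4
```

Splitting on a single space counts an empty token wherever two spaces appear in a row, so a whitespace pattern such as `\s+` may give a cleaner count on raw review text.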

In [6]:
# Compute word count for reviews
df = df.withColumn("word_count", size(split(col("Text"), " ")))


In [7]:
# Add sentiment labels based on Score using when(): 4-5 positive, 3 neutral, 1-2 negative
df = df.withColumn(
    "sentiment",
    when(col("Score") >= 4, "positive")
    .when(col("Score") == 3, "neutral")
    .otherwise("negative")
)


In [8]:
# Show preview of processed data
df.show(10)


+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------+-----------------+---------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|word_count|helpfulness_ratio|sentiment|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+----------+-----------------+---------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|        49|              1.0| positive|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|        76|              1.0| positive|
|  4|B000UA0QIQ|A395BORC6

In [9]:
# Save the processed data to S3 in Parquet format
output_path = "s3://amzn-review-project/pre-processed-data/"
df.write.mode("overwrite").parquet(output_path)



In [10]:
sc.install_pypi_package("numpy")


Collecting numpy
  Downloading numpy-2.0.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.5 MB)
Installing collected packages: numpy
Successfully installed numpy-2.0.2


In [13]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Check Schema").getOrCreate()
df = spark.read.parquet("s3://amzn-review-project/pre-processed-data/")
df.printSchema()



root
 |-- Id: integer (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- word_count: integer (nullable = true)
 |-- helpfulness_ratio: double (nullable = true)
 |-- sentiment: string (nullable = true)

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

# Initialize Spark session
spark = SparkSession.builder.appName("Fix Parquet Data Types").getOrCreate()

# Load the existing Parquet data
df = spark.read.parquet("s3://amzn-review-project/pre-processed-data/")

# Cast columns to appropriate types
df = df.withColumn("HelpfulnessNumerator", df["HelpfulnessNumerator"].cast(IntegerType()))
df = df.withColumn("HelpfulnessDenominator", df["HelpfulnessDenominator"].cast(IntegerType()))
df = df.withColumn("Score", df["Score"].cast(IntegerType()))
df = df.withColumn("Time", df["Time"].cast(IntegerType()))

# Save the corrected data back to S3
df.write.mode("overwrite").parquet("s3://amzn-review-project/pre-processed-data-corrected/")



In [15]:
from pyspark.sql.functions import col, when

# Compute helpfulness_ratio safely (avoid division by zero)
df = df.withColumn(
    "helpfulness_ratio",
    when(col("HelpfulnessDenominator") > 0, col("HelpfulnessNumerator") / col("HelpfulnessDenominator"))
    .otherwise(0)
)

# Save the updated data
df.write.mode("overwrite").parquet("s3://amzn-review-project/pre-processed-data-updated/")



In [16]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Sentiment Analysis").getOrCreate()

# Load the dataset from S3
df = spark.read.parquet("s3://amzn-review-project/sentiment-training-data/")

# Display the first few rows
df.show()



+------+----------+-------------------+---------+---------------+
|    id|word_count|  helpfulness_ratio|sentiment|avg_word_length|
+------+----------+-------------------+---------+---------------+
|234577|        38|                1.0|  neutral|              4|
|185477|        48|                1.0|  neutral|              5|
|344067|        40| 0.3333333333333333|  neutral|              5|
| 36096|        25|0.16666666666666666|  neutral|              4|
|456632|        42|                1.0|  neutral|              4|
|228508|        15| 0.7142857142857143|  neutral|              6|
|134527|        82|                1.0|  neutral|              5|
|474241|        30|               0.25|  neutral|              5|
|341158|       238|                1.0|  neutral|              5|
|520068|        47|                1.0|  neutral|              4|
|382683|       109|                1.0|  neutral|              5|
|236181|        27|                1.0|  neutral|              5|
|519958|  

# Sentiment Analysis Using PySpark

## Data Loading and Cleaning
The Amazon reviews dataset was loaded into a Spark DataFrame from an S3 bucket using PySpark. Initial cleaning steps included dropping rows with missing values in critical columns such as `Text`, `Score`, `HelpfulnessNumerator`, and `HelpfulnessDenominator`. To enhance data quality, additional columns like `word_count` and `helpfulness_ratio` were computed. These features provide valuable insights into review characteristics, enabling more robust model performance.

## Feature Engineering
### Word Count
The `word_count` column captures the number of words in each review's text, offering a quantitative measure of verbosity.

### Helpfulness Ratio
The `helpfulness_ratio` represents the ratio of helpful votes to total votes for each review, a vital metric for gauging user feedback reliability.

### Sentiment Labeling
Reviews were labeled as `positive`, `neutral`, or `negative` based on their `Score`. Labels were later indexed into numerical values using `StringIndexer` for model compatibility.
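The labeling rule and the indexing step can be sketched in plain Python. `StringIndexer` orders labels by descending frequency by default; the alphabetical tie-break used here is an assumption about its behavior, and both helper functions are hypothetical stand-ins rather than Spark code.

```python
from collections import Counter


def score_to_sentiment(score):
    """Map a 1-5 star score to the three sentiment labels used in this notebook."""
    if score >= 4:
        return "positive"
    if score == 3:
        return "neutral"
    return "negative"


def frequency_desc_index(labels):
    """Assign 0.0 to the most frequent label, 1.0 to the next, and so on.

    Ties are broken alphabetically (an assumption, see the text above).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


scores = [5, 4, 3, 3, 1, 2]  # made-up, perfectly balanced sample
sentiments = [score_to_sentiment(s) for s in scores]
print(frequency_desc_index(sentiments))
# {'negative': 0.0, 'neutral': 1.0, 'positive': 2.0}
```

With equal class counts this sketch reproduces the mapping shown in the notebook output (`negative` 0.0, `neutral` 1.0, `positive` 2.0), which would be consistent with a balanced training set, though the notebook does not state the class distribution.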

## Textual Feature Transformation
Tokenization was applied to split the review text into words, followed by computation of term frequencies using `HashingTF`. TF-IDF (Term Frequency-Inverse Document Frequency) features were then generated to weigh the importance of terms in the corpus. 
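To make the tokenize, hash, and weight sequence concrete, here is a small pure-Python sketch. It is not Spark's implementation: `zlib.crc32` stands in for the hash function, the three documents are invented, and the smoothed formula `log((N + 1) / (df + 1))` is the one described in Spark MLlib's documentation for `IDF`.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 1000  # same bucket count as the HashingTF cell in this notebook


def tokenize(text):
    """Lowercase and split on whitespace, similar in spirit to Tokenizer."""
    return text.lower().split()


def hashed_term_frequencies(tokens, num_features=NUM_FEATURES):
    """Count tokens per hash bucket; crc32 is a stand-in hash for this sketch."""
    return Counter(zlib.crc32(tok.encode("utf-8")) % num_features for tok in tokens)


def inverse_document_frequencies(tf_vectors):
    """idf(bucket) = log((N + 1) / (df + 1)) for N documents, df of them containing the bucket."""
    n_docs = len(tf_vectors)
    doc_freq = Counter(bucket for tf in tf_vectors for bucket in tf)
    return {bucket: math.log((n_docs + 1) / (df + 1)) for bucket, df in doc_freq.items()}


docs = ["Good dog food", "Bad dog food", "Good coffee"]  # made-up reviews
tf_vectors = [hashed_term_frequencies(tokenize(d)) for d in docs]
idf = inverse_document_frequencies(tf_vectors)
tfidf = [{b: count * idf[b] for b, count in tf.items()} for tf in tf_vectors]

for doc, vec in zip(docs, tfidf):
    print(doc, "->", vec)
```

Because different tokens can land in the same bucket, a small `numFeatures` such as 1000 trades vocabulary resolution for a compact vector; buckets that occur in every document receive an IDF weight of zero.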

## Data Preparation
All features, including `tfidfFeatures`, `word_count`, `helpfulness_ratio`, and `avg_word_length`, were combined into a single vector using `VectorAssembler`, so that the classifier receives one input column. The data was split into training (80%) and testing (20%) subsets with `randomSplit` and a fixed seed of 42 to evaluate model performance.
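A random 80/20 split of this kind assigns each row independently, so the resulting sizes are only approximately 80% and 20%. The sketch below shows the idea in plain Python; `random_split` is a hypothetical helper and does not reproduce the rows Spark would select for the same seed.

```python
import random


def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row independently to train or test with the given probability."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))
train, test = random_split(rows)
print(len(train), len(test))  # close to 800 and 200, but usually not exact
```

Fixing the seed makes the split repeatable, which matters when comparing two models on the same test rows.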

## Model Training
A logistic regression model was trained using the combined feature vector and the sentiment labels. The hyperparameters were set to fixed values: 20 iterations (`maxIter=20`), a regularization parameter of 0.3 (`regParam=0.3`), and an elastic net parameter of 0.8 (`elasticNetParam=0.8`). No hyperparameter search is shown in the notebook.
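For intuition about these two regularization settings, the elastic net penalty described in the Spark ML documentation can be written as `regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)` with `alpha = elasticNetParam`. The sketch below evaluates it for an invented weight vector; `elastic_net_penalty` is a hypothetical helper.

```python
def elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)


weights = [0.5, -1.0, 0.0]  # made-up coefficients
print(elastic_net_penalty(weights))  # approximately 0.3975
```

With `regParam=0.3` and `elasticNetParam=0.8`, most of the penalty is L1. A strong L1 term can shrink many coefficients to exactly zero, so these values may be worth revisiting if the model appears to ignore its features.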

## Model Evaluation
Predictions were made on the test data, and the model's performance was assessed using the `MulticlassClassificationEvaluator` with `metricName="accuracy"`, which reports the fraction of test rows whose predicted label matches the true label.
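Accuracy is easier to interpret next to a baseline. The sketch below computes accuracy and the score of always predicting the most frequent class on a toy, perfectly balanced three-class set; the labels and both helper functions are invented for illustration.

```python
from collections import Counter


def accuracy(labels, predictions):
    """Fraction of predictions that match the true labels."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


def majority_baseline(labels):
    """Accuracy obtained by always predicting the most frequent label."""
    return Counter(labels).most_common(1)[0][1] / len(labels)


# Toy, perfectly balanced three-class test set (made-up values).
labels = [0.0, 1.0, 2.0] * 4
constant_predictions = [1.0] * len(labels)

print(f"{accuracy(labels, constant_predictions):.2f}")  # 0.33
print(f"{majority_baseline(labels):.2f}")               # 0.33
```

On balanced three-class data, a model that always predicts the same class scores about 0.33, so an accuracy near that value is worth checking against the distribution of predicted labels.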

## Final Results
- **Model Accuracy:** Both logistic regression runs in this notebook printed `Model Accuracy: 0.33` on the test split, so the reported accuracy does not yet show that the model distinguishes the three sentiments.
- **TF-IDF Features:** TF-IDF provides a weighted representation of term importance, but adding it did not change the reported accuracy (0.33 with and without the text features).
- **Insights:** The notebook output does not include per-class metrics or model coefficients, so the separation of positive and negative reviews and the contribution of `helpfulness_ratio` and `word_count` remain to be verified.

## Model Persistence
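To ensure reusability, the trained Spark ML model can be saved to storage with its `save` method (for example `lr_model1.save(path)`) and reloaded with `LogisticRegressionModel.load(path)` for predictions on new datasets. PyTorch's `state_dict` does not apply here, since the model is a Spark ML `LogisticRegressionModel`.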

## Conclusion
This PySpark-based sentiment analysis demonstrates the power of distributed data processing for handling large-scale datasets. The integration of text-based features with numerical review metrics provided a comprehensive approach to sentiment classification. Future work could explore advanced deep learning models or additional features for improved performance.


In [17]:
df.select("sentiment").distinct().show()



+---------+
|sentiment|
+---------+
| positive|
|  neutral|
| negative|
+---------+

In [18]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="sentiment", outputCol="label")
df = indexer.fit(df).transform(df)

# Show how sentiments are mapped to labels
df.select("sentiment", "label").distinct().show()



+---------+-----+
|sentiment|label|
+---------+-----+
| positive|  2.0|
| negative|  0.0|
|  neutral|  1.0|
+---------+-----+

In [19]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["word_count", "helpfulness_ratio", "avg_word_length"],
    outputCol="features"
)
df = assembler.transform(df)

# Show the dataset with features and label
df.select("features", "label").show()



+--------------------+-----+
|            features|label|
+--------------------+-----+
|      [38.0,1.0,4.0]|  1.0|
|      [48.0,1.0,5.0]|  1.0|
|[40.0,0.333333333...|  1.0|
|[25.0,0.166666666...|  1.0|
|      [42.0,1.0,4.0]|  1.0|
|[15.0,0.714285714...|  1.0|
|      [82.0,1.0,5.0]|  1.0|
|     [30.0,0.25,5.0]|  1.0|
|     [238.0,1.0,5.0]|  1.0|
|      [47.0,1.0,4.0]|  1.0|
|     [109.0,1.0,5.0]|  1.0|
|      [27.0,1.0,5.0]|  1.0|
|      [94.0,1.0,5.0]|  1.0|
|     [264.0,1.0,5.0]|  1.0|
|      [29.0,0.0,5.0]|  1.0|
|[86.0,0.666666666...|  1.0|
|     [251.0,1.0,5.0]|  1.0|
|     [126.0,0.5,4.0]|  1.0|
|      [27.0,1.0,5.0]|  1.0|
|      [98.0,0.0,5.0]|  1.0|
+--------------------+-----+
only showing top 20 rows

In [20]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)



In [21]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)



In [22]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = lr_model.transform(test_data)

# Evaluate accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy:.2f}")



Model Accuracy: 0.33

In [24]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Sentiment Analysis").getOrCreate()

# Load the new dataset from S3
df = spark.read.parquet("s3://amzn-review-project/sentiment-training-data-with-text/")

# Show the first few rows to verify
df.show(truncate=False)




In [25]:
from pyspark.ml.feature import Tokenizer

# Tokenize the text column
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df = tokenizer.transform(df)

# Show a sample of tokenized text
df.select("text", "words").show(truncate=False)



In [26]:
from pyspark.ml.feature import HashingTF

# Compute term frequencies
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=1000)
df = hashingTF.transform(df)

# Show raw features (term frequencies)
df.select("rawFeatures").show(truncate=False)



In [27]:
from pyspark.ml.feature import IDF

# Compute TF-IDF
idf = IDF(inputCol="rawFeatures", outputCol="tfidfFeatures")
idf_model = idf.fit(df)
df = idf_model.transform(df)

# Show TF-IDF features
df.select("tfidfFeatures").show(truncate=False)



In [28]:
from pyspark.ml.feature import VectorAssembler

# Combine all features into a single vector
assembler = VectorAssembler(
    inputCols=["tfidfFeatures", "word_count", "helpfulness_ratio", "avg_word_length"],
    outputCol="features"
)
df = assembler.transform(df)

# Show the combined features
df.select("features", "sentiment").show(truncate=False)



In [29]:
from pyspark.ml.feature import StringIndexer

# Convert sentiment to numerical labels
indexer = StringIndexer(inputCol="sentiment", outputCol="label")
df = indexer.fit(df).transform(df)

# Show label mapping
df.select("sentiment", "label").distinct().show()



+---------+-----+
|sentiment|label|
+---------+-----+
| positive|  2.0|
| negative|  0.0|
|  neutral|  1.0|
+---------+-----+

In [30]:
# Split data into training and testing sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)



In [33]:
from pyspark.ml.classification import LogisticRegression

# Train the logistic regression model
lr1 = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20, regParam=0.3, elasticNetParam=0.8)
lr_model1 = lr1.fit(train_data)



In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions
predictions = lr_model1.transform(test_data)

# Evaluate the accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy:.2f}")



Model Accuracy: 0.33