## Assignment 3 Questions 2-4
### Author: Yujing Huang

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}


In [2]:
data = spark.read.parquet('s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet')

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1684337105085_0002,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.



### Question 2. Balancing the Data

In [3]:
from pyspark.sql.functions import expr

# Add a new column 'good_rating' based on the 'star_rating' column
data = data.withColumn('good_rating', expr('(star_rating >= 4)'))

# Display the 'star_rating' and 'good_rating' columns for the first 10 rows
data.select('star_rating', 'good_rating').show(10)

# Group by 'good_rating' and count the occurrences
data.groupBy('good_rating').count().show()

# Downsample the dataset
sampled = data.sampleBy('good_rating', fractions={False: 1.0, True: 3517710 / 17208450})

# Group by 'good_rating' and count the occurrences in the downsampled dataset
sampled.groupBy('good_rating').count().show()



+-----------+-----------+
|star_rating|good_rating|
+-----------+-----------+
|          5|       true|
|          4|       true|
|          4|       true|
|          5|       true|
|          5|       true|
|          4|       true|
|          4|       true|
|          1|      false|
|          4|       true|
|          5|       true|
+-----------+-----------+
only showing top 10 rows

+-----------+--------+
|good_rating|   count|
+-----------+--------+
|       true|17208450|
|      false| 3517710|
+-----------+--------+

+-----------+-------+
|good_rating|  count|
+-----------+-------+
|       true|3520381|
|      false|3517710|
+-----------+-------+

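The class counts for the downsampled dataset show that my training data is now approximately balanced (3,520,381 True vs. 3,517,710 False). The True count is not exactly 3,517,710 because `sampleBy` keeps each row independently with the given probability rather than drawing a fixed number of rows; since no seed is set, the exact count also changes between runs.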
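
The sampling fraction above is hardcoded from the printed counts. As a minimal sketch, the same fractions can be derived from a dictionary of class counts, so the step still works if the counts change. The helper name `balancing_fractions` and the `seed` value in the commented usage line are assumptions for illustration, not part of the original notebook.

```python
def balancing_fractions(counts):
    """Return per-class sampling fractions that shrink every class to the minority size."""
    minority = min(counts.values())
    return {label: minority / n for label, n in counts.items()}


# Class counts copied from the groupBy output above
counts = {True: 17208450, False: 3517710}
fractions = balancing_fractions(counts)
print(fractions)

# Hypothetical usage with the DataFrame from this notebook (needs a Spark session):
# sampled = data.sampleBy('good_rating', fractions=fractions, seed=30123)
```

Passing a seed to `sampleBy` would make the downsampled counts repeatable across runs.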

### Question 3. Implementing a Reproducible Machine Learning Pipeline

### (a). Engineer at least 3 additional features to add to the existing logistic regression model

In [4]:
import pyspark.sql.functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import regexp_replace, regexp_extract, when, col, month, dayofweek
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF


# 1. Categorical feature: trustworthiness
sampled = sampled.withColumn('trustworthiness',
                             when((col('vine') == 'Y') | (col('verified_purchase') == 'Y'), 1).otherwise(0))

# Both Amazon Vine membership and a verified purchase signal a more trustworthy review.
# Such reviewers may rate more critically and thus give fewer 5-star ratings.

# 2. Time-Based features: Review Month and Day of the Week
sampled = sampled.withColumn("review_month", month(sampled.review_date))
sampled = sampled.withColumn("review_dayofweek", dayofweek(sampled.review_date))

# Hypothesis: many book readers are students or work in research or education, so they have
# more time to read carefully and write reviews on weekends or during breaks. They may also be
# less stressed at those times and give higher ratings accordingly.

# 3. Text-based feature: Total length of review texts
sampled = sampled.withColumn('review_len', F.length("review_body"))

# As shown in my descriptive analysis (Q1), people who write longer reviews tend to give lower
# ratings, because they find more to criticize or discuss.

# 4. Text-based feature: Exclamation mark in the review headline or body
sampled = sampled.withColumn("exclamation_dummy",
                             ((regexp_extract(col("review_headline"), r"!", 0) != "") |
                              (regexp_extract(col("review_body"), r"!", 0) != "")).cast("integer"))

# People who use exclamation marks in the review headline or body tend to hold stronger opinions
# about the book, so they tend to give more extreme star ratings.


### (b) Organize all of the transformers and estimators that operate on your training and test data into a reproducible Machine Learning pipeline that you can feed into a PySpark CrossValidator.

In [5]:
# Keep a 0.01% random sample of the balanced data (no seed is set, so the rows differ between runs)
sampled = sampled.sample(fraction=0.0001)

# Define the pipeline stages
tokenizer = Tokenizer(inputCol='review_body', outputCol='review_words')
stopwords_remover = StopWordsRemover(inputCol='review_words', outputCol='review_terms')
hashing = HashingTF(inputCol='review_terms', outputCol='hash', numFeatures=100)
tf_idf = IDF(inputCol='hash', outputCol='tf_idf')
vector_assembler = VectorAssembler(inputCols=['trustworthiness', 'review_month', 'review_dayofweek', 
                                              'review_len', 'exclamation_dummy', 'tf_idf'],
                                   outputCol='features')
reg_model = LogisticRegression(featuresCol='features', labelCol='good_rating')

# Create the pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, hashing, tf_idf, vector_assembler, reg_model])
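
The text stages of this pipeline (tokenize, remove stop words, hash terms into 100 buckets) can be illustrated on one sentence in plain Python. This is a rough sketch, not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 function that `HashingTF` uses, the stop-word list is a tiny made-up subset, and the function name `hashed_term_counts` is hypothetical.

```python
import zlib

# Tiny illustrative stop-word list, not Spark's default list
STOP_WORDS = {"the", "a", "and", "it", "was", "of"}


def hashed_term_counts(text, num_features=100):
    """Tokenize, drop stop words, and count terms in num_features hash buckets."""
    terms = [w for w in text.lower().split() if w not in STOP_WORDS]
    vector = [0] * num_features
    for term in terms:
        # crc32 is a stand-in hash; bucket index = hash modulo the number of features
        vector[zlib.crc32(term.encode("utf-8")) % num_features] += 1
    return terms, vector


terms, vector = hashed_term_counts("The plot was thin and the ending was rushed")
print(terms)
print(sum(vector), len(vector))
```

With only 100 buckets, distinct words will often share a bucket, which is one reason a larger `numFeatures` may be worth trying.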


### (c) Describe in words what happens under the hood in Spark when you specify this series of transformations in a pipeline. Is your DataFrame actually processed when you chain together a sequence of transformations in a pipeline? Why or why not? If not, when and how will Spark process the DataFrame according to the specified transformations? How does this compare to Dask's execution model? Your answer should be a minimum of four sentences long.

Under the hood in Spark when specifying transformations in a pipeline:

- **DAG Construction**:

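Constructing the `Pipeline` only records the ordered list of stages; no data is read or transformed at that point. Likewise, each DataFrame transformation (such as `withColumn` or `sample`) is not applied immediately but adds a step to a logical execution plan, which Spark represents as a Directed Acyclic Graph (DAG) of operations.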

- **Lazy Evaluation**:

Spark follows a lazy evaluation model: transformations are recorded for later execution, and the DataFrame is not processed until an action is triggered.

- **Optimization**:

Before running the plan, Spark's Catalyst optimizer rewrites it to improve execution efficiency.
The rewrites include predicate pushdown, column pruning, and other techniques that minimize the amount of data read and moved; Spark then selects a physical plan to execute.

- **Execution and Action Trigger**:

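The DataFrame is processed only when an action is invoked, such as `show()`, `count()`, or writing to disk. For the pipeline, this happens when `fit()` is called: estimators such as `IDF` and `LogisticRegression` must scan the data to learn their parameters, so Spark runs the upstream transformations at that point.
The physical plan is then split into stages of tasks, which the executors run in parallel on partitions of the data.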

- **Comparison to Dask's Execution Model**:

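Spark's lazy evaluation and DAG construction are similar to Dask's: Dask collections and `dask.delayed` also build a task graph that is not executed until `compute()` (or `persist()`) is called.
In both frameworks, defining the computation and running it are separate steps, and both run the resulting work in parallel across workers.
The main difference is the form of the plan. Spark DataFrame operations are compiled into a SQL-style query plan that Catalyst optimizes as a whole, whereas Dask builds a graph of ordinary Python function calls on chunks of data and hands it to a task scheduler that runs those tasks.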
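
The difference between recording a transformation and executing it can be shown with a toy class. This is a deliberately simplified sketch of deferred execution in general, not how Spark or Dask are implemented; the class `LazyFrame` and its methods are made up for illustration.

```python
class LazyFrame:
    """Toy stand-in for a lazily evaluated DataFrame (not Spark's implementation)."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # recorded transformations, in order

    def with_column(self, name, fn):
        # Transformation: return a new frame with a longer plan; touch no data
        return LazyFrame(self._rows, self._plan + ((name, fn),))

    def collect(self):
        # Action: only now is the recorded plan applied to every row
        result = []
        for row in self._rows:
            row = dict(row)
            for name, fn in self._plan:
                row[name] = fn(row)
            result.append(row)
        return result


calls = []


def review_len(row):
    calls.append(row["review_body"])  # record that real work happened
    return len(row["review_body"])


frame = LazyFrame([{"review_body": "Loved it!"}, {"review_body": "Too long."}])
planned = frame.with_column("review_len", review_len)
calls_before_action = len(calls)  # still zero: nothing has run yet
rows = planned.collect()
print(calls_before_action, len(calls), rows)
```

In this toy, `with_column` plays the role of a Spark transformation and `collect` the role of an action; a real engine would also optimize the recorded plan before running it.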

### Question 4. Finding Optimal Model Parameters

In [6]:
import numpy as np
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col

# Convert the "good_rating" column to numeric type
sampled = sampled.withColumn("good_rating", sampled["good_rating"].cast("double"))

# Split the data into train and test sets
train, test = sampled.randomSplit([0.75, 0.25], seed=30123)

# Define the parameter grid for grid search
param_grid = ParamGridBuilder() \
    .addGrid(reg_model.regParam, np.arange(0, 0.1, 0.01)) \
    .addGrid(reg_model.elasticNetParam, [0, 1]) \
    .build()

# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol='good_rating', metricName='areaUnderROC')

# Create a CrossValidator with 5-fold cross-validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5)

# Fit the cross-validated model on the training data
model = crossval.fit(train)

# Get the best model from the cross-validation
best_model = model.bestModel
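
A quick count of how many pipeline fits this grid search implies helps explain its cost. The sketch below rebuilds the grid in plain Python, assuming `np.arange(0, 0.1, 0.01)` yields the ten values 0.00 through 0.09; the variable names are illustrative.

```python
from itertools import product

# Assumed to match np.arange(0, 0.1, 0.01): ten values from 0.00 to 0.09
reg_params = [round(i * 0.01, 2) for i in range(10)]
elastic_net_params = [0, 1]  # 0 = pure L2 (ridge), 1 = pure L1 (lasso)
num_folds = 5

grid = list(product(reg_params, elastic_net_params))
cv_fits = len(grid) * num_folds
# CrossValidator refits the best parameter setting once on the full training set
total_fits = cv_fits + 1
print(len(grid), cv_fits, total_fits)
```

Each of these fits runs the whole pipeline, including the `IDF` stage, which is one reason to cross-validate on a small sample first.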



In [7]:
# Evaluate the best model on the training set
train_predictions = best_model.transform(train)
train_auc = evaluator.evaluate(train_predictions)

# Evaluate the best model on the test set
test_predictions = best_model.transform(test)
test_auc = evaluator.evaluate(test_predictions)

# Display the AUC values
print("Training AUC: {:.4f}".format(train_auc))
print("Test AUC: {:.4f}".format(test_auc))


Training AUC: 0.7898
Test AUC: 0.6091
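
To make the AUC values concrete: the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes this by enumerating pairs on made-up scores. Spark derives the value from the ROC curve instead, so this illustrates the interpretation of the metric rather than the evaluator's implementation.

```python
def pairwise_auc(pos_scores, neg_scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    wins = 0.0
    for p in pos_scores:
        for n in neg_scores:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos_scores) * len(neg_scores))


# Made-up predicted probabilities, not taken from the model above
good_scores = [0.9, 0.8, 0.4]
bad_scores = [0.7, 0.3, 0.2]
auc = pairwise_auc(good_scores, bad_scores)
print(round(auc, 4))
```

Under this reading, a test AUC of 0.6091 means the model ranks a random good review above a random bad one about 61% of the time.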

In [8]:
# Extract the summary from the best model
trainingSummary = best_model.stages[-1].summary

print("\nFalse positive rate by label in training:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label {}: {:.4f}".format(i, rate))

print("\nTrue positive rate by label in training:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label {}: {:.4f}".format(i, rate))



False positive rate by label in training:
label 0: 0.2736
label 1: 0.3018

True positive rate by label in training:
label 0: 0.6982
label 1: 0.7264
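
In a binary problem the four rates above carry only two independent numbers: a row of one class that is misclassified is a false positive for the other class, so the false positive rate of one label is one minus the true positive rate of the other. The short check below confirms this on the printed values; the dictionaries simply copy the output above.

```python
# Rates copied from the training summary output above
tpr = {0: 0.6982, 1: 0.7264}
fpr = {0: 0.2736, 1: 0.3018}

implied_fpr = {}
for label in (0, 1):
    other = 1 - label
    # Rows of class `other` that were missed are exactly the false positives for `label`
    implied_fpr[label] = 1.0 - tpr[other]
    print(label, round(implied_fpr[label], 4), fpr[label])
```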

**What does the model do well?**
The model separates good from bad ratings better than chance: it reaches an AUC of 0.7898 on the training set and 0.6091 on the test set, where 0.5 would correspond to random guessing.
On the training data it is also fairly even across the two classes, with a true positive rate of 0.6982 for label 0 and 0.7264 for label 1.

**What does it do poorly, and how might you improve it further?**
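The model generalizes poorly: the gap between the training AUC (0.7898) and the test AUC (0.6091) indicates overfitting. A likely contributor is that the pipeline was fit on only a 0.01% sample of the balanced data while using 105 features (5 engineered features plus 100 hashed TF-IDF terms), so training on a larger share of the available data is the most direct improvement. The classes are already balanced, so further gains would have to come from elsewhere: additional features with predictive power (for example, text from `review_headline` or a larger `numFeatures` in `HashingTF`), a wider hyperparameter grid with stronger regularization and intermediate `elasticNetParam` values, or different algorithms and ensemble methods. Regularly monitoring the model and updating it with new data will help maintain its accuracy over time.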