# SENG 550 PROJECT: AMAZON REVIEW CLASSIFIER

## First, set up Spark in Google Colab (based on the notebook given in class)
*reference: https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/*


*to install other versions, get the download link from https://spark.apache.org/downloads.html*

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [None]:
!tar -xvf spark-3.3.1-bin-hadoop3.tgz

In [None]:
!pip install findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

In [None]:
import findspark
findspark.init()

In [None]:
findspark.find()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
sc = spark.sparkContext

A quick test to confirm that Spark is working, for a start!

In [None]:
test = sc.parallelize([1, 2, 3, 4, 5])
test.map(lambda x: (x, x**2)).collect()

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]

## Next, get the data (the reviews JSON in this case), read it, and extract the useful info from it. We use the Cell Phones and Accessories review data

In [None]:
!rm -f /content/reviews_Cell_Phones_and_Accessories_5.json.gz
!wget --no-check-certificate http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Cell_Phones_and_Accessories_5.json.gz

## Reading the file and extracting the useful info

In [None]:
file_data_rdd = spark.read.json("/content/reviews_Cell_Phones_and_Accessories_5.json.gz").rdd

In [None]:
# This is how the data looks
file_data_rdd.take(1)

In [None]:
# For our binary classifier (good or bad review), the only useful fields are
# overall, reviewText, and possibly helpful, so we'll extract those
from pyspark.sql import Row
review_data_rdd = file_data_rdd.map(lambda row: Row(rating=row.overall, helpfulness=row.helpful, text=row.reviewText))

In [None]:
review_data_rdd.take(1)

## Now that we are done extracting the data, let us do a bit of cleaning to make sure we get consistent results, and also a bit of exploratory data analysis

In [None]:
# Coming to the cleaning part:
# for each review we need all three fields (rating, helpfulness and text),
# so we filter out the reviews that are missing any of them

cleaned_rdd = review_data_rdd.filter(lambda row: row.rating is not None and row.helpfulness is not None and row.text is not None)

# People are generous and usually give high ratings (sometimes higher than they should),
# so we define the threshold for a favourable (good) review to be 3.5
# Anything lower than that is bad

GOOD_REVIEW_THRESHOLD = 3.5

number_of_total_reviews = cleaned_rdd.count()
number_of_good_reviews = cleaned_rdd.filter(lambda row: row.rating >= GOOD_REVIEW_THRESHOLD).count()

In [None]:
print("Total Reviews: ", number_of_total_reviews)
print("Good Reviews: ", number_of_good_reviews)
print("Bad Reviews: ", number_of_total_reviews - number_of_good_reviews)

### The data is heavily skewed towards good reviews
As is evident from the data that we have, a review picked at random is most likely a good review. As mentioned in class, in cases like this it is not best practice to evaluate the model on its accuracy alone; we need measures such as recall, precision and F-score to be confident that our model is actually working and doing its job well.
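To illustrate why accuracy alone can mislead on skewed data, the sketch below scores a trivial classifier that labels every review as good. The counts (90 good, 10 bad) are made up for illustration and are not taken from the dataset.

```python
# Hypothetical counts for illustration only: 90 good reviews (1), 10 bad ones (0).
labels = [1] * 90 + [0] * 10

# A "classifier" that ignores the review text and always predicts good (1).
predictions = [1] * len(labels)

correct = sum(1 for y, p in zip(labels, predictions) if y == p)
accuracy = correct / len(labels)

# Recall on the bad class: the share of bad reviews that were actually caught.
bad_total = sum(1 for y in labels if y == 0)
bad_caught = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 0)
bad_recall = bad_caught / bad_total

print("accuracy:", accuracy)
print("recall on bad reviews:", bad_recall)
```

The accuracy looks respectable even though not a single bad review is detected, which is why the later cells also report precision, recall and F-score.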

## Now coming to the part of simplifying the RDD, splitting it into datasets, extracting features and building on top of them





In [None]:
# Helpfulness is a list in our RDD; we replace it with its average,
# as that is an easier metric to work with
# This gives the final (original) dataset
# before we end up splitting it up

# The rating on its own is not a binary label, so
# we use it to tag each review with a 'good' or 'bad' label to aid in training afterwards
# For us Good Review = 1 and Bad Review = 0

original_rdd = cleaned_rdd.map(lambda row: Row(
    helpful_rating=sum(row.helpfulness) / len(row.helpfulness),
    helpful_no=len(row.helpfulness),
    text=row.text,
    label=1 if row.rating >= GOOD_REVIEW_THRESHOLD else 0))
original_rdd.take(3)
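The labelling rule and one possible alternative to the helpfulness average can be sketched in plain Python. This assumes the `helpful` field is a two-element list `[helpful_votes, total_votes]`, which is how it appears in this dataset's documentation but should be checked against `file_data_rdd.take(1)`. The function names are hypothetical and not part of the notebook.

```python
def label_review(rating, threshold=3.5):
    """Return 1 for a good review and 0 for a bad one."""
    return 1 if rating >= threshold else 0


def helpful_ratio(helpful):
    """Fraction of votes that were helpful.

    Assumption: helpful = [helpful_votes, total_votes].
    """
    helpful_votes, total_votes = helpful
    # Reviews nobody voted on get 0.0 instead of raising ZeroDivisionError.
    return helpful_votes / total_votes if total_votes else 0.0


print(label_review(4.0), label_review(3.0))
print(helpful_ratio([2, 3]), helpful_ratio([0, 0]))
```

If that assumption holds, `len(row.helpfulness)` is always 2, so a ratio like this may carry more information than the list length.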

## Now it's time to use the original_rdd to extract features and move forward in the process of developing a model based on that

In [None]:
# From here on we turn the review text into features
# that the model can be trained on

# Importing the required classes: RegexTokenizer, StopWordsRemover, HashingTF
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF


# Converting the RDD to a DataFrame, as it is easier to work with for column-based data
# A DataFrame gives us the same scalability, so why not use it
original_df = original_rdd.toDF()

# Developing the tokenizer (splits the review text into words on non-word characters)
# The raw string keeps Python from treating \W as a string escape
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern=r"\W")
df_tokenized = tokenizer.transform(original_df)

# Removing the common words of a sentence (which are of no use to our model)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df_filtered = remover.transform(df_tokenized)

# Converting the filtered words into numerical raw features (term counts per hash bucket)
hashingTF = HashingTF(inputCol="filtered", outputCol="features")
df_features_raw = hashingTF.transform(df_filtered)

# Let's see what the columns added by each transformation look like
df_features_raw.take(1)
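The three transformations above can be mimicked in plain Python to see what each step does to a single review. This is only a sketch: the stop-word list and bucket count are tiny illustrative choices, and `zlib.crc32` stands in for the hash function, whereas Spark's `HashingTF` uses a different hash and many more buckets by default.

```python
import re
import zlib

# Tiny illustrative stop-word list; Spark's default English list is much longer.
STOP_WORDS = {"the", "is", "a", "and", "it", "this"}
NUM_BUCKETS = 16  # hypothetical, kept small for readability


def tokenize(text):
    # Lowercase, split on runs of non-word characters, and drop empty strings.
    return [tok for tok in re.split(r"\W+", text.lower()) if tok]


def remove_stop_words(tokens):
    return [tok for tok in tokens if tok not in STOP_WORDS]


def hashing_tf(tokens, num_buckets=NUM_BUCKETS):
    # Map each token to a bucket index and count occurrences per bucket.
    # crc32 is deterministic across runs, unlike Python's built-in hash() for strings.
    counts = {}
    for tok in tokens:
        bucket = zlib.crc32(tok.encode("utf-8")) % num_buckets
        counts[bucket] = counts.get(bucket, 0) + 1
    return counts


words = tokenize("This phone is great, and the battery is GREAT!")
filtered = remove_stop_words(words)
features = hashing_tf(filtered)

print(words)
print(filtered)
print(features)
```

Because different words can land in the same bucket, the feature vector is a lossy summary of the text; more buckets mean fewer collisions at the cost of a wider vector.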

In [None]:
# Now it is time to keep only the columns that we actually need to train the model
df_features = df_features_raw.select('features', 'helpful_rating', 'helpful_no', 'label')

# This is how they look
df_features.take(1)

## Splitting the feature dataset into different datasets - training, test and cross-validation data

In [None]:
# So we have the original dataset pretty much set
# It is time to split it up into training (50%), test (20%) and cross-validation (30%) datasets

df_training, df_test, df_cross_validation = df_features.randomSplit([0.5, 0.2, 0.3])
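A weighted random split of this kind can be sketched in plain Python as follows. This is an illustration of the idea, not Spark's implementation; the helper `random_split` and the seed value are hypothetical.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one part, with probability proportional to its weight."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. about [0.5, 0.7, 1.0] for weights [0.5, 0.2, 0.3].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)

    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    parts = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # Pick the first part whose bound exceeds the draw; fall back to the last
        # part in case rounding leaves the final bound slightly below 1.0.
        index = next((i for i, b in enumerate(bounds) if draw < b), len(parts) - 1)
        parts[index].append(row)
    return parts


rows = list(range(1000))  # stand-in for the feature rows
training, test, validation = random_split(rows, [0.5, 0.2, 0.3], seed=550)
print(len(training), len(test), len(validation))
```

The part sizes only approximate the requested shares, and they change from run to run unless a seed is fixed. `randomSplit` also accepts a `seed` argument, which should help when results need to be compared across runs.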

## Now training the model on training data

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Fit a logistic regression on the training split (it reads the 'features' column by default)
lr = LogisticRegression(labelCol='label')
model = lr.fit(df_training)

predictions_df = model.transform(df_cross_validation)

# Evaluate model performance (the evaluator's default metric is the area under the ROC curve)
evaluator = BinaryClassificationEvaluator()
print(evaluator.evaluate(predictions_df))
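The number printed by the evaluator is, by default, the area under the ROC curve. One way to read it is as the probability that a randomly chosen good review receives a higher score than a randomly chosen bad one. The sketch below computes it that way on made-up scores; it is a conceptual illustration, not how Spark computes the metric internally, and the function name is hypothetical.

```python
def area_under_roc(labels, scores):
    """Share of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for pos in positives:
        for neg in negatives:
            if pos > neg:
                wins += 1.0
            elif pos == neg:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and model scores for illustration only.
labels = [1, 1, 1, 0, 0]
scores = [0.9, 0.8, 0.4, 0.5, 0.1]
auc = area_under_roc(labels, scores)
print(auc)
```

A value of 1.0 means every good review outranks every bad one, while 0.5 is what random scoring would give.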


### Checking ideal max iterations
After some tests we found that `maxIter=10` is a sweet spot for keeping model complexity in check. In our runs with `10` and `20` iterations, going beyond `10` improved the scores on the training split but lowered them on the cross-validation split, which suggests that the model is **overfitting**.
We therefore limit training to `10` iterations to keep the model from becoming too complex.

In [68]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='label')
lr.setMaxIter(10)
model = lr.fit(df_training)


def report_metrics(predictions_df, name):
    # Show the confusion counts, then print precision, recall, accuracy and F-score (in that order)
    predictions_df.groupBy('label', 'prediction').count().show()
    TN = predictions_df.filter('prediction = 0 AND label = prediction').count()
    TP = predictions_df.filter('prediction = 1 AND label = prediction').count()
    FN = predictions_df.filter('prediction = 0 AND label <> prediction').count()
    FP = predictions_df.filter('prediction = 1 AND label <> prediction').count()
    accuracy = (TN + TP) / (TN + TP + FN + FP)
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    F = 2 * (precision * recall) / (precision + recall)
    print(name)
    print('\t', precision)
    print('\t', recall)
    print('\t', accuracy)
    print('\t', F)


# Metrics on the data the model was trained on
report_metrics(model.transform(df_training), "training data")

print()
print()

# CROSS VALIDATE: metrics on the held-out cross-validation split
predictions_df = model.transform(df_cross_validation)
report_metrics(predictions_df, "cross validation")

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 2172|
|    0|       1.0| 4036|
|    0|       0.0|18677|
|    1|       1.0|72632|
+-----+----------+-----+

training data
	 0.9473574372619606
	 0.9709641195657986
	 0.9363393049417025
	 0.9590155276222669


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 4894|
|    0|       1.0| 5231|
|    0|       0.0| 8476|
|    1|       1.0|39449|
+-----+----------+-----+

cross validation
	 0.8829230080572963
	 0.8896330875222696
	 0.8255813953488372
	 0.8862653471574762


In [69]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol='label')
lr.setMaxIter(20)
model = lr.fit(df_training)


# Evaluation helper (defined here so that this cell runs on its own)
def report_metrics(predictions_df, name):
    # Show the confusion counts, then print precision, recall, accuracy and F-score (in that order)
    predictions_df.groupBy('label', 'prediction').count().show()
    TN = predictions_df.filter('prediction = 0 AND label = prediction').count()
    TP = predictions_df.filter('prediction = 1 AND label = prediction').count()
    FN = predictions_df.filter('prediction = 0 AND label <> prediction').count()
    FP = predictions_df.filter('prediction = 1 AND label <> prediction').count()
    accuracy = (TN + TP) / (TN + TP + FN + FP)
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    F = 2 * (precision * recall) / (precision + recall)
    print(name)
    print('\t', precision)
    print('\t', recall)
    print('\t', accuracy)
    print('\t', F)


# Metrics on the data the model was trained on
report_metrics(model.transform(df_training), "training data")

print()
print()

# CROSS VALIDATE: metrics on the held-out cross-validation split
predictions_df = model.transform(df_cross_validation)
report_metrics(predictions_df, "cross validation")

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1901|
|    0|       1.0| 3380|
|    0|       0.0|19333|
|    1|       1.0|72903|
+-----+----------+-----+

training data
	 0.9556913073686142
	 0.9745869204855355
	 0.945845339786909
	 0.9650466287635601


+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 5540|
|    0|       1.0| 5594|
|    0|       0.0| 8113|
|    1|       1.0|38803|
+-----+----------+-----+

cross validation
	 0.8740004955289772
	 0.8750648354869991
	 0.8081998277347114
	 0.8745323416723011
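
The four scores printed for each run follow directly from the confusion counts. As a cross-check, the sketch below recomputes the F-score from the counts in the tables above (in each run, the first table is scored on the training split and the second on the cross-validation split) and compares the gap between the two splits. The helper name `binary_metrics` is ours and not part of PySpark.

```python
def binary_metrics(tp, fp, fn, tn):
    """Precision, recall, accuracy and F-score from confusion counts."""
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f_score = 2 * precision * recall / (precision + recall)
    return {"precision": precision, "recall": recall,
            "accuracy": accuracy, "f_score": f_score}


# Confusion counts copied from the tables above, as (TP, FP, FN, TN), keyed by maxIter.
runs = {
    10: {"training": (72632, 4036, 2172, 18677),
         "validation": (39449, 5231, 4894, 8476)},
    20: {"training": (72903, 3380, 1901, 19333),
         "validation": (38803, 5594, 5540, 8113)},
}

gaps = {}
for max_iter, splits in runs.items():
    train_f = binary_metrics(*splits["training"])["f_score"]
    val_f = binary_metrics(*splits["validation"])["f_score"]
    gaps[max_iter] = train_f - val_f
    print(f"maxIter={max_iter}: training F={train_f:.4f}, "
          f"validation F={val_f:.4f}, gap={gaps[max_iter]:.4f}")
```

The gap between the training and cross-validation F-scores widens from roughly 0.07 at 10 iterations to roughly 0.09 at 20, which is consistent with the overfitting concern raised earlier.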


### Testing effectiveness of various models
To improve our model we relied on 2 key factors:
1. the maximum number of training iterations: the larger it is, the better the model fits the training data, but the greater the risk of overfitting
2. the value of the regularization parameter: a larger value makes the model simpler and lowers the risk of overfitting. However, too high a value may lead to underfitting, since the model may become too simple
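
To make the second factor concrete, the sketch below evaluates the elastic-net penalty that is added to the training loss, in the form given in Spark's documentation: `regParam * (elasticNetParam * L1 + (1 - elasticNetParam) * 0.5 * L2)`. The coefficient values are made up, and the function name is hypothetical.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty added to the loss for a given set of model coefficients."""
    l1 = sum(abs(w) for w in weights)   # sum of absolute values
    l2 = sum(w * w for w in weights)    # sum of squares
    return reg_param * (elastic_net_param * l1
                        + (1 - elastic_net_param) * 0.5 * l2)


weights = [0.5, -2.0, 0.0, 1.5]  # hypothetical coefficients

# elastic_net_param = 0 is a pure L2 (ridge) penalty, 1 is a pure L1 (lasso) penalty.
print(elastic_net_penalty(weights, 0.01, 0))
print(elastic_net_penalty(weights, 0.01, 1))

# With reg_param = 0 the penalty vanishes whatever the mix is.
print(elastic_net_penalty(weights, 0.0, 0), elastic_net_penalty(weights, 0.0, 1))
```

This is also why the two runs with `regParam` 0.0 report identical scores for `elasticNetParam` 0 and 1: with no penalty at all, the L1/L2 mix has nothing to act on.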




In [None]:
import numpy as np
from pyspark.ml.classification import LogisticRegression

regParams = np.arange(0, .1, .01)  # regularization strengths from 0 up to (not including) 0.1
elasticNetParams = [0, 1]          # 0 = pure L2 penalty, 1 = pure L1 penalty
maxIters = [10]

results = []

# Train one model per parameter combination and score it on the cross-validation split
for rp in regParams:
  for enp in elasticNetParams:
    for mi in maxIters:

      lr = LogisticRegression(labelCol='label')
      lr.setMaxIter(mi)
      lr.setRegParam(rp)
      lr.setElasticNetParam(enp)
      model = lr.fit(df_training)

      predictions_df = model.transform(df_cross_validation)

      # Evaluate model performance
      predictions_df.groupBy('label', 'prediction').count().show()
      TN = predictions_df.filter('prediction = 0 AND label = prediction').count()
      TP = predictions_df.filter('prediction = 1 AND label = prediction').count()
      FN = predictions_df.filter('prediction = 0 AND label <> prediction').count()
      FP = predictions_df.filter('prediction = 1 AND label <> prediction').count()
      accuracy = (TN + TP) / (TN + TP + FN + FP)
      precision = TP / (TP + FP)
      recall = TP / (TP + FN)
      F = 2 * (precision * recall) / (precision + recall)

      # Keep the parameters and the scores together for the summary printed later
      key = [mi, rp, enp]
      value = [precision, recall, accuracy, F]

      results.append([key, value])

      print("=================================================")
      print(key)
      print("\t\t", precision)
      print("\t\t", recall)
      print("\t\t", accuracy)
      print("\t\t", F)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 5009|
|    0|       1.0| 5395|
|    0|       0.0| 8413|
|    1|       1.0|39566|
+-----+----------+-----+

[10, 0.0, 0]
		 0.8800071172794199
		 0.887627593942793
		 0.821797441035918
		 0.883800929235168
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 5009|
|    0|       1.0| 5395|
|    0|       0.0| 8413|
|    1|       1.0|39566|
+-----+----------+-----+

[10, 0.0, 1]
		 0.8800071172794199
		 0.887627593942793
		 0.821797441035918
		 0.883800929235168
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 3371|
|    0|       1.0| 6153|
|    0|       0.0| 7655|
|    1|       1.0|41204|
+-----+----------+-----+

[10, 0.01, 0]
		 0.8700720062503959
		 0.9243746494671902
		 0.8368703218402617
		 0.8964016882043249
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|  652|

In [None]:
for key, value in results:
  mi, rp, enp = key
  precision, recall, accuracy, fscore = value

  print(f"regParam: {rp} elasticNetValue: {enp} maxIteration: {mi}")
  print(f"\tprecision: {precision}")
  print(f"\trecall: {recall}")
  print(f"\taccuracy: {accuracy}")
  print(f"\tfscore: {fscore}")
  print()

regParam: 0.0 elasticNetValue: 0 maxIteration: 10
	precision: 0.8800071172794199
	recall: 0.887627593942793
	accuracy: 0.821797441035918
	fscore: 0.883800929235168

regParam: 0.0 elasticNetValue: 1 maxIteration: 10
	precision: 0.8800071172794199
	recall: 0.887627593942793
	accuracy: 0.821797441035918
	fscore: 0.883800929235168

regParam: 0.01 elasticNetValue: 0 maxIteration: 10
	precision: 0.8700720062503959
	recall: 0.9243746494671902
	accuracy: 0.8368703218402617
	fscore: 0.8964016882043249

regParam: 0.01 elasticNetValue: 1 maxIteration: 10
	precision: 0.8005066613206
	recall: 0.9853729669097028
	accuracy: 0.8013462823082061
	fscore: 0.8833715457946181

regParam: 0.02 elasticNetValue: 0 maxIteration: 10
	precision: 0.8620817690498783
	recall: 0.9375659001682557
	accuracy: 0.8378123768905332
	fscore: 0.8982407875081942

regParam: 0.02 elasticNetValue: 1 maxIteration: 10
	precision: 0.7756684538675143
	recall: 0.9963881099270891
	accuracy: 0.7772296730212562
	fscore: 0.872282341850462
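
One way to compare the configurations is to rank them by F-score. The sketch below does this for the six results listed above only, with the values copied as printed; the variable names are ours, and the remaining `regParam` values would need to be added for a full comparison.

```python
# (regParam, elasticNetParam, maxIter) -> (precision, recall, accuracy, fscore),
# copied from the printed results above.
shown_results = [
    ((0.0, 0, 10), (0.8800071172794199, 0.887627593942793, 0.821797441035918, 0.883800929235168)),
    ((0.0, 1, 10), (0.8800071172794199, 0.887627593942793, 0.821797441035918, 0.883800929235168)),
    ((0.01, 0, 10), (0.8700720062503959, 0.9243746494671902, 0.8368703218402617, 0.8964016882043249)),
    ((0.01, 1, 10), (0.8005066613206, 0.9853729669097028, 0.8013462823082061, 0.8833715457946181)),
    ((0.02, 0, 10), (0.8620817690498783, 0.9375659001682557, 0.8378123768905332, 0.8982407875081942)),
    ((0.02, 1, 10), (0.7756684538675143, 0.9963881099270891, 0.7772296730212562, 0.872282341850462)),
]

# Sort by F-score (index 3 of the metrics tuple), best first.
ranked = sorted(shown_results, key=lambda item: item[1][3], reverse=True)
best_key, best_metrics = ranked[0]

for (rp, enp, mi), (precision, recall, accuracy, fscore) in ranked:
    print(f"regParam={rp} elasticNetParam={enp} maxIter={mi} fscore={fscore:.4f}")
```

Among the rows shown, `regParam` 0.02 with `elasticNetParam` 0 has the highest F-score. The rows with `elasticNetParam` 1 reach higher recall but at a clear cost in precision.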