In [None]:
# Using Machine Learning to Predict Amazon Reviews
Data retrieved from: https://jmcauley.ucsd.edu/data/amazon/

# 1. Data Preprocessing

In [None]:
# Install the third-party packages into the notebook's Python environment.
# nltk supplies the Porter stemmer used in the "Stem Text" cell.
# NOTE(review): mlflow and yellowbrick are not referenced in the visible cells - confirm they are still needed.
%pip install nltk
%pip install mlflow
%pip install yellowbrick

In [None]:
# Location of the reviews JSON file. It is left blank here and has to be filled in
# before this cell can load anything; the path used originally was:
# "dbfs:/FileStore/shared_uploads/braden.thompson1@ucalgary.ca/Appliances_5-1.json"
reviewsPath = ""

df = (
    spark.read
    .format("json")
    .load(reviewsPath)
)

# Optional: uncomment to work on a 0.5% sample of the data.
#df = df.sample(0.005)

In [None]:
# Columns that are not used by the rest of the notebook.
cols_to_drop = (
    'asin',
    'image',
    'reviewTime',
    'reviewerID',
    'reviewerName',
    'style',
    'summary',
    'unixReviewTime',
    'verified',
    'vote',
)

# Remove those columns, then discard every row that still contains a null.
df = df.drop(*cols_to_drop).dropna()
df.show(10)

# Converting Ratings to Sentiment

If a review's rating is 4 or greater, it will receive a sentiment score of 1. Otherwise, it will receive a score of 0.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def ratingToSentiment(rating):
    """Map a star rating to a binary sentiment label.

    Args:
        rating: Numeric value of a review's 'overall' rating.

    Returns:
        1 when the rating is 4 or greater, otherwise 0.
    """
    if rating < 4:
        return 0
    return 1

# Declare the return type explicitly; without it the udf result defaults to a string column.
rating = udf(ratingToSentiment, IntegerType())

# Name the derived column 'sentiment' right here. A udf built from a lambda yields a
# column called '<lambda>(overall)', so renaming '(overall)' afterwards matched nothing
# and later cells that read the 'sentiment' column could not find it.
label = df.select('overall', 'reviewText', rating('overall').alias('sentiment'))

# Kept as a separate name for backward compatibility; it is the same labelled DataFrame.
sentimentLabels = label
sentimentLabels.show(10)

# Simplify Ratings

This will convert the review text to lower case and remove the punctuation from it.

In [None]:
from pyspark.sql.functions import lower, regexp_replace, trim
from pyspark.sql.functions import col

def simplifyRating(rating):
    """Normalise a text column: lower-case it, strip punctuation, trim whitespace.

    Args:
        rating: Column holding the review text (the name is historical; it is
            called with the 'reviewText' column below).

    Returns:
        A Column expression with the cleaned text.
    """
    result = lower(rating)
    # Raw string: '\p' is not a valid Python escape sequence, so the non-raw literal
    # triggers an invalid-escape warning. The pattern passed to Spark is unchanged.
    return trim(regexp_replace(result, r'\p{Punct}', ''))

simplifyRatingDf = label.select(label.overall, simplifyRating(label.reviewText).alias('simplifiedText'), label.sentiment)
simplifyRatingDf.show(10)

# Send Text to Vector

This will split the simplified text into an array of word tokens so that the later steps can process it word by word.

In [None]:
from pyspark.ml.feature import Tokenizer

# Tokenizer reads 'simplifiedText' and writes its output to a new 'vectorizedText' column.
tokenizer = Tokenizer(inputCol='simplifiedText', outputCol='vectorizedText')
tokenizedDf = tokenizer.transform(simplifyRatingDf)

# Keep only the rating, the tokens and the sentiment label.
vectorizedDf = tokenizedDf.select('overall', 'vectorizedText', 'sentiment')

vectorizedDf.show(10)

# Remove the Stop Words

In [None]:
from pyspark.ml.feature import StopWordsRemover

# Filter stop words out of the token arrays; the result goes to a new column.
remover = StopWordsRemover(
    inputCol='vectorizedText',
    outputCol='vectorizedTextWithoutStopWords',
)
stopWordsRemovedDf = remover.transform(vectorizedDf)

# Carry forward only the rating, the filtered tokens and the sentiment label.
vectorizedTextWithoutStopWordsDf = stopWordsRemovedDf.select(
    'overall',
    'vectorizedTextWithoutStopWords',
    'sentiment',
)
vectorizedTextWithoutStopWordsDf.show(10)

# Stem Text

In [None]:
from nltk.stem.porter import *

stemmer = PorterStemmer()
def stemText(text):
    """Return a list with the Porter stem of every word in `text`.

    Args:
        text: Iterable of word strings (one review's tokens).

    Returns:
        List of stemmed words, in the same order as the input.
    """
    return [stemmer.stem(token) for token in text]

# Wrap the stemmer as a Spark udf that returns an array of strings.
stemmerUdf = udf(stemText, ArrayType(StringType()))

# Add the stemmed tokens, then keep only the rating, the stems and the sentiment label.
stemmedDf = (
    vectorizedTextWithoutStopWordsDf
    .withColumn('stemmedText', stemmerUdf('vectorizedTextWithoutStopWords'))
    .select('overall', 'stemmedText', 'sentiment')
)
stemmedDf.show(10)
     

# Create the IDF Model

This will create the IDF model and get the input ready to be read by the rest of the code.

In [None]:
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import IDF

# Term counts: fit a vocabulary on the stemmed tokens and turn each review into a count vector.
cv = CountVectorizer(inputCol='stemmedText', outputCol='countVectorized')
cvModel = cv.fit(stemmedDf)
cvTransformedDf = cvModel.transform(stemmedDf)

# Inverse-document-frequency weighting applied on top of the term counts.
idf = IDF()
idf.setInputCol('countVectorized')
idf.setOutputCol('idf')
# Fit and transform the count-vectorized DataFrame built above. This previously
# referenced `cv_transformed_df`, a name that is never defined (NameError).
tfIdfModel = idf.fit(cvTransformedDf)
tfIdfDf = tfIdfModel.transform(cvTransformedDf)

# Make sure the sentiment label is stored as an integer column.
tfIdfColumnDf = tfIdfDf.withColumn("sentiment", col("sentiment").cast("int"))
tfIdfColumnDf.show(10)

# 2. Rating Analysis & Prediction

In [None]:
# Model input: the TF-IDF vector becomes 'features' and the star rating ('overall') becomes 'label'.
data = (
    tfIdfColumnDf
    .select('idf', 'overall')
    .withColumnRenamed("idf", "features")
    .withColumnRenamed("overall", "label")
)

# 80/20 split with a fixed seed.
trainTestSplits = data.randomSplit([0.80, 0.20], seed=69)
train, test = trainTestSplits

# Prepare Evaluation

In [None]:
# Performance evaluators: one per metric, each otherwise using the evaluator's default settings.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

accuracyEvaluator, f1Evaluator, recallEvaluator, precisionEvaluator = (
    MulticlassClassificationEvaluator(metricName=metric)
    for metric in ("accuracy", "f1", "weightedRecall", "weightedPrecision")
)

# Per-metric score tables, filled in by the training cells as {model name: score}.
accuracyResults, f1Results, recallResults, precisionResults = {}, {}, {}, {}

In [None]:
# Estimators and the hyper-parameter grids that will be searched for each of them.
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, LinearSVC, NaiveBayes

# All estimators read the 'features' column and predict the 'label' column.
lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)
nb = NaiveBayes(
    featuresCol="features",
    labelCol="label",
    smoothing=1.0,
    modelType="multinomial",
)

# Logistic regression: 3 x 3 grid over regularisation strength and elastic-net mixing.
lrparamGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.5, 2.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Decision tree: 5 x 5 grid over depth and bin count.
dtparamGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
    .addGrid(dt.maxBins, [10, 20, 40, 80, 100])
    .build()
)

# Random forest: 5 x 5 x 4 grid over depth, bin count and number of trees.
rfparamGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
    .addGrid(rf.maxBins, [10, 20, 40, 80, 100])
    .addGrid(rf.numTrees, [5, 20, 50, 100])
    .build()
)

# Naive Bayes: six smoothing values.
nbparamGrid = (
    ParamGridBuilder()
    .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
    .build()
)

In [None]:
# Set up hyper-parameter tuning. TrainValidationSplit evaluates every grid point on one
# train/validation split (80/20 here); it is not k-fold cross validation.
# A fixed seed makes the validation split, and therefore the selected model, reproducible;
# it reuses the value passed to randomSplit for the train/test split.
tuningSeed = 69

lrcv = TrainValidationSplit(estimator=lr, estimatorParamMaps=lrparamGrid, evaluator=accuracyEvaluator, trainRatio=0.8, seed=tuningSeed)
dtcv = TrainValidationSplit(estimator=dt, estimatorParamMaps=dtparamGrid, evaluator=accuracyEvaluator, trainRatio=0.8, seed=tuningSeed)
rfcv = TrainValidationSplit(estimator=rf, estimatorParamMaps=rfparamGrid, evaluator=accuracyEvaluator, trainRatio=0.8, seed=tuningSeed)
nbcv = TrainValidationSplit(estimator=nb, estimatorParamMaps=nbparamGrid, evaluator=accuracyEvaluator, trainRatio=0.8, seed=tuningSeed)

# Train and Test the Model

Logistic Regression

In [None]:
from sklearn import metrics

# Tune logistic regression on the training set, then predict on the held-out test set.
lrModel = lrcv.fit(train)
lrPredictionDf = lrModel.transform(test)

# Record each metric for this model in its results table.
for resultStore, metricEvaluator in (
    (accuracyResults, accuracyEvaluator),
    (f1Results, f1Evaluator),
    (recallResults, recallEvaluator),
    (precisionResults, precisionEvaluator),
):
    resultStore['Logistic Regression'] = metricEvaluator.evaluate(lrPredictionDf)

Decision Tree

In [None]:
# Tune the decision tree on the training set, then predict on the held-out test set.
dtModel = dtcv.fit(train)
dtPredictionDf = dtModel.transform(test)

# Record each metric for this model in its results table.
for resultStore, metricEvaluator in (
    (accuracyResults, accuracyEvaluator),
    (f1Results, f1Evaluator),
    (recallResults, recallEvaluator),
    (precisionResults, precisionEvaluator),
):
    resultStore['Decision Tree'] = metricEvaluator.evaluate(dtPredictionDf)

Random Forest

In [None]:
# Tune the random forest on the training set, then predict on the held-out test set.
rfModel = rfcv.fit(train)
rfPredictionDf = rfModel.transform(test)

# Record each metric for this model in its results table.
for resultStore, metricEvaluator in (
    (accuracyResults, accuracyEvaluator),
    (f1Results, f1Evaluator),
    (recallResults, recallEvaluator),
    (precisionResults, precisionEvaluator),
):
    resultStore['Random Forest'] = metricEvaluator.evaluate(rfPredictionDf)

Naive Bayes

In [None]:
# Tune naive Bayes on the training set, then predict on the held-out test set.
nbModel = nbcv.fit(train)
nbPredictionDf = nbModel.transform(test)

# Record each metric for this model in its results table.
for resultStore, metricEvaluator in (
    (accuracyResults, accuracyEvaluator),
    (f1Results, f1Evaluator),
    (recallResults, recallEvaluator),
    (precisionResults, precisionEvaluator),
):
    resultStore['Naive Bayes'] = metricEvaluator.evaluate(nbPredictionDf)

# Results

In [None]:
def _metricResultsToDf(results, metricName):
    """Turn a {model name: score} dict into a two-column DataFrame.

    Args:
        results: Dict mapping a model name to its score for one metric.
        metricName: Name to give the score column.

    Returns:
        DataFrame with columns 'Model' and `metricName`.
    """
    return (
        sc.parallelize(list(results.items()))
        .toDF()
        .withColumnRenamed("_1", "Model")
        .withColumnRenamed("_2", metricName)
    )

accuracyResultsDf = _metricResultsToDf(accuracyResults, "Accuracy")
f1ResultsDf = _metricResultsToDf(f1Results, "F1")
recallResultsDf = _metricResultsToDf(recallResults, "Recall")
precisionResultsDf = _metricResultsToDf(precisionResults, "Precision")

accuracyResultsDf.show(truncate=False)
f1ResultsDf.show(truncate=False)
recallResultsDf.show(truncate=False)
precisionResultsDf.show(truncate=False)

# Pass the DataFrames built above. The call previously used AccuracyResultsDF, F1ResultsDF,
# RecallResultsDF and PrecisionResultsDF, none of which are defined (NameError).
# NOTE(review): drawMetricsResults is not defined in the visible cells - confirm it is
# provided elsewhere (e.g. another notebook or an earlier cell) before running this.
drawMetricsResults(accuracyResultsDf,
                   f1ResultsDf,
                   recallResultsDf,
                   precisionResultsDf,
                   (0, 1))