In [None]:
# This was a Master of Management Analytics assignment, so I can't link the exact dataset,
# but a near-identical one can be found here: https://www.kaggle.com/code/benroshan/sentiment-analysis-amazon-reviews/data


# Load the three review tables and stack them into one DataFrame.
# NOTE(review): DataFrame.union matches columns by position, so this assumes all three
# tables share the same column order — confirm (unionByName would match by name instead).
df1 = spark.sql("select * from default.video_games_5")
df2 = spark.sql("select * from default.books_5_small")
df3 = spark.sql("select * from default.home_and_kitchen_5_small")
df = df1.union(df2).union(df3)
# Rows without review text or a label cannot be used for training.
df = df.na.drop(subset=["reviewText", "label"])

# Rebalance the dataset by downsampling whichever class is larger (label 1 = helpful,
# label 0 = unhelpful) to roughly the size of the smaller one. Always sampling label 0
# would fail when label 1 is the majority: a sampling fraction > 1 is rejected when
# sampling without replacement.
SEED = 47
df_helpful = df.filter(df.label == 1)
df_unhelpful = df.filter(df.label == 0)

n_helpful = df_helpful.count()
n_unhelpful = df_unhelpful.count()

if n_helpful and n_unhelpful:  # nothing to rebalance when one class is empty
    if n_unhelpful > n_helpful:
        df_unhelpful = df_unhelpful.sample(False, n_helpful / n_unhelpful, seed=SEED)
    elif n_helpful > n_unhelpful:
        df_helpful = df_helpful.sample(False, n_unhelpful / n_helpful, seed=SEED)
df = df_unhelpful.union(df_helpful)

# (rows, columns) of the rebalanced dataset
print((df.count(), len(df.columns)))

In [None]:
# Inspect the column names and types of the rebalanced review DataFrame.
df.printSchema()

In [None]:
import datetime
from pyspark.sql.functions import from_utc_timestamp, from_unixtime, year, month, dayofmonth
from pyspark.sql.types import StringType

# Render the review time (epoch seconds, via from_unixtime) as a timestamp string, then
# derive the calendar year and month from that string.
df = df.withColumn("date", from_unixtime(df['unixReviewTime']).cast(StringType()))
review_date = df["date"].cast(StringType())
df = df.withColumn("year", year(review_date)).withColumn("month", month(review_date))

In [None]:
from transformers import pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from textblob import TextBlob
from textblob.sentiments import NaiveBayesAnalyzer
from pyspark.sql.functions import StringType
from pyspark.sql.types import DoubleType
import pyspark.sql.functions as func

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, VectorAssembler, SQLTransformer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.types import *
from textblob import TextBlob
from textblob.sentiments import NaiveBayesAnalyzer


In [None]:
def polarity_detection(text):
    """Return TextBlob's sentiment polarity for ``text``, or None when ``text`` is None.

    Used as a Spark UDF body: a null review text would otherwise make
    ``TextBlob(None)`` raise and abort the whole job.
    """
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity
def subjectivity_detection(text):
    """Return TextBlob's sentiment subjectivity for ``text``, or None when ``text`` is None.

    Used as a Spark UDF body: a null review text would otherwise make
    ``TextBlob(None)`` raise and abort the whole job.
    """
    if text is None:
        return None
    return TextBlob(text).sentiment.subjectivity
def text_classification(words):
    """Add TextBlob-based 'polarity' and 'subjectivity' columns (DoubleType) to a DataFrame.

    Both scores are computed from the 'reviewText' column via Python UDFs; the
    DataFrame with the two extra columns is returned.
    """
    scorers = (
        ("polarity", polarity_detection),
        ("subjectivity", subjectivity_detection),
    )
    for column_name, scorer in scorers:
        words = words.withColumn(column_name, udf(scorer, DoubleType())("reviewText"))
    return words


In [None]:
def data_preprocessing(df_orig):
  """
  End-to-end feature engineering applied to a reviews DataFrame before model training
  or scoring. This is a plain DataFrame -> DataFrame function, not a Spark UDF.

  Steps:
    * drop 'reviewerName', 'reviewTime' and 'summary', plus 'reviewID' when the frame has a
      'label' column (frames without 'label' keep 'reviewID' so predictions can be keyed by it);
    * derive 'year' and 'month' from 'unixReviewTime' (epoch seconds, via from_unixtime);
    * 'reviewerNumReviews': number of rows per 'reviewerID' in this frame;
    * 'sinceFirstReview': months (rounded to 2 decimals) between this review's date and the
      earliest review date of the same 'asin' in this frame;
    * 'meanScoreDev': 'overall' minus the mean 'overall' of the same 'asin' in this frame;
    * 'reviewerID', 'asin' and 'unixReviewTime' are dropped from the result.

  NOTE(review): the aggregates are computed only over the rows passed in, so training and
  holdout frames processed separately get features from different populations — confirm intended.

  :param df_orig: Spark DataFrame with at least reviewerID, asin, overall and unixReviewTime.
  :return: a new Spark DataFrame with the engineered columns.
  """
  # Explicit alias: a notebook-level `from pyspark.sql.functions import *` would otherwise be
  # the only thing making min/round/mean resolve to Spark functions instead of Python builtins.
  from pyspark.sql import functions as F

  to_drop = ['reviewID', 'reviewerName', 'reviewTime', 'summary']
  # Condition to apply pre-processing to Kaggle test data (no label -> keep reviewID)
  if 'label' not in df_orig.columns:
    to_drop.remove('reviewID')

  df = df_orig.select([column for column in df_orig.columns if column not in to_drop])

  ## Retrieve review years and months
  review_ts = F.from_unixtime(F.col('unixReviewTime'))
  df = df.withColumn('year', F.year(review_ts)).withColumn('month', F.month(review_ts))

  initial_df_columns = df.columns

  # Number of reviews written by each reviewer
  reviewer_counts = df.groupBy('reviewerID').count()
  df = (df.join(reviewer_counts, ['reviewerID'], 'left')
          .select(F.col('count').alias('reviewerNumReviews'), *initial_df_columns)
          .drop('reviewerID'))
  initial_df_columns = [column for column in initial_df_columns if column != 'reviewerID']

  # Time since the product's first review and rating deviation from the product average
  product_stats = df.groupBy('asin').agg(
      F.min('unixReviewTime').alias('firstProductReview'),
      F.mean('overall').alias('meanScore'))
  df = df.join(product_stats, ['asin'], 'left').select(
      'firstProductReview', 'meanScore', 'reviewerNumReviews', *initial_df_columns)
  df = (df
        .withColumn('sinceFirstReview', F.round(F.months_between(
            F.to_date(F.from_unixtime(F.col('unixReviewTime'))),
            F.to_date(F.from_unixtime(F.col('firstProductReview')))), 2))
        .withColumn('meanScoreDev', F.col('overall') - F.col('meanScore'))
        .drop('firstProductReview', 'meanScore', 'asin', 'unixReviewTime'))

  return df

In [None]:
# Feature engineering on the full labelled set, done *before* the train/test split below.
# NOTE(review): the per-product aggregates inside data_preprocessing (e.g. the first-review
# time behind sinceFirstReview) therefore also see test-split rows — mild leakage; consider
# splitting first if that matters.
df = data_preprocessing(df)

In [None]:
# 80/20 split with a fixed seed so the split is reproducible (an earlier run used seed = 45).
trainingData, testingData = df.randomSplit([0.8, 0.2], seed=47)
for caption, split_df in (("Training Dataset Count: ", trainingData),
                          ("Test Dataset Count: ", testingData)):
    print(caption + str(split_df.count()))

In [None]:
# Data Preprocessing

from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, OneHotEncoder, VectorAssembler

# We'll tokenize the text using a simple RegexTokenizer
# Tokenize the review text by splitting on non-word characters.
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="words", pattern="\\W")

# Remove standard stop words (Spark's default list).
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Bag-of-words term counts: at most 10,000 terms, each present in at least 5 documents.
# Other feature extractors are possible:
# https://spark.apache.org/docs/latest/ml-features.html#feature-extractors
countVectors = CountVectorizer(inputCol="filtered", outputCol="countVector_output", vocabSize=10000, minDF=5)

# One-hot encode year and month. handleInvalid='keep' encodes values unseen when fitting
# (e.g. a holdout year later than every training year) as an all-zeros vector instead of
# failing the transform with the default 'error' behaviour.
year_encoder = OneHotEncoder(inputCol='year', outputCol='yearVec', handleInvalid='keep')
month_encoder = OneHotEncoder(inputCol='month', outputCol='monthVec', handleInvalid='keep')

# Final feature vector: text counts + date one-hots + months since the product's first review.
assembler = VectorAssembler(inputCols=['countVector_output', 'yearVec', 'monthVec', 'sinceFirstReview'],
                            outputCol='features')

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, year_encoder, month_encoder, assembler])


In [None]:
# Fit the feature pipeline (CountVectorizer vocabulary, one-hot encoders) on the training
# split only, then featurize the training split and preview it.
pipelineFit = pipeline.fit(trainingData)
trainingDataTransformed = pipelineFit.transform(trainingData)
trainingDataTransformed.show(5)

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier

# Logistic regression on the default 'features'/'label' columns; elasticNetParam=0 makes
# the regParam=0.3 penalty pure L2. Hyperparameters appear hand-picked (no tuning here).
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingDataTransformed)

In [None]:
# Metrics of the fitted model on its own training data (optimistic, not a generalisation
# estimate). Each metric is read and printed in turn, in a fixed order.
trainingSummary = lrModel.summary

_training_metrics = (
    ("Training Accuracy:  ", lambda s: s.accuracy),
    ("Training Precision: ", lambda s: s.precisionByLabel),
    ("Training Recall:    ", lambda s: s.recallByLabel),
    ("Training FMeasure:  ", lambda s: s.fMeasureByLabel()),
    ("Training AUC:       ", lambda s: s.areaUnderROC),
)
for caption, read_metric in _training_metrics:
    print(caption + str(read_metric(trainingSummary)))

In [None]:
# Featurize the test split with the pipeline fitted on the training split (same vocabulary/encodings).
testingDataTransform = pipelineFit.transform(testingData)
# testingDataTransform.show(5)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator


# ROC AUC on the held-out test split, computed from the evaluator's default
# 'rawPrediction' and 'label' columns.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")

predictions = lrModel.transform(testingDataTransform)
predictions.show(5)

test_auc = evaluator.evaluate(predictions)
print('Test Area Under ROC', test_auc)

In [None]:
# Load the holdout table (presumably the unlabelled Kaggle test set — predictions below are
# keyed by reviewID), preview it and print its (rows, columns) shape.
holdout_df = spark.sql("select * from default.reviews_holdout")
holdout_df.show(5)
print((holdout_df.count(), len(holdout_df.columns)))

In [None]:
#holdout_preprocessing
import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
# holdout_df = holdout_df.withColumn("date", from_unixtime(holdout_df['unixReviewTime']).cast(StringType()))
# holdout_df = holdout_df.withColumn("year", year(holdout_df["date"].cast(StringType())))
# holdout_df = holdout_df.withColumn("month", month(holdout_df["date"].cast(StringType())))
# Same feature engineering as the training data; reviewID is kept when the frame has no 'label' column.
# NOTE(review): the per-product aggregates (e.g. the first-review time behind sinceFirstReview)
# are recomputed over the holdout rows only, not over training + holdout — possible
# train/serve skew; confirm this is intended.
holdout_df = data_preprocessing(holdout_df)

# holdout_df = holdout_df.withColumn("overall", f.col('overall')/5)
# holdout_df = holdout_df.withColumn('verified', when(holdout_df.verified == "true", 1).otherwise(0))
# holdout_df = text_classification(holdout_df)

In [None]:
# Preview the engineered holdout features.
holdout_df.show(5)

In [None]:
# Featurize the holdout set with the pipeline fitted on the training split.
holdout_df_Transform = pipelineFit.transform(holdout_df)
holdout_df_Transform.show(5)

In [None]:
# Score the holdout set with the trained logistic regression (overwrites the test-split predictions).
predictions = lrModel.transform(holdout_df_Transform)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# UDF extracting the positive-class (index 1) probability from the model's probability
# vector; a null vector yields null instead of failing the Spark job.
probelement = udf(lambda v: None if v is None else float(v[1]), FloatType())
# Alias the result explicitly rather than renaming the auto-generated '<lambda>(probability)'
# column: withColumnRenamed is a silent no-op if that generated name ever differs.
submission_data = predictions.select('reviewID', probelement('probability').alias('label'))

In [None]:
# Show the submission: one row per reviewID with the predicted positive-class probability as 'label'.
display(submission_data.select('reviewID', 'label'))

reviewID,label
67000164,0.73149127
67000321,0.90904415
67000350,0.6993052
67000615,0.69776225
67000711,0.36432266
67001316,0.27518848
67001360,0.26817036
67001606,0.34211093
67001796,0.8867972
67002455,0.4876544


# Build Gradient Boosting Model (surprisingly performed worse, hence commented out)

In [None]:
# from pyspark.ml.classification import GBTClassifier
# ## Fitting the model
# gbt = GBTClassifier(maxIter=10)
# gbtModel = gbt.fit(trainingDataTransformed)



In [None]:
# gbtPreds = gbtModel.transform(testingDataTransform)


In [None]:
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# ## Evaluating the model
# evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# gb_accuracy = evaluator.evaluate(gbtPreds)
# print("Accuracy of GBT is = %g"% (gb_accuracy))


In [None]:
# evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
# print('Test Area Under ROC', evaluator.evaluate(gbtPreds))

In [None]:
# gbt_test_df_Transform = pipelineFit.transform(test_df)

In [None]:
# gbt_predictions = gbtModel.transform(test_df_Transform)

In [None]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import FloatType

# probelement=udf(lambda v:float(v[1]),FloatType())
# submission_data = gbt_predictions.select('reviewID', probelement('probability')).withColumnRenamed('<lambda>(probability)', 'label')

In [None]:
# display(submission_data.select('reviewID', 'label'))