# Predicting Product Ratings using Customer Reviews (Databricks)

In [None]:
# Record the wall-clock time (seconds since the epoch) at which the notebook run began.
# NOTE(review): no cell visible in this section reads start_time — confirm the
# elapsed time is actually reported somewhere, otherwise this cell is dead code.
import time
start_time = time.time()

### Amazon Reviews Dataset Use Case


Source of the data: J. McAuley and J. Leskovec. From amateurs to connoisseurs: modeling the evolution of user expertise through online reviews. WWW, 2013.

#### Loading the DataFrame


In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
import numpy as np
from pyspark.sql.functions import approx_count_distinct, avg, col, date_format, to_date
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, split

# Load the reviews CSV: first row is the header, column types are inferred,
# and a double quote escapes quotes inside quoted fields.
textDF = (
    spark.read
    .options(inferSchema=True, header=True, escape='"')
    .csv("/mnt/training/reviews/reviews.csv")
)
# Preview at most 1000 rows.
display(textDF.limit(1000))

### Take A Random Sample


In [None]:
# Print the dataset shape as a (row count, column count) tuple.

print((textDF.count(), len(textDF.schema.names)))

In [None]:
# Draw a seeded ~1% sample without replacement, keep only the columns used
# by later cells, and cache the result because several cells reuse it.
sample1 = (
    textDF
    .sample(withReplacement=False, fraction=0.01, seed=0)
    .select("Id", "ProductId", "Score", "Summary", "Text")
)

sample1.cache()

# Shape of the sample as (row count, column count).
print((sample1.count(), len(sample1.schema.names)))

In [None]:
# Render the cached 1% sample (Id, ProductId, Score, Summary, Text) for a visual check.
display(sample1)


### Text Cleaning Process


In [None]:
# Download the NLTK resources the cleaning function relies on:
# 'punkt' for word_tokenize and 'stopwords' for the English stop-word list.

import nltk
nltk.download('punkt')
nltk.download('stopwords')

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# wrap nltk nlp tokenization and stop word removal in pandas UDF
def clean_text(Text):
  """Normalise one review into a comma-joined string of stemmed tokens.

  Pipeline: NLTK word tokenisation, lower-casing, punctuation stripping,
  keeping purely alphabetic tokens, English stop-word removal, Porter
  stemming, and finally keeping only stems longer than four characters.

  Args:
    Text: Raw review text. Non-string values are converted with str().

  Returns:
    The surviving stems joined by ','. An empty string is returned when
    Text is None, blank, or when no token survives the filters.
  """
  # Missing or blank reviews have nothing to tokenise, so skip the NLTK work
  # (previously None was stringified to the literal word "None").
  if Text is None:
    return ""
  text = str(Text)
  if not text.strip():
    return ""

  # Local imports keep the function self-contained when it is called from
  # the pandas UDF that wraps it.
  import string
  from nltk.corpus import stopwords
  from nltk.stem.porter import PorterStemmer
  from nltk.tokenize import word_tokenize

  # split into words and convert to lower case
  tokens = [w.lower() for w in word_tokenize(text)]

  # remove punctuation from each word
  table = str.maketrans('', '', string.punctuation)
  stripped = [w.translate(table) for w in tokens]

  # remove remaining tokens that are not alphabetic
  words = [word for word in stripped if word.isalpha()]

  # filter out stop words
  stop_words = set(stopwords.words('english'))
  words = [w for w in words if w not in stop_words]

  # stemming of words
  porter = PorterStemmer()
  stemmed = [porter.stem(word) for word in words]

  # keep only stems with more than four characters
  stemmed_minchars = [x for x in stemmed if len(x) > 4]
  return ",".join(stemmed_minchars)

import pandas as pd
@pandas_udf('string', PandasUDFType.SCALAR)
def clean_text_pandas(v: pd.Series) -> pd.Series:
    """Scalar pandas UDF that applies clean_text to every value in a batch.

    Args:
        v: A pandas Series of raw review texts.

    Returns:
        A Series of the same length holding the comma-joined stems that
        clean_text produces for each input value.
    """
    # Imported locally so the UDF does not depend on a notebook-level name.
    import nltk

    # Fetch each NLTK resource only when it cannot be found locally, instead
    # of calling nltk.download() unconditionally on every batch.
    for resource_path, package in (
        ("tokenizers/punkt", "punkt"),
        ("corpora/stopwords", "stopwords"),
    ):
        try:
            nltk.data.find(resource_path)
        except LookupError:
            nltk.download(package, quiet=True)

    return v.map(clean_text)


In [None]:
# Build the token column: run the cleaning UDF over "Text", split the
# comma-joined result into an array ("textSWRemoved"), then drop the
# intermediate string column.
# NOTE: this assignment rebinds the name `clean_text` from the cleaning
# function to a DataFrame; re-run the cell that defines the function before
# calling it directly again.
clean_text = (
    sample1
    .withColumn("clean_text", clean_text_pandas(col("Text")))
    .withColumn("textSWRemoved", split(col("clean_text"), ','))
    .drop("clean_text")
)
display(clean_text)

In [None]:
from pyspark.ml.feature import CountVectorizer

# Fit a bag-of-words model on the token arrays: the vocabulary is capped at
# 500 terms and a term must appear in at least 3 reviews to be kept.

vectorizer = CountVectorizer(
    inputCol="textSWRemoved",
    outputCol="features",
    vocabSize=500,
    minDF=3,
).fit(clean_text)

In [None]:
# Turn each review's tokens into a term-count vector and keep only the
# product id, the feature vector and the rating label.
count_vectors = (
    vectorizer
    .transform(clean_text)
    .select("ProductId", "features", "Score")
)

display(count_vectors)

### TF-IDF

Feature Transformations

In [None]:
from pyspark.ml.feature import Tokenizer,StopWordsRemover, CountVectorizer,IDF,StringIndexer


# Re-weight the raw term counts by inverse document frequency; the result
# is written to a new "tf_idf" column.
idf = IDF().setInputCol("features").setOutputCol("tf_idf")

cleaner = idf.fit(count_vectors)
clean_data = cleaner.transform(count_vectors)

### Split Training and Testing Data


In [None]:
# Seeded 70/30 train/test split of the TF-IDF features.
training, testing = clean_data.randomSplit(weights=[0.7, 0.3], seed=100)


### Create a Random Forest Classifier


In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Small random forest (3 trees of depth 2) predicting Score from the TF-IDF vectors.
nb = RandomForestClassifier(
    featuresCol="tf_idf",
    labelCol="Score",
    numTrees=3,
    maxDepth=2,
    seed=100,
)
amazon_predictor = nb.fit(training)

### Save and Load the Model


In [None]:
from pyspark.ml.classification import RandomForestClassificationModel

# Persist the fitted TF-IDF model (replacing any earlier copy at the same
# path), then read it back into a second variable.
fileName = "/tmp/HW2Q2_TFIDF"
(
    amazon_predictor
    .write()
    .overwrite()
    .save(fileName)
)
sameModel = RandomForestClassificationModel.load(fileName)

### Evaluation on Testing Data


In [None]:
# Score the held-out split with the TF-IDF random forest; the resulting frame
# carries the "prediction" column that the evaluators below compare to "Score".
test_results = amazon_predictor.transform(testing)
display(test_results)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Weighted F-measure of the TF-IDF model on the held-out split.
evaluator = MulticlassClassificationEvaluator(labelCol="Score", predictionCol="prediction", metricName="weightedFMeasure")
FMeasure = evaluator.evaluate(test_results)
# The model predicts review ratings, not spam; the message now says so.
print("Weighted F-measure of model at predicting user rating was: {}".format(FMeasure))

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the TF-IDF model on the held-out split.
evaluator = (
    MulticlassClassificationEvaluator()
    .setLabelCol("Score")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
)
accuracy = evaluator.evaluate(test_results)
print("Accuracy of model at predicting user rating was: {}".format(accuracy))

### Word2Vec


In [None]:
from pyspark.ml.feature import Word2Vec

# Learn 20-dimensional word embeddings from the token arrays (ignoring words
# seen fewer than 2 times) and write a per-review vector to the "w2v" column.
word2vec = Word2Vec(
    inputCol="textSWRemoved",
    outputCol="w2v",
    vectorSize=20,
    minCount=2,
)

cleaner = word2vec.fit(clean_text)
clean_data = cleaner.transform(clean_text)

# Seeded 70/30 train/test split of the Word2Vec features.
training, testing = clean_data.randomSplit(weights=[0.7, 0.3], seed=100)

In [None]:
from pyspark.ml.classification import RandomForestClassifier
# Same small random forest as before, now trained on the Word2Vec vectors.
nb = RandomForestClassifier(
    featuresCol="w2v",
    labelCol="Score",
    numTrees=3,
    maxDepth=2,
    seed=100,
)
amazon_predictor = nb.fit(training)

# Predictions on the held-out split.
test_results = amazon_predictor.transform(testing)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Weighted F-measure of the Word2Vec model on the held-out split.
evaluator = MulticlassClassificationEvaluator(labelCol="Score", predictionCol="prediction", metricName="weightedFMeasure")
FMeasure = evaluator.evaluate(test_results)
# The model predicts review ratings, not spam; the message now says so.
print("Weighted F-measure of model at predicting user rating was: {}".format(FMeasure))

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the Word2Vec model on the held-out split.
evaluator = (
    MulticlassClassificationEvaluator()
    .setLabelCol("Score")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
)
accuracy = evaluator.evaluate(test_results)
print("Accuracy of model at predicting user rating was: {}".format(accuracy))

### LDA Model

In [None]:
from pyspark.ml.clustering import LDA

# Train a 10-topic LDA model on the raw term counts and use each review's
# topic distribution as its feature vector.
# The seed is pinned explicitly so the fitted topics can be reproduced.

lda = LDA(k=10, maxIter=10, featuresCol='features', seed=100)
model = lda.fit(count_vectors)

transformed = model.transform(count_vectors)

clean_data = transformed.select(['Score', 'topicDistribution'])

# Seed the 70/30 split like the TF-IDF and Word2Vec sections do; without a
# seed every run yields a different split and therefore different metrics.
(training,testing) = clean_data.randomSplit([0.7,0.3], seed=100)

In [None]:
# Same small random forest, trained on the LDA topic distributions.
nb = RandomForestClassifier(
    featuresCol="topicDistribution",
    labelCol="Score",
    numTrees=3,
    maxDepth=2,
    seed=100,
)
amazon_predictor = nb.fit(training)

# Predictions on the held-out split.
test_results = amazon_predictor.transform(testing)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Weighted F-measure of the LDA-feature model on the held-out split.
evaluator = MulticlassClassificationEvaluator(labelCol="Score", predictionCol="prediction", metricName="weightedFMeasure")
FMeasure = evaluator.evaluate(test_results)
# The model predicts review ratings, not spam; the message now says so.
print("Weighted F-measure of model at predicting user rating was: {}".format(FMeasure))
Weighted F-measure of model at predicting spam was: 0.513520826020826
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the LDA-feature model on the held-out split.
evaluator = (
    MulticlassClassificationEvaluator()
    .setLabelCol("Score")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")
)
accuracy = evaluator.evaluate(test_results)
print("Accuracy of model at predicting user rating was: {}".format(accuracy))

# Model Evaluation

### LDA

* F measure: 0.513520826020826

* Accuracy Score: 0.6511056511056511



### TF-IDF

* F measure: 0.49448213840921434

* Accuracy Score: 0.6359906213364596



### Word2Vec

* F measure: 0.5077503769703561

* Accuracy Score: 0.6465416178194607