# ----------------------AMAZON FINE FOOD REVIEW DATASET-----------------------
### PREDICTING RATING FROM USER REVIEWS (SUMMARY AND TEXT)

1. Read the data and keep only the necessary columns: score, summary and text. Rows with missing values are dropped.

In [1]:
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.ml.classification.{ RandomForestClassifier, LogisticRegression}
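// Configure the CSV reader in one chained expression (header row, inferred column types).
val reader = spark.read.option("inferSchema", true).option("header", true).option("delimiter", ",")
val data = reader.csv("./reviews_5000.csv")
// Keep only the three columns used below and drop rows that contain nulls.
val reviews = data.select("score", "summary", "text").na.drop()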
reviews.show(5)

+-----+--------------------+--------------------+
|score|             summary|                text|
+-----+--------------------+--------------------+
|    5|Good Quality Dog ...|I have bought sev...|
|    1|   Not as Advertised|"Product arrived ...|
|    4|"""Delight"" says...|"This is a confec...|
|    2|      Cough Medicine|If you are lookin...|
|    5|         Great taffy|Great taffy at a ...|
+-----+--------------------+--------------------+
only showing top 5 rows



2. Encode the score as a binary label: scores of 3 or higher become "Positive", lower scores become "Negative".

In [2]:
import org.apache.spark.sql.functions._
// Overwrite the numeric score with a string label: >= 3 is "Positive", otherwise "Negative".
val scoreReviews = reviews.withColumn("score", when(col("score") >= 3, "Positive").otherwise("Negative"))
scoreReviews.show(5)

+--------+--------------------+--------------------+
|   score|             summary|                text|
+--------+--------------------+--------------------+
|Positive|Good Quality Dog ...|I have bought sev...|
|Negative|   Not as Advertised|"Product arrived ...|
|Positive|"""Delight"" says...|"This is a confec...|
|Negative|      Cough Medicine|If you are lookin...|
|Positive|         Great taffy|Great taffy at a ...|
+--------+--------------------+--------------------+
only showing top 5 rows



#------------------DATA PREPROCESSING----------------#

3. Tokenization - lowercase the text and summary columns and break them into individual words. Because the pattern \w+ matches only runs of word characters, punctuation is dropped.

In [3]:
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
val scoreReviewsLowered = scoreReviews.withColumn("summary", lower(col("summary"))).withColumn("text", lower(col("text")))
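// setGaps(false) means the pattern matches the tokens themselves rather than the separators between them.
val textTokenized = new RegexTokenizer().setInputCol("text").setOutputCol("textTokens").setPattern("\\w+").setGaps(false).transform(scoreReviewsLowered)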
val tokenizedData = new RegexTokenizer().setInputCol("summary").setOutputCol("summaryTokens").setPattern("\\w+").setGaps(false).transform(textTokenized)
val countTokens = udf { (words: Seq[String]) => words.length }
tokenizedData.select("summary","summaryTokens","text","textTokens").withColumn("tokensSummary", countTokens(col("summaryTokens"))).withColumn("tokensText", countTokens(col("textTokens"))).show(5)

+--------------------+--------------------+--------------------+--------------------+-------------+----------+
|             summary|       summaryTokens|                text|          textTokens|tokensSummary|tokensText|
+--------------------+--------------------+--------------------+--------------------+-------------+----------+
|good quality dog ...|[good, quality, d...|i have bought sev...|[i, have, bought,...|            4|        48|
|   not as advertised|[not, as, adverti...|"product arrived ...|[product, arrived...|            3|        32|
|"""delight"" says...|[delight, says, i...|"this is a confec...|[this, is, a, con...|            4|        71|
|      cough medicine|   [cough, medicine]|if you are lookin...|[if, you, are, lo...|            2|        41|
|         great taffy|      [great, taffy]|great taffy at a ...|[great, taffy, at...|            2|        27|
+--------------------+--------------------+--------------------+--------------------+-------------+----------+
only showing top 5 rows
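
The matching behaviour of the tokenizer can be illustrated without Spark. The sketch below is a plain-Scala approximation, not the RegexTokenizer implementation; the object name `TokenizerSketch` is hypothetical. It applies the same `\w+` pattern to lowercased text and keeps every match as a token.

```scala
object TokenizerSketch {
  // Same pattern as in the cell above: one or more word characters (letters, digits, underscore).
  private val wordPattern = "\\w+".r

  // Every match of the pattern becomes a token, so punctuation and whitespace are discarded.
  def tokenize(text: String): List[String] =
    wordPattern.findAllIn(text.toLowerCase).toList
}
```

For example, `TokenizerSketch.tokenize("Not as Advertised!")` yields `List(not, as, advertised)`. An apostrophe is not a word character, so "Don't" is split into `don` and `t`.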

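4. Stop words removal - This step removes common words (such as "i", "have", "this") that carry little information for prediction. Note in the output below that negations are removed too: "not as advertised" is reduced to [advertised].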

In [4]:
import org.apache.spark.ml.feature.StopWordsRemover
val filteredTextData = new StopWordsRemover().setInputCol("textTokens").setOutputCol("filteredText").transform(tokenizedData)
val filteredData = new StopWordsRemover().setInputCol("summaryTokens").setOutputCol("filteredSummary").transform(filteredTextData)
filteredData.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   score|             summary|                text|          textTokens|       summaryTokens|        filteredText|     filteredSummary|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Positive|good quality dog ...|i have bought sev...|[i, have, bought,...|[good, quality, d...|[bought, several,...|[good, quality, d...|
|Negative|   not as advertised|"product arrived ...|[product, arrived...|[not, as, adverti...|[product, arrived...|        [advertised]|
|Positive|"""delight"" says...|"this is a confec...|[this, is, a, con...|[delight, says, i...|[confection, arou...|     [delight, says]|
|Negative|      cough medicine|if you are lookin...|[if, you, are, lo...|   [cough, medicine]|[looking, secret,...|   [cough, medicine]|
|Positive|         great taffy|great taff
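
As a rough illustration of what this step does to a token list, here is a plain-Scala sketch. The object name `StopWordsSketch` and its small stop-word set are assumptions made for this example; the default English list used by StopWordsRemover is much longer.

```scala
object StopWordsSketch {
  // Hypothetical, deliberately tiny stop-word list used only for this illustration.
  val stopWords: Set[String] = Set("i", "have", "not", "as", "this", "is", "a", "it")

  // Keep only the tokens that are not in the stop-word set (case-insensitive).
  def removeStopWords(tokens: Seq[String]): Seq[String] =
    tokens.filterNot(token => stopWords.contains(token.toLowerCase))
}
```

With this list, `StopWordsSketch.removeStopWords(Seq("not", "as", "advertised"))` returns `Seq("advertised")`, which matches the second row of the output above. Dropping "not" can hide negation from the classifier, so a custom stop-word list may be worth considering for sentiment tasks.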

5. Count vectorization - CountVectorizer extracts the vocabulary from the filtered tokens, assigns a numeric index to each unique word, and converts every review into a sparse vector of word counts.

In [5]:
import org.apache.spark.ml.feature.{CountVectorizer}
val vectorizedText = new CountVectorizer().setInputCol("filteredText").setOutputCol("vectorText").fit(filteredData).transform(filteredData)
val vectorizedData = new CountVectorizer().setInputCol("filteredSummary").setOutputCol("vectorSummary").fit(vectorizedText).transform(vectorizedText)
vectorizedData.show(10)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   score|             summary|                text|          textTokens|       summaryTokens|        filteredText|     filteredSummary|          vectorText|       vectorSummary|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Positive|good quality dog ...|i have bought sev...|[i, have, bought,...|[good, quality, d...|[bought, several,...|[good, quality, d...|(12442,[1,2,5,11,...|(3048,[1,10,11,39...|
|Negative|   not as advertised|"product arrived ...|[product, arrived...|[not, as, adverti...|[product, arrived...|        [advertised]|(12442,[5,114,140...|  (3048,[720],[1.0])|
|Positive|"""delight"" says...|"this is a confec...|[this, is, a, con...|[delight, says, i...|[confection
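
The sparse vectors printed above have the form (vocabulary size, [term indices], [counts]). The following plain-Scala sketch shows one way such vectors could be produced. It is a simplified illustration, not Spark's implementation: the object `CountVectorizerSketch` is hypothetical, and the alphabetical tie-breaking among equally frequent terms is an assumption of this sketch.

```scala
object CountVectorizerSketch {
  // Build the vocabulary: terms ordered by descending corpus frequency, then alphabetically.
  def fitVocabulary(docs: Seq[Seq[String]]): Map[String, Int] = {
    val counts = docs.flatten.groupBy(identity).map { case (term, occ) => term -> occ.size }
    counts.toSeq.sortBy { case (term, n) => (-n, term) }.map(_._1).zipWithIndex.toMap
  }

  // Encode one document as (vocabulary size, sorted term indices, counts per index).
  // Terms that are not in the vocabulary are ignored.
  def transform(doc: Seq[String], vocab: Map[String, Int]): (Int, Seq[Int], Seq[Double]) = {
    val counts = doc.flatMap(term => vocab.get(term))
      .groupBy(identity)
      .map { case (index, occ) => index -> occ.size.toDouble }
    val sorted = counts.toSeq.sortBy(_._1)
    (vocab.size, sorted.map(_._1), sorted.map(_._2))
  }
}
```

Fitting on `Seq(Seq("great", "taffy"), Seq("great", "price"))` assigns index 0 to "great", 1 to "price" and 2 to "taffy". Transforming `Seq("great", "taffy", "great")` then gives `(3, Seq(0, 2), Seq(2.0, 1.0))`.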

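6. StringIndexer converts the categorical score column into a numeric index. By default the indices are assigned by descending label frequency, which is why "Positive" becomes 0.0 and "Negative" becomes 1.0 in the output below.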

In [19]:
import org.apache.spark.ml.feature.{StringIndexer}
val indexedData = new StringIndexer().setInputCol("score").setOutputCol("scoreIndex").fit(vectorizedData).transform(vectorizedData)
indexedData.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   score|             summary|                text|          textTokens|       summaryTokens|        filteredText|     filteredSummary|          vectorText|       vectorSummary|scoreIndex|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|Positive|good quality dog ...|i have bought sev...|[i, have, bought,...|[good, quality, d...|[bought, several,...|[good, quality, d...|(12442,[1,2,5,11,...|(3048,[1,10,11,39...|       0.0|
|Negative|   not as advertised|"product arrived ...|[product, arrived...|[not, as, adverti...|[product, arrived...|        [advertised]|(12442,[5,114,140...|  (3048,[720],[1.0])|       1.0|
|Positive|"""delight"" says...|"this is a confec..
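
The index assignment seen above (Positive as 0.0, Negative as 1.0) follows from ordering labels by frequency. The plain-Scala sketch below mimics that ordering; the object `StringIndexerSketch` is hypothetical, and breaking ties alphabetically is an assumption of this sketch.

```scala
object StringIndexerSketch {
  // The most frequent label gets index 0.0, the next most frequent 1.0, and so on.
  def fit(labels: Seq[String]): Map[String, Double] =
    labels.groupBy(identity).toSeq
      .sortBy { case (label, occ) => (-occ.size, label) }
      .map(_._1)
      .zipWithIndex
      .map { case (label, index) => label -> index.toDouble }
      .toMap
}
```

For the labels `Seq("Positive", "Negative", "Positive")` this returns `Map(Positive -> 0.0, Negative -> 1.0)`. The mapping matters later: any filter on `label == 1.0` selects the Negative reviews.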

In [20]:
import org.apache.spark.ml.feature.RFormula
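// First experiment: predict scoreIndex from the summary vector only.
// RFormula produces the "features" and "label" columns expected by the classifiers.
val rFormula = new RFormula().setFormula("scoreIndex ~ vectorSummary")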
val preparedData = rFormula.fit(indexedData).transform(indexedData)

In [21]:
val Array(train, test) = preparedData.randomSplit(Array(0.8,0.2)) 

In [22]:
import org.apache.spark.ml.classification.LogisticRegression
val logisticRegression = new LogisticRegression()
val logisticRegressionModel = logisticRegression.fit(train)
val predictionsLogistic = logisticRegressionModel.transform(test)

In [23]:
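// Per the StringIndexer output, label 1.0 is "Negative" and 0.0 is "Positive".
// The "positive" class in these metrics is therefore the Negative reviews.
val labelPositives = predictionsLogistic.where(expr("label == 1.0"))
val labelNegatives = predictionsLogistic.where(expr("label == 0.0"))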
val falseNegatives = labelPositives.where(expr("label != prediction")).count()
val falsePositives = labelNegatives.where(expr("label != prediction")).count()
val trueNegatives  = labelNegatives.where(expr("label == prediction")).count()
val truePositives  = labelPositives.where(expr("label == prediction")).count()
println(" Precision = " + truePositives.toFloat/(falsePositives + truePositives))
println(" Recall = " + truePositives.toFloat/(falseNegatives + truePositives))
println(" Accuracy= " + (truePositives+trueNegatives).toFloat/(falsePositives + trueNegatives+falseNegatives + truePositives))

 Precision = 0.4642857
 Recall = 0.5693431
 Accuracy= 0.8470226
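
The same three formulas are repeated for every model in this notebook, so they can be collected in a small helper. The sketch below is plain Scala; `BinaryMetricsSketch`, `Metrics` and `compute` are hypothetical names introduced here, and returning 0.0 for an empty denominator is a choice made for this sketch.

```scala
object BinaryMetricsSketch {
  final case class Metrics(precision: Double, recall: Double, accuracy: Double)

  // tp/fp/tn/fn are the four confusion-matrix counts for the class treated as "positive".
  def compute(tp: Long, fp: Long, tn: Long, fn: Long): Metrics = {
    // Guard against division by zero when a class is never predicted or never present.
    def ratio(num: Long, den: Long): Double = if (den == 0) 0.0 else num.toDouble / den
    Metrics(
      precision = ratio(tp, tp + fp),
      recall    = ratio(tp, tp + fn),
      accuracy  = ratio(tp + tn, tp + fp + tn + fn)
    )
  }
}
```

In the notebook this could be called as `BinaryMetricsSketch.compute(truePositives, falsePositives, trueNegatives, falseNegatives)`. Because the cell above treats label 1.0 as the positive class, and 1.0 is "Negative" in the StringIndexer output, the precision and recall describe how well Negative reviews are detected.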


In [25]:
import org.apache.spark.ml.classification.RandomForestClassifier
val randomForest = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(10).setFeatureSubsetStrategy("auto").setSeed(5043)
val randomForestModel = randomForest.fit(train)
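// Use the model fitted on the line above (the original referenced an undefined `model`).
val predictionsRandomForest = randomForestModel.transform(test)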

In [26]:
// Same confusion-matrix metrics, now for the random forest predictions (label 1.0 is "Negative").
val labelPositives = predictionsRandomForest.where(expr("label == 1.0"))
val labelNegatives = predictionsRandomForest.where(expr("label == 0.0"))
val falseNegatives = labelPositives.where(expr("label != prediction")).count()
val falsePositives = labelNegatives.where(expr("label != prediction")).count()
val trueNegatives  = labelNegatives.where(expr("label == prediction")).count()
val truePositives  = labelPositives.where(expr("label == prediction")).count()
println(" Precision = " + truePositives.toFloat/(falsePositives + truePositives))
println(" Recall = " + truePositives.toFloat/(falseNegatives + truePositives))
println(" Accuracy= " + (truePositives+trueNegatives).toFloat/(falsePositives + trueNegatives+falseNegatives + truePositives))

 Precision = 0.8375
 Recall = 0.4890511
 Accuracy= 0.9147844


In [27]:
import org.apache.spark.ml.classification.NaiveBayes
val naiveBayesModel = new NaiveBayes().fit(train)
val predictionsNaiveBayes = naiveBayesModel.transform(test)

In [28]:
// Same confusion-matrix metrics, now for the Naive Bayes predictions (label 1.0 is "Negative").
val labelPositives = predictionsNaiveBayes.where(expr("label == 1.0"))
val labelNegatives = predictionsNaiveBayes.where(expr("label == 0.0"))
val falseNegatives = labelPositives.where(expr("label != prediction")).count()
val falsePositives = labelNegatives.where(expr("label != prediction")).count()
val trueNegatives  = labelNegatives.where(expr("label == prediction")).count()
val truePositives  = labelPositives.where(expr("label == prediction")).count()
println(" Precision = " + truePositives.toFloat/(falsePositives + truePositives))
println(" Recall = " + truePositives.toFloat/(falseNegatives + truePositives))
println(" Accuracy= " + (truePositives+trueNegatives).toFloat/(falsePositives + trueNegatives+falseNegatives + truePositives))

 Precision = 0.56666666
 Recall = 0.37226278
 Accuracy= 0.8716633


In [30]:
import org.apache.spark.ml.feature.RFormula
// Second experiment: use both the summary and the review text vectors as features.
val formula2 = new RFormula().setFormula("scoreIndex ~ vectorSummary + vectorText")
val preparedDF = formula2.fit(indexedData).transform(indexedData)

In [31]:
val Array(train, test) = preparedDF.randomSplit(Array(0.8,0.2)) 

In [32]:
import org.apache.spark.ml.classification.LogisticRegression
val logisticRegression = new LogisticRegression()
val logisticRegressionModel = logisticRegression.fit(train)
val predictionsLogistic = logisticRegressionModel.transform(test)

In [33]:
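// Same confusion-matrix metrics, now for logistic regression on summary + text (label 1.0 is "Negative").
val labelPositives = predictionsLogistic.where(expr("label == 1.0"))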
val labelNegatives = predictionsLogistic.where(expr("label == 0.0"))
val falseNegatives = labelPositives.where(expr("label != prediction")).count()
val falsePositives = labelNegatives.where(expr("label != prediction")).count()
val trueNegatives  = labelNegatives.where(expr("label == prediction")).count()
val truePositives  = labelPositives.where(expr("label == prediction")).count()
println(" Precision = " + truePositives.toFloat/(falsePositives + truePositives))
println(" Recall = " + truePositives.toFloat/(falseNegatives + truePositives))
println(" Accuracy= " + (truePositives+trueNegatives).toFloat/(falsePositives + trueNegatives+falseNegatives + truePositives))

 Precision = 0.6694915
 Recall = 0.47878787
 Accuracy= 0.879692
