# Amazon Fine Food Reviews

https://www.kaggle.com/snap/amazon-fine-food-reviews

## Hyperparameters

In [1]:
# Notebook-level settings; 'is_training' is the only flag defined here.
hyperparameters = dict(is_training=True)

## Load Dataset

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import *

# Explicit schema so Id, the helpfulness counts, Score and Time are read as integers
# and the remaining columns as strings.
reviews_schema = StructType([
    StructField('Id', IntegerType(), True),
    StructField('ProductId', StringType(), True),
    StructField('UserId', StringType(), True),
    StructField('ProfileName', StringType(), True),
    StructField('HelpfulnessNumerator', IntegerType(), True),
    StructField('HelpfulnessDenominator', IntegerType(), True),
    StructField('Score', IntegerType(), True),
    StructField('Time', IntegerType(), True),
    StructField('Summary', StringType(), True),
    StructField('Text', StringType(), True)])

# NOTE(review): header=False is kept from the original; if the file carries a header
# line it presumably fails integer parsing and is removed by the filters below - confirm.
df = spark.read.csv("Reviews.csv", header=False, schema=reviews_schema)

# Keep only the columns used by the rest of the notebook and cap the row count.
df = df.select('Id', 'Text', 'Summary', 'Score').limit(10000)

# The predicate must be built from Column expressions combined with `&`.
# The previous form, `'Id' > 0 and length('Text') > 0`, was evaluated by Python itself:
# it compared the *string* 'Id' with 0 (TypeError on Python 3, always True on Python 2),
# so the Id condition never reached Spark.
df = df.filter((col('Id') > 0) & (length(col('Text')) > 0)).filter(col('Score') > 0)

df.show()

+---+--------------------+--------------------+-----+
| Id|                Text|             Summary|Score|
+---+--------------------+--------------------+-----+
|  1|I have bought sev...|Good Quality Dog ...|    5|
|  2|"Product arrived ...|   Not as Advertised|    1|
|  3|"This is a confec...|"""Delight"" says...|    4|
|  4|If you are lookin...|      Cough Medicine|    2|
|  5|Great taffy at a ...|         Great taffy|    5|
|  6|I got a wild hair...|          Nice Taffy|    4|
|  7|This saltwater ta...|Great!  Just as g...|    5|
|  8|This taffy is so ...|Wonderful, tasty ...|    5|
|  9|Right now I'm mos...|          Yay Barley|    5|
| 10|This is a very he...|    Healthy Dog Food|    5|
| 11|I don't know if i...|The Best Hot Sauc...|    5|
| 12|One of my boys ne...|"My cats LOVE thi...|    5|
| 13|My cats have been...|My Cats Are Not F...|    1|
| 14|good flavor! thes...|   fresh and greasy!|    4|
| 15|The Strawberry Tw...|Strawberry Twizzl...|    5|
| 16|My daughter loves...|Lo

## Preprocess

In [5]:
# Split the review body and the summary on non-word characters ("\\W"), then record
# how many tokens each produced.
text_tokenizer = RegexTokenizer(inputCol='Text', outputCol='Tokenized', pattern="\\W")
summary_tokenizer = RegexTokenizer(inputCol='Summary', outputCol='SummaryTokenized', pattern="\\W")
df = summary_tokenizer.transform(text_tokenizer.transform(df))
df = (df
      .withColumn('NumWords', size('Tokenized'))
      .withColumn('NumWordsSummary', size('SummaryTokenized')))

NameError: name 'RegexTokenizer' is not defined

## Investigating the Data

Some reviews contain just one word and others contain a very large number of words, but for the most part reviews fall in the range of 30-50 words.

In [None]:
%matplotlib inline

# Shortest, longest and mean review length in tokens, fetched with a single aggregate
# query. `min`, `max` and `mean` are expected to be the Spark SQL aggregate functions
# (from the notebook's `pyspark.sql.functions` star import), not the Python builtins.
length_stats = df.select(min('NumWords'), max('NumWords'), mean('NumWords')).collect()[0]

# print() with one pre-formatted string emits the same text on Python 2 and Python 3;
# the former `print 'label', value` statement is a SyntaxError on Python 3.
print('min review length (in words) {}'.format(length_stats[0]))
print('max review length (in words) {}'.format(length_stats[1]))
print('average review length (in words) {}'.format(length_stats[2]))

df.select('NumWords').toPandas().hist()

The dataset is very unbalanced: most of the reviews have a score of 5, so we will have to down-sample that class later on.

In [None]:
%matplotlib inline

# Histogram of the Score column, with the x-range pinned to 1-5.
(df
 .select('Score')
 .toPandas()
 .hist(range=[1, 5]))

In [None]:
%matplotlib inline

# Histogram of the number of tokens in each review summary.
(df
 .select('NumWordsSummary')
 .toPandas()
 .hist())

In [2]:
%matplotlib inline

import matplotlib.pyplot as plt
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator

# WordCloud.generate() needs one plain Python string. `df.Text` is only a Spark Column
# reference, so pull the review bodies to the driver and join them, skipping empty or
# null values.
text = ' '.join(row[0] for row in df.select('Text').collect() if row[0])
wordcloud = WordCloud().generate(text)

# `plt` is imported above because no other live cell in the notebook brings it into scope.
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()

ImportError: cannot import name _distributor_init

## Cleaning the Data

In [None]:
from pyspark.ml.feature import StopWordsRemover, Word2Vec, NGram, OneHotEncoder, StringIndexer

# Down-sample the Score == 5 rows to roughly 20% and recombine them with the other
# scores. The sample is seeded so the retained rows are the same on every run; the
# seed matches the one already used for Word2Vec below.
temp = df.filter(col('Score') == 5)
df = df.filter(col('Score') != 5)
temp = temp.sample(fraction=0.2, seed=42)
df = df.union(temp)

# Remove stop words, then keep reviews with more than 2 and at most 100 remaining tokens.
df = StopWordsRemover(inputCol='Tokenized', outputCol='TokenizedStopped').transform(df)
df = df.withColumn('NumWordsStopped', size('TokenizedStopped'))
df = df.filter(col('NumWordsStopped') > 2).filter(col('NumWordsStopped') <= 100)
# Space-joined version of the stop-word-free tokens.
df = df.withColumn('TextStopped', concat_ws(' ', 'TokenizedStopped'))
# df = StringIndexer(inputCol='TextStopped', outputCol='categoryIndex').fit(df).transform(df)
# df = OneHotEncoder(inputCol='categoryIndex', outputCol='categoryVec').transform(df)
# Fit a 5-dimensional Word2Vec model on the cleaned tokens and attach its output
# vector to every row.
df = Word2Vec(vectorSize=5, seed=42, minCount=0, inputCol='TokenizedStopped',
              outputCol='Word2Vec').fit(df).transform(df)
# NOTE: the 'ngrams' column produced here is not among the columns kept by the select below.
df = NGram(n=2, inputCol='TokenizedStopped', outputCol='ngrams').transform(df)

df = df.select('Id', 'Text', 'Score', 'NumWordsStopped', 'TextStopped', 'Word2Vec')

df.show()

In [None]:
# import numpy as np
# from sklearn.preprocessing import OneHotEncoder

# enc = OneHotEncoder()
# enc.fit(np.array(df.select('TextStopped').collect()).reshape(-1, 1))
# ar = enc.transform(df.select('TextStopped').collect()).toarray()
# df = df.withColumn('OneHot', enc.transform(col('TextStopped').reshape(-1, 1)).toarray())

## After Cleanup

In [None]:
%matplotlib inline

# Token-count statistics after stop-word removal and length filtering, fetched with a
# single aggregate query. `min`, `max` and `mean` are expected to be the Spark SQL
# aggregate functions (from the notebook's `pyspark.sql.functions` star import).
stopped_stats = df.select(min('NumWordsStopped'), max('NumWordsStopped'),
                          mean('NumWordsStopped')).collect()[0]

# min_words / max_words stay as notebook-level names: the feature-engineering cell
# uses them to normalise NumWordsStopped.
min_words = stopped_stats[0]
max_words = stopped_stats[1]

# print() with one pre-formatted string emits the same text on Python 2 and Python 3;
# the former `print 'label', value` statement is a SyntaxError on Python 3.
print('min review length (in words) {}'.format(min_words))
print('max review length (in words) {}'.format(max_words))
print('average review length (in words) {}'.format(stopped_stats[2]))

df.select('NumWordsStopped').toPandas().hist()

In [None]:
%matplotlib inline

# Histogram of the Score column after cleanup, with the x-range pinned to 1-5.
(df
 .select('Score')
 .toPandas()
 .hist(range=[1, 5]))

## Feature Engineering

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# AverageEmbedding: mean of the components of each row's Word2Vec vector.
# Computed with a column-wise UDF so the rows stay in Spark, instead of collecting
# every row to the driver, building a second DataFrame and joining it back on Id.
average_embedding = udf(lambda vector: float(vector.toArray().mean()), DoubleType())
df = df.withColumn('AverageEmbedding', average_embedding(col('Word2Vec')))
# Id is not used as a feature and is dropped here.
df = df.drop('Id')
# Min-max scale the token count using min_words / max_words from the post-cleanup
# statistics cell.
df = df.withColumn('NumWordsStoppedNormalized', (col('NumWordsStopped') - min_words) / (max_words - min_words))
# Splitting the raw text on '!' yields one more piece than there are '!' characters.
df = df.withColumn('ExclamationPoints', size(split(col('Text'), r'!')) - 1)

df.show()

## Clustering

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors

# K-means (k=5, seed=1) on two features per review: NumWordsStopped and Score.
dataset = df.select('NumWordsStopped', 'Score')
# Pack each collected row into a one-column tuple holding a dense feature vector.
data = [(Vectors.dense([num_words, score]),) for num_words, score in dataset.collect()]
dataset = spark.createDataFrame(data, ['features'])

kmeans = KMeans(k=5, seed=1)
model = kmeans.fit(dataset)
predictions = model.transform(dataset)

# Score the clustering with ClusteringEvaluator's default settings.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

print("Cluster Centers: ")
centers = model.clusterCenters()
for index in range(len(centers)):
    print(centers[index])

In [None]:
# %matplotlib inline

# import matplotlib.pyplot as plt

# # plt.scatter(X[:, 0], X[:, 1], c=y_kmeans, s=50, cmap='viridis')

# centers = model.clusterCenters()
# plt.scatter(centers[:, 0], centers[:, 1], c='black', s=200, alpha=0.5)

## Classification

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Random-forest classifier predicting Score from three engineered features.
# The shuffle, the train/test split and the forest are all seeded (42, matching the
# Word2Vec seed) so the reported test error is the same on every run.
dataset = df.select('NumWordsStoppedNormalized', 'AverageEmbedding',
                    'ExclamationPoints', 'Score').orderBy(rand(seed=42))
# dataset = df.select('Word2Vec', 'Score')
# Rebuild the rows as (label, feature-vector) pairs: Score becomes the label.
data = [(x[3], Vectors.dense([x[0], x[1], x[2]])) for x in dataset.collect()]
# data = [(x[1], x[0]) for x in dataset.collect()]
dataset = spark.createDataFrame(data, ['label', 'features'])

# Both indexers are fitted on the full dataset before the split.
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel').fit(dataset)
featureIndexer = VectorIndexer(inputCol="features", outputCol='indexedFeatures', maxCategories=5).fit(dataset)

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=42)

rf = RandomForestClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures',
                            numTrees=10, seed=42)
# Map predicted indices back to the original label values.
labelConverter = IndexToString(inputCol='prediction', outputCol='predictedLabel',
                               labels=labelIndexer.labels)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))
predictions.select('predictedLabel', 'label', 'features').show()

## Evaluation

In [None]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics

# Build (prediction, label) float pairs on the driver; MulticlassMetrics takes an RDD
# of such pairs.
prediction_label_pairs = [(float(x[0]), float(x[1]))
                          for x in predictions.select('predictedLabel', 'label').collect()]
predictionAndLabels = sc.parallelize(prediction_label_pairs)
metrics = MulticlassMetrics(predictionAndLabels)

print("Summary Stats")
print('Accuracy = %s' % metrics.accuracy)
# The label-less precision()/recall()/fMeasure() calls are deprecated aliases of
# `accuracy` in Spark 2.x and are not available in later releases, so the same value
# is read from `accuracy` directly. The printed labels are unchanged.
print("Precision = %s" % metrics.accuracy)
print("Recall = %s" % metrics.accuracy)
print("F1 Score = %s" % metrics.accuracy)

# Report per-class metrics only for classes that occur among the true labels of this
# test split, rather than a hard-coded 1..5.
# NOTE(review): requesting a class with no true-label rows is expected to fail inside
# MulticlassMetrics - confirm against the Spark version in use.
present_labels = sorted(set(true_label for _, true_label in prediction_label_pairs))
for label in present_labels:
    print("Class %s precision = %s" % (label, metrics.precision(label)))
    print("Class %s recall = %s" % (label, metrics.recall(label)))
    print("Class %s F1 Measure = %s" % (label, metrics.fMeasure(label, beta=1.0)))

print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)