# CUSTOMER REVIEW ANALYSIS OF AMAZON PRODUCTS

The goal of this project is to build a classifier that captures the essence of a piece of review text and assigns it the most appropriate class, positive or negative, using natural language processing (NLP).


Different text mining pre-processing concepts were explored during this project. Because the dataset is unbalanced, we performed downsampling to randomly filter out some of the majority cases, trained different classifiers on the downsampled data, and got better results.

In [None]:
#! /bin/python
import findspark
findspark.init()
findspark.find()

from handyspark import *
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql import functions as fnc
from pyspark.sql.types import IntegerType

from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTEENN

import pandas as pd
from sklearn.metrics import roc_curve, auc
from matplotlib import pyplot as plt
%matplotlib inline


Loading the hive table from the local file system.

In [None]:
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
# Read the tab-separated review file; the first row holds the column names.
df = (sqlContext.read.format("com.databricks.spark.csv")
      .option("delimiter", "\t")
      .option("header", "true")
      .load("amazon_customer_reviews_req_col.tsv"))
#df.show()

## Data Analysis

In [None]:
# Number of reviews per product
reviewcount_for_product = df.groupBy('name').agg(count("reviews_text"))
reviewcount_for_product.show()

# Number of reviews per rating value
ratingCount = df.groupBy("reviews_rating").count()
ratingCount.show()

# Cast the rating to an integer so it can be averaged
df_product_rating = df.select(df.name, df.reviews_rating.cast("int"))

# Average rating per product
avgRatingForProd = df_product_rating.groupBy("name").mean("reviews_rating")
avgRatingForProd.show()

To differentiate 'Positive' and 'Negative' reviews, we first converted the rating column by assigning
 positive sentiment as '1' for ratings 4 and 5,
 negative sentiment as '0' for ratings 1, 2 and 3.


The distribution is skewed: 'Positive' ratings far outnumber 'Negative' ratings, and the volume of 'Negative' cases is very low.
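
The labeling rule can be sketched in plain Python, outside Spark. The ratings below are hypothetical values chosen only to illustrate the mapping and the resulting class counts; they are not taken from the dataset.

```python
from collections import Counter

def rating_to_label(rating: int) -> int:
    """Return 1 (positive) for ratings 4 and 5, and 0 (negative) for ratings 1 to 3."""
    return 1 if rating >= 4 else 0

# Hypothetical ratings, used only to illustrate the mapping.
ratings = [5, 4, 3, 1, 5, 2, 4, 5]
labels = [rating_to_label(r) for r in ratings]

# Count how many reviews fall into each class.
label_counts = Counter(labels)
print(labels)
print(label_counts)
```

Counting the labels this way is the plain-Python counterpart of the `groupBy("label").count()` call in the next cell.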

In [None]:
amazon = df.selectExpr("cast(reviews_rating as int) as label", "reviews_text")
amazon = amazon.withColumn("label",fnc.when(amazon["label"]>=4,1).otherwise(0))
amazon.show()

ratingCount = amazon.groupBy("label").count()
ratingCount.show()

## Tokenization

The Tokenizer breaks the review text into words. Its output is a column holding the list of tokenized words for each review.

## Removal of Stopwords

Stopwords are the most commonly used words; because they do not help distinguish one review from another, they are removed.
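
As a rough illustration of these two steps, here is a minimal plain-Python sketch. It assumes lowercase-and-split-on-whitespace tokenization, which is approximately what Spark's `Tokenizer` does, and uses a tiny made-up stop-word set; Spark's `StopWordsRemover` ships a much longer default English list. The function names and the sample review are hypothetical.

```python
# Illustrative stop-word set (an assumption); the real default list is much longer.
STOP_WORDS = {"the", "is", "a", "and", "it", "this", "to", "of"}

def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep only the tokens that are not in the stop-word set."""
    return [t for t in tokens if t not in stop_words]

review = "This tablet is great and the battery lasts"
tokens = tokenize(review)
filtered = remove_stop_words(tokens)
print(tokens)
print(filtered)
```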

## TF-IDF

TF-IDF is a feature vectorization method used in text mining to reflect the importance of a term to a document in the corpus. Term frequency (TF) is the number of times a word appears in a review, while document frequency (DF) is the number of reviews that contain the word.

The HashingTF and IDF features from pyspark are used to generate the term frequency vectors and rescale them by inverse document frequency.
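
The idea behind hashed term frequencies and IDF weighting can be sketched without Spark. The sketch below uses `zlib.crc32` as a stand-in hash (Spark's `HashingTF` uses MurmurHash3) and a deliberately tiny number of buckets; the IDF formula `log((m + 1) / (df + 1))`, with `m` the number of documents, is the one Spark MLlib documents. All names and the sample documents are hypothetical.

```python
import math
import zlib
from collections import Counter

NUM_FEATURES = 16  # illustrative; Spark's HashingTF defaults to 2**18 buckets

def bucket(term, num_features=NUM_FEATURES):
    """Map a term to a bucket index with a deterministic stand-in hash."""
    return zlib.crc32(term.encode("utf-8")) % num_features

def hashing_tf(tokens, num_features=NUM_FEATURES):
    """Return {bucket index: number of tokens hashed into that bucket}."""
    return dict(Counter(bucket(t, num_features) for t in tokens))

def fit_idf(tf_vectors):
    """Compute log((m + 1) / (df + 1)) for every bucket seen in the corpus."""
    m = len(tf_vectors)
    doc_freq = Counter(idx for vec in tf_vectors for idx in vec)
    return {idx: math.log((m + 1) / (df + 1)) for idx, df in doc_freq.items()}

def tf_idf(tf_vector, idf_weights):
    """Rescale each term count by the IDF weight of its bucket."""
    return {idx: n * idf_weights[idx] for idx, n in tf_vector.items()}

# Hypothetical, already tokenized and filtered reviews.
docs = [["great", "tablet"], ["great", "battery"], ["great", "screen", "screen"]]
tf_vectors = [hashing_tf(d) for d in docs]
idf_weights = fit_idf(tf_vectors)
weighted = [tf_idf(v, idf_weights) for v in tf_vectors]
print(weighted)
```

A term that occurs in every document, such as "great" here, gets an IDF weight of zero, so it contributes nothing to the final feature vector.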

In [None]:
# Tokenize the review. 
tokenizer = Tokenizer(inputCol="reviews_text", outputCol="tokenized_words")
tokenizedWordsDF = tokenizer.transform(amazon)
tokenizedWordsDF.show()

# Remove stop words
remover = StopWordsRemover(inputCol="tokenized_words", outputCol="filtered_words")
filteredWordsDF = remover.transform(tokenizedWordsDF)
filteredWordsDF.show()

# Convert to TF words vector
hashingTF = HashingTF(inputCol="filtered_words", outputCol="TF")
tfwordsDF = hashingTF.transform(filteredWordsDF)
tfwordsDF.show()

# Convert to IDF words vector, ensure to name the features as 'features'
idf = IDF(inputCol="TF", outputCol="features")
idfModel = idf.fit(tfwordsDF)
idfwordsDF = idfModel.transform(tfwordsDF)
idfwordsDF.show()

for features_label in idfwordsDF.select("features", "label").take(3):
    print(features_label)   

## Logistic Regression Model

We chained the tokenizing, stopword removal and TF-IDF vectorization stages together with the classifier into a pipeline, passed the pipeline as the estimator to a cross validator, and evaluated the logistic regression model with BinaryClassificationEvaluator.
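
To get a feel for the cost of this search, the plain-Python sketch below enumerates a parameter grid and counts the model fits. The values mirror the grid and fold count used for logistic regression in this notebook. The count assumes the usual cross-validation scheme: one fit per parameter combination per fold, plus one final refit of the best combination on the full training set.

```python
from itertools import product

# Grid values and fold count mirroring the logistic regression cell.
grid = {
    "numFeatures": [10, 50],
    "regParam": [0.1, 0.01],
}
num_folds = 4

# Every combination of grid values, one dict per combination.
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

fits_during_search = len(param_maps) * num_folds
total_fits = fits_during_search + 1  # plus the final refit of the best combination

for params in param_maps:
    print(params)
print("fits during search:", fits_during_search, "total fits:", total_fits)
```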

In [None]:
# Split data into training and testing set 
(train, test) = amazon.randomSplit([0.7, 0.3])

# logistic regression instance
lr = LogisticRegression(maxIter=5)

# Use a pipeline to chain all transformers and estimators.
# The unfitted idf estimator is used so that IDF is refit inside each cross-validation fold.
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, lr])

# This will allow us to jointly choose parameters for all Pipeline stages.
# ParamGridBuilder to construct a grid of parameters to search over.
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 50]).addGrid(lr.regParam, [0.1, 0.01]).build()

# A CrossValidator with Estimator as pipeline, an evaluator BinaryClassificationEvaluator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=4) 

# Cross-validation, to choose the best set of parameters.
cvModel = crossval.fit(train)

# Make predictions on test reviews. cvModel uses the best LR model with best parameters.
predictions = cvModel.transform(test)
# selected = predictions.select("reviews_text", "label", "probability", "prediction").take(20)

# Evaluate result with ROC
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
rocScore = evaluator.evaluate(predictions)
print("Area under ROC score: " + str(rocScore))

# Plot ROC curve from the predicted results
results = predictions.select(['probability', 'label'])
results_collect = results.collect()
# Score each review by its class-0 (negative) probability and invert the label,
# so the curve treats 'Negative' as the positive class; the AUC is unchanged.
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]

y_test = [i[1] for i in results_list]
y_score = [i[0] for i in results_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Review Prediction')
plt.legend(loc="lower right")
plt.show()

As the model results show, the output is biased toward positive reviews, since only about 10% of the reviews are negative. To balance the dataset, we performed downsampling, randomly filtering out some of the majority cases.

In [None]:
predictions.toHandy().cols[['reviews_text','probability', 'prediction', 'label']][:5]

## DOWN SAMPLING

We assigned a random integer to each positive class object and then filtered out the objects whose random integer is larger than a threshold we calculated, so that the data points from the majority class ('Positive') become much fewer.
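
A minimal plain-Python sketch of this filtering idea follows. It assumes rows are `(label, text)` tuples and that the threshold equals `ratio` times the number of negative rows. The function name, the seed and the synthetic rows are hypothetical and only serve the illustration.

```python
import random

def downsample(rows, ratio=2.0, seed=42):
    """Keep every negative row and roughly ratio * n_neg of the positive rows."""
    rng = random.Random(seed)
    n_pos = sum(1 for label, _ in rows if label == 1)
    n_neg = len(rows) - n_pos
    threshold = int(ratio * n_neg)
    kept = []
    for label, text in rows:
        if label == 0:
            kept.append((label, text))
        else:
            # Each positive row draws a random integer in [1, n_pos];
            # rows whose integer exceeds the threshold are dropped.
            rand_index = rng.randint(1, n_pos)
            if rand_index <= threshold:
                kept.append((label, text))
    return kept

# Synthetic, unbalanced data: 900 positive and 100 negative rows.
rows = [(1, f"pos {i}") for i in range(900)] + [(0, f"neg {i}") for i in range(100)]
sample = downsample(rows)
kept_pos = sum(1 for label, _ in sample if label == 1)
kept_neg = sum(1 for label, _ in sample if label == 0)
print("positives kept:", kept_pos, "negatives kept:", kept_neg)
```

Because the random integers are uniform, the number of positives kept is only approximately `ratio` times the number of negatives and varies with the seed.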

### Ensemble of Down Sampling

Each time we down sample the training data, we filter out some data that belongs to the 'Positive' class. By doing this, we lose information that could be used to train our model.

So, we created several different down-sampled data sets and trained a model on each.
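
One way to build such an ensemble of training sets is sketched below in plain Python: the positive rows are shuffled and split into disjoint chunks, and every chunk is paired with all negative rows. This is a simplified illustration under those assumptions, with hypothetical names and synthetic rows, not the exact banding used in the next cell.

```python
import random

def disjoint_downsamples(positives, negatives, n_sets, seed=0):
    """Split the positives into n_sets disjoint chunks and add all negatives to each."""
    rng = random.Random(seed)
    shuffled = positives[:]
    rng.shuffle(shuffled)
    # Striding by n_sets gives disjoint chunks that together cover every positive row.
    chunks = [shuffled[i::n_sets] for i in range(n_sets)]
    return [chunk + negatives for chunk in chunks]

# Synthetic rows: 12 positive and 3 negative.
positives = [(1, f"pos {i}") for i in range(12)]
negatives = [(0, f"neg {i}") for i in range(3)]
train_sets = disjoint_downsamples(positives, negatives, n_sets=4)

for ts in train_sets:
    print(len(ts), [text for label, text in ts if label == 1])
```

Across the sets, each positive row is used exactly once, so no positive example is discarded overall even though each individual model sees a balanced subset.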

In [None]:
## Down Sampling

ratio = 2.0 ## ratio of positive to negative for down sampling

train_sample = train
# Key the counts by label so the result does not depend on the row order of collect()
counts = {row['label']: row['count'] for row in train_sample.groupBy('label').count().collect()}
print(counts)
higherBound = counts[1]  # size of the majority ('Positive') class
thresholdToFilter = int(ratio * counts[0])  # ratio * size of the minority ('Negative') class
 
print("Higherbound: ", higherBound)
print("Threshold to filter the majority class", thresholdToFilter)

# Positive rows get a random integer in [1, higherBound]; negative rows get -1
train_sample = train_sample.withColumn("randIndex",
    fnc.when(train["label"] == 1, round(rand()*(higherBound-1)+1,0)).
    otherwise(-1))

# Band limits are hard-coded for this dataset; every band keeps all negative rows (randIndex == -1)
train1=train_sample.where("randIndex < 3920")
train2=train_sample.where("(randIndex > 3920 and randIndex < 7000) OR randIndex == -1")
train3=train_sample.where("(randIndex > 7000 and randIndex < 11000) OR randIndex == -1")
train4=train_sample.where("(randIndex > 11000 and randIndex < 17000) OR randIndex == -1")

print(train1.select('label').groupBy('label').count().collect())
print(train2.select('label').groupBy('label').count().collect())
print(train3.select('label').groupBy('label').count().collect())
print(train4.select('label').groupBy('label').count().collect())


## Randomforest Classifier

We trained a Randomforest classifier on each downsampled dataset and took the average over all the models trained on the different downsampled datasets. This gave better overall predictions, with an ROC score of 90%.
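
The cell below evaluates each model separately. One possible way to combine them, shown here as a plain-Python sketch, is to average the positive-class probability that each model assigns to the same test review. The probabilities, the 0.5 cut-off and the function name are hypothetical and not taken from the actual runs.

```python
def average_probabilities(per_model_probs):
    """Average, review by review, the positive-class probabilities of several models."""
    n_models = len(per_model_probs)
    return [sum(probs) / n_models for probs in zip(*per_model_probs)]

# Hypothetical positive-class probabilities: one list per model, one entry per test review.
per_model_probs = [
    [0.9, 0.2, 0.6],
    [0.8, 0.4, 0.4],
    [0.7, 0.3, 0.5],
    [0.8, 0.3, 0.7],
]
avg_probs = average_probabilities(per_model_probs)

# Assumed cut-off of 0.5 for turning an averaged probability into a label.
avg_predictions = [1 if p >= 0.5 else 0 for p in avg_probs]
print(avg_probs)
print(avg_predictions)
```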

In [None]:
df_list = [train1,train2,train3,train4]

for i in range(0,len(df_list)):
    train_df = df_list[i]
    rf = RandomForestClassifier(numTrees=15)
    # Use the unfitted idf estimator so IDF is refit for each numFeatures value
    pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, rf])

    paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 50]).addGrid(rf.maxDepth, [5, 15]).build()

    crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(),
                          numFolds = 5)

    cvModel = crossval.fit(train_df)
    prediction = cvModel.transform(test)
       
    evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
    areaUnderROC = evaluator.evaluate(prediction)
    print("area Under ROC score: " + str(areaUnderROC))
    
    prediction.toHandy().cols[['probability', 'prediction', 'label']][:5]

In [None]:
train_df = train1
rf = RandomForestClassifier(numTrees=15)
# Use the unfitted idf estimator so IDF is refit for each numFeatures value
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, rf])

paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 50]).addGrid(rf.maxDepth, [5, 15]).build()

crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(),
                          numFolds = 5)

cvModel = crossval.fit(train_df)
prediction = cvModel.transform(test)

results = prediction.select(['probability', 'label'])
results_collect = results.collect()
# Score each review by its class-0 (negative) probability and invert the label,
# so the curve treats 'Negative' as the positive class; the AUC is unchanged.
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]

y_test = [i[1] for i in results_list]
y_score = [i[0] for i in results_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Review Prediction')
plt.legend(loc="lower right")
plt.show()
    
prediction.toHandy().cols[['probability', 'prediction', 'label']][:5]

## Gradient Boost Tree Classifier

In [None]:
# Train a GBT model on each down-sampled training set.
df_list = [train1,train2,train3,train4]

for i in range(0,len(df_list)):
    gbt = GBTClassifier(maxIter=10)

    # Use the unfitted idf estimator so IDF is refit for each numFeatures value
    pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, gbt])

    paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 50]).build()

    crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(),
                          numFolds = 3)

    # Fit on the i-th down-sampled training set
    cvModel = crossval.fit(df_list[i])

    # Make predictions.
    predictions = cvModel.transform(test)
    # Select example rows to display.
    selected_GBT = predictions.select("reviews_text", "label", "probability", "prediction").take(5)
    for row in selected_GBT:
        print(row)

    # The evaluator's default metric is areaUnderROC
    evaluator_GBT = BinaryClassificationEvaluator(labelCol="label")
    rocScore = evaluator_GBT.evaluate(predictions)
    # Convert the float score to str before concatenating
    print("ROC score for Gradient-boosted tree classifier: " + str(rocScore))


In [None]:
# Train a GBT model.
gbt = GBTClassifier(maxIter=10)

# Use the unfitted idf estimator so IDF is refit for each numFeatures value
pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, gbt])

paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 50]).build()

crossval = CrossValidator(estimator = pipeline,
                          estimatorParamMaps = paramGrid,
                          evaluator = BinaryClassificationEvaluator(),
                          numFolds = 3)

# Train the model with cross-validation on the second down-sampled set.
cvModel = crossval.fit(train2)

# Make predictions.
predictions = cvModel.transform(test)

# The evaluator's default metric is areaUnderROC, so this score is an AUC, not an accuracy
evaluator_GBT = BinaryClassificationEvaluator(labelCol="label")
rocScore_GBT = evaluator_GBT.evaluate(predictions)
print("Area under ROC for Gradient-boosted tree classifier: " + str(rocScore_GBT))


results = predictions.select(['probability', 'label'])
results_collect = results.collect()
# Score each review by its class-0 (negative) probability and invert the label,
# so the curve treats 'Negative' as the positive class; the AUC is unchanged.
results_list = [(float(i[0][0]), 1.0-float(i[1])) for i in results_collect]

y_test = [i[1] for i in results_list]
y_score = [i[0] for i in results_list]

fpr, tpr, _ = roc_curve(y_test, y_score)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Review Prediction')
plt.legend(loc="lower right")
plt.show()
    
predictions.toHandy().cols[['probability', 'prediction', 'label']][:5]


Different NLP pre-processing techniques and concepts were explored during this project. Our analysis shows that the pre-processing steps are very important. Misspelled words are not corrected; they are taken as they are during training.