In [1]:
from pyspark import SparkConf, SparkContext
import sys
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql import functions as f
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql import Row
from pyspark.sql.functions import count, avg
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import NaiveBayes


In [2]:
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext()
sqlContext = SQLContext(sc)

In [9]:
# reading data
df = sqlContext.read.csv("/Users/navyarao/Downloads/Amazon_reviews.csv",header=True)
#df = sqlContext.read.csv("/Users/Downloads/AmazonReviews_sample.csv",header=True)
print(df.show(5))


+---+----------+--------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|                   1|                     1|    4|1219017600| Delight says it all|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                   3|                     3|    2|1307923200|      Cough Medicine|If you are lookin...|
|  5|B006K2ZZ7K|A1UQRSCLF8GW1T|                   0|   

# creating new column "status" based on score. If score is lessthan or equal to 3 then considering the status as positive and if the score is greaterthan 3 then considering the status as positive

In [5]:
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()
df.select([c for c in df.columns if c in ['Text','Score']]).show()
df = df.withColumn(
    'Status',
    f.when((f.col("Score") >= 3), "Positive")\
    .otherwise("Negative")
)
df.show(5)

+---+---------+------+--------------------+----------------------+-----+----+-------+----+
| Id|ProductId|UserId|HelpfulnessNumerator|HelpfulnessDenominator|Score|Time|Summary|Text|
+---+---------+------+--------------------+----------------------+-----+----+-------+----+
|  0|        0|     0|                   0|                     0|    0|   0|      0|   0|
+---+---------+------+--------------------+----------------------+-----+----+-------+----+

+-----+--------------------+
|Score|                Text|
+-----+--------------------+
|    5|I have bought sev...|
|    1|"Product arrived ...|
|    4|"This is a confec...|
|    2|If you are lookin...|
|    5|Great taffy at a ...|
|    4|I got a wild hair...|
|    5|This saltwater ta...|
|    5|This taffy is so ...|
|    5|Right now I'm mos...|
|    5|This is a very he...|
|    5|I don't know if i...|
|    5|One of my boys ne...|
|    1|My cats have been...|
|    4|good flavor! thes...|
|    5|The Strawberry Tw...|
|    5|My daughter lov

In [6]:
df.dtypes
df.describe().show()
type(df.describe())

+-------+------------------+-------------+------------------+--------------------+----------------------+------------------+--------------------+--------------------+--------------------+--------+
|summary|                Id|    ProductId|            UserId|HelpfulnessNumerator|HelpfulnessDenominator|             Score|                Time|             Summary|                Text|  Status|
+-------+------------------+-------------+------------------+--------------------+----------------------+------------------+--------------------+--------------------+--------------------+--------+
|  count|             65534|        65533|             65533|               65533|                 65533|             65533|               65533|               65533|               65533|   65534|
|   mean| 32767.99436924908|2.734888454E9|              null|  1.6161170707887629|     2.096882486686097|4.1492988265454045|1.2949877542490044E9|                null|                null|    null|
| stddev|18917.

pyspark.sql.dataframe.DataFrame

# Positive reviews have large count than negative reviews

In [6]:
df.groupBy('Status').count().show()

+--------+------+
|  Status| count|
+--------+------+
|Positive|484549|
|Negative| 83905|
+--------+------+



In [7]:
df.dropDuplicates(['UserId', 'ProfileName','Time','Text']).show(5)

+------+----------+------------------+--------------------+--------------------+----------------------+-----+----------+-------------------+--------------------+--------+
|    Id| ProductId|            UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|            Summary|                Text|  Status|
+------+----------+------------------+--------------------+--------------------+----------------------+-----+----------+-------------------+--------------------+--------+
|290941|B005HG9ESG|#oc-R1GKUU1PTIDIZ7|"Barbara Rhoades ...|                   0|                     0|    5|1343779200|       Better water|Water is water, r...|Positive|
|291020|B005HG9ESG|#oc-R2F3I37IKVM0H0|"Addison Dewitt "...|                   0|                     3|    3|1341187200|Clear, Cool, Water?|When I was growin...|Positive|
| 83696|B005ZBZLT4| #oc-R6LF0WYR2C9SB|           brown1829|                   1|                     8|    1|1332633600|        Worst Ever!|Thoug

# Here we are tokenizing the text, preprocessing and removing stopwords from the text. And converting tokens to count vectors and building TF-IDF matrix

In [8]:
# tokenizer
Tokenizer_regex = RegexTokenizer(inputCol="Text", outputCol="all_words", pattern="\\W")
# stop words removal
list_stopwords_additional = ['a', u'about', u'above', u'after', u'again', u'against', u'all', u'am', u'an', u'and', u'any', u'are', u'arent', u'as', u'at', 
    u'be', u'because', u'been', u'before', u'being', u'below', u'between', u'both', u'but', u'by', 
    u'can', 'cant', 'come', u'could', 'couldnt', 
    u'd', u'did', u'didn', u'do', u'does', u'doesnt', u'doing', u'dont', u'down', u'during', 
    u'each', 
    u'few', 'finally', u'for', u'from', u'further', 
    u'had', u'hadnt', u'has', u'hasnt', u'have', u'havent', u'having', u'he', u'her', u'here', u'hers', u'herself', u'him', u'himself', u'his', u'how', 
    u'i', u'if', u'in', u'into', u'is', u'isnt', u'it', u'its', u'itself', 
    u'just', 
    u'll', 
    u'm', u'me', u'might', u'more', u'most', u'must', u'my', u'myself', 
    u'no', u'nor', u'not', u'now', 
    u'o', u'of', u'off', u'on', u'once', u'only', u'or', u'other', u'our', u'ours', u'ourselves', u'out', u'over', u'own', 
    u'r', u're', 
    u's', 'said', u'same', u'she', u'should', u'shouldnt', u'so', u'some', u'such',
    u't', u'than', u'that', 'thats', u'the', u'their', u'theirs', u'them', u'themselves', u'then', u'there', u'these', u'they', u'this', u'those', u'through', u'to', u'too', 
    u'under', u'until', u'up', 
    u'very', 
    u'was', u'wasnt', u'we', u'were', u'werent', u'what', u'when', u'where', u'which', u'while', u'who', u'whom', u'why', u'will', u'with', u'wont', u'would', 
    u'y', u'you', u'your', u'yours', u'yourself', u'yourselves'] 

Remove_stopwords = StopWordsRemover(inputCol="all_words", outputCol="stopwords_removed").setStopWords(list_stopwords_additional)
# words count
count_Vectors = CountVectorizer(inputCol="stopwords_removed", outputCol="features", vocabSize=10000, minDF=5)
#label generation
label_Target = StringIndexer(inputCol = "Status", outputCol = "label")
#TF
TF = HashingTF(inputCol="stopwords_removed", outputCol="TF", numFeatures=10000)
#idf
idf = IDF(inputCol="TF", outputCol="features", minDocFreq=5) 

# Logistic Regression using count vectors

In [9]:
pipeline = Pipeline(stages=[Tokenizer_regex, Remove_stopwords, count_Vectors ,label_Target])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df)
final = pipelineFit.transform(df)
final.show(5)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+-----+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|  Status|           all_words|   stopwords_removed|            features|label|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+-----+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|Positive|[i, have, bought,...|[bought, several,...|(10000,[1,2,7,11,...|  0.0|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|            

# Splitting data into 75 percent train and 25 percent test

In [10]:
# set seed  for train test split
(trainData, testData) = final.randomSplit([0.75, 0.25], seed = 100)


In [11]:
print("Train Data Count")
print(str(trainData.count()))
print("Test Data Count")
print(str(testData.count()))

Train Data Count
425888
Test Data Count
142566


In [12]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainData)

In [13]:
lrModel.summary

<pyspark.ml.classification.BinaryLogisticRegressionTrainingSummary at 0x5b91400>

In [14]:
predictions_train = lrModel.transform(trainData)
predictions_train.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Train Accuracy")
evaluator.evaluate(predictions_train)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|I have bought sev...|[0.86521491951716...|  0.0|       0.0|
|This is a very he...|[0.91002016546519...|  0.0|       0.0|
|I fed this to my ...|[0.63498899727519...|  1.0|       0.0|
|I have to admit, ...|[0.87806296483908...|  1.0|       0.0|
|I love this noodl...|[0.88291013280326...|  0.0|       0.0|
|That's pretty muc...|[0.82073464319410...|  0.0|       0.0|
|These are very go...|[0.91153753318420...|  0.0|       0.0|
|Very spicy packag...|[0.85891053323600...|  0.0|       0.0|
|Delicious, awesom...|[0.90270526120574...|  0.0|       0.0|
|Always good.  I d...|[0.88363590578553...|  0.0|       0.0|
|It's quick to coo...|[0.89920111945357...|  0.0|       0.0|
|the deliver is on...|[0.84963181637577...|  0.0|       0.0|
|"Love this soup! ...|[0.91999493629266...|  0.0|       0.0|
|You get what you ...|[0

0.8444717631829979

In [15]:
predictions_test = lrModel.transform(testData)
predictions_test.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Test Accuracy")
evaluator.evaluate(predictions_test)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|We have a 7 week ...|[0.92604458735751...|  0.0|       0.0|
|The taste is grea...|[0.88326052879249...|  0.0|       0.0|
|This is the best ...|[0.89501928224742...|  0.0|       0.0|
|"I don't see how ...|[0.90249562227259...|  0.0|       0.0|
|I purchased this ...|[0.91021299920427...|  0.0|       0.0|
|bought 2 package ...|[0.83423704706643...|  0.0|       0.0|
|Shin Ramyun (sinc...|[0.86679681965417...|  0.0|       0.0|
|I received Nongsh...|[0.86394863198892...|  0.0|       0.0|
|These are perhaps...|[0.91557649654691...|  0.0|       0.0|
|"RECONSIDER THIS ...|[0.63917590959691...|  1.0|       0.0|
|I really love thi...|[0.89871970167112...|  0.0|       0.0|
|This is the most ...|[0.76614312816563...|  0.0|       0.0|
|These noodles rem...|[0.96134203086638...|  0.0|       0.0|
|They're not for t...|[0

0.8436584609908302

# Logistic Regression using TF-idf

In [16]:
pipeline = Pipeline(stages=[Tokenizer_regex, Remove_stopwords, TF, idf, label_Target])
pipelineFit = pipeline.fit(df)
final_TF_idf = pipelineFit.transform(df)
final_TF_idf.show(5)

+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|  Status|           all_words|   stopwords_removed|                  TF|            features|label|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+--------+--------------------+--------------------+--------------------+--------------------+-----+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|Positive|[i, have, bought,...|[bought, several,...|(10000,[639,941,1...|(10000,[

In [17]:
(trainData_TF_idf, testData_TF_idf) = final_TF_idf.randomSplit([0.7, 0.3], seed = 100)
print("Train Data Count")
print(str(trainData_TF_idf.count()))
print("Test Data Count")
print(str(testData_TF_idf.count()))


Train Data Count
397945
Test Data Count
170509


In [18]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel_TF_idf = lr.fit(trainData_TF_idf)

In [19]:
predictions_train_TF_idf = lrModel_TF_idf.transform(trainData_TF_idf)
predictions_train_TF_idf.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Train Accuracy")
evaluator.evaluate(predictions_train_TF_idf)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|I have bought sev...|[0.84096602345808...|  0.0|       0.0|
|This is a very he...|[0.90071968211141...|  0.0|       0.0|
|I fed this to my ...|[0.64773101606137...|  1.0|       0.0|
|I have to admit, ...|[0.90509712302684...|  1.0|       0.0|
|I love this noodl...|[0.88406138137306...|  0.0|       0.0|
|That's pretty muc...|[0.78432301709773...|  0.0|       0.0|
|These are very go...|[0.88654513634755...|  0.0|       0.0|
|Very spicy packag...|[0.88865625530105...|  0.0|       0.0|
|Delicious, awesom...|[0.87693259414089...|  0.0|       0.0|
|Always good.  I d...|[0.88416923928414...|  0.0|       0.0|
|It's quick to coo...|[0.89981801328921...|  0.0|       0.0|
|the deliver is on...|[0.84272775631786...|  0.0|       0.0|
|"Love this soup! ...|[0.92351717304043...|  0.0|       0.0|
|You get what you ...|[0

0.8345994322629462

In [20]:
predictions_test_TF_idf = lrModel_TF_idf.transform(testData_TF_idf)
predictions_test_TF_idf.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Test Accuracy")
evaluator.evaluate(predictions_test_TF_idf)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|We have a 7 week ...|[0.92108889261432...|  0.0|       0.0|
|The taste is grea...|[0.88331982423707...|  0.0|       0.0|
|This is the best ...|[0.87865104449789...|  0.0|       0.0|
|"I don't see how ...|[0.89684987265900...|  0.0|       0.0|
|I purchased this ...|[0.85834928537730...|  0.0|       0.0|
|bought 2 package ...|[0.83377239630574...|  0.0|       0.0|
|Shin Ramyun (sinc...|[0.89425060058872...|  0.0|       0.0|
|I received Nongsh...|[0.88749141261992...|  0.0|       0.0|
|These are perhaps...|[0.92511829089692...|  0.0|       0.0|
|"RECONSIDER THIS ...|[0.58239393230898...|  1.0|       0.0|
|Excellent product...|[0.94056158309248...|  0.0|       0.0|
|I really love thi...|[0.89759641803673...|  0.0|       0.0|
|This is the most ...|[0.80052088245035...|  0.0|       0.0|
|These noodles rem...|[0

0.8331494767643901

# NaiveBayes

In [21]:
nb = NaiveBayes(smoothing=1)
model_NB = nb.fit(trainData_TF_idf)

In [22]:
predictions_train_TF_idf_NB = model_NB.transform(trainData_TF_idf)
predictions_train_TF_idf_NB.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Train Accuracy")
evaluator.evaluate(predictions_train_TF_idf_NB)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|I have bought sev...|[0.99717079159599...|  0.0|       0.0|
|This is a very he...|[0.99998259779735...|  0.0|       0.0|
|I fed this to my ...|[5.02743064841116...|  1.0|       1.0|
|I have to admit, ...|[1.0,4.2763142322...|  1.0|       0.0|
|I love this noodl...|[0.99996223727718...|  0.0|       0.0|
|That's pretty muc...|[0.99774860194651...|  0.0|       0.0|
|These are very go...|[0.99999141563091...|  0.0|       0.0|
|Very spicy packag...|[0.99999999900201...|  0.0|       0.0|
|Delicious, awesom...|[0.99998537496198...|  0.0|       0.0|
|Always good.  I d...|[0.99987755996523...|  0.0|       0.0|
|It's quick to coo...|[0.99999999539160...|  0.0|       0.0|
|the deliver is on...|[0.11735406584916...|  0.0|       1.0|
|"Love this soup! ...|[0.99999999999999...|  0.0|       0.0|
|You get what you ...|[0

0.8689622001667336

In [23]:
predictions_test_TF_idf_NB = model_NB.transform(testData_TF_idf)
predictions_test_TF_idf_NB.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Test Accuracy")
evaluator.evaluate(predictions_test_TF_idf_NB)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|We have a 7 week ...|[0.99999999994235...|  0.0|       0.0|
|The taste is grea...|[0.99999939514935...|  0.0|       0.0|
|This is the best ...|[0.99994602934327...|  0.0|       0.0|
|"I don't see how ...|[0.99999999990287...|  0.0|       0.0|
|I purchased this ...|[0.94282840305928...|  0.0|       0.0|
|bought 2 package ...|[0.96904960047021...|  0.0|       0.0|
|Shin Ramyun (sinc...|[1.0,8.0513083484...|  0.0|       0.0|
|I received Nongsh...|[0.99999999975747...|  0.0|       0.0|
|These are perhaps...|[0.99999999999516...|  0.0|       0.0|
|"RECONSIDER THIS ...|[5.93077012108314...|  1.0|       1.0|
|Excellent product...|[0.99999999999998...|  0.0|       0.0|
|I really love thi...|[0.99999999340859...|  0.0|       0.0|
|This is the most ...|[0.99963843621816...|  0.0|       0.0|
|These noodles rem...|[1

0.864070763372753

# Converting tokens to vectors using word2vec model

In [28]:
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="stopwords_removed", outputCol="features")
pipeline = Pipeline(stages=[Tokenizer_regex, Remove_stopwords,word2Vec ,label_Target])

model = pipeline.fit(df)

result = model.transform(df)


In [29]:
(trainData_wordvec, testData_wordvec) = result.randomSplit([0.7, 0.3], seed = 100)
print("Train Data Count")
print(str(trainData_wordvec.count()))
print("Test Data Count")
print(str(testData_wordvec.count()))



Train Data Count
397945
Test Data Count
170509


# Logistic regression using word2vec vectors 

In [30]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel_wordvec = lr.fit(trainData_wordvec)

In [31]:
predictions_train_wordvec = lrModel_wordvec.transform(trainData_wordvec)
predictions_train_wordvec.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Train Accuracy")
evaluator.evaluate(predictions_train_wordvec)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|I have bought sev...|[0.85185376495581...|  0.0|       0.0|
|This is a very he...|[0.84052218477329...|  0.0|       0.0|
|I fed this to my ...|[0.84115776266943...|  1.0|       0.0|
|I have to admit, ...|[0.85996233748411...|  1.0|       0.0|
|I love this noodl...|[0.84927127863579...|  0.0|       0.0|
|That's pretty muc...|[0.86121998108371...|  0.0|       0.0|
|These are very go...|[0.85872845914892...|  0.0|       0.0|
|Very spicy packag...|[0.85258114510804...|  0.0|       0.0|
|Delicious, awesom...|[0.86948081055635...|  0.0|       0.0|
|Always good.  I d...|[0.85452297375336...|  0.0|       0.0|
|It's quick to coo...|[0.85920451913633...|  0.0|       0.0|
|the deliver is on...|[0.84240972313296...|  0.0|       0.0|
|"Love this soup! ...|[0.86022583754429...|  0.0|       0.0|
|You get what you ...|[0

0.7843724492292861

In [32]:
predictions_test_wordvec = lrModel_wordvec.transform(testData_wordvec)
predictions_test_wordvec.select("Text","probability","label","prediction").show()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Test Accuracy")
evaluator.evaluate(predictions_test_wordvec)

+--------------------+--------------------+-----+----------+
|                Text|         probability|label|prediction|
+--------------------+--------------------+-----+----------+
|We have a 7 week ...|[0.83906671830046...|  0.0|       0.0|
|The taste is grea...|[0.86395393187768...|  0.0|       0.0|
|This is the best ...|[0.86651645424894...|  0.0|       0.0|
|"I don't see how ...|[0.86425862028939...|  0.0|       0.0|
|I purchased this ...|[0.83986722801162...|  0.0|       0.0|
|bought 2 package ...|[0.85047972607606...|  0.0|       0.0|
|Shin Ramyun (sinc...|[0.84958657481014...|  0.0|       0.0|
|I received Nongsh...|[0.84325885236766...|  0.0|       0.0|
|These are perhaps...|[0.85517496969795...|  0.0|       0.0|
|"RECONSIDER THIS ...|[0.84094150559616...|  1.0|       0.0|
|Excellent product...|[0.85327875832977...|  0.0|       0.0|
|I really love thi...|[0.85281063755129...|  0.0|       0.0|
|This is the most ...|[0.84504044070449...|  0.0|       0.0|
|These noodles rem...|[0

0.7847225175650496