In [27]:
import pyspark
from pyspark import SQLContext

In [28]:
conf = pyspark.SparkConf()
spark_context = SparkSession.builder.getOrCreate()

In [413]:
df = spark.read.csv("D:/Reviews.csv",header=True,inferSchema = True)


In [414]:
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- UserId: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- HelpfulnessNumerator: string (nullable = true)
 |-- HelpfulnessDenominator: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Text: string (nullable = true)



In [415]:
df = df.select('Text','Score')

In [416]:
df.printSchema()

root
 |-- Text: string (nullable = true)
 |-- Score: string (nullable = true)



In [54]:
df.count()

568454

In [55]:
df = df.dropna(how='any')

In [56]:
df.show()

+--------------------+-----+
|                Text|Score|
+--------------------+-----+
|I have bought sev...|    5|
|"Product arrived ...|    1|
|"This is a confec...|    4|
|If you are lookin...|    2|
|Great taffy at a ...|    5|
|I got a wild hair...|    4|
|This saltwater ta...|    5|
|This taffy is so ...|    5|
|Right now I'm mos...|    5|
|This is a very he...|    5|
|I don't know if i...|    5|
|One of my boys ne...|    5|
|My cats have been...|    1|
|good flavor! thes...|    4|
|The Strawberry Tw...|    5|
|My daughter loves...|    5|
|I love eating the...|    2|
|I am very satisfi...|    5|
|Twizzlers, Strawb...|    5|
|Candy was deliver...|    5|
+--------------------+-----+
only showing top 20 rows



In [57]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col
df_3 = df.withColumn("new_column",
           when((col("Score") == "3") | (col("Score") == "2") | (col("Score") == "1"), 0)
      .when((col("Score") == "4") | (col("Score") == "5"), 1)
      .otherwise("new_df.Score"))
df_4 = df_3.drop(df_3.Score)


In [58]:
df_4.show()

+--------------------+----------+
|                Text|new_column|
+--------------------+----------+
|I have bought sev...|         1|
|"Product arrived ...|         0|
|"This is a confec...|         1|
|If you are lookin...|         0|
|Great taffy at a ...|         1|
|I got a wild hair...|         1|
|This saltwater ta...|         1|
|This taffy is so ...|         1|
|Right now I'm mos...|         1|
|This is a very he...|         1|
|I don't know if i...|         1|
|One of my boys ne...|         1|
|My cats have been...|         0|
|good flavor! thes...|         1|
|The Strawberry Tw...|         1|
|My daughter loves...|         1|
|I love eating the...|         0|
|I am very satisfi...|         1|
|Twizzlers, Strawb...|         1|
|Candy was deliver...|         1|
+--------------------+----------+
only showing top 20 rows



In [59]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType,BooleanType,DateType
new_dataset = df_4.withColumn("new_column",col("new_column").cast(IntegerType()))

In [395]:
(trainingData, testData) = new_dataset.randomSplit([0.7, 0.3])
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 397778
Test Dataset Count: 170666


In [396]:

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression, LinearSVC, NaiveBayes

In [397]:
tokenizer  = Tokenizer(outputCol = "words").setInputCol("Text")

In [398]:
wordsData = tokenizer.transform(trainingData)

In [399]:
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
tf = hashingTF.transform(wordsData)

In [400]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(tf) 
tfidf = idfModel.transform(tf)

In [401]:
tfidf = tfidf.dropna(how = 'any')

# Logistic Regression


In [333]:
lr = LogisticRegression(featuresCol="features", labelCol='new_column')


In [296]:
%%time
lrModel = lr.fit(tfidf)

CPU times: total: 15.6 ms
Wall time: 1min 17s


In [239]:
lrModel

LogisticRegressionModel: uid=LogisticRegression_01adfd43b472, numClasses=2, numFeatures=262144

In [297]:
res_train = lrModel.transform(tfidf)

In [336]:
res_train.show()

+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|                Text|new_column|               words|         rawFeatures|            features|       rawPrediction|prediction|
+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------+
|            almost."|         1|        [, almost."]|(262144,[242002,2...|(262144,[242002,2...|[-1.0185823196254...|       1.0|
| but made from 10...|         1|[, but, made, fro...|(262144,[33917,70...|(262144,[33917,70...|[-1.0060223991956...|       1.0|
| easy and delicio...|         1|[, easy, and, del...|(262144,[96984,20...|(262144,[96984,20...|[-1.1649944549642...|       1.0|
|    easy to prepare"|         1|[, easy, to, prep...|(262144,[27576,30...|(262144,[27576,30...|[-1.0648722594166...|       1.0|
|      says cat.  :)"|         1|[, says, cat., , ...|(262144,[89832,16...|(262144,[89832,16...|[

In [337]:
res_train.select('prediction').show()

+----------+
|prediction|
+----------+
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       0.0|
|       1.0|
|       0.0|
|       1.0|
|       0.0|
|       1.0|
|       0.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
|       1.0|
+----------+
only showing top 20 rows



In [338]:
test_tokens = tokenizer.transform(testData)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)

In [339]:
test_tfidf = test_tfidf.dropna(how='any')

In [302]:
res = lrModel.transform(test_tfidf) 

In [341]:
res.select('new_column','prediction').show()

+----------+----------+
|new_column|prediction|
+----------+----------+
|         1|       1.0|
|         1|       1.0|
|         1|       0.0|
|         1|       1.0|
|         0|       1.0|
|         0|       1.0|
|         0|       1.0|
|         1|       1.0|
|         1|       1.0|
|         1|       1.0|
|         1|       1.0|
|         0|       0.0|
|         1|       1.0|
|         0|       0.0|
|         0|       0.0|
|         1|       1.0|
|         1|       1.0|
|         1|       1.0|
|         1|       1.0|
|         1|       1.0|
+----------+----------+
only showing top 20 rows



In [342]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="new_column", rawPredictionCol="prediction", metricName='areaUnderROC')
AUC = evaluator.evaluate(res)

In [343]:
tp = res[(res.new_column==1) & (res.prediction==1)].count()
tn = res[(res.new_column==0) & (res.prediction==0)].count()
fp = res[(res.new_column==0) & (res.prediction==1)].count()
fn = res[(res.new_column==1) & (res.prediction==0)].count()
accuracy = float((tp+tn)/(res.count()))

In [344]:
if(tp + fn == 0.0):
    r = 0.0
    p = float(tp) / (tp + fp)
elif(tp + fp == 0.0):
    r = float(tp) / (tp + fn)
    p = 0.0
else:
    r = float(tp) / (tp + fn)
    p = float(tp) / (tp + fp)
    
if(p + r == 0):
    f1 = 0
else:
    f1 = 2 * ((p * r)/(p + r))

    

print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)
print("AUC:",AUC)
print("Accuracy:", accuracy)
print("Recall:", r)
print("Precision: ", p)
print("F1 score:", f1)

True Positives: 313187
True Negatives: 74099
False Positives: 26137
False Negatives: 40364
AUC: 0.8125389882406717
Accuracy: 0.8534532721298759
Recall: 0.8858325955802699
Precision:  0.9229733234312928
F1 score: 0.9040216489265741


# Linear SVC

In [353]:
LSVC = LinearSVC(featuresCol="features", labelCol='new_column')

In [354]:
LSVC_MODEL = LSVC.fit(tfidf)

In [355]:
res_train = LSVC_MODEL.transform(tfidf)

In [356]:
test_tokens = tokenizer.transform(testData)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)

In [357]:
test_tfidf = test_tfidf.dropna(how='any')

In [358]:
res = LSVC_MODEL.transform(test_tfidf) 

In [359]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="new_column", rawPredictionCol="prediction", metricName='areaUnderROC')
AUC = evaluator.evaluate(res)

In [360]:
tp = res[(res.new_column==1) & (res.prediction==1)].count()
tn = res[(res.new_column==0) & (res.prediction==0)].count()
fp = res[(res.new_column==0) & (res.prediction==1)].count()
fn = res[(res.new_column==1) & (res.prediction==0)].count()
accuracy = float((tp+tn)/(res.count()))

In [361]:
if(tp + fn == 0.0):
    r = 0.0
    p = float(tp) / (tp + fp)
elif(tp + fp == 0.0):
    r = float(tp) / (tp + fn)
    p = 0.0
else:
    r = float(tp) / (tp + fn)
    p = float(tp) / (tp + fp)
    
if(p + r == 0):
    f1 = 0
else:
    f1 = 2 * ((p * r)/(p + r))

    

print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)
print("AUC:",AUC)
print("Accuracy:", accuracy)
print("Recall:", r)
print("Precision: ", p)
print("F1 score:", f1)

True Positives: 200463
True Negatives: 48477
False Positives: 14293
False Negatives: 20366
AUC: 0.8400352383611893
Accuracy: 0.8777887087048967
Recall: 0.9077747940714308
Precision:  0.9334453984987614
F1 score: 0.9204311443231517


# NAIVE BAYES

In [402]:
NB = NaiveBayes(featuresCol="features", labelCol='new_column')

In [403]:
NB_MODEL = NB.fit(tfidf)

In [404]:
res_train = NB_MODEL.transform(tfidf)

In [405]:
test_tokens = tokenizer.transform(testData)
test_tf = hashingTF.transform(test_tokens)
test_tfidf = idfModel.transform(test_tf)

In [406]:
test_tfidf = test_tfidf.dropna(how='any')

In [407]:
res = NB_MODEL.transform(test_tfidf) 

In [408]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="new_column", rawPredictionCol="prediction", metricName='areaUnderROC')
AUC = evaluator.evaluate(res)

In [409]:
tp = res[(res.new_column==1) & (res.prediction==1)].count()
tn = res[(res.new_column==0) & (res.prediction==0)].count()
fp = res[(res.new_column==0) & (res.prediction==1)].count()
fn = res[(res.new_column==1) & (res.prediction==0)].count()
accuracy = float((tp+tn)/(res.count()))

In [410]:
if(tp + fn == 0.0):
    r = 0.0
    p = float(tp) / (tp + fp)
elif(tp + fp == 0.0):
    r = float(tp) / (tp + fn)
    p = 0.0
else:
    r = float(tp) / (tp + fn)
    p = float(tp) / (tp + fp)
    
if(p + r == 0):
    f1 = 0
else:
    f1 = 2 * ((p * r)/(p + r))

    

print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)
print("AUC:",AUC)
print("Accuracy:", accuracy)
print("Recall:", r)
print("Precision: ", p)
print("F1 score:", f1)

True Positives: 116797
True Negatives: 28505
False Positives: 9121
False Negatives: 15764
AUC: 0.8193345004157759
Accuracy: 0.8537784907190326
Recall: 0.8810811626345607
Precision:  0.9275639702028304
F1 score: 0.9037252542759759


# ML PIPELINE


In [None]:
from pyspark.ml.feature import StopWordsRemover
eng_stopwords = StopWordsRemover.loadDefaultStopWords("english")
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, CountVectorizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer  = Tokenizer(inputCol = 'Text',outputCol = "tokens")
stopwordsremover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")
vectorizer = CountVectorizer(inputCol = 'tokens', outputCol = 'raw_features')
hashingTF = HashingTF(inputCol="tokens", outputCol="raw_features")
idf = IDF(inputCol = 'raw_features', outputCol = 'vectorizedfeatures')
lr = LogisticRegression(featuresCol="vectorizedfeatures", labelCol='Score')
lr_pipeline = Pipeline(stages = [tokenizer,stopwordsremover,vectorizer,hashingTF,idf,lr ])