In [1]:
from pyspark import SparkContext
from pyspark.sql.types import *
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [2]:
sc

In [3]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import NGram
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

from pyspark.ml import Pipeline, PipelineModel


In [4]:
review_data = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://ec2-34-212-28-18.us-west-2.compute.amazonaws.com/msan697.review").load()

In [5]:
review_data.show(1)

+--------------------+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
|                 _id|         business_id|cool|      date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
|[5a5d41a969b675a5...|uYHaNptLzDLoV_JZ_...|   0|2016-07-12|    0|VfBHSwC5Vz_pbFluy...|    5|My girlfriend and...|     0|cjpdDjZyprfyDG3Rl...|
+--------------------+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+
only showing top 1 row



### check distribution of the rating star

In [7]:
print review_data.groupBy(review_data["stars"]).count().show()

+-----+-------+
|stars|  count|
+-----+-------+
|    1| 639849|
|    3| 570819|
|    5|1988003|
|    4|1135830|
|    2| 402396|
+-----+-------+

None


Exclude neutral review

In [8]:
def pos_neg(star):
    if star <3:
        return int(0) #negative
    elif star >3 :
        return int(1) #positive
    else:
        return int(2) #neutral
    
star_to_senti = udf(lambda x:pos_neg(x))
train_test_DF_raw = review_data.select('text',star_to_senti('stars').alias('label')).filter("label != 2") #exclude neutral reviews

In [9]:
from pyspark.sql.types import *
train_test_DF = train_test_DF_raw.withColumn("label", train_test_DF_raw["label"].cast(DoubleType()))

In [10]:
train_test_DF.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: double (nullable = true)



In [11]:
print train_test_DF.groupBy(train_test_DF["label"]).count().show()

+-----+-------+
|label|  count|
+-----+-------+
|  0.0|1042245|
|  1.0|3123833|
+-----+-------+

None


In [12]:
train_test_DF.show(1)

+--------------------+-----+
|                text|label|
+--------------------+-----+
|My girlfriend and...|  1.0|
+--------------------+-----+
only showing top 1 row



### Create TFIDF features

In [13]:
#remove punctuation
import re
import string

def remove_num_punct(text):

    my_string = text.replace("-", " ")
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    nopunct = regex.sub(" ", my_string)  # delete stuff but leave at least a space to avoid clumping together

    nopunct = nopunct.split()
    #nopunct = [stemmer.stem(w).strip(" ") for w in nopunct] #remove stop word and normalize word using stemmer.
    nopunct = [w.strip() for w in nopunct]
    nopunct = ' '.join(nopunct)
    
    return nopunct

udf_num_punct = udf(lambda x:remove_num_punct(x))
review_rmsw = train_test_DF.select(udf_num_punct('text').alias('text'), 'label')
review_rmsw.show(1,truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
### setNumFeatures(20)
n_features = 1000

### Unigram tfidf

In [15]:
tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
remover= StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)
hashingTF = HashingTF().setNumFeatures(n_features).setInputCol("filtered").setOutputCol("rawFeatures")
idf = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)

### Split train and test data

In [16]:
train_set, test_set= review_rmsw.randomSplit([0.8, 0.2])
train_set = train_set.cache()
test_set = test_set.cache()

### Define evaluation metrics

In [17]:
# compute accuracy on the test set 
def evaluate_metric(predictions):
    
    evaluator = BinaryClassificationEvaluator().setMetricName("areaUnderROC")
    print "Area under ROC curve:",evaluator.evaluate(predictions)

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="f1")
    f1 = evaluator.evaluate(predictions)
    print("F1_score = %0.4f" %(f1))

    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("Accuracy = %0.4f" %(accuracy))
    

### Model 1: Logistic regression

In [17]:
%%time
lr =  LogisticRegression(maxIter=100, regParam=0.01, elasticNetParam=0.8)
pipeline=Pipeline(stages=[tokenizer,remover,hashingTF,idf, lr])
logreg_model=pipeline.fit(train_set)
predictions = logreg_model.transform(test_set)
#print evaluation metrics
evaluate_metric(predictions)

Area under ROC curve: 0.90763606097
F1_score = 0.8299
Accuracy = 0.8466
CPU times: user 576 ms, sys: 184 ms, total: 760 ms
Wall time: 19min 11s


In [18]:
predictions.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|               words|            filtered|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|A Double Double w...|  1.0|[a, double, doubl...|[double, double, ...|(1000,[100,319,48...|(1000,[100,319,48...|[-1.5326953530699...|[0.17759966568357...|       1.0|
|A bit of good and...|  1.0|[a, bit, of, good...|[bit, good, bad, ...|(1000,[1,7,17,20,...|(1000,[1,7,17,20,...|[-2.4657070055305...|[0.07829748728691...|       1.0|
|A calmer shopping...|  1.0|[a, calmer, shopp...|[calmer, shopping...|(1000,[19,43,144,...|(1000,[19,43,144,...|[-2.0775574794838...|[0.11129732771211...|       1.0|
|A f

### Model 2: Unigram Naive Bayes

In [19]:
%%time
nb = NaiveBayes(smoothing = 1.0, modelType = "multinomial")
pipeline=Pipeline(stages=[tokenizer,remover,hashingTF,idf, nb])
nb_model=pipeline.fit(train_set)
nb_prediction = nb_model.transform(test_set)
#print evaluation metrics
evaluate_metric(nb_prediction)

Area under ROC curve: 0.611796077574
F1_score = 0.8502
Accuracy = 0.8480
CPU times: user 168 ms, sys: 24 ms, total: 192 ms
Wall time: 11min 31s


In [20]:
nb_prediction.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|               words|            filtered|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|A Double Double w...|  1.0|[a, double, doubl...|[double, double, ...|(1000,[100,319,48...|(1000,[100,319,48...|[-157.87430553725...|[0.00344249451876...|       1.0|
|A bit of good and...|  1.0|[a, bit, of, good...|[bit, good, bad, ...|(1000,[1,7,17,20,...|(1000,[1,7,17,20,...|[-2560.0339187730...|[1.83722486379919...|       1.0|
|A calmer shopping...|  1.0|[a, calmer, shopp...|[calmer, shopping...|(1000,[19,43,144,...|(1000,[19,43,144,...|[-878.77050411191...|[4.81488550846858...|       1.0|
|A f

### Model 3: Bigram Naive Bayes

#### Bigram tfidf

In [18]:
#tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
#remover= StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)
bigram = NGram(n=2, inputCol="filtered", outputCol="bigrams")
hashingTF_bigram = HashingTF().setNumFeatures(n_features).setInputCol("bigrams").setOutputCol("rawFeatures")
idf_bigram = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)

In [19]:
%%time
nb = NaiveBayes(smoothing = 1.0, modelType = "multinomial")
pipeline=Pipeline(stages=[tokenizer,remover,bigram,hashingTF_bigram,idf_bigram, nb])
nb_model_bigram=pipeline.fit(train_set)
nb_prediction_bigram = nb_model_bigram.transform(test_set)

#print evaluation metrics
evaluate_metric(nb_prediction_bigram)

Area under ROC curve: 0.606786650712
F1_score = 0.7330
Accuracy = 0.7366
CPU times: user 252 ms, sys: 16 ms, total: 268 ms
Wall time: 23min 49s


In [20]:
nb_prediction_bigram.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|               words|            filtered|             bigrams|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|A Double Double w...|  1.0|[a, double, doubl...|[double, double, ...|[double double, d...|(1000,[37,120,197...|(1000,[37,120,197...|[-146.40989752116...|[0.25797807881001...|       1.0|
|A float plane in ...|  1.0|[a, float, plane,...|[float, plane, de...|[float plane, pla...|(1000,[6,15,17,19...|(1000,[6,15,17,19...|[-1325.2931946499...|[0.78921656227107...|       0.0|
|A nice ambiance r...|  1.0|[a, nice, ambianc...|[nice, ambiance,

### Model 4: Trigram Naive Bayes

#### Tribgram tfidf

In [21]:
#tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
#remover= StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(False)
trigram = NGram(n=3, inputCol="filtered", outputCol="trigrams")
hashingTF_trigram = HashingTF().setNumFeatures(n_features).setInputCol("trigrams").setOutputCol("rawFeatures")
idf_trigram = IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)

In [22]:
%%time
nb = NaiveBayes(smoothing = 1.0, modelType = "multinomial")
pipeline=Pipeline(stages=[tokenizer,remover,trigram,hashingTF_trigram,idf_trigram, nb])
nb_model_trigram=pipeline.fit(train_set)
nb_prediction_trigram = nb_model_trigram.transform(test_set)
#print evaluation metrics
evaluate_metric(nb_prediction_bigram)

Area under ROC curve: 0.606786650712
F1_score = 0.7330
Accuracy = 0.7366
CPU times: user 200 ms, sys: 36 ms, total: 236 ms
Wall time: 18min 37s


In [23]:
nb_prediction_trigram.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|               words|            filtered|            trigrams|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|A Double Double w...|  1.0|[a, double, doubl...|[double, double, ...|[double double w,...|(1000,[18,134,302...|(1000,[18,134,302...|[-124.25031221642...|[0.27254382101278...|       1.0|
|A float plane in ...|  1.0|[a, float, plane,...|[float, plane, de...|[float plane dese...|(1000,[12,13,55,8...|(1000,[12,13,55,8...|[-1309.6464331598...|[0.17839287010797...|       1.0|
|A nice ambiance r...|  1.0|[a, nice, ambianc...|[nice, ambiance,

### Model 5: Multilayer perceptron classifier 

In [24]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [28]:
# specify layers for the neural network:
# input layer of size 20 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)

# %%time

layers = [n_features, 5 , 2] 
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=10, layers=layers, blockSize=128, seed=1234)
pipeline=Pipeline(stages=[tokenizer,remover,hashingTF,idf, trainer])
nn_model = pipeline.fit(train_set)

nn_prediction = nn_model.transform(test_set)

In [29]:
nn_prediction.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|               words|            filtered|         rawFeatures|            features|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|A Double Double w...|  1.0|[a, double, doubl...|[double, double, ...|(1000,[100,319,48...|(1000,[100,319,48...|       1.0|
|A float plane in ...|  1.0|[a, float, plane,...|[float, plane, de...|(1000,[3,42,51,69...|(1000,[3,42,51,69...|       1.0|
|A nice ambiance r...|  1.0|[a, nice, ambianc...|[nice, ambiance, ...|(1000,[44,73,92,1...|(1000,[44,73,92,1...|       1.0|
|A short walk nort...|  1.0|[a, short, walk, ...|[short, walk, nor...|(1000,[8,25,44,77...|(1000,[8,25,44,77...|       1.0|
|AMAZING Went at t...|  1.0|[amazing, went, a...|[amazing, went, t...|(1000,[1,36,62,76...|(1000,[1,36,62,76...|       1.0|
+-------

In [32]:
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="f1")
    f1 = evaluator.evaluate(nn_prediction)
    print("F1_score = %0.4f" %(f1))

F1_score = 0.8898


In [33]:
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                                  metricName="accuracy")
    accuracy = evaluator.evaluate(nn_prediction)
    print("Accuracy = %0.4f" %(accuracy))
    

Accuracy = 0.8914
