## Sentiment Analysis with MLlib
__Author : Erica Lee__

In [4]:
import re
from collections import Counter
from math import exp

import numpy as np

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils




**Importing the data**

Data set: Large Movie Review Dataset (IMDB), http://ai.stanford.edu/~amaas/data/sentiment/

- Source: local text files under `../aclImdb/`
- 25,000 highly polar movie reviews for training and 25,000 for testing.


In [8]:
# Tokens of two or more characters: a word character, then word characters or
# apostrophes, ending on a word character (so "it's" stays one token and
# single-character words are dropped). Compiled once instead of on every line.
_WORD_RE = re.compile(r"([\w][\w']*\w)")

def parseline(line, label):
    """Tokenize one review line and attach its sentiment label.

    The text is lower-cased, "<br" / "</br" HTML line-break fragments are
    stripped, and tokens are extracted with _WORD_RE.

    Args:
        line: raw review text.
        label: class label to attach (parse_pos uses 1, parse_neg uses -1).

    Returns:
        dict with keys "text" (list of token strings) and "label".
    """
    cleaned = line.lower().replace("<br", "").replace("</br", "")
    return {"text": _WORD_RE.findall(cleaned), "label": label}

def parse_pos(line):
    """Parse a line from a positive-review file (label 1)."""
    return parseline(line, 1)

def parse_neg(line):
    """Parse a line from a negative-review file (label -1)."""
    return parseline(line, -1)

# From local
pos_train = sc.textFile("../aclImdb/train_pos.txt").map(parse_pos)
neg_train = sc.textFile("../aclImdb/train_neg.txt").map(parse_neg)
pos_test = sc.textFile("../aclImdb/test_pos.txt").map(parse_pos)
neg_test = sc.textFile("../aclImdb/test_neg.txt").map(parse_neg)

# pos_train = sc.textFile(s3files[0]).map(parse_pos)
# neg_train = sc.textFile(s3files[1]).map(parse_neg)
# pos_test = sc.textFile(s3files[2]).map(parse_pos)
# neg_test = sc.textFile(s3files[3]).map(parse_neg)

print "Total number of training, positive documetns: ",pos_train.count()
print "Total number of training, negative documetns: ",neg_train.count()
print "Total number of testing, positive documetns: ",pos_test.count()
print "Total number of testing, negative documetns: ",neg_test.count()

all_train = pos_train.union(neg_train)
labels = all_train.map(lambda doc: doc["label"], preservesPartitioning=True)

all_test = pos_test.union(neg_test)

tf = HashingTF().transform(all_train.map(lambda doc: doc["text"], preservesPartitioning=True))
tf.take(1)
idf = IDF().fit(tf)

tfidf = idf.transform(tf)

Total number of training, positive documetns:  12500
Total number of training, negative documetns:  12500
Total number of testing, positive documetns:  12500
Total number of testing, negative documetns:  12500


In [9]:
# Pair every training label with its TF-IDF vector. Both RDDs derive from
# all_train through element-wise transforms, which is what zip relies on.
training = labels.zip(tfidf).map(lambda pair: LabeledPoint(pair[0], pair[1]))

# Fit MLlib's Naive Bayes, then score the training documents themselves.
model = NaiveBayes.train(training)
train_predictions = model.predict(tfidf)
trainlabel = labels.zip(train_predictions).map(
    lambda pair: {"actual": pair[0], "predicted": pair[1]})

In [10]:
# trainlabel.collect()
trainlabel = labels.zip(model.predict(tfidf)).map(lambda x: (x[0], x[1]))
trainaccuracy = trainlabel.filter(lambda (x,v): x == v).count()/float(all_train.count())
print "Training Accuracy: ", trainaccuracy


Training Accuracy:  0.94824


In [11]:
# Featurize the test set with the transforms fitted on TRAINING data only:
# a default-sized HashingTF (same size as the training one, so hashed indices
# line up) and the training-set `idf` model. No IDF is fitted on the test set;
# that would leak test statistics and costs an extra Spark job.
labels_test = all_test.map(lambda doc: doc["label"], preservesPartitioning=True)
tf_test = HashingTF().transform(all_test.map(lambda doc: doc["text"], preservesPartitioning=True))
tfidf_test = idf.transform(tf_test)

In [12]:
testlabel = labels_test.zip(model.predict(tfidf_test)).map(lambda x: (x[0], x[1]))
accuracy = testlabel.filter(lambda (x,v): x == v).count()/float(all_test.count())
print "Final Accuracy: ", accuracy


Final Accuracy:  0.7706


## Developing a Simple Naive Bayes Classifier from Scratch in Spark

In [13]:
print "Total number of training, positive documetns: ",pos_train.count()
print "Total number of training, negative documetns: ",neg_train.count()
print "Total number of testing, positive documetns: ",pos_test.count()
print "Total number of testing, negative documetns: ",neg_test.count()

pos_n = pos_train.map(lambda x: (x['text'])).flatMap(lambda x:x).count()
neg_n = neg_train.map(lambda x: (x['text'])).flatMap(lambda x:x).count()

pos_n_test = float(pos_test.count())
neg_n_test = float(neg_test.count())

all_train = pos_train.union(neg_train)
all_test = pos_train.union(neg_train)


Total number of training, positive documetns:  12500
Total number of training, negative documetns:  12500
Total number of testing, positive documetns:  12500
Total number of testing, negative documetns:  12500


In [14]:
counter = all_train.map(lambda x: (x['text'])).flatMap(lambda x:x).map(lambda x:(x,1)).reduceByKey(lambda x,y: x+y)
poscount = pos_train.map(lambda x: (x['text'])).flatMap(lambda x:x).map(lambda x:(x,1)).reduceByKey(lambda x,y: x+y)
negcount = neg_train.map(lambda x: (x['text'])).flatMap(lambda x:x).map(lambda x:(x,1)).reduceByKey(lambda x,y: x+y)

print "Total number of word count in 'positive' documents: ", pos_n
print "Total number of word count in 'negative' documents: ", neg_n

Total number of word count in 'positive' documents:  2826798
Total number of word count in 'negative' documents:  2748845


In [15]:
# Attach the class's total token count to each word count: (word, (count, class_total)).
posprob = poscount.mapValues(lambda count: (count, pos_n))
negprob = negcount.mapValues(lambda count: (count, neg_n))

In [16]:
alltestcount = pos_n_test + neg_n_test
truelabels = all_test.map(lambda x: x['label']).zipWithIndex().map(lambda x: (x[1], x[0]))

import numpy as np

def indexdocs(docs):
    return [(docs[0][i], docs[1]) for i in range(len(docs[0]))]

testdocs = all_test.map(lambda x: x['text']).zipWithIndex().map(indexdocs).flatMap(lambda x:x)

test_p = testdocs.join(posprob).map(lambda x:(x[1][0], np.log(x[1][1][0]/(float(x[1][1][1])))))
test_n = testdocs.join(negprob).map(lambda x:(x[1][0], np.log(x[1][1][0]/(float(x[1][1][1])))))

In [17]:
test_p2 = test_p.reduceByKey(lambda x,y: x+y)
test_n2 = test_n.reduceByKey(lambda x,y: x+y)

In [18]:
def posneg(tup):
    if tup[1][0] < tup[1][1]: pred = 1
    else: pred = -1
    return (tup[0],pred)

test_pred = test_p2.join(test_n2).map(posneg)
correct = test_pred.join(truelabels).map(lambda x:(x[1][0]*x[1][1] + 1)/2).collect()
accuracy = sum(correct)/float(len(correct))
print "Accuracy: ", accuracy
# test_pred.take(10)

Accuracy:  0.63524
