# Naive Bayes for Sentiment Analysis

I will be using [this](http://ai.stanford.edu/~amaas/data/sentiment/) dataset for binary sentiment classification. The dataset contains 25,000 highly polar movie reviews for training, and 25,000 for testing. To get the data:

`wget http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz`

## Load Data

In [163]:
from pyspark import SparkContext 
import numpy as np
from collections import Counter
import re
import string
import os
import numpy as np
from nltk.corpus import stopwords

In [164]:
sc = SparkContext.getOrCreate()
train_path = "/Users/deena/Documents/Intersession/spark-nb/aclImdb/train/"
test_path ="/Users/deena/Documents/Intersession/spark-nb/aclImdb/test/"

In [165]:
data_raw_pos = sc.textFile(train_path + "pos/*.txt")
data_raw_neg = sc.textFile(train_path + "neg/*.txt")

In [166]:
# note that this is a whole review
data_raw_pos.first()

u'For a movie that gets no respect there sure are a lot of memorable quotes listed for this gem. Imagine a movie where Joe Piscopo is actually funny! Maureen Stapleton is a scene stealer. The Moroni character is an absolute scream. Watch for Alan "The Skipper" Hale jr. as a police Sgt.'

In [167]:
# sample 20% of the data
data_raw_pos = data_raw_pos.sample(False, 0.2, 1)
data_raw_neg = data_raw_neg.sample(False, 0.2, 1)

In [168]:
# number of partitions
data_raw_pos.getNumPartitions()

12500

In [169]:
# Repartition
num_partitions = 8
data_raw_pos = data_raw_pos.repartition(num_partitions)
data_raw_neg = data_raw_neg.repartition(num_partitions)

In [170]:
# count 2529 elements
print(data_raw_pos.count())
print(data_raw_neg.count())

2529
2529


## Training NB

In [171]:
# Data cleaning: Remove stop words and punctuation
def words(text):
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    word = regex.sub("", text)
    if len(word) > 2 and word.strip().lower() not in stopwords.words('english'):
        return word.strip().lower()

In [172]:
bigrams_pos = data_raw_pos.map(lambda x: x.split()).map(lambda x: [words(w) for w in x])\
    .map(lambda x: [w.strip() for w in x if w is not None])\
    .flatMap(lambda x: [x[i]+'_'+x[i+1] for i in range(0,len(x)-1)])\
    .map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1], ascending = False)\
    .map(lambda x: x[0]).take(100)

In [173]:
def bigram_mapping(x, bigrams):
    words = []
    for i in range(0, len(x)-1):
        if x[i]+'_'+x[i+1] in bigrams:
            words.append(x[i]+'_'+x[i+1])
        else:
            words.append(x[i])
    return words

In [174]:
pos_words = data_raw_pos.map(lambda x: x.split()).map(lambda x: [words(w) for w in x])\
    .map(lambda x: [w.strip() for w in x if w is not None])\
    .flatMap(lambda x: bigram_mapping(x, bigrams_pos))#.flatMap(lambda x: x)
data_pos = pos_words.map(lambda x: (x, 1))
data_pos = data_pos.reduceByKey(lambda x,y:x+y)

In [175]:
bigrams_neg = data_raw_neg.map(lambda x: x.split()).map(lambda x: [words(w) for w in x])\
    .map(lambda x: [w.strip() for w in x if w is not None])\
    .flatMap(lambda x: [x[i]+'_'+x[i+1] for i in range(0,len(x)-1)])\
    .map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).sortBy(lambda x: x[1], ascending = False)\
    .map(lambda x: x[0]).take(100)

In [176]:
neg_words = data_raw_neg.map(lambda x: x.split()).map(lambda x: [words(w) for w in x])\
    .map(lambda x: [w.strip() for w in x if w is not None])\
    .flatMap(lambda x: bigram_mapping(x, bigrams_neg))#.flatMap(lambda x: x)
data_neg = neg_words.map(lambda x: (x, 1))
data_neg = data_neg.reduceByKey(lambda x,y:x+y)

### Naive Bayes Implementation

Computing count(pos) and count(neg):

In [177]:
count_pos = data_pos.map(lambda x: x[1]).reduce(lambda x,y:x+y)
count_neg = data_neg.map(lambda x: x[1]).reduce(lambda x,y:x+y)

In [178]:
print(count_pos, count_neg)

(302041, 294762)


In [179]:
## Getting V
v1 = data_pos.map(lambda x: x[0]) # pos vocabulary
v2 = data_neg.map(lambda x: x[0]) # neg vocabulary
v = v1.union(v2)
#v.count()
v0 = v.distinct()
V = v0.count()
print(V)

50136


In [180]:
# Denominators are different 
pos_denom = float(count_pos + V + 1)
neg_denom = float(count_neg + V + 1)

In [181]:
# log probabities
pos_prob = data_pos.map(lambda x: (x[0], np.log(float(x[1] + 1)/pos_denom)))

neg_prob = data_neg.map(lambda x: (x[0], np.log(float(x[1] + 1)/neg_denom))) 

In [182]:
pos_prob.take(10)

[(u'fawn', -12.078744828024506),
 (u'hazenut', -12.078744828024506),
 (u'antiamericans', -12.078744828024506),
 (u'divinely', -11.673279719916343),
 (u'blackend', -12.078744828024506),
 (u'resist', -10.373996735786081),
 (u'sahan', -11.673279719916343),
 (u'joshua', -11.673279719916343),
 (u'needlessly', -12.078744828024506),
 (u'advices', -12.078744828024506)]

In [183]:
pos_prob = dict(pos_prob.collect())
neg_prob = dict(neg_prob.collect())

In [184]:
# broadcast = shared by all nodes
pos_prob_b = sc.broadcast(pos_prob)
neg_prob_b = sc.broadcast(neg_prob)

## Prediction

In [185]:
test_raw_pos = sc.textFile(test_path + "pos/*.txt")
test_raw_neg = sc.textFile(test_path + "neg/*.txt")

test_raw_pos = test_raw_pos.sample(False, 0.1, 1)
test_raw_neg = test_raw_neg.sample(False, 0.1, 1)

num_partitions = 8
test_raw_pos = test_raw_pos.repartition(num_partitions)
test_raw_neg = test_raw_neg.repartition(num_partitions)

print(test_raw_pos.count())
print(test_raw_neg.count())

1277
1277


In [154]:
def pred_class(doc):
    doc_words = [words(w) for w in doc.split(" ") if w is not None]
    doc_words = [w for w in doc_words if w is not None]
    doc_words= bigram_mapping(doc_words, bigrams + bigrams_neg)

    counts = Counter(doc_words)
    log_pos = 0.0
    log_neg = 0.0
    for w in counts:
        log_pos += counts[w]* pos_prob_b.value.get(w, np.log(1.0/pos_denom))
        log_neg += counts[w]* neg_prob_b.value.get(w, np.log(1.0/neg_denom))
    if log_pos > log_neg:
        return "pos"
    return "neg"

In [155]:
test_pos_res = test_raw_pos.map(pred_class)
test_pos_res.take(10)

['neg', 'neg', 'pos', 'pos', 'pos', 'pos', 'pos', 'neg', 'pos', 'pos']

In [156]:
test_pos_res = test_raw_pos.map(pred_class).map(lambda x: (x, 1)).reduceByKey(lambda x,y:x+y)
pos_results = dict(test_pos_res.collect())
print(pos_results)

{'neg': 283, 'pos': 994}


In [157]:
test_neg_res = test_raw_neg.map(pred_class).map(lambda x: (x, 1)).reduceByKey(lambda x,y:x+y)
neg_results = dict(test_neg_res.collect())
print(neg_results)

{'neg': 1125, 'pos': 152}


In [158]:
# compute accuracy
total = sum(neg_results.values()) + sum(pos_results.values())
acc = float(neg_results["neg"] + pos_results["pos"]) / float(total)
print(acc)

0.829678935004
