In [1]:
sc

<pyspark.context.SparkContext at 0x2ae45b1a4110>

In [2]:
sqlContext

<pyspark.sql.context.HiveContext at 0x2ae45b1ce990>

In [3]:
from pprint import pprint
import nltk
import nltk.corpus as corpus
import json
from collections import defaultdict
import itertools
import math
import re
from nltk.corpus import wordnet
import pickle
from __future__ import print_function
import pandas as pd
import bokeh

Input learned DataSet

In [4]:
ReviewRDD= sc.textFile('hdfs:///reviewsR.txt')

In [5]:
pprint(ReviewRDD.take(5))

[u'Judging from previous posts this used to be a good place, but not any longer.~negative',
 u'We, there were four of us, arrived at noon - the place was empty - and the staff acted like we were imposing on them and they were very rude.~negative',
 u'They never brought us complimentary noodles, ignored repeated requests for sugar, and threw our dishes on the table.~negative',
 u'The food was lousy - too sweet or too salty and the portions tiny.~negative',
 u'After all that, they complained to me about the small tip.~negative']


Function to get Synonyms and parse text

In [6]:
def synonyms(x):
    "generate list of synonyms for given word"
    w=wordnet.synsets(x)
    t=[]
    for i in w:
        s=str(i)
        s= re.sub('Synset\(','',s)[1:-2]
        t.extend(wordnet.synset(s).lemma_names())
    return [ i.lower() for i in t  if not any( j in str(i) for j  in ['.','_','-',',']) ][:6]

def parse(x):
    "parse text to list of words with synonyms"
    #porter = nltk.PorterStemmer()
    x=x.split('~')
    y=re.split(r'[^\w]',x[0])
    #y=[porter.stem(i.lower()) for i in y if len(i) >3 or i=='not']
    y=[re.sub(r'\.|,','',i.lower()) for i in y if len(i)>3 or i=='not']
    words=[]
    for i in y:
        words.extend(synonyms(i))
    return (words,x[1])

pprint(parse(u'Rude service, medicore food...there are tons of restaurants in NY...stay away from this one~negative'))


([u'rude',
  u'unmannered',
  u'unmannerly',
  u'bounderish',
  u'lowbred',
  u'rude',
  u'service',
  u'service',
  u'service',
  u'service',
  u'service',
  u'service',
  u'food',
  u'nutrient',
  u'food',
  u'food',
  u'there',
  u'there',
  u'there',
  u'there',
  u'thither',
  u'tons',
  u'dozens',
  u'heaps',
  u'lots',
  u'piles',
  u'scores',
  u'restaurant',
  u'eatery',
  u'stay',
  u'arrest',
  u'check',
  u'halt',
  u'hitch',
  u'stay',
  u'away',
  u'away',
  u'away',
  u'outside',
  u'away',
  u'off'],
 u'negative')


Generate RDD of parsed o/p

In [8]:
ReviewPRDD=ReviewRDD.map(lambda x: parse(x)).map(lambda (x,y):(list(set(x)),y))
ReviewPRDD.cache()

PythonRDD[3] at RDD at PythonRDD.scala:43

In [9]:
pprint(ReviewPRDD.take(1))

[([u'old',
   u'not',
   u'goodness',
   u'apply',
   u'previous',
   u'use',
   u'utilize',
   u'long',
   u'late',
   u'station',
   u'judging',
   u'non',
   u'good',
   u'utilise',
   u'evaluate',
   u'spot',
   u'judgement',
   u'yearner',
   u'judge',
   u'post',
   u'judgment',
   u'former',
   u'berth',
   u'longer',
   u'commodity',
   u'thirster',
   u'employ',
   u'place',
   u'position',
   u'property'],
  u'negative')]


Create broadcast variable of all words for feature vector

In [10]:
all_words=ReviewPRDD.map(lambda (x,y): (x))\
                    .flatMap(lambda x:x).map(lambda x: (x,1))\
                    .reduceByKey(lambda x,y:x+y)\
                    .map(lambda (x,y): (x))

wordsb=sc.broadcast(all_words.collect())

In [11]:
wordsb

<pyspark.broadcast.Broadcast at 0x2ae4755af290>

Function for feature vector generation

In [12]:
def extract_features(x,wordsb):
    d={}
    for i in wordsb.value:
        d['contains {}'.format(i)]= i in x
    return d

Feature Vector RDD Creation

In [13]:
ReviewfRDD=ReviewPRDD.map(lambda (x,y): (extract_features(x,wordsb),y))

In [14]:
ReviewfRDD.take(2)

[({'contains sizing': False,
   'contains sickly': False,
   'contains frost': False,
   'contains quaint': False,
   'contains relatively': False,
   'contains youngster': False,
   'contains bungling': False,
   'contains farewell': False,
   'contains tetrad': False,
   'contains setback': False,
   'contains rarified': False,
   'contains burger': False,
   'contains alien': False,
   'contains houseclean': False,
   'contains dispatch': False,
   'contains honor': False,
   'contains glorify': False,
   'contains curry': False,
   'contains perspective': False,
   'contains authority': False,
   'contains invariable': False,
   'contains viridity': False,
   'contains starter': False,
   'contains equitable': False,
   'contains distillery': False,
   'contains pry': False,
   'contains consistent': False,
   'contains child': False,
   'contains junket': False,
   'contains ego': False,
   'contains thick': False,
   'contains hustle': False,
   'contains okay': False,
   'contai

created train_set test_set broadcast variables

In [15]:
train_set,test_set= ReviewfRDD.randomSplit([0.8,0.2])
ReviewfRDD.cache()
training_set=sc.broadcast(train_set.collect())
testing_set=sc.broadcast(test_set.collect())

NaiveBayesClassifier classifier

In [16]:
NaiveBayesClassifier = nltk.NaiveBayesClassifier.train(training_set.value)

In [17]:
nltk.classify.accuracy(NaiveBayesClassifier,testing_set.value)

0.6791044776119403

Show top 10 informative features

In [18]:
NaiveBayesClassifier.show_most_informative_features(10)

Most Informative Features
     contains unmannerly = True           negati : positi =     39.5 : 1.0
        contains lowbred = True           negati : positi =     39.5 : 1.0
     contains unmannered = True           negati : positi =     39.5 : 1.0
           contains rude = True           negati : positi =     39.5 : 1.0
     contains bounderish = True           negati : positi =     39.5 : 1.0
          contains grand = True           positi : negati =     14.8 : 1.0
      contains fantastic = True           positi : negati =     14.8 : 1.0
            contains ask = True           negati : positi =     13.8 : 1.0
       contains splendid = True           positi : negati =      9.5 : 1.0
          contains bland = True           negati : positi =      9.3 : 1.0


Train MultinomialNB , BernoulliNB Classifier models from sklearn

In [19]:
from nltk.classify.scikitlearn import SklearnClassifier
from sklearn.naive_bayes import MultinomialNB,BernoulliNB

MNB_classifier = SklearnClassifier(MultinomialNB())
MNB_classifier.train(training_set.value)


BNB_classifier = SklearnClassifier(BernoulliNB())
BNB_classifier.train(training_set.value)


<SklearnClassifier(BernoulliNB(alpha=1.0, binarize=0.0, class_prior=None, fit_prior=True))>

In [20]:
print("MultinomialNB accuracy percent:",nltk.classify.accuracy(MNB_classifier, testing_set.value))
print("BernoulliNB accuracy percent:",nltk.classify.accuracy(BNB_classifier, testing_set.value))

MultinomialNB accuracy percent: 0.682835820896
BernoulliNB accuracy percent: 0.675373134328


Train RandomForest Classifier models from sklear

In [21]:
from sklearn.ensemble import RandomForestClassifier,GradientBoostingRegressor
RF_classifier=SklearnClassifier(RandomForestClassifier(n_estimators = 100))
RF_classifier.train(training_set.value)


<SklearnClassifier(RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=None, max_features='auto', max_leaf_nodes=None,
            min_samples_leaf=1, min_samples_split=2,
            min_weight_fraction_leaf=0.0, n_estimators=100, n_jobs=1,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False))>

In [22]:
pprint("RF_Classifier accuracy is {}".format(nltk.classify.accuracy(RF_classifier,testing_set.value)))

'RF_Classifier accuracy is 0.649253731343'


Train LogisticRegression,SGD Classifier models from sklearn

In [23]:
from sklearn.linear_model import LogisticRegression,SGDClassifier

LogisticRegression_classifier = SklearnClassifier(LogisticRegression())
LogisticRegression_classifier.train(training_set.value)
print("LogisticRegression_classifier accuracy percent:", (nltk.classify.accuracy(LogisticRegression_classifier, testing_set.value))*100)

SGDClassifier_classifier = SklearnClassifier(SGDClassifier())
SGDClassifier_classifier.train(training_set.value)
print("SGDClassifier_classifier accuracy percent:", (nltk.classify.accuracy(SGDClassifier_classifier, testing_set.value))*100)


LogisticRegression_classifier accuracy percent: 63.8059701493
SGDClassifier_classifier accuracy percent: 62.6865671642


Train SVC,LinearSVC Classifier models from sklear

In [24]:
from sklearn.svm import SVC, LinearSVC

SVC_classifier = SklearnClassifier(SVC())
SVC_classifier.train(training_set.value)
print("SVC_classifier accuracy percent:", (nltk.classify.accuracy(SVC_classifier, testing_set.value))*100)

LinearSVC_classifier = SklearnClassifier(LinearSVC())
LinearSVC_classifier.train(training_set.value)
print("LinearSVC_classifier accuracy percent:", (nltk.classify.accuracy(LinearSVC_classifier, testing_set.value))*100)

SVC_classifier accuracy percent: 61.1940298507
LinearSVC_classifier accuracy percent: 61.1940298507


In [25]:
from nltk.classify import ClassifierI
from statistics import mode


In [26]:

class VoteClassifier(ClassifierI):
    def __init__(self, *classifiers):
        self._classifiers = classifiers
    def classify(self, features):
        votes = []
        for c in self._classifiers:
            v = c.classify(features)
            votes.append(v)
        neg_cnt,pos_cnt=0,0
        for i in votes:
            if i=='negative':
                neg_cnt+=1
            else:
                pos_cnt+=1
        if neg_cnt>pos_cnt:
            return 'negative'
        else:
            return 'positive'
    def confidence(self, features):
        votes = []
        for c in self._classifiers:
            v = c.classify(features)
            votes.append(v)

        choice_votes = votes.count(mode(votes))
        conf = choice_votes / len(votes)
        return conf

In [27]:
voted_classifier = VoteClassifier(NaiveBayesClassifier,
                                  BNB_classifier,
                                  LogisticRegression_classifier,
                                  RF_classifier,
                                  MNB_classifier,
                                  LogisticRegression_classifier,
                                  SGDClassifier_classifier,
                                  LinearSVC_classifier
                                  )

In [28]:
print("voted_classifier accuracy percent:", (nltk.classify.accuracy(voted_classifier, testing_set.value))*100)

print("Classification:", voted_classifier.classify(testing_set.value[0][0]), "Confidence %:",voted_classifier.confidence(testing_set.value[0][0])*100)
print("Classification:", voted_classifier.classify(testing_set.value[1][0]), "Confidence %:",voted_classifier.confidence(testing_set.value[1][0])*100)
print("Classification:", voted_classifier.classify(testing_set.value[2][0]), "Confidence %:",voted_classifier.confidence(testing_set.value[2][0])*100)
print("Classification:", voted_classifier.classify(testing_set.value[3][0]), "Confidence %:",voted_classifier.confidence(testing_set.value[3][0])*100)
print("Classification:", voted_classifier.classify(testing_set.value[5][0]), "Confidence %:",voted_classifier.confidence(testing_set.value[5][0])*100)

voted_classifier accuracy percent: 63.4328358209
Classification: positive Confidence %: 100
Classification: positive Confidence %: 0
Classification: positive Confidence %: 0
Classification: positive Confidence %: 0
Classification: negative Confidence %: 100


In [29]:

ReviewfRDD.unpersist()

PythonRDD[11] at RDD at PythonRDD.scala:43

ofile=open('NaiveBayesModel.pickle', 'wb')
pickle.dump(NaiveBayesClassifier,ofile)

ofile.close()

os.listdir('/oasis/projects/nsf/sun116/addankn')

In [30]:
yelpReviewDF=sqlContext.read.json('hdfs:///yelp_academic_dataset_review.json')

In [31]:
yelpReviewDF.cache()

yelpReviewDF.select("text").take(1)
yelpReviewDF.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)
 |-- type: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- votes: struct (nullable = true)
 |    |-- cool: long (nullable = true)
 |    |-- funny: long (nullable = true)
 |    |-- useful: long (nullable = true)



#yelpReviewRDD=yelpReviewDF.select("review_id","text").rdd

In [32]:
yelpReviewRDD=yelpReviewDF.select("review_id","text").limit(100000)
#yelpReviewRDD=yelpReviewDF.select("review_id","text")

In [33]:
yelpReviewRDD.cache()
yelpReviewRDD.take(2)


[Row(review_id=u'Ya85v4eqdd6k9Od8HbQjyA', text=u'Mr Hoagie is an institution. Walking in, it does seem like a throwback to 30 years ago, old fashioned menu board, booths out of the 70s, and a large selection of food. Their speciality is the Italian Hoagie, and it is voted the best in the area year after year. I usually order the burger, while the patties are obviously cooked from frozen, all of the other ingredients are very fresh. Overall, its a good alternative to Subway, which is down the road.'),
 Row(review_id=u'KPvLNJ21_4wbYNctrOwWdQ', text=u"Excellent food. Superb customer service. I miss the mario machines they used to have, but it's still a great place steeped in tradition.")]

In [34]:
def yparse(x):
    y=re.split(r'[^\w]',x)
    #y=[porter.stem(i.lower()) for i in y if len(i) >3 or i=='not']
    y=[i.lower() for i in y if len(i)>3 or i=='not']
    words=[]
    for i in y:
        words.extend(synonyms(i))
    return words

def lowers(x):
    return x.lower()

In [35]:
yelpReviewpRDD=yelpReviewRDD.map(lambda (x,y):(x,y,yparse(y))).map(lambda (x,z,y):(x,z,list(set(y))))


In [36]:
wordsb.value[:2]

[u'boilersuit', u'ciao']

In [37]:
yelpReviewpRDD.cache()

PythonRDD[37] at RDD at PythonRDD.scala:43

In [38]:
yelpReviewpRDD.take(6)

[(u'Ya85v4eqdd6k9Od8HbQjyA',
  u'Mr Hoagie is an institution. Walking in, it does seem like a throwback to 30 years ago, old fashioned menu board, booths out of the 70s, and a large selection of food. Their speciality is the Italian Hoagie, and it is voted the best in the area year after year. I usually order the burger, while the patties are obviously cooked from frozen, all of the other ingredients are very fresh. Overall, its a good alternative to Subway, which is down the road.',
  [u'walking',
   u'selection',
   u'boilersuit',
   u'hamburger',
   u'years',
   u'cook',
   u'alternative',
   u'bomber',
   u'menu',
   u'outflank',
   u'alternate',
   u'eld',
   u'former',
   u'pile',
   u'board',
   u'factor',
   u'atavistic',
   u'ingredient',
   u'identical',
   u'do',
   u'patently',
   u'founding',
   u'food',
   u'big',
   u'spell',
   u'overall',
   u'choice',
   u'early',
   u'topper',
   u'declamatory',
   u'yr',
   u'down',
   u'like',
   u'goodness',
   u'patch',
   u'larg

In [39]:
yelpReviewfRDD=yelpReviewpRDD.map(lambda (x,y,z):(x,y,extract_features(z,wordsb)))

In [40]:
yelpReviewfRDD.cache()


PythonRDD[39] at RDD at PythonRDD.scala:43

In [186]:
yelpReviewfRDD.take(1)

[(u'NpkTbM23X8douRPIPletCg',
  u"My wifey & I decided to venture off of the strip, try something the locals probably do :)\n\nThis is a cute little casino that has Karaoke nights. It is really popular because when we got there & put in our request for a song we wanted to sing it was like a 30 minute wait. Enough to make you a nervous-wreck if you hate waiting. \n\nThe locals there mistaked my friend & I for locals because I guess people don't really venture off the strip. The locals were very welcoming & inviting. I would totally go back here when I go back to Vegas.\n\nNot to mention I got hit on by a lesbian ;) that's a first. Whoa der..... lol.",
  {'contains sizing': False,
   'contains sickly': False,
   'contains frost': False,
   'contains quaint': False,
   'contains relatively': False,
   'contains youngster': False,
   'contains bungling': False,
   'contains farewell': False,
   'contains neighbourhood': False,
   'contains setback': False,
   'contains rarified': False,
   

In [41]:
yelpReviewcRDD=yelpReviewfRDD.map(lambda (x,z,y):(str(x),z,str(NaiveBayesClassifier.classify(y)),str(RF_classifier.classify(y)),                                                
                                              str(BNB_classifier.classify(y)),
                                              str(LogisticRegression_classifier.classify(y)),
                                              str(MNB_classifier.classify(y)),
                                              str(SGDClassifier_classifier.classify(y)),
                                              str(LinearSVC_classifier.classify(y)),
                                                str(voted_classifier.classify(y))))

In [42]:
yelpReviewcRDD.cache()


PythonRDD[40] at RDD at PythonRDD.scala:43

In [None]:
pprint(yelpReviewcRDD.take(3))

In [190]:
type(yelpReviewcRDD)
hasattr(yelpReviewcRDD, "toDF")

True

In [191]:
yelpReviewcDF2=yelpReviewcRDD.toDF()

In [192]:
yelpReviewcDF2.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: string (nullable = true)
 |-- _6: string (nullable = true)
 |-- _7: string (nullable = true)
 |-- _8: string (nullable = true)
 |-- _9: string (nullable = true)
 |-- _10: string (nullable = true)



In [193]:
yelpReviewcDF2.show(1)

+--------------------+--------------------+--------+--------+--------+--------+--------+--------+--------+--------+
|                  _1|                  _2|      _3|      _4|      _5|      _6|      _7|      _8|      _9|     _10|
+--------------------+--------------------+--------+--------+--------+--------+--------+--------+--------+--------+
|NpkTbM23X8douRPIP...|My wifey & I deci...|negative|negative|negative|negative|negative|negative|negative|negative|
+--------------------+--------------------+--------+--------+--------+--------+--------+--------+--------+--------+
only showing top 1 row



In [194]:
yelpReviewcDF2=yelpReviewcDF2.selectExpr("_1 as ReviewId","_2 as text","_3 as NaiveBayes", "_4 as RFClassification" ,"_5 as BNBClassification "
                ,"_6 as LogisticRegressionClassification", "_7 as MNBClassification" ,
                "_8 as SGDClassification",  "_9 as LinearSVCClassification", "_10 as CombinedClassification")

In [195]:
yelpReviewcDF2.printSchema()

root
 |-- ReviewId: string (nullable = true)
 |-- text: string (nullable = true)
 |-- NaiveBayes: string (nullable = true)
 |-- RFClassification: string (nullable = true)
 |-- BNBClassification: string (nullable = true)
 |-- LogisticRegressionClassification: string (nullable = true)
 |-- MNBClassification: string (nullable = true)
 |-- SGDClassification: string (nullable = true)
 |-- LinearSVCClassification: string (nullable = true)
 |-- CombinedClassification: string (nullable = true)



In [196]:
yelpReviewcDF2.show(3)

+--------------------+--------------------+----------+----------------+-----------------+--------------------------------+-----------------+-----------------+-----------------------+----------------------+
|            ReviewId|                text|NaiveBayes|RFClassification|BNBClassification|LogisticRegressionClassification|MNBClassification|SGDClassification|LinearSVCClassification|CombinedClassification|
+--------------------+--------------------+----------+----------------+-----------------+--------------------------------+-----------------+-----------------+-----------------------+----------------------+
|NpkTbM23X8douRPIP...|My wifey & I deci...|  negative|        negative|         negative|                        negative|         negative|         negative|               negative|              negative|
|Dcn7SHEyUN5BDalaU...|Beer is awesome! ...|  negative|        negative|         negative|                        negative|         negative|         negative|               neg

In [197]:
#yelpReviewcDF2.rdd.saveAsTextFile("file:///home/addankn/out.txt")

In [198]:
yelpReviewPD=yelpReviewcDF2.toPandas()

In [199]:
type(yelpReviewPD)

pandas.core.frame.DataFrame

In [200]:
yelpReviewPD

Unnamed: 0,ReviewId,text,NaiveBayes,RFClassification,BNBClassification,LogisticRegressionClassification,MNBClassification,SGDClassification,LinearSVCClassification,CombinedClassification
0,NpkTbM23X8douRPIPletCg,My wifey & I decided to venture off of the str...,negative,negative,negative,negative,negative,negative,negative,negative
1,Dcn7SHEyUN5BDalaUDc-6g,Beer is awesome! Food is a russian roulette! S...,negative,negative,negative,negative,negative,negative,negative,negative
2,2URAUtCMjMzFJhMkTpMnLA,Every time we come to Vegas this is the 1st pl...,positive,positive,positive,positive,positive,positive,positive,positive
3,hoiHUrd2csfiiqKVlaVWbw,"what a find this place was , had the steak spe...",negative,negative,negative,negative,negative,negative,negative,negative
4,zAx6l2nsW19ZmqLiAEP29w,FREAKING LOVE THIS PLACE.\n\nHave gone here ev...,negative,positive,negative,positive,negative,negative,negative,negative
5,kaz0xlmj-72GGd-_kAJDWQ,We were referred by a friend and I must say if...,negative,positive,negative,positive,positive,negative,positive,positive
6,7YBQ6YkKw7IiRAIbwAhRIg,The parking sucks but besides that this place ...,positive,negative,positive,positive,positive,negative,negative,positive
7,BUMGO2QBIXcOm3Jh1Nd6mQ,This place is AWESOME!!! I've been hearing abo...,negative,negative,negative,positive,positive,positive,positive,positive
8,5SLkvG5RV0qJnh9ZLGS9Ww,So I'm staying at the Holiday Inn off the stri...,negative,positive,negative,negative,negative,positive,positive,negative
9,_-MdgspEO1dAxgKxtOAnSA,Wow this place is fun! The beer is good and t...,positive,positive,positive,positive,positive,positive,positive,positive


In [201]:
yelpReviewPD.to_pickle("yelpReview10000.p")