In [71]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import StopWordsRemover,Tokenizer
from pyspark.sql.functions import lower
from pyspark.sql.types import Row
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import SQLContext
from pyspark import SparkContext

import re
from collections import Counter
from nltk.stem import PorterStemmer
from math import log10,sqrt

In [77]:
spark = SparkSession\
        .builder\
        .appName("NaiveBayesExample")\
        .getOrCreate()

data = spark.read.csv("Sentiment Analysis Dataset.csv",header=True)

In [78]:
stemmer = PorterStemmer()
def parse(sentence):
    blob = [re.sub('[^a-zA-Z]+', '', s) for s in sentence]
    blob = [stemmer.stem(i) for i in blob]
    blob = [i.lower() for i in blob if len(i)>2 and len(i)<20]
    return blob

In [79]:
# Converting senteces to words
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="words")
token_df=tokenizer.transform(data).drop("SentimentText")

#Removing stop words 
remover=StopWordsRemover(inputCol="words", outputCol="filtered_tweet")
data_stop_words=remover.transform(token_df).drop("words")\
                       .select("*").withColumn("id", monotonically_increasing_id())


data_stop_words_rdd=data_stop_words.rdd       #transforming to RDD

#Spliting into train and test data
train,test=data_stop_words_rdd.randomSplit([0.7,0.3])

train_data_tweets=train.map(lambda row: ("doc"+str(row.id),parse(row.filtered_tweet)))     # RDD -> (doc_id,tweet)      
train_labels=train.map(lambda row:("doc"+str(row.id),row.Sentiment)) # RDD -> (doc_id, sentiment)

test_data_tweets=test.map(lambda row: ("testDoc"+str(row.id),parse(row.filtered_tweet)))    # RDD -> (doc_id, tweet)
test_labels=test.map(lambda row:("testDoc"+str(row.id),row.Sentiment))   # RDD -> (doc_id, sentiment)

In [80]:
train_data_tweets.take(5)

[('doc0', ['sad', 'apl', 'friend']),
 ('doc1', ['miss', 'new', 'moon', 'trailer']),
 ('doc2', ['omg', 'alreadi']),
 ('doc6', ['chillin']),
 ('doc9', ['hmmmm', 'wonder', 'number'])]

In [81]:
def term_freq(row):
    doc_id=row[0]
    doc=row[1]
    a=Counter(doc).items()
    return [(k[0],(1+log10(k[1]),doc_id)) for k in a]           # TF= 1+log(WF); RDD -> (term,(TF,doc_id))

In [82]:
train_tf=train_data_tweets.flatMap(term_freq)  # RDD -> (term,(TF,doc_id))
test_tf=test_data_tweets.flatMap(term_freq)    # RDD -> (term,(TF,doc_id))

In [83]:
train_tf.take(8)

[('sad', (1.0, 'doc0')),
 ('apl', (1.0, 'doc0')),
 ('friend', (1.0, 'doc0')),
 ('miss', (1.0, 'doc1')),
 ('trailer', (1.0, 'doc1')),
 ('new', (1.0, 'doc1')),
 ('moon', (1.0, 'doc1')),
 ('alreadi', (1.0, 'doc2'))]

In [84]:
def tfidfCalc(row):
    tf=row[1][1][0]
    idf=log10(1+(num_docs/row[1][0]))
    doc_id=row[1][1][1]
    return (doc_id,(row[0],tf*idf))                    #Mulitplying TF and IDF

In [85]:
train_tf.map(lambda row:(row[0],1)).reduceByKey(lambda a,b:a+b).take(4)

[('noelpresid', 1), ('nite', 11), ('mad', 23), ('iwi', 1)]

In [86]:
train_doc_count=train_tf.map(lambda row:(row[0],1)).reduceByKey(lambda x,y: x+y)   # RDD -> (term, Number of documents containing the term t)
# print(train_doc_count.take(5))

In [87]:
num_docs=train.count()
train_tfidf=train_doc_count.join(train_tf)\
                           .map(tfidfCalc)       #RDD -> (term,(doc_count,())) => #RDD -> (doc_id,(term,tfidf))

In [88]:
train_tfidf.take(8)

[('doc2227', ('noelpresid', 3.8477576883923312)),
 ('doc430', ('nite', 2.8069811986863766)),
 ('doc471', ('nite', 2.8069811986863766)),
 ('doc1472', ('nite', 2.8069811986863766)),
 ('doc1525', ('nite', 2.8069811986863766)),
 ('doc1661', ('nite', 2.8069811986863766)),
 ('doc1741', ('nite', 2.8069811986863766)),
 ('doc2267', ('nite', 2.8069811986863766))]

In [89]:
denom_square=train_tfidf.map(lambda row:(row[0],row[1][1])).reduceByKey(lambda a,b:a*a+b*b) #RDD -> (doc_id,denom_square)

In [90]:
denom_square.join(train_tfidf).take(4)

[('doc7049', (1.2044510733559554e+79, ('episod', 2.8069811986863766))),
 ('doc7049', (1.2044510733559554e+79, ('fave', 2.8940082052377174))),
 ('doc7049', (1.2044510733559554e+79, ('know', 1.6081171952607451))),
 ('doc7049', (1.2044510733559554e+79, ('let', 1.9495119823462912)))]

In [91]:
def norm_tfidf(row):
    tf_idf=row[1][1][1]/sqrt(row[1][0])
    return (row[0],(row[1][1][0],tf_idf))

In [92]:
train_tfidf_norm=denom_square.join(train_tfidf).map(norm_tfidf)# RDD -> (doc_id,())
train_tfidf_norm.take(4)

[('doc7049', ('episod', 8.088070389699966e-40)),
 ('doc7049', ('fave', 8.338831084186102e-40)),
 ('doc7049', ('know', 4.633648802579319e-40)),
 ('doc7049', ('let', 5.61734797018213e-40))]

In [93]:
train_tfidf_norm_labels=train_tfidf_norm.join(train_labels)     
#RDD -> (doc_id,((term,tfidf),label))
#train_tfidf_norm_labels.collect()                                   #RDD -> (doc_id,((term,tfidf),label))

# Naive Bayes

In [None]:
alpha=train_tfidf.count()     #Vocabulary
alpha

In [None]:
labels_count=train.map(lambda row:(row.Sentiment,1))\
                                    .reduceByKey(lambda a,b:a+b).collect()
labels_count=dict([(lab,c/train.count()) for lab,c in labels_count])

In [None]:
B=dict(train_tfidf_norm_labels.map(lambda row:(row[1][1],row[1][0][1]))\
                              .reduceByKey(lambda a,b:a+b).collect())            #Number of words in each class
print(B)
B_alpha={k: v +alpha for k, v in B.items()}
B_alpha

In [None]:
# (doc_id,((term,tfidf),label))=> ((term,label),tfidf) => (term,(label,prob=(1+tfidf)/(alpha+label_count)))
prior=train_tfidf_norm_labels.map(lambda row:((row[1][0][0],row[1][1]),row[1][0][1]))\
                     .reduceByKey(lambda a,b:a+b)\
                     .map(lambda row:((row[0][0],row[0][1]),(1+row[1])/B_alpha[row[0][1]]))
print("prior",prior.take(5))

In [None]:
# format for join of tes_tf and prior is [(term,((term_freq,doc_id),(label,prob)))]
test_prior_negative=test_tf.map(lambda row: ((row[0],'0'),(row[1][0],row[1][1]))).leftOuterJoin(prior)
test_prior_positive=test_tf.map(lambda row: ((row[0],'1'),(row[1][0],row[1][1]))).leftOuterJoin(prior)
# print("test_prior_negative",test_prior_negative.collect())
# print("test_prior_positive",test_prior_positive.collect())

In [None]:
def checkForNone(lab,val):
    if(val==None):
        return 1/B_alpha[lab]
    else:
        return val
    
#Multiplying probabilities of terms in each doc    
test_negative_docs=test_prior_negative.map(lambda row: (row[1][0][1],checkForNone(row[0][1],row[1][1]))).reduceByKey(lambda a,b:a*b)
test_positive_docs=test_prior_positive.map(lambda row: (row[1][0][1],checkForNone(row[0][1],row[1][1]))).reduceByKey(lambda a,b:a*b)

In [None]:
# print("test_negative_docs",test_negative_docs.collect())
# print("test_positive_docs",test_positive_docs.collect())

In [None]:
predicted_values=test_negative_docs.join(test_positive_docs).map(lambda row: (row[0],'0' if row[1][0]>row[1][1] else '1'))

In [None]:
accuracy=dict(predicted_values.join(test_labels).map(lambda row: (True,1) if row[1][0]==row[1][1] else (False,1)).reduceByKey(lambda a,b:a+b).collect())
accuracy=100*accuracy[True]/sum(accuracy.values())
print(str(data.count())+"=>"+str(accuracy))

# KNN


In [94]:
def test_tfidfCalc(tf,df):
    idf=log10(1+(num_docs+1/df+1))
    return tf*idf

In [95]:
test_tfidf=test_tf.join(train_doc_count).map(lambda row:(row[1][0][1],(row[0],test_tfidfCalc(row[1][0][0],row[1][1]))))

In [96]:
# test_tfidf.take(4)

In [97]:
test_denom_square=test_tfidf.map(lambda row:(row[0],row[1][1])).reduceByKey(lambda a,b:a*a+b*b) #RDD -> (doc_id,denom_square)
# test_denom_square.take(4)

In [98]:
def test_norm_tfidf(row):
    tf_idf=row[1][1][1]/sqrt(row[1][0])
    return (row[1][1][0],(row[0],tf_idf))

In [99]:
test_tfidf_norm=test_denom_square.join(test_tfidf).map(test_norm_tfidf)# RDD -> (doc_id,())

train_tfidf_norm.take(4)

In [100]:
test_train=test_tfidf_norm.join(train_tfidf_norm.map(lambda row:(row[1][0],(row[0],row[1][1]))))

In [101]:
# test_train.take(1)

In [102]:
test_scores=test_train.map(lambda row: ((row[1][0][0],row[1][1][0]),row[1][0][1]*row[1][1][1]))\
                           .reduceByKey(lambda a,b:a+b)
# test_scores.collect()

In [103]:
test_sorted=test_scores.map(lambda row:(row[0][0],(row[0][1],row[1])))
# test_sorted.take(9)

In [104]:
# test_sorted.filter(lambda row:row[0]=='testDoc10').take(9)

In [None]:
predicted=[]
out=[]
for i in set(test_sorted.keys().collect()):
    top_9_docs=dict(test_sorted.filter(lambda row:row[0]==i).map(lambda row:(row[1][0],row[1][1])).sortBy(lambda row:row[0],ascending=False).take(3))
    top_9_docs_labels=train_labels.filter(lambda row:row[0] in top_9_docs.keys())
    labels_count=dict(top_9_docs_labels.map(lambda row:(row[1],1)).reduceByKey(lambda a,b:a+b).collect())
    test_output=max(labels_count,key=labels_count.get)
    out.append(test_output)
    predicted.append((i,test_output))
predictedRDD=sc.parallelize(predicted)

In [None]:
accuracy=100*(dict(predictedRDD.join(test_labels).map(lambda row: (True,1) if row[1][0]==row[1][1] else (False,1))\
                                   .reduceByKey(lambda a,b:a+b).collect())[True])/test_labels.count()

In [None]:
accuracy