In [1]:
import findspark
findspark.init()
import sys
from nltk.tokenize import word_tokenize
from pyspark.mllib.regression import LabeledPoint
import nltk

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
conf = SparkConf()
conf.setAppName("TwitterSentiAnalysis")
conf.set("spark.network.timeout","100000s")
sc = SparkContext(conf=conf)
   
spark = SparkSession.builder.appName("TwitterSentiAnalysis").getOrCreate()
#to start a spark context
#read data file
data=sc.textFile("D:/HPC-proj/datasetfinal.csv")
head=data.first()

pos = sc.textFile("D:/HPC-proj/pos1.txt")
neg = sc.textFile("D:/HPC-proj/neg1.txt")
pos_sp = pos.flatMap(lambda line: line.split("\n")).collect()
neg_sp = neg.flatMap(lambda line: line.split("\n")).collect()
all_words = []
documents = []
allowed = ["J", "R", "V", "N"]
for p in pos_sp:
    documents.append({"text": p , "label": 1})

for p in neg_sp:
    documents.append({"text": p , "label": 0})

def wc(data):
    words = word_tokenize(data)
    tag = nltk.pos_tag(words)
    for w in tag:
        if w[1][0] in allowed:
            all_words.append(w[0].lower())

    return all_words


raw_data = sc.parallelize(documents, numSlices=100)
raw_tokenized = raw_data.map(lambda dic : {"text": wc(dic["text"]) , "label" : dic["label"]})


In [3]:
import nltk

from pyspark import SparkConf, SparkContext

from nltk.tokenize import word_tokenize

from pyspark.mllib.feature import HashingTF, IDF

from pyspark.mllib.regression import LabeledPoint

htf = HashingTF(50000)
raw_hashed = raw_tokenized.map(lambda dic : LabeledPoint(dic["label"], htf.transform(dic["text"])))
raw_hashed.persist()
trained_hashed, test_hashed = raw_hashed.randomSplit([0.7, 0.3])

In [4]:
def s(row):
    split_row=row.split(",")
    return (split_row[0],split_row[1].split(" "))

data=data.filter(lambda r: r!=head)
dataset=data.map(lambda r: s(r))
#convert rdd into sql dataframe to remove stop words, make ngram model and turn review into a feature
dataframe=spark.createDataFrame(dataset, ["sentiment", "tweet"])


In [5]:
dataframe.show(n=10)

+---------+--------------------+
|sentiment|               tweet|
+---------+--------------------+
|        0|[is, so, sad, for...|
|        0|[i, missed, the, ...|
|        1|[omg, its, alread...|
|        0|[omgaga, im, sooo...|
|        0|[i, think, mi, bf...|
|        0|[or, i, just, wor...|
|        1|[juuuuuuuuuuuuuuu...|
|        0|[sunny, again, wo...|
|        1|[handed, in, my, ...|
|        1|[hmmmm, i, wonder...|
+---------+--------------------+
only showing top 10 rows



In [6]:
from pyspark.ml.feature import StopWordsRemover
remover=StopWordsRemover(inputCol="tweet", outputCol="filtered")
filtered_df=remover.transform(dataframe)

In [7]:
filtered_df.show(n=10)

+---------+--------------------+--------------------+
|sentiment|               tweet|            filtered|
+---------+--------------------+--------------------+
|        0|[is, so, sad, for...|  [sad, apl, friend]|
|        0|[i, missed, the, ...|[missed, new, moo...|
|        1|[omg, its, alread...|   [omg, already, o]|
|        0|[omgaga, im, sooo...|[omgaga, im, sooo...|
|        0|[i, think, mi, bf...|[think, mi, bf, c...|
|        0|[or, i, just, wor...|       [worry, much]|
|        1|[juuuuuuuuuuuuuuu...|[juuuuuuuuuuuuuuu...|
|        0|[sunny, again, wo...|[sunny, work, tom...|
|        1|[handed, in, my, ...|[handed, uniform,...|
|        1|[hmmmm, i, wonder...|[hmmmm, wonder, n...|
+---------+--------------------+--------------------+
only showing top 10 rows



In [8]:
#now make 2-gram model

from pyspark.ml.feature import NGram
ngram=NGram(n=2, inputCol="filtered", outputCol="2gram")
gram_df=ngram.transform(filtered_df)

In [9]:
gram_df.show(n=10)

+---------+--------------------+--------------------+--------------------+
|sentiment|               tweet|            filtered|               2gram|
+---------+--------------------+--------------------+--------------------+
|        0|[is, so, sad, for...|  [sad, apl, friend]|[sad apl, apl fri...|
|        0|[i, missed, the, ...|[missed, new, moo...|[missed new, new ...|
|        1|[omg, its, alread...|   [omg, already, o]|[omg already, alr...|
|        0|[omgaga, im, sooo...|[omgaga, im, sooo...|[omgaga im, im so...|
|        0|[i, think, mi, bf...|[think, mi, bf, c...|[think mi, mi bf,...|
|        0|[or, i, just, wor...|       [worry, much]|        [worry much]|
|        1|[juuuuuuuuuuuuuuu...|[juuuuuuuuuuuuuuu...|[juuuuuuuuuuuuuuu...|
|        0|[sunny, again, wo...|[sunny, work, tom...|[sunny work, work...|
|        1|[handed, in, my, ...|[handed, uniform,...|[handed uniform, ...|
|        1|[hmmmm, i, wonder...|[hmmmm, wonder, n...|[hmmmm wonder, wo...|
+---------+--------------

In [11]:
#now make term frequency vectors out of data frame to feed machine
from pyspark.ml.feature import HashingTF,IDF
hashingtf=HashingTF(inputCol="2gram", outputCol="tf", numFeatures=20000)
tf_df=hashingtf.transform(gram_df)
#tf-idf
idf=IDF(inputCol="tf", outputCol="idftf")
idfModel=idf.fit(tf_df)
idf_df=idfModel.transform(tf_df)

In [12]:
idf_df.show(n=10)

+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|sentiment|               tweet|            filtered|               2gram|                  tf|               idftf|
+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        0|[is, so, sad, for...|  [sad, apl, friend]|[sad apl, apl fri...|(20000,[9509,1977...|(20000,[9509,1977...|
|        0|[i, missed, the, ...|[missed, new, moo...|[missed new, new ...|(20000,[1358,2474...|(20000,[1358,2474...|
|        1|[omg, its, alread...|   [omg, already, o]|[omg already, alr...|(20000,[11657,197...|(20000,[11657,197...|
|        0|[omgaga, im, sooo...|[omgaga, im, sooo...|[omgaga im, im so...|(20000,[1251,2920...|(20000,[1251,2920...|
|        0|[i, think, mi, bf...|[think, mi, bf, c...|[think mi, mi bf,...|(20000,[3128,4257...|(20000,[3128,4257...|
|        0|[or, i, just, wor...|       [worry, much]|        [wo

In [13]:
#convert dataframe t rdd, to make a LabeledPoint tuple(label, feature, vector) for machine
tf_rdd=tf_df.rdd

from pyspark.mllib.linalg import  Vectors as MLLibVectors
#we also need to convert ml.sparsevector mllib.sparse vector, because naive bayes only accepts mllib.sparsevector type
train_dataset=tf_rdd.map(lambda x: LabeledPoint(float(x.sentiment), MLLibVectors.fromML(x.tf)))
#split dataset into train, test
train, test=train_dataset.randomSplit([0.9, 0.1], seed=11)
print(train.first())
print(test.first())

(0.0,(20000,[9509,19771],[1.0,1.0]))
(0.0,(20000,[1429,2631,3344,3703,5070,5522,9209,17496],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))


In [14]:
#Naive Bayes Model

In [15]:
#create Model
#now train and save the model

from pyspark.mllib.classification import NaiveBayes
import shutil
#training
print("************************TRAINIG*******************************")
model=NaiveBayes.train(train, 1.0)
print("*****************************TRAINING COMPLETE************************************")

************************TRAINIG*******************************
*****************************TRAINING COMPLETE************************************


In [16]:
#saving the model

output_dir = 'D:/HPC-proj/NaiveBayesModel_Tweet3'
shutil.rmtree(output_dir, ignore_errors=True)
model.save(sc,output_dir)

In [17]:
#testing on test data
print("************************TESTING***********************************")
predictionAndLabel=test.map(lambda x: (x.label, model.predict(x.features)))
accuracy=1.0*predictionAndLabel.filter(lambda x: x[0]==x[1]).count()/test.count()
print("Model Accuracy is ", accuracy)
print("*****************TESTING COMPLETED*****************************")

************************TESTING***********************************
Model Accuracy is  0.6576188068273974
*****************TESTING COMPLETED*****************************


In [18]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils

In [19]:
#Loogistic Regression Model

In [20]:
train.cache()
# Run training algorithm to build the model
model1 = LogisticRegressionWithLBFGS.train(train)

#saving the model

output_dir = 'D:/HPC-proj/BinaryClassifyier'
shutil.rmtree(output_dir, ignore_errors=True)
model1.save(sc,output_dir)

In [21]:
#testing
labelsAndPreds = test.map(lambda p: (p.label, model1.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(test.count())
print("Accuracy = " + str(1-trainErr))

Accuracy = 0.6610248728688235


In [22]:
#Binary Classification

In [23]:
# Compute raw scores on the test set
predictionAndLabels = test.map(lambda lp: (float(model.predict(lp.features)), lp.label))

# Instantiate metrics object
metrics = BinaryClassificationMetrics(predictionAndLabels)

#Precision-Recall curves summarize the trade-off between the true 
#positive rate and the positive predictive value for a predictive model using different probability thresholds.
# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)

#ROC Curves summarize the trade-off between the true positive rate 
#and false positive rate for a predictive model using different probability thresholds.
# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under PR = 0.521599445380094
Area under ROC = 0.5952628962327116


In [24]:
#Decision Tree Classification
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

In [26]:
model2 = DecisionTree.trainClassifier(trained_hashed, numClasses=2, categoricalFeaturesInfo={},impurity='gini', maxDepth=5, maxBins=32)

#saving the model

output_dir = 'D:/HPC-proj/DecisionTreeC'
shutil.rmtree(output_dir, ignore_errors=True)
model2.save(sc,output_dir)

print('Learned classification tree model:')
print(model2.toDebugString())

Learned classification tree model:
DecisionTreeModel classifier of depth 5 with 35 nodes
  If (feature 20234 <= 0.5)
   If (feature 32678 <= 1.5)
    If (feature 44283 <= 0.5)
     If (feature 10064 <= 0.5)
      If (feature 30363 <= 0.5)
       Predict: 1.0
      Else (feature 30363 > 0.5)
       Predict: 0.0
     Else (feature 10064 > 0.5)
      If (feature 25325 <= 0.5)
       Predict: 1.0
      Else (feature 25325 > 0.5)
       Predict: 0.0
    Else (feature 44283 > 0.5)
     If (feature 22986 <= 0.5)
      If (feature 16233 <= 0.5)
       Predict: 0.0
      Else (feature 16233 > 0.5)
       Predict: 1.0
     Else (feature 22986 > 0.5)
      If (feature 1959 <= 0.5)
       Predict: 1.0
      Else (feature 1959 > 0.5)
       Predict: 0.0
   Else (feature 32678 > 1.5)
    If (feature 30372 <= 0.5)
     If (feature 49524 <= 0.5)
      If (feature 1310 <= 0.5)
       Predict: 1.0
      Else (feature 1310 > 0.5)
       Predict: 0.0
     Else (feature 49524 > 0.5)
      Predict: 0.0
    

In [27]:
# Evaluate model on test instances and compute test error
predictions = model2.predict(test_hashed.map(lambda x: x.features))
labelsAndPredictions = test_hashed.map(lambda lp: lp.label).zip(predictions)
testAcc = labelsAndPredictions.filter(lambda lp: lp[0] == lp[1]).count() / float(test_hashed.count())
print('Accuracy = ' + str(testAcc))

Accuracy = 0.9554886211512718


In [28]:
# Decision Tress Regressor

In [29]:
model3 = DecisionTree.trainRegressor(trained_hashed, categoricalFeaturesInfo={},impurity='variance', maxDepth=5, maxBins=32)

#saving the model

output_dir = 'D:/HPC-proj/DecisionTreeR'
shutil.rmtree(output_dir, ignore_errors=True)
model3.save(sc,output_dir)

print('Learned regression tree model:')
print(model3.toDebugString())

Learned regression tree model:
DecisionTreeModel regressor of depth 5 with 35 nodes
  If (feature 20234 <= 0.5)
   If (feature 32678 <= 1.5)
    If (feature 44283 <= 0.5)
     If (feature 10064 <= 0.5)
      If (feature 30363 <= 0.5)
       Predict: 0.5947368421052631
      Else (feature 30363 > 0.5)
       Predict: 0.23096446700507614
     Else (feature 10064 > 0.5)
      If (feature 25325 <= 0.5)
       Predict: 0.902317880794702
      Else (feature 25325 > 0.5)
       Predict: 0.0
    Else (feature 44283 > 0.5)
     If (feature 22986 <= 0.5)
      If (feature 16233 <= 0.5)
       Predict: 0.0069605568445475635
      Else (feature 16233 > 0.5)
       Predict: 1.0
     Else (feature 22986 > 0.5)
      If (feature 1959 <= 0.5)
       Predict: 1.0
      Else (feature 1959 > 0.5)
       Predict: 0.0
   Else (feature 32678 > 1.5)
    If (feature 30372 <= 0.5)
     If (feature 49524 <= 0.5)
      If (feature 1310 <= 0.5)
       Predict: 0.9994579945799458
      Else (feature 1310 > 0.5)
  

In [30]:
# Evaluate model on test instances and compute test error
predictions1 = model3.predict(test_hashed.map(lambda x: x.features))
labelsAndPredictions1 = test_hashed.map(lambda lp: lp.label).zip(predictions1)
testMSE = labelsAndPredictions1.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /float(test_hashed.count())
print('Test Mean Squared Error = ' + str(testMSE))

Test Mean Squared Error = 0.02970745982424744


In [31]:
# RAndom Forest
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

In [32]:
model4 = RandomForest.trainClassifier(trained_hashed, numClasses=2, categoricalFeaturesInfo={},numTrees=3, featureSubsetStrategy="auto",
                                         impurity='gini', maxDepth=4, maxBins=32)

#saving the model
output_dir = 'D:/HPC-proj/RandomForest'
shutil.rmtree(output_dir, ignore_errors=True)
model4.save(sc,output_dir)

print('Learned classification forest model:')
print(model4.toDebugString())

Learned classification forest model:
TreeEnsembleModel classifier with 3 trees

  Tree 0:
    If (feature 43431 <= 0.5)
     If (feature 9435 <= 0.5)
      If (feature 17647 <= 0.5)
       If (feature 21093 <= 0.5)
        Predict: 0.0
       Else (feature 21093 > 0.5)
        Predict: 1.0
      Else (feature 17647 > 0.5)
       Predict: 1.0
     Else (feature 9435 > 0.5)
      If (feature 28997 <= 0.5)
       If (feature 8886 <= 0.5)
        Predict: 1.0
       Else (feature 8886 > 0.5)
        Predict: 0.0
      Else (feature 28997 > 0.5)
       Predict: 0.0
    Else (feature 43431 > 0.5)
     Predict: 0.0
  Tree 1:
    If (feature 39727 <= 0.5)
     If (feature 16896 <= 3.5)
      If (feature 22501 <= 0.5)
       If (feature 45673 <= 0.5)
        Predict: 0.0
       Else (feature 45673 > 0.5)
        Predict: 1.0
      Else (feature 22501 > 0.5)
       If (feature 24411 <= 0.5)
        Predict: 1.0
       Else (feature 24411 > 0.5)
        Predict: 0.0
     Else (feature 16896 > 3.5

In [33]:
# Evaluate model on test instances and compute test error
predictions2 = model4.predict(test_hashed.map(lambda x: x.features))
labelsAndPredictions2 = test_hashed.map(lambda lp: lp.label).zip(predictions2)
testAcc1 = labelsAndPredictions2.filter(lambda lp: lp[0] == lp[1]).count() / float(test_hashed.count())
print('Accuracy = ' + str(testAcc1))

Accuracy = 0.7402945113788487


In [34]:
#SVM
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

In [35]:
model5 = SVMWithSGD.train(train, iterations=100)
    
#saving the model
output_dir = 'D:/HPC-proj/SVM'
shutil.rmtree(output_dir, ignore_errors=True)
model5.save(sc,output_dir)

In [36]:
# Evaluating the model on training data
labelsAndPreds1 = test.map(lambda p: (p.label, model5.predict(p.features)))
trainErr3 = labelsAndPreds1.filter(lambda lp: lp[0] != lp[1]).count() / float(test.count())
print("Accuracy = " + str(1-trainErr3))

Accuracy = 0.6369868241534924


In [37]:
sc.stop()