In [1]:
import findspark

In [2]:
findspark.init("/home/hp/spark-3.0.0-bin-hadoop2.7") 

In [3]:
import pyspark

In [4]:
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
import numpy
from pyspark.mllib.clustering import KMeans

In [5]:
sc = SparkContext('local', 'name')


# Loading the train and test files 

In [6]:
trainRDD = sc.textFile("/home/hp/sparkmllib/twitter datasets/train.csv")
testRDD=sc.textFile("/home/hp/sparkmllib/twitter datasets/test.csv")

# Loading the files containing positive and negative words

In [7]:
f1 = open('/home/hp/sparkmllib/twitter datasets/positive-words.txt')

#print(f1.readlines())
positive = [x.split('\n')[0] for x in f1.readlines()]
#print(positive)
f2 = open("/home/hp/sparkmllib/twitter datasets/negative-words.txt")
negative = [x.split('\n')[0] for x in f2.readlines()]
# print(negative)

# TRAIN DATASET

In [8]:
for l in trainRDD.take(11):
    print(l)
header = trainRDD.first()

trainRDD = trainRDD.filter(lambda line: line!=header)
trainRDD = trainRDD.map(lambda line:line.split(',',2)[2])


ItemID,Sentiment,SentimentText,,,,,,,,,
1,0,                     is so sad for my APL friend.............,,,,,,,,,
2,0,                   I missed the New Moon trailer...,,,,,,,,,
3,1,              omg its already 7:30 :O,,,,,,,,,
4,0,          .. Omgaga. Im sooo  im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...,,,,,,,,,
5,0,         i think mi bf is cheating on me!!!       T_T,,,,,,,,,
6,0,         or i just worry too much?        ,,,,,,,,,
7,1,       Juuuuuuuuuuuuuuuuussssst Chillin!!,,,,,,,,,
8,0,       Sunny Again        Work Tomorrow  :-|       TV Tonight,,,,,,,,,
9,1,      handed in my uniform today . i miss you already,,,,,,,,,
10,1,      hmmmm.... i wonder how she my number @-),,,,,,,,,


# Extracting features from Sentiment text
Comparing the words in sentiment text with postive and negative words


In [9]:
feature = trainRDD.map(lambda line : [sum([1 if x in positive else 0 for x in line.split()]),
                                      sum([1 if x in negative else 0 for x in line.split()])
                                      ])


# Assigning label to each feature to get labelled points

In [10]:
train_label_point = feature.map(lambda x:LabeledPoint(0 if x[0]==max(x) else 1,
                                             Vectors.dense(numpy.array(x))))


In [11]:
for l in train_label_point.take(10):
    print(l)

(1.0,[0.0,1.0])
(1.0,[0.0,1.0])
(0.0,[0.0,0.0])
(0.0,[0.0,0.0])
(1.0,[0.0,1.0])
(1.0,[0.0,1.0])
(0.0,[0.0,0.0])
(0.0,[0.0,0.0])
(1.0,[0.0,1.0])
(0.0,[1.0,0.0])


# TEST DATASET 
Repeating all the operations performed on train data for test data as well

In [12]:
for l in testRDD.take(11):
    print(l)
# testRDD.foreach(print)

header = testRDD.first()
testRDD = testRDD.filter(lambda x:x!=header).map(lambda x:x.split(',' ,1)[1])


ItemID,SentimentText
1,                     is so sad for my APL friend.............
2,                   I missed the New Moon trailer...
3,              omg its already 7:30 :O
4,          .. Omgaga. Im sooo  im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...
5,         i think mi bf is cheating on me!!!       T_T
6,         or i just worry too much?        
7,       Juuuuuuuuuuuuuuuuussssst Chillin!!
8,       Sunny Again        Work Tomorrow  :-|       TV Tonight
9,      handed in my uniform today . i miss you already
10,      hmmmm.... i wonder how she my number @-)


In [13]:
test_feature = testRDD.map(lambda line : [sum([1 if x in positive else 0 for x in line.split()]),
                                      sum([1 if x in negative else 0 for x in line.split()])
                                      ])

In [14]:
test_lp = test_feature.map(lambda x:LabeledPoint(0 if x[0]==max(x) else 1,
                                             Vectors.dense(numpy.array(x))))


# MODELLING

In [15]:
train_features = train_label_point.map(lambda lp:lp.features)


In [16]:
model = KMeans.train(train_features, k =2 ,seed=1 , maxIterations=2)


In [17]:
for center in model.clusterCenters:
    print(center)


[0.41513238 0.        ]
[0.39614353 1.19231481]


In [18]:
test_feature = test_lp.map(lambda lp : lp.features)

In [19]:
tes = test_lp.map(lambda x: [x.label, float(model.predict(x.features))])

In [20]:
for i in tes.take(10):
    print(i)

[1.0, 1.0]
[1.0, 1.0]
[0.0, 0.0]
[0.0, 0.0]
[1.0, 1.0]
[1.0, 1.0]
[0.0, 0.0]
[0.0, 0.0]
[1.0, 1.0]
[0.0, 0.0]
