In [1]:
#This section for Windows users only

import glob
import os
import sys

# Working directory: holds tweets_formatted_data.csv and is where IRModel is written.
os.chdir(r"C:\spark\spark-files")
if 'SPARK_HOME' not in os.environ:
    # Raw string: the plain literal 'C:\spark' only works because '\s' is not a
    # recognised escape, which newer Python versions warn about.
    os.environ['SPARK_HOME'] = r'C:\spark'

SPARK_HOME = os.environ['SPARK_HOME']

# Make the bundled PySpark and py4j sources importable. The py4j zip name embeds
# its version, so locate it with a glob instead of hard-coding py4j-0.10.4.
spark_paths = [
    os.path.join(SPARK_HOME, "python"),
    os.path.join(SPARK_HOME, "python", "lib"),
    os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"),
] + sorted(glob.glob(os.path.join(SPARK_HOME, "python", "lib", "py4j-*-src.zip")))
for spark_path in spark_paths:
    # Skip entries already present so re-running the cell does not keep growing sys.path.
    if spark_path not in sys.path:
        sys.path.insert(0, spark_path)

In [2]:
#Importing dependencies

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
import operator

In [3]:
#Setting up spark context. getOrCreate() makes this cell safe to re-run: a second
#execution reuses the already-active context instead of raising.

conf=SparkConf()
# NOTE(review): spark.executor.memory / spark.cores.max are cluster-manager settings
# and likely have no effect with a local master -- confirm before relying on them.
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("IRApp")
# Single-threaded local master, same as the former SparkContext('local', conf=conf).
conf.setMaster("local")
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
#Loading & extracting data

tweetData = sc.textFile("tweets_formatted_data.csv") #Located in C:\spark\spark-files
# Each line is "<category>,<tweet text>". Split on the FIRST comma only, so tweet
# text that itself contains commas is kept whole instead of being truncated.
fields = tweetData.map(lambda x: x.split(",", 1))
# Bag of words per tweet: lower-cased and split on single spaces.
documents = fields.map(lambda x: x[1].lower().split(" "))
tweetData.take(1)

['emergency,unable to sit due to sticky stains on berth. PNR no 4512791357. do help immediately']

In [8]:
#Calculating TF-IDF

# NOTE(review): documentNames is not used by any of the cells shown here.
documentNames = fields.map(lambda x: x[0])
# Hash every token into one of 100000 term-frequency buckets.
hashingTF = HashingTF(100000)
article_hash_value = hashingTF.transform(documents)
# Cached because it is read twice: once to fit the IDF and once to transform.
article_hash_value.cache()

idf = IDF().fit(article_hash_value)
tfidf = idf.transform(article_hash_value)

# Pair each raw line with its TF-IDF vector. zip() requires both RDDs to have the
# same partitions and element counts; tfidf is derived from tweetData only through
# element-wise transforms, so this presumably holds.
xformedData=tweetData.zip(tfidf)
xformedData.cache()
# first() fetches a single element instead of collecting the whole RDD to the driver.
xformedData.first()

('emergency,unable to sit due to sticky stains on berth. PNR no 4512791357. do help immediately',
 SparseVector(100000, {916: 4.0775, 1462: 5.687, 16079: 2.253, 16470: 3.6721, 37495: 2.9144, 39740: 2.3911, 52613: 5.687, 59838: 4.3007, 60365: 3.4146, 66215: 4.9938, 75088: 5.687, 76603: 5.687, 88349: 5.687, 95803: 4.9938}))

In [11]:
#Processing

from pyspark.mllib.regression import LabeledPoint
def convertToLabeledPoint(inVal):
    """Turn a (raw_csv_line, tfidf_vector) pair into an MLlib LabeledPoint.

    The label is derived from the first comma-separated field of the raw
    line: "feedback" becomes 0.0, any other category becomes 1.0. The
    second element of the pair is used unchanged as the feature vector.
    """
    rawLine = inVal[0]
    features = inVal[1]
    category = rawLine.split(",")[0]
    label = 1.0
    if category == "feedback":
        label = 0.0
    return LabeledPoint(label, features)

tweetLp=xformedData.map(convertToLabeledPoint)
# Cached: both the training call and the prediction map below iterate over it.
tweetLp.cache()
# first() fetches one element instead of collecting the whole RDD to the driver.
tweetLp.first()

LabeledPoint(1.0, (100000,[916,1462,16079,16470,37495,39740,52613,59838,60365,66215,75088,76603,88349,95803],[4.07753744391,5.68697535634,2.25298815185,3.6720723358,2.9143866341,2.39113849034,5.68697535634,4.30068099522,3.41458740488,4.99382817578,5.68697535634,5.68697535634,5.68697535634,4.99382817578]))

In [18]:
#Machine Learning - Naive Bayes

from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
# Second positional argument is the smoothing parameter lambda_ (1.0).
model = NaiveBayes.train(tweetLp, 1.0)
# NOTE(review): predictions are made on the same data the model was trained on, so
# the confusion matrix below measures training accuracy, not generalisation.
# Consider tweetLp.randomSplit([0.7, 0.3], seed=...) for an honest estimate.
predictionAndLabel = tweetLp.map(
    lambda p: (float(model.predict(p.features)), float(p.label)))
# take(40) ships only the first 40 pairs to the driver; collect() shipped every pair.
predictionAndLabel.take(40)[35:40]

[(0.0, 0.0), (1.0, 1.0), (1.0, 1.0), (0.0, 0.0), (1.0, 1.0)]

In [19]:
#Forming confusion matrix

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Build the DataFrame straight from the RDD; collecting every row to the driver only
# to re-distribute it again was unnecessary.
predDF = sqlContext.createDataFrame(predictionAndLabel, ["prediction","label"])
# Sort so the (label, prediction) cells print in a stable, reproducible order.
predDF.groupBy("label","prediction").count().orderBy("label","prediction").show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|  215|
|  0.0|       1.0|   30|
|  1.0|       0.0|   36|
|  0.0|       0.0|  308|
+-----+----------+-----+



In [20]:
#saving the model

# Persist the trained NaiveBayesModel with pickle so the streaming cell can reload
# it without retraining (MLlib's model.save(sc, "IRModel") is the native alternative).
import pickle
with open('IRModel', 'wb') as model_file:
    pickle.dump(model, model_file)

In [22]:
#This section classifies real-time tweets and sends data into MySQL Database

from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import  NaiveBayesModel
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.regression import LabeledPoint

import operator
import pickle
import json
import MySQLdb


def insert_tweet(tweet,username,pnr,prediction,tweet_id):
    """Insert one classified tweet into the MySQL ``tweets`` table.

    Args:
        tweet: tweet text.
        username: the author's screen name.
        pnr: PNR value to store (the streaming code in this notebook passes "0").
        prediction: predicted class, stored as an int (training labels use 0.0 for
            "feedback" and 1.0 for every other category).
        tweet_id: the tweet's id.

    A fresh connection is opened and closed on every call. MySQLdb errors are
    printed rather than raised, so one bad row does not stop the stream.

    NOTE(review): connection settings (localhost, user "kunwar", empty password,
    database "twitter") are hard-coded; consider reading them from the environment.
    """
    # Parameterised query: the driver escapes the values, so apostrophes in tweets
    # no longer break the statement and tweet text cannot inject SQL.
    query = ("INSERT INTO tweets(tweet,username,pnr,prediction,tweet_id) "
             "VALUES (%s,%s,%s,%s,%s);")
    params = (tweet, username, pnr, int(prediction), tweet_id)
    # Initialised up front so the finally clause is safe even if connect() fails.
    conn = None
    try:
        conn = MySQLdb.connect("localhost","kunwar","","twitter" )
        cursor = conn.cursor()
        cursor.execute(query, params)
        conn.commit()
    except MySQLdb.Error as e:
        print(e)
    finally:
        if conn is not None:
            conn.close()

from pyspark.streaming import StreamingContext
# Only one SparkContext may be active per JVM. When this cell runs in the same
# kernel as the training cells, stop that context first; otherwise
# SparkContext(conf=conf) below raises "Cannot run multiple SparkContexts at once".
if 'sc' in globals():
    sc.stop()
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# 10-second micro-batches; checkpoint data goes to ./checkpoint.
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")
kstream = KafkaUtils.createDirectStream(
    ssc, topics=['twitterstream'],
    kafkaParams={"metadata.broker.list": 'localhost:9092'})
# Kafka records arrive as (key, value) pairs; the value is the tweet's JSON payload.
tweets = kstream.map(lambda x: json.loads(x[1]))

# IRModel is written by this notebook's own save cell. Never unpickle a file from an
# untrusted source: pickle.load can execute arbitrary code.
with open('IRModel', 'rb') as f:
    loadedModel = pickle.load(f)

# process_data reads the classifier back through bc_model.value.
bc_model = sc.broadcast(loadedModel)

def process_data(data):
    """Classify one micro-batch of tweets and store each result in MySQL.

    Args:
        data: RDD of (text, screen_name, tweet_id) tuples, as produced by the
            projection that feeds ``foreachRDD(process_data)``.

    Empty batches are ignored. Otherwise each tweet is featurised like the
    training corpus (lower-cased, split on single spaces, hashed into 100000
    buckets, IDF-weighted) and classified with the broadcast Naive Bayes model.
    The results are printed and then inserted via insert_tweet with a
    placeholder PNR of "0".
    """
    if data.isEmpty():
        return

    nbModel = bc_model.value
    hashingTF = HashingTF(100000)
    # Tokenise exactly like the training cell (lower().split(" ")). The previous code
    # handed the raw encoded string to HashingTF, which then hashed individual
    # characters -- features unrelated to the word-level ones the model learned.
    tf = hashingTF.transform(data.map(lambda x: x[0].lower().split(" ")))
    tf.cache()
    # NOTE(review): the IDF is refitted on every micro-batch (minDocFreq=2), so its
    # weights differ from the training IDF, and on small batches most terms get
    # weight 0. Persisting and reusing the training IDF weights would be more faithful.
    idf = IDF(minDocFreq=2).fit(tf)
    tfidf = idf.transform(tf)
    tfidf.cache()
    try:
        rows = data.collect()
        predictions = nbModel.predict(tfidf).collect()
    finally:
        # Release the per-batch caches; otherwise every batch leaves two cached RDDs behind.
        tfidf.unpersist()
        tf.unpersist()

    # One [encoded_text, screen_name, tweet_id, prediction] entry per tweet.
    results = [[text.encode('utf-8', 'ignore'), user, tweet_id, pred]
               for (text, user, tweet_id), pred in zip(rows, predictions)]

    print(results)
    for text, user, tweet_id, pred in results:
        insert_tweet(str(text), str(user), "0", int(pred), int(tweet_id))

# Project each decoded tweet onto the (text, screen_name, id) triple that
# process_data expects, and classify/store every micro-batch.
txt = tweets.map(lambda x: (x['text'], x['user']['screen_name'], x['id']))
txt.foreachRDD(process_data)

ssc.start()
try:
    # Let the stream run for at most 1000 seconds.
    ssc.awaitTerminationOrTimeout(1000)
finally:
    # Always shut down -- even if a batch raised -- so the streaming job does not keep
    # running in the kernel. A graceful stop lets in-flight batches finish first.
    ssc.stop(stopGraceFully=True)