# In [None]:
#!/usr/bin/env python
# coding: utf-8

# In[ ]:

# specify the location of spark installation
import findspark
findspark.init("/home/aritra/spark/spark-2.4.0-bin-hadoop2.7")

from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
import sys
from datetime import datetime
import pandas as pd
import warnings
import re
from collections import Counter
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.sql import functions as fun
from pyspark.sql import types as t
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, NGram, CountVectorizer, IDF, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import PipelineModel
from pyspark.sql.functions import *
import os

# Directory from which the streaming application loads the trained pipeline model.
modeldir = "/home/aritra/CS 631/Project/Birendra/model"



# Using regex for preprocessing

# Twitter handles (@user) and http/https URLs, combined so both are removed
# in a single substitution.
pat1 = r'@[A-Za-z0-9_]+'
pat2 = r'https?://[^ ]+'
combined_pat = r'|'.join((pat1,pat2))
# URLs written without a scheme. The dot is escaped so that only a literal
# "www." prefix counts as a URL; an unescaped "." matches any character and
# would also delete ordinary words that merely start with "www".
www_pat = r'www\.[^ ]+'
# Lower-case contractions and the two-word form they are expanded to.
negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
# Matches any of the contractions above as a whole word.
neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')

# preprocessing codes
# remove Twitter handle and URL, remove URL pattern starting with www., and transform to lower characters and remove numbers and special characters

def pre_processing(column):
    """Normalise a raw tweet into lower-case text made of letters and spaces.

    The steps run in this order: remove Twitter handles and http(s) URLs,
    remove ``www`` URLs, lower-case the text, expand negation contractions,
    then drop every character that is not an ASCII letter or a space.

    Args:
        column: The raw tweet text, or ``None`` for a missing value.

    Returns:
        The cleaned text with surrounding whitespace stripped; an empty
        string when ``column`` is ``None``.
    """
    # A missing value would otherwise make re.sub raise TypeError.
    if column is None:
        return ''
    without_links = re.sub(combined_pat, '', column)
    without_www = re.sub(www_pat, '', without_links)
    lowered = without_www.lower()
    # Contractions must be expanded before the apostrophe is stripped below,
    # and after lower-casing because the lookup table holds lower-case keys.
    expanded = neg_pattern.sub(lambda match: negations_dic[match.group()], lowered)
    letters_only = re.sub(r'[^A-Za-z ]','',expanded)
    return letters_only.strip()

# building a pipeline following below order
# tokenizer + n-grams + count vectorizer + inverse document frequency + assembler + target-label encoding + logistic regression
def build_pipeline():
    """Assemble the unfitted sentiment-classification pipeline.

    Stage order: a tokenizer over the ``tweet`` column, 1- to 3-gram
    extraction, one count vectorizer and one IDF per n-gram order, a vector
    assembler that joins the three TF-IDF vectors into ``features``, a string
    indexer that turns ``sentiment`` into ``label``, and a logistic
    regression classifier.

    Returns:
        A ``Pipeline`` holding the stages in the order described above.
    """
    orders = range(1, 4)

    stages = [Tokenizer(inputCol='tweet',outputCol='words')]
    stages.extend(
        NGram(n=k, inputCol='words', outputCol='{0}_grams'.format(k))
        for k in orders
    )
    stages.extend(
        CountVectorizer(vocabSize=5460, inputCol='{0}_grams'.format(k), outputCol='{0}_tf'.format(k))
        for k in orders
    )
    stages.extend(
        IDF(inputCol='{0}_tf'.format(k), outputCol='{0}_tfidf'.format(k), minDocFreq=5)
        for k in orders
    )
    tfidf_columns = ['{0}_tfidf'.format(k) for k in orders]
    stages.append(VectorAssembler(inputCols=tfidf_columns, outputCol='features'))
    stages.append(StringIndexer(inputCol='sentiment', outputCol='label'))
    stages.append(LogisticRegression(maxIter=100))

    return Pipeline(stages=stages)

#function to compute the trending hash tag 

# Hashtags observed so far; trending_hash_tag ranks a tweet's own hashtags by
# how often each one occurs in this list.
# NOTE(review): nothing in the visible code appends to this list, so every
# count is currently 0 - confirm where it is meant to be filled.
list2=[]
def trending_hash_tag(row):
    """Return the hashtag in ``row["Text"]`` that occurs most often in ``list2``.

    Args:
        row: Any mapping-like object (e.g. a pandas row) with a ``"Text"``
            entry; the value is converted with ``str`` before matching.

    Returns:
        The hashtag (without the leading ``#``) with the highest count in
        ``list2``. Hashtags with equal counts are ordered alphabetically and
        the first one is returned. An empty string when the text contains
        no hashtag.
    """
    tags = re.findall(r"#(\w+)", str(row["Text"]))
    if not tags:
        return ""
    counts = Counter(list2)
    # Rank by count (highest first). Sorting the tag names alone would pick
    # the alphabetically first tag regardless of how often it was seen.
    return min(tags, key=lambda tag: (-counts[tag], tag))

#function to compute the hash tag from tweet 

# One entry per hash_tag call: the list of hashtags found in that tweet.
list1 =[]
def hash_tag(row):
    """Extract the hashtags of a tweet and record them in ``list1``.

    Args:
        row: Any mapping-like object (e.g. a pandas row) with a ``"Text"``
            entry; the value is converted with ``str`` before matching.

    Returns:
        The hashtags (without the leading ``#``) joined by single spaces, or
        an empty string when the text has none.
    """
    hashtags = [match.group(1) for match in re.finditer(r"#(\w+)", str(row["Text"]))]
    # The per-tweet list is stored as a single element, even when empty.
    list1.append(hashtags)
    return " ".join(hashtags)




# Create the Spark context (local mode, two threads) and the contexts built on it.
sc = SparkContext(appName="YourTest", master="local[2]")
# Only log errors.
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 20) # 20 second batch interval
# Used by save_df to read the accumulated CSV back as a Spark DataFrame.
sqlContext = SQLContext(sc)
IP = "localhost" # Replace with your stream IP
Port = 5000 # Replace with your stream port

# DStream of text lines received from the socket at IP:Port.
lines = ssc.socketTextStream(IP, Port)
# Accumulates every parsed tweet across batches; save_df appends to it.
data_frame= pd.DataFrame(columns=['Time','longitude', 'latitude','place', "Text"])
# Per-batch handler: accumulate tweets, classify them and write the results to CSV.
def _parse_tweet_lines(raw_lines):
    """Split raw socket lines into 5-field records, dropping malformed lines."""
    records = []
    for line in raw_lines:
        fields = line.split("*////*")
        # Empty or truncated lines never yield exactly five fields.
        if len(fields) == 5:
            records.append(fields)
    return records


def save_df(rdd):
    """Process one micro-batch of the tweet stream.

    The batch is collected to the driver, parsed and appended to the global
    ``data_frame``, which is then written to ``new_result.csv``. That file is
    read back with Spark, the tweet text is cleaned with ``pre_processing``,
    and the pipeline model stored under ``modeldir`` predicts a label for
    every accumulated tweet. Prediction 0 is reported as "Positive", any
    other value as "Negative". Hashtag columns are added and the result is
    written to ``result_sample.csv``.

    Args:
        rdd: RDD of raw text lines whose fields are separated by ``*////*``
            in the order Time, longitude, latitude, place, Text.
    """
    global data_frame
    columns = ['Time','longitude', 'latitude','place', "Text"]
    new_rows = pd.DataFrame(_parse_tweet_lines(rdd.collect()), columns=columns)
    # DataFrame.append was removed in pandas 2.0; concat works on old and new versions.
    data_frame = pd.concat([data_frame, new_rows], ignore_index=True)
    data_frame.to_csv("new_result.csv")

    # Nothing has been received yet, so there is nothing to classify.
    if data_frame.empty:
        return

    classify_pp = fun.udf(pre_processing)
    # The model is reloaded on every batch, as in the original code.
    model = PipelineModel.load(modeldir)
    test_set = (
        sqlContext.read.format('com.databricks.spark.csv')
        .options(header='true', inferschema='true')
        .load('new_result.csv')
    )
    test_set = test_set.withColumn('tweet', classify_pp(fun.col('Text')))
    predictions = model.transform(test_set)
    predictions.createOrReplaceTempView("table1")

    selected = predictions.select(
        predictions["time"],
        predictions["longitude"],
        predictions["latitude"],
        predictions["place"],
        predictions["Text"],
        predictions["prediction"],
    )
    sentiment = fun.when(selected.prediction == 0,"Positive").otherwise('Negative')
    labelled = selected.withColumn('Sentiment', sentiment).drop(selected.prediction)

    result = labelled.toPandas()
    # hash_tag must run over all rows before trending_hash_tag is applied.
    result["hashstag"] = result.apply(hash_tag,axis =1)
    result["Trending hashtag"]= result.apply(trending_hash_tag,axis=1)
    result.to_csv("result_sample.csv")


# Register save_df to run on the RDD of every batch; this must happen before
# the streaming context is started.
lines.foreachRDD(save_df)

# Start receiving and processing the stream.
ssc.start()
# Block until the streaming context terminates.
ssc.awaitTermination()

