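# Streaming Application

Reads JSON tweet records from a local socket (`127.0.0.1:9991`) with Spark Streaming and drives two live views: a sentiment-polarity line plot and a word cloud of the most frequent noun phrases.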

In [None]:
#Load Package

import json
import time
import warnings
from collections import defaultdict
from collections import namedtuple
from datetime import datetime
from time import sleep

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import seaborn as sns
from IPython import display
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from textblob import TextBlob
from wordcloud import WordCloud

import pyspark.sql.functions as sf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext

# GLOBAL Var
STREAM_HOST = "127.0.0.1"   # socket the tweet producer writes to
STREAM_PORT = 9991
BATCH_INTERVAL_S = 1        # Spark Streaming micro-batch length, in seconds

sns.set()
warnings.filterwarnings('ignore')

# NLTK resources used in this notebook: wordnet (WordNetLemmatizer),
# stopwords (stop_words below) and the POS-tagger model behind
# nltk.tag.pos_tag. Missing resources raise LookupError on a fresh install.
# NOTE(review): NLTK >= 3.9 names the tagger 'averaged_perceptron_tagger_eng'
# -- confirm against the installed NLTK version.
for _nltk_resource in ('wordnet', 'stopwords', 'averaged_perceptron_tagger'):
    nltk.download(_nltk_resource)
stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()

# initialize spark
sc = SparkContext()
spark = SparkSession(sc)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc, BATCH_INTERVAL_S)
# Each line received on the socket becomes one record of the DStream.
data_stream = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)

In [None]:
# utility functions
# Row type for one parsed tweet. Its field names become the DataFrame column
# names when a batch of these tuples is converted with toDF().
TWEET_SCHEMA = namedtuple("record", "Time_Index Tweet_Text")

# split sentence into words
def word_TokenizeFunct(x):
    """Split every string in ``x`` on whitespace and return all words as one flat list.

    ``x`` is iterated element by element, so a Spark Row such as
    ``Row(Tweet_Text=...)`` contributes the words of each of its fields.
    """
    words = []
    for text in x:
        words.extend(text.split())
    return words

# lemmatization
def lemmatizationFunct(x, lemmatizer_obj=None):
    """Lemmatize each token and re-join the results into one space-separated string.

    Parameters
    ----------
    x : iterable of str
        Tokens, e.g. the list produced by ``word_TokenizeFunct``.
    lemmatizer_obj : object with a ``lemmatize(str) -> str`` method, optional
        Lemmatizer to use; defaults to the module-level ``lemmatizer``.

    Returns
    -------
    str
        The lemmatized tokens joined by single spaces ("" for no tokens).
    """
    # Only touch the global when no lemmatizer is injected.
    lem = lemmatizer if lemmatizer_obj is None else lemmatizer_obj
    # Join the lemmas, not the raw tokens.
    return " ".join(lem.lemmatize(token) for token in x)

# extract noun phrases
def extractPhraseFunct(x):
    """Extract noun phrases from a text string.

    The text is tokenized with a regular expression, POS-tagged with
    ``nltk.tag.pos_tag`` and chunked with a small ``nltk.RegexpParser``
    grammar. Each ``NP`` chunk becomes one phrase: its words that are not in
    ``stop_words`` joined by single spaces.

    Parameters
    ----------
    x : str
        Text to analyse (in this notebook, the output of ``lemmatizationFunct``).

    Returns
    -------
    list of str
        Non-empty phrases, in the order the chunker yields the NP subtrees.
    """

    def leaves(tree):
        """Yield the (word, tag) leaves of every NP (noun phrase) subtree of ``tree``."""
        for subtree in tree.subtrees(filter=lambda t: t.label() == 'NP'):
            yield subtree.leaves()

    def get_terms(tree):
        """Yield, for each NP, its words with stop words removed."""
        for leaf in leaves(tree):
            yield [w for w, _tag in leaf if w not in stop_words]

    # Verbose regex: literal dots are escaped, and '-' is the last character of
    # the punctuation class so it is not read as a character range.
    sentence_re = r'''(?x)
          (?:[A-Z]\.)+          # abbreviations, e.g. U.S.A.
        | \w+(?:-\w+)*          # words with optional internal hyphens
        | \$?\d+(?:\.\d+)?%?    # currency and percentages, e.g. $12.40, 82%
        | \.\.\.                # ellipsis
        | [][.,;"'?():_`-]      # single punctuation tokens, including ] and [
    '''

    # Rules within a chunk stage are applied in order. The combined
    # <NBAR><IN><NBAR> rule must come first: once the single-NBAR rule has
    # chunked every NBAR as an NP, the combined pattern can no longer match.
    grammar = r"""
    NBAR:
        {<NN.*|JJ>*<NN.*>}  # Nouns and Adjectives, terminated with Nouns

    NP:
        {<NBAR><IN><NBAR>}  # Above, connected with in/of/etc...
        {<NBAR>}
    """

    chunker = nltk.RegexpParser(grammar)
    tokens = nltk.regexp_tokenize(x, sentence_re)
    postoks = nltk.tag.pos_tag(tokens)  # part-of-speech tagging
    tree = chunker.parse(postoks)       # chunking

    phrases = (' '.join(term) for term in get_terms(tree) if term)
    return [phrase for phrase in phrases if phrase]  # drop empty phrases


# Listening To The Port

In [None]:
# spark listen to stream data
def _register_batch(rdd):
    """Expose one micro-batch of TWEET_SCHEMA tuples as the SQL temp view ``record``.

    Empty batches are skipped: ``rdd.toDF()`` infers the schema from the first
    row and raises ``ValueError`` on an empty RDD. After a skipped batch the
    view still holds the rows of the last non-empty batch.
    """
    if rdd.isEmpty():
        return
    # createOrReplaceTempView replaces the deprecated registerTempTable alias.
    rdd.toDF().createOrReplaceTempView("record")


# Each socket line is a JSON object with "Time_Index" and "Tweet_Text" keys.
# trip_update is the parsed DStream (foreachRDD itself returns None).
trip_update = data_stream.map(json.loads)\
                         .map(lambda x: TWEET_SCHEMA(x["Time_Index"], x["Tweet_Text"]))
trip_update.foreachRDD(_register_batch)

# trip_update.pprint()
# Spark does not allow adding output operations after start(), so re-running
# this cell requires restarting the kernel.
ssc.start()


# Sentiment Monitor

In [None]:
# APP1 LINE PLOT
# Poll the `record` view, append the polarity of every tweet in it to `res`,
# and redraw a line plot of all polarities collected so far.
# NOTE(review): the view is only replaced when a new batch arrives, so a poll
# that runs before the next batch re-reads (and re-appends) the same tweets.

POLL_ITERATIONS = 50   # number of polls/redraws before the cell stops
POLL_INTERVAL_S = 1    # seconds to sleep after each poll


def _tweet_polarity(row):
    """Return the TextBlob polarity (in [-1.0, 1.0]) of the first sentence of a tweet.

    ``row`` is a Row with a ``Tweet_Text`` field. Empty or missing text has no
    sentences and scores 0.0.
    NOTE(review): TextBlob sentence splitting needs its corpora installed
    (``python -m textblob.download_corpora``) -- confirm in the target env.
    """
    text = row.Tweet_Text or ""
    sentences = TextBlob(text).sentences
    return sentences[0].sentiment.polarity if sentences else 0.0


df = pd.DataFrame(columns=["TimeStamp", "Polarity"])
res = []

for count in range(POLL_ITERATIONS):
    try:
        tweetRecord = sqlContext.sql("SELECT Tweet_Text from record").rdd.map(_tweet_polarity)
        res += tweetRecord.collect()
        df = pd.DataFrame({"TimeStamp": range(len(res)), "Polarity": res})

        # graph
        fig, ax = plt.subplots(figsize=(16, 6))
        sns.lineplot(x="TimeStamp", y="Polarity", data=df, ax=ax)
        ax.set(title="Tweet sentiment polarity", xlabel="Tweet index", ylabel="Polarity")
        display.clear_output(wait=True)
        plt.show()
        plt.close(fig)
    except Exception as inst:
        # Expected at least until the first batch registers `record`: querying
        # a view that does not exist yet raises.
        print(f"{type(inst).__name__}: {inst}")

    time.sleep(POLL_INTERVAL_S)

    

    
    

# Word Cloud

In [None]:
# APP2 WORD CLOUD
# Poll the `record` view, count the noun phrases in its tweets, merge the
# counts into the running total `wc_df`, and redraw a word cloud of the most
# frequent phrases.
# NOTE(review): the view is only replaced when a new batch arrives, so polls
# faster than the batch interval re-count the same tweets.

WC_POLL_INTERVAL_S = 1     # seconds between polls (the loop used to busy-wait)
WC_COMPACT_EVERY = 10000   # every N polls, truncate wc_df to ...
WC_COMPACT_KEEP = 30       # ... its top-N rows, cutting the growing union lineage
WC_TOP_K = 20              # phrases drawn in the word cloud
WC_MAX_ITERATIONS = None   # None = run until the kernel is interrupted

wc_schema = StructType([StructField("Phrase", StringType(), True), StructField("Count", FloatType(), True)])
# Per-batch counts are Python ints; an explicit schema avoids schema inference,
# which fails on empty batches.
_wc_batch_schema = StructType([StructField("Phrase", StringType(), True), StructField("Count", LongType(), True)])
wc_df = spark.createDataFrame(sc.emptyRDD(), schema=wc_schema)


def _batch_phrase_counts():
    """Return an RDD of (phrase, count) pairs for the tweets currently in `record`."""
    phrases = sqlContext.sql("SELECT Tweet_Text from record").rdd\
                        .map(word_TokenizeFunct)\
                        .map(lemmatizationFunct)\
                        .map(extractPhraseFunct)
    return phrases.flatMap(lambda tweet_phrases: tweet_phrases)\
                  .map(lambda phrase: (phrase, 1))\
                  .reduceByKey(lambda a, b: a + b)


def _plot_wordcloud(freqs):
    """Draw a word cloud from a non-empty {phrase: frequency} dict."""
    wordcloud = WordCloud(width=800, height=500, random_state=21, max_font_size=100,
                          relative_scaling=0.5, colormap='Dark2').generate_from_frequencies(freqs)
    fig, ax = plt.subplots(figsize=(14, 10))
    ax.imshow(wordcloud, interpolation="bilinear")
    ax.axis('off')
    display.clear_output(wait=True)
    plt.show()
    plt.close(fig)


count = 0
while WC_MAX_ITERATIONS is None or count < WC_MAX_ITERATIONS:
    if count % WC_COMPACT_EVERY == 0:
        wc_df = spark.createDataFrame(wc_df.head(WC_COMPACT_KEEP), schema=wc_schema)

    try:
        # aggregate wc: union is positional, (Phrase, Count) on both sides
        batch_df = spark.createDataFrame(_batch_phrase_counts(), schema=_wc_batch_schema)
        wc_df = wc_df.union(batch_df)\
                     .groupBy("Phrase")\
                     .agg(sf.sum('Count').alias('Count'))\
                     .sort(sf.col("Count").desc())

        # top phrases for the word cloud
        wc_df.createOrReplaceTempView("AGGTable")
        top_pdf = spark.sql(
            f"SELECT Phrase AS Keyword, Count as Frequency from AGGTable limit {WC_TOP_K}"
        ).toPandas()
        freqs = dict(zip(top_pdf["Keyword"], top_pdf["Frequency"]))
        # WordCloud raises on an empty frequency dict, so skip until phrases exist.
        if freqs:
            _plot_wordcloud(freqs)
    except Exception as inst:
        # Expected at least until the first batch registers `record`.
        print(f"{type(inst).__name__}: {inst}")

    count += 1
    time.sleep(WC_POLL_INTERVAL_S)
