#  Spark Streaming with Twitter Project
_____

In [1]:
import findspark

In [None]:
import os

# Point findspark at the local Spark installation so `pyspark` becomes importable.
# If the SPARK_HOME environment variable is set it is used; otherwise edit the
# fallback path below to match where Spark is installed on your machine.
findspark.init(os.environ.get('SPARK_HOME', '/spark/spark-2.4.0-bin-hadoop2.7'))


In [None]:
# May cause deprecation warnings, safe to ignore, they aren't errors
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc

In [None]:
# Only one SparkContext may exist per kernel: run this cell once, and restart
# the kernel if it raises.
# NOTE(review): with a local[...] master the executor/cores settings below
# likely have little or no effect — confirm against your deployment.
spark_settings = [
    ('spark.master', 'local[10]'),
    ('spark.executor.memory', '8g'),
    ('spark.executor.cores', '6'),
    ('spark.cores.max', '6'),
    ('spark.driver.memory', '8g'),
]
conf = SparkConf()
conf = conf.setAll(spark_settings)
sc = SparkContext(conf=conf)

In [None]:
# Micro-batch length, in seconds, for the streaming context.
BATCH_INTERVAL_SECONDS = 10
ssc = StreamingContext(sc, batchDuration=BATCH_INTERVAL_SECONDS)
# SQL entry point used to query the temp view produced by the pipeline.
sqlContext = SQLContext(sc)

In [None]:
# Text source: lines read from a TCP socket.
# NOTE(review): this address is specific to the original author's network — adjust it.
TWEET_SOURCE_HOST = "10.183.226.17"
TWEET_SOURCE_PORT = 5555
socket_stream = ssc.socketTextStream(TWEET_SOURCE_HOST, TWEET_SOURCE_PORT)

In [None]:
# Look at the last 20 seconds of input (must be a multiple of the batch interval).
lines = socket_stream.window(windowDuration=20)

In [None]:
from collections import namedtuple

# One aggregated record: a hashtag and how many times it was seen.
# The field names become the column names when an RDD of these is turned into a DataFrame.
fields = ("tag", "count")
Tweet = namedtuple("Tweet", field_names=fields)

In [None]:
def _register_top_tags(rdd):
    """Publish the 10 most frequent hashtags in one windowed batch as temp view "tweets".

    Empty batches are skipped: ``rdd.toDF()`` infers its schema from the first
    element and raises on an empty RDD, which would otherwise fail the stream
    every time a window contains no hashtags. When a batch is skipped, the
    previously registered view is left in place.
    """
    if rdd.isEmpty():
        return
    (rdd.toDF()
        .sort(desc("count"))      # most frequent first
        .limit(10)                # keep the top 10
        .createOrReplaceTempView("tweets"))  # replaces deprecated registerTempTable


# Count hashtags per window and hand each batch's result to _register_top_tags.
# The surrounding parentheses allow the chain to span several lines without "\".
(lines.flatMap(lambda text: text.split())                            # split on any whitespace
      .filter(lambda word: word.startswith("#") and len(word) > 1)   # hashtags only; skip a bare "#"
      .map(lambda word: (word.lower(), 1))                           # case-insensitive tag, paired with 1
      .reduceByKey(lambda a, b: a + b)                               # occurrences per tag
      .map(lambda rec: Tweet(rec[0], rec[1]))                        # wrap as Tweet(tag, count)
      .foreachRDD(_register_top_tags))

In [None]:
import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
# Only works for Jupyter Notebooks!
%matplotlib inline 

In [None]:
# Start consuming the socket stream in the background.
# ssc.awaitTermination() is intentionally not called: it blocks until the stream
# is stopped, which would prevent the dashboard cell below from ever running.
ssc.start()

In [None]:
from pyspark.sql.utils import AnalysisException

# Redraw a bar chart of the current top-10 hashtags 10 times, about every 3 seconds.
for _ in range(10):
    time.sleep(3)
    try:
        top_10_tweets = sqlContext.sql('Select tag, count from tweets')
    except AnalysisException:
        # The "tweets" view is created only after the first non-empty batch
        # has been processed, so early polls can find it missing. Wait and retry.
        continue
    top_10_df = top_10_tweets.toPandas()
    display.clear_output(wait=True)
    plt.figure(figsize=(10, 8))
    sns.barplot(x="count", y="tag", data=top_10_df)
    plt.show()

In [None]:
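# Uncomment and run to stop the stream when finished. By default this also stops
# the underlying SparkContext (stopSparkContext=True), so restart the kernel to run again.
#ssc.stop()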