In [1]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
# create spark configuration
conf = SparkConf()
# use two kernels to speed up
conf.setMaster('local[2]')
conf.setAppName("TwitterStreamApp")
# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 3)
# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")

In [2]:
def aggregate_tags_count(new_values, total_sum):
    return sum(new_values) + (total_sum or 0)

def get_sql_context_instance(spark_context):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']

def process_rdd(time, rdd):
    print("----------- %s -----------" % str(time))
    try:
#         print("start")
        # Get spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)
        print("------------sql_context----------------")
        sql_context.pprint()
        # convert the RDD to Row RDD
        row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1]))
        print("---------------row_rdd------------------")
        row_rdd.show()
        # create a DF from the Row RDD
        hashtags_df = sql_context.createDataFrame(row_rdd)
        print("----------------hashtags_df-------------")
        hashtags_df.show()
        # Register the dataframe as table
        hashtags_df.registerTempTable("hashtags")
        print("------------hashtages----------------")
        print(hashtages.show())
        # get the top 10 hashtags from the table using SQL and print them
        hashtag_counts_df = sql_context.sql("select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10")
        print("---------------hashtag_count_df--------------")
        hashtag_counts_df.show()
    except:
        print("------------------error------------------")
        e = sys.exc_info()[0]
        print(e)
#         finally:
#             try:
#                 hashtag_counts_df.show()
#             except:
#                 pass

def send_df_to_dashboard(df):
    # extract the hashtags from dataframe and convert them into array
    top_tags = [str(t.hashtag) for t in df.select("hashtag").collect()]
    # extract the counts from dataframe and convert them into array
    tags_count = [p.hashtag_count for p in df.select("hashtag_count").collect()]
    # initialize and send the data through REST API
    url = 'http://localhost:5001/updateData'
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    response = requests.post(url, data=request_data)

In [3]:
# read data from port 9009
dataStream = ssc.socketTextStream("localhost",10005)

In [4]:
dataStream.pprint()

In [5]:
# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
# words.pprint()
# filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1)
hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
hashtags.pprint()
# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
tags_totals.pprint()
# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)
# start the streaming computation
ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()

-------------------------------------------
Time: 2018-10-20 05:46:33
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:33
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:33
-------------------------------------------

----------- 2018-10-20 05:46:33 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
-------------------------------------------
Time: 2018-10-20 05:46:36
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:36
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:36
-------------------------------------------

----------- 2018-10-20 05:46:36 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
--------

KeyboardInterrupt: 

-------------------------------------------
Time: 2018-10-20 05:46:48
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:48
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:48
-------------------------------------------

----------- 2018-10-20 05:46:48 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
-------------------------------------------
Time: 2018-10-20 05:46:51
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:51
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:46:51
-------------------------------------------

----------- 2018-10-20 05:46:51 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
--------

-------------------------------------------
Time: 2018-10-20 05:47:39
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:47:39
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:47:39
-------------------------------------------

----------- 2018-10-20 05:47:39 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
-------------------------------------------
Time: 2018-10-20 05:47:42
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:47:42
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:47:42
-------------------------------------------

----------- 2018-10-20 05:47:42 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
--------

-------------------------------------------
Time: 2018-10-20 05:48:30
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:48:30
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:48:30
-------------------------------------------

----------- 2018-10-20 05:48:30 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
-------------------------------------------
Time: 2018-10-20 05:48:33
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:48:33
-------------------------------------------

-------------------------------------------
Time: 2018-10-20 05:48:33
-------------------------------------------

----------- 2018-10-20 05:48:33 -----------
------------sql_context----------------
------------------error------------------
<class 'AttributeError'>
--------