In [None]:
from pyspark import SparkContext

# Reuse the already-running SparkContext when this cell is re-executed:
# only one SparkContext may be active per process, so calling the plain
# constructor a second time raises instead of returning a context.
sc = SparkContext.getOrCreate()

In [None]:
from pyspark.streaming import StreamingContext

# Length of each micro-batch, in seconds.
BATCH_INTERVAL_SECONDS = 4
# Text socket the stream reads newline-delimited lines from
# (presumably a local tweet feed -- confirm against the producer script).
TWEET_SOCKET_HOST = "localhost"
TWEET_SOCKET_PORT = 9019
# Checkpoint directory; stateful operations such as updateStateByKey need one.
CHECKPOINT_DIR = "StreamingTwitterCheckpoint"

ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
ssc.checkpoint(CHECKPOINT_DIR)
# DStream of raw text lines received from TWEET_SOCKET_HOST:TWEET_SOCKET_PORT.
dataStream = ssc.socketTextStream(TWEET_SOCKET_HOST, TWEET_SOCKET_PORT)

In [None]:
from pyspark.sql import SparkSession
from pyspark import Row

def aggregate_hashtags_count(new_counts, total_sum):
    """Combine one batch's counts for a hashtag with its running total.

    Shaped as an ``updateStateByKey`` update function.

    Args:
        new_counts: Iterable of counts seen for the key in the current batch.
        total_sum: Previous running total, or ``None`` when the key has no
            state yet (treated as 0).

    Returns:
        The new running total.
    """
    batch_total = sum(new_counts)
    previous_total = total_sum or 0
    return batch_total + previous_total


# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

In [None]:
def processRDD(time, rdd, top_n=10):
    """Print the ``top_n`` most frequent hashtags held in one batch RDD.

    Shaped as a ``DStream.foreachRDD`` callback, which passes the batch time
    and the batch's RDD.

    Args:
        time: Batch timestamp; only used in the printed header.
        rdd: RDD of ``(hashtag, count)`` pairs.
        top_n: Number of rows to show, highest count first. Defaults to 10.
    """
    print("===== %s =====" % str(time))
    # createDataFrame infers the schema from the first element and fails on an
    # empty RDD; a batch with no hashtags simply has nothing to show.
    if rdd.isEmpty():
        return

    limit = int(top_n)

    # Get the singleton instance of SparkSession
    spark = getSparkSessionInstance(rdd.context.getConf())

    # Convert RDD[(hashtag, count)] to RDD[Row] so the DataFrame gets named columns
    row_rdd = rdd.map(lambda item: Row(hashtag=item[0], hashtag_count=item[1]))
    hashtags_df = spark.createDataFrame(row_rdd)

    # Register the batch as a temp view so it can be queried with SQL
    hashtags_df.createOrReplaceTempView("hashtags")
    # Top `limit` hashtags by count; int() above keeps LIMIT a plain integer literal
    hashtag_counts_df = spark.sql(
        "SELECT hashtag, hashtag_count FROM hashtags "
        "ORDER BY hashtag_count DESC LIMIT %d" % limit
    )
    hashtag_counts_df.show(limit)

In [None]:
# Split each tweet into words. str.split() with no argument splits on any
# whitespace (spaces, tabs, newlines) and never yields empty strings.
words = dataStream.flatMap(lambda line: line.split())

# Keep only hashtags -- tokens that start with '#' and have text after it
# (a '#' inside a word, e.g. "C#" or a URL fragment, is not a hashtag) --
# then map each one to a (hashtag, 1) pair for counting.
hashtags = words.filter(
    lambda word: word.startswith("#") and len(word) > 1
).map(lambda hashtag: (hashtag, 1))
hashtags.pprint()

# Running totals across batches: add each batch's counts to the previous state.
# updateStateByKey is stateful and needs the checkpoint directory set on ssc.
#tags_totals = hashtags.updateStateByKey(aggregate_hashtags_count)
#tags_totals.pprint()

# do processing for each RDD generated in each interval
#tags_totals.foreachRDD(processRDD)

In [None]:
# Start the streaming computation. awaitTermination() blocks this cell until
# the context stops or the kernel is interrupted.
ssc.start()
try:
    ssc.awaitTermination()
finally:
    # On interrupt or error, stop the StreamingContext so it does not keep
    # running in the background (only one may be active at a time). The
    # SparkContext is kept alive so a new StreamingContext can be created in
    # this kernel without restarting it.
    ssc.stop(stopSparkContext=False, stopGraceFully=False)