In [1]:
import sys
import datetime

# findspark must locate the Spark installation and put pyspark on sys.path
# *before* any pyspark module is imported; calling init() after the pyspark
# imports is too late (they have already failed or bound whatever pyspark was
# importable beforehand).
import findspark
findspark.init()

import requests
from pyspark.rdd import RDD
from pyspark.sql.functions import desc
from pyspark.sql import SQLContext, SparkSession, Row
from pyspark.streaming import StreamingContext
from pyspark import SparkContext, SparkConf


def getSparkSessionInstance(sparkConf: SparkConf) -> SparkSession:
    """Return the SparkSession shared by every streaming batch.

    The first call builds a session from *sparkConf* and stores it in this
    module's globals under ``sparkSessionSingletonInstance``. Subsequent calls
    return that stored session and do not use their argument.
    """
    module_namespace = globals()
    cache_key = 'sparkSessionSingletonInstance'
    try:
        return module_namespace[cache_key]
    except KeyError:
        pass  # not built yet; fall through and create it below
    session_builder = SparkSession.builder.config(conf=sparkConf)
    module_namespace[cache_key] = session_builder.getOrCreate()
    return module_namespace[cache_key]


def send_df_to_dashboard(df, url='http://localhost:5001/updateData', *, timeout=5.0):
    """Post the hashtag counts held in *df* to the dashboard endpoint.

    Args:
        df: Spark DataFrame with ``word`` and ``total`` columns, such as the
            top-10 query built in ``process``.
        url: Endpoint that receives the form-encoded payload.
        timeout: Seconds to wait for the dashboard before ``requests`` raises,
            so an unresponsive dashboard cannot stall a streaming batch forever.

    Returns:
        The ``requests.Response`` returned by the POST.
    """
    # Collect once: two separate collects re-run the query, and rows tied on
    # `total` are not guaranteed to come back in the same order both times,
    # which could pair labels with the wrong counts.
    rows = df.select("word", "total").collect()
    top_tags = [str(row.word) for row in rows]
    tags_count = [row.total for row in rows]
    # The dashboard receives both lists as their Python string representations.
    request_data = {'label': str(top_tags), 'data': str(tags_count)}
    return requests.post(url, data=request_data, timeout=timeout)


def process(time: datetime.datetime, rdd) -> None:
    """Count hashtags in one windowed batch, print the top 10 and push them to the dashboard.

    Registered with ``foreachRDD``. Failures are reported and the batch is
    skipped so the stream keeps running.

    Args:
        time: Batch time supplied by Spark Streaming.
        rdd: RDD of hashtag strings for the current window.
    """
    print("========= %s =========" % str(time))

    try:
        # An empty window has nothing to count, and createDataFrame cannot
        # infer a schema from an empty RDD.
        if rdd.isEmpty():
            return

        spark = getSparkSessionInstance(rdd.context.getConf())

        row_rdd = rdd.map(lambda w: Row(word=w))
        words_df = spark.createDataFrame(row_rdd)
        words_df.createOrReplaceTempView("words")

        word_counts_df = spark.sql(
            "select word, count(*) as total from words group by word order by total desc limit 10")

        word_counts_df.show()
        send_df_to_dashboard(word_counts_df)
    except Exception as err:
        # Best effort: one failed batch (e.g. dashboard unreachable) must not
        # stop the stream, but the failure should be visible. Catching
        # Exception rather than BaseException lets KeyboardInterrupt and
        # SystemExit propagate.
        print("Failed to process batch %s: %r" % (time, err))


# Streaming pipeline: read text lines from a local socket, keep a sliding
# window over them, extract hashtag tokens and hand each windowed RDD to
# `process`.
sc = SparkContext(appName="Tweeter")
# Micro-batch interval: 5 seconds.
ssc = StreamingContext(sc, 5)
# NOTE(review): sqlContext is not referenced anywhere else in this file.
sqlContext = SQLContext(sc)
# Connects as a client to 127.0.0.1:5554; presumably a separate producer
# serves newline-delimited tweet text there — TODO confirm.
socket_stream = ssc.socketTextStream("127.0.0.1", 5554)
# 50-second window (the last 10 five-second batches); no slide interval is
# given, so it advances every batch.
lines = socket_stream.window(50)

ssc.checkpoint("checkpoint_TwitterApp")
# Split each line on single spaces and keep tokens that start with '#'.
# Tokens are not lower-cased, so hashtags are counted case-sensitively.
words = lines.flatMap(lambda line: line.split(" ")).filter(
    lambda word: word.lower().startswith("#"))
words.foreachRDD(process)


# Disabled alternative: cumulative (not windowed) counts via updateStateByKey.
# NOTE(review): `aggregate_tags_count` is not defined in this file.
#words = socket_stream.flatMap(lambda line: line.split(" "))
# hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1))
#tags_totals = hashtags.updateStateByKey(aggregate_tags_count)
# tags_totals.foreachRDD(process)


# Alternative pipeline written as one parenthesised method chain (parentheses
# allow line breaks without trailing backslashes).
# NOTE(review): it is wrapped in a bare string literal, so it never runs. If
# revived, `.sort("count")` sorts ascending, so `.limit(10)` would keep the
# ten least frequent tags rather than the top ten.
"""
from collections import namedtuple
fields = ("tag", "count" )
Tweet = namedtuple( 'Tweet', fields )

( lines.flatMap( lambda text: text.split( " " ) ) #Splits to a list
  .filter( lambda word: word.lower().startswith("#") ) # Checks for hashtag calls
  .map( lambda word: ( word.lower(), 1 ) ) # Lower cases the word
  .reduceByKey( lambda a, b: a + b ) # Reduces
  .map( lambda rec: Tweet( rec[0], rec[1] ) ) # Stores in a Tweet Object
  .foreachRDD( lambda rdd: rdd.toDF().sort( ("count") ) # Sorts Them in a DF
  .limit(10).createOrReplaceTempView ("tweets") ) ) # Registers to a table
"""

# Start the streaming computation; awaitTermination() blocks until it stops.
ssc.start()
ssc.awaitTermination()




+--------------------+-----+
|                word|total|
+--------------------+-----+
|#DangoteSaltArtCh...|    1|
|              #7,000|    1|
|          #developer|    1|
|        #programming|    1|
|             #python|    1|
+--------------------+-----+

+--------------------+-----+
|                word|total|
+--------------------+-----+
|#DangoteSaltArtCh...|    2|
|              #7,000|    1|
|          #developer|    1|
|        #programming|    1|
|             #python|    1|
|#TOP100KPOPSONGS2022|    1|
|                  #1|    1|
|            #WithYou|    1|
+--------------------+-----+

+--------------------+-----+
|                word|total|
+--------------------+-----+
|#DangoteSaltArtCh...|    3|
|              #7,000|    1|
|          #developer|    1|
|        #programming|    1|
|             #python|    1|
|#TOP100KPOPSONGS2022|    1|
|                  #1|    1|
|            #WithYou|    1|
|              #komik|    1|
|            #dagelan|    1|
+--------------------+-----+

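# __Possible extension__
### Cumulative analysis: keep running hashtag totals across the whole stream with `updateStateByKey` (optionally querying them through the SQL context), or work directly with the DStream API.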