In [1]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from collections import namedtuple
from pyspark.sql.functions import desc

In [2]:
# Reuse the running SparkContext if one exists. A bare SparkContext() raises
# ("Cannot run multiple SparkContexts at once") when a context is already
# active, so re-running this cell would otherwise fail.
sc = SparkContext.getOrCreate()

In [3]:
# Micro-batch interval of the DStream pipeline, in seconds.
BATCH_INTERVAL_SECONDS = 10

ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)
# Creating the SQL context (and with it a SparkSession) is what makes
# RDD.toDF() available to the hashtag pipeline's per-batch sink.
sqlContext = SQLContext(sc)



In [4]:
# DStream of raw text lines received over TCP from localhost:9999.
socket_stream = ssc.socketTextStream(hostname="localhost", port=9999)

In [5]:
# Sliding window covering the last 20 seconds of input. No slideDuration is
# given, so the window advances by the StreamingContext's batch interval.
lines = socket_stream.window(windowDuration=20)

In [6]:
# Row type for one aggregated result: the hashtag text and its occurrence count.
fields = ("tag", "count")
Tweet = namedtuple(typename="Tweet", field_names=fields)

In [7]:
def _register_top_tags(rdd):
    """Expose the 10 most frequent hashtags in ``rdd`` as the temp view "tweets".

    ``rdd`` holds ``Tweet(tag, count)`` records for one windowed micro-batch.
    Empty batches are skipped: ``toDF()`` infers the schema from the first
    record and raises ``ValueError`` ("RDD is empty") on an empty RDD, which
    would fail every window that contains no hashtags. When a batch is
    skipped, any previously registered "tweets" view is left as it was.
    """
    if rdd.isEmpty():
        return
    (rdd.toDF()
        .sort(desc("count"))      # most frequent first
        .limit(10)
        .createOrReplaceTempView("tweets"))  # replaces deprecated registerTempTable


# Parentheses let the chain span multiple lines without trailing backslashes.
(lines
    .flatMap(lambda text: text.split(" "))              # split each line into words
    .filter(lambda word: word.startswith("#"))          # keep hashtags only
    .map(lambda word: (word.lower(), 1))                # case-fold the tag, pair with 1
    .reduceByKey(lambda a, b: a + b)                    # sum occurrences per tag
    .map(lambda rec: Tweet(rec[0], rec[1]))             # wrap as Tweet rows for toDF()
    .foreachRDD(_register_top_tags))

In [8]:
# Start the DStream pipeline. No awaitTermination() is called here, so the
# cell returns while batches keep running in the background; the "tweets"
# temp view registered by the pipeline can presumably be queried from other
# cells in the meantime.
ssc.start()

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, desc

In [2]:
# Obtain (or reuse) the SparkSession for the Structured Streaming version.
spark = (
    SparkSession.builder
    .appName("TwitterHashtagPopularity")
    .getOrCreate()
)

In [3]:
# Unbounded streaming DataFrame of text lines read from the TCP socket
# at 127.0.0.1:9999.
tweet_df = (
    spark.readStream
    .format("socket")
    .options(host="127.0.0.1", port=9999)
    .load()
)

In [4]:
# Make sure the single "value" column is typed as a string; the column
# keeps its name "value".
tweet_df = tweet_df.select(col("value").cast("string").alias("value"))

In [5]:
# Split each incoming line into whitespace-separated tokens, keep only the
# hashtags, and count occurrences per hashtag (most frequent first).
# Filtering *before* groupBy means the aggregation state only tracks hashtags
# rather than every word ever seen, which matters in complete output mode
# where every group is retained across triggers.
tweets_tab = (
    tweet_df
    .withColumn('word', explode(split(col('value'), r'\s+')))
    # A hashtag is a token that *starts* with '#'; contains('#') would also
    # match tokens such as "C#" or "a#b".
    .filter(col('word').startswith('#'))
    .groupBy('word')
    .count()
    .sort('count', ascending=False)
)

In [6]:
# Stream the running hashtag counts to the console sink. Complete output mode
# re-emits the whole aggregated table on every trigger, and the trigger fires
# every 2 seconds. (For a table queryable from the notebook, the "memory"
# sink with queryName "tweetquery" would be the alternative.)
writeTweet = (
    tweets_tab.writeStream
    .outputMode("complete")
    .format("console")
    .queryName("tweetquery")
    .trigger(processingTime='2 seconds')
    .start()
)

# Keep the StreamingQuery handle: awaitTermination() returns None, so chaining
# it onto start() would leave writeTweet as None and lose writeTweet.stop().
# This call blocks the cell until the query stops or fails.
writeTweet.awaitTermination()