In [None]:
# import common libraries

import sys
import time
import json

In [None]:
# Import the pyspark libraries

from pyspark import SparkContext
from pyspark.streaming import  StreamingContext
from pyspark.sql import  SQLContext

In [None]:
# Kafka Server configuration

# Import Kafka libraries
from kafka import KafkaConsumer, KafkaProducer

# Address of the Kafka broker that the analysis results are published to.
kafka_hostname = '35.239.57.13'
kafka_port = '9092'  # kept as a string so it can be joined into the address
# "host:port" form, passed to KafkaProducer(bootstrap_servers=[...]).
kafka_bootstrap_server = f"{kafka_hostname}:{kafka_port}"

In [None]:
# Register to Kafka as a Producer

def connect_kafka_producer():
    """Create a KafkaProducer connected to ``kafka_bootstrap_server``.

    Returns:
        A ``KafkaProducer`` instance, or ``None`` if constructing it raised an
        ``Exception`` (the error is printed). Non-``Exception`` errors such as
        ``KeyboardInterrupt`` propagate; the previous ``return`` inside
        ``finally`` silently discarded them.
    """
    try:
        return KafkaProducer(bootstrap_servers=[kafka_bootstrap_server], api_version=(0, 10))
    except Exception as ex:
        # Best-effort connect: report the failure and let the caller handle None.
        print('Exception while connecting Kafka')
        print(str(ex))
        return None

In [None]:
# Publishes the message to Kafka Topic

def publishToKafka(topic_name, data):
    """Publish ``data`` as one UTF-8 encoded message to Kafka topic ``topic_name``.

    A new producer is created for every call and is always closed afterwards.
    If no producer can be created, the message is dropped and a notice is
    printed instead of raising, so the calling streaming callback keeps running.

    Args:
        topic_name: name of the Kafka topic to publish to.
        data: the message payload as a ``str`` (JSON text in this notebook).

    Raises:
        TypeError: if ``data`` is not a ``str`` (raised before any producer is
            created, so nothing is leaked).
    """
    # Encode first so a bad payload fails before a connection is opened.
    text_to_send = bytes(data, encoding='utf-8')
    producer = connect_kafka_producer()
    if producer is None:
        # connect_kafka_producer already printed the connection error.
        print(f"Message not delivered to Kafka Topic - {topic_name}: no producer")
        return
    try:
        producer.send(topic_name, value=text_to_send)
        producer.flush()
        print(f"Message delivered to Kafka Topic - {topic_name}")
    except Exception as ex:
        print(ex)
    finally:
        # Release the connection even when send/flush failed.
        producer.close()

In [None]:
# Use Case 1

from nltk import word_tokenize
from textblob import TextBlob
import re

def sentimentCategoryBlob(score):
    """Map a TextBlob polarity score to a sentiment category.

    Args:
        score: polarity value (TextBlob polarity from blobSentimentAnalysis).

    Returns:
        "positive" for scores > 0, "negative" for scores < 0, and "neutral"
        otherwise. That includes 0.0/-0.0 and NaN; NaN previously left the
        result unbound and raised UnboundLocalError.
    """
    if score > 0.0:
        return "positive"
    if score < 0.0:
        return "negative"
    return "neutral"

# Cleans the tweet Text
def clean_tweet(text):
    """
    Removes @mentions, URLs and junk characters from tweet text.

    Matched spans are replaced with spaces and runs of whitespace are
    collapsed, so the result is space-separated alphanumeric tokens.

    - Mentions: ``@`` followed by letters, digits or underscores (Twitter
      handles may contain ``_``; previously ``@john_doe`` left ``doe`` behind).
    - URLs: ``scheme://`` followed by non-whitespace.
    - Any other character that is not ASCII alphanumeric, a space or a tab.

    A raw string is used so ``\\w``/``\\S`` reach the regex engine instead of
    being treated as (invalid) string escapes.
    """
    return ' '.join(re.sub(r"(@[A-Za-z0-9_]+)|([^0-9A-Za-z \t])|(\w+://\S+)", " ", text).split())

# Applies the TextBlob algorithm to compute the sentiment polarity of a text
def blobSentimentAnalysis(text):
    """Return the TextBlob sentiment polarity of ``text``.

    The text is first run through ``clean_tweet`` (which strips mentions,
    URLs and non-alphanumeric characters) and then scored by TextBlob.
    """
    cleaned = clean_tweet(text)
    return TextBlob(cleaned).sentiment.polarity

def findBlobSentiment(text):
    """Classify ``text`` into a sentiment category via TextBlob polarity.

    ``text`` is coerced with ``str()`` first, so non-string values (for
    example ``None``) are classified by their string form. The category
    comes from ``sentimentCategoryBlob``.
    """
    polarity = blobSentimentAnalysis(str(text))
    return sentimentCategoryBlob(polarity)

def tweetsPerSentiment(dic):
    """Print per-sentiment tweet counts and publish them to ``blob_sentiments``.

    Args:
        dic: mapping of sentiment category -> tweet count for the current
            window (the driver builds it with ``dict(rdd.collect())``).

    Categories missing from ``dic`` are reported as 0. The counts are always
    printed, but nothing is published when ``dic`` is empty.
    """
    counts = {"positive": 0, "negative": 0, "neutral": 0}
    counts.update(dic)
    print(counts)
    if not dic:
        return
    publishToKafka("blob_sentiments", json.dumps(counts))


In [None]:
# Use Case 2

import re

def electionMap(tweet):
    """Return the party labels whose keywords occur in ``tweet``.

    The text is lower-cased and each party's keyword regex is searched
    independently, so one tweet can map to several parties. Keywords match as
    plain substrings (no word boundaries). That lets them hit inside
    compound hashtags, but it also gives false positives, e.g. "inc" in
    "since" or "nda" inside longer words.
    NOTE(review): tighten the patterns if precision matters.

    Args:
        tweet: tweet text; falsy values (``None``, ``""``) yield ``[]``.

    Returns:
        A new list of matching labels, ordered Others, AAP, INC, BJP.
    """
    if not tweet:
        return []
    lowered = tweet.lower()
    keyword_patterns = (
        ("BJP", r"shivsena|janata|jdu|bjp|modi|namo|chowkidar|shah|nda"),
        ("INC", r"indiancongress|raga|gandhi|inc|sonia|congress|rahul|priyanka|gatbandhan|nyay"),
        ("AAP", r"aap|kejri|arvind"),
        ("Others", r"mamata|cpi|kanhaiya|bsp|samajwadi|tmc|trinamool|dmk|mns|bjd|samajwadi|yadav|gatbandhan|naidu|kalyan"),
    )
    # Scan in reverse so the output order is Others, AAP, INC, BJP; the
    # driver only counts labels, but the order is kept stable.
    return [label for label, pattern in reversed(keyword_patterns) if re.search(pattern, lowered)]

def tweetsPerParty(dic):
    """Print per-party tweet counts and publish them to ``election_parties``.

    Args:
        dic: mapping of party label -> tweet count for the current window
            (the driver builds it with ``dict(rdd.collect())``).

    Parties missing from ``dic`` are reported as 0. The counts are always
    printed, but nothing is published when ``dic`` is empty.
    """
    party_counts = dict.fromkeys(("BJP", "INC", "AAP", "Others"), 0)
    party_counts.update(dic)
    print(party_counts)
    if dic:
        publishToKafka("election_parties", json.dumps(party_counts))

In [None]:
# Use Case 3

# Print and publish the top trending hashtags (the caller passes the top 6)
def topTrendingHashTags(records):
    """Print the trending hashtags and publish them to ``trending_hashtags``.

    Args:
        records: sequence of ``(hashtag, count)`` pairs. The driver passes the
            first 6 entries of the window's count-sorted hashtags, possibly
            none.

    The hashtag -> count dict is always printed, but nothing is published
    when ``records`` is empty.
    """
    if not records:
        print({})
        return
    hashtag_counts = dict(records)
    print(hashtag_counts)
    publishToKafka("trending_hashtags", json.dumps(hashtag_counts))

In [None]:
# Convert one raw tweet line from the stream into a JSON object (dict)
def convert_tweet(tweet):
    """Parse one raw line from the tweet socket stream into a dict.

    Args:
        tweet: a single line of text, expected to hold one JSON object.

    Returns:
        The decoded JSON object. The following yield ``{}`` so that one bad
        line is skipped instead of raising inside the Spark ``map``:

        - empty or whitespace-only lines;
        - lines that are not valid JSON (e.g. a partial line from the socket);
        - JSON values that are not objects.

        Downstream extractors index the result by key.
    """
    if not tweet or not tweet.strip():
        return {}
    try:
        parsed = json.loads(tweet)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass.
        print("Skipping malformed tweet JSON")
        return {}
    return parsed if isinstance(parsed, dict) else {}

# Extract Tweet Text from tweet JSON
def extractTweetText(tweet):
    """Return ``tweet['text']``.

    If the key is missing, print a notice and return ``""``.
    """
    try:
        return tweet['text']
    except KeyError:
        print("No key attribute - text in the JSON")
        return ""

# Extract Hashtags from a tweet JSON
def extractTweetHashtags(tweet):
    """Return the hashtag entities stored at ``tweet['entities']['hashtags']``.

    Args:
        tweet: decoded tweet dict.

    Returns:
        The hashtags list as stored in the tweet; the driver reads
        ``hashtag['text']`` from each entry. If the path is missing, or
        ``entities`` is not subscriptable (e.g. ``null``), a notice is printed
        and ``[]`` is returned. The failure case previously returned ``""``,
        a different type from the success case.
    """
    try:
        return tweet['entities']['hashtags']
    except (KeyError, TypeError):
        print("No key attribute - entities:hashtags in the JSON")
        return []

In [None]:
if __name__ == '__main__':
    # Expected invocation: python <file_name> <netcat_host_name> <netcat_port>
    if len(sys.argv) != 3:
        print("Usage: python <file_name> <netcat_host_name> <netcat_port>")
        sys.exit(-1)

    # Netcat source of the raw tweet lines. The command-line host/port are
    # used when the port argument is numeric. Otherwise fall back to these
    # defaults. NOTE(review): under a notebook kernel sys.argv presumably
    # holds launcher flags rather than host/port; confirm for your launcher.
    nc_host_name = "localhost"
    nc_port = 9999
    if sys.argv[2].isdecimal():
        nc_host_name = sys.argv[1]
        nc_port = int(sys.argv[2])
    else:
        print(f"Using default netcat source {nc_host_name}:{nc_port}")

    # All durations are in seconds.
    batch_interval = 4                   # one micro-batch every 4 s
    window_length = 15 * batch_interval  # 60 s window (1 minute of data)
    sliding_window = 1 * batch_interval  # window slides once per batch (every 4 s)

    spc = SparkContext.getOrCreate()
    spc.setLogLevel("ERROR")
    stc = StreamingContext(spc, batch_interval)
    # The windowed counts below (countByWindow / countByValueAndWindow) rely on
    # a checkpoint directory.
    stc.checkpoint("/tmp/checkpoint")

    tweetsDStream = stc.socketTextStream(nc_host_name, nc_port)

    try:
        # Parse each raw line into a tweet dict.
        tstream = tweetsDStream.map(lambda tweet: convert_tweet(tweet))

        # Tweet text only; feeds Use Cases 1 and 2.
        textstream = tstream.map(lambda tweet: extractTweetText(tweet))

        # Number of tweets in each batch.
        tstream.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()

        # Number of tweets in the sliding 1-minute window.
        tstream.countByWindow(window_length, sliding_window).map(lambda x: "Tweets in this 1 minute window: %s" % x).pprint()

        ################## Use Case 2 ####################

        # Per-party tweet counts over the window, printed/published by tweetsPerParty.
        election = textstream.flatMap(lambda tweet: electionMap(tweet))
        election.countByValueAndWindow(window_length, sliding_window).foreachRDD(lambda rdd: tweetsPerParty(dict(rdd.collect())))

        ################## Use Case 3 ####################

        # Hashtag counts over the window, sorted by descending count; the top 6
        # are printed/published by topTrendingHashTags.
        hashtagstream = tstream.map(lambda tweet: extractTweetHashtags(tweet))
        hashtagstream = hashtagstream.flatMap(lambda hashtags: [hashtag['text'] for hashtag in hashtags if hashtag])
        hashtagstream.countByValueAndWindow(window_length, sliding_window).transform(lambda rdd: rdd.sortBy(lambda x: -x[1])).foreachRDD(lambda rdd: topTrendingHashTags(rdd.take(6)))

        ################## Use Case 1 ####################

        # TextBlob sentiment category per tweet, counted over the window.
        sentimentstream = textstream.map(lambda text: findBlobSentiment(text))
        sentimentstream.countByValueAndWindow(window_length, sliding_window).foreachRDD(lambda rdd: tweetsPerSentiment(dict(rdd.collect())))

        stc.start()
        stc.awaitTermination()

    except Exception as ex:
        print(ex)