In [None]:
#!pip install -U kafka-python
#!pip install --force-reinstall pyspark==2.4.4

In [1]:
import os
import sys
# Point this kernel at the local Spark 2.4.4 distribution and expose its
# bundled Python libraries (pyspark + py4j) on the import path.
spark_home = "/home/user1/trainig_material/spark-2.4.4-bin-hadoop2.7"
pylib = spark_home + "/python/lib"
os.environ["SPARK_HOME"] = spark_home
os.environ["PYLIB"] = pylib

# Each archive is pushed to the front of sys.path in turn, so the last one
# listed (pyspark.zip) ends up first.
for archive in ("py4j-0.10.7-src.zip", "pyspark.zip"):
    sys.path.insert(0, pylib + "/" + archive)

# Extra arguments handed to spark-submit when the PySpark shell is launched:
# pulls in the Kafka 0.8 streaming connector built for Scala 2.11 / Spark 2.4.4.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 pyspark-shell'

In [2]:
import pandas as pd
from datetime import datetime, timedelta
import json

import tweepy
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from textblob import TextBlob

from kafka import KafkaClient
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.errors import KafkaError

In [3]:
from kafka import KafkaConsumer
import json
# Spark
from pyspark import SparkContext, SparkConf
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
from pyspark.sql import Row,SQLContext

## Use Spark Streaming to get an understanding of the Twitter data

In [4]:
def createContext():
    """Build the Spark Streaming job that summarises tweets read from Kafka.

    Creates a SparkContext named "TwitterApp" and a StreamingContext with
    5-second batches, attaches a receiver-based Kafka stream, and registers
    pprint outputs for:

    * the number of messages in each batch,
    * the number of messages in a 60-second window sliding every 5 seconds,
    * the top authors (by tweet count) per batch and per 60-second window.

    Returns:
        StreamingContext: the configured context. It is not started here;
        the caller is expected to call ``start()``.
    """
    sc = SparkContext(appName="TwitterApp")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 5)

    # Define Kafka Consumer
    # NOTE(review): this stream reads topic 'proj', while the KafkaConsumer
    # cell further down reads topic 'twitter' -- confirm which one is intended.
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'proj':1})

    ## --- Processing
    # Extract tweets: each Kafka record is a (key, value) pair and the value
    # holds the JSON payload.
    parsed = kafkaStream.map(lambda v: json.loads(v[1]))

    # Count number of tweets in the batch
    count_this_batch = kafkaStream.count().map(lambda x:('Tweets this batch: %s' % x))

    # Count by windowed time period
    count_windowed = kafkaStream.countByWindow(60,5).map(lambda x:('Tweets total (One minute rolling count): %s' % x))

    # Get authors.
    # Messages without a 'user' object are dropped first: indexing them would
    # raise KeyError inside the Spark task and fail the batch.
    authors_dstream = parsed\
        .filter(lambda tweet: 'user' in tweet)\
        .map(lambda tweet: tweet['user']['screen_name'])

    # Count each value and number of occurrences
    count_values_this_batch = authors_dstream.countByValue()\
                                .transform(lambda rdd:rdd\
                                  .sortBy(lambda x:-x[1]))\
                              .map(lambda x:"Author counts this batch:\tValue %s\tCount %s" % (x[0],x[1]))

    # Count each value and number of occurrences in the batch windowed
    count_values_windowed = authors_dstream.countByValueAndWindow(60,5)\
                                .transform(lambda rdd:rdd\
                                  .sortBy(lambda x:-x[1]))\
                            .map(lambda x:"Author counts (One minute rolling):\tValue %s\tCount %s" % (x[0],x[1]))

    # Write total tweet counts to stdout
    # Done with a union here instead of two separate pprint statements just to make it cleaner to display
    count_this_batch.union(count_windowed).pprint()

    # Write tweet author counts to stdout (top 5 of each, already sorted descending)
    count_values_this_batch.pprint(5)
    count_values_windowed.pprint(5)

    return ssc

## Show streaming values for 5-second batches and a one-minute rolling window

In [None]:
# Recover the streaming context from the checkpoint directory if one exists,
# otherwise build a fresh one with createContext; then run it for at most
# 180 seconds before returning control to the notebook.
checkpoint_path = '/tmp/checkpoint_v02'
ssc = StreamingContext.getOrCreate(checkpoint_path, createContext)
ssc.start()
ssc.awaitTermination(timeout=180)

In [None]:
# Stop the streaming context started in the previous cell.
# NOTE(review): called with default arguments; presumably this also stops the
# underlying SparkContext, which the later `SparkContext(appName="twitter")`
# cell would rely on -- confirm against the PySpark version in use.
ssc.stop()

## Create consumer

In [5]:
# Fresh Spark context for the consumer part of the notebook, plus a
# streaming context with 30-second batches built on top of it.
app_name = "twitter"
batch_seconds = 30
sc = SparkContext(appName=app_name)
ssc = StreamingContext(sc, batch_seconds)

In [6]:
from pyspark.sql import SparkSession

# Hive support is enabled so that saveAsTable() calls later in the notebook
# can write tables that `show tables` lists under a database.
session_builder = SparkSession.builder.enableHiveSupport()
spark = session_builder.getOrCreate()

In [7]:
import time

from pyspark.sql.types import LongType, StringType, StructField, StructType

# Consume tweets from the 'twitter' Kafka topic for 5 minutes and collect:
#   * window      -- tweets of the current 30-second window (shown, then reset)
#   * minutes     -- tweets of the current minute (one DataFrame per minute is
#                    appended to avg_hashtag)
#   * rows        -- every tweet seen during the whole run
# On exit, tweetsdf1 holds the last 30-second window and tweetsdf2 the full run.
#
# Only tweets that carry at least one hashtag are kept; the first hashtag is
# the one recorded. The timers are evaluated once per received Kafka message,
# so boundaries are only as precise as the message arrival rate.

consumer = KafkaConsumer('twitter', group_id=None, auto_offset_reset='latest', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m))

# Explicit schema so that an empty window/minute still yields a valid (empty)
# DataFrame instead of failing schema inference. Columns are listed
# alphabetically, matching the column order shown in the cell output.
TWEET_SCHEMA = StructType([
    StructField('Event_time', StringType()),
    StructField('Hashtags', StringType()),
    StructField('Text', StringType()),
    StructField('TweetID', LongType()),
    StructField('UserID', LongType()),
])


def _tweet_to_record(tweet):
    """Flatten a decoded tweet into a record dict, or return None to skip it.

    A message is skipped when it has no 'entities' key or when its hashtag
    list is empty.
    """
    if 'entities' not in tweet:
        return None
    hashtags = tweet['entities']['hashtags']
    if len(hashtags) == 0:
        return None
    return {
        'Event_time': tweet['created_at'],
        'Hashtags': hashtags[0]['text'],
        'Text': tweet['text'],
        'TweetID': tweet['id'],
        'UserID': tweet['user']['id'],
    }


avg_hashtag = []
minutes = []
rows = []
window = []

timer = time.time() + 30
minute_timer = time.time() + 60
end = time.time() + 300
try:
    for msg in consumer:
        # msg.value is already deserialized by value_deserializer.
        record = _tweet_to_record(msg.value)
        if record is not None:
            minutes.append(record)
            rows.append(record)
            window.append(record)

        # The timer checks run for every message, including skipped ones, so
        # a run of hashtag-less tweets cannot delay a window boundary.
        if time.time() >= minute_timer:
            minute_timer += 60
            minute = spark.createDataFrame(minutes, schema=TWEET_SCHEMA)
            minutes = []
            avg_hashtag.append(minute)

        # Checked before the 30-second timer so that the final window is still
        # populated when tweetsdf1 is built.
        if time.time() >= end:
            tweetsdf1 = spark.createDataFrame(window, schema=TWEET_SCHEMA)
            print('Last 30 seconds:')
            tweetsdf1.show()
            tweetsdf2 = spark.createDataFrame(rows, schema=TWEET_SCHEMA)
            print('Entire running duration (5 minutes):')
            tweetsdf2.show()
            break

        if time.time() >= timer:
            timer += 30
            sample = spark.createDataFrame(window, schema=TWEET_SCHEMA)
            print('30 second window:')
            sample.show()
            window = []
finally:
    # Release the broker connection whether the loop finished or raised.
    consumer.close()

30 second window:
+--------------------+--------------------+--------------------+-------------------+-------------------+
|          Event_time|            Hashtags|                Text|            TweetID|             UserID|
+--------------------+--------------------+--------------------+-------------------+-------------------+
|Tue Jul 27 19:12:...|    StrongerTogether|RT @Olympics: Did...|1420099677043703812|1412261447254614016|
|Tue Jul 27 19:12:...|            Olympics|Can Lutalo Muhamm...|1420099682039209990|          413809300|
|Tue Jul 27 19:12:...|                 NGA|RT @Olympics: Coa...|1420099682840240129|1345700670029103105|
|Tue Jul 27 19:12:...|              Silver|RT @Olympics: #Si...|1420099682999676936|           63890475|
|Tue Jul 27 19:12:...|           Tokyo2020|RT @CubaSolidarit...|1420099683414953987|           21202355|
|Tue Jul 27 19:12:...|             surfing|RT @Olympics: We ...|1420099685243445249|1229014788539174912|
|Tue Jul 27 19:12:...|               

30 second window:
+--------------------+---------------+--------------------+-------------------+-------------------+
|          Event_time|       Hashtags|                Text|            TweetID|             UserID|
+--------------------+---------------+--------------------+-------------------+-------------------+
|Tue Jul 27 19:14:...|       Olympics|RT @folkypunkcas:...|1420100180360286210|         2314884479|
|Tue Jul 27 19:14:...|    bbcolympics|RT @BBCSport: .@T...|1420100180855115778|           70214615|
|Tue Jul 27 19:14:...|    SimoneBiles|RT @Tokyo2020: It...|1420100183321423879|           78397748|
|Tue Jul 27 19:14:...|   SaeidMollaei|RT @Saeed_Juventi...|1420100185275908100|1115709805564387328|
|Tue Jul 27 19:14:...|    dreamfanart|RT @Mendiii5: Mrs...|1420100185967894530|1291672509088071680|
|Tue Jul 27 19:14:...|            USA|It seem like ever...|1420100189973487621|1271509111075508224|
|Tue Jul 27 19:14:...|      Tokyo2020|RT @ngcproject: W...|1420100190824923147|   

30 second window:
+--------------------+----------------+--------------------+-------------------+-------------------+
|          Event_time|        Hashtags|                Text|            TweetID|             UserID|
+--------------------+----------------+--------------------+-------------------+-------------------+
|Tue Jul 27 19:16:...|       Tokyo2020|RT @JulesBoykoff:...|1420100685702434817|1070415931933782016|
|Tue Jul 27 19:16:...|        Olympics|RT @folkypunkcas:...|1420100687548096513|          412372061|
|Tue Jul 27 19:16:...|        Olympics|My #Olympics swim...|1420100692983812097|           21517602|
|Tue Jul 27 19:16:...|        Olympics|RT @ajplus: Alger...|1420100695957467137|1275517229010845696|
|Tue Jul 27 19:16:...|       JillBiden|Hear, hear! You g...|1420100699132612608|          103012051|
|Tue Jul 27 19:16:...|        Olympics|Good news: gymnas...|1420100699203936262|           71291093|
|Tue Jul 27 19:16:...|       Tokyo2020|RT @ESPNIndia: Is...|1420100704354

In [8]:
# For each result frame: register it as a temp view for the SQL cells below,
# then append its rows to a saved table of the same name.
for table_name, frame in (('df1', tweetsdf1), ('df2', tweetsdf2)):
    frame.createOrReplaceTempView(table_name)
    frame.write.mode('append').saveAsTable(table_name)

7a

In [10]:
# Number of tweets in df1 (registered from the last 30-second window).
tweet_count_sql = 'select count(TweetID) as Num_of_Tweets from df1'
num_tweets = spark.sql(tweet_count_sql)
num_tweets.show()

+-------------+
|Num_of_Tweets|
+-------------+
|           53|
+-------------+



7b

In [11]:
# Number of distinct users who tweeted in df1.
active_users = spark.sql('select count(distinct UserID) as Active_Users from df1')
active_users.show()

+------------+
|Active_Users|
+------------+
|          45|
+------------+



7c

In [12]:
# Most frequent hashtag in df1, with its count.
top_hashtag_window = spark.sql('select Hashtags, count(Hashtags) as Count from df1 group by Hashtags order by Count desc limit 1')
top_hashtag_window.show()

+--------+-----+
|Hashtags|Count|
+--------+-----+
|Olympics|   16|
+--------+-----+



7d

In [13]:
# Most frequent hashtag in df2 (registered from the whole 5-minute run), with its count.
top_hashtag_run = spark.sql('select Hashtags, count(Hashtags) as Count from df2 group by Hashtags order by Count desc limit 1')
top_hashtag_run.show()

+--------+-----+
|Hashtags|Count|
+--------+-----+
|Olympics|  185|
+--------+-----+



7e

In [14]:
# For each one-minute DataFrame collected during the run, show how many
# tweets had "Olympics" as their recorded hashtag. The 'minute' temp view is
# re-pointed at the current frame on every iteration.
olympics_count_sql = 'select count(Hashtags) as Count from minute where Hashtags = "Olympics"'
for minute_df in avg_hashtag:
    minute_df.createOrReplaceTempView('minute')
    spark.sql(olympics_count_sql).show()

+-----+
|Count|
+-----+
|   32|
+-----+

+-----+
|Count|
+-----+
|   34|
+-----+

+-----+
|Count|
+-----+
|   47|
+-----+

+-----+
|Count|
+-----+
|   39|
+-----+

+-----+
|Count|
+-----+
|   33|
+-----+



8

In [15]:
# Save the tweet-count result as table 'number_of_tweets', replacing any
# existing contents.
count_writer = num_tweets.write.mode('overwrite')
count_writer.saveAsTable('number_of_tweets')

In [16]:
# List the tables and temp views visible to this session.
tables = spark.sql('show tables')
tables.show()

+--------+----------------+-----------+
|database|       tableName|isTemporary|
+--------+----------------+-----------+
| default|             df1|      false|
| default|             df2|      false|
| default|        employee|      false|
| default|number_of_tweets|      false|
| default|             src|      false|
|        |             df1|       true|
|        |             df2|       true|
|        |          minute|       true|
+--------+----------------+-----------+



In [17]:
# Read the saved count back to confirm the table was written.
saved_count = spark.sql('select * from number_of_tweets')
saved_count.show()

+-------------+
|Num_of_Tweets|
+-------------+
|           53|
+-------------+

