In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Columbia EECS E6893 Big Data Analytics
"""
This module is the spark streaming analysis process.


Usage:
    If used with dataproc:
        gcloud dataproc jobs submit pyspark --cluster <Cluster Name> twitterHTTPClient.py

    Create a dataset in BigQuery first using
        bq mk bigdata_sparkStreaming
    and make sure `output_dataset` below matches the dataset name.

    Remember to replace the bucket with your own bucket name.


Todo:
    1. hashtagCount: calculate accumulated hashtags count
    2. wordCount: calculate word count every 60 seconds;
        the words to track are listed in WORD below.
    3. save the result to google BigQuery

"""

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
import time
import subprocess
import re
from google.cloud import bigquery

# ---------------------------------------------------------------------------
# Global configuration
# ---------------------------------------------------------------------------

# Google Cloud Storage bucket used as the staging area for BigQuery loads.
# TODO: replace with your own bucket name.
bucket = "bucket-hw3"

# Staging directories inside the bucket, one per BigQuery output table.
output_directory_hashtags, output_directory_wordcount = (
    'gs://{}/hadoop/tmp/bigquery/pyspark_output/hashtagsCount'.format(bucket),
    'gs://{}/hadoop/tmp/bigquery/pyspark_output/wordcount'.format(bucket),
)

# BigQuery destination: the dataset (must already exist), then the table name
# and column names for each output.
output_dataset = 'datasetHW3'
output_table_hashtags, columns_name_hashtags = 'hashtags', ['hashtags', 'count']
output_table_wordcount, columns_name_wordcount = 'wordcount', ['word', 'count', 'time']

# Host and port of the text socket that socketTextStream reads tweets from.
IP, PORT = 'localhost', 9001

# How long (in seconds) the streaming job runs before it is stopped.
STREAMTIME = 600

# Words tracked by the windowed word count.
WORD = ['data', 'spark', 'ai', 'movie', 'good']

In [2]:
def aggregate_tags_count(new_values, total_sum):
    """
    State-update callback for updateStateByKey: fold one batch into a running total.

    Args:
        new_values: counts seen for the key in the current batch
        total_sum: previous running total for the key, or None for a new key
    Returns:
        The updated running total.
    """
    batch_total = sum(new_values)
    previous_total = total_sum if total_sum else 0
    return batch_total + previous_total
    
def hashtagCount(words):
    """
    Running, case-insensitive hashtag counts, ordered by count (highest first).

    A token is treated as a hashtag when it starts with "#" and the remainder is
    one or more ASCII letters or digits. Hashtags are lower-cased before being
    counted, so "#Ab" and "#ab" share a single counter. Counts accumulate from
    the start of the stream via updateStateByKey (requires checkpointing).

    Args:
        words(DStream): stream of tokens (the caller splits each line on spaces)
    Returns:
        DStream of (hashtag, count) pairs, sorted by descending count within
        each batch.
    """
    import re

    def is_hashtag(token):
        # Truthy (a match object) only for "#" followed by ASCII alphanumerics.
        return token.startswith("#") and re.match('^[a-zA-Z0-9]+$', token[1:])

    def to_pair(token):
        return (token.lower(), 1)

    def by_count_desc(rdd):
        return rdd.sortBy(lambda pair: pair[1], ascending=False)

    running_totals = (words
                      .filter(is_hashtag)
                      .map(to_pair)
                      .updateStateByKey(aggregate_tags_count))
    return running_totals.transform(by_count_desc)

In [3]:
def wordCount(words, window_seconds=60):
    """
    Count the tracked words (module-level WORD) in non-overlapping windows.

    Matching is case-insensitive: each token is lower-cased and compared against
    the lower-cased entries of WORD. Every window is counted independently
    (window length == slide interval), and each output record is tagged with
    the window's batch time.

    Args:
        words(DStream): stream of tokens (the caller splits each line on spaces)
        window_seconds(int): window length and slide interval in seconds;
            must be a multiple of the StreamingContext batch interval.
            Defaults to 60.
    Returns:
        DStream with records (word, count, time), where time is the batch time
        passed by DStream.transform.
    """
    tracked = set(w.lower() for w in WORD)

    selected = (words
                .map(lambda token: token.lower())
                .filter(lambda token: token in tracked)
                .map(lambda token: (token, 1)))

    # The windows are tumbling (length == slide), so nothing needs to be
    # "subtracted" between windows: pass no inverse function. Passing one without
    # a filterFunc makes Spark keep keys that left the window and emit them with
    # a count of 0, which would add spurious zero rows to the appended output.
    counts = selected.reduceByKeyAndWindow(lambda a, b: a + b, None,
                                           window_seconds, window_seconds)
    return counts.transform(
        lambda batch_time, rdd: rdd.map(lambda kv: (kv[0], kv[1], batch_time)))
    

In [4]:
# Helper functions
def saveToStorage(rdd, output_directory, columns_name, mode):
    """
    Write one micro-batch RDD to storage as JSON files.

    Empty RDDs are skipped, so batches without data write nothing.

    Args:
        rdd: input RDD whose records line up with ``columns_name``
        output_directory: destination directory (a gs:// path in this notebook)
        columns_name: column names for the DataFrame built from ``rdd``
        mode: Spark save mode; "overwrite" replaces what is already there,
              "append" adds new part files alongside existing ones
    """
    if rdd.isEmpty():
        return
    frame = rdd.toDF(columns_name)
    frame.write.save(output_directory, format="json", mode=mode)


def saveToBigQuery(sc, output_dataset, output_table, directory):
    """
    Load the staged JSON part files into BigQuery, then delete the staging directory.

    Args:
        sc: active SparkContext; its JVM gateway is used to delete ``directory``
        output_dataset: BigQuery dataset that receives the table
        output_table: destination table; its contents are replaced (``--replace``)
            and its schema is auto-detected
        directory: staging directory containing newline-delimited JSON part files
    Raises:
        subprocess.CalledProcessError: if ``bq load`` fails. The staged files are
            not deleted in that case, so the load can be retried.
    """
    # Build argv as a list rather than splitting a formatted string, so a value
    # containing whitespace (e.g. a directory path) stays a single argument.
    command = [
        'bq', 'load',
        '--source_format', 'NEWLINE_DELIMITED_JSON',
        '--replace',
        '--autodetect',
        '{}.{}'.format(output_dataset, output_table),
        directory + '/part-*',
    ]
    subprocess.check_call(command)

    # Only reached when the load succeeded: remove the staged files recursively.
    output_path = sc._jvm.org.apache.hadoop.fs.Path(directory)
    output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
        output_path, True)

In [5]:
# Spark settings
conf = SparkConf()
# At least 2 local threads: the socket receiver permanently occupies one.
conf.setMaster('local[2]')
conf.setAppName("TwitterStreamApp")

# Create (or reuse) the spark context. Passing ``conf`` makes the settings above
# take effect when no context exists yet; an already-running context (e.g. one
# provided by the notebook kernel) is returned unchanged.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("ERROR")

# Kept for its side effect: in PySpark, creating a SQL context/session is what
# makes RDD.toDF (used by saveToStorage) available.
sql_context = SQLContext(sc)

# create the Streaming Context from the above spark context with batch interval size 5 seconds
ssc = StreamingContext(sc, 5)
# Checkpointing is required by the stateful updateStateByKey in hashtagCount.
# Hadoop paths do not expand "~" (the old "~/..." created a directory literally
# named "~"), so use a relative path, resolved against the filesystem's working
# directory (the user's home directory on HDFS).
ssc.checkpoint("checkpoint_TwitterApp")

# read lines of tweet text from the socket at IP:PORT
dataStream = ssc.socketTextStream(IP, PORT)

words = dataStream.flatMap(lambda line: line.split(" "))

# calculate the accumulated hashtags count sum from the beginning of the stream
topTags = hashtagCount(words)
topTags.pprint()

# Word count for each 60 s window. Bound to a new name so the ``wordCount``
# function is not shadowed (re-running this cell would otherwise try to call
# the DStream and fail).
wordCounts = wordCount(words)
wordCounts.pprint()

def saveTopTags(rdd):
    """Replace the staged hashtag counts with the latest cumulative snapshot."""
    saveToStorage(rdd, output_directory_hashtags, columns_name_hashtags, "overwrite")
topTags.foreachRDD(saveTopTags)

def saveWordCounts(rdd):
    """Append this window's word counts to the staged results."""
    saveToStorage(rdd, output_directory_wordcount, columns_name_wordcount, "append")
wordCounts.foreachRDD(saveWordCounts)

# Run for STREAMTIME seconds, then stop gracefully while keeping the
# SparkContext alive (saveToBigQuery below still needs it). Unlike a plain
# sleep, awaitTerminationOrTimeout re-raises an error from the streaming job.
ssc.start()
ssc.awaitTerminationOrTimeout(STREAMTIME)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

# put the temp result in google storage to google BigQuery
saveToBigQuery(sc, output_dataset, output_table_hashtags, output_directory_hashtags)
saveToBigQuery(sc, output_dataset, output_table_wordcount, output_directory_wordcount)


-------------------------------------------
Time: 2019-10-31 13:24:55
-------------------------------------------
('#bms', 1)

-------------------------------------------
Time: 2019-10-31 13:25:00
-------------------------------------------
('#atlee', 4)
('#sanki', 3)
('#shahrukhkhan', 2)
('#srk', 2)
('#bms', 1)
('#ongseongwu', 1)
('#countdowntohalloween', 1)
('#sweepstakes', 1)
('#taeyeon', 1)
('#2', 1)
...

-------------------------------------------
Time: 2019-10-31 13:25:05
-------------------------------------------
('#bigil', 11)
('#atlee', 6)
('#sanki', 4)
('#shahrukhkhan', 3)
('#srk', 3)
('#ai', 3)
('#ongseongwu', 2)
('#fintech', 2)
('#bms', 1)
('#countdowntohalloween', 1)
...

-------------------------------------------
Time: 2019-10-31 13:25:10
-------------------------------------------
('#bigil', 15)
('#atlee', 6)
('#sanki', 4)
('#ai', 4)
('#shahrukhkhan', 3)
('#srk', 3)
('#ongseongwu', 2)
('#fintech', 2)
('#petta', 2)
('#viswasam', 2)
...

---------------------------------

-------------------------------------------
Time: 2019-10-31 13:27:15
-------------------------------------------
('#bigil', 132)
('#srk', 29)
('#atlee', 28)
('#ai', 24)
('#sanki', 23)
('#viswasam', 22)
('#shahrukhkhan', 21)
('#hvinoth', 14)
('#petta', 13)
('#ongseongwu', 10)
...

-------------------------------------------
Time: 2019-10-31 13:27:20
-------------------------------------------
('#bigil', 136)
('#srk', 30)
('#atlee', 29)
('#ai', 25)
('#sanki', 24)
('#viswasam', 24)
('#shahrukhkhan', 21)
('#hvinoth', 16)
('#petta', 13)
('#ongseongwu', 10)
...

-------------------------------------------
Time: 2019-10-31 13:27:25
-------------------------------------------
('#bigil', 139)
('#atlee', 30)
('#srk', 30)
('#ai', 26)
('#sanki', 24)
('#viswasam', 24)
('#shahrukhkhan', 22)
('#hvinoth', 16)
('#petta', 13)
('#ongseongwu', 11)
...

-------------------------------------------
Time: 2019-10-31 13:27:30
-------------------------------------------
('#bigil', 143)
('#atlee', 33)
('#srk', 

-------------------------------------------
Time: 2019-10-31 13:29:30
-------------------------------------------
('#bigil', 246)
('#atlee', 61)
('#srk', 60)
('#sanki', 52)
('#ai', 47)
('#shahrukhkhan', 41)
('#viswasam', 37)
('#hvinoth', 27)
('#petta', 17)
('#halloween', 17)
...

-------------------------------------------
Time: 2019-10-31 13:29:35
-------------------------------------------
('#bigil', 257)
('#atlee', 63)
('#srk', 61)
('#sanki', 54)
('#ai', 49)
('#shahrukhkhan', 43)
('#viswasam', 38)
('#hvinoth', 27)
('#halloween', 20)
('#petta', 18)
...

-------------------------------------------
Time: 2019-10-31 13:29:40
-------------------------------------------
('#bigil', 263)
('#atlee', 63)
('#srk', 61)
('#sanki', 54)
('#ai', 50)
('#shahrukhkhan', 43)
('#viswasam', 38)
('#hvinoth', 27)
('#halloween', 20)
('#petta', 18)
...

-------------------------------------------
Time: 2019-10-31 13:29:45
-------------------------------------------
('#bigil', 265)
('#atlee', 63)
('#srk', 61)

-------------------------------------------
Time: 2019-10-31 13:31:45
-------------------------------------------
('#bigil', 362)
('#atlee', 85)
('#srk', 82)
('#ai', 78)
('#sanki', 73)
('#viswasam', 59)
('#shahrukhkhan', 55)
('#hvinoth', 38)
('#halloween', 27)
('#petta', 24)
...

-------------------------------------------
Time: 2019-10-31 13:31:50
-------------------------------------------
('#bigil', 366)
('#atlee', 85)
('#srk', 83)
('#ai', 78)
('#sanki', 73)
('#viswasam', 60)
('#shahrukhkhan', 56)
('#hvinoth', 38)
('#halloween', 27)
('#petta', 24)
...

-------------------------------------------
Time: 2019-10-31 13:31:55
-------------------------------------------
('#bigil', 372)
('#atlee', 86)
('#srk', 85)
('#ai', 80)
('#sanki', 74)
('#viswasam', 63)
('#shahrukhkhan', 59)
('#hvinoth', 39)
('#halloween', 28)
('#petta', 25)
...

-------------------------------------------
Time: 2019-10-31 13:32:00
-------------------------------------------
('#bigil', 381)
('#atlee', 89)
('#srk', 86)

-------------------------------------------
Time: 2019-10-31 13:34:15
-------------------------------------------
('#bigil', 508)
('#atlee', 112)
('#srk', 103)
('#ai', 101)
('#sanki', 93)
('#viswasam', 86)
('#shahrukhkhan', 80)
('#hvinoth', 50)
('#halloween', 33)
('#petta', 27)
...

-------------------------------------------
Time: 2019-10-31 13:34:20
-------------------------------------------
('#bigil', 508)
('#atlee', 112)
('#srk', 103)
('#ai', 101)
('#sanki', 93)
('#viswasam', 86)
('#shahrukhkhan', 80)
('#hvinoth', 50)
('#halloween', 33)
('#petta', 27)
...

-------------------------------------------
Time: 2019-10-31 13:34:25
-------------------------------------------
('#bigil', 512)
('#atlee', 112)
('#srk', 105)
('#ai', 102)
('#sanki', 94)
('#viswasam', 86)
('#shahrukhkhan', 83)
('#hvinoth', 50)
('#halloween', 33)
('#petta', 27)
...

-------------------------------------------
Time: 2019-10-31 13:34:30
-------------------------------------------
('#bigil', 516)
('#atlee', 114)
('