In [1]:
# Smoke-test cell: prints a fixed greeting.
print("hello")

hello


In [2]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Columbia EECS E6893 Big Data Analytics
"""
This module is the spark streaming analysis process.


Usage:
    If used with dataproc:
        gcloud dataproc jobs submit pyspark --cluster <Cluster Name> twitterHTTPClient.py

    Create a dataset in BigQuery first using
        bq mk bigdata_sparkStreaming

    Remember to replace the bucket with your own bucket name


Todo:
    1. hashtagCount: calculate accumulated hashtags count
    2. wordCount: calculate word count every 60 seconds
        the word you should track is listed below.
    3. save the result to google BigQuery

"""

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
import time
import subprocess
import re
from google.cloud import bigquery

# global variables
bucket = "mk4427hw2"    # TODO: replace with your own bucket name
# temporary JSON output locations in google storage, later loaded into BigQuery
output_directory_hashtags = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/hashtagsCount'.format(bucket)
output_directory_wordcount = 'gs://{}/hadoop/tmp/bigquery/pyspark_output/wordcount'.format(bucket)

# output table and columns name
output_dataset = 'mk4427data'                     # the name of your dataset in BigQuery
output_table_hashtags = 'hashtags'
columns_name_hashtags = ['hashtags', 'count']
output_table_wordcount = 'wordcount'
columns_name_wordcount = ['word', 'count', 'time']

# parameter
IP = 'localhost'    # host the socket text stream is read from
PORT = 9001       # port the socket text stream is read from

STREAMTIME = 600      # seconds the driver sleeps while the streaming process runs

WORD = ['data', 'spark', 'ai', 'movie', 'good']     # the words kept (lower-cased match) by wordCount

# Helper functions
def saveToStorage(rdd, output_directory, columns_name, mode):
    """
    Write one RDD of a DStream to the output directory as JSON.
    Empty RDDs are skipped, nothing is written for them.
    Args:
        rdd: input rdd
        output_directory: output directory (google storage in this job)
        columns_name: columns name used to turn the rdd into a dataframe
        mode: write mode handed to the dataframe writer
              mode = "overwrite", overwrite the existing output
              mode = "append", append data to the existing output
    """
    if rdd.isEmpty():
        return
    frame = rdd.toDF(columns_name)
    frame.write.save(output_directory, format="json", mode=mode)


def saveToBigQuery(sc, output_dataset, output_table, directory):
    """
    Put temp streaming json files in google storage to google BigQuery
    and clean the output files in google storage
    Args:
        sc: SparkContext, used to reach the Hadoop FileSystem API
        output_dataset: name of the BigQuery dataset to load into
        output_table: name of the table in that dataset (loaded with --replace)
        directory: directory that holds the 'part-*' json files
    Raises:
        subprocess.CalledProcessError: if the `bq load` command exits non-zero
    """
    files = directory + '/part-*'
    # Pass argv as an explicit list instead of formatting one string and
    # splitting it on whitespace: a dataset, table or directory containing a
    # space would otherwise be torn into several arguments.
    subprocess.check_call([
        'bq', 'load',
        '--source_format', 'NEWLINE_DELIMITED_JSON',
        '--replace',
        '--autodetect',
        '{dataset}.{table}'.format(dataset=output_dataset, table=output_table),
        files,
    ])
    # Remove the temporary directory (recursively) once the load succeeded.
    output_path = sc._jvm.org.apache.hadoop.fs.Path(directory)
    output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
        output_path, True)


def hashtagCount(words):
    """
    Keep a running hashtag count since the beginning of the stream and
    order it by descending count.
    Case is ignored when counting:
        "#Ab" and "#ab" are counted as the same hashtag
    Steps:
    1. keep only the words that start with "#" and whose remainder matches
       the alphanumeric regular expression
    2. map (hashtag) to (lower-cased hashtag, 1)
    3. add the counts of the current batch to the previously stored count
       (updateStateByKey)
    4. sort every resulting RDD by count, largest first (transform)
    Args:
        words(DStream): stream of single words taken from the tweets
    Returns:
        DStream Object with inner structure (hashtag, count)
    """

    def is_hashtag(token):
        return token.lower().startswith("#") and re.match('^[a-zA-Z0-9]+$',
                                                          token[1:])

    def as_pair(token):
        return (token.lower(), 1)

    def add_to_total(new_counts, total):
        # `total or 0` covers a missing previous state for this hashtag
        return sum(new_counts) + (total or 0)

    def by_count_desc(rdd):
        return rdd.sortBy(lambda pair: pair[1], ascending=False)

    pairs = words.filter(is_hashtag).map(as_pair)
    running_totals = pairs.updateStateByKey(add_to_total)
    return running_totals.transform(by_count_desc)

def wordCount(words):
    """
    Count the tracked words (the entries of WORD) for every 60 seconds
    (windows do not overlap: window length and slide are both 60).
    Steps:
    1. keep only the words whose lower-cased form is listed in WORD
    2. map (word) to (lower-cased word, 1)
    3. sum the counts per word over each window (reduceByKeyAndWindow,
       with a matching inverse function)
    4. attach the time passed by transform to every record
    Args:
        words(DStream): stream of single words taken from the tweets
    Returns:
        DStream Object with inner structure (word, count, time)
    """

    def is_tracked(token):
        return token.lower() in WORD

    def as_pair(token):
        return (token.lower(), 1)

    def add(left, right):
        return left + right

    def subtract(left, right):
        return left - right

    def stamp(batch_time, rdd):
        # two-argument form of transform: the first argument is the time
        return rdd.map(lambda pair: (pair[0], pair[1], batch_time))

    tracked = words.filter(is_tracked).map(as_pair)
    windowed = tracked.reduceByKeyAndWindow(add, subtract, 60, 60)
    return windowed.transform(stamp)

def saveToStorage_hashTags(rdd):
    """
    Save the hashtag counts RDD as json to output_directory_hashtags,
    overwriting what was stored before. Empty RDDs are skipped.
    Args:
        rdd: input rdd with records matching columns_name_hashtags
    """
    saveToStorage(rdd, output_directory_hashtags, columns_name_hashtags,
                  "overwrite")

def saveToStorage_wordCount(rdd):
    """
    Save the word count RDD as json to output_directory_wordcount,
    appending to what was stored before. Empty RDDs are skipped.
    Args:
        rdd: input rdd with records matching columns_name_wordcount
    """
    saveToStorage(rdd, output_directory_wordcount, columns_name_wordcount,
                  "append")

if __name__ == '__main__':
    # Driver: read words from the socket stream, keep the running hashtag
    # counts and the windowed word counts, dump both to google storage while
    # the stream runs, then load the dumps into BigQuery.

    # Spark settings
    conf = SparkConf()
    conf.setMaster('local[2]')
    conf.setAppName("TwitterStreamApp")

    # create spark context with the above configuration
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("ERROR")

    # create sql context, used for saving rdd
    # NOTE(review): not referenced again below; presumably creating it is what
    # makes rdd.toDF available -- confirm before removing.
    sql_context = SQLContext(sc)

    # create the Streaming Context from the above spark context with batch interval size 5 seconds
    ssc = StreamingContext(sc, 5)
    # setting a checkpoint to allow RDD recovery
    # NOTE(review): the "~" is handed over as-is; confirm the target
    # filesystem resolves it to the intended directory.
    ssc.checkpoint("~/checkpoint_TwitterApp")

    # read data from port 9001
    dataStream = ssc.socketTextStream(IP, PORT)
    #dataStream.pprint()

    words = dataStream.flatMap(lambda line: line.split(" "))
    #words.pprint()
    # calculate the accumulated hashtags count sum from the beginning of the stream
    topTags = hashtagCount(words)
    topTags.pprint()

    # Calculate the word count during each time period of 60s.
    # The result gets its own name so the module-level wordCount function
    # is not overwritten by the DStream it returns.
    wordCounts = wordCount(words)
    wordCounts.pprint()

    # save hashtags count and word count to google storage
    # used to save to google BigQuery:
    #   1. topTags: only the latest rdd is kept (written with "overwrite")
    #   2. wordCounts: every rdd is kept (written with "append")
    topTags.foreachRDD(saveToStorage_hashTags)
    wordCounts.foreachRDD(saveToStorage_wordCount)
    print("Starting ssc")
    # start streaming process, wait for STREAMTIME seconds and then stop.
    ssc.start()
    print("Sleeping .....zzzz..")
    time.sleep(STREAMTIME)
    print("Waking up...")
    # keep the SparkContext alive: saveToBigQuery still needs it below
    ssc.stop(stopSparkContext=False, stopGraceFully=True)
    print("Saving to BigQuery")
    # put the temp result in google storage to google BigQuery
    saveToBigQuery(sc, output_dataset, output_table_wordcount, output_directory_wordcount)
    saveToBigQuery(sc, output_dataset, output_table_hashtags, output_directory_hashtags)
    print("Done :) ")



Starting ssc
Sleeping .....zzzz..
-------------------------------------------
Time: 2021-10-24 15:26:55
-------------------------------------------
('#2yrsofindustryhitbigil', 1)
('#beast', 1)
('#master', 1)

-------------------------------------------
Time: 2021-10-24 15:27:00
-------------------------------------------
('#lalice', 3)
('#blackpink', 3)
('#lalisa', 3)
('#2yrsofindustryhitbigil', 1)
('#beast', 1)
('#rainilealsat', 1)
('#bjkvgs', 1)
('#100daysofcode', 1)
('#bigdata', 1)
('#pytorch', 1)
...

-------------------------------------------
Time: 2021-10-24 15:27:05
-------------------------------------------
('#lalice', 9)
('#blackpink', 9)
('#lalisa', 9)
('#datascience', 6)
('#100daysofcode', 5)
('#bigdata', 5)
('#pytorch', 5)
('#dataanalytics', 5)
('#machinelearning', 5)
('#python', 5)
...

-------------------------------------------
Time: 2021-10-24 15:27:10
-------------------------------------------
('#lalice', 11)
('#blackpink', 11)
('#lalisa', 11)
('#datascience', 7)
('

-------------------------------------------
Time: 2021-10-24 15:29:00
-------------------------------------------
('#lalice', 61)
('#blackpink', 61)
('#lalisa', 61)
('#bigil', 27)
('#machinelearning', 22)
('#ai', 22)
('#datascience', 21)
('#100daysofcode', 20)
('#beast', 19)
('#2yrsofindustryhitbigil', 18)
...

-------------------------------------------
Time: 2021-10-24 15:29:05
-------------------------------------------
('#lalice', 64)
('#blackpink', 64)
('#lalisa', 64)
('#bigil', 29)
('#machinelearning', 22)
('#ai', 22)
('#datascience', 21)
('#100daysofcode', 20)
('#2yrsofindustryhitbigil', 19)
('#beast', 19)
...

-------------------------------------------
Time: 2021-10-24 15:29:10
-------------------------------------------
('#lalice', 64)
('#blackpink', 64)
('#lalisa', 64)
('#bigil', 29)
('#ai', 24)
('#100daysofcode', 22)
('#machinelearning', 22)
('#datascience', 21)
('#2yrsofindustryhitbigil', 20)
('#beast', 20)
...

-------------------------------------------
Time: 2021-10-24 

-------------------------------------------
Time: 2021-10-24 15:31:05
-------------------------------------------
('#lalice', 112)
('#blackpink', 112)
('#lalisa', 112)
('#bigil', 51)
('#ai', 48)
('#beast', 39)
('#datascience', 35)
('#master', 34)
('#bigdata', 33)
('#machinelearning', 33)
...

-------------------------------------------
Time: 2021-10-24 15:31:10
-------------------------------------------
('#lalice', 114)
('#blackpink', 114)
('#lalisa', 114)
('#bigil', 55)
('#ai', 48)
('#beast', 40)
('#master', 35)
('#datascience', 35)
('#2yrsofindustryhitbigil', 33)
('#bigdata', 33)
...

-------------------------------------------
Time: 2021-10-24 15:31:15
-------------------------------------------
('#lalice', 114)
('#blackpink', 114)
('#lalisa', 114)
('#bigil', 56)
('#ai', 49)
('#beast', 40)
('#master', 35)
('#datascience', 35)
('#2yrsofindustryhitbigil', 33)
('#bigdata', 33)
...

-------------------------------------------
Time: 2021-10-24 15:31:20
----------------------------------

-------------------------------------------
Time: 2021-10-24 15:33:10
-------------------------------------------
('#lalice', 153)
('#blackpink', 153)
('#lalisa', 153)
('#bigil', 78)
('#ai', 72)
('#datascience', 56)
('#machinelearning', 55)
('#beast', 53)
('#bigdata', 47)
('#master', 47)
...

-------------------------------------------
Time: 2021-10-24 15:33:15
-------------------------------------------
('#lalice', 155)
('#blackpink', 155)
('#lalisa', 155)
('#bigil', 79)
('#ai', 73)
('#datascience', 58)
('#machinelearning', 55)
('#beast', 54)
('#master', 48)
('#bigdata', 47)
...

-------------------------------------------
Time: 2021-10-24 15:33:20
-------------------------------------------
('#lalice', 157)
('#blackpink', 157)
('#lalisa', 157)
('#bigil', 80)
('#ai', 76)
('#datascience', 60)
('#machinelearning', 57)
('#beast', 54)
('#master', 48)
('#bigdata', 47)
...

-------------------------------------------
Time: 2021-10-24 15:33:25
-------------------------------------------
('#l

-------------------------------------------
Time: 2021-10-24 15:35:15
-------------------------------------------
('#lalice', 195)
('#blackpink', 195)
('#lalisa', 195)
('#bigil', 99)
('#ai', 95)
('#beast', 75)
('#machinelearning', 72)
('#datascience', 71)
('#master', 69)
('#2yrsofindustryhitbigil', 61)
...

-------------------------------------------
Time: 2021-10-24 15:35:20
-------------------------------------------
('#lalice', 195)
('#blackpink', 195)
('#lalisa', 195)
('#bigil', 99)
('#ai', 96)
('#beast', 75)
('#machinelearning', 72)
('#datascience', 71)
('#master', 69)
('#2yrsofindustryhitbigil', 61)
...

-------------------------------------------
Time: 2021-10-24 15:35:25
-------------------------------------------
('#lalice', 195)
('#blackpink', 195)
('#lalisa', 195)
('#bigil', 105)
('#ai', 97)
('#beast', 77)
('#machinelearning', 72)
('#master', 71)
('#datascience', 71)
('#2yrsofindustryhitbigil', 63)
...

-------------------------------------------
Time: 2021-10-24 15:35:30
--

-------------------------------------------
Time: 2021-10-24 15:37:25
-------------------------------------------
('#lalice', 221)
('#blackpink', 221)
('#lalisa', 221)
('#bigil', 120)
('#ai', 108)
('#beast', 88)
('#master', 80)
('#datascience', 78)
('#2yrsofindustryhitbigil', 73)
('#machinelearning', 73)
...

-------------------------------------------
Time: 2021-10-24 15:37:30
-------------------------------------------
('#lalice', 221)
('#blackpink', 221)
('#lalisa', 221)
('#bigil', 120)
('#ai', 108)
('#beast', 88)
('#master', 80)
('#datascience', 78)
('#2yrsofindustryhitbigil', 73)
('#machinelearning', 73)
...

-------------------------------------------
Time: 2021-10-24 15:37:35
-------------------------------------------
('#lalice', 221)
('#blackpink', 221)
('#lalisa', 221)
('#bigil', 120)
('#ai', 108)
('#beast', 88)
('#master', 80)
('#datascience', 78)
('#2yrsofindustryhitbigil', 73)
('#machinelearning', 73)
...

-------------------------------------------
Time: 2021-10-24 15:37: