In [None]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
import time
import subprocess
import re
from google.cloud import bigquery
import pandas as pd

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Google Cloud Storage bucket that receives the intermediate JSON output.
bucket = "YOUR GCP BUCKET"    # TODO : replace with your own bucket name
output_directory_wordcount = (
    'gs://{bucket}/hadoop/tmp/bigquery/pyspark_output/wordcount'.format(bucket=bucket)
)

# BigQuery destination, plus the column names used for every output row.
project_id = 'YOUR PROJECT ID'
output_dataset = 'YOUR OUTPUT DATASET'        # name of the dataset in BigQuery
output_table_wordcount = 'YOUR OUTPUT TABLE NAME'
columns_name_wordcount = ['tag', 'count', 'date']

# Socket the text stream is read from.
IP = 'localhost'    # host name / IP address of the sender
PORT = 9001         # TCP port of the sender

STREAMTIME = 600    # seconds the streaming job runs before it is stopped


# Helper functions
def saveToStorage(rdd, output_directory, columns_name, mode):
    """
    Write one micro-batch RDD of the DStream to Google Cloud Storage as JSON.

    Args:
        rdd: RDD holding the current batch.
        output_directory: destination directory in Google Cloud Storage.
        columns_name: column names used when converting the RDD to a DataFrame.
        mode: Spark save mode, e.g. "overwrite" to replace the existing output
            or "append" to add to it.

    Empty batches are skipped, so nothing is written for them.
    Note: rdd.toDF() needs an SQL session (SQLContext/SparkSession) to have
    been created beforehand.
    """
    if rdd.isEmpty():
        return
    frame = rdd.toDF(columns_name)
    frame.write.save(output_directory, format="json", mode=mode)


def saveToBigQuery(sc, output_dataset, output_table, directory):
    """
    Load the temporary JSON part files in Google Cloud Storage into BigQuery,
    then delete the temporary directory.

    Args:
        sc: the active SparkContext, used to reach the Hadoop FileSystem API.
        output_dataset: BigQuery dataset name.
        output_table: BigQuery table name inside ``output_dataset``.
        directory: storage directory holding the ``part-*`` JSON files.

    Raises:
        subprocess.CalledProcessError: if ``bq load`` exits non-zero. The
            temporary directory is not deleted in that case.
    """
    files = directory + '/part-*'
    # Build argv explicitly rather than splitting a formatted string, so a
    # directory, dataset or table name containing whitespace stays a single
    # argument. No shell is involved, so the wildcard is passed to bq as-is.
    subprocess.check_call([
        'bq', 'load',
        '--source_format', 'NEWLINE_DELIMITED_JSON',
        '--replace',      # overwrite the table contents on each load
        '--autodetect',   # let BigQuery infer the schema from the JSON
        '{dataset}.{table}'.format(dataset=output_dataset, table=output_table),
        files,
    ])
    # Recursively delete the temporary output through the JVM Hadoop
    # FileSystem API (second argument True = recursive).
    output_path = sc._jvm.org.apache.hadoop.fs.Path(directory)
    output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(output_path, True)


def wordCount(words, window_duration=10, slide_duration=10, tags=None):
    """
    Count words over time windows and stamp each window's result with its time.

    Words are lower-cased and empty tokens are dropped. Empty tokens come, for
    example, from consecutive spaces in ``line.split(" ")``. When ``tags`` is
    given, only those words are counted.

    Args:
        words: DStream of individual words.
        window_duration: window length in seconds (default 10).
        slide_duration: slide interval in seconds (default 10). When it equals
            ``window_duration`` the windows do not overlap. Both must be
            multiples of the StreamingContext batch interval.
        tags: optional iterable of words to count. They are compared after
            lower-casing. ``None`` (the default) counts every word.

    Returns:
        DStream of ``(word, count, time)`` tuples. ``time`` is the window's
        batch time formatted as ``"%m/%d %H:%M:%S"``. This matches the
        three-column layout ``['tag', 'count', 'date']`` used when saving.
    """
    words = words.map(lambda word: word.lower()).filter(lambda word: word != "")
    if tags is not None:
        wanted = frozenset(tag.lower() for tag in tags)
        words = words.filter(lambda word: word in wanted)

    pairs = words.map(lambda word: (word, 1))
    # The inverse function lets Spark update a window incrementally by
    # subtracting the batches that left it; whether it does so depends on the
    # window/slide sizes. That path can leave words whose count fell to 0, so
    # filterFunc drops them. Using an inverse function requires checkpointing.
    counts = pairs.reduceByKeyAndWindow(
        lambda a, b: a + b,
        lambda a, b: a - b,
        window_duration,
        slide_duration,
        filterFunc=lambda kv: kv[1] > 0,
    )

    # transform() hands the callback the batch time (a datetime) with each RDD.
    return counts.transform(
        lambda batch_time, rdd: rdd.map(
            lambda kv: (kv[0], kv[1], batch_time.strftime("%m/%d %H:%M:%S"))
        )
    )


def savingTry(words, project_id, output_dataset, output_table_wordcount, columns_name_wordcount):
    """
    Append the rows of one micro-batch RDD to a BigQuery table using pandas.

    The RDD is collected to the driver, wrapped in a pandas DataFrame and
    uploaded with ``DataFrame.to_gbq(..., if_exists='append')``. Empty batches
    are skipped, as in ``saveToStorage``, so an idle batch interval does not
    trigger a BigQuery request.

    Args:
        words: RDD of row tuples, one value per name in
            ``columns_name_wordcount``.
        project_id: Google Cloud project that owns the dataset.
        output_dataset: BigQuery dataset name.
        output_table_wordcount: BigQuery table name inside ``output_dataset``.
        columns_name_wordcount: column names for the DataFrame.
    """
    # collect() brings the whole batch onto the driver, so this is only
    # suitable for small per-window results.
    rows = words.collect()
    if not rows:
        return
    frame = pd.DataFrame(rows, columns=columns_name_wordcount)
    table_path = "{}.{}".format(output_dataset, output_table_wordcount)
    frame.to_gbq(table_path, project_id=project_id, if_exists='append')
    


if __name__ == '__main__':
    # Spark settings: run locally with two threads. The socket receiver
    # occupies one, so at least two are needed to also process the data.
    conf = SparkConf()
    conf.setMaster('local[2]')
    conf.setAppName("StackOverflowStreamApp")

    # Reuse an existing SparkContext (e.g. the notebook's) or create one.
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("ERROR")

    # Created for its side effect: rdd.toDF() in saveToStorage needs an SQL
    # session to exist. The variable itself is not referenced again.
    sql_context = SQLContext(sc)

    # Streaming context with a 10-second batch interval.
    ssc = StreamingContext(sc, 10)

    # Checkpointing is required because wordCount's reduceByKeyAndWindow uses
    # an inverse reduce function.
    # NOTE(review): Hadoop filesystems do not expand "~", so this likely
    # creates a directory literally named "~" — confirm the intended location.
    ssc.checkpoint("~/checkpoint_stackOverflowApp")

    # Read lines of text from the socket at IP:PORT.
    dataStream = ssc.socketTextStream(IP, PORT)
    dataStream.pprint()

    words = dataStream.flatMap(lambda line: line.split(" "))
    # Bind the result to a new name so the wordCount function is not shadowed.
    word_counts = wordCount(words)
    word_counts.pprint()

    # Write each window's counts to Google Cloud Storage as JSON.
    word_counts.foreachRDD(
        lambda rdd: saveToStorage(rdd, output_directory_wordcount,
                                  columns_name_wordcount, mode="overwrite")
    )

    # Append each window's counts directly to BigQuery via pandas.
    word_counts.foreachRDD(
        lambda rdd: savingTry(rdd, project_id, output_dataset,
                              output_table_wordcount, columns_name_wordcount)
    )

    # Start streaming, let it run for STREAMTIME seconds, then stop gracefully
    # while keeping the SparkContext alive.
    ssc.start()
    time.sleep(STREAMTIME)
    ssc.stop(stopSparkContext=False, stopGraceFully=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/02 05:11:02 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/12/02 05:11:02 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/12/02 05:11:02 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/12/02 05:11:02 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
[Stage 0:>                                                          (0 + 1) / 1]

-------------------------------------------
Time: 2022-12-02 05:11:10
-------------------------------------------



                                                                                

-------------------------------------------
Time: 2022-12-02 05:11:10
-------------------------------------------



[Stage 0:>                                                          (0 + 1) / 1]