In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Columbia EECS E6893 Big Data Analytics HW2

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests
import time
import subprocess
import re
from google.cloud import bigquery

# Cloud Storage bucket that holds the intermediate JSON files.
bucket = "word-count-bucket11"
# Every non-empty micro-batch is appended under this directory in the bucket.
output_directory_tweets = 'gs://' + bucket + '/tweets/data'

# BigQuery dataset that the collected tweets are loaded into.
output_dataset = 'bigdata_sparkStreaming'

# Host and port of the socket server that streams tweet text
# (read line-by-line via socketTextStream).
IP, PORT = 'localhost', 9001

# How long, in seconds, the stream runs before it is stopped (see time.sleep).
STREAMTIME = 300


def saveToStorage(rdd, output_directory, columns_name, mode):
    """Write one micro-batch RDD to ``output_directory`` as JSON.

    Batches with no records are skipped, so nothing is written for them.

    Args:
        rdd: the batch's RDD. Its rows must be convertible by ``RDD.toDF``;
            the caller in this notebook passes 1-tuples of tweet text.
        output_directory: destination directory (a gs:// path here).
        columns_name: column names handed to ``toDF``.
        mode: Spark save mode, e.g. ``"append"``.
    """
    if rdd.isEmpty():
        # Nothing arrived during this batch interval.
        return
    batch_frame = rdd.toDF(columns_name)
    batch_frame.write.save(output_directory, format="json", mode=mode)

        

def saveToBigQuery(sc, output_dataset, output_table, directory):
    """Load the JSON ``part-*`` files under ``directory`` into BigQuery, then delete it.

    Runs ``bq load`` with ``--replace`` and ``--autodetect`` on newline-delimited
    JSON. Because ``subprocess.check_call`` raises on a non-zero exit status,
    ``directory`` is only deleted (recursively, via the Hadoop FileSystem)
    after the load has succeeded.

    Args:
        sc: active SparkContext, used to reach the Hadoop FileSystem for deletion.
        output_dataset: BigQuery dataset name.
        output_table: table name inside ``output_dataset``.
        directory: directory holding the JSON part files written by Spark.

    Raises:
        subprocess.CalledProcessError: if ``bq load`` exits with a non-zero status.
    """
    files = directory + '/part-*'
    # Pass argv as a list rather than formatting a string and calling .split():
    # whitespace-splitting would break any path or name containing a space.
    # No shell is involved, so the part-* glob reaches bq unexpanded, as before.
    subprocess.check_call([
        'bq', 'load',
        '--source_format', 'NEWLINE_DELIMITED_JSON',
        '--replace',
        '--autodetect',
        '{}.{}'.format(output_dataset, output_table),
        files,
    ])
    output_path = sc._jvm.org.apache.hadoop.fs.Path(directory)
    output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
        output_path, True)


if __name__ == '__main__':
    # Driver: read tweet text from a local socket for STREAMTIME seconds,
    # append every non-empty 5-second batch to GCS as JSON, then load the
    # collected files into BigQuery. In a Jupyter kernel __name__ is
    # '__main__', so this runs when the cell executes.
    conf = SparkConf()
    conf.setMaster('local[2]')
    conf.setAppName("TwitterStreamApp")

    # getOrCreate lets the cell be re-run while a previous SparkContext is
    # still alive (the stream below is stopped with stopSparkContext=False);
    # constructing a second SparkContext directly would raise.
    sc = SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel("ERROR")

    # Not referenced again. Presumably kept so RDD.toDF (used in
    # saveToStorage) is available, since PySpark attaches toDF once a SQL
    # session exists. Verify before removing.
    sql_context = SQLContext(sc)

    ssc = StreamingContext(sc, 5)  # 5-second batch interval
    # NOTE(review): Hadoop paths do not expand "~"; this likely creates a
    # directory literally named "~" relative to the default filesystem's
    # working directory. Confirm the intended location.
    ssc.checkpoint("~/checkpoint_TwitterApp")

    dataStream = ssc.socketTextStream(IP, PORT)
    dataStream.pprint()

    # Wrap each line in a 1-tuple so toDF builds a single-column DataFrame.
    tweets = dataStream.map(lambda x: (x, ))
    tweets.foreachRDD(lambda rdd: saveToStorage(
        rdd, output_directory_tweets, ['tweets'], mode="append"))

    ssc.start()
    try:
        time.sleep(STREAMTIME)
    finally:
        # Stop the stream even if the sleep is interrupted (e.g. a kernel
        # interrupt) so it does not keep running in the background. The
        # SparkContext is kept alive because saveToBigQuery needs it below.
        ssc.stop(stopSparkContext=False, stopGraceFully=True)

    saveToBigQuery(sc, output_dataset, 'spiderman_tweets', output_directory_tweets)
    




-------------------------------------------
Time: 2021-12-18 05:42:40
-------------------------------------------

-------------------------------------------
Time: 2021-12-18 05:42:45
-------------------------------------------
RT @eoseless: o meme.
#SpiderManNoWayHome https://t.co/UHEwY563qdI wanna see more but please let tom holland rest first omfgRT @gabs1229: Spiderman 300000000/10 wow bye ameisin
RT @btspopmp3: time to bring this edit back (by ggukreum)
https://t.co/1EwUUqb8mm

-------------------------------------------
Time: 2021-12-18 05:42:50
-------------------------------------------
RT @Michael5SOS: the new spiderman is the best movie I have ever seen in my life
RT @beebinton: zendaya really dating tom holland just to make me jealous LMFAOOOORT @RenGeekness: 4 stages of watching #SpiderManNoWayHome https://t.co/Gbwx3D07z1

-------------------------------------------
Time: 2021-12-18 05:42:55
-------------------------------------------

-------------------------------------

In [2]:
# Release the SparkContext created in the streaming cell. That cell stops
# the StreamingContext with stopSparkContext=False so saveToBigQuery can
# still use `sc`, so the context stays alive until it is stopped here.
sc.stop()