In [1]:
# Dependencies
import os
import time
from datetime import datetime
from json import loads

import mysql.connector
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [2]:
# Setup Kafka and Spark Streaming
KAFKA_TOPIC = "social_media_topic"
BOOTSTRAP_SERVER = "localhost:9092"

# One micro-batch per 60 seconds; `sc` is the SparkContext supplied by the kernel.
ssc = StreamingContext(sc, batchDuration=60)
ssc.checkpoint("./checkpoint")

# Direct (receiver-less) Kafka stream subscribed to the single topic above.
lines = KafkaUtils.createDirectStream(
    ssc,
    topics=[KAFKA_TOPIC],
    kafkaParams={"metadata.broker.list": BOOTSTRAP_SERVER},
)

In [3]:
# Setup MySQL
# NOTE(review): credentials should not live in the notebook. The literals are
# kept only as fallbacks so existing runs keep working; set MYSQL_USER and
# MYSQL_PASSWORD in the environment, then rotate and remove the defaults.
user = os.environ.get('MYSQL_USER', 'raf')
password = os.environ.get('MYSQL_PASSWORD', 'S1!verBu!!et')
db_name = 'sosial_media'
tablename = 'streaming_sosmed'

client = mysql.connector.connect(host='localhost',
                                 user=user,
                                 passwd=password)
try:
    db_cursor = client.cursor()
    try:
        ## Setup MySQL Database
        db_cursor.execute("SHOW DATABASES")
        # fetchall() drains the result set before the next statement is sent.
        is_database_exist = any(row[0] == db_name for row in db_cursor.fetchall())
        if not is_database_exist:
            db_cursor.execute(f"CREATE DATABASE {db_name};")
        db_cursor.execute(f"USE {db_name}")

        ## Setup MySQL Table
        db_cursor.execute("SHOW TABLES")
        is_table_exist = any(row[0] == tablename for row in db_cursor.fetchall())
        if not is_table_exist:
            db_cursor.execute(f"CREATE TABLE {tablename} (timestamp VARCHAR(20), social_media varchar(25), stream_count int, unique_user_count int, created_at datetime, updated_at datetime);")

        client.commit()
    finally:
        # Release the cursor even when one of the statements above fails.
        db_cursor.close()
finally:
    ## Temporary end mysql connection
    # Each streaming partition opens its own connection later on.
    client.close()

In [4]:
def calculate_data(lines):
    """Build the running per-minute aggregation over the incoming stream.

    Every record is parsed as JSON, keyed by its social network and the minute
    it was published, reduced inside the current batch and finally merged into
    the state that is carried across batches.

    Records whose ``crawler_target.specific_resource_type`` is not one of
    ``twitter``, ``youtube``, ``instagram`` or ``facebook`` are dropped; they
    carry no counters and would otherwise raise ``KeyError`` while merging.

    Parameters
    ----------
    lines : DStream
        Stream whose elements hold the JSON payload text at index 1.

    Returns
    -------
    DStream
        Pairs ``((social_media, minute), state)``. ``state`` is a dict with
        ``user_count`` (number of distinct user ids), ``stream_count`` (number
        of records) and ``user_ids`` (the distinct ids themselves).
    """
    strptime_formats = {
        "twitter": "%a %b %d %H:%M:%S %z %Y",
        "youtube": "%Y-%m-%dT%H:%M:%SZ",
        "facebook": "%Y-%m-%dT%H:%M:%S%z",
    }

    def convert_timestamp(ts, socmed_type):
        """Return ``ts`` truncated to the minute as ``YYYY-MM-DDTHH:MM:00``."""
        # NOTE(review): instagram epoch values are rendered in the worker's
        # local time zone, while the other sources keep the wall-clock fields
        # exactly as written in the payload -- confirm all sources are meant
        # to share one zone.
        if socmed_type == "instagram":
            moment = datetime.fromtimestamp(int(ts))
        else:
            moment = datetime.strptime(ts, strptime_formats[socmed_type])
        return moment.replace(second=0, microsecond=0).strftime("%Y-%m-%dT%H:%M:%S")

    def empty_state():
        """Return a fresh zeroed state (a new dict each call, never shared)."""
        return {"user_count": 0, "stream_count": 0, "user_ids": []}

    def merge_states(left, right):
        """Combine two states: union the user ids and add the record counts."""
        user_ids = list(set(left["user_ids"] + right["user_ids"]))
        return {
            "user_count": len(user_ids),
            "stream_count": left["stream_count"] + right["stream_count"],
            "user_ids": user_ids,
        }

    def map_elements(line):
        """Turn one raw record into ``((social_media, minute), state)``.

        Returns ``None`` for an unsupported social network so the caller can
        filter the record out.
        """
        el = loads(line[1])
        socmed_type = el["crawler_target"]["specific_resource_type"]

        if socmed_type == "twitter":
            raw_timestamp = el["created_at"]
            user_id = el["user_id_str"]
        elif socmed_type == "youtube":
            raw_timestamp = el["snippet"]["publishedAt"]
            # A missing or empty channel id is counted under a placeholder.
            user_id = el["snippet"].get("channelId") or "no_user_id_provided"
        elif socmed_type == "instagram":
            raw_timestamp = el["created_time"]
            user_id = el["user"]["id"]
        elif socmed_type == "facebook":
            raw_timestamp = el["created_time"]
            user_id = el["from"]["id"]
        else:
            return None

        data = {"user_count": 1, "stream_count": 1, "user_ids": [user_id]}
        return ((socmed_type, convert_timestamp(raw_timestamp, socmed_type)), data)

    def reducer(a, b):
        """Merge two states that share a key within the same batch."""
        return merge_states(a, b)

    def updateFunction(a, b):
        """Fold this batch's values ``a`` into the previous state ``b``.

        ``a`` is the list of new values for the key (empty when the key saw no
        records in this batch); ``b`` is ``None`` the first time a key appears.
        Only ``a[0]`` is read because the stream is reduced by key beforehand.
        """
        previous = empty_state() if b is None else b
        incoming = a[0] if a else empty_state()
        return merge_states(incoming, previous)

    # Streaming Main: parse records and discard unsupported networks.
    process_result = lines.map(map_elements).filter(lambda pair: pair is not None)

    # Reduce Process: collapse each key to one state per batch.
    reduced = process_result.reduceByKey(reducer)

    # Update Process: accumulate across batches.
    result = reduced.updateStateByKey(updateFunction)

    return result
 
def sendPartition(partition):
    """Upsert one partition of aggregated counts into the MySQL results table.

    A dedicated connection is opened for the partition. For every record the
    table is probed for a row with the same ``timestamp`` and ``social_media``;
    the row is updated when it exists and inserted otherwise. Each record is
    committed on its own.

    Parameters
    ----------
    partition : iterable
        Records shaped ``((social_media, timestamp), state)`` where ``state``
        provides ``'user_count'`` and ``'stream_count'``.
    """
    # Identifiers cannot be bound as parameters, so the table name is
    # interpolated; every value goes through %s placeholders so stream-derived
    # text is never spliced into the SQL.
    select_query = (f"SELECT 1 FROM {tablename} "
                    "WHERE timestamp = %s AND social_media = %s LIMIT 1")
    insert_query = f"INSERT INTO {tablename} VALUES (%s, %s, %s, %s, %s, %s)"
    update_query = (f"UPDATE {tablename} "
                    "SET stream_count = %s, unique_user_count = %s, updated_at = %s "
                    "WHERE timestamp = %s AND social_media = %s")

    connection = mysql.connector.connect(host='localhost',
                                         user=user,
                                         passwd=password)
    try:
        db_cursor = connection.cursor()
        try:
            db_cursor.execute(f"USE {db_name}")
            for record in partition:
                sosmed, timestamp = record[0]
                user_count = record[1]['user_count']
                stream_count = record[1]['stream_count']
                current_datetime = datetime.now()

                db_cursor.execute(select_query, (timestamp, sosmed))
                # fetchall() drains the result set so the next execute() is
                # not rejected because of unread rows.
                row_exists = bool(db_cursor.fetchall())

                if row_exists:
                    db_cursor.execute(
                        update_query,
                        (stream_count, user_count, current_datetime, timestamp, sosmed),
                    )
                else:
                    db_cursor.execute(
                        insert_query,
                        (timestamp, sosmed, stream_count, user_count,
                         current_datetime, current_datetime),
                    )
                connection.commit()
        finally:
            db_cursor.close()
    finally:
        # Always hand the connection back, even when a statement fails.
        connection.close()
    
# Build the aggregated stream from the Kafka input.
social_media = calculate_data(lines)


def _store_batch(rdd):
    """Write one micro-batch to MySQL, handling each partition separately."""
    rdd.foreachPartition(sendPartition)


social_media.foreachRDD(_store_batch)

# Start consuming and block until the streaming context terminates.
ssc.start()
ssc.awaitTermination()
# For a bounded run, replace awaitTermination() with time.sleep(60)
# followed by ssc.stop().