In [1]:
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from datetime import datetime, timedelta

# Kafka source: topic to consume and the broker to bootstrap from.
KAFKA_TOPIC = "social_media"
BOOTSTRAP_SERVER = "localhost:9092"

# Streaming context built on the notebook-provided SparkContext `sc`,
# with a batch duration of 1 (seconds).
ssc = StreamingContext(sparkContext=sc, batchDuration=1)
ssc.checkpoint("./checkpoint")

# Direct (receiver-less) Kafka stream over the configured topic.
lines = KafkaUtils.createDirectStream(
    ssc,
    topics=[KAFKA_TOPIC],
    kafkaParams={"metadata.broker.list": BOOTSTRAP_SERVER},
)

In [2]:
import psycopg2
from psycopg2.extras import execute_batch
# Smoke-test the PostgreSQL connection: connect, ask the server for its
# version string, print it, and always release the cursor and connection.
# NOTE(review): connection settings are hard-coded here; consider reading
# them from the environment before sharing this notebook.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="if4040",
    user="postgres",
    password="postgres"
)

try:
    # Open a cursor to perform database operations
    cur = conn.cursor()
    try:
        # Execute a test query and fetch its single result row
        cur.execute("SELECT version()")
        db_version = cur.fetchone()
    finally:
        # Close the cursor even if the query fails
        cur.close()

    print("PostgreSQL database version:", db_version)
finally:
    # Close the connection even if the query or the print fails
    conn.close()

PostgreSQL database version: ('PostgreSQL 10.23 (Ubuntu 10.23-0ubuntu0.18.04.1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 7.5.0-3ubuntu1~18.04) 7.5.0, 64-bit',)


In [3]:
def data_get_user_id(data):
    # function to get the user id based on what social media it is
    # input: 
    #     data: python dict
    # output:
    #     user_id: string
    
    # can also use a switch case here, but for now, if-else should do fine
    
    user_id = ""
    
    try:
        if data['crawler_target']['specific_resource_type'] == 'instagram':
            user_id = data['user']['id']
        elif data['crawler_target']['specific_resource_type'] == 'twitter':
            user_id = data['user_id']
        elif data['crawler_target']['specific_resource_type'] == 'facebook':
            user_id = data['from']['id']
        elif data['crawler_target']['specific_resource_type'] == 'youtube':
            if data['kind'] == 'youtube#video':
                user_id = data['snippet']['channelId']
            else:
                user_id = data['snippet']['authorChannelId']['value']
    except:
        raise ValueError("something went wrong when getting user ID")
    
    return user_id

def update_state(new_value, cur_state):
    """Append the new values to the running state and return the state.

    Args:
        new_value: iterable of values to add.
        cur_state: the accumulated state so far, or ``None`` when there is
            no state yet (an empty list is used in that case).

    Returns:
        The state after ``new_value`` was added with ``+=``. For a list
        state this is the same object that was passed in.
    """
    state = [] if cur_state is None else cur_state
    state += new_value
    return state

def insert_to_postgres(data):
    """Aggregate one RDD of records and upsert the result into PostgreSQL.

    Elements are read positionally: ``x[0]`` and ``x[2]`` form the grouping
    key and ``x[1]`` is the value that gets counted. ``transform_data`` emits
    ``(social_media, user_id, timestamp)`` tuples in that order. Each group
    becomes one row ``(social_media, timestamp, count, unique_count)`` where
    ``count`` is the number of values and ``unique_count`` the number of
    distinct values in the group.

    Rows are written to the ``social_media`` table; on a conflict on
    ``(social_media, timestamp)`` the new counts are added to the stored ones
    and ``updated_at`` is set to ``NOW()``.

    NOTE(review): because ``unique_count`` is summed on conflict, a user that
    appears in two separate calls for the same key is counted twice.

    Args:
        data: pyspark rdd of 3-element records as described above.

    Returns:
        None. Nothing is written, and no connection is opened, when the RDD
        yields no rows.
    """
    keyed = data.map(lambda row: ((row[0], row[2]), [row[1]]))
    grouped = keyed.reduceByKey(lambda left, right: left + right)
    aggregated = grouped.map(
        lambda kv: (kv[0][0], kv[0][1], len(kv[1]), len(set(kv[1])))
    )

    records = aggregated.collect()

    print(records)

    # An empty RDD produces no rows. Checking the collected list avoids a
    # second Spark job (data.isEmpty()) and avoids opening a connection that
    # would never be closed.
    if not records:
        return

    query = """
        INSERT INTO social_media (social_media, timestamp, count, unique_count) 
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (social_media, timestamp) DO
        UPDATE SET count = social_media.count + EXCLUDED.count, 
        unique_count = social_media.unique_count + EXCLUDED.unique_count,
        updated_at = NOW()
        """

    # NOTE(review): connection settings are hard-coded; consider reading them
    # from the environment.
    conn = psycopg2.connect(
        host="localhost",
        port=5432,
        database="if4040",
        user="postgres",
        password="postgres"
    )
    try:
        # The cursor context manager closes the cursor on exit.
        with conn.cursor() as cur:
            execute_batch(cur, query, records)
        conn.commit()
    finally:
        # Always release the connection; closing without a commit discards
        # the pending transaction if the batch failed.
        conn.close()

def _bucket_time_str(moment, bucket_minutes=5):
    """Floor `moment` to the start of its minute bucket and format it as ISO-like text."""
    floored = moment.replace(
        minute=moment.minute // bucket_minutes * bucket_minutes,
        second=0,
        microsecond=0,
    )
    return floored.strftime("%Y-%m-%dT%H:%M:%S")


def transform_data(lines, window_size = 4, sliding_interval = 4):
    """Turn the raw Kafka stream into (social_media, user_id, timestamp) tuples.

    Each message value (``x[1]``) is decoded from JSON, then mapped to a tuple
    of its ``crawler_target.specific_resource_type``, the id returned by
    ``data_get_user_id`` and a timestamp string floored to a 5-minute bucket.

    The bucket is derived from the time of the batch each record belongs to.
    It was previously computed once, when this function was called, so every
    later batch kept the same stale timestamp.

    As a side effect, a windowed copy of the stream is registered with
    ``foreachRDD(insert_to_postgres)``.

    Args:
        lines: DStream of Kafka messages whose element ``[1]`` is a JSON string.
        window_size: window length passed to ``DStream.window``. Previously
            ignored in favour of a hard-coded 4; the default is now 4 so a
            call without arguments behaves as before.
        sliding_interval: slide interval passed to ``DStream.window``;
            same remark as ``window_size``.

    Returns:
        The un-windowed DStream of ``(social_media, user_id, timestamp)`` tuples.
    """
    def _tag_with_bucket(batch_time, rdd):
        # One bucket label per batch, taken from the batch time that
        # DStream.transform supplies as its first argument.
        time_str = _bucket_time_str(batch_time)
        return rdd.map(lambda x: (x['crawler_target']['specific_resource_type'],
                                  data_get_user_id(x),
                                  time_str))

    line = lines.map(lambda x: json.loads(x[1]))
    social_media = line.transform(_tag_with_bucket)

    social_media.window(window_size, sliding_interval).foreachRDD(insert_to_postgres)

    return social_media



# Build the pipeline. transform_data also registers the windowed
# foreachRDD(insert_to_postgres) output, so this must run before ssc.start().
result = transform_data(lines)
# Register a second output that prints records of each batch to the cell output.
result.pprint()
# Start the streaming computation; outputs must be registered before this point.
ssc.start()
# Keep the cell running while the stream is processed; interrupt the kernel to stop.
ssc.awaitTermination()

-------------------------------------------
Time: 2023-04-17 21:50:49
-------------------------------------------
('twitter', 'SXRKWYQEHPRBQOXOWFBJ', '2023-04-17T21:50:00')
('facebook', 'HWXMEHNPNMCDZZSXGEYG', '2023-04-17T21:50:00')
('youtube', 'RBBKIKYXJOEWFFXSUXVN', '2023-04-17T21:50:00')

-------------------------------------------
Time: 2023-04-17 21:50:50
-------------------------------------------
('instagram', 'BLDPTXKYUGQPCOGJDZOW', '2023-04-17T21:50:00')

-------------------------------------------
Time: 2023-04-17 21:50:51
-------------------------------------------
('instagram', 'HBATCYRZFXVOQYGXNPYU', '2023-04-17T21:50:00')

[('instagram', '2023-04-17T21:50:00', 2, 2), ('facebook', '2023-04-17T21:50:00', 1, 1), ('youtube', '2023-04-17T21:50:00', 1, 1), ('twitter', '2023-04-17T21:50:00', 2, 2)]
-------------------------------------------
Time: 2023-04-17 21:50:52
-------------------------------------------
('twitter', 'GGYFPIBGLMUIXSEOYHRP', '2023-04-17T21:50:00')

---------

KeyboardInterrupt: 

In [7]:
# conn = psycopg2.connect(
#     host="localhost",
#     port=5432,
#     database="if4040",
#     user="postgres",
#     password="postgres"
# )

# # Open a cursor to perform database operations
# cur = conn.cursor()

# # Execute a test query
# cur.execute("SELECT version()")

# data = [('youtube', '2023-04-17T20:05:00', 1, 1), ('twitter', '2023-04-17T20:05:00', 2, 2), ('instagram', '2023-04-17T20:05:00', 1, 1)]
# query = """
# INSERT INTO social_media (social_media, timestamp, count, unique_count) 
# VALUES (%s, %s, %s, %s)
# ON CONFLICT (social_media, timestamp) DO
# UPDATE SET count = social_media.count + EXCLUDED.count, 
# unique_count = social_media.unique_count + EXCLUDED.unique_count,
# updated_at = NOW()
# """
# execute_batch(cur, query, data)

# conn.commit()