In [None]:
from kafka import KafkaConsumer
import json
import pandas as pd
from sqlalchemy import create_engine
import urllib.parse



def insert_data_to_postgresql(df, table_name, db_url):
    try:
        engine = create_engine(db_url,pool_pre_ping=True)
        df.to_sql(table_name, engine, if_exists='append', index=False)
        print(f"Data telah dimasukkan ke tabel {table_name}.")
    except Exception as e:
        print(f"Terjadi kesalahan: {e}")

if __name__ == "__main__":
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers = ['localhost:9092'],
        auto_offset_reset = 'earliest',
        enable_auto_commit = True,
        group_id = 'my-group-id',
        value_deserializer=lambda x: x.decode('utf-8')
    )
    
    # Informasi koneksi ke PostgreSQL better ganti ke localhost
    username = "postgres"
    password = "postgres"
    host = "localhost"
    port = "5432"
    database = "stream_processing"
    password = urllib.parse.quote_plus(password)

    # URL koneksi ke PostgreSQL
    db_url = f"postgresql://{username}:{password}@{host}:{port}/{database}"

    table_name = "New_information"

    for message in consumer:
        message_value = message.value
        print(f"Menerima pesan:{message_value}")
        
        if not message_value.strip():
            print("Pesan kosong, melewati.")
            continue

        try:
            message_data = json.loads(message_value)
            df=pd.DataFrame([message_value])
            insert_data_to_postgresql(df, table_name, db_url)
        except json.JSONDecodeError as e:
            print(f"Pesan tidak valid (bukan JSON): {message_value}. Error: {e}")
        except Exception as e:
            print(f"Terjadi kesalahan: {e}")