In [None]:
import pika
import json
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError

QUEUE_TO_TABLE = {
    "Rata_rata_Kapasitas":"Rata_rata_Kapasitas",
    "Bahan bakar populer":"Bahan bakar populer",
    "Kapasitas Per country":"Kapasitas Per country",
    "Energi per jenis pembangkit":"Energi per jenis pembangkit",
    "Bahan bakar terpopuler setiap negara":"Bahan bakar terpopuler setiap negara",
    "Negara dengan Energi Terbesar":"Negara dengan Energi Terbesar",
    "total energi pertahun":"total energi pertahun"
}

dbname = "UAS DATWER"
uname  = "postgres"
passwd = "dany"
host   = "localhost"
port   = 5432

connSql = f"postgresql+psycopg2://{uname}:{passwd}@{host}:{port}/{dbname}"
engine  = create_engine(connSql)

def callback(ch, method, properties, body):
    queue_name = method.routing_key  

    if queue_name not in QUEUE_TO_TABLE:
        print(f"Queue '{queue_name}' tidak dikenali → skip.")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return

    table_name = QUEUE_TO_TABLE[queue_name]

    try:
        message_dict = json.loads(body.decode("utf-8"))
        df_row = pd.DataFrame([message_dict])  

        df_row.to_sql(
            name=table_name,
            con=engine,
            if_exists="append", 
            index=False
        )

        ch.basic_ack(delivery_tag=method.delivery_tag) 
        print(f"({queue_name}) → Insert ke '{table_name}': {message_dict}")

    except Exception as err:
        print(f"({queue_name}) ERROR {type(err).__name__}: {err}") 
        ch.basic_ack(delivery_tag=method.delivery_tag) 


def main():
    rabbit_conn = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost")
    )
    channel = rabbit_conn.channel()
    for queue_name in QUEUE_TO_TABLE.keys():
        channel.queue_declare(queue=queue_name, durable=True)

    channel.basic_qos(prefetch_count=1)

    for queue_name in QUEUE_TO_TABLE.keys():
        channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback
        )

    print("Menunggu pesan dari queue:", list(QUEUE_TO_TABLE.keys()))
    print("Tekan CTRL+C untuk berhenti.")

    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        print("Menghentikan consumer...")
        channel.stop_consuming()
    finally:
        rabbit_conn.close()


if __name__ == "__main__":
    main()

Menunggu pesan dari queue: ['Rata_rata_Kapasitas', 'Bahan bakar populer', 'Kapasitas Per country', 'Energi per jenis pembangkit', 'Bahan bakar terpopuler setiap negara', 'Negara dengan Energi Terbesar', 'total energi pertahun']
Tekan CTRL+C untuk berhenti.
(Rata_rata_Kapasitas) → Insert ke 'Rata_rata_Kapasitas': {'Rata-rata Kapasitas (MW)': 23.9729381818, 'Total Kapasitas (MW)': 171406.508}
(Bahan bakar populer) → Insert ke 'Bahan bakar populer': {'Total Kapasitas (MW)': 9827704.998}
(Kapasitas Per country) → Insert ke 'Kapasitas Per country': {'Kapasitas per GWh': 0.3413975319}
(Energi per jenis pembangkit) → Insert ke 'Energi per jenis pembangkit': {'Total Energi (GWh)': 137322.0659456667}
(Bahan bakar terpopuler setiap negara) → Insert ke 'Bahan bakar terpopuler setiap negara': {'Jumlah Pembangkit': 3283}
(Negara dengan Energi Terbesar) → Insert ke 'Negara dengan Energi Terbesar': {'Total Energi (GWh)': 17718726.97300002}
(total energi pertahun) → Insert ke 'total energi pertahun': 