## Consumer

In [8]:
from stream_pack.modelib.FraudModel import runModel
from stream_pack.dumplib.dump import load_data
from kafka import KafkaConsumer
from pymongo import MongoClient
import pandas as pd
import urllib.parse
import json
import os

if __name__ == "__main__":
    # Informasi koneksi ke PostgreSQL
    USERNAME = os.getenv("USER")
    PASSWORD = os.getenv("PASSWORD")
    HOST = os.getenv("HOST")
    DATABASE = os.getenv("DB_NAME")
    PASSWORD = urllib.parse.quote_plus(PASSWORD)

    # URL koneksi ke PostgreSQL
    db_url = f"postgresql+psycopg2://{USERNAME}:{PASSWORD}@{HOST}/{DATABASE}?sslmode=require"
    table_name = "old_information"

    new_data = {
        'step': 1,
        'type': 'PAYMENT',
        'amount': 9839.64,
        'oldbalanceOrg': 170136.0,
        'newbalanceOrig': 160296.36,
        'oldbalanceDest': 0.0,
        'newbalanceDest': 0.0
    }

    # Mengatur koneksi ke MongoDB
    mongo_client = MongoClient("mongodb://admin:password@localhost:27017/")
    db = mongo_client["ftde01"]
    collection = db["tes01-collection"]

    # Mengatur Kafka Consumer
    consumer = KafkaConsumer("ftde01-project4", bootstrap_servers='localhost')
    print("Starting the consumer")
    
    for msg in consumer:
        print(f"Records = {json.loads(msg.value)}")
        
        # Parsing pesan Kafka
        data = json.loads(msg.value)
        
        # Menjalankan prediksi menggunakan fungsi runModel
        prediction = runModel(new_data, os.getcwd())  # Ubah '.' dengan path packages jika diperlukan
        
        # Load data from DB
        df_old = load_data(table_name, db_url)
        df_old.columns

        # create DF producer
        producer = pd.DataFrame([data])
        data_merge = producer.merge(df_old,
                              how='inner',
                              on=['nameOrig', 
                                  'nameDest']
                                  )
        predict = data_merge.drop(['nameOrig', 'nameDest'], axis=1)
        predict = predict.to_dict('index')
        print(predict)

        # Menambahkan hasil prediksi ke data sebelum disimpan
        data_merge['prediction'] = prediction
        data_dict = data_merge.to_dict(orient='records')
        print(data_dict)

        # Menyimpan data ke MongoDB
        if isinstance(data, list):
            collection.insert_many(data)
        else:
            collection.insert_one(data)
        
        print("Data telah disimpan ke MongoDB")

        # Show data
        data_from_mongo = list(collection.find())
        dfm = pd.DataFrame(data_from_mongo)
        print(dfm)


Starting the consumer
Records = {'step': 1, 'type': 'PAYMENT', 'amount': 9839.64, 'nameOrig': 'C1231006815', 'newbalanceOrig': 160296.36, 'nameDest': 'M1979787155', 'newbalanceDest': 0.0}
{0: {'step': 1, 'type': 'PAYMENT', 'amount': 9839.64, 'newbalanceOrig': 160296.36, 'newbalanceDest': 0.0, 'oldbalanceOrg': 170136.0, 'oldbalanceDest': 0.0}}
[{'step': 1, 'type': 'PAYMENT', 'amount': 9839.64, 'nameOrig': 'C1231006815', 'newbalanceOrig': 160296.36, 'nameDest': 'M1979787155', 'newbalanceDest': 0.0, 'oldbalanceOrg': 170136.0, 'oldbalanceDest': 0.0, 'prediction': 'White List'}]
Data telah disimpan ke MongoDB
                          _id  step      type     amount     nameOrig  \
0    6691bd95bad1327df42486dd     1   PAYMENT    9839.64  C1231006815   
1    6691bd95bad1327df42486de     1   PAYMENT    1864.28  C1666544295   
2    6691bd95bad1327df42486df     1  TRANSFER     181.00  C1305486145   
3    6691bd96bad1327df42486e0     1  CASH_OUT     181.00   C840083671   
4    6691bd97bad1327df4

KeyboardInterrupt: 