In [1]:
import json
# Read the shared settings file that lives one directory above this notebook.
# The file is opened in binary mode; json.loads accepts bytes and works out
# the UTF encoding itself, so no explicit decode step is needed.
with open('../config.json', 'rb') as config_file:
    raw_config = config_file.read()

# `config` is the parsed settings object used by the cells below.
config = json.loads(raw_config)

In [2]:
from pymongo import MongoClient

# Connection settings come from the "mongodb" section of the config loaded above.
host = config['mongodb']['host']
port = config['mongodb']['port']
user = config['mongodb']['user']
password = config['mongodb']['password']

try:
    # Credentials are passed as keyword arguments rather than interpolated into
    # the URI. Inside a URI, reserved characters such as '@', ':', '/' or '%'
    # must be percent-escaped; as keyword arguments they are used verbatim.
    # NOTE(review): this assumes the config stores the raw (unescaped) values —
    # confirm against config.json.
    server = MongoClient(
        f'mongodb://{host}:{port}/',
        username=user,
        password=password,
    )

    # MongoClient connects lazily, so issue a ping to force a round trip and
    # prove that the server is reachable and the credentials are accepted.
    db = server.admin
    server_status = db.command("ping")
    print("MongoDB connection successful:", server_status)

    databases = server.list_database_names()
    print("Databases:", databases)

except Exception as e:
    print("An error occurred:", e)
    # Re-raise so execution stops here: the consumer cell below depends on a
    # working `server`, and continuing would only fail later and less clearly.
    raise

MongoDB connection successful: {'ok': 1.0}
Databases: ['admin', 'config', 'ftde02', 'ftde03', 'ftde3', 'local']


In [3]:
import json

import pandas as pd
from confluent_kafka import Consumer, KafkaError, KafkaException
from pymongo import MongoClient

if __name__ == "__main__":
    # Consume JSON records from the Kafka topic and store each one in MongoDB.
    #
    # Relies on `server` (the MongoClient created in an earlier cell) being
    # defined. Runs until interrupted (KeyboardInterrupt / kernel interrupt);
    # the consumer is always closed on exit.
    consumer_config = {
        'bootstrap.servers': '34.56.65.122',
        'group.id': 'kelompok2-data-consumer-group',
        # Start from the oldest message when the group has no committed offset.
        'auto.offset.reset': 'earliest',
        # Offsets are committed manually below, only after a successful insert.
        'enable.auto.commit': False
    }

    consumer = Consumer(consumer_config)
    consumer.subscribe(["kelompok2-data-recruitment-selection"])

    print("Starting the consumer")

    try:
        # The target collection never changes, so resolve it once up front.
        db = server["ftde3"]
        collection = db["kelompok2_data_recruitment_selection_raw"]

        while True:
            msg = consumer.poll(1.0)

            # poll() returns None when nothing arrived within the timeout.
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # Informational event only; keep polling for new messages.
                    print(f"End of partition reached: {msg.topic()} [{msg.partition()}]")
                    continue
                raise KafkaException(msg.error())

            # Decode the payload once and reuse it for logging and storage.
            data = json.loads(msg.value().decode('utf-8'))
            print(f"Records = {data}")

            if isinstance(data, list):
                # A payload holding several records is written in one call.
                # insert_many rejects an empty list, so skip the call then.
                if data:
                    collection.insert_many(data)
            else:
                collection.insert_one(data)

            print("Data has been saved to MongoDB")

            # Commit synchronously only after the write succeeded, so a restart
            # resumes after this message instead of re-reading the whole topic
            # and inserting duplicates.
            consumer.commit(message=msg, asynchronous=False)
    except KeyboardInterrupt:
        print("Consumer interrupted")
    finally:
        consumer.close()

Starting the consumer
Records = {'CandidateID': 1, 'Name': 'Kevin Wright', 'Gender': 'Female', 'Age': 35, 'Position': 'DevOps Engineer', 'ApplicationDate': '2024-09-02', 'Status': 'Rejected', 'InterviewDate': nan, 'OfferStatus': nan}
Data has been saved to MongoDB
Records = {'CandidateID': 2, 'Name': 'Ralph Gonzalez', 'Gender': 'Male', 'Age': 35, 'Position': 'Project Manager', 'ApplicationDate': '2023-11-20', 'Status': 'Rejected', 'InterviewDate': '2024-05-11', 'OfferStatus': 'Hired'}
Data has been saved to MongoDB
Records = {'CandidateID': 3, 'Name': 'Ian Perez', 'Gender': 'Female', 'Age': 27, 'Position': 'Data Engineer', 'ApplicationDate': '2024-08-23', 'Status': 'Rejected', 'InterviewDate': nan, 'OfferStatus': 'Hired'}
Data has been saved to MongoDB
Records = {'CandidateID': 4, 'Name': 'Gregory Romero', 'Gender': 'Male', 'Age': 59, 'Position': 'HR Manager', 'ApplicationDate': '2024-05-31', 'Status': 'Interviewed', 'InterviewDate': nan, 'OfferStatus': 'Hired'}
Data has been saved to 