In [1]:
from confluent_kafka import Producer, Consumer, KafkaError
from pymongo import MongoClient
import time

### CẤU HÌNH KAFKA

In [2]:
kafka_config = {
    'bootstrap.servers': 'localhost:9094, localhost:9194, localhost:9294',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'kafka',
    'sasl.password': 'UnigapKafka@2024'
}

### KẾT NỐI MONGODB

In [3]:
# Connect to MongoDB
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client['Kafka_MongoDB']
collection = db['Messages']

print("Kết nối MongoDB thành công")

Kết nối MongoDB thành công


### HÀM CALLBACK BÁO CÁO TRẠNG THÁI PRODUCER

In [4]:
def delivery_report(err, msg):
    """Callback báo cáo trạng thái gửi message"""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

### HÀM GỬI DỮ LIỆU VÀO KAFKA (PRODUCER)

In [5]:
def produce_messages(topic, messages):
    """Gửi các message vào Kafka"""
    producer = Producer(kafka_config)
    for message in messages:
        try:
            producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
        except BufferError:
            print("Producer queue đầy, thử lại sau.")
        producer.poll(0)
    producer.flush()
    print("Tất cả message đã được produce")

### HÀM NHẬN DỮ LIỆU TỪ KAFKA VÀ LƯU VÀO MONGODB (CONSUMER)

In [6]:
def consume_and_store_messages(topic, timeout=10):
    """Consume từ Kafka và lưu vào MongoDB"""
    consumer_config = kafka_config.copy()
    consumer_config.update({
        'group.id': 'my_consumer_group',
        'auto.offset.reset': 'earliest'
    })
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])
    
    start_time = time.time()
    print("Bắt đầu consume message...")
    
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                if time.time() - start_time > timeout:
                    break
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f"Đã đọc hết dữ liệu ở partition: {msg.topic()} [{msg.partition()}]")
                else:
                    print("Lỗi: {}".format(msg.error()))
                continue
            
            message_value = msg.value().decode('utf-8')
            print(f"Consumed: {message_value}")
            result = collection.insert_one({'message': message_value})
            print("Đã insert document có id:", result.inserted_id)
    
    except KeyboardInterrupt:
        print("Dừng việc consume do KeyboardInterrupt")
    
    finally:
        consumer.close()
    print("Kết thúc việc consume.")

### Gửi Message Vào Kafka (Test Producer)

In [7]:
topic = 'project_kafka_mongo_1'
messages = [
    "Message 1: Say Hello to Kafka From Jupiter",
    "Message 2: I am experimenting sending messages to Kafka",
    "Message 3: And then save those messages to MongoDB"
]

produce_messages(topic, messages)


Message delivered to project_kafka_mongo_1 [0] at offset 3
Message delivered to project_kafka_mongo_1 [2] at offset 0
Message delivered to project_kafka_mongo_1 [2] at offset 1
Tất cả message đã được produce


### Nhận Message Từ Kafka Và Lưu Vào MongoDB (Test Consumer)

In [8]:
consume_and_store_messages(topic, timeout=10)

Bắt đầu consume message...
Consumed: Message 1: Say Hello to Kafka From Jupiter
Đã insert document có id: 67b5f7844f52b5e487b730d0
Consumed: Message 2: I am experimenting sending messages to Kafka
Đã insert document có id: 67b5f7844f52b5e487b730d1
Consumed: Message 3: And then save those messages to MongoDB
Đã insert document có id: 67b5f7844f52b5e487b730d2
Kết thúc việc consume.


### Kiểm tra dữ liệu đã lưu trong MongoDB

In [9]:
for doc in collection.find():
    print(doc)

{'_id': ObjectId('67b5f7844f52b5e487b730d0'), 'message': 'Message 1: Say Hello to Kafka From Jupiter'}
{'_id': ObjectId('67b5f7844f52b5e487b730d1'), 'message': 'Message 2: I am experimenting sending messages to Kafka'}
{'_id': ObjectId('67b5f7844f52b5e487b730d2'), 'message': 'Message 3: And then save those messages to MongoDB'}
