In [33]:
from kafka import KafkaConsumer
from kafka import KafkaProducer

import os
import google.generativeai as genai
import json
import pandas as pd
import re
import requests
from io import StringIO
import sys
from pathlib import Path
current_path = Path.cwd()
sys.path.append(str(current_path.parents[1]))
# print(str(current_path.parents[1]))
from src.config import Config
os.environ["GOOGLE_API_KEY"] = Config.GOOGLE_API_KEY
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain_google_genai import ChatGoogleGenerativeAI
from src.module_ner.LLM_langchain import *

In [34]:
def run_consumer_extract_push_kafka(config: Config):
    # Khởi tạo consumer nhận từ topic fb_posts
    consumer = KafkaConsumer(
        config.KAFKA_TOPIC_FB_POSTS, 
        bootstrap_servers=config.KAFKA_SERVERS, 
        auto_offset_reset='earliest',
        group_id='fb_posts_viewer',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    # Khởi tạo producer để push lên topic mới
    producer = KafkaProducer(
        bootstrap_servers=config.KAFKA_SERVERS,
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )


    print("[SYSTEM] Consumer started, waiting for messages...")

    try:
        for message in consumer:
            data = message.value
            print(f"Received: {data}")
            print(type(data))

            # Gọi extract NER
            extracted_result = ner_extract(data["text"])
            print(extracted_result)
            print(extracted_result)
            for place_dict in extracted_result:
                place = place_dict['text']
                label = place_dict['label']

                result_payload = {
                    "place": place,
                    "label": label
                }

                # Push lên topic mới
                producer.send(config.KAFKA_TOPIC_COMMENTS, result_payload)
                print(f"[SYSTEM] Pushed NER result to {config.KAFKA_TOPIC_COMMENTS}: {result_payload}")

    except KeyboardInterrupt:
        print("Stopped by user.")
    finally:
        consumer.close()
        producer.flush()
        producer.close()

In [36]:
config = Config()
run_consumer_extract_push_kafka(config)

[SYSTEM] Consumer started, waiting for messages...
Received: {'text': 'Tôi ăn ở chợ Đêm Đà Lạt', 'url': 'streamlit_user_input'}
<class 'dict'>


Retrying langchain_google_genai.chat_models._chat_with_retry.<locals>._chat_with_retry in 2.0 seconds as it raised ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerDayPerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 50
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 8
}
].


ResourceExhausted: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerDayPerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-1.5-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 50
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 5
}
]

In [3]:
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'fb_posts',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    group_id='fb_posts_viewer',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    print(message.value)

{'text': 'Đà Lạt bạn đã đến nơi nào?', 'url': 'https://www.facebook.com/reel/2155228254944487/'}
{'text': 'Đà Lạt bạn đã đến nơi nào?', 'url': 'https://www.facebook.com/reel/2155228254944487/'}
{'text': 'Tại 𝐊𝐢𝐞̂̀𝐮, chúng mình tự hào với:\n🥟 𝐁𝐀́𝐍𝐇 𝐂𝐔𝐎̂́𝐍 𝐓𝐇𝐔̉ 𝐂𝐎̂𝐍𝐆 𝟏𝟎𝟎%: Vỏ bánh mỏng tang, mềm mướt, nóng hổi vừa thổi vừa ăn, quyện cùng nhân thịt mộc nhĩ đậm đà và nước chấm chua ngọt gia truyền "cân" mọi khẩu vị. \n🍜 𝐁𝐔́𝐍 𝐂𝐇𝐀̉ 𝐇𝐀̀ 𝐍𝐎̣̂𝐈 Đ𝐔́𝐍𝐆 Đ𝐈𝐄̣̂𝐔: Chả nướng thơm lừng cả góc phố, thịt mềm thấm vị, nước chấm pha chuẩn chỉnh vị Hà Nội xưa - hương vị mà ai ăn cũng phải tấm tắc khen ngon!\n🏡 𝐊𝐇𝐎̂𝐍𝐆 𝐆𝐈𝐀𝐍: Ấm cúng, sạch sẽ, đậm chất Hà Nội xưa pha lẫn nét hiện đại, là nơi lý tưởng để bạn tụ họp, "chill" cùng bạn bè, gia đình trong dịp lễ này.', 'url': 'https://www.facebook.com/reel/665443249433540/'}
{'text': 'Ngắm hoàng hôn, chill cùng tiếng tàu chạy ngang – khung cảnh lãng mạn đúng kiểu\n"Đà Lạt mộng mơ"', 'url': 'https://www.facebook.com/reel/980996147534378/'}
{'text': 'Đà Lạt bạn đã đế

KeyboardInterrupt: 

In [2]:
topic_name = 'items'
consumer = KafkaConsumer(topic_name)
for message in consumer:
    print(message)

ConsumerRecord(topic='items', partition=0, leader_epoch=0, offset=2, timestamp=1751373178402, timestamp_type=0, key=None, value=b'Hello Trung', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=11, serialized_header_size=-1)


KeyboardInterrupt: 