# Kafka - Producer and Consumer

In [4]:
from kafka import KafkaProducer
import json
import time

# Kafka 프로듀서를 설정합니다
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9091', '127.0.0.1:9092', '127.0.0.1:9093'], # Kafka 브로커 주소
    value_serializer=lambda v: json.dumps(v).encode('utf-8') # 메시지를 JSON 형식으로 직렬화
)

# 테스트 메시지를 보내는 함수
def send_message(test_message): # DICT
    producer.send('test', value=test_message) # test토픽에 보냄
    producer.flush()
    print("Message sent:", test_message)

# 주기적으로 메시지를 보내기

for i in range(10):
    test_message = {"test_value": i}
    send_message(test_message)
    time.sleep(0.5)

producer.close()


Message sent: {'test_value': 0}
Message sent: {'test_value': 1}
Message sent: {'test_value': 2}
Message sent: {'test_value': 3}
Message sent: {'test_value': 4}
Message sent: {'test_value': 5}
Message sent: {'test_value': 6}
Message sent: {'test_value': 7}
Message sent: {'test_value': 8}
Message sent: {'test_value': 9}


In [1]:
# Consumer
from kafka import KafkaConsumer
import json


# 컨슈머 설정 
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['127.0.0.1:9091', '127.0.0.1:9092', '127.0.0.1:9093'],
    auto_offset_reset='earliest', # 가장 오래된 메시지부터 읽기 시작
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

try:
    for message in consumer:
        print(f"Received message: {message.value} at offset {message.offset}")
except KeyboardInterrupt:
    print("Interrupted, closing consumer...")
finally:
    consumer.close()
    print("Consumer closed.")


NoBrokersAvailable: NoBrokersAvailable