In [1]:
"""
Kafka 브로커의 메모리 및 네트워크 설정을 최적화하는 실습입니다.

TODO:
1. Kafka 브로커 설정을 변경하여 메시지 처리 속도를 실험합니다.
2. `socket.send.buffer.bytes`, `socket.receive.buffer.bytes`, `num.network.threads`, `num.io.threads` 값을 조정합니다.
3. 설정 변경이 메시지 전송 속도 및 브로커 성능에 미치는 영향을 측정합니다.
4. 메시지를 모두 전송하고 소비할 때까지 걸린 시간을 출력합니다.
"""

from kafka import KafkaProducer, KafkaConsumer
import time
import json
import subprocess

In [2]:
# 설정 값
BROKER = "localhost:9092"
TOPIC = "test-topic"
SEND_BUFFER_SIZES = [131072, 524288, 1048576]  # 128KB, 512KB, 1MB
RECEIVE_BUFFER_SIZES = [131072, 524288, 1048576]  # 128KB, 512KB, 1MB
NETWORK_THREADS = [2, 4, 8]
IO_THREADS = [2, 4, 8]
NUM_MESSAGES = 50000
MESSAGE_SIZE = 100

In [3]:
# TODO 1: 100바이트 크기의 JSON 메시지를 생성
MESSAGE_PAYLOAD = json.dumps({"data": "A" * (MESSAGE_SIZE - 20)}).encode('utf-8')  # 메시지 크기 설정

In [4]:
# TODO 2: Kafka 브로커 설정을 변경하는 함수
def update_broker_config(send_buffer, receive_buffer, net_threads, io_threads):
    """Kafka 브로커 설정을 변경하고 재시작하는 함수"""
    config_updates = [
        f"socket.send.buffer.bytes={send_buffer}",  # 전송 버퍼 크기 설정
        f"socket.receive.buffer.bytes={receive_buffer}",  # 수신 버퍼 크기 설정
        f"num.network.threads={net_threads}",  # 네트워크 스레드 수 설정
        f"num.io.threads={io_threads}"  # I/O 스레드 수 설정
    ]
    config_file = "/home/eulbang/kafka/config/server.properties"  # Kafka 설정 파일 경로

    # 기존 설정 업데이트
    with open(config_file, "r") as file:
        lines = file.readlines()

    with open(config_file, "w") as file:
        for line in lines:
            if any(param.split("=")[0] in line for param in config_updates):
                continue  # 기존 설정을 제거
            file.write(line)
        for param in config_updates:
            file.write(param + "\n")

    # Kafka 브로커 재시작
    restart_kafka_broker()  # Kafka 브로커를 재시작하는 함수 호출

In [7]:
# TODO 3: Kafka 브로커 안전 재시작 함수
def restart_kafka_broker():
    """Kafka 브로커를 안전하게 재시작하는 함수"""

    # Kafka 프로세스 실행 여부 확인
    result = subprocess.run(["pgrep", "-f", "___"], stdout=subprocess.PIPE, text=True)  # Kafka 프로세스 확인

    if result.stdout.strip():  # Kafka가 실행 중이면 종료
        print("Stopping Kafka broker...")
        subprocess.run(["/home/eulbang/kafka/bin/kafka-server-stop.sh"], check=False)  # Kafka 브로커 중지
        time.sleep(5)  # 브로커가 완전히 종료될 때까지 대기

    print("Starting Kafka broker...")

    # Kafka를 백그라운드에서 실행
    subprocess.Popen([
        "/home/eulbang/kafka/bin/kafka-server-start.sh",
        "/home/eulbang/kafka/config/server.properties"
    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, close_fds=True)  # Kafka 시작

    # Kafka가 정상적으로 시작되었는지 확인
    time.sleep(5)  # Kafka가 시작될 시간을 확보
    confirm_result = subprocess.run(["pgrep", "-f", "kafka.Kafka"], stdout=subprocess.PIPE, text=True)  # Kafka 실행 여부 확인
    if not confirm_result.stdout.strip():
        print("Kafka did not start properly. Check logs for details.")  # 오류 발생 시 메시지 출력
    else:
        print("Kafka successfully started!")  # 정상 실행 시 메시지 출력

In [8]:
# TODO 4: Kafka 프로듀서를 생성하고 메시지 발행
def produce_messages():
    """Kafka 프로듀서를 생성하여 메시지를 발행하는 함수"""
    print("Producing messages...")

    producer = KafkaProducer(
        bootstrap_servers=BROKER,  # Kafka 브로커 주소 설정
        batch_size=32768,  # 배치 크기 설정
        linger_ms=50,  # 배치를 적절히 활용하도록 설정
        acks="all"  # 메시지 전송 확인 설정
    )

    start_time = time.time()  # 메시지 전송 시작 시간 기록
    for _ in range(NUM_MESSAGES):  # 메시지 전송 반복 횟수
        producer.send(TOPIC, MESSAGE_PAYLOAD)  # 메시지 전송
    producer.flush()  # 모든 메시지 전송 완료
    elapsed_time = time.time() - start_time  # 경과 시간 측정
    print(f"Produced {NUM_MESSAGES} messages in {elapsed_time:.3f} sec\n")

In [9]:
# TODO 5: Kafka 컨슈머를 생성하고 메시지 소비 테스트
def consume_messages():
    """Kafka 컨슈머를 생성하여 메시지를 소비하는 함수"""
    print("Consuming messages...")

    consumer = KafkaConsumer(
        TOPIC,  # 구독할 토픽 설정
        bootstrap_servers=BROKER,  # Kafka 브로커 주소 설정
        auto_offset_reset="earliest",  # 오프셋 초기화 방식 설정
        enable_auto_commit=False  # 자동 오프셋 커밋 여부 설정
    )

    start_time = time.time()  # 메시지 소비 시작 시간 기록
    message_count = 0
    for message in consumer:
        message_count += 1
        if message_count >= NUM_MESSAGES:  # 원하는 메시지 개수 도달 시 종료
            break
    elapsed_time = time.time() - start_time  # 경과 시간 측정
    print(f"Consumed {NUM_MESSAGES} messages in {elapsed_time:.3f} sec\n")

In [10]:
# TODO 6: 서로 다른 설정값 조합을 테스트
for send_buffer in SEND_BUFFER_SIZES:
    for receive_buffer in RECEIVE_BUFFER_SIZES:
        for net_threads in NETWORK_THREADS:
            for io_threads in IO_THREADS:
                print(f"Testing send_buffer={send_buffer}, receive_buffer={receive_buffer}, "
                      f"network_threads={net_threads}, io_threads={io_threads}...")

                # Kafka 브로커 설정 업데이트 및 재시작
                update_broker_config(send_buffer, receive_buffer, net_threads, io_threads)  # Kafka 브로커 설정 업데이트 함수 호출

                # 메시지 발행 및 소비 테스트
                produce_messages()  # 메시지 발행 함수 호출
                consume_messages()  # 메시지 소비 함수 호출

                # 테스트 간 간격 추가
                time.sleep(5)  # 테스트 간 5초 대기

Testing send_buffer=131072, receive_buffer=131072, network_threads=2, io_threads=2...
Starting Kafka broker...
Kafka successfully started!
Producing messages...
Produced 50000 messages in 1.352 sec

Consuming messages...
Consumed 50000 messages in 0.444 sec

Testing send_buffer=131072, receive_buffer=131072, network_threads=2, io_threads=4...
Starting Kafka broker...
Kafka successfully started!
Producing messages...
Produced 50000 messages in 1.297 sec

Consuming messages...
Consumed 50000 messages in 0.455 sec

Testing send_buffer=131072, receive_buffer=131072, network_threads=2, io_threads=8...
Starting Kafka broker...
Kafka successfully started!
Producing messages...
Produced 50000 messages in 4.223 sec

Consuming messages...
Consumed 50000 messages in 0.452 sec

Testing send_buffer=131072, receive_buffer=131072, network_threads=4, io_threads=2...
Starting Kafka broker...
Kafka successfully started!
Producing messages...
Produced 50000 messages in 1.312 sec

Consuming messages...
Co