In [None]:
import json
import time
import uuid
from typing import Any

# --- Cluster endpoints -------------------------------------------------------
# Comma-separated bootstrap list handed to the admin client, producer and consumer.
BOOTSTRAP = "localhost:9092,localhost:9094,localhost:9096"
# Ports that the broker metadata is checked against by assert_expected_ports().
EXPECTED_PORTS = [9092, 9094, 9096]

# --- Topic layout ------------------------------------------------------------
TOPIC = "hy-smoketest"
PARTITIONS = 3
REPLICATION = 3
# Lower bound on the broker count enforced by assert_expected_ports().
MIN_BROKERS = 3

# --- Round-trip settings -----------------------------------------------------
PRODUCE_MESSAGES = 20
# Timeout in seconds; passed as timeout_s to the helper functions below.
TIMEOUT_S = 60

# Echo the key settings so the rendered notebook records what was tested.
print(f"BOOTSTRAP: {BOOTSTRAP}")
print(f"EXPECTED_PORTS: {EXPECTED_PORTS}")
print(f"TOPIC: {TOPIC}")

BOOTSTRAP: localhost:9092,localhost:9094,localhost:9096
EXPECTED_PORTS: [9092, 9094, 9096]
TOPIC: hy-smoketest


In [None]:
from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic


def now_ms() -> int:
    """Return the wall-clock time since the Unix epoch, truncated to whole milliseconds."""
    epoch_seconds = time.time()
    return int(epoch_seconds * 1000)


def mk_admin(bootstrap: str, timeout_s: int = 60) -> AdminClient:
    """Build an AdminClient for the given bootstrap servers.

    Args:
        bootstrap: Comma-separated ``host:port`` list used as ``bootstrap.servers``.
        timeout_s: Socket timeout in seconds; converted to milliseconds for
            ``socket.timeout.ms``.

    Returns:
        A new AdminClient configured with the settings above.
    """
    admin_conf = {
        "bootstrap.servers": bootstrap,
        "socket.timeout.ms": timeout_s * 1000,
        "api.version.request": True,
    }
    return AdminClient(admin_conf)


def fetch_metadata(admin: AdminClient, timeout_s: int = 60) -> tuple[str, list[tuple[int, str, int]]]:
    """Read the cluster id and the broker list from cluster metadata.

    Args:
        admin: Client whose ``list_topics`` call supplies the metadata.
        timeout_s: Timeout in seconds forwarded to ``list_topics``.

    Returns:
        ``(cluster_id, brokers)`` where ``cluster_id`` falls back to
        ``"unknown"`` when the metadata has no (or an empty) ``cluster_id``,
        and ``brokers`` is a list of ``(broker_id, host, port)`` tuples sorted
        by broker id.
    """
    metadata = admin.list_topics(timeout=timeout_s)
    cluster_id = getattr(metadata, "cluster_id", None) or "unknown"
    broker_rows = sorted(
        ((broker_id, info.host, info.port) for broker_id, info in metadata.brokers.items()),
        key=lambda row: row[0],
    )
    return cluster_id, broker_rows


def assert_expected_ports(
    brokers: list[tuple[int, str, int]],
    expected_ports: list[int],
    min_brokers: "int | None" = None,
) -> None:
    """Validate the broker count and the set of ports found in the metadata.

    Args:
        brokers: ``(broker_id, host, port)`` tuples as returned by
            ``fetch_metadata``.
        expected_ports: Ports the metadata is expected to contain; order does
            not matter.
        min_brokers: Minimum number of brokers required. When omitted, the
            module-level ``MIN_BROKERS`` constant is used, which keeps existing
            two-argument calls behaving as before.

    Raises:
        RuntimeError: If fewer than the required number of brokers are listed,
            or if the distinct metadata ports differ from ``expected_ports``.
    """
    # Resolve the fallback at call time so an explicit argument never needs the global.
    required = MIN_BROKERS if min_brokers is None else min_brokers

    # Distinct ports only: several brokers sharing a port collapse to one entry.
    ports = sorted({p for _, _, p in brokers})
    if len(brokers) < required:
        raise RuntimeError(f"브로커 수가 {required} 미만입니다: {len(brokers)} / brokers={brokers}")  # noqa: TRY003

    wanted = sorted(expected_ports)
    if wanted != ports:
        msg = (
            "메타데이터 advertised 포트가 기대값과 다릅니다.\n"
            f"- expected_ports={wanted}\n"
            f"- metadata_ports ={ports}\n"
            f"- brokers={brokers}"
        )
        raise RuntimeError(msg)


# Shared admin client reused by the metadata and topic-creation cells below.
admin = mk_admin(bootstrap=BOOTSTRAP, timeout_s=TIMEOUT_S)
print("AdminClient 준비 완료")

AdminClient 준비 완료


In [12]:
# Fetch the metadata once; `cluster_id` and `brokers` are read again by the summary cell.
cluster_id, brokers = fetch_metadata(admin, timeout_s=TIMEOUT_S)

print(f"[INFO] cluster_id={cluster_id}")
print("[INFO] brokers(metadata):")
for bid, host, port in brokers:
    print(f"  - id={bid}, host={host}, port={port}")

# Raises RuntimeError on a mismatch, so the success line below is only reached when the check passes.
assert_expected_ports(brokers, expected_ports=EXPECTED_PORTS)
print(f"[OK] advertised ports 검증 성공: {sorted(EXPECTED_PORTS)}")

[INFO] cluster_id=TAt6dvzRT1SGJD5zqdCI-A
[INFO] brokers(metadata):
  - id=1, host=localhost, port=9092
  - id=2, host=localhost, port=9094
  - id=3, host=localhost, port=9096
[OK] advertised ports 검증 성공: [9092, 9094, 9096]


In [13]:
# Probe each bootstrap address on its own: every entry must be reachable and
# must report the same cluster and the same broker ports as the combined list.
bootstrap_list = [x.strip() for x in BOOTSTRAP.split(",") if x.strip()]

for bootstrap_server in bootstrap_list:
    probe_admin = mk_admin(bootstrap_server, TIMEOUT_S)
    probe_cluster_id, probe_brokers = fetch_metadata(probe_admin, TIMEOUT_S)
    probe_ports = sorted({p for _, _, p in probe_brokers})

    # Previously "[OK]" was printed unconditionally; verify before reporting success.
    if probe_cluster_id != cluster_id:
        probe_msg = (
            "프로브 cluster_id가 일치하지 않습니다: "
            f"{bootstrap_server} -> cluster_id={probe_cluster_id} (expected={cluster_id})"
        )
        raise RuntimeError(probe_msg)
    assert_expected_ports(probe_brokers, EXPECTED_PORTS)

    print(f"[OK] probe: {bootstrap_server} -> cluster_id={probe_cluster_id}, metadata_ports={probe_ports}")

[OK] probe: localhost:9092 -> cluster_id=TAt6dvzRT1SGJD5zqdCI-A, metadata_ports=[9092, 9094, 9096]
[OK] probe: localhost:9094 -> cluster_id=TAt6dvzRT1SGJD5zqdCI-A, metadata_ports=[9092, 9094, 9096]
[OK] probe: localhost:9096 -> cluster_id=TAt6dvzRT1SGJD5zqdCI-A, metadata_ports=[9092, 9094, 9096]


In [14]:
def ensure_topic(admin: AdminClient, topic: str, partitions: int, replication: int, timeout_s: int = 60) -> None:
    """Create ``topic`` unless the cluster reports that it already exists.

    Args:
        admin: Admin client used to issue the create request.
        topic: Name of the topic to create.
        partitions: Partition count for the new topic.
        replication: Replication factor for the new topic.
        timeout_s: Timeout in seconds, used both as the request timeout and as
            the wait limit on the creation future.

    Raises:
        Exception: Any failure from the creation future other than
            "topic already exists" is propagated unchanged.
    """
    # Local import so this function works without editing the notebook's import cell.
    from confluent_kafka import KafkaError, KafkaException

    new_topic = NewTopic(topic=topic, num_partitions=partitions, replication_factor=replication)
    futures = admin.create_topics([new_topic], request_timeout=timeout_s)
    future = futures.get(topic)

    if future is None:
        print("[INFO] create_topics 결과가 비어있습니다. (환경에 따라 정상)")
        return

    try:
        future.result(timeout=timeout_s)
    except KafkaException as exc:
        # Decide on the structured error code rather than on message text, so an
        # unrelated failure whose text mentions "already exists" is not swallowed.
        kafka_error = exc.args[0] if exc.args else None
        if isinstance(kafka_error, KafkaError) and kafka_error.code() == KafkaError.TOPIC_ALREADY_EXISTS:
            print(f"[OK] 토픽 이미 존재: {topic}")
            return
        raise

    print(f"[OK] 토픽 생성: {topic} (partitions={partitions}, replication={replication})")


# Create the smoke-test topic, or accept it if it already exists.
ensure_topic(admin, topic=TOPIC, partitions=PARTITIONS, replication=REPLICATION, timeout_s=TIMEOUT_S)

[OK] 토픽 생성: hy-smoketest (partitions=3, replication=3)


In [None]:
def produce_messages(bootstrap: str, topic: str, n: int, timeout_s: int = 60) -> str:
    """Produce ``n`` JSON test messages tagged with a fresh run id.

    Each message value is a JSON object with ``run_id``, ``seq`` and ``ts_ms``
    fields; the key is ``"<run_id>:<seq>"``.

    Args:
        bootstrap: Comma-separated ``host:port`` list used as ``bootstrap.servers``.
        topic: Topic to produce to.
        n: Number of messages to produce.
        timeout_s: Timeout in seconds, used for the socket timeout and for the
            final flush.

    Returns:
        The generated run id (a UUID4 string) shared by all produced messages.

    Raises:
        RuntimeError: If any delivery callback reported an error, or if
            messages are still queued when the flush timeout expires.
    """
    producer = Producer(
        {
            "bootstrap.servers": bootstrap,
            "socket.timeout.ms": timeout_s * 1000,
            "enable.idempotence": True,  # enable idempotent producing
            "acks": "all",
        }
    )

    delivery_errors: list[str] = []

    def on_delivery(err, _msg):
        if err is not None:
            delivery_errors.append(str(err))

    run_id = str(uuid.uuid4())

    for i in range(n):
        payload = {"run_id": run_id, "seq": i, "ts_ms": now_ms()}
        key = f"{run_id}:{i}".encode()
        val = json.dumps(payload, ensure_ascii=False).encode()
        producer.produce(topic=topic, key=key, value=val, on_delivery=on_delivery)
        # Serve delivery callbacks as we go instead of deferring them all to flush().
        producer.poll(0)

    # flush() returns the number of messages still queued; non-zero means the
    # timeout expired before everything was delivered.
    undelivered = producer.flush(timeout=timeout_s)

    if delivery_errors:
        raise RuntimeError(f"produce 실패: {delivery_errors[:3]} (총 {len(delivery_errors)}건)")  # noqa: TRY003
    if undelivered:
        raise RuntimeError(  # noqa: TRY003
            f"produce 미완료: flush 타임아웃({timeout_s}s) 후에도 {undelivered}건이 전송 대기 중입니다"
        )
    print(f"[OK] produce 완료: topic={topic}, messages={n}, run_id={run_id}")
    return run_id


# Keep the run id: the consume cell filters on it to find this run's messages.
RUN_ID = produce_messages(bootstrap=BOOTSTRAP, topic=TOPIC, n=PRODUCE_MESSAGES, timeout_s=TIMEOUT_S)
RUN_ID

[OK] produce 완료: topic=hy-smoketest, messages=20, run_id=3f9432f4-1db7-4d79-b7f2-ff6efb752b2e


'3f9432f4-1db7-4d79-b7f2-ff6efb752b2e'

In [None]:
def consume_by_run_id(
    bootstrap: str,
    topic: str,
    run_id: str,
    timeout_s: int = 60,
    min_expected: int = 1,
    expected_total: "int | None" = None,
) -> list[dict[str, Any]]:
    """Consume ``topic`` and collect the JSON payloads whose ``run_id`` matches.

    A new consumer with a random ``smoketest-<uuid>`` group id and
    ``auto.offset.reset=earliest`` is created for every call (presumably so
    each run reads the topic from the beginning). Messages that are not valid
    UTF-8 JSON, or that carry a different ``run_id``, are skipped.

    Args:
        bootstrap: Comma-separated ``host:port`` list used as ``bootstrap.servers``.
        topic: Topic to subscribe to.
        run_id: Only payloads whose ``run_id`` field equals this value are kept.
        timeout_s: Overall time budget in seconds; also the socket timeout.
        min_expected: Minimum number of matching messages required for success.
        expected_total: Number of matching messages after which consuming stops
            early. When omitted, the module-level ``PRODUCE_MESSAGES`` constant
            is used, which keeps existing calls behaving as before.

    Returns:
        The matching payloads, sorted by their ``seq`` field (missing ``seq``
        sorts as 0).

    Raises:
        RuntimeError: If fewer than ``min_expected`` matching messages were
            received before the time budget ran out.
    """
    target = PRODUCE_MESSAGES if expected_total is None else expected_total
    # Same stop rule as before: both the minimum and the target must be reached.
    stop_at = max(min_expected, target)

    group_id = f"smoketest-{uuid.uuid4()}"
    consumer = Consumer(
        {
            "bootstrap.servers": bootstrap,
            "group.id": group_id,
            "auto.offset.reset": "earliest",
            "enable.auto.commit": False,
            "socket.timeout.ms": timeout_s * 1000,
        }
    )

    received: list[dict[str, Any]] = []
    last_error = None

    try:
        # Inside the try so the consumer is closed even if subscribing fails.
        consumer.subscribe([topic])

        # Monotonic clock: the budget must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout_s

        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break

            # Never block past the deadline on the final poll.
            msg = consumer.poll(min(1.0, remaining))
            if msg is None:
                continue

            err = msg.error()
            if err:
                # Keep polling (best effort), but remember the error for diagnostics.
                last_error = err
                continue

            try:
                payload = json.loads(msg.value().decode("utf-8"))
            except (json.JSONDecodeError, UnicodeDecodeError, AttributeError, TypeError):
                continue

            if isinstance(payload, dict) and payload.get("run_id") == run_id:
                received.append(payload)

            if len(received) >= stop_at:
                break

        if len(received) < min_expected:
            error_msg = f"consume 결과가 부족합니다: got={len(received)}, min_expected={min_expected}"
            if last_error is not None:
                error_msg += f", last_error={last_error}"
            raise RuntimeError(error_msg)

        print(f"[OK] consume 완료: run_id={run_id}, received={len(received)}, group_id={group_id}")
        received.sort(key=lambda x: x.get("seq", 0))
        return received

    finally:
        consumer.close()


# Require at least half of the produced messages (and never fewer than one).
received = consume_by_run_id(
    bootstrap=BOOTSTRAP,
    topic=TOPIC,
    run_id=RUN_ID,
    timeout_s=TIMEOUT_S,
    min_expected=max(1, PRODUCE_MESSAGES // 2),
)
received[:5], received[-1]

[OK] consume 완료: run_id=3f9432f4-1db7-4d79-b7f2-ff6efb752b2e, received=20, group_id=smoketest-da0b4672-9fe8-4fd5-9bc4-ec463d5baa84


([{'run_id': '3f9432f4-1db7-4d79-b7f2-ff6efb752b2e',
   'seq': 0,
   'ts_ms': 1769672397333},
  {'run_id': '3f9432f4-1db7-4d79-b7f2-ff6efb752b2e',
   'seq': 1,
   'ts_ms': 1769672397333},
  {'run_id': '3f9432f4-1db7-4d79-b7f2-ff6efb752b2e',
   'seq': 2,
   'ts_ms': 1769672397333},
  {'run_id': '3f9432f4-1db7-4d79-b7f2-ff6efb752b2e',
   'seq': 3,
   'ts_ms': 1769672397333},
  {'run_id': '3f9432f4-1db7-4d79-b7f2-ff6efb752b2e',
   'seq': 4,
   'ts_ms': 1769672397333}],
 {'run_id': '3f9432f4-1db7-4d79-b7f2-ff6efb752b2e',
  'seq': 19,
  'ts_ms': 1769672397334})

In [None]:
# Final summary of the smoke test, built from the results of the cells above.
print("cluster_id:", cluster_id)
print(f"brokers: {list(brokers)}")
print("topic:", TOPIC)
print("produced:", PRODUCE_MESSAGES)
print("consumed (filtered by run_id):", len(received))
print(f"last seq: {received[-1].get('seq') if received else None}")

cluster_id: TAt6dvzRT1SGJD5zqdCI-A
brokers: [(1, 'localhost', 9092), (2, 'localhost', 9094), (3, 'localhost', 9096)]
topic: hy-smoketest
produced: 20
consumed (filtered by run_id): 20
last seq: 19
