In [1]:
from kafka import KafkaProducer, KafkaConsumer
import json

In [2]:
def test_kafka_producer_consumer(kafka_broker='localhost:9092', topic='test-topic'):
    """
    Tests sending and receiving a message with Kafka using a producer and consumer
    in the same Python script.

    Args:
        kafka_broker (str, optional): The Kafka broker address. Defaults to 'localhost:9092'.
        topic (str, optional): The Kafka topic to use. Defaults to 'test-topic'.
    """
    # 1. Producer
    producer = KafkaProducer(
        bootstrap_servers=kafka_broker,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serialize to JSON
    )

    # 2. Consumer
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=kafka_broker,
        auto_offset_reset='earliest',  # Start from the beginning
        enable_auto_commit=True,
        group_id='test-group'  # Important for consumer groups
    )

    try:
        # Send a test message
        test_message = {'key': 'test_key', 'value': 'test_value'}
        print(f"Producer: Sending message: {test_message}")
        producer.send(topic, test_message).get()  # .get() blocks until sent
        print("Producer: Message sent successfully")

        # Consume the message
        print("Consumer: Waiting for message...")
        for message in consumer:
            received_message = message.value
            print(f"Consumer: Received message: {received_message}")
            break  # Exit loop after receiving one message

        # Check if the received message matches the sent message
        if received_message == test_message:
            print("Test Passed: Message sent and received correctly!")
            return True
        else:
            print("Test Failed: Message mismatch!")
            print(f"  Sent:     {test_message}")
            print(f"  Received: {received_message}")
            return False

    except Exception as e:
        print(f"Error during test: {e}")
        return False

    finally:
        producer.close()
        consumer.close()

In [4]:
if __name__ == "__main__":
    # You can change the broker and topic if needed
    kafka_address = 'localhost:9092'  # Or your Kafka broker address
    test_topic_name = 'my-test-topic'
    test_result = test_kafka_producer_consumer(kafka_address, test_topic_name)

    if test_result:
        print(f"Kafka Producer/Consumer test on topic '{test_topic_name}' PASSED.")
    else:
        print(f"Kafka Producer/Consumer test on topic '{test_topic_name}' FAILED.")

NoBrokersAvailable: NoBrokersAvailable