In [14]:
import quixstreams as qx

REDPANDA_TOPIC = "StreamingApp"


class RedpandaInitializer:
    def __init__(self):
        self.logger = print("RedpandaInitializer")
        self.app = qx.Application(
            broker_address="localhost:9092",
            consumer_group="StreamingAppGroup",
            loglevel="DEBUG",
        )
        self.topic = self.app.topic(REDPANDA_TOPIC, value_deserializer="json")

In [30]:
class RedpandaConsumer(RedpandaInitializer):
    def __init__(self):
        super().__init__()
        self.logger = print("RedpandaConsumer")

    def consume_data(self):
        # Create a Consumer instance
        with self.app.get_consumer() as consumer:
            # List all topics and partitions
            topics = consumer.list_topics().topics
            print("Available Topics and Partitions:")
            for topic, metadata in topics.items():
                print(f"Topic: {topic}")
                partitions = metadata.partitions.keys()
                print(f"Partitions: {list(partitions)}")

            # Check if the specific topic exists
            if self.topic.name not in topics:
                print(f"Error: Topic '{self.topic}' does not exist.")
                return

            print(
                f"Subscribing to topic: {self.topic.name} with partitions: {list(topics[self.topic.name].partitions.keys())}"
            )
            consumer.subscribe([self.topic.name])
            print(consumer.consumer_group_metadata())

            while True:
                message = consumer.poll(timeout=5)
                if message is None:
                    print("Waiting ...")
                elif message.error() is not None:
                    print(f"Error: {message.error()}")
                else:
                    print(
                        f"Received message: Key: {message.key()} \t Value: {message.value()}"
                    )

In [31]:
consumer = RedpandaConsumer()

RedpandaInitializer
RedpandaConsumer


In [32]:
consumer.consume_data()

[2024-11-29 07:49:03,271] [INFO] [quixstreams] : Topics required for this application: "StreamingApp"
[2024-11-29 07:49:03,275] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-11-29 07:49:03,281] [INFO] [quixstreams] : Kafka topics validation complete


Available Topics and Partitions:
Topic: _schemas
Partitions: [0]
Topic: chat-room
Partitions: [0]
Topic: my_topic
Partitions: [0]
Topic: StreamingApp
Partitions: [0]
Topic: __consumer_offsets
Partitions: [0, 1, 2]
Subscribing to topic: StreamingApp with partitions: [0]
b'CGMDv2:\xff\xff\xff\xffStreamingAppGroup\x00\x00\x01'
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Waiting ...
Received message: Key: b'1' 	 Value: b'"test"'
Received message: Key: b'1' 	 Value: b'"test"'
Received message: Key: b'1' 	 Value: b'"test"'
Received message: Key: b'1' 	 Value: b'"test"'
Received message: Key: b'1' 	 Value: b'"test"'
Received message: Key: b'1' 	 Value: b'"test"'


KeyboardInterrupt: 