In [4]:
import time
from confluent_kafka import Consumer, KafkaException, KafkaError
from aily_py_commons.io.env_vars import (
    INFRASTRUCTURE_PROD,
    AilySettings,
)
from aily_data_aws.aws.secrets import get_secret

In [5]:
AilySettings(INFRASTRUCTURE_PROD)

secret_name = "aily/infrastructure/dev/msk/msk-controlplane-infrastructure-dev/ailypubsub"
secret = get_secret(secret_name)

# Consumer configuration using AWS secrets
consumer_conf = {
    'bootstrap.servers': secret["bootstrap_public"],
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',
    'security.protocol': "SASL_SSL",
    'sasl.mechanism': "SCRAM-SHA-512",
    'sasl.username': secret["username"],
    'sasl.password': secret["password"]
}

consumer = Consumer(**consumer_conf)
consumer.subscribe(['kafka-message-test'])



In [6]:
try:
    last_print_time = time.time()
    while True:
        msg = consumer.poll(timeout=1.0)
        current_time = time.time()
        
        if current_time - last_print_time >= 5:
            print("Waiting")
            last_print_time = current_time

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                print(f'{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}')
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Proper message
            print(f'Received message: {msg.value().decode("utf-8")}')
except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()

Waiting
Waiting
Waiting
Received message: Hello, Kafka!
Waiting
