# Consuming Avro Messages

In [None]:
import env
# Import required libraries
from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import SerializationContext, MessageField, StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer


# Kafka consumer settings: broker address and SSL material come from `env`;
# the consumer group id and offset-reset policy are fixed for this demo.
_cert_dir = env.CERTIFICATES_FOLDER
conf = {
    "bootstrap.servers": env.KAFKA_SERVICE_URI,
    "security.protocol": "SSL",
    "ssl.ca.location": _cert_dir + "/ca.pem",
    "ssl.certificate.location": _cert_dir + "/service.cert",
    "ssl.key.location": _cert_dir + "/service.key",
    "group.id": "demo-consumer-group",
    "auto.offset.reset": "earliest",
}

# Schema registry connection: username, password, host and port are read
# from `env` and combined into a single HTTPS URL.
_registry_url = "https://{}:{}@{}:{}".format(
    env.SCHEMA_REGISTRY_USERNAME,
    env.SCHEMA_REGISTRY_PASSWORD,
    env.SCHEMA_REGISTRY_HOSTNAME,
    env.SCHEMA_REGISTRY_PORT,
)
schema_registry_conf = {"url": _registry_url}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro schema (as JSON text) for the message value: a record named "User" in
# the "example.avro" namespace with a string `name` and an `age` that is
# either an int or null.
value_str = """
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
    {"name": "name", "type": "string"},
    {"name": "age",  "type": ["int", "null"]}
]
}
"""

# Avro schema (as JSON text) for the message key: a record named "Id" in the
# "example.avro" namespace with a single int field `id`.
key_str = """
{
"namespace": "example.avro",
"type": "record",
"name": "Id",
"fields": [
    {"name": "id", "type": "int"}
]
}
"""

# Plain value object holding the fields of a decoded User record
class User:
    """A user with a `name` and an `age`, stored as given."""

    def __init__(self, name, age):
        self.name, self.age = name, age

# Plain value object holding the field of a decoded key record
class Key:
    """A message key carrying a single `id` attribute, stored as given."""

    def __init__(self, id):
        # `id` shadows the builtin, but it is the keyword callers pass.
        self.id = id

# Callback handed to AvroDeserializer: turns a decoded dict into a User
def dict_to_user(obj, ctx):
    """Build a User from the 'name' and 'age' entries of `obj`.

    `ctx` is accepted to fit the deserializer callback signature and is
    not used. Raises KeyError if either entry is missing.
    """
    name = obj['name']
    age = obj['age']
    return User(name=name, age=age)

# Callback handed to AvroDeserializer: turns a decoded dict into a Key
def dict_to_key(obj, ctx):
    """Build a Key from the "id" entry of `obj`.

    `ctx` is accepted to fit the deserializer callback signature and is
    not used. Raises KeyError if the entry is missing.
    """
    key_id = obj["id"]
    return Key(id=key_id)

# One Avro deserializer per message part, each pairing its schema text with
# the callback that converts the decoded dict into a Python object.
avro_deserializer_value = AvroDeserializer(schema_registry_client, value_str, dict_to_user)

avro_deserializer_key = AvroDeserializer(schema_registry_client, key_str, dict_to_key)

# Build the consumer from the settings in `conf`
consumer = DeserializingConsumer(conf)

# Register interest in the demo Avro topic
consumer.subscribe(["demo-topic-avro"])

# Consume messages from the Kafka topic until interrupted with Ctrl+C.
# The whole loop sits inside try/finally so the consumer is always closed
# when the loop exits, whether through Ctrl+C or an unexpected exception.
# NOTE(review): DeserializingConsumer.poll() may raise ConsumeError for error
# events instead of returning them, which would bypass the msg.error() branch
# below — confirm against the library docs.
try:
    while True:
        # Wait up to 1.0 for the next message; None means nothing arrived.
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        # Decode the raw value and key payloads with the Avro deserializers.
        user = avro_deserializer_value(
            msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
        )
        key = avro_deserializer_key(
            msg.key(), SerializationContext(msg.topic(), MessageField.KEY)
        )

        if user is not None and key is not None:
            print(
                "Key --> {}\n"
                "User record --> name: {},"
                " age: {}\n".format(key.id, user.name, user.age)
            )

        # Commit the offset for the message
        consumer.commit()
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this demo; exit quietly.
    pass
finally:
    # Always shut the consumer down instead of abandoning it.
    consumer.close()