<div style="text-align: center; line-height: 0; padding-top: 2px;">
  <img src="https://www.quantiaconsulting.com/logos/quantia_logo_orizz.png" alt="Quantia Consulting" style="width: 600px; height: 250px">
</div>

# Python Kafka Avro Consumer 

**Technical Accomplishments:**
- Start working with avro schema in Kafka
- Introduce the class `AvroConsumer`
- Consume data from Kafka avro topic

## Getting Started

Let's start importing libraries and creating useful variables 

In [None]:
%load_ext autotime

In [1]:
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import json
#import qcutils

topic = 'avro-producer'
consumer_group = 'avro-producerCG'

servers="broker:29092"

sr_url="http://schema-registry:8081"

**Note**: in order to avoid conflicts during read operation, please name:
* the topic as `<surname>-topic`
* the consumer group as follow `<surname>-cg`

## Avro Consumer

In [2]:
consumerconf = {
        'bootstrap.servers': servers,
        'schema.registry.url': sr_url,
        'group.id': consumer_group,
        'auto.offset.reset': 'earliest'
    }

c = AvroConsumer(consumerconf)

# Subscribe to topic
c.subscribe([topic])

waiting = False

try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            if waiting:
                print(".",end =" ")
            else:
                print("Waiting",end =" ")
                waiting = True
            continue
        elif msg.error():
            print('error: {}'.format(msg.error()))
            waiting = False
        else:
            print("\nConsumed record with key {} and value {}".format(msg.key(), msg.value()))
            waiting = False
except SerializerError as e:
    print("Message deserialization failed {}".format(e))
    pass
except KeyboardInterrupt:
    pass
finally:
    c.close()


Consumed record with key {'name': 'Abe'} and value {'age': 0}

Consumed record with key {'name': 'Abe'} and value {'age': 1}

Consumed record with key {'name': 'Abe'} and value {'age': 2}

Consumed record with key {'name': 'Abe'} and value {'age': 3}

Consumed record with key {'name': 'Abe'} and value {'age': 4}

Consumed record with key {'name': 'Abe'} and value {'age': 5}

Consumed record with key {'name': 'Abe'} and value {'age': 6}

Consumed record with key {'name': 'Abe'} and value {'age': 7}

Consumed record with key {'name': 'Abe'} and value {'age': 8}

Consumed record with key {'name': 'Abe'} and value {'age': 9}

Consumed record with key {'name': 'Abe'} and value {'age': 10}

Consumed record with key {'name': 'Abe'} and value {'age': 11}

Consumed record with key {'name': 'Abe'} and value {'age': 12}

Consumed record with key {'name': 'Abe'} and value {'age': 13}

Consumed record with key {'name': 'Abe'} and value {'age': 14}

Consumed record with key {'name': 'Abe'} and valu

**Note** In order to add SASL security for the connection to the kafka broker, you need to add security configurations.

```
username=qcutils.read_config_value("kafka.access.key")
password=qcutils.read_config_value("kafka.access.secret")

sr_user_info=qcutils.read_config_value("kafka.schema_registry.key") + ":" + qcutils.read_config_value("kafka.schema_registry.secret")
    
consumerconf = {
        'bootstrap.servers': <servers>,
        'sasl.mechanisms': 'PLAIN',
        'security.protocol': 'SASL_SSL',
        'sasl.username': <username>,
        'sasl.password': <password>,
        'schema.registry.url': sr_url,
        'schema.registry.basic.auth.credentials.source': 'USER_INFO',
        'schema.registry.basic.auth.user.info': <login-info>
        'group.id': <consumer-group>,
        'auto.offset.reset': <....>
    }

```

##### ![Quantia Tiny Logo](https://www.quantiaconsulting.com/logos/quantia_logo_tiny.png) 2020 Quantia Consulting, srl. All rights reserved.