In [31]:
from uuid import uuid4

from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

In [20]:
def get_schema_from_schema_registry(schema_registry_url, schema_registry_subject):
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)

    return sr, latest_version

In [36]:
bootstrap_servers = "kafka-1:19092,kafka-2:19093"
topic = "test-avro-topic"
schema_registry_subject = topic + "-value"

In [18]:
schema_registry_url = 'http://schema-registry-1:8081'

schema_registry_conf = {'url': schema_registry_url}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

In [4]:
schema_str = """
{
   "namespace": "my.test",
   "name": "Person",
   "type": "record",
   "fields" : [
     {
       "name" : "id",
       "type" : "string"
     },
     {
       "name" : "firstName",
       "type" : "string"
     },
     {
       "name" : "lastName",
       "type" : "string"
     }
   ]
}
"""

In [23]:
sr, latest_version = get_schema_from_schema_registry(schema_registry_url, schema_registry_subject)
schema_str = latest_version.schema.schema_str

In [24]:
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

In [25]:
person = {"id":"1001", "firstName": "Peter", "lastName": "Muster"}

value_avro_serializer = AvroSerializer(schema_registry_client,
                                         schema_str,                                        
                                )

string_serializer = StringSerializer('utf_8')

producer_conf = {'bootstrap.servers': bootstrap_servers}

producer = Producer(producer_conf)


In [15]:
# Serve on_delivery callbacks from previous calls to produce()
producer.poll(0.0)

0

In [50]:
producer.produce(topic=topic,
                key=string_serializer(str(uuid4())),
                value=value_avro_serializer(person, SerializationContext(topic, MessageField.VALUE)),
                on_delivery=delivery_report)

In [51]:
producer.flush()

Message delivered to test-avro-topic [0]


0