In [17]:
from confluent_kafka import Consumer, KafkaException, KafkaError
import avro.schema
import avro.io
import io
import sys

In [18]:
# Settings handed to the Kafka consumer: broker address, consumer group id,
# and a nested dict of default topic-level options.
cfg = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'group-t1',
    'default.topic.config': {
        'auto.offset.reset': 'earliest',
    },
}

In [19]:
# Build the consumer from the configuration dict and register interest in
# the single topic this notebook reads from.
consumer = Consumer(cfg)
consumer.subscribe(["t1"])

In [20]:
def load_scheme(scheme_file):
    """Read an Avro schema definition from a file and parse it.

    Args:
        scheme_file: Path of the schema file; it is opened in text mode.

    Returns:
        Whatever ``avro.schema.Parse`` returns for the file's contents.
    """
    # A context manager guarantees the file handle is closed once the text has
    # been read, instead of leaving it to the garbage collector.
    with open(scheme_file) as schema_fh:
        schema_text = schema_fh.read()
    return avro.schema.Parse(schema_text)

In [21]:
# Parse the Avro schema that the consume loop uses to decode message payloads.
scheme = load_scheme('iris-scheme.avsc')
scheme  # bare expression so the notebook displays the parsed schema object

<avro.schema.RecordSchema at 0x7ff314670190>

In [None]:
# Consume messages from the subscribed topic, decode each payload with the
# Avro schema held in `scheme`, and print the decoded record. The loop runs
# until the kernel is interrupted.
# The consumer is not closed here so the cell can be re-run; call
# consumer.close() once you are finished with it.
reader = avro.io.DatumReader(scheme)  # the schema never changes, so build the reader once
try:
    while True:
        # poll() takes its timeout in seconds. A short timeout returns control
        # to Python regularly (so an interrupt can be noticed) instead of
        # allowing a single call to block for up to 60000 seconds.
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # nothing arrived within the timeout
        err = msg.error()
        if err:
            if err.code() == KafkaError._PARTITION_EOF:
                # End-of-partition is an event, not a payload: report it and
                # keep polling rather than trying to Avro-decode it.
                sys.stderr.write("Reached end.")
                continue
            raise KafkaException(err)
        decoder = avro.io.BinaryDecoder(io.BytesIO(msg.value()))
        try:
            decoded = reader.read(decoder)
        except AssertionError:
            # The payload could not be decoded; skip it, but say so instead of
            # dropping the message silently.
            sys.stderr.write(
                "Skipping undecodable message at offset %s\n" % msg.offset())
            continue
        print(decoded)
        sys.stdout.flush()
except KeyboardInterrupt:
    sys.stderr.write("Keyboard interrupt...")

{'iris_type': '"Setosa"', 'sepal_length': 5.099999904632568, 'sepal_width': 3.5, 'petal_length': 1.399999976158142, 'petal_width': 0.20000000298023224}
{'iris_type': '"Setosa"', 'sepal_length': 4.900000095367432, 'sepal_width': 3.0, 'petal_length': 1.399999976158142, 'petal_width': 0.20000000298023224}
{'iris_type': '"Setosa"', 'sepal_length': 4.699999809265137, 'sepal_width': 3.200000047683716, 'petal_length': 1.2999999523162842, 'petal_width': 0.20000000298023224}
{'iris_type': '"Setosa"', 'sepal_length': 4.599999904632568, 'sepal_width': 3.0999999046325684, 'petal_length': 1.5, 'petal_width': 0.20000000298023224}
{'iris_type': '"Setosa"', 'sepal_length': 5.0, 'sepal_width': 3.5999999046325684, 'petal_length': 1.399999976158142, 'petal_width': 0.20000000298023224}
{'iris_type': '"Setosa"', 'sepal_length': 5.400000095367432, 'sepal_width': 3.9000000953674316, 'petal_length': 1.7000000476837158, 'petal_width': 0.4000000059604645}
{'iris_type': '"Setosa"', 'sepal_length': 4.599999904632

{'iris_type': '"Versicolor"', 'sepal_length': 5.5, 'sepal_width': 2.299999952316284, 'petal_length': 4.0, 'petal_width': 1.2999999523162842}
{'iris_type': '"Versicolor"', 'sepal_length': 6.5, 'sepal_width': 2.799999952316284, 'petal_length': 4.599999904632568, 'petal_width': 1.5}
{'iris_type': '"Versicolor"', 'sepal_length': 5.699999809265137, 'sepal_width': 2.799999952316284, 'petal_length': 4.5, 'petal_width': 1.2999999523162842}
{'iris_type': '"Versicolor"', 'sepal_length': 6.300000190734863, 'sepal_width': 3.299999952316284, 'petal_length': 4.699999809265137, 'petal_width': 1.600000023841858}
{'iris_type': '"Versicolor"', 'sepal_length': 4.900000095367432, 'sepal_width': 2.4000000953674316, 'petal_length': 3.299999952316284, 'petal_width': 1.0}
{'iris_type': '"Versicolor"', 'sepal_length': 6.599999904632568, 'sepal_width': 2.9000000953674316, 'petal_length': 4.599999904632568, 'petal_width': 1.2999999523162842}
{'iris_type': '"Versicolor"', 'sepal_length': 5.199999809265137, 'sepal

{'iris_type': '"Virginica"', 'sepal_length': 7.300000190734863, 'sepal_width': 2.9000000953674316, 'petal_length': 6.300000190734863, 'petal_width': 1.7999999523162842}
{'iris_type': '"Virginica"', 'sepal_length': 6.699999809265137, 'sepal_width': 2.5, 'petal_length': 5.800000190734863, 'petal_width': 1.7999999523162842}
{'iris_type': '"Virginica"', 'sepal_length': 7.199999809265137, 'sepal_width': 3.5999999046325684, 'petal_length': 6.099999904632568, 'petal_width': 2.5}
{'iris_type': '"Virginica"', 'sepal_length': 6.5, 'sepal_width': 3.200000047683716, 'petal_length': 5.099999904632568, 'petal_width': 2.0}


% t1 [0] at offset 300 with key None:
% t1 [0] at offset 301 with key None:
% t1 [0] at offset 302 with key None:
% t1 [0] at offset 303 with key None:
% t1 [0] at offset 304 with key None:
% t1 [0] at offset 305 with key None:
% t1 [0] at offset 306 with key None:
% t1 [0] at offset 307 with key None:
% t1 [0] at offset 308 with key None:
% t1 [0] at offset 309 with key None:
% t1 [0] at offset 310 with key None:
% t1 [0] at offset 311 with key None:
% t1 [0] at offset 312 with key None:
% t1 [0] at offset 313 with key None:
% t1 [0] at offset 314 with key None:
% t1 [0] at offset 315 with key None:
% t1 [0] at offset 316 with key None:
% t1 [0] at offset 317 with key None:
% t1 [0] at offset 318 with key None:
% t1 [0] at offset 319 with key None:
% t1 [0] at offset 320 with key None:
% t1 [0] at offset 321 with key None:
% t1 [0] at offset 322 with key None:
% t1 [0] at offset 323 with key None:
% t1 [0] at offset 324 with key None:
% t1 [0] at offset 325 with key None:
% t1 [0] at 

{'iris_type': '"Virginica"', 'sepal_length': 6.400000095367432, 'sepal_width': 2.700000047683716, 'petal_length': 5.300000190734863, 'petal_width': 1.899999976158142}
{'iris_type': '"Virginica"', 'sepal_length': 6.800000190734863, 'sepal_width': 3.0, 'petal_length': 5.5, 'petal_width': 2.0999999046325684}
{'iris_type': '"Virginica"', 'sepal_length': 5.699999809265137, 'sepal_width': 2.5, 'petal_length': 5.0, 'petal_width': 2.0}
{'iris_type': '"Virginica"', 'sepal_length': 5.800000190734863, 'sepal_width': 2.799999952316284, 'petal_length': 5.099999904632568, 'petal_width': 2.4000000953674316}
{'iris_type': '"Virginica"', 'sepal_length': 6.400000095367432, 'sepal_width': 3.200000047683716, 'petal_length': 5.300000190734863, 'petal_width': 2.299999952316284}
{'iris_type': '"Virginica"', 'sepal_length': 6.5, 'sepal_width': 3.0, 'petal_length': 5.5, 'petal_width': 1.7999999523162842}
{'iris_type': '"Virginica"', 'sepal_length': 7.699999809265137, 'sepal_width': 3.799999952316284, 'petal_le

% t1 [0] at offset 411 with key None:
% t1 [0] at offset 412 with key None:
% t1 [0] at offset 413 with key None:
% t1 [0] at offset 414 with key None:
% t1 [0] at offset 415 with key None:
% t1 [0] at offset 416 with key None:
% t1 [0] at offset 417 with key None:
% t1 [0] at offset 418 with key None:
% t1 [0] at offset 419 with key None:
% t1 [0] at offset 420 with key None:
% t1 [0] at offset 421 with key None:
% t1 [0] at offset 422 with key None:
% t1 [0] at offset 423 with key None:
% t1 [0] at offset 424 with key None:
% t1 [0] at offset 425 with key None:
% t1 [0] at offset 426 with key None:
% t1 [0] at offset 427 with key None:
% t1 [0] at offset 428 with key None:
% t1 [0] at offset 429 with key None:
% t1 [0] at offset 430 with key None:
% t1 [0] at offset 431 with key None:
% t1 [0] at offset 432 with key None:
% t1 [0] at offset 433 with key None:
% t1 [0] at offset 434 with key None:
% t1 [0] at offset 435 with key None:
% t1 [0] at offset 436 with key None:
% t1 [0] at 