From b5f51e420db05edf775f48c5fee7df297cd35139 Mon Sep 17 00:00:00 2001 From: Max Zheng Date: Fri, 23 Mar 2018 14:46:45 -0700 Subject: [PATCH 1/3] Restructure code to be more Pythonic and adhere to PEP8 style So it looks good and then we look good. :) Otherwise, it's more readable and promotes good practice as many will copy/paste. --- README.md | 70 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 6078790d7..7d40c5894 100644 --- a/README.md +++ b/README.md @@ -39,9 +39,12 @@ Usage ```python from confluent_kafka import Producer + p = Producer({'bootstrap.servers': 'mybroker,mybroker2'}) + for data in some_data_source: p.produce('mytopic', data.encode('utf-8')) + p.flush() ``` @@ -51,17 +54,29 @@ p.flush() ```python from confluent_kafka import Consumer, KafkaError -c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup', - 'default.topic.config': {'auto.offset.reset': 'smallest'}}) + +c = Consumer({ + 'bootstrap.servers': 'mybroker', + 'group.id': 'mygroup', + 'default.topic.config': { + 'auto.offset.reset': 'smallest' + } +}) + c.subscribe(['mytopic']) -running = True -while running: + +while True: msg = c.poll() - if not msg.error(): - print('Received message: %s' % msg.value().decode('utf-8')) - elif msg.error().code() != KafkaError._PARTITION_EOF: - print(msg.error()) - running = False + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print(msg.error()) + break + + print('Received message: {}'.format(msg.value().decode('utf-8'))) + c.close() ``` @@ -71,12 +86,17 @@ c.close() from confluent_kafka import avro from confluent_kafka.avro import AvroProducer + value_schema = avro.load('ValueSchema.avsc') key_schema = avro.load('KeySchema.avsc') value = {"name": "Value"} key = {"name": "Key"} -avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema) +avroProducer = AvroProducer({ + 'bootstrap.servers': 'mybroker,mybroker2', + 'schema.registry.url': 'http://schem_registry_host:port' + }, default_key_schema=key_schema, default_value_schema=value_schema) + avroProducer.produce(topic='my_topic', value=value, key=key) avroProducer.flush() ``` @@ -88,21 +108,31 @@ from confluent_kafka import KafkaError from confluent_kafka.avro import AvroConsumer from confluent_kafka.avro.serializer import SerializerError -c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'}) + +c = AvroConsumer({ + 'bootstrap.servers': 'mybroker,mybroker2', + 'group.id': 'groupid', + 'schema.registry.url': 'http://127.0.0.1:8081'}) + c.subscribe(['my_topic']) -running = True -while running: + +while True: try: msg = c.poll(10) + if msg: - if not msg.error(): - print(msg.value()) - elif msg.error().code() != KafkaError._PARTITION_EOF: - print(msg.error()) - running = False + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print(msg.error()) + break + + print(msg.value()) + except SerializerError as e: - print("Message deserialization failed for %s: %s" % (msg, e)) - running = False + print("Message deserialization failed for {}: {}".format(msg, e)) + break c.close() ``` From cf40419f575c5195dfeea269113dd42acf6de282 Mon Sep 17 00:00:00 2001 From: Max Zheng Date: Fri, 23 Mar 2018 15:02:53 -0700 Subject: [PATCH 2/3] Move try/except closer to source --- README.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 7d40c5894..b82c27b59 100644 --- a/README.md +++ b/README.md @@ -120,20 +120,20 @@ while True: try: msg = c.poll(10) - if msg: - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - print(msg.error()) - break - - print(msg.value()) - except SerializerError as e: print("Message deserialization failed for {}: {}".format(msg, e)) break + if msg: + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print(msg.error()) + break + + print(msg.value()) + c.close() ``` From 9f0c0d23b323c5f428b859ff8e857b67b7e0f835 Mon Sep 17 00:00:00 2001 From: Max Zheng Date: Mon, 26 Mar 2018 10:25:27 -0700 Subject: [PATCH 3/3] Condition on msg is None instead --- README.md | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index b82c27b59..e91968007 100644 --- a/README.md +++ b/README.md @@ -124,15 +124,17 @@ while True: print("Message deserialization failed for {}: {}".format(msg, e)) break - if msg: - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - print(msg.error()) - break - - print(msg.value()) + if msg is None: + continue + + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print(msg.error()) + break + + print(msg.value()) c.close() ```