From d41006325b8ad5c64538492c1d0707273cf81ead Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 29 Jun 2022 14:05:55 +0200 Subject: [PATCH 1/2] generic and specific avro examples moved protobuf to a similiar folder --- examples/Makefile | 6 +- examples/avro-cli.py | 16 ++---- examples/avro/user_generic.avsc | 18 ++++++ examples/avro/user_specific.avsc | 19 ++++++ examples/avro_consumer.py | 28 ++++----- examples/avro_producer.py | 25 ++++---- examples/json_consumer.py | 4 +- examples/protobuf/__init__.py | 0 examples/{ => protobuf}/user.proto | 0 examples/protobuf/user_pb2.py | 25 ++++++++ examples/protobuf_consumer.py | 4 +- examples/protobuf_producer.py | 6 +- examples/user_pb2.py | 83 --------------------------- src/confluent_kafka/admin/__init__.py | 24 +++++--- 14 files changed, 119 insertions(+), 139 deletions(-) create mode 100644 examples/avro/user_generic.avsc create mode 100644 examples/avro/user_specific.avsc create mode 100644 examples/protobuf/__init__.py rename examples/{ => protobuf}/user.proto (100%) create mode 100644 examples/protobuf/user_pb2.py delete mode 100644 examples/user_pb2.py diff --git a/examples/Makefile b/examples/Makefile index 8d4b20469..6a7023826 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -1,5 +1,5 @@ -user_pb2.py: user.proto - protoc -I=. --python_out=. ./user.proto; +user_pb2.py: protobuf/user.proto + cd protobuf && protoc -I=. --python_out=. ./user.proto; clean: - rm -f $(TARGET_DIR)/*_pb2.py + rm -f $(TARGET_DIR)/protobuf/*_pb2.py diff --git a/examples/avro-cli.py b/examples/avro-cli.py index 46e0293d8..3a79b2ce4 100755 --- a/examples/avro-cli.py +++ b/examples/avro-cli.py @@ -16,6 +16,7 @@ # import argparse +import os from uuid import uuid4 from six.moves import input @@ -23,18 +24,9 @@ from confluent_kafka import avro # Parse Schema used for serializing User class -record_schema = avro.loads(""" - { - "namespace": "confluent.io.examples.serialization.avro", - "name": "User", - "type": "record", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": "int"}, - {"name": "favorite_color", "type": "string"} - ] - } -""") +path = os.path.realpath(os.path.dirname(__file__)) +with open(f"{path}/avro/user_specific.avsc") as f: + record_schema = avro.loads(f.read()) class User(object): diff --git a/examples/avro/user_generic.avsc b/examples/avro/user_generic.avsc new file mode 100644 index 000000000..f7584dbe7 --- /dev/null +++ b/examples/avro/user_generic.avsc @@ -0,0 +1,18 @@ +{ + "name": "User", + "type": "record", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "favorite_number", + "type": "long" + }, + { + "name": "favorite_color", + "type": "string" + } + ] +} \ No newline at end of file diff --git a/examples/avro/user_specific.avsc b/examples/avro/user_specific.avsc new file mode 100644 index 000000000..9deb32c77 --- /dev/null +++ b/examples/avro/user_specific.avsc @@ -0,0 +1,19 @@ +{ + "namespace": "confluent.io.examples.serialization.avro", + "name": "User", + "type": "record", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "favorite_number", + "type": "long" + }, + { + "name": "favorite_color", + "type": "string" + } + ] +} \ No newline at end of file diff --git a/examples/avro_consumer.py b/examples/avro_consumer.py index bf812e788..1ddf8e208 100644 --- a/examples/avro_consumer.py +++ b/examples/avro_consumer.py @@ -20,6 +20,7 @@ # This is a simple example of the SerializingProducer using Avro. # import argparse +import os from confluent_kafka import DeserializingConsumer from confluent_kafka.schema_registry import SchemaRegistryClient @@ -66,19 +67,16 @@ def dict_to_user(obj, ctx): def main(args): topic = args.topic + is_specific = args.specific == "true" - schema_str = """ - { - "namespace": "confluent.io.examples.serialization.avro", - "name": "User", - "type": "record", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": "int"}, - {"name": "favorite_color", "type": "string"} - ] - } - """ + if is_specific: + schema = "user_specific.avsc" + else: + schema = "user_generic.avsc" + + path = os.path.realpath(os.path.dirname(__file__)) + with open(f"{path}/avro/{schema}") as f: + schema_str = f.read() sr_conf = {'url': args.schema_registry} schema_registry_client = SchemaRegistryClient(sr_conf) @@ -110,8 +108,8 @@ def main(args): "\tfavorite_number: {}\n" "\tfavorite_color: {}\n" .format(msg.key(), user.name, - user.favorite_color, - user.favorite_number)) + user.favorite_number, + user.favorite_color)) except KeyboardInterrupt: break @@ -129,5 +127,7 @@ def main(args): help="Topic name") parser.add_argument('-g', dest="group", default="example_serde_avro", help="Consumer group") + parser.add_argument('-p', dest="specific", default="true", + help="Avro specific record") main(parser.parse_args()) diff --git a/examples/avro_producer.py b/examples/avro_producer.py index 6128d500a..423b08cb0 100644 --- a/examples/avro_producer.py +++ b/examples/avro_producer.py @@ -20,6 +20,7 @@ # This is a simple example of the SerializingProducer using Avro. # import argparse +import os from uuid import uuid4 from six.moves import input @@ -99,19 +100,17 @@ def delivery_report(err, msg): def main(args): topic = args.topic + is_specific = args.specific == "true" + + if is_specific: + schema = "user_specific.avsc" + else: + schema = "user_generic.avsc" + + path = os.path.realpath(os.path.dirname(__file__)) + with open(f"{path}/avro/{schema}") as f: + schema_str = f.read() - schema_str = """ - { - "namespace": "confluent.io.examples.serialization.avro", - "name": "User", - "type": "record", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": "int"}, - {"name": "favorite_color", "type": "string"} - ] - } - """ schema_registry_conf = {'url': args.schema_registry} schema_registry_client = SchemaRegistryClient(schema_registry_conf) @@ -158,5 +157,7 @@ def main(args): help="Schema Registry (http(s)://host[:port]") parser.add_argument('-t', dest="topic", default="example_serde_avro", help="Topic name") + parser.add_argument('-p', dest="specific", default="true", + help="Avro specific record") main(parser.parse_args()) diff --git a/examples/json_consumer.py b/examples/json_consumer.py index 0e5573e04..5bf13de2c 100644 --- a/examples/json_consumer.py +++ b/examples/json_consumer.py @@ -116,8 +116,8 @@ def main(args): "\tfavorite_number: {}\n" "\tfavorite_color: {}\n" .format(msg.key(), user.name, - user.favorite_color, - user.favorite_number)) + user.favorite_number, + user.favorite_color)) except KeyboardInterrupt: break diff --git a/examples/protobuf/__init__.py b/examples/protobuf/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/user.proto b/examples/protobuf/user.proto similarity index 100% rename from examples/user.proto rename to examples/protobuf/user.proto diff --git a/examples/protobuf/user_pb2.py b/examples/protobuf/user_pb2.py new file mode 100644 index 000000000..138f5212c --- /dev/null +++ b/examples/protobuf/user_pb2.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: user.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nuser.proto\"E\n\x04User\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x17\n\x0f\x66\x61vorite_number\x18\x02 \x01(\x03\x12\x16\n\x0e\x66\x61vorite_color\x18\x03 \x01(\tb\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'user_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _USER._serialized_start=14 + _USER._serialized_end=83 +# @@protoc_insertion_point(module_scope) diff --git a/examples/protobuf_consumer.py b/examples/protobuf_consumer.py index 5eee52b4a..c32463499 100644 --- a/examples/protobuf_consumer.py +++ b/examples/protobuf_consumer.py @@ -31,8 +31,8 @@ # import argparse -# Protobuf generated class; resides at ./user_pb2.py -import user_pb2 +# Protobuf generated class; resides at ./protobuf/user_pb2.py +import protobuf.user_pb2 as user_pb2 from confluent_kafka import DeserializingConsumer from confluent_kafka.schema_registry.protobuf import ProtobufDeserializer from confluent_kafka.serialization import StringDeserializer diff --git a/examples/protobuf_producer.py b/examples/protobuf_producer.py index 785a3ddce..8d2de74e4 100644 --- a/examples/protobuf_producer.py +++ b/examples/protobuf_producer.py @@ -34,8 +34,8 @@ from six.moves import input -# Protobuf generated class; resides at ./user_pb2.py -import user_pb2 +# Protobuf generated class; resides at ./protobuf/user_pb2.py +import protobuf.user_pb2 as user_pb2 from confluent_kafka import SerializingProducer from confluent_kafka.serialization import StringSerializer from confluent_kafka.schema_registry import SchemaRegistryClient @@ -75,7 +75,7 @@ def main(args): protobuf_serializer = ProtobufSerializer(user_pb2.User, schema_registry_client, - {'use.deprecated.format': True}) + {'use.deprecated.format': False}) producer_conf = {'bootstrap.servers': args.bootstrap_servers, 'key.serializer': StringSerializer('utf_8'), diff --git a/examples/user_pb2.py b/examples/user_pb2.py deleted file mode 100644 index 3c1a2f16b..000000000 --- a/examples/user_pb2.py +++ /dev/null @@ -1,83 +0,0 @@ -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: user.proto - -import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor.FileDescriptor( - name='user.proto', - package='', - syntax='proto3', - serialized_pb=_b('\n\nuser.proto\"E\n\x04User\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x17\n\x0f\x66\x61vorite_number\x18\x02 \x01(\x03\x12\x16\n\x0e\x66\x61vorite_color\x18\x03 \x01(\tb\x06proto3') -) - - - - -_USER = _descriptor.Descriptor( - name='User', - full_name='User', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='name', full_name='User.name', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='favorite_number', full_name='User.favorite_number', index=1, - number=2, type=3, cpp_type=2, label=1, - has_default_value=False, default_value=0, - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - _descriptor.FieldDescriptor( - name='favorite_color', full_name='User.favorite_color', index=2, - number=3, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=_b("").decode('utf-8'), - message_type=None, enum_type=None, containing_type=None, - is_extension=False, extension_scope=None, - options=None, file=DESCRIPTOR), - ], - extensions=[ - ], - nested_types=[], - enum_types=[ - ], - options=None, - is_extendable=False, - syntax='proto3', - extension_ranges=[], - oneofs=[ - ], - serialized_start=14, - serialized_end=83, -) - -DESCRIPTOR.message_types_by_name['User'] = _USER -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -User = _reflection.GeneratedProtocolMessageType('User', (_message.Message,), dict( - DESCRIPTOR = _USER, - __module__ = 'user_pb2' - # @@protoc_insertion_point(class_scope:User) - )) -_sym_db.RegisterMessage(User) - - -# @@protoc_insertion_point(module_scope) diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 7f99549b8..ef36a7f1b 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -402,10 +402,14 @@ def describe_acls(self, acl_binding_filter, **kwargs): must match. String attributes match exact values or any string if set to None. Enums attributes match exact values or any value if equal to `ANY`. - If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH` returns ACL bindings with: - :attr:`ResourcePatternType.LITERAL` pattern type with resource name equal to the given resource name; - :attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name that matches the given resource name; - :attr:`ResourcePatternType.PREFIXED` pattern type with resource name that is a prefix of the given resource name + If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH` + returns ACL bindings with: + :attr:`ResourcePatternType.LITERAL` pattern type with resource name equal + to the given resource name; + :attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name + that matches the given resource name; + :attr:`ResourcePatternType.PREFIXED` pattern type with resource name + that is a prefix of the given resource name :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` @@ -433,10 +437,14 @@ def delete_acls(self, acl_binding_filters, **kwargs): to match ACLs to delete. String attributes match exact values or any string if set to None. Enums attributes match exact values or any value if equal to `ANY`. - If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH` deletes ACL bindings with: - :attr:`ResourcePatternType.LITERAL` pattern type with resource name equal to the given resource name; - :attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name that matches the given resource name; - :attr:`ResourcePatternType.PREFIXED` pattern type with resource name that is a prefix of the given resource name + If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH` + deletes ACL bindings with: + :attr:`ResourcePatternType.LITERAL` pattern type with resource name + equal to the given resource name; + :attr:`ResourcePatternType.LITERAL` pattern type with wildcard resource name + that matches the given resource name; + :attr:`ResourcePatternType.PREFIXED` pattern type with resource name + that is a prefix of the given resource name :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` From 269a420e3373cc182ee585f5181e9405353b6632 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 1 Jul 2022 10:27:30 +0200 Subject: [PATCH 2/2] removed empty __init__.py --- examples/protobuf/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 examples/protobuf/__init__.py diff --git a/examples/protobuf/__init__.py b/examples/protobuf/__init__.py deleted file mode 100644 index e69de29bb..000000000