diff --git a/CHANGELOG.md b/CHANGELOG.md
index 05e81b2cf..284e3fbe8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@ This is a feature release:

  - OAUTHBEARER OIDC support
+ - KIP-140 Admin API ACL support

 ### Fixes
diff --git a/docs/index.rst b/docs/index.rst
index e74141586..a1a7a9043 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -54,6 +54,15 @@ Supporting classes
 - Admin API
   - :ref:`NewTopic <pythonclient_newtopic>`
   - :ref:`NewPartitions <pythonclient_newpartitions>`
+  - :ref:`ConfigSource <pythonclient_config_source>`
+  - :ref:`ConfigEntry <pythonclient_config_entry>`
+  - :ref:`ConfigResource <pythonclient_config_resource>`
+  - :ref:`ResourceType <pythonclient_resource_type>`
+  - :ref:`ResourcePatternType <pythonclient_resource_pattern_type>`
+  - :ref:`AclOperation <pythonclient_acl_operation>`
+  - :ref:`AclPermissionType <pythonclient_acl_permission_type>`
+  - :ref:`AclBinding <pythonclient_acl_binding>`
+  - :ref:`AclBindingFilter <pythonclient_acl_binding_filter>`

 Guide to the :ref:`Transactional Producer API `

@@ -89,6 +98,87 @@ NewPartitions

 .. autoclass:: confluent_kafka.admin.NewPartitions
    :members:

+.. _pythonclient_config_source:
+
+************
+ConfigSource
+************
+
+.. autoclass:: confluent_kafka.admin.ConfigSource
+   :members:
+
+.. _pythonclient_config_entry:
+
+***********
+ConfigEntry
+***********
+
+.. autoclass:: confluent_kafka.admin.ConfigEntry
+   :members:
+
+.. _pythonclient_config_resource:
+
+**************
+ConfigResource
+**************
+
+.. autoclass:: confluent_kafka.admin.ConfigResource
+   :members:
+
+.. _pythonclient_resource_type:
+
+************
+ResourceType
+************
+
+.. autoclass:: confluent_kafka.admin.ResourceType
+   :members:
+
+.. _pythonclient_resource_pattern_type:
+
+*******************
+ResourcePatternType
+*******************
+
+.. autoclass:: confluent_kafka.admin.ResourcePatternType
+   :members:
+
+.. _pythonclient_acl_operation:
+
+************
+AclOperation
+************
+
+.. autoclass:: confluent_kafka.admin.AclOperation
+   :members:
+
+.. _pythonclient_acl_permission_type:
+
+*****************
+AclPermissionType
+*****************
+
+.. autoclass:: confluent_kafka.admin.AclPermissionType
+   :members:
+
+.. _pythonclient_acl_binding:
+
+**********
+AclBinding
+**********
+
+.. autoclass:: confluent_kafka.admin.AclBinding
+   :members:
+
+.. _pythonclient_acl_binding_filter:
+
+****************
+AclBindingFilter
+****************
+
+.. autoclass:: confluent_kafka.admin.AclBindingFilter
+   :members:
+
 .. _pythonclient_consumer:

 ********
diff --git a/examples/adminapi.py b/examples/adminapi.py
index 1ac3f744a..657d41c51 100755
--- a/examples/adminapi.py
+++ b/examples/adminapi.py
@@ -19,7 +19,9 @@
 # Example Admin clients.
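+# A hypothetical invocation of the ACL examples added below (broker address,
+# topic and principal values are illustrative only):
+#
+#   python adminapi.py localhost:9092 create_acls \
+#       TOPIC my-topic LITERAL User:alice '*' READ ALLOW
+#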
# -from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource +from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource, + AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, + AclOperation, AclPermissionType) from confluent_kafka import KafkaException import sys import threading @@ -28,6 +30,13 @@ logging.basicConfig() +def parse_nullable_string(s): + if s == "None": + return None + else: + return s + + def example_create_topics(a, topics): """ Create topics """ @@ -117,6 +126,133 @@ def example_describe_configs(a, args): raise +def example_create_acls(a, args): + """ create acls """ + + acl_bindings = [ + AclBinding( + ResourceType[restype], + parse_nullable_string(resname), + ResourcePatternType[resource_pattern_type], + parse_nullable_string(principal), + parse_nullable_string(host), + AclOperation[operation], + AclPermissionType[permission_type] + ) + for restype, resname, resource_pattern_type, + principal, host, operation, permission_type + in zip( + args[0::7], + args[1::7], + args[2::7], + args[3::7], + args[4::7], + args[5::7], + args[6::7], + ) + ] + + fs = a.create_acls(acl_bindings, request_timeout=10) + + # Wait for operation to finish. + for res, f in fs.items(): + try: + result = f.result() + if result is None: + print("Created {}".format(res)) + + except KafkaException as e: + print("Failed to create ACL {}: {}".format(res, e)) + except Exception: + raise + + +def example_describe_acls(a, args): + """ describe acls """ + + acl_binding_filters = [ + AclBindingFilter( + ResourceType[restype], + parse_nullable_string(resname), + ResourcePatternType[resource_pattern_type], + parse_nullable_string(principal), + parse_nullable_string(host), + AclOperation[operation], + AclPermissionType[permission_type] + ) + for restype, resname, resource_pattern_type, + principal, host, operation, permission_type + in zip( + args[0::7], + args[1::7], + args[2::7], + args[3::7], + args[4::7], + args[5::7], + args[6::7], + ) + ] + + fs = [ + a.describe_acls(acl_binding_filter, request_timeout=10) + for acl_binding_filter in acl_binding_filters + ] + # Wait for operations to finish. + for acl_binding_filter, f in zip(acl_binding_filters, fs): + try: + print("Acls matching filter: {}".format(acl_binding_filter)) + acl_bindings = f.result() + for acl_binding in acl_bindings: + print(acl_binding) + + except KafkaException as e: + print("Failed to describe {}: {}".format(acl_binding_filter, e)) + except Exception: + raise + + +def example_delete_acls(a, args): + """ delete acls """ + + acl_binding_filters = [ + AclBindingFilter( + ResourceType[restype], + parse_nullable_string(resname), + ResourcePatternType[resource_pattern_type], + parse_nullable_string(principal), + parse_nullable_string(host), + AclOperation[operation], + AclPermissionType[permission_type] + ) + for restype, resname, resource_pattern_type, + principal, host, operation, permission_type + in zip( + args[0::7], + args[1::7], + args[2::7], + args[3::7], + args[4::7], + args[5::7], + args[6::7], + ) + ] + + fs = a.delete_acls(acl_binding_filters, request_timeout=10) + + # Wait for operation to finish. 
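+    # delete_acls returns a dict of futures keyed by each AclBindingFilter;
+    # each future resolves to the list of AclBinding objects that were deleted.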
+    for res, f in fs.items():
+        try:
+            acl_bindings = f.result()
+            print("Deleted acls matching filter: {}".format(res))
+            for acl_binding in acl_bindings:
+                print(" ", acl_binding)
+
+        except KafkaException as e:
+            print("Failed to delete {}: {}".format(res, e))
+        except Exception:
+            raise
+
+
 def example_alter_configs(a, args):
     """ Alter configs atomically, replacing non-specified
     configuration properties with their default values.
@@ -300,6 +436,12 @@ def example_list(a, args):
                          '<config=val,config=val> ..\n')
         sys.stderr.write(' delta_alter_configs <resource_type1> <resource_name1> ' +
                          '<config=val,config=val> ..\n')
+        sys.stderr.write(' create_acls <resource_type1> <resource_name1> <resource_pattern_type1> ' +
+                         '<principal1> <host1> <operation1> <permission_type1> ..\n')
+        sys.stderr.write(' describe_acls <resource_type1> <resource_name1> <resource_pattern_type1> ' +
+                         '<principal1> <host1> <operation1> <permission_type1> ..\n')
+        sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_pattern_type1> ' +
+                         '<principal1> <host1> <operation1> <permission_type1> ..\n')
         sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
         sys.exit(1)

@@ -316,6 +458,9 @@
         'describe_configs': example_describe_configs,
         'alter_configs': example_alter_configs,
         'delta_alter_configs': example_delta_alter_configs,
+        'create_acls': example_create_acls,
+        'describe_acls': example_describe_acls,
+        'delete_acls': example_delete_acls,
         'list': example_list}

     if operation not in opsmap:
diff --git a/setup.py b/setup.py
index 3e16c9fb0..eadf11646 100755
--- a/setup.py
+++ b/setup.py
@@ -27,8 +27,7 @@

 AVRO_REQUIRES = ['fastavro>=0.23.0,<1.0;python_version<"3.0"',
                  'fastavro>=1.0;python_version>"3.0"',
-                 'avro==1.10.0;python_version<"3.0"',
-                 'avro-python3==1.10.0;python_version>"3.0"'
+                 'avro==1.10.0'
                  ] + SCHEMA_REGISTRY_REQUIRES

 JSON_REQUIRES = ['pyrsistent==0.16.1;python_version<"3.0"',
diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py
index 53d7d0559..fa2b2c30e 100644
--- a/src/confluent_kafka/admin/__init__.py
+++ b/src/confluent_kafka/admin/__init__.py
@@ -1,7 +1,34 @@
+# Copyright 2022 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 """
 Kafka admin client: create, view, alter, and delete topics and resources.
 """
-from ..cimpl import (KafkaException,  # noqa
+import concurrent.futures
+
+# Unused imports are kept so they remain accessible from this public module
+from ._config import (ConfigSource,  # noqa: F401
+                      ConfigEntry,
+                      ConfigResource)
+from ._resource import (ResourceType,  # noqa: F401
+                        ResourcePatternType)
+from ._acl import (AclOperation,  # noqa: F401
+                   AclPermissionType,
+                   AclBinding,
+                   AclBindingFilter)
+from ..cimpl import (KafkaException,  # noqa: F401
+                     KafkaError,
                      _AdminClientImpl,
                      NewTopic,
                      NewPartitions,
@@ -17,180 +44,6 @@
                      RESOURCE_GROUP,
                      RESOURCE_BROKER)

-import concurrent.futures
-import functools
-
-from enum import Enum
-
-
-class ConfigSource(Enum):
-    """
-    Enumerates the different sources of configuration properties.
-    Used by ConfigEntry to specify the
-    source of configuration properties returned by `describe_configs()`.
- """ - UNKNOWN_CONFIG = CONFIG_SOURCE_UNKNOWN_CONFIG #: Unknown - DYNAMIC_TOPIC_CONFIG = CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG #: Dynamic Topic - DYNAMIC_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG #: Dynamic Broker - DYNAMIC_DEFAULT_BROKER_CONFIG = CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG #: Dynamic Default Broker - STATIC_BROKER_CONFIG = CONFIG_SOURCE_STATIC_BROKER_CONFIG #: Static Broker - DEFAULT_CONFIG = CONFIG_SOURCE_DEFAULT_CONFIG #: Default - - -class ConfigEntry(object): - """ - Represents a configuration property. Returned by describe_configs() for each configuration - entry of the specified resource. - - This class is typically not user instantiated. - """ - - def __init__(self, name, value, - source=ConfigSource.UNKNOWN_CONFIG, - is_read_only=False, - is_default=False, - is_sensitive=False, - is_synonym=False, - synonyms=[]): - """ - This class is typically not user instantiated. - """ - super(ConfigEntry, self).__init__() - - self.name = name - """Configuration property name.""" - self.value = value - """Configuration value (or None if not set or is_sensitive==True).""" - self.source = source - """Configuration source.""" - self.is_read_only = bool(is_read_only) - """Indicates whether the configuration property is read-only.""" - self.is_default = bool(is_default) - """Indicates whether the configuration property is using its default value.""" - self.is_sensitive = bool(is_sensitive) - """ - Indicates whether the configuration property value contains - sensitive information (such as security settings), in which - case .value is None.""" - self.is_synonym = bool(is_synonym) - """Indicates whether the configuration property is a synonym for the parent configuration entry.""" - self.synonyms = synonyms - """A list of synonyms (ConfigEntry) and alternate sources for this configuration property.""" - - def __repr__(self): - return "ConfigEntry(%s=\"%s\")" % (self.name, self.value) - - def __str__(self): - return "%s=\"%s\"" % (self.name, self.value) - - -@functools.total_ordering -class ConfigResource(object): - """ - Represents a resource that has configuration, and (optionally) - a collection of configuration properties for that resource. Used by - describe_configs() and alter_configs(). - - Parameters - ---------- - restype : `ConfigResource.Type` - The resource type. - name : `str` - The resource name, which depends on the resource type. For RESOURCE_BROKER, the resource name is the broker id. - set_config : `dict` - The configuration to set/overwrite. Dictionary of str, str. - """ - - class Type(Enum): - """ - Enumerates the different types of Kafka resources. - """ - UNKNOWN = RESOURCE_UNKNOWN #: Resource type is not known or not set. - ANY = RESOURCE_ANY #: Match any resource, used for lookups. - TOPIC = RESOURCE_TOPIC #: Topic resource. Resource name is topic name. - GROUP = RESOURCE_GROUP #: Group resource. Resource name is group.id. - BROKER = RESOURCE_BROKER #: Broker resource. Resource name is broker id. - - def __init__(self, restype, name, - set_config=None, described_configs=None, error=None): - """ - :param ConfigResource.Type restype: Resource type. - :param str name: The resource name, which depends on restype. - For RESOURCE_BROKER, the resource name is the broker id. - :param dict set_config: The configuration to set/overwrite. Dictionary of str, str. - :param dict described_configs: For internal use only. - :param KafkaError error: For internal use only. 
- """ - super(ConfigResource, self).__init__() - - if name is None: - raise ValueError("Expected resource name to be a string") - - if type(restype) == str: - # Allow resource type to be specified as case-insensitive string, for convenience. - try: - restype = ConfigResource.Type[restype.upper()] - except KeyError: - raise ValueError("Unknown resource type \"%s\": should be a ConfigResource.Type" % restype) - - elif type(restype) == int: - # The C-code passes restype as an int, convert to Type. - restype = ConfigResource.Type(restype) - - self.restype = restype - self.restype_int = int(self.restype.value) # for the C code - self.name = name - - if set_config is not None: - self.set_config_dict = set_config.copy() - else: - self.set_config_dict = dict() - - self.configs = described_configs - self.error = error - - def __repr__(self): - if self.error is not None: - return "ConfigResource(%s,%s,%r)" % (self.restype, self.name, self.error) - else: - return "ConfigResource(%s,%s)" % (self.restype, self.name) - - def __hash__(self): - return hash((self.restype, self.name)) - - def __lt__(self, other): - if self.restype < other.restype: - return True - return self.name.__lt__(other.name) - - def __eq__(self, other): - return self.restype == other.restype and self.name == other.name - - def __len__(self): - """ - :rtype: int - :returns: number of configuration entries/operations - """ - return len(self.set_config_dict) - - def set_config(self, name, value, overwrite=True): - """ - Set/overwrite a configuration value. - - When calling alter_configs, any configuration properties that are not included - in the request will be reverted to their default values. As a workaround, use - describe_configs() to retrieve the current configuration and overwrite the - settings you want to change. - - :param str name: Configuration property name - :param str value: Configuration value - :param bool overwrite: If True, overwrite entry if it already exists (default). - If False, do nothing if entry already exists. - """ - if not overwrite and name in self.set_config_dict: - return - self.set_config_dict[name] = value - class AdminClient (_AdminClientImpl): """ @@ -213,6 +66,7 @@ class AdminClient (_AdminClientImpl): Requires broker version v0.11.0.0 or later. """ + def __init__(self, conf): """ Create a new AdminClient using the provided configuration dictionary. @@ -274,6 +128,39 @@ def _make_resource_result(f, futmap): for resource, fut in futmap.items(): fut.set_exception(e) + @staticmethod + def _make_acls_result(f, futmap): + """ + Map create ACL binding results to corresponding futures in futmap. + For create_acls the result value of each (successful) future is None. + For delete_acls the result value of each (successful) future is the list of deleted AclBindings. 
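+        Results are matched back to futures by position, so the result list
+        must have the same length and ordering as futmap.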
+        """
+        try:
+            results = f.result()
+            futmap_values = list(futmap.values())
+            len_results = len(results)
+            len_futures = len(futmap_values)
+            if len_results != len_futures:
+                raise RuntimeError(
+                    "Results length {} is different from future-map length {}".format(len_results, len_futures))
+            for i, result in enumerate(results):
+                fut = futmap_values[i]
+                if isinstance(result, KafkaError):
+                    fut.set_exception(KafkaException(result))
+                else:
+                    fut.set_result(result)
+        except Exception as e:
+            # Request-level exception, raise the same for all the AclBindings or AclBindingFilters
+            for resource, fut in futmap.items():
+                fut.set_exception(e)
+
+    @staticmethod
+    def _create_future():
+        f = concurrent.futures.Future()
+        if not f.set_running_or_notify_cancel():
+            raise RuntimeError("Future was cancelled prematurely")
+        return f
+
     @staticmethod
     def _make_futures(futmap_keys, class_check, make_result_fn):
         """
@@ -283,20 +170,15 @@
         futmap = {}
         for key in futmap_keys:
             if class_check is not None and not isinstance(key, class_check):
-                raise ValueError("Expected list of {}".format(type(class_check)))
-            futmap[key] = concurrent.futures.Future()
-            if not futmap[key].set_running_or_notify_cancel():
-                raise RuntimeError("Future was cancelled prematurely")
+                raise ValueError("Expected list of {}".format(repr(class_check)))
+            futmap[key] = AdminClient._create_future()

         # Create an internal future for the entire request,
         # this future will trigger _make_..._result() and set result/exception
         # per topic,future in futmap.
-        f = concurrent.futures.Future()
+        f = AdminClient._create_future()
         f.add_done_callback(lambda f: make_result_fn(f, futmap))

-        if not f.set_running_or_notify_cancel():
-            raise RuntimeError("Future was cancelled prematurely")
-
         return f, futmap

     def create_topics(self, new_topics, **kwargs):
@@ -363,13 +245,13 @@
         return futmap

-    def list_topics(self, **kwargs):
+    def list_topics(self, *args, **kwargs):

-        return super(AdminClient, self).list_topics(**kwargs)
+        return super(AdminClient, self).list_topics(*args, **kwargs)

-    def list_groups(self, **kwargs):
+    def list_groups(self, *args, **kwargs):

-        return super(AdminClient, self).list_groups(**kwargs)
+        return super(AdminClient, self).list_groups(*args, **kwargs)

     def create_partitions(self, new_partitions, **kwargs):
         """
@@ -479,6 +361,97 @@ def alter_configs(self, resources, **kwargs):
         return futmap

+    def create_acls(self, acls, **kwargs):
+        """
+        Create one or more ACL bindings.
+
+        :param list(AclBinding) acls: A list of ACL binding specifications (:class:`.AclBinding`)
+                  to create.
+        :param float request_timeout: The overall request timeout in seconds,
+                  including broker lookup, request transmission, operation time
+                  on broker, and response. Default: `socket.timeout.ms*1000.0`
+
+        :returns: A dict of futures for each ACL binding, keyed by the :class:`AclBinding` object.
+                  The future result() method returns None on success.
+
+        :rtype: dict[AclBinding, future]
+
+        :raises KafkaException: Operation failed locally or on broker.
+        :raises TypeError: Invalid input.
+        :raises ValueError: Invalid input.
+        """
+
+        f, futmap = AdminClient._make_futures(acls, AclBinding,
+                                              AdminClient._make_acls_result)
+
+        super(AdminClient, self).create_acls(acls, f, **kwargs)
+
+        return futmap
+
+    def describe_acls(self, acl_binding_filter, **kwargs):
+        """
+        Match ACL bindings by filter.
+
+        :param AclBindingFilter acl_binding_filter: A filter with attributes that
+                  must match.
+                  String attributes match exact values, or any string if set to None.
+                  Enum attributes match exact values, or any value if set to the
+                  `ANY` member of the corresponding enum.
+                  If the resource pattern type is :attr:`ResourcePatternType.MATCH`,
+                  the filter matches ACL bindings with a :attr:`ResourcePatternType.LITERAL`
+                  pattern type whose resource name equals the given name or is the
+                  wildcard name "*", and bindings with a :attr:`ResourcePatternType.PREFIXED`
+                  pattern type whose resource name is a prefix of the given name.
+        :param float request_timeout: The overall request timeout in seconds,
+                  including broker lookup, request transmission, operation time
+                  on broker, and response. Default: `socket.timeout.ms*1000.0`
+
+        :returns: A future returning a list(:class:`AclBinding`) as result
+
+        :rtype: future
+
+        :raises KafkaException: Operation failed locally or on broker.
+        :raises TypeError: Invalid input.
+        :raises ValueError: Invalid input.
+        """
+
+        f = AdminClient._create_future()
+
+        super(AdminClient, self).describe_acls(acl_binding_filter, f, **kwargs)
+
+        return f
+
+    def delete_acls(self, acl_binding_filters, **kwargs):
+        """
+        Delete ACL bindings matching one or more ACL binding filters.
+
+        :param list(AclBindingFilter) acl_binding_filters: a list of ACL binding filters
+                  to match ACLs to delete.
+                  String attributes match exact values, or any string if set to None.
+                  Enum attributes match exact values, or any value if set to the
+                  `ANY` member of the corresponding enum.
+                  If the resource pattern type is :attr:`ResourcePatternType.MATCH`,
+                  it deletes ACL bindings with a :attr:`ResourcePatternType.LITERAL`
+                  pattern type whose resource name equals the given name or is the
+                  wildcard name "*", and bindings with a :attr:`ResourcePatternType.PREFIXED`
+                  pattern type whose resource name is a prefix of the given name.
+        :param float request_timeout: The overall request timeout in seconds,
+                  including broker lookup, request transmission, operation time
+                  on broker, and response. Default: `socket.timeout.ms*1000.0`
+
+        :returns: A dict of futures for each ACL binding filter, keyed by the :class:`AclBindingFilter` object.
+                  The future result() method returns a list of :class:`AclBinding`.
+
+        :rtype: dict[AclBindingFilter, future]
+
+        :raises KafkaException: Operation failed locally or on broker.
+        :raises TypeError: Invalid input.
+        :raises ValueError: Invalid input.
+        """
+
+        f, futmap = AdminClient._make_futures(acl_binding_filters, AclBindingFilter,
+                                              AdminClient._make_acls_result)
+
+        super(AdminClient, self).delete_acls(acl_binding_filters, f, **kwargs)
+
+        return futmap
+

 class ClusterMetadata (object):
     """
@@ -487,6 +460,7 @@ class ClusterMetadata (object):
     This class is typically not user instantiated.
     """
+
     def __init__(self):
         self.cluster_id = None
         """Cluster id string, if supported by the broker, else None."""
@@ -514,6 +488,7 @@ class BrokerMetadata (object):
     This class is typically not user instantiated.
     """
+
     def __init__(self):
         self.id = -1
         """Broker id"""
@@ -538,6 +513,7 @@ class TopicMetadata (object):
     # The dash in "-topic" and "-error" is needed to circumvent a
     # Sphinx issue where it tries to reference the same instance variable
     # on other classes which raises a warning/error.
+
     def __init__(self):
         self.topic = None
         """Topic name"""
@@ -567,6 +543,7 @@ class PartitionMetadata (object):
        in ClusterMetadata.brokers. Always check the availability of a broker id in the brokers dict.
     """
+
     def __init__(self):
         self.id = -1
         """Partition id."""
@@ -597,6 +574,7 @@ class GroupMember(object):

     This class is typically not user instantiated.
     """  # noqa: E501
+
     def __init__(self,):
         self.id = None
         """Member id (generated by broker)."""
@@ -615,6 +593,7 @@ class GroupMetadata(object):

     This class is typically not user instantiated.
     """
+
     def __init__(self):
         self.broker = None
         """Originating broker metadata."""
diff --git a/src/confluent_kafka/admin/_acl.py b/src/confluent_kafka/admin/_acl.py
new file mode 100644
index 000000000..853ad2158
--- /dev/null
+++ b/src/confluent_kafka/admin/_acl.py
@@ -0,0 +1,231 @@
+# Copyright 2022 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from enum import Enum
+import functools
+from .. import cimpl as _cimpl
+from ._resource import ResourceType, ResourcePatternType
+
+try:
+    string_type = basestring
+except NameError:
+    string_type = str
+
+
+class AclOperation(Enum):
+    """
+    Enumerates the different types of ACL operation.
+    """
+    UNKNOWN = _cimpl.ACL_OPERATION_UNKNOWN  #: Unknown
+    ANY = _cimpl.ACL_OPERATION_ANY  #: In a filter, matches any AclOperation
+    ALL = _cimpl.ACL_OPERATION_ALL  #: ALL the operations
+    READ = _cimpl.ACL_OPERATION_READ  #: READ operation
+    WRITE = _cimpl.ACL_OPERATION_WRITE  #: WRITE operation
+    CREATE = _cimpl.ACL_OPERATION_CREATE  #: CREATE operation
+    DELETE = _cimpl.ACL_OPERATION_DELETE  #: DELETE operation
+    ALTER = _cimpl.ACL_OPERATION_ALTER  #: ALTER operation
+    DESCRIBE = _cimpl.ACL_OPERATION_DESCRIBE  #: DESCRIBE operation
+    CLUSTER_ACTION = _cimpl.ACL_OPERATION_CLUSTER_ACTION  #: CLUSTER_ACTION operation
+    DESCRIBE_CONFIGS = _cimpl.ACL_OPERATION_DESCRIBE_CONFIGS  #: DESCRIBE_CONFIGS operation
+    ALTER_CONFIGS = _cimpl.ACL_OPERATION_ALTER_CONFIGS  #: ALTER_CONFIGS operation
+    IDEMPOTENT_WRITE = _cimpl.ACL_OPERATION_IDEMPOTENT_WRITE  #: IDEMPOTENT_WRITE operation
+
+    def __lt__(self, other):
+        if self.__class__ != other.__class__:
+            return NotImplemented
+        return self.value < other.value
+
+
+class AclPermissionType(Enum):
+    """
+    Enumerates the different types of ACL permission types.
+    """
+    UNKNOWN = _cimpl.ACL_PERMISSION_TYPE_UNKNOWN  #: Unknown
+    ANY = _cimpl.ACL_PERMISSION_TYPE_ANY  #: In a filter, matches any AclPermissionType
+    DENY = _cimpl.ACL_PERMISSION_TYPE_DENY  #: Disallows access
+    ALLOW = _cimpl.ACL_PERMISSION_TYPE_ALLOW  #: Grants access
+
+    def __lt__(self, other):
+        if self.__class__ != other.__class__:
+            return NotImplemented
+        return self.value < other.value
+
+
+@functools.total_ordering
+class AclBinding(object):
+    """
+    Represents an ACL binding that specifies the operation and permission type
+    for a specific principal over one or more resources of the same type.
+    Used by :meth:`AdminClient.create_acls`, returned by
+    :meth:`AdminClient.describe_acls` and :meth:`AdminClient.delete_acls`.
+
+    Parameters
+    ----------
+    restype : ResourceType
+        The resource type.
+    name : str
+        The resource name, which depends on the resource type. For :attr:`ResourceType.BROKER`,
+        the resource name is the broker id.
+    resource_pattern_type : ResourcePatternType
+        The resource pattern, relative to the name.
+        :attr:`ResourcePatternType.ANY` and :attr:`ResourcePatternType.MATCH`
+        are only valid in filters and are rejected here.
+ principal : str + The principal this AclBinding refers to. + host : str + The host that the call is allowed to come from. + operation: AclOperation + The operation/s specified by this binding. + permission_type: AclPermissionType + The permission type for the specified operation. + """ + + def __init__(self, restype, name, + resource_pattern_type, principal, host, + operation, permission_type): + self.restype = restype + self.name = name + self.resource_pattern_type = resource_pattern_type + self.principal = principal + self.host = host + self.operation = operation + self.permission_type = permission_type + self._convert_args() + # for the C code + self.restype_int = int(self.restype.value) + self.resource_pattern_type_int = int(self.resource_pattern_type.value) + self.operation_int = int(self.operation.value) + self.permission_type_int = int(self.permission_type.value) + + def _check_not_none(self, vars_to_check): + for param in vars_to_check: + if getattr(self, param) is None: + raise ValueError("Expected %s to be not None" % (param,)) + + def _check_is_string(self, vars_to_check): + for param in vars_to_check: + param_value = getattr(self, param) + if param_value is not None and not isinstance(param_value, string_type): + raise TypeError("Expected %s to be a string" % (param,)) + + def _convert_to_enum(self, val, enum_clazz): + if type(val) == str: + # Allow it to be specified as case-insensitive string, for convenience. + try: + val = enum_clazz[val.upper()] + except KeyError: + raise ValueError("Unknown value \"%s\": should be a %s" % (val, enum_clazz.__name__)) + + elif type(val) == int: + # The C-code passes restype as an int, convert to enum. + val = enum_clazz(val) + + elif type(val) != enum_clazz: + raise TypeError("Unknown value \"%s\": should be a %s" % (val, enum_clazz.__name__)) + + return val + + def _convert_enums(self): + self.restype = self._convert_to_enum(self.restype, ResourceType) + self.resource_pattern_type = self._convert_to_enum(self.resource_pattern_type, ResourcePatternType) + self.operation = self._convert_to_enum(self.operation, AclOperation) + self.permission_type = self._convert_to_enum(self.permission_type, AclPermissionType) + + def _check_forbidden_enums(self, forbidden_enums): + for k, v in forbidden_enums.items(): + enum_value = getattr(self, k) + if enum_value in v: + raise ValueError("Cannot use enum %s, value %s in this class" % (k, enum_value.name)) + + def _not_none_args(self): + return ["restype", "name", "resource_pattern_type", + "principal", "host", "operation", "permission_type"] + + def _string_args(self): + return ["name", "principal", "host"] + + def _forbidden_enums(self): + return { + "restype": [ResourceType.ANY], + "resource_pattern_type": [ResourcePatternType.ANY, + ResourcePatternType.MATCH], + "operation": [AclOperation.ANY], + "permission_type": [AclPermissionType.ANY] + } + + def _convert_args(self): + not_none_args = self._not_none_args() + string_args = self._string_args() + forbidden_enums = self._forbidden_enums() + self._check_not_none(not_none_args) + self._check_is_string(string_args) + self._convert_enums() + self._check_forbidden_enums(forbidden_enums) + + def __repr__(self): + type_name = type(self).__name__ + return "%s(%s,%s,%s,%s,%s,%s,%s)" % ((type_name,) + self._to_tuple()) + + def _to_tuple(self): + return (self.restype, self.name, self.resource_pattern_type, + self.principal, self.host, self.operation, + self.permission_type) + + def __hash__(self): + return hash(self._to_tuple()) + + def __lt__(self, other): + 
if self.__class__ != other.__class__: + return NotImplemented + return self._to_tuple() < other._to_tuple() + + def __eq__(self, other): + if self.__class__ != other.__class__: + return NotImplemented + return self._to_tuple() == other._to_tuple() + + +class AclBindingFilter(AclBinding): + """ + Represents an ACL binding filter used to return a list of ACL bindings matching some or all of its attributes. + Used by :meth:`AdminClient.describe_acls` and :meth:`AdminClient.delete_acls`. + + Parameters + ---------- + restype : ResourceType + The resource type, or :attr:`ResourceType.ANY` to match any value. + name : str + The resource name to match. + None matches any value. + resource_pattern_type : ResourcePatternType + The resource pattern, :attr:`ResourcePatternType.ANY` to match any value or + :attr:`ResourcePatternType.MATCH` to perform pattern matching. + principal : str + The principal to match, or None to match any value. + host : str + The host to match, or None to match any value. + operation: AclOperation + The operation to match or :attr:`AclOperation.ANY` to match any value. + permission_type: AclPermissionType + The permission type to match or :attr:`AclPermissionType.ANY` to match any value. + """ + + def _not_none_args(self): + return ["restype", "resource_pattern_type", + "operation", "permission_type"] + + def _forbidden_enums(self): + return { + "restype": [ResourceType.UNKNOWN], + "resource_pattern_type": [ResourcePatternType.UNKNOWN], + "operation": [AclOperation.UNKNOWN], + "permission_type": [AclPermissionType.UNKNOWN] + } diff --git a/src/confluent_kafka/admin/_config.py b/src/confluent_kafka/admin/_config.py new file mode 100644 index 000000000..678ffa88b --- /dev/null +++ b/src/confluent_kafka/admin/_config.py @@ -0,0 +1,179 @@ +# Copyright 2022 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +import functools +from .. import cimpl as _cimpl +from ._resource import ResourceType + + +class ConfigSource(Enum): + """ + Enumerates the different sources of configuration properties. + Used by ConfigEntry to specify the + source of configuration properties returned by `describe_configs()`. + """ + UNKNOWN_CONFIG = _cimpl.CONFIG_SOURCE_UNKNOWN_CONFIG #: Unknown + DYNAMIC_TOPIC_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG #: Dynamic Topic + DYNAMIC_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG #: Dynamic Broker + DYNAMIC_DEFAULT_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG #: Dynamic Default Broker + STATIC_BROKER_CONFIG = _cimpl.CONFIG_SOURCE_STATIC_BROKER_CONFIG #: Static Broker + DEFAULT_CONFIG = _cimpl.CONFIG_SOURCE_DEFAULT_CONFIG #: Default + + +class ConfigEntry(object): + """ + Represents a configuration property. Returned by describe_configs() for each configuration + entry of the specified resource. + + This class is typically not user instantiated. 
+ """ + + def __init__(self, name, value, + source=ConfigSource.UNKNOWN_CONFIG, + is_read_only=False, + is_default=False, + is_sensitive=False, + is_synonym=False, + synonyms=[]): + """ + This class is typically not user instantiated. + """ + super(ConfigEntry, self).__init__() + + self.name = name + """Configuration property name.""" + self.value = value + """Configuration value (or None if not set or is_sensitive==True).""" + self.source = source + """Configuration source.""" + self.is_read_only = bool(is_read_only) + """Indicates whether the configuration property is read-only.""" + self.is_default = bool(is_default) + """Indicates whether the configuration property is using its default value.""" + self.is_sensitive = bool(is_sensitive) + """ + Indicates whether the configuration property value contains + sensitive information (such as security settings), in which + case .value is None.""" + self.is_synonym = bool(is_synonym) + """Indicates whether the configuration property is a synonym for the parent configuration entry.""" + self.synonyms = synonyms + """A list of synonyms (ConfigEntry) and alternate sources for this configuration property.""" + + def __repr__(self): + return "ConfigEntry(%s=\"%s\")" % (self.name, self.value) + + def __str__(self): + return "%s=\"%s\"" % (self.name, self.value) + + +@functools.total_ordering +class ConfigResource(object): + """ + Represents a resource that has configuration, and (optionally) + a collection of configuration properties for that resource. Used by + describe_configs() and alter_configs(). + + Parameters + ---------- + restype : `ConfigResource.Type` + The resource type. + name : `str` + The resource name, which depends on the resource type. For RESOURCE_BROKER, the resource name is the broker id. + set_config : `dict` + The configuration to set/overwrite. Dictionary of str, str. + """ + + Type = ResourceType + + def __init__(self, restype, name, + set_config=None, described_configs=None, error=None): + """ + :param ConfigResource.Type restype: Resource type. + :param str name: The resource name, which depends on restype. + For RESOURCE_BROKER, the resource name is the broker id. + :param dict set_config: The configuration to set/overwrite. Dictionary of str, str. + :param dict described_configs: For internal use only. + :param KafkaError error: For internal use only. + """ + super(ConfigResource, self).__init__() + + if name is None: + raise ValueError("Expected resource name to be a string") + + if type(restype) == str: + # Allow resource type to be specified as case-insensitive string, for convenience. + try: + restype = ConfigResource.Type[restype.upper()] + except KeyError: + raise ValueError("Unknown resource type \"%s\": should be a ConfigResource.Type" % restype) + + elif type(restype) == int: + # The C-code passes restype as an int, convert to Type. 
+ restype = ConfigResource.Type(restype) + + self.restype = restype + self.restype_int = int(self.restype.value) # for the C code + self.name = name + + if set_config is not None: + self.set_config_dict = set_config.copy() + else: + self.set_config_dict = dict() + + self.configs = described_configs + self.error = error + + def __repr__(self): + if self.error is not None: + return "ConfigResource(%s,%s,%r)" % (self.restype, self.name, self.error) + else: + return "ConfigResource(%s,%s)" % (self.restype, self.name) + + def __hash__(self): + return hash((self.restype, self.name)) + + def __lt__(self, other): + if self.restype < other.restype: + return True + return self.name.__lt__(other.name) + + def __eq__(self, other): + return self.restype == other.restype and self.name == other.name + + def __len__(self): + """ + :rtype: int + :returns: number of configuration entries/operations + """ + return len(self.set_config_dict) + + def set_config(self, name, value, overwrite=True): + """ + Set/overwrite a configuration value. + + When calling alter_configs, any configuration properties that are not included + in the request will be reverted to their default values. As a workaround, use + describe_configs() to retrieve the current configuration and overwrite the + settings you want to change. + + :param str name: Configuration property name + :param str value: Configuration value + :param bool overwrite: If True, overwrite entry if it already exists (default). + If False, do nothing if entry already exists. + """ + if not overwrite and name in self.set_config_dict: + return + self.set_config_dict[name] = value diff --git a/src/confluent_kafka/admin/_resource.py b/src/confluent_kafka/admin/_resource.py new file mode 100644 index 000000000..b786f3a9a --- /dev/null +++ b/src/confluent_kafka/admin/_resource.py @@ -0,0 +1,48 @@ +# Copyright 2022 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from .. import cimpl as _cimpl + + +class ResourceType(Enum): + """ + Enumerates the different types of Kafka resources. + """ + UNKNOWN = _cimpl.RESOURCE_UNKNOWN #: Resource type is not known or not set. + ANY = _cimpl.RESOURCE_ANY #: Match any resource, used for lookups. + TOPIC = _cimpl.RESOURCE_TOPIC #: Topic resource. Resource name is topic name. + GROUP = _cimpl.RESOURCE_GROUP #: Group resource. Resource name is group.id. + BROKER = _cimpl.RESOURCE_BROKER #: Broker resource. Resource name is broker id. + + def __lt__(self, other): + if self.__class__ != other.__class__: + return NotImplemented + return self.value < other.value + + +class ResourcePatternType(Enum): + """ + Enumerates the different types of Kafka resource patterns. + """ + UNKNOWN = _cimpl.RESOURCE_PATTERN_UNKNOWN #: Resource pattern type is not known or not set. + ANY = _cimpl.RESOURCE_PATTERN_ANY #: Match any resource, used for lookups. 
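+    # MATCH is only valid in filters (AclBindingFilter); AclBinding rejects it.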
+ MATCH = _cimpl.RESOURCE_PATTERN_MATCH #: Match: will perform pattern matching + LITERAL = _cimpl.RESOURCE_PATTERN_LITERAL #: Literal: A literal resource name + PREFIXED = _cimpl.RESOURCE_PATTERN_PREFIXED #: Prefixed: A prefixed resource name + + def __lt__(self, other): + if self.__class__ != other.__class__: + return NotImplemented + return self.value < other.value diff --git a/src/confluent_kafka/avro/requirements.txt b/src/confluent_kafka/avro/requirements.txt index 90bc865f0..7f833ddd7 100644 --- a/src/confluent_kafka/avro/requirements.txt +++ b/src/confluent_kafka/avro/requirements.txt @@ -1,4 +1,3 @@ fastavro>=0.23.0 requests -avro==1.10.0;python_version<='3.0' -avro-python3==1.10.0;python_version>='3.0' +avro==1.10.0 diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index ee0d68bb3..d438366d0 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -144,13 +144,69 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api, return c_options; err: - rd_kafka_AdminOptions_destroy(c_options); + if (c_options) rd_kafka_AdminOptions_destroy(c_options); PyErr_Format(PyExc_ValueError, "%s", errstr); return NULL; } +/** + * @brief Convert py AclBinding to C + */ +static rd_kafka_AclBinding_t * +Admin_py_to_c_AclBinding (const PyObject *py_obj_arg, + char *errstr, + size_t errstr_size) { + int restype, resource_pattern_type, operation, permission_type; + char *resname = NULL, *principal = NULL, *host = NULL; + rd_kafka_AclBinding_t *ret = NULL; + + PyObject *py_obj = (PyObject *) py_obj_arg; + if(cfl_PyObject_GetInt(py_obj, "restype_int", &restype, 0, 1) + && cfl_PyObject_GetString(py_obj, "name", &resname, NULL, 1, 0) + && cfl_PyObject_GetInt(py_obj, "resource_pattern_type_int", &resource_pattern_type, 0, 1) + && cfl_PyObject_GetString(py_obj, "principal", &principal, NULL, 1, 0) + && cfl_PyObject_GetString(py_obj, "host", &host, NULL, 1, 0) + && cfl_PyObject_GetInt(py_obj, "operation_int", &operation, 0, 1) + && cfl_PyObject_GetInt(py_obj, "permission_type_int", &permission_type, 0, 1)) { + ret = rd_kafka_AclBinding_new(restype, resname, \ + resource_pattern_type, principal, host, \ + operation, permission_type, errstr, errstr_size); + } + if (resname) free(resname); + if (principal) free(principal); + if (host) free(host); + return ret; +} +/** + * @brief Convert py AclBindingFilter to C + */ +static rd_kafka_AclBindingFilter_t* +Admin_py_to_c_AclBindingFilter (const PyObject *py_obj_arg, + char *errstr, + size_t errstr_size) { + int restype, resource_pattern_type, operation, permission_type; + char *resname = NULL, *principal = NULL, *host = NULL; + PyObject *py_obj = (PyObject *) py_obj_arg; + rd_kafka_AclBindingFilter_t* ret = NULL; + + if(cfl_PyObject_GetInt(py_obj, "restype_int", &restype, 0, 1) + && cfl_PyObject_GetString(py_obj, "name", &resname, NULL, 1, 1) + && cfl_PyObject_GetInt(py_obj, "resource_pattern_type_int", &resource_pattern_type, 0, 1) + && cfl_PyObject_GetString(py_obj, "principal", &principal, NULL, 1, 1) + && cfl_PyObject_GetString(py_obj, "host", &host, NULL, 1, 1) + && cfl_PyObject_GetInt(py_obj, "operation_int", &operation, 0, 1) + && cfl_PyObject_GetInt(py_obj, "permission_type_int", &permission_type, 0, 1)) { + ret = rd_kafka_AclBindingFilter_new(restype, resname, \ + resource_pattern_type, principal, host, \ + operation, permission_type, errstr, errstr_size); + } + if (resname) free(resname); + if (principal) free(principal); + if (host) free(host); + return ret; +} /** * @brief Translate Python 
list(list(int)) replica assignments and set
@@ -770,7 +826,7 @@ static PyObject *Admin_describe_configs (Handle *self, PyObject *args,
                 if (!cfl_PyObject_GetInt(res, "restype_int", &restype, 0, 0))
                         goto err;

-                if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0))
+                if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0, 0))
                         goto err;

                 c_objs[i] = rd_kafka_ConfigResource_new(
@@ -912,7 +968,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
                 if (!cfl_PyObject_GetInt(res, "restype_int", &restype, 0, 0))
                         goto err;

-                if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0))
+                if (!cfl_PyObject_GetString(res, "name", &resname, NULL, 0, 0))
                         goto err;

                 c_objs[i] = rd_kafka_ConfigResource_new(
@@ -930,7 +986,7 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
                  * Translate and apply config entries in the various dicts. */
                 if (!cfl_PyObject_GetAttr(res, "set_config_dict", &dict,
-                                          &PyDict_Type, 1)) {
+                                          &PyDict_Type, 1, 0)) {
                         i++;
                         goto err;
                 }
@@ -977,6 +1033,363 @@ static PyObject *Admin_alter_configs (Handle *self, PyObject *args,
 }

+/**
+ * @brief create_acls
+ */
+static PyObject *Admin_create_acls (Handle *self, PyObject *args, PyObject *kwargs) {
+        PyObject *acls_list, *future;
+        int cnt, i = 0;
+        struct Admin_options options = Admin_options_INITIALIZER;
+        PyObject *AclBinding_type = NULL;
+        rd_kafka_AdminOptions_t *c_options = NULL;
+        rd_kafka_AclBinding_t **c_objs = NULL;
+        CallState cs;
+        rd_kafka_queue_t *rkqu;
+        char errstr[512];
+
+        static char *kws[] = {"acls",
+                              "future",
+                              /* options */
+                              "request_timeout",
+                              NULL};
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|f", kws,
+                                         &acls_list,
+                                         &future,
+                                         &options.request_timeout))
+                goto err;
+
+        if (!PyList_Check(acls_list) ||
+            (cnt = (int)PyList_Size(acls_list)) < 1) {
+                PyErr_SetString(PyExc_ValueError,
+                                "Expected non-empty list of AclBinding "
+                                "objects");
+                goto err;
+        }
+
+
+        /* Look up the AclBinding class so we can check that the provided
+         * ACL bindings are of the correct type.
+         * Since this is not in the fast path we treat ourselves
+         * to the luxury of looking up this for each call. */
+        AclBinding_type = cfl_PyObject_lookup("confluent_kafka.admin",
+                                              "AclBinding");
+        if (!AclBinding_type) {
+                goto err;
+        }
+
+        c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_CREATEACLS,
+                                       &options, future);
+        if (!c_options)
+                goto err; /* Exception raised by options_to_c() */
+
+        /* options_to_c() sets future as the opaque, which is used in the
+         * background_event_cb to set the results on the future as the
+         * admin operation is finished, so we need to keep our own refcount. */
+        Py_INCREF(future);
+
+        /*
+         * Parse the list of AclBinding and convert to
+         * corresponding C types.
+         */
+        c_objs = malloc(sizeof(*c_objs) * cnt);
+
+        for (i = 0 ; i < cnt ; i++) {
+                int r;
+                PyObject *res = PyList_GET_ITEM(acls_list, i);
+
+                r = PyObject_IsInstance(res, AclBinding_type);
+                if (r == -1)
+                        goto err; /* Exception raised by IsInstance() */
+                else if (r == 0) {
+                        PyErr_SetString(PyExc_ValueError,
+                                        "Expected list of "
+                                        "AclBinding objects");
+                        goto err;
+                }
+
+                c_objs[i] = Admin_py_to_c_AclBinding(res, errstr, sizeof(errstr));
+                if (!c_objs[i]) {
+                        PyErr_SetString(PyExc_ValueError, errstr);
+                        goto err;
+                }
+        }
+
+        /* Use librdkafka's background thread queue to automatically dispatch
+         * Admin_background_event_cb() when the admin operation is finished.
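+         * The future set as the options opaque is completed there, which in
+         * turn resolves the per-binding futures in futmap on the Python side.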
+         */
+        rkqu = rd_kafka_queue_get_background(self->rk);
+
+        /*
+         * Call CreateAcls
+         *
+         * We need to set up a CallState and release GIL here since
+         * the event_cb may be triggered immediately.
+         */
+        CallState_begin(self, &cs);
+        rd_kafka_CreateAcls(self->rk, c_objs, cnt, c_options, rkqu);
+        CallState_end(self, &cs);
+
+        rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
+        rd_kafka_AclBinding_destroy_array(c_objs, cnt);
+        free(c_objs);
+        Py_DECREF(AclBinding_type); /* from lookup() */
+        rd_kafka_AdminOptions_destroy(c_options);
+
+        Py_RETURN_NONE;
+err:
+        if (c_objs) {
+                rd_kafka_AclBinding_destroy_array(c_objs, i);
+                free(c_objs);
+        }
+        if (AclBinding_type) Py_DECREF(AclBinding_type);
+        if (c_options) {
+                rd_kafka_AdminOptions_destroy(c_options);
+                Py_DECREF(future);
+        }
+        return NULL;
+}
+
+
+static const char Admin_create_acls_doc[] = PyDoc_STR(
+        ".. py:function:: create_acls(acl_bindings, future, [request_timeout])\n"
+        "\n"
+        "  Create a list of ACL bindings.\n"
+        "\n"
+        "  This method should not be used directly, use confluent_kafka.AdminClient.create_acls()\n"
+);
+
+
+/**
+ * @brief describe_acls
+ */
+static PyObject *Admin_describe_acls (Handle *self, PyObject *args, PyObject *kwargs) {
+        PyObject *acl_binding_filter, *future;
+        int r;
+        struct Admin_options options = Admin_options_INITIALIZER;
+        PyObject *AclBindingFilter_type = NULL;
+        rd_kafka_AdminOptions_t *c_options = NULL;
+        rd_kafka_AclBindingFilter_t *c_obj = NULL;
+        CallState cs;
+        rd_kafka_queue_t *rkqu;
+        char errstr[512];
+
+        static char *kws[] = {"acl_binding_filter",
+                              "future",
+                              /* options */
+                              "request_timeout",
+                              NULL};
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|f", kws,
+                                         &acl_binding_filter,
+                                         &future,
+                                         &options.request_timeout))
+                goto err;
+
+
+        /* Look up the AclBindingFilter class so we can check that the provided
+         * filter is of the correct type.
+         * Since this is not in the fast path we treat ourselves
+         * to the luxury of looking up this for each call. */
+        AclBindingFilter_type = cfl_PyObject_lookup("confluent_kafka.admin",
+                                                    "AclBindingFilter");
+        if (!AclBindingFilter_type) {
+                goto err;
+        }
+
+        c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBEACLS,
+                                       &options, future);
+        if (!c_options)
+                goto err; /* Exception raised by options_to_c() */
+
+        /* options_to_c() sets future as the opaque, which is used in the
+         * background_event_cb to set the results on the future as the
+         * admin operation is finished, so we need to keep our own refcount. */
+        Py_INCREF(future);
+
+        /*
+         * convert the AclBindingFilter to the
+         * corresponding C type.
+         */
+        r = PyObject_IsInstance(acl_binding_filter, AclBindingFilter_type);
+        if (r == -1)
+                goto err; /* Exception raised by IsInstance() */
+        else if (r == 0) {
+                PyErr_SetString(PyExc_TypeError,
+                                "Expected an "
+                                "AclBindingFilter object");
+                goto err;
+        }
+
+        c_obj = Admin_py_to_c_AclBindingFilter(acl_binding_filter, errstr, sizeof(errstr));
+        if (!c_obj) {
+                PyErr_SetString(PyExc_ValueError, errstr);
+                goto err;
+        }
+
+        /* Use librdkafka's background thread queue to automatically dispatch
+         * Admin_background_event_cb() when the admin operation is finished. */
+        rkqu = rd_kafka_queue_get_background(self->rk);
+
+        /*
+         * Call DescribeAcls
+         *
+         * We need to set up a CallState and release GIL here since
+         * the event_cb may be triggered immediately.
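+         * Unlike CreateAcls and DeleteAcls, DescribeAcls takes a single
+         * filter rather than an array of them.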
+         */
+        CallState_begin(self, &cs);
+        rd_kafka_DescribeAcls(self->rk, c_obj, c_options, rkqu);
+        CallState_end(self, &cs);
+
+        rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
+        rd_kafka_AclBinding_destroy(c_obj);
+        Py_DECREF(AclBindingFilter_type); /* from lookup() */
+        rd_kafka_AdminOptions_destroy(c_options);
+        Py_RETURN_NONE;
+err:
+        if (AclBindingFilter_type) Py_DECREF(AclBindingFilter_type);
+        if (c_options) {
+                rd_kafka_AdminOptions_destroy(c_options);
+                Py_DECREF(future);
+        }
+        return NULL;
+}
+
+
+static const char Admin_describe_acls_doc[] = PyDoc_STR(
+        ".. py:function:: describe_acls(acl_binding_filter, future, [request_timeout])\n"
+        "\n"
+        "  Get a list of ACL bindings matching an ACL binding filter.\n"
+        "\n"
+        "  This method should not be used directly, use confluent_kafka.AdminClient.describe_acls()\n"
+);
+
+/**
+ * @brief delete_acls
+ */
+static PyObject *Admin_delete_acls (Handle *self, PyObject *args, PyObject *kwargs) {
+        PyObject *acls_list, *future;
+        int cnt, i = 0;
+        struct Admin_options options = Admin_options_INITIALIZER;
+        PyObject *AclBindingFilter_type = NULL;
+        rd_kafka_AdminOptions_t *c_options = NULL;
+        rd_kafka_AclBindingFilter_t **c_objs = NULL;
+        CallState cs;
+        rd_kafka_queue_t *rkqu;
+        char errstr[512];
+
+        static char *kws[] = {"acls",
+                              "future",
+                              /* options */
+                              "request_timeout",
+                              NULL};
+
+        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO|f", kws,
+                                         &acls_list,
+                                         &future,
+                                         &options.request_timeout))
+                goto err;
+
+        if (!PyList_Check(acls_list) ||
+            (cnt = (int)PyList_Size(acls_list)) < 1) {
+                PyErr_SetString(PyExc_ValueError,
+                                "Expected non-empty list of AclBindingFilter "
+                                "objects");
+                goto err;
+        }
+
+
+        /* Look up the AclBindingFilter class so we can check that the provided
+         * filters are of the correct type.
+         * Since this is not in the fast path we treat ourselves
+         * to the luxury of looking up this for each call. */
+        AclBindingFilter_type = cfl_PyObject_lookup("confluent_kafka.admin",
+                                                    "AclBindingFilter");
+        if (!AclBindingFilter_type) {
+                goto err;
+        }
+
+        c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DELETEACLS,
+                                       &options, future);
+        if (!c_options)
+                goto err; /* Exception raised by options_to_c() */
+
+        /* options_to_c() sets future as the opaque, which is used in the
+         * background_event_cb to set the results on the future as the
+         * admin operation is finished, so we need to keep our own refcount. */
+        Py_INCREF(future);
+
+        /*
+         * Parse the list of AclBindingFilter and convert to
+         * corresponding C types.
+         */
+        c_objs = malloc(sizeof(*c_objs) * cnt);
+
+        for (i = 0 ; i < cnt ; i++) {
+                int r;
+                PyObject *res = PyList_GET_ITEM(acls_list, i);
+
+                r = PyObject_IsInstance(res, AclBindingFilter_type);
+                if (r == -1)
+                        goto err; /* Exception raised by IsInstance() */
+                else if (r == 0) {
+                        PyErr_SetString(PyExc_ValueError,
+                                        "Expected list of "
+                                        "AclBindingFilter objects");
+                        goto err;
+                }
+
+                c_objs[i] = Admin_py_to_c_AclBindingFilter(res, errstr, sizeof(errstr));
+                if (!c_objs[i]) {
+                        PyErr_SetString(PyExc_ValueError, errstr);
+                        goto err;
+                }
+        }
+
+        /* Use librdkafka's background thread queue to automatically dispatch
+         * Admin_background_event_cb() when the admin operation is finished. */
+        rkqu = rd_kafka_queue_get_background(self->rk);
+
+        /*
+         * Call DeleteAcls
+         *
+         * We need to set up a CallState and release GIL here since
+         * the event_cb may be triggered immediately.
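+         * Each filter produces its own result: the list of ACL bindings
+         * that it deleted (see Admin_c_DeleteAcls_result_responses_to_py()).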
+         */
+        CallState_begin(self, &cs);
+        rd_kafka_DeleteAcls(self->rk, c_objs, cnt, c_options, rkqu);
+        CallState_end(self, &cs);
+
+        rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
+        rd_kafka_AclBinding_destroy_array(c_objs, cnt);
+        free(c_objs);
+        Py_DECREF(AclBindingFilter_type); /* from lookup() */
+        rd_kafka_AdminOptions_destroy(c_options);
+
+        Py_RETURN_NONE;
+err:
+        if (c_objs) {
+                rd_kafka_AclBinding_destroy_array(c_objs, i);
+                free(c_objs);
+        }
+        if (AclBindingFilter_type) Py_DECREF(AclBindingFilter_type);
+        if (c_options) {
+                rd_kafka_AdminOptions_destroy(c_options);
+                Py_DECREF(future);
+        }
+        return NULL;
+}
+
+
+static const char Admin_delete_acls_doc[] = PyDoc_STR(
+        ".. py:function:: delete_acls(acl_binding_filters, future, [request_timeout])\n"
+        "\n"
+        "  Delete ACL bindings matching one or more ACL binding filters.\n"
+        "\n"
+        "  This method should not be used directly, use confluent_kafka.AdminClient.delete_acls()\n"
+);
+

 /**
  * @brief Call rd_kafka_poll() and keep track of crashing callbacks.
@@ -1078,6 +1491,18 @@ static PyMethodDef Admin_methods[] = {
           list_groups_doc
         },

+        { "create_acls", (PyCFunction)Admin_create_acls, METH_VARARGS|METH_KEYWORDS,
+          Admin_create_acls_doc
+        },
+
+        { "describe_acls", (PyCFunction)Admin_describe_acls, METH_VARARGS|METH_KEYWORDS,
+          Admin_describe_acls_doc
+        },
+
+        { "delete_acls", (PyCFunction)Admin_delete_acls, METH_VARARGS|METH_KEYWORDS,
+          Admin_delete_acls_doc
+        },
+
         { NULL }
 };

@@ -1099,20 +1524,20 @@ static PyObject *
 Admin_c_topic_result_to_py (const rd_kafka_topic_result_t **c_result,
                             size_t cnt) {
         PyObject *result;
-        size_t ti;
+        size_t i;

         result = PyDict_New();

-        for (ti = 0 ; ti < cnt ; ti++) {
+        for (i = 0 ; i < cnt ; i++) {
                 PyObject *error;

                 error = KafkaError_new_or_None(
-                        rd_kafka_topic_result_error(c_result[ti]),
-                        rd_kafka_topic_result_error_string(c_result[ti]));
+                        rd_kafka_topic_result_error(c_result[i]),
+                        rd_kafka_topic_result_error_string(c_result[i]));

                 PyDict_SetItemString(
                         result,
-                        rd_kafka_topic_result_name(c_result[ti]),
+                        rd_kafka_topic_result_name(c_result[i]),
                         error);

                 Py_DECREF(error);
@@ -1280,6 +1705,134 @@ Admin_c_ConfigResource_result_to_py (const rd_kafka_ConfigResource_t **c_resourc
         return NULL;
 }

+/**
+ * @brief Convert C AclBinding to py
+ */
+static PyObject *
+Admin_c_AclBinding_to_py (const rd_kafka_AclBinding_t *c_acl_binding) {
+
+        PyObject *args, *kwargs, *AclBinding_type, *acl_binding;
+
+        AclBinding_type = cfl_PyObject_lookup("confluent_kafka.admin",
+                                              "AclBinding");
+        if (!AclBinding_type) {
+                return NULL;
+        }
+
+        kwargs = PyDict_New();
+
+        cfl_PyDict_SetInt(kwargs, "restype",
+                          rd_kafka_AclBinding_restype(c_acl_binding));
+        cfl_PyDict_SetString(kwargs, "name",
+                             rd_kafka_AclBinding_name(c_acl_binding));
+        cfl_PyDict_SetInt(kwargs, "resource_pattern_type",
+                          rd_kafka_AclBinding_resource_pattern_type(c_acl_binding));
+        cfl_PyDict_SetString(kwargs, "principal",
+                             rd_kafka_AclBinding_principal(c_acl_binding));
+        cfl_PyDict_SetString(kwargs, "host",
+                             rd_kafka_AclBinding_host(c_acl_binding));
+        cfl_PyDict_SetInt(kwargs, "operation",
+                          rd_kafka_AclBinding_operation(c_acl_binding));
+        cfl_PyDict_SetInt(kwargs, "permission_type",
+                          rd_kafka_AclBinding_permission_type(c_acl_binding));
+
+        args = PyTuple_New(0);
+        acl_binding = PyObject_Call(AclBinding_type, args, kwargs);
+
+        Py_DECREF(args);
+        Py_DECREF(kwargs);
+        Py_DECREF(AclBinding_type);
+        return acl_binding;
+}
+
+/**
+ * @brief Convert C AclBinding array to py list.
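+ *
+ * Returns a new Python list, or NULL with an exception set if any
+ * element fails to convert.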
+ */ +static PyObject * +Admin_c_AclBindings_to_py (const rd_kafka_AclBinding_t **c_acls, + size_t c_acls_cnt) { + size_t i; + PyObject *result; + PyObject *acl_binding; + + result = PyList_New(c_acls_cnt); + + for (i = 0 ; i < c_acls_cnt ; i++) { + acl_binding = Admin_c_AclBinding_to_py(c_acls[i]); + if (!acl_binding) { + Py_DECREF(result); + return NULL; + } + PyList_SET_ITEM(result, i, acl_binding); + } + + return result; +} + + +/** + * @brief Convert C acl_result_t array to py list. + */ +static PyObject * +Admin_c_acl_result_to_py (const rd_kafka_acl_result_t **c_result, + size_t cnt) { + PyObject *result; + size_t i; + + result = PyList_New(cnt); + + for (i = 0 ; i < cnt ; i++) { + PyObject *error; + const rd_kafka_error_t *c_error = rd_kafka_acl_result_error(c_result[i]); + + error = KafkaError_new_or_None( + rd_kafka_error_code(c_error), + rd_kafka_error_string(c_error)); + + PyList_SET_ITEM(result, i, error); + } + + return result; +} + +/** + * @brief Convert C DeleteAcls result response array to py list. + */ +static PyObject * +Admin_c_DeleteAcls_result_responses_to_py (const rd_kafka_DeleteAcls_result_response_t **c_result_responses, + size_t cnt) { + const rd_kafka_AclBinding_t **c_matching_acls; + size_t c_matching_acls_cnt; + PyObject *result; + PyObject *acl_bindings; + size_t i; + + result = PyList_New(cnt); + + for (i = 0 ; i < cnt ; i++) { + PyObject *error; + const rd_kafka_error_t *c_error = rd_kafka_DeleteAcls_result_response_error(c_result_responses[i]); + + if (c_error) { + error = KafkaError_new_or_None( + rd_kafka_error_code(c_error), + rd_kafka_error_string(c_error)); + PyList_SET_ITEM(result, i, error); + } else { + c_matching_acls = rd_kafka_DeleteAcls_result_response_matching_acls( + c_result_responses[i], + &c_matching_acls_cnt); + acl_bindings = Admin_c_AclBindings_to_py(c_matching_acls,c_matching_acls_cnt); + if (!acl_bindings) { + Py_DECREF(result); + return NULL; + } + PyList_SET_ITEM(result, i, acl_bindings); + } + } + + return result; +} /** * @brief Event callback triggered from librdkafka's background thread @@ -1299,6 +1852,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, PyObject *error, *method, *ret; PyObject *result = NULL; PyObject *exctype = NULL, *exc = NULL, *excargs = NULL; + PyObject *type, *value, *traceback; /* Acquire GIL */ gstate = PyGILState_Ensure(); @@ -1383,6 +1937,72 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, break; } + + case RD_KAFKA_EVENT_CREATEACLS_RESULT: + { + const rd_kafka_acl_result_t **c_acl_results; + size_t c_acl_results_cnt; + + c_acl_results = rd_kafka_CreateAcls_result_acls( + rd_kafka_event_CreateAcls_result(rkev), + &c_acl_results_cnt + ); + result = Admin_c_acl_result_to_py( + c_acl_results, + c_acl_results_cnt); + break; + } + + case RD_KAFKA_EVENT_DESCRIBEACLS_RESULT: + { + const rd_kafka_DescribeAcls_result_t *c_acl_result; + const rd_kafka_AclBinding_t **c_acls; + size_t c_acl_cnt; + + c_acl_result = rd_kafka_event_DescribeAcls_result(rkev); + + + c_acls = rd_kafka_DescribeAcls_result_acls( + c_acl_result, + &c_acl_cnt + ); + + result = Admin_c_AclBindings_to_py(c_acls, + c_acl_cnt); + if (!result) + { + PyErr_Fetch(&type, &value, &traceback); + error = value; + goto raise; + } + break; + } + + + case RD_KAFKA_EVENT_DELETEACLS_RESULT: + { + const rd_kafka_DeleteAcls_result_t *c_acl_result; + const rd_kafka_DeleteAcls_result_response_t **c_acl_result_responses; + size_t c_acl_results_cnt; + + c_acl_result = 
rd_kafka_event_DeleteAcls_result(rkev); + + c_acl_result_responses = rd_kafka_DeleteAcls_result_responses( + c_acl_result, + &c_acl_results_cnt + ); + + result = Admin_c_DeleteAcls_result_responses_to_py(c_acl_result_responses, + c_acl_results_cnt); + if (!result) + { + PyErr_Fetch(&type, &value, &traceback); + error = value; + goto raise; + } + break; + } + default: Py_DECREF(error); /* Py_None */ error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, @@ -1473,7 +2093,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev, static int Admin_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) { Handle *self = (Handle *)selfobj; - char errstr[256]; + char errstr[512]; rd_kafka_conf_t *conf; if (self->rk) { diff --git a/src/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c index 8a0dfadbb..9cb81a1c0 100644 --- a/src/confluent_kafka/src/AdminTypes.c +++ b/src/confluent_kafka/src/AdminTypes.c @@ -487,15 +487,7 @@ int AdminTypes_Ready (void) { } -/** - * @brief Add Admin types to module - */ -void AdminTypes_AddObjects (PyObject *m) { - Py_INCREF(&NewTopicType); - PyModule_AddObject(m, "NewTopic", (PyObject *)&NewTopicType); - Py_INCREF(&NewPartitionsType); - PyModule_AddObject(m, "NewPartitions", (PyObject *)&NewPartitionsType); - +static void AdminTypes_AddObjectsConfigSource (PyObject *m) { /* rd_kafka_ConfigSource_t */ PyModule_AddIntConstant(m, "CONFIG_SOURCE_UNKNOWN_CONFIG", RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG); @@ -509,7 +501,10 @@ void AdminTypes_AddObjects (PyObject *m) { RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG); PyModule_AddIntConstant(m, "CONFIG_SOURCE_DEFAULT_CONFIG", RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG); +} + +static void AdminTypes_AddObjectsResourceType (PyObject *m) { /* rd_kafka_ResourceType_t */ PyModule_AddIntConstant(m, "RESOURCE_UNKNOWN", RD_KAFKA_RESOURCE_UNKNOWN); PyModule_AddIntConstant(m, "RESOURCE_ANY", RD_KAFKA_RESOURCE_ANY); @@ -517,3 +512,53 @@ void AdminTypes_AddObjects (PyObject *m) { PyModule_AddIntConstant(m, "RESOURCE_GROUP", RD_KAFKA_RESOURCE_GROUP); PyModule_AddIntConstant(m, "RESOURCE_BROKER", RD_KAFKA_RESOURCE_BROKER); } + +static void AdminTypes_AddObjectsResourcePatternType (PyObject *m) { + /* rd_kafka_ResourcePatternType_t */ + PyModule_AddIntConstant(m, "RESOURCE_PATTERN_UNKNOWN", RD_KAFKA_RESOURCE_PATTERN_UNKNOWN); + PyModule_AddIntConstant(m, "RESOURCE_PATTERN_ANY", RD_KAFKA_RESOURCE_PATTERN_ANY); + PyModule_AddIntConstant(m, "RESOURCE_PATTERN_MATCH", RD_KAFKA_RESOURCE_PATTERN_MATCH); + PyModule_AddIntConstant(m, "RESOURCE_PATTERN_LITERAL", RD_KAFKA_RESOURCE_PATTERN_LITERAL); + PyModule_AddIntConstant(m, "RESOURCE_PATTERN_PREFIXED", RD_KAFKA_RESOURCE_PATTERN_PREFIXED); +} + +static void AdminTypes_AddObjectsAclOperation (PyObject *m) { + /* rd_kafka_AclOperation_t */ + PyModule_AddIntConstant(m, "ACL_OPERATION_UNKNOWN", RD_KAFKA_ACL_OPERATION_UNKNOWN); + PyModule_AddIntConstant(m, "ACL_OPERATION_ANY", RD_KAFKA_ACL_OPERATION_ANY); + PyModule_AddIntConstant(m, "ACL_OPERATION_ALL", RD_KAFKA_ACL_OPERATION_ALL); + PyModule_AddIntConstant(m, "ACL_OPERATION_READ", RD_KAFKA_ACL_OPERATION_READ); + PyModule_AddIntConstant(m, "ACL_OPERATION_WRITE", RD_KAFKA_ACL_OPERATION_WRITE); + PyModule_AddIntConstant(m, "ACL_OPERATION_CREATE", RD_KAFKA_ACL_OPERATION_CREATE); + PyModule_AddIntConstant(m, "ACL_OPERATION_DELETE", RD_KAFKA_ACL_OPERATION_DELETE); + PyModule_AddIntConstant(m, "ACL_OPERATION_ALTER", RD_KAFKA_ACL_OPERATION_ALTER); + PyModule_AddIntConstant(m, "ACL_OPERATION_DESCRIBE", 
RD_KAFKA_ACL_OPERATION_DESCRIBE); + PyModule_AddIntConstant(m, "ACL_OPERATION_CLUSTER_ACTION", RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION); + PyModule_AddIntConstant(m, "ACL_OPERATION_DESCRIBE_CONFIGS", RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS); + PyModule_AddIntConstant(m, "ACL_OPERATION_ALTER_CONFIGS", RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS); + PyModule_AddIntConstant(m, "ACL_OPERATION_IDEMPOTENT_WRITE", RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE); +} + +static void AdminTypes_AddObjectsAclPermissionType (PyObject *m) { + /* rd_kafka_AclPermissionType_t */ + PyModule_AddIntConstant(m, "ACL_PERMISSION_TYPE_UNKNOWN", RD_KAFKA_ACL_PERMISSION_TYPE_UNKNOWN); + PyModule_AddIntConstant(m, "ACL_PERMISSION_TYPE_ANY", RD_KAFKA_ACL_PERMISSION_TYPE_ANY); + PyModule_AddIntConstant(m, "ACL_PERMISSION_TYPE_DENY", RD_KAFKA_ACL_PERMISSION_TYPE_DENY); + PyModule_AddIntConstant(m, "ACL_PERMISSION_TYPE_ALLOW", RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW); +} + +/** + * @brief Add Admin types to module + */ +void AdminTypes_AddObjects (PyObject *m) { + Py_INCREF(&NewTopicType); + PyModule_AddObject(m, "NewTopic", (PyObject *)&NewTopicType); + Py_INCREF(&NewPartitionsType); + PyModule_AddObject(m, "NewPartitions", (PyObject *)&NewPartitionsType); + + AdminTypes_AddObjectsConfigSource(m); + AdminTypes_AddObjectsResourceType(m); + AdminTypes_AddObjectsResourcePatternType(m); + AdminTypes_AddObjectsAclOperation(m); + AdminTypes_AddObjectsAclPermissionType(m); +} diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 3d12b1439..d57fb31c9 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -2255,7 +2255,7 @@ int cfl_PyObject_SetInt (PyObject *o, const char *name, int val) { */ int cfl_PyObject_GetAttr (PyObject *object, const char *attr_name, PyObject **valp, const PyTypeObject *py_type, - int required) { + int required, int allow_None) { PyObject *o; o = PyObject_GetAttrString(object, attr_name); @@ -2270,7 +2270,7 @@ int cfl_PyObject_GetAttr (PyObject *object, const char *attr_name, return 0; } - if (py_type && Py_TYPE(o) != py_type) { + if (!(allow_None && o == Py_None) && py_type && Py_TYPE(o) != py_type) { Py_DECREF(o); PyErr_Format(PyExc_TypeError, "Expected .%s to be %s type, not %s", @@ -2301,7 +2301,7 @@ int cfl_PyObject_GetInt (PyObject *object, const char *attr_name, int *valp, #else &PyInt_Type, #endif - required)) + required, 0)) return 0; if (!o) { @@ -2337,17 +2337,17 @@ int cfl_PyBool_get (PyObject *object, const char *name, int *valp) { return 1; } - /** * @brief Get attribute \p attr_name from \p object and make sure it is - * a string type. + * a string type or None if \p allow_None is 1 * * @returns 1 if \p valp was updated with a newly allocated copy of either the - * object value (UTF8), or \p defval. + * object value (UTF8), or \p defval or NULL if the attr is None * 0 if an exception was raised. */ int cfl_PyObject_GetString (PyObject *object, const char *attr_name, - char **valp, const char *defval, int required) { + char **valp, const char *defval, int required, + int allow_None) { PyObject *o, *uo, *uop; if (!cfl_PyObject_GetAttr(object, attr_name, &o, @@ -2359,7 +2359,7 @@ int cfl_PyObject_GetString (PyObject *object, const char *attr_name, * proper conversion below. 
*/ NULL, #endif - required)) + required, allow_None)) return 0; if (!o) { @@ -2367,6 +2367,12 @@ int cfl_PyObject_GetString (PyObject *object, const char *attr_name, return 1; } + if (o == Py_None) { + Py_DECREF(o); + *valp = NULL; + return 1; + } + if (!(uo = cfl_PyObject_Unistr(o))) { Py_DECREF(o); PyErr_Format(PyExc_TypeError, diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index cd9cb21de..6658bab87 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -51,19 +51,19 @@ * Make sure to keep the MIN_RD_KAFKA_VERSION, MIN_VER_ERRSTR and #error * defines and strings in sync. */ -#define MIN_RD_KAFKA_VERSION 0x01060000 +#define MIN_RD_KAFKA_VERSION 0x010900ff #ifdef __APPLE__ -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v1.6.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v1.9.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v1.6.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#define MIN_VER_ERRSTR "confluent-kafka-python requires librdkafka v1.9.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #if RD_KAFKA_VERSION < MIN_RD_KAFKA_VERSION #ifdef __APPLE__ -#error "confluent-kafka-python requires librdkafka v1.6.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" +#error "confluent-kafka-python requires librdkafka v1.9.0 or later. Install the latest version of librdkafka from Homebrew by running `brew install librdkafka` or `brew upgrade librdkafka`" #else -#error "confluent-kafka-python requires librdkafka v1.6.0 or later. Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" +#error "confluent-kafka-python requires librdkafka v1.9.0 or later. 
Install the latest version of librdkafka from the Confluent repositories, see http://docs.confluent.io/current/installation.html" #endif #endif @@ -328,11 +328,12 @@ int cfl_PyObject_SetString (PyObject *o, const char *name, const char *val); int cfl_PyObject_SetInt (PyObject *o, const char *name, int val); int cfl_PyObject_GetAttr (PyObject *object, const char *attr_name, PyObject **valp, const PyTypeObject *py_type, - int required); + int required, int allow_None); int cfl_PyObject_GetInt (PyObject *object, const char *attr_name, int *valp, int defval, int required); int cfl_PyObject_GetString (PyObject *object, const char *attr_name, - char **valp, const char *defval, int required); + char **valp, const char *defval, int required, + int allow_None); int cfl_PyBool_get (PyObject *object, const char *name, int *valp); PyObject *cfl_int32_array_to_py_list (const int32_t *arr, size_t cnt); diff --git a/tests/docker/docker-compose.yaml b/tests/docker/docker-compose.yaml index 66033c0cb..4a7f5c8c7 100644 --- a/tests/docker/docker-compose.yaml +++ b/tests/docker/docker-compose.yaml @@ -18,6 +18,8 @@ services: KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer + KAFKA_SUPER_USERS: "User:ANONYMOUS" schema-registry: image: confluentinc/cp-schema-registry:7.1.0 depends_on: diff --git a/tests/integration/admin/test_basic_operations.py b/tests/integration/admin/test_basic_operations.py new file mode 100644 index 000000000..3ee522513 --- /dev/null +++ b/tests/integration/admin/test_basic_operations.py @@ -0,0 +1,295 @@ +# -*- coding: utf-8 -*- +# Copyright 2022 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import confluent_kafka +import struct +import time +from confluent_kafka.admin import (NewPartitions, ConfigResource, + AclBinding, AclBindingFilter, ResourceType, + ResourcePatternType, AclOperation, AclPermissionType) +from confluent_kafka.error import ConsumeError + +topic_prefix = "test-topic" + + +# Shared between producer and consumer tests and used to verify +# that consumed headers are what was actually produced. 
+produce_headers = [('foo1', 'bar'), + ('foo1', 'bar2'), + ('foo2', b'1'), + (u'Jämtland', u'Härjedalen'), # automatically utf-8 encoded + ('nullheader', None), + ('empty', ''), + ('foobin', struct.pack('hhl', 10, 20, 30))] + + +def verify_commit_result(err, partitions): + assert err is not None + + +def verify_admin_acls(admin_client, + topic, + group): + + # + # Add three ACLs + # + acl_binding_1 = AclBinding(ResourceType.TOPIC, topic, ResourcePatternType.LITERAL, + "User:test-user-1", "*", AclOperation.READ, AclPermissionType.ALLOW) + acl_binding_2 = AclBinding(ResourceType.TOPIC, topic, ResourcePatternType.PREFIXED, + "User:test-user-2", "*", AclOperation.WRITE, AclPermissionType.DENY) + acl_binding_3 = AclBinding(ResourceType.GROUP, group, ResourcePatternType.PREFIXED, + "User:test-user-2", "*", AclOperation.ALL, AclPermissionType.ALLOW) + + fs = admin_client.create_acls([acl_binding_1, acl_binding_2, acl_binding_3]) + for acl_binding, f in fs.items(): + f.result() # trigger exception if there was an error + + acl_binding_filter1 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY, + None, None, AclOperation.ANY, AclPermissionType.ANY) + acl_binding_filter2 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.PREFIXED, + None, None, AclOperation.ANY, AclPermissionType.ANY) + acl_binding_filter3 = AclBindingFilter(ResourceType.TOPIC, None, ResourcePatternType.ANY, + None, None, AclOperation.ANY, AclPermissionType.ANY) + acl_binding_filter4 = AclBindingFilter(ResourceType.GROUP, None, ResourcePatternType.ANY, + None, None, AclOperation.ANY, AclPermissionType.ANY) + + expected_acl_bindings = [acl_binding_1, acl_binding_2, acl_binding_3] + acl_bindings = admin_client.describe_acls(acl_binding_filter1).result() + assert sorted(acl_bindings) == sorted(expected_acl_bindings), \ + "ACL bindings don't match, actual: {} expected: {}".format(acl_bindings, + expected_acl_bindings) + + # + # Delete the ACLs with PREFIXED + # + expected_acl_bindings = [acl_binding_2, acl_binding_3] + fs = admin_client.delete_acls([acl_binding_filter2]) + deleted_acl_bindings = sorted(fs[acl_binding_filter2].result()) + assert deleted_acl_bindings == expected_acl_bindings, \ + "Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings, + expected_acl_bindings) + + # + # Delete the ACLs with TOPIC and GROUP + # + expected_acl_bindings = [[acl_binding_1], []] + delete_acl_binding_filters = [acl_binding_filter3, acl_binding_filter4] + fs = admin_client.delete_acls(delete_acl_binding_filters) + for acl_binding, expected in zip(delete_acl_binding_filters, expected_acl_bindings): + deleted_acl_bindings = sorted(fs[acl_binding].result()) + assert deleted_acl_bindings == expected, \ + "Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings, + expected) + # + # All the ACLs should have been deleted + # + expected_acl_bindings = [] + acl_bindings = admin_client.describe_acls(acl_binding_filter1).result() + assert acl_bindings == expected_acl_bindings, \ + "ACL bindings don't match, actual: {} expected: {}".format(acl_bindings, + expected_acl_bindings) + + +def verify_topic_metadata(client, exp_topics, *args, **kwargs): + """ + Verify that exp_topics (dict) is reported in metadata. + Will retry and wait for some time to let changes propagate. + + Non-controller brokers may return the previous partition count for some + time before being updated, in this case simply retry. 
+ """ + + for retry in range(0, 3): + do_retry = 0 + + md = client.list_topics(*args, **kwargs) + + for exptopic, exppartcnt in exp_topics.items(): + if exptopic not in md.topics: + print("Topic {} not yet reported in metadata: retrying".format(exptopic)) + do_retry += 1 + continue + + if len(md.topics[exptopic].partitions) < exppartcnt: + print("Topic {} partition count not yet updated ({} != expected {}): retrying".format( + exptopic, len(md.topics[exptopic].partitions), exppartcnt)) + do_retry += 1 + continue + + assert len(md.topics[exptopic].partitions) == exppartcnt, \ + "Expected {} partitions for topic {}, not {}".format( + exppartcnt, exptopic, md.topics[exptopic].partitions) + + if do_retry == 0: + return # All topics okay. + + time.sleep(1) + + +def test_basic_operations(kafka_cluster): + num_partitions = 2 + topic_config = {"compression.type": "gzip"} + + # + # First iteration: validate our_topic creation. + # Second iteration: create topic. + # + for validate in (True, False): + our_topic = kafka_cluster.create_topic(topic_prefix, + { + "num_partitions": num_partitions, + "config": topic_config, + "replication_factor": 1, + }, + validate_only=validate + ) + + admin_client = kafka_cluster.admin() + + # + # Find the topic in list_topics + # + verify_topic_metadata(admin_client, {our_topic: num_partitions}) + verify_topic_metadata(admin_client, {our_topic: num_partitions}, topic=our_topic) + verify_topic_metadata(admin_client, {our_topic: num_partitions}, our_topic) + + # + # Increase the partition count + # + num_partitions += 3 + fs = admin_client.create_partitions([NewPartitions(our_topic, + new_total_count=num_partitions)], + operation_timeout=10.0) + + for topic2, f in fs.items(): + f.result() # trigger exception if there was an error + + # + # Verify with list_topics. + # + verify_topic_metadata(admin_client, {our_topic: num_partitions}) + + # + # Verify with list_groups. 
+ # + + # Produce some messages + p = kafka_cluster.producer() + p.produce(our_topic, 'Hello Python!', headers=produce_headers) + p.produce(our_topic, key='Just a key and headers', headers=produce_headers) + + def consume_messages(group_id, num_messages=None): + # Consume messages + conf = {'group.id': group_id, + 'session.timeout.ms': 6000, + 'enable.auto.commit': False, + 'on_commit': verify_commit_result, + 'auto.offset.reset': 'earliest', + 'enable.partition.eof': True} + c = kafka_cluster.consumer(conf) + c.subscribe([our_topic]) + eof_reached = dict() + read_messages = 0 + msg = None + while True: + try: + msg = c.poll() + if msg is None: + raise Exception('Got timeout from poll() without a timeout set: %s' % msg) + # Commit offset + c.commit(msg, asynchronous=False) + read_messages += 1 + if num_messages is not None and read_messages == num_messages: + print('Read all the required messages: exiting') + break + except ConsumeError as e: + if msg is not None and e.code == confluent_kafka.KafkaError._PARTITION_EOF: + print('Reached end of %s [%d] at offset %d' % ( + msg.topic(), msg.partition(), msg.offset())) + eof_reached[(msg.topic(), msg.partition())] = True + if len(eof_reached) == len(c.assignment()): + print('EOF reached for all assigned partitions: exiting') + break + else: + print('Consumer error: %s: ignoring' % str(e)) + break + + group1 = 'test-group-1' + group2 = 'test-group-2' + consume_messages(group1, 2) + consume_messages(group2, 2) + # list_groups without group argument + groups = set(group.id for group in admin_client.list_groups(timeout=10)) + assert group1 in groups, "Consumer group {} not found".format(group1) + assert group2 in groups, "Consumer group {} not found".format(group2) + # list_groups with group argument + groups = set(group.id for group in admin_client.list_groups(group1)) + assert group1 in groups, "Consumer group {} not found".format(group1) + groups = set(group.id for group in admin_client.list_groups(group2)) + assert group2 in groups, "Consumer group {} not found".format(group2) + + def verify_config(expconfig, configs): + """ + Verify that the config key,values in expconfig are found + and matches the ConfigEntry in configs. + """ + for key, expvalue in expconfig.items(): + entry = configs.get(key, None) + assert entry is not None, "Config {} not found in returned configs".format(key) + + assert entry.value == str(expvalue), \ + "Config {} with value {} does not match expected value {}".format(key, entry, expvalue) + + # + # Get current topic config + # + resource = ConfigResource(ResourceType.TOPIC, our_topic) + fs = admin_client.describe_configs([resource]) + configs = fs[resource].result() # will raise exception on failure + + # Verify config matches our expectations + verify_config(topic_config, configs) + + # + # Now change the config. + # + topic_config["file.delete.delay.ms"] = 12345 + topic_config["compression.type"] = "snappy" + + for key, value in topic_config.items(): + resource.set_config(key, value) + + fs = admin_client.alter_configs([resource]) + fs[resource].result() # will raise exception on failure + + # + # Read the config back again and verify. 
+ # + fs = admin_client.describe_configs([resource]) + configs = fs[resource].result() # will raise exception on failure + + # Verify config matches our expectations + verify_config(topic_config, configs) + + # Verify ACL operations + verify_admin_acls(admin_client, our_topic, group1) + + # + # Delete the topic + # + fs = admin_client.delete_topics([our_topic]) + fs[our_topic].result() # will raise exception on failure + print("Topic {} marked for deletion".format(our_topic)) diff --git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py index f99bb8b13..832d13475 100644 --- a/tests/integration/cluster_fixture.py +++ b/tests/integration/cluster_fixture.py @@ -140,7 +140,12 @@ def consumer(self, conf=None, key_deserializer=None, value_deserializer=None): return DeserializingConsumer(consumer_conf) - def create_topic(self, prefix, conf=None): + def admin(self): + if self._admin is None: + self._admin = AdminClient(self.client_conf()) + return self._admin + + def create_topic(self, prefix, conf=None, **create_topic_kwargs): """ Creates a new topic with this cluster. @@ -149,12 +154,10 @@ def create_topic(self, prefix, conf=None): :returns: The topic's name :rtype: str """ - if self._admin is None: - self._admin = AdminClient(self.client_conf()) - name = prefix + "-" + str(uuid1()) - future_topic = self._admin.create_topics([NewTopic(name, - **self._topic_conf(conf))]) + future_topic = self.admin().create_topics([NewTopic(name, + **self._topic_conf(conf))], + **create_topic_kwargs) future_topic.get(name).result() return name diff --git a/tests/integration/integration_test.py b/tests/integration/integration_test.py index dc6caebd1..d7aff262e 100755 --- a/tests/integration/integration_test.py +++ b/tests/integration/integration_test.py @@ -21,7 +21,6 @@ """ Test script for confluent_kafka module """ import confluent_kafka -from confluent_kafka import admin import os import time import uuid @@ -812,10 +811,10 @@ def verify_avro_basic_auth(mode_conf): } base_conf = { - 'bootstrap.servers': bootstrap_servers, - 'error_cb': error_cb, - 'schema.registry.url': schema_registry_url - } + 'bootstrap.servers': bootstrap_servers, + 'error_cb': error_cb, + 'schema.registry.url': schema_registry_url + } consumer_conf = dict({'group.id': generate_group_id(), 'session.timeout.ms': 6000, @@ -1029,7 +1028,7 @@ def stats_cb(stats_json_str): c.close() -def verify_topic_metadata(client, exp_topics): +def verify_topic_metadata(client, exp_topics, *args, **kwargs): """ Verify that exp_topics (dict) is reported in metadata. Will retry and wait for some time to let changes propagate. @@ -1041,7 +1040,7 @@ def verify_topic_metadata(client, exp_topics): for retry in range(0, 3): do_retry = 0 - md = client.list_topics() + md = client.list_topics(*args, **kwargs) for exptopic, exppartcnt in exp_topics.items(): if exptopic not in md.topics: @@ -1067,157 +1066,6 @@ def verify_topic_metadata(client, exp_topics): raise Exception("Timed out waiting for topics {} in metadata".format(exp_topics)) -def verify_admin(): - """ Verify Admin API """ - - a = admin.AdminClient({'bootstrap.servers': bootstrap_servers}) - our_topic = topic + '_admin_' + str(uuid.uuid4()) - num_partitions = 2 - - topic_config = {"compression.type": "gzip"} - - # - # First iteration: validate our_topic creation. - # Second iteration: create topic. 
- # - for validate in (True, False): - fs = a.create_topics([admin.NewTopic(our_topic, - num_partitions=num_partitions, - config=topic_config, - replication_factor=1)], - validate_only=validate, - operation_timeout=10.0) - - for topic2, f in fs.items(): - f.result() # trigger exception if there was an error - - # - # Find the topic in list_topics - # - verify_topic_metadata(a, {our_topic: num_partitions}) - - # - # Increase the partition count - # - num_partitions += 3 - fs = a.create_partitions([admin.NewPartitions(our_topic, - new_total_count=num_partitions)], - operation_timeout=10.0) - - for topic2, f in fs.items(): - f.result() # trigger exception if there was an error - - # - # Verify with list_topics. - # - verify_topic_metadata(a, {our_topic: num_partitions}) - - # - # Verify with list_groups. - # - - # Produce some messages - p = confluent_kafka.Producer({"bootstrap.servers": bootstrap_servers}) - p.produce(our_topic, 'Hello Python!', headers=produce_headers) - p.produce(our_topic, key='Just a key and headers', headers=produce_headers) - - def consume_messages(group_id): - # Consume messages - conf = {'bootstrap.servers': bootstrap_servers, - 'group.id': group_id, - 'session.timeout.ms': 6000, - 'enable.auto.commit': False, - 'on_commit': print_commit_result, - 'error_cb': error_cb, - 'auto.offset.reset': 'earliest', - 'enable.partition.eof': True} - c = confluent_kafka.Consumer(conf) - c.subscribe([our_topic]) - eof_reached = dict() - while True: - msg = c.poll() - if msg is None: - raise Exception('Got timeout from poll() without a timeout set: %s' % msg) - - if msg.error(): - if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF: - print('Reached end of %s [%d] at offset %d' % ( - msg.topic(), msg.partition(), msg.offset())) - eof_reached[(msg.topic(), msg.partition())] = True - if len(eof_reached) == len(c.assignment()): - print('EOF reached for all assigned partitions: exiting') - break - else: - print('Consumer error: %s: ignoring' % msg.error()) - break - # Commit offset - c.commit(msg, asynchronous=False) - - group1 = 'test-group-1' - group2 = 'test-group-2' - consume_messages(group1) - consume_messages(group2) - # list_groups without group argument - groups = set(group.id for group in a.list_groups(timeout=10)) - assert group1 in groups, "Consumer group {} not found".format(group1) - assert group2 in groups, "Consumer group {} not found".format(group2) - # list_groups with group argument - groups = set(group.id for group in a.list_groups(group1)) - assert group1 in groups, "Consumer group {} not found".format(group1) - groups = set(group.id for group in a.list_groups(group2)) - assert group2 in groups, "Consumer group {} not found".format(group2) - - def verify_config(expconfig, configs): - """ - Verify that the config key,values in expconfig are found - and matches the ConfigEntry in configs. - """ - for key, expvalue in expconfig.items(): - entry = configs.get(key, None) - assert entry is not None, "Config {} not found in returned configs".format(key) - - assert entry.value == str(expvalue), \ - "Config {} with value {} does not match expected value {}".format(key, entry, expvalue) - - # - # Get current topic config - # - resource = admin.ConfigResource(admin.RESOURCE_TOPIC, our_topic) - fs = a.describe_configs([resource]) - configs = fs[resource].result() # will raise exception on failure - - # Verify config matches our expectations - verify_config(topic_config, configs) - - # - # Now change the config. 
- # - topic_config["file.delete.delay.ms"] = 12345 - topic_config["compression.type"] = "snappy" - - for key, value in topic_config.items(): - resource.set_config(key, value) - - fs = a.alter_configs([resource]) - fs[resource].result() # will raise exception on failure - - # - # Read the config back again and verify. - # - fs = a.describe_configs([resource]) - configs = fs[resource].result() # will raise exception on failure - - # Verify config matches our expectations - verify_config(topic_config, configs) - - # - # Delete the topic - # - fs = a.delete_topics([our_topic]) - fs[our_topic].result() # will raise exception on failure - print("Topic {} marked for deletion".format(our_topic)) - - def verify_avro_explicit_read_schema(): from confluent_kafka import avro @@ -1406,10 +1254,6 @@ def print_usage(exitcode, reason=None): print("=" * 30, 'Verifying AVRO with Basic Auth', '=' * 30) verify_avro_basic_auth(testconf.get('avro-basic-auth', None)) - if 'admin' in modes: - print('=' * 30, 'Verifying Admin API', '=' * 30) - verify_admin() - print('=' * 30, 'Done', '=' * 30) if with_pympler: diff --git a/tests/requirements.txt b/tests/requirements.txt index a55e30040..be14dbb4a 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -5,6 +5,6 @@ pytest-timeout requests-mock trivup>=0.8.3 fastavro -avro +avro==1.10.0 jsonschema protobuf diff --git a/tests/test_Admin.py b/tests/test_Admin.py index c4a80b011..8e44900a5 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -1,21 +1,74 @@ #!/usr/bin/env python import pytest -from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource +from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, \ + ConfigResource, AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, \ + AclOperation, AclPermissionType from confluent_kafka import KafkaException, KafkaError, libversion -import confluent_kafka import concurrent.futures def test_types(): - ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, "2") + ConfigResource(ResourceType.BROKER, "2") ConfigResource("broker", "2") - ConfigResource(confluent_kafka.admin.RESOURCE_GROUP, "mygroup") - ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "") + ConfigResource(ResourceType.GROUP, "mygroup") + ConfigResource(ResourceType.TOPIC, "") with pytest.raises(ValueError): ConfigResource("doesnt exist", "hi") with pytest.raises(ValueError): - ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, None) + ConfigResource(ResourceType.TOPIC, None) + + +def test_acl_binding_type(): + attrs = [ResourceType.TOPIC, "topic", ResourcePatternType.LITERAL, + "User:u1", "*", AclOperation.WRITE, AclPermissionType.ALLOW] + + attrs_nullable_acl_binding_filter = [1, 3, 4] + + # at first it creates correctly + AclBinding(*attrs) + for i, _ in enumerate(attrs): + + # no attribute is nullable + attrs_copy = list(attrs) + attrs_copy[i] = None + with pytest.raises(ValueError): + AclBinding(*attrs_copy) + + # string attributes of AclBindingFilter are nullable + if i in attrs_nullable_acl_binding_filter: + AclBindingFilter(*attrs_copy) + else: + with pytest.raises(ValueError): + AclBindingFilter(*attrs_copy) + + for (attr_num, attr_value) in [ + (0, ResourceType.ANY), + (2, ResourcePatternType.ANY), + (2, ResourcePatternType.MATCH), + (5, AclOperation.ANY), + (6, AclPermissionType.ANY), + ]: + attrs_copy = list(attrs) + attrs_copy[attr_num] = attr_value + # forbidden enums in AclBinding + with pytest.raises(ValueError): + AclBinding(*attrs_copy) + + # AclBindingFilter can 
hold any of these enum values
+        AclBindingFilter(*attrs_copy)
+
+    # UNKNOWN values are not forbidden: they can appear in ACL bindings
+    # returned by the broker
+    for (attr_num, attr_value) in [
+        (0, ResourceType.UNKNOWN),
+        (2, ResourcePatternType.UNKNOWN),
+        (5, AclOperation.UNKNOWN),
+        (6, AclPermissionType.UNKNOWN),
+    ]:
+        attrs_copy = list(attrs)
+        attrs_copy[attr_num] = attr_value
+        AclBinding(*attrs_copy)


 @pytest.mark.skipif(libversion()[1] < 0x000b0500,
@@ -206,7 +259,7 @@ def test_describe_configs_api():
         is no broker configured. """
     a = AdminClient({"socket.timeout.ms": 10})
-    fs = a.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, "3")])
+    fs = a.describe_configs([ConfigResource(ResourceType.BROKER, "3")])

     # ignore the result

     with pytest.raises(Exception):
@@ -219,10 +272,10 @@ def test_describe_configs_api():
         a.describe_configs([])

     with pytest.raises(ValueError):
-        a.describe_configs([None, ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "mytopic")])
+        a.describe_configs([None, ConfigResource(ResourceType.TOPIC, "mytopic")])

-    fs = a.describe_configs([ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, "mytopic"),
-                             ConfigResource(confluent_kafka.admin.RESOURCE_GROUP, "mygroup")],
+    fs = a.describe_configs([ConfigResource(ResourceType.TOPIC, "mytopic"),
+                             ConfigResource(ResourceType.GROUP, "mygroup")],
                             request_timeout=0.123)
     with pytest.raises(KafkaException):
         for f in concurrent.futures.as_completed(iter(fs.values())):
@@ -236,7 +289,7 @@ def test_alter_configs_api():
         is no broker configured. """
     a = AdminClient({"socket.timeout.ms": 10})
-    fs = a.alter_configs([ConfigResource(confluent_kafka.admin.RESOURCE_BROKER, "3",
+    fs = a.alter_configs([ConfigResource(ResourceType.BROKER, "3",
                                          set_config={"some": "config"})])

     # ignore the result
@@ -252,10 +305,147 @@ def test_alter_configs_api():

     fs = a.alter_configs([ConfigResource("topic", "mytopic",
                                          set_config={"set": "this",
                                                      "and": "this"}),
-                          ConfigResource(confluent_kafka.admin.RESOURCE_GROUP,
+                          ConfigResource(ResourceType.GROUP,
                                          "mygroup")],
                          request_timeout=0.123)

     with pytest.raises(KafkaException):
         for f in concurrent.futures.as_completed(iter(fs.values())):
             f.result(timeout=1)
+
+
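Taken together, the checks in test_acl_binding_type earlier in this diff pin down the constructor contract: AclBinding requires every field to be concrete, while AclBindingFilter additionally accepts None for its string fields (name, principal, host) and the ANY/MATCH enum values; UNKNOWN enum values are not rejected, since a broker can return them. A minimal sketch of that contract, using only the public API introduced by this patch (the resource and principal names are illustrative):

from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
                                   ResourcePatternType, AclOperation, AclPermissionType)

# A concrete binding: every field must be set and unambiguous.
binding = AclBinding(ResourceType.TOPIC, "orders", ResourcePatternType.LITERAL,
                     "User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW)

# A filter may leave name/principal/host as None and may use the ANY/MATCH
# enum values that a concrete binding rejects.
match_all = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY,
                             None, None, AclOperation.ANY, AclPermissionType.ANY)

try:
    AclBinding(ResourceType.TOPIC, None, ResourcePatternType.LITERAL,
               "User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW)
except ValueError:
    pass  # a None name is only valid in a filter, as the tests above assert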
""" + + a = AdminClient({"socket.timeout.ms": 10}) + + acl_binding1 = AclBinding(ResourceType.TOPIC, "topic1", ResourcePatternType.LITERAL, + "User:u1", "*", AclOperation.WRITE, AclPermissionType.ALLOW) + acl_binding2 = AclBinding(ResourceType.TOPIC, "topic2", ResourcePatternType.LITERAL, + "User:u2", "*", AclOperation.READ, AclPermissionType.DENY) + + f = a.create_acls([acl_binding1], + request_timeout=10.0) + # ignore the result + + with pytest.raises(TypeError): + a.create_acls(None) + + with pytest.raises(ValueError): + a.create_acls("topic") + + with pytest.raises(ValueError): + a.create_acls([]) + + with pytest.raises(ValueError): + a.create_acls(["topic"]) + + with pytest.raises(ValueError): + a.create_acls([None, "topic"]) + + with pytest.raises(ValueError): + a.create_acls([None, acl_binding1]) + + fs = a.create_acls([acl_binding1, acl_binding2]) + with pytest.raises(KafkaException): + for f in fs.values(): + f.result(timeout=1) + + fs = a.create_acls([acl_binding1, acl_binding2], + request_timeout=0.5) + for f in concurrent.futures.as_completed(iter(fs.values())): + e = f.exception(timeout=1) + assert isinstance(e, KafkaException) + assert e.args[0].code() == KafkaError._TIMED_OUT + + with pytest.raises(ValueError): + a.create_acls([acl_binding1], + request_timeout=-5) + + with pytest.raises(TypeError): + a.create_acls([acl_binding1], + unknown_operation="it is") + + +def test_delete_acls_api(): + """ delete_acls() tests, these wont really do anything since there is no + broker configured. """ + + a = AdminClient({"socket.timeout.ms": 10}) + + acl_binding_filter1 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY, + None, None, AclOperation.ANY, AclPermissionType.ANY) + acl_binding_filter2 = AclBindingFilter(ResourceType.ANY, "topic2", ResourcePatternType.MATCH, + None, "*", AclOperation.WRITE, AclPermissionType.ALLOW) + + fs = a.delete_acls([acl_binding_filter1]) + # ignore the result + + with pytest.raises(TypeError): + a.delete_acls(None) + + with pytest.raises(ValueError): + a.delete_acls([]) + + with pytest.raises(ValueError): + a.delete_acls([None, acl_binding_filter1]) + + fs = a.delete_acls([acl_binding_filter1, acl_binding_filter2]) + with pytest.raises(KafkaException): + for f in concurrent.futures.as_completed(iter(fs.values())): + f.result(timeout=1) + + fs = a.delete_acls([acl_binding_filter1, acl_binding_filter2], + request_timeout=0.5) + for f in concurrent.futures.as_completed(iter(fs.values())): + e = f.exception(timeout=1) + assert isinstance(e, KafkaException) + assert e.args[0].code() == KafkaError._TIMED_OUT + + with pytest.raises(ValueError): + a.create_acls([acl_binding_filter1], + request_timeout=-5) + + with pytest.raises(TypeError): + a.delete_acls([acl_binding_filter1], + unknown_operation="it is") + + +def test_describe_acls_api(): + """ describe_acls() tests, these wont really do anything since there is no + broker configured. 
""" + + a = AdminClient({"socket.timeout.ms": 10}) + + acl_binding_filter1 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY, + None, None, AclOperation.ANY, AclPermissionType.ANY) + acl_binding1 = AclBinding(ResourceType.TOPIC, "topic1", ResourcePatternType.LITERAL, + "User:u1", "*", AclOperation.WRITE, AclPermissionType.ALLOW) + + a.describe_acls(acl_binding_filter1) + # ignore the result + + with pytest.raises(TypeError): + a.describe_acls(None) + + with pytest.raises(TypeError): + a.describe_acls(acl_binding1) + + f = a.describe_acls(acl_binding_filter1) + with pytest.raises(KafkaException): + f.result(timeout=1) + + f = a.describe_acls(acl_binding_filter1, + request_timeout=0.5) + e = f.exception(timeout=1) + assert isinstance(e, KafkaException) + assert e.args[0].code() == KafkaError._TIMED_OUT + + with pytest.raises(ValueError): + a.describe_acls(acl_binding_filter1, + request_timeout=-5) + + with pytest.raises(TypeError): + a.describe_acls(acl_binding_filter1, + unknown_operation="it is")