Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ ResourceType

.. _pythonclient_resource_pattern_type:

**************
*******************
ResourcePatternType
**************
*******************

.. autoclass:: confluent_kafka.admin.ResourcePatternType
:members:
Expand All @@ -154,9 +154,9 @@ AclOperation

.. _pythonclient_acl_permission_type:

**************
*****************
AclPermissionType
**************
*****************

.. autoclass:: confluent_kafka.admin.AclPermissionType
:members:
Expand All @@ -172,9 +172,9 @@ AclBinding

.. _pythonclient_acl_binding_filter:

**************
****************
AclBindingFilter
**************
****************

.. autoclass:: confluent_kafka.admin.AclBindingFilter
:members:
Expand Down
12 changes: 10 additions & 2 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ def example_create_acls(a, args):
)
]

fs = a.create_acls(acl_bindings, request_timeout=10)
try:
fs = a.create_acls(acl_bindings, request_timeout=10)
except ValueError as e:
print(f"create_acls() failed: {e}")
return

# Wait for operation to finish.
for res, f in fs.items():
Expand Down Expand Up @@ -237,7 +241,11 @@ def example_delete_acls(a, args):
)
]

fs = a.delete_acls(acl_binding_filters, request_timeout=10)
try:
fs = a.delete_acls(acl_binding_filters, request_timeout=10)
except ValueError as e:
print(f"delete_acls() failed: {e}")
return

# Wait for operation to finish.
for res, f in fs.items():
Expand Down
16 changes: 12 additions & 4 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ def _make_futures(futmap_keys, class_check, make_result_fn):

return f, futmap

@staticmethod
def _has_duplicates(items):
return len(set(items)) != len(items)

def create_topics(self, new_topics, **kwargs):
"""
Create one or more new topics.
Expand Down Expand Up @@ -365,7 +369,7 @@ def create_acls(self, acls, **kwargs):
"""
Create one or more ACL bindings.

:param list(AclBinding) acls: A list of ACL binding specifications (:class:`.AclBinding`)
:param list(AclBinding) acls: A list of unique ACL binding specifications (:class:`.AclBinding`)
to create.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
Expand All @@ -380,6 +384,8 @@ def create_acls(self, acls, **kwargs):
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""
if AdminClient._has_duplicates(acls):
raise ValueError("duplicate ACL bindings not allowed")

f, futmap = AdminClient._make_futures(acls, AclBinding,
AdminClient._make_acls_result)
Expand All @@ -395,7 +401,7 @@ def describe_acls(self, acl_binding_filter, **kwargs):
:param AclBindingFilter acl_binding_filter: a filter with attributes that
must match.
String attributes match exact values or any string if set to None.
Enums attributes match exact values or any value if ending with `_ANY`.
Enums attributes match exact values or any value if equal to `ANY`.
If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH` returns all
the ACL bindings with :attr:`ResourcePatternType.LITERAL`,
:attr:`ResourcePatternType.WILDCARD` or :attr:`ResourcePatternType.PREFIXED` pattern
Expand Down Expand Up @@ -423,10 +429,10 @@ def delete_acls(self, acl_binding_filters, **kwargs):
"""
Delete ACL bindings matching one or more ACL binding filters.

:param list(AclBindingFilter) acl_binding_filters: a list of ACL binding filters
:param list(AclBindingFilter) acl_binding_filters: a list of unique ACL binding filters
to match ACLs to delete.
String attributes match exact values or any string if set to None.
Enums attributes match exact values or any value if ending with `_ANY`.
Enums attributes match exact values or any value if equal to `ANY`.
If :class:`ResourcePatternType` is set to :attr:`ResourcePatternType.MATCH`
deletes all the ACL bindings with :attr:`ResourcePatternType.LITERAL`,
:attr:`ResourcePatternType.WILDCARD` or :attr:`ResourcePatternType.PREFIXED`
Expand All @@ -444,6 +450,8 @@ def delete_acls(self, acl_binding_filters, **kwargs):
:raises TypeException: Invalid input.
:raises ValueException: Invalid input.
"""
if AdminClient._has_duplicates(acl_binding_filters):
raise ValueError("duplicate ACL binding filters not allowed")

f, futmap = AdminClient._make_futures(acl_binding_filters, AclBindingFilter,
AdminClient._make_acls_result)
Expand Down
6 changes: 6 additions & 0 deletions tests/test_Admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ def test_create_acls_api():
with pytest.raises(ValueError):
a.create_acls([None, acl_binding1])

with pytest.raises(ValueError):
a.create_acls([acl_binding1, acl_binding1])

fs = a.create_acls([acl_binding1, acl_binding2])
with pytest.raises(KafkaException):
for f in fs.values():
Expand Down Expand Up @@ -391,6 +394,9 @@ def test_delete_acls_api():
with pytest.raises(ValueError):
a.delete_acls([None, acl_binding_filter1])

with pytest.raises(ValueError):
a.delete_acls([acl_binding_filter1, acl_binding_filter1])

fs = a.delete_acls([acl_binding_filter1, acl_binding_filter2])
with pytest.raises(KafkaException):
for f in concurrent.futures.as_completed(iter(fs.values())):
Expand Down