Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
This is a feature release:

- OAUTHBEARER OIDC support
- KIP-140 Admin API ACL support

### Fixes

Expand Down
90 changes: 90 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ Supporting classes
- Admin API
- :ref:`NewTopic <pyclient_admin_newtopic>`
- :ref:`NewPartitions <pyclient_admin_newpartitions>`
- :ref:`ConfigSource <pythonclient_config_source>`
- :ref:`ConfigEntry <pythonclient_config_entry>`
- :ref:`ConfigResource <pythonclient_config_resource>`
- :ref:`ResourceType <pythonclient_resource_type>`
- :ref:`ResourcePatternType <pythonclient_resource_pattern_type>`
- :ref:`AclOperation <pythonclient_acl_operation>`
- :ref:`AclPermissionType <pythonclient_acl_permission_type>`
- :ref:`AclBinding <pythonclient_acl_binding>`
- :ref:`AclBindingFilter <pythonclient_acl_binding_filter>`

Guide to the :ref:`Transactional Producer API <pythonclient_transactional>`

Expand Down Expand Up @@ -89,6 +98,87 @@ NewPartitions
.. autoclass:: confluent_kafka.admin.NewPartitions
:members:

.. _pythonclient_config_source:

**************
ConfigSource
**************

.. autoclass:: confluent_kafka.admin.ConfigSource
:members:

.. _pythonclient_config_entry:

**************
ConfigEntry
**************

.. autoclass:: confluent_kafka.admin.ConfigEntry
:members:

.. _pythonclient_config_resource:

**************
ConfigResource
**************

.. autoclass:: confluent_kafka.admin.ConfigResource
:members:

.. _pythonclient_resource_type:

**************
ResourceType
**************

.. autoclass:: confluent_kafka.admin.ResourceType
:members:

.. _pythonclient_resource_pattern_type:

**************
ResourcePatternType
**************

.. autoclass:: confluent_kafka.admin.ResourcePatternType
:members:

.. _pythonclient_acl_operation:

**************
AclOperation
**************

.. autoclass:: confluent_kafka.admin.AclOperation
:members:

.. _pythonclient_acl_permission_type:

**************
AclPermissionType
**************

.. autoclass:: confluent_kafka.admin.AclPermissionType
:members:

.. _pythonclient_acl_binding:

**************
AclBinding
**************

.. autoclass:: confluent_kafka.admin.AclBinding
:members:

.. _pythonclient_acl_binding_filter:

**************
AclBindingFilter
**************

.. autoclass:: confluent_kafka.admin.AclBindingFilter
:members:

.. _pythonclient_consumer:

********
Expand Down
147 changes: 146 additions & 1 deletion examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
# Example Admin clients.
#

from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource,
AclBinding, AclBindingFilter, ResourceType, ResourcePatternType,
AclOperation, AclPermissionType)
from confluent_kafka import KafkaException
import sys
import threading
Expand All @@ -28,6 +30,13 @@
logging.basicConfig()


def parse_nullable_string(s):
if s == "None":
return None
else:
return s


def example_create_topics(a, topics):
""" Create topics """

Expand Down Expand Up @@ -117,6 +126,133 @@ def example_describe_configs(a, args):
raise


def example_create_acls(a, args):
""" create acls """

acl_bindings = [
AclBinding(
ResourceType[restype],
parse_nullable_string(resname),
ResourcePatternType[resource_pattern_type],
parse_nullable_string(principal),
parse_nullable_string(host),
AclOperation[operation],
AclPermissionType[permission_type]
)
for restype, resname, resource_pattern_type,
principal, host, operation, permission_type
in zip(
args[0::7],
args[1::7],
args[2::7],
args[3::7],
args[4::7],
args[5::7],
args[6::7],
)
]

fs = a.create_acls(acl_bindings, request_timeout=10)

# Wait for operation to finish.
for res, f in fs.items():
try:
result = f.result()
if result is None:
print("Created {}".format(res))

except KafkaException as e:
print("Failed to create ACL {}: {}".format(res, e))
except Exception:
raise


def example_describe_acls(a, args):
""" describe acls """

acl_binding_filters = [
AclBindingFilter(
ResourceType[restype],
parse_nullable_string(resname),
ResourcePatternType[resource_pattern_type],
parse_nullable_string(principal),
parse_nullable_string(host),
AclOperation[operation],
AclPermissionType[permission_type]
)
for restype, resname, resource_pattern_type,
principal, host, operation, permission_type
in zip(
args[0::7],
args[1::7],
args[2::7],
args[3::7],
args[4::7],
args[5::7],
args[6::7],
)
]

fs = [
a.describe_acls(acl_binding_filter, request_timeout=10)
for acl_binding_filter in acl_binding_filters
]
# Wait for operations to finish.
for acl_binding_filter, f in zip(acl_binding_filters, fs):
try:
print("Acls matching filter: {}".format(acl_binding_filter))
acl_bindings = f.result()
for acl_binding in acl_bindings:
print(acl_binding)

except KafkaException as e:
print("Failed to describe {}: {}".format(acl_binding_filter, e))
except Exception:
raise


def example_delete_acls(a, args):
""" delete acls """

acl_binding_filters = [
AclBindingFilter(
ResourceType[restype],
parse_nullable_string(resname),
ResourcePatternType[resource_pattern_type],
parse_nullable_string(principal),
parse_nullable_string(host),
AclOperation[operation],
AclPermissionType[permission_type]
)
for restype, resname, resource_pattern_type,
principal, host, operation, permission_type
in zip(
args[0::7],
args[1::7],
args[2::7],
args[3::7],
args[4::7],
args[5::7],
args[6::7],
)
]

fs = a.delete_acls(acl_binding_filters, request_timeout=10)

# Wait for operation to finish.
for res, f in fs.items():
try:
acl_bindings = f.result()
print("Deleted acls matching filter: {}".format(res))
for acl_binding in acl_bindings:
print(" ", acl_binding)

except KafkaException as e:
print("Failed to delete {}: {}".format(res, e))
except Exception:
raise


def example_alter_configs(a, args):
""" Alter configs atomically, replacing non-specified
configuration properties with their default values.
Expand Down Expand Up @@ -300,6 +436,12 @@ def example_list(a, args):
'<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n')
sys.stderr.write(' delta_alter_configs <resource_type1> <resource_name1> ' +
'<config=val,config2=val2> <resource_type2> <resource_name2> <config..> ..\n')
sys.stderr.write(' create_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' describe_acls <resource_type1 <resource_name1> <resource_patter_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
sys.exit(1)

Expand All @@ -316,6 +458,9 @@ def example_list(a, args):
'describe_configs': example_describe_configs,
'alter_configs': example_alter_configs,
'delta_alter_configs': example_delta_alter_configs,
'create_acls': example_create_acls,
'describe_acls': example_describe_acls,
'delete_acls': example_delete_acls,
'list': example_list}

if operation not in opsmap:
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

AVRO_REQUIRES = ['fastavro>=0.23.0,<1.0;python_version<"3.0"',
'fastavro>=1.0;python_version>"3.0"',
'avro==1.10.0;python_version<"3.0"',
'avro-python3==1.10.0;python_version>"3.0"'
'avro==1.10.0'
] + SCHEMA_REGISTRY_REQUIRES

JSON_REQUIRES = ['pyrsistent==0.16.1;python_version<"3.0"',
Expand Down
Loading