
Implemented KIP-88, KIP-222, KIP-518 and partially KIP-396 #1449

Merged · 54 commits · Jan 16, 2023

Commits
2ea142a
Alter and List consumer group operations
pranavrth Sep 26, 2022
c344023
Added require_stable option in ListConsumerGroupOffsets API
pranavrth Sep 26, 2022
f8727fc
Added Integration test
pranavrth Sep 26, 2022
7f71ac8
Temp Commit
pranavrth Sep 26, 2022
d3bd645
Fixed integration test to work without sleep
pranavrth Sep 27, 2022
926c6ad
Fixed unused import issue
pranavrth Sep 28, 2022
3c96288
Formatting fixes
pranavrth Sep 28, 2022
0b6bd99
Changed import from ..cimpl instead of confluent_kafka for _offsets.p…
pranavrth Sep 29, 2022
3aa56f6
Removed list and describe consumer group new implementations
pranavrth Oct 14, 2022
96faf14
After adding list group in examples
pranavrth Oct 14, 2022
4ee6878
Added usage for the delete consumer groups operation in example
pranavrth Oct 14, 2022
e001267
Added test cases for delete operation
pranavrth Oct 18, 2022
73f84b6
Removed unnecessary changes
pranavrth Oct 18, 2022
6877a32
Doc and return type changes
pranavrth Oct 18, 2022
b75253c
Updated Vairable names
pranavrth Oct 18, 2022
855e1f4
Working list consumer groups
pranavrth Dec 12, 2022
7ddc119
Added describe and list consumer groups
pranavrth Dec 21, 2022
de2b691
Some improvement to the code
pranavrth Dec 26, 2022
292f3c7
Removed some TODOs
pranavrth Dec 26, 2022
fccbbff
Moved validations for alter and list offsets API to validation util
pranavrth Dec 28, 2022
d64d71f
Added an integration test
pranavrth Dec 29, 2022
1fe9018
Refactoring
pranavrth Jan 1, 2023
d2df7c5
Flake8 fixes
pranavrth Jan 1, 2023
3799fac
Added integration test for describe consumer groups. Added more valid…
pranavrth Jan 2, 2023
4dbced1
Fixing a memory leak
pranavrth Jan 3, 2023
291b73a
Removed validations from the response. Improved example. Removed erro…
pranavrth Jan 9, 2023
a8ebaa5
Add doc TODO
pranavrth Jan 9, 2023
ae5284f
Fixed some of the TODOs
pranavrth Jan 9, 2023
22fc909
Changed Delete consumer groups response creation. Some example change…
pranavrth Jan 10, 2023
b12a886
Remove some TODOs
pranavrth Jan 10, 2023
d6410bf
Added deprecation warning for list groups. Removed some more TODOs
pranavrth Jan 10, 2023
27bb667
Changed topic_partition_list to topic_partitions
pranavrth Jan 10, 2023
cf20741
Added some more unit test cases
pranavrth Jan 10, 2023
3610ca1
Fixed styling issues
pranavrth Jan 10, 2023
4c45bf1
PR comments
pranavrth Jan 12, 2023
0de8f4c
Added docs and Changelog
pranavrth Jan 12, 2023
7d63d2b
Changelog fixes
emasab Jan 12, 2023
bea1567
Make util and model packages private,
emasab Jan 12, 2023
2aa361c
Added few changes related to ordering of the functions, docs and CHAN…
pranavrth Jan 13, 2023
aff12ac
Added old change to CHANGELOG
pranavrth Jan 13, 2023
3d4dcb0
Internal TopicPartition in admin
emasab Jan 13, 2023
92905de
Moved ConsumerGroupState to confluent_kafka instead of admin
pranavrth Jan 13, 2023
ba1937b
Changed return type of delete consumer groups request to None
pranavrth Jan 13, 2023
0ea2524
Using request_timeout instead of timeout for list, describe and delet…
pranavrth Jan 13, 2023
bde5af5
changed return type of list and alter cg offsets operation to dict[gr…
pranavrth Jan 13, 2023
91fda73
PEP8 fixes
emasab Jan 13, 2023
b768037
Changed states filter type from list to set
pranavrth Jan 13, 2023
9b61d89
Updated librdkafka version
pranavrth Jan 13, 2023
3348fcd
Updated librdkafka version for travis job
pranavrth Jan 13, 2023
7d37cb8
Fixed docstring test failure for ConsumerGroupState
pranavrth Jan 13, 2023
ff23afb
Some refactoring
pranavrth Jan 15, 2023
7beaf79
Fix error strings
emasab Jan 16, 2023
7cefc27
Refactored function names, removed unnecessary error handling from ba…
pranavrth Jan 16, 2023
67d6d59
Fix test error caused by ACLs
emasab Jan 16, 2023
24 changes: 22 additions & 2 deletions CHANGELOG.md
@@ -4,9 +4,29 @@
## Next Version

- Added metadata to `TopicPartition` type and `commit()` (#1410).
- Added `consumer.memberid()` for getting member id assigned to
the consumer in a consumer group (#1154).
- Implemented `nb_bool` method for the Producer, so that the default (which uses len)
will not be used. This avoids situations where producers with no enqueued items would
evaluate to False (#1445).
- Added Python 3.11 wheels.
- [KIP-222](https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API)
Add Consumer Group operations to Admin API.
- [KIP-518](https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state)
Allow listing consumer groups per state.
- [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484)
Partially implemented: support for AlterConsumerGroupOffsets.
- As a result of the above KIPs, added (#1449):
- `list_consumer_groups` Admin operation. Supports listing by state.
- `describe_consumer_groups` Admin operation. Supports multiple groups.
- `delete_consumer_groups` Admin operation. Supports multiple groups.
- `list_consumer_group_offsets` Admin operation. Currently, only supports 1 group with multiple partitions. Supports require_stable option.
- `alter_consumer_group_offsets` Admin operation. Currently, only supports 1 group with multiple offsets.
- Added `normalize.schemas` configuration property to Schema Registry client

confluent-kafka-python is based on librdkafka v2.0.0, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v2.0.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.


## v1.9.2
155 changes: 149 additions & 6 deletions examples/adminapi.py
@@ -17,10 +17,11 @@

# Example use of AdminClient operations.

from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions,
                             TopicPartition)
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource,
                                   AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation,
                                   AclPermissionType, ConsumerGroupState)
import sys
import threading
import logging
@@ -419,17 +420,146 @@ def example_list(a, args):
print(" {} consumer groups".format(len(groups)))
for g in groups:
if g.error is not None:
errstr = ": {}".format(g.error)
else:
errstr = ""

print(" \"{}\" with {} member(s), protocol: {}, protocol_type: {}{}".format(
g, len(g.members), g.protocol, g.protocol_type, errstr))

for m in g.members:
print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host))


def example_list_consumer_groups(a, args):
"""
List Consumer Groups
"""
states = [ConsumerGroupState[state] for state in args]
future = a.list_consumer_groups(timeout=10, states=states)
try:
list_consumer_groups_result = future.result()
print("{} consumer groups".format(len(list_consumer_groups_result.valid)))
for valid in list_consumer_groups_result.valid:
print(" id: {} is_simple: {} state: {}".format(
valid.group_id, valid.is_simple_consumer_group, valid.state))
print("{} errors".format(len(list_consumer_groups_result.errors)))
for error in list_consumer_groups_result.errors:
print(" error: {}".format(error))
except Exception:
raise


def example_describe_consumer_groups(a, args):
"""
Describe Consumer Groups
"""

futureMap = a.describe_consumer_groups(args, timeout=10)

for group_id, future in futureMap.items():
try:
g = future.result()
print("Group Id: {}".format(g.group_id))
print(" Is Simple : {}".format(g.is_simple_consumer_group))
print(" State : {}".format(g.state))
print(" Partition Assignor : {}".format(g.partition_assignor))
print(" Coordinator : ({}) {}:{}".format(g.coordinator.id, g.coordinator.host, g.coordinator.port))
print(" Members: ")
for member in g.members:
print(" Id : {}".format(member.member_id))
print(" Host : {}".format(member.host))
print(" Client Id : {}".format(member.client_id))
print(" Group Instance Id : {}".format(member.group_instance_id))
if member.assignment:
print(" Assignments :")
for toppar in member.assignment.topic_partitions:
print(" {} [{}]".format(toppar.topic, toppar.partition))
except KafkaException as e:
print("Error while describing group id '{}': {}".format(group_id, e))
except Exception:
raise


def example_delete_consumer_groups(a, args):
"""
Delete Consumer Groups
"""
groups = a.delete_consumer_groups(args, timeout=10)
for group_id, future in groups.items():
try:
response = future.result()
print("Deleted group with id '" + response.group_id + "' successfully")
except KafkaException as e:
print("Error deleting group id '{}': {}".format(group_id, e))
except Exception:
raise


def example_list_consumer_group_offsets(a, args):
"""
List consumer group offsets
"""

topic_partitions = []
for topic, partition in zip(args[1::2], args[2::2]):
topic_partitions.append(TopicPartition(topic, int(partition)))
if len(topic_partitions) == 0:
topic_partitions = None
groups = [ConsumerGroupTopicPartitions(args[0], topic_partitions)]

futureMap = a.list_consumer_group_offsets(groups)

for request, future in futureMap.items():
try:
response_offset_info = future.result()
print("Group: " + response_offset_info.group_id)
for topic_partition in response_offset_info.topic_partitions:
if topic_partition.error:
print(" Error: " + topic_partition.error.str() + " occurred with " +
topic_partition.topic + " [" + str(topic_partition.partition) + "]")
else:
print(" " + topic_partition.topic +
" [" + str(topic_partition.partition) + "]: " + str(topic_partition.offset))

except KafkaException as e:
print("Failed to describe {}: {}".format(request.group_id, e))
except Exception:
raise


def example_alter_consumer_group_offsets(a, args):
"""
Alter consumer group offsets
"""

topic_partitions = []
for topic, partition, offset in zip(args[1::3], args[2::3], args[3::3]):
topic_partitions.append(TopicPartition(topic, int(partition), int(offset)))
if len(topic_partitions) == 0:
topic_partitions = None
groups = [ConsumerGroupTopicPartitions(args[0], topic_partitions)]

futureMap = a.alter_consumer_group_offsets(groups)

for request, future in futureMap.items():
try:
response_offset_info = future.result()
print("Group: " + response_offset_info.group_id)
for topic_partition in response_offset_info.topic_partitions:
if topic_partition.error:
print(" Error: " + topic_partition.error.str() + " occurred with " +
topic_partition.topic + " [" + str(topic_partition.partition) + "]")
else:
print(" " + topic_partition.topic +
" [" + str(topic_partition.partition) + "]: " + str(topic_partition.offset))

except KafkaException as e:
print("Failed to describe {}: {}".format(request.group_id, e))
except Exception:
raise
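Both offset helpers above turn a flat argv tail into `TopicPartition` objects using stride slicing: `zip(args[1::2], args[2::2])` pairs each topic with its partition, and the stride-3 variant adds the offset. The trick can be checked standalone; plain tuples stand in for `TopicPartition` here:

```python
def pair_topic_partitions(args):
    # args = [group, topic1, partition1, topic2, partition2, ...]
    # zip over stride-2 slices pairs each topic with its partition.
    return [(topic, int(partition))
            for topic, partition in zip(args[1::2], args[2::2])]


def triple_topic_partition_offsets(args):
    # args = [group, topic1, partition1, offset1, ...]; stride-3 slices.
    return [(t, int(p), int(o))
            for t, p, o in zip(args[1::3], args[2::3], args[3::3])]


pairs = pair_topic_partitions(["my-group", "t1", "0", "t1", "1"])
triples = triple_topic_partition_offsets(["my-group", "t1", "0", "42"])
print(pairs)    # [('t1', 0), ('t1', 1)]
print(triples)  # [('t1', 0, 42)]
```

Because `zip` stops at the shortest slice, a trailing incomplete group of arguments is silently dropped rather than raising, which matches the behavior of the example code in this PR.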


if __name__ == '__main__':
if len(sys.argv) < 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
@@ -449,6 +579,14 @@ def example_list(a, args):
sys.stderr.write(' delete_acls <resource_type1> <resource_name1> <resource_patter_type1> ' +
'<principal1> <host1> <operation1> <permission_type1> ..\n')
sys.stderr.write(' list [<all|topics|brokers|groups>]\n')
sys.stderr.write(' list_consumer_groups [<state1> <state2> ..]\n')
sys.stderr.write(' describe_consumer_groups <group1> <group2> ..\n')
sys.stderr.write(' delete_consumer_groups <group1> <group2> ..\n')
sys.stderr.write(' list_consumer_group_offsets <group> [<topic1> <partition1> <topic2> <partition2> ..]\n')
sys.stderr.write(
' alter_consumer_group_offsets <group> <topic1> <partition1> <offset1> ' +
'<topic2> <partition2> <offset2> ..\n')

sys.exit(1)

broker = sys.argv[1]
@@ -467,7 +605,12 @@ def example_list(a, args):
'create_acls': example_create_acls,
'describe_acls': example_describe_acls,
'delete_acls': example_delete_acls,
'list': example_list,
'list_consumer_groups': example_list_consumer_groups,
'describe_consumer_groups': example_describe_consumer_groups,
'delete_consumer_groups': example_delete_consumer_groups,
'list_consumer_group_offsets': example_list_consumer_group_offsets,
'alter_consumer_group_offsets': example_alter_consumer_group_offsets}

if operation not in opsmap:
sys.stderr.write('Unknown operation: %s\n' % operation)
4 changes: 3 additions & 1 deletion src/confluent_kafka/__init__.py
@@ -19,6 +19,7 @@
from .deserializing_consumer import DeserializingConsumer
from .serializing_producer import SerializingProducer
from .error import KafkaException, KafkaError
from ._model import Node, ConsumerGroupTopicPartitions

from .cimpl import (Producer,
Consumer,
@@ -40,7 +41,8 @@
'OFFSET_BEGINNING', 'OFFSET_END', 'OFFSET_INVALID', 'OFFSET_STORED',
'Producer', 'DeserializingConsumer',
'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME',
'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node',
'ConsumerGroupTopicPartitions']

__version__ = version()[0]

57 changes: 57 additions & 0 deletions src/confluent_kafka/_model/__init__.py
@@ -0,0 +1,57 @@
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class Node:
"""
Represents node information.
Used by :class:`ConsumerGroupDescription`

Parameters
----------
id: int
The node id of this node.
id_string: str
String representation of the node id.
host: str
The host name for this node.
port: int
The port for this node.
rack: str
The rack for this node.
"""
def __init__(self, id, host, port, rack=None):
self.id = id
self.id_string = str(id)
self.host = host
self.port = port
self.rack = rack


class ConsumerGroupTopicPartitions:
"""
Represents consumer group and its topic partition information.
Used by :meth:`AdminClient.list_consumer_group_offsets` and
:meth:`AdminClient.alter_consumer_group_offsets`.

Parameters
----------
group_id: str
Id of the consumer group.
topic_partitions : list(TopicPartition)
List of topic partitions information.
"""
def __init__(self, group_id, topic_partitions=None):
self.group_id = group_id
self.topic_partitions = topic_partitions
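`ConsumerGroupTopicPartitions` is a plain data holder with two call patterns: passing only a group id requests offsets for the whole group, while passing partitions restricts the request. The class body below is copied from the diff above; plain tuples stand in for `TopicPartition` so the sketch runs without the C extension:

```python
class ConsumerGroupTopicPartitions:
    # Copied from the diff above; a plain data holder.
    def __init__(self, group_id, topic_partitions=None):
        self.group_id = group_id
        self.topic_partitions = topic_partitions


# Whole-group request: topic_partitions stays None, and the broker
# returns offsets for every partition the group has committed.
whole_group = ConsumerGroupTopicPartitions("my-group")

# Restricted request: tuples stand in for TopicPartition objects here.
some_partitions = ConsumerGroupTopicPartitions("my-group", [("t1", 0), ("t1", 1)])

print(whole_group.topic_partitions)           # None
print(len(some_partitions.topic_partitions))  # 2
```

This mirrors how `example_list_consumer_group_offsets` above sets `topic_partitions = None` when no topic/partition arguments are given.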
16 changes: 16 additions & 0 deletions src/confluent_kafka/_util/__init__.py
@@ -0,0 +1,16 @@
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .validation_util import ValidationUtil # noqa: F401
from .conversion_util import ConversionUtil # noqa: F401
38 changes: 38 additions & 0 deletions src/confluent_kafka/_util/conversion_util.py
@@ -0,0 +1,38 @@
# Copyright 2022 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum


class ConversionUtil:
@staticmethod
def convert_to_enum(val, enum_clazz):
if type(enum_clazz) is not type(Enum):
raise TypeError("'enum_clazz' must be of type Enum")

if type(val) == str:
# Allow it to be specified as case-insensitive string, for convenience.
try:
val = enum_clazz[val.upper()]
except KeyError:
raise ValueError("Unknown value \"%s\": should be a %s" % (val, enum_clazz.__name__))

elif type(val) == int:
# The C-code passes restype as an int, convert to enum.
val = enum_clazz(val)

elif type(val) != enum_clazz:
raise TypeError("Unknown value \"%s\": should be a %s" % (val, enum_clazz.__name__))

return val
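The helper accepts a case-insensitive string name, a raw int (as passed up from the C layer), or an enum member, and normalizes all three to the enum. Its behavior can be illustrated with a throwaway `Color` enum (the function body is the same logic as `ConversionUtil.convert_to_enum` above; `Color` is a demo, not part of the library):

```python
from enum import Enum


def convert_to_enum(val, enum_clazz):
    # Same logic as ConversionUtil.convert_to_enum in the diff.
    if type(enum_clazz) is not type(Enum):
        raise TypeError("'enum_clazz' must be of type Enum")

    if type(val) == str:
        # Case-insensitive name lookup, for convenience.
        try:
            val = enum_clazz[val.upper()]
        except KeyError:
            raise ValueError("Unknown value \"%s\": should be a %s" % (val, enum_clazz.__name__))
    elif type(val) == int:
        # The C code passes an int; convert by enum value.
        val = enum_clazz(val)
    elif type(val) != enum_clazz:
        raise TypeError("Unknown value \"%s\": should be a %s" % (val, enum_clazz.__name__))

    return val


class Color(Enum):  # demo enum standing in for e.g. ConsumerGroupState
    RED = 0
    GREEN = 1


print(convert_to_enum("green", Color))  # Color.GREEN (name lookup)
print(convert_to_enum(0, Color))        # Color.RED (value lookup)
print(convert_to_enum(Color.RED, Color) is Color.RED)  # True (passthrough)
```

Note the `type(enum_clazz) is not type(Enum)` guard works because every class created with `class X(Enum)` has `EnumMeta` as its metaclass, the same as `Enum` itself.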