Skip to content

Commit

Permalink
Merge 60ae82a into 6f932ba
Browse files Browse the repository at this point in the history
  • Loading branch information
Tincu Gabriel committed Oct 28, 2020
2 parents 6f932ba + 60ae82a commit fb7963c
Show file tree
Hide file tree
Showing 10 changed files with 445 additions and 9 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ env:
- KAFKA_VERSION=1.1.1
- KAFKA_VERSION=2.4.0
- KAFKA_VERSION=2.5.0
- KAFKA_VERSION=2.6.0

addons:
apt:
Expand Down
24 changes: 23 additions & 1 deletion kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest
DeleteGroupsRequest, ListPartitionReassignmentsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -460,6 +460,28 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
# TODO raise exceptions if errors
return self._send_request_to_controller(request)

def list_partition_reassignments(self, topics_partitions, timeout_ms=None):
"""List ongoing partition reassignments in the cluster.
:param topics_partitions: A list of TopicPartition objects.
:param timeout_ms: Milliseconds to wait for new topics to be created
before the broker returns.
:return: Appropriate version of ListPartitionReassignment class.
"""
version = self._matching_api_version(ListPartitionReassignmentsRequest)
timeout_ms = self._validate_timeout(timeout_ms)
if version != 0:
raise NotImplementedError(
"Support for ListPartitionReassignments v{} has not yet been added to KafkaAdminClient."
.format(version))
payload = defaultdict(set)
for tp in topics_partitions:
payload[tp.topic].add(tp.partition)
request = ListPartitionReassignmentsRequest[version](timeout_ms, [(k, v, {}) for k, v in payload.items()], {})
future = self._send_request_to_node(self._controller_id, request)
self._wait_for_futures([future])
return future.value

def delete_topics(self, topics, timeout_ms=None):
"""Delete topics from the cluster.
Expand Down
48 changes: 48 additions & 0 deletions kafka/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,52 @@
40: 'ExpireDelegationToken',
41: 'DescribeDelegationToken',
42: 'DeleteGroups',
45: 'AlterPartitionReassignments',
46: 'ListPartitionReassignments',
}

# generated using the util function
FLEXIBLE_VERSIONS = {
1: 12,
3: 9,
4: 4,
5: 2,
6: 6,
7: 3,
8: 8,
9: 6,
10: 3,
11: 6,
12: 4,
13: 4,
14: 4,
15: 5,
16: 3,
18: 3,
19: 5,
20: 4,
21: 2,
22: 2,
28: 3,
29: 2,
30: 2,
31: 2,
35: 2,
36: 2,
37: 2,
38: 2,
39: 2,
40: 2,
41: 2,
42: 2,
43: 2,
44: 1,
45: 0,
46: 0,
50: 0,
51: 0,
52: 0,
55: 0,
56: 0,
57: 0
}
89 changes: 88 additions & 1 deletion kafka/protocol/admin.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import absolute_import

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, CompactString, CompactArray, TaggedFields


class ApiVersionResponse_v0(Response):
Expand Down Expand Up @@ -923,3 +923,90 @@ class DeleteGroupsRequest_v1(Request):
DeleteGroupsResponse = [
DeleteGroupsResponse_v0, DeleteGroupsResponse_v1
]


class AlterPartitionReassignmentsResponse_v0(Response):
API_KEY = 45
API_VERSION = 0
SCHEMA = Schema(
("throttle_time_ms", Int32),
("error_code", Int16),
("error_message", CompactString("utf-8")),
("responses", CompactArray(
("name", CompactString("utf-8")),
("partitions", CompactArray(
("partition_index", Int32),
("error_code", Int16),
("error_message", CompactString("utf-8")),
("_tagged_fields", TaggedFields)
)),
("_tagged_fields", TaggedFields)
)),
("_tagged_fields", TaggedFields)
)


class AlterPartitionReassignmentsRequest_v0(Request):
API_KEY = 45
API_VERSION = 0
RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
SCHEMA = Schema(
("timeout_ms", Int32),
("topics", CompactArray(
("name", CompactString("utf-8")),
("partitions", CompactArray(
("partition_index", Int32),
("replicas", CompactArray(Int32)),
("_tagged_fields", TaggedFields)
)),
("_tagged_fields", TaggedFields)
)),
("_tagged_fields", TaggedFields)
)


AlterPartitionReassignmentsRequest = [AlterPartitionReassignmentsRequest_v0]

AlterPartitionReassignmentsResponse = [AlterPartitionReassignmentsResponse_v0]


class ListPartitionReassignmentsResponse_v0(Response):
API_KEY = 46
API_VERSION = 0
SCHEMA = Schema(
("throttle_time_ms", Int32),
("error_code", Int16),
("error_message", CompactString("utf-8")),
("topics", CompactArray(
("name", CompactString("utf-8")),
("partitions", CompactArray(
("partition_index", Int32),
("replicas", CompactArray(Int32)),
("adding_replicas", CompactArray(Int32)),
("removing_replicas", CompactArray(Int32)),
("_tagged_fields", TaggedFields)
)),
("_tagged_fields", TaggedFields)
)),
("_tagged_fields", TaggedFields)
)


class ListPartitionReassignmentsRequest_v0(Request):
API_KEY = 46
API_VERSION = 0
RESPONSE_TYPE = ListPartitionReassignmentsResponse_v0
SCHEMA = Schema(
("timeout_ms", Int32),
("topics", CompactArray(
("name", CompactString("utf-8")),
("partition_index", CompactArray(Int32)),
("_tagged_fields", TaggedFields)
)),
("_tagged_fields", TaggedFields)
)


ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]

ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]
18 changes: 17 additions & 1 deletion kafka/protocol/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc

from kafka.protocol.struct import Struct
from kafka.protocol.types import Int16, Int32, String, Schema, Array
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields


class RequestHeader(Struct):
Expand All @@ -20,6 +20,22 @@ def __init__(self, request, correlation_id=0, client_id='kafka-python'):
)


class RequestHeaderV2(Struct):
# Flexible response / request headers end in field buffer
SCHEMA = Schema(
('api_key', Int16),
('api_version', Int16),
('correlation_id', Int32),
('client_id', String('utf-8')),
('_tag_buffer', TaggedFields)
)

def __init__(self, request, correlation_id=0, client_id='kafka-python'):
super(RequestHeaderV2, self).__init__(
request.API_KEY, request.API_VERSION, correlation_id, client_id, {}
)


class Request(Struct):
__metaclass__ = abc.ABCMeta

Expand Down
27 changes: 22 additions & 5 deletions kafka/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import logging

import kafka.errors as Errors
from kafka.protocol.api import RequestHeader
from kafka.protocol import FLEXIBLE_VERSIONS
from kafka.protocol.api import RequestHeader, RequestHeaderV2
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.frame import KafkaBytes
from kafka.protocol.types import Int32
from kafka.protocol.types import Int32, TaggedFields
from kafka.version import __version__

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,9 +60,14 @@ def send_request(self, request, correlation_id=None):
log.debug('Sending request %s', request)
if correlation_id is None:
correlation_id = self._next_correlation_id()
header = RequestHeader(request,
correlation_id=correlation_id,
client_id=self._client_id)
if not self.is_flexible_version_request(request):
header = RequestHeader(request,
correlation_id=correlation_id,
client_id=self._client_id)
else:
header = RequestHeaderV2(request,
correlation_id=correlation_id,
client_id=self._client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
data = size + message
Expand Down Expand Up @@ -162,6 +168,9 @@ def _process_response(self, read_buffer):
'Correlation IDs do not match: sent %d, recv %d'
% (correlation_id, recv_correlation_id))

# Flexible response / request headers end in field buffer
if self.is_flexible_version_request(request):
_ = TaggedFields.decode(read_buffer)
# decode response
log.debug('Processing response %s', request.RESPONSE_TYPE.__name__)
try:
Expand All @@ -181,3 +190,11 @@ def _reset_buffer(self):
self._receiving = False
self._header.seek(0)
self._rbuffer = None

@staticmethod
def is_flexible_version_request(request):
flexible_version = FLEXIBLE_VERSIONS.get(request.API_KEY)
request_version = request.API_VERSION
if flexible_version is None or flexible_version > request_version:
return False
return True

0 comments on commit fb7963c

Please sign in to comment.