Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add protocol support for ApiVersionRequest #678

Merged
merged 1 commit into from May 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion kafka/conn.py
Expand Up @@ -520,7 +520,7 @@ def check_version(self, timeout=2, strict=False):
# vanilla MetadataRequest. If the server did not recognize the first
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
from .protocol.admin import ListGroupsRequest
from .protocol.admin import ApiVersionRequest, ListGroupsRequest
from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from .protocol.metadata import MetadataRequest

Expand All @@ -536,6 +536,7 @@ def filter(self, record):
log.addFilter(log_filter)

test_cases = [
('0.10', ApiVersionRequest[0]()),
('0.9', ListGroupsRequest[0]()),
('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
Expand Down
2 changes: 1 addition & 1 deletion kafka/consumer/group.py
Expand Up @@ -225,7 +225,7 @@ def __init__(self, *topics, **configs):
# Check Broker Version if not set explicitly
if self.config['api_version'] == 'auto':
self.config['api_version'] = self._client.check_version()
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0'), 'Unrecognized api version'
assert self.config['api_version'] in ('0.10', '0.9', '0.8.2', '0.8.1', '0.8.0'), 'Unrecognized api version'
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be refactored. A user should not ever need to pass in api_version='0.10' since that broker supports ApiVersionRequest.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the user still could set api_version='0.10' (or higher in the future) right?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm convinced! merging


# Convert api_version config to tuple for easy comparisons
self.config['api_version'] = tuple(
Expand Down
2 changes: 1 addition & 1 deletion kafka/producer/kafka.py
Expand Up @@ -268,7 +268,7 @@ def __init__(self, **configs):
# Check Broker Version if not set explicitly
if self.config['api_version'] == 'auto':
self.config['api_version'] = client.check_version()
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0')
assert self.config['api_version'] in ('0.10', '0.9', '0.8.2', '0.8.1', '0.8.0')

# Convert api_version config to tuple for easy comparisons
self.config['api_version'] = tuple(
Expand Down
22 changes: 22 additions & 0 deletions kafka/protocol/admin.py
Expand Up @@ -2,6 +2,28 @@
from .types import Array, Bytes, Int16, Schema, String


class ApiVersionResponse_v0(Struct):
API_KEY = 18
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('api_versions', Array(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16))))


class ApiVersionRequest_v0(Struct):
API_KEY = 18
API_VERSION = 0
RESPONSE_TYPE = ApiVersionResponse_v0
SCHEMA = Schema()


ApiVersionRequest = [ApiVersionRequest_v0]
ApiVersionResponse = [ApiVersionResponse_v0]


class ListGroupsResponse_v0(Struct):
API_KEY = 16
API_VERSION = 0
Expand Down