Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka client edits #332

Merged
merged 5 commits into from
Mar 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol
from kafka.util import kafka_bytestring

log = logging.getLogger("kafka")

Expand All @@ -30,7 +31,7 @@ class KafkaClient(object):
def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
self.client_id = client_id
self.client_id = kafka_bytestring(client_id)
self.timeout = timeout
self.hosts = collect_hosts(hosts)

Expand Down Expand Up @@ -85,7 +86,7 @@ def _get_leader_for_partition(self, topic, partition):
self.load_metadata_for_topics(topic)

# If the partition doesn't actually exist, raise
if partition not in self.topic_partitions[topic]:
if partition not in self.topic_partitions.get(topic, []):
raise UnknownTopicOrPartitionError(key)

# If there's no leader for the partition, raise
Expand Down Expand Up @@ -177,8 +178,13 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
# Send the request, recv the response
try:
conn.send(requestId, request)

# decoder_fn=None signal that the server is expected to not
# send a response. This probably only applies to
# ProduceRequest w/ acks = 0
if decoder_fn is None:
continue

try:
response = conn.recv(requestId)
except ConnectionError as e:
Expand Down Expand Up @@ -259,7 +265,7 @@ def has_metadata_for_topic(self, topic):

def get_partition_ids_for_topic(self, topic):
if topic not in self.topic_partitions:
return None
return []

return sorted(list(self.topic_partitions[topic]))

Expand Down
13 changes: 2 additions & 11 deletions kafka/consumer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@
'rebalance_backoff_ms': 2000,
}

BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')


class KafkaConsumer(object):
"""
Expand Down Expand Up @@ -171,13 +169,6 @@ def configure(self, **configs):
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))

# Handle str/bytes conversions
for config_key in BYTES_CONFIGURATION_KEYS:
if isinstance(self._config[config_key], six.string_types):
logger.warning("Converting configuration key '%s' to bytes" %
config_key)
self._config[config_key] = self._config[config_key].encode('utf-8')

if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
Expand Down Expand Up @@ -554,7 +545,7 @@ def commit(self):

if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
resps = self._client.send_offset_commit_request(self._config['group_id'],
resps = self._client.send_offset_commit_request(kafka_bytestring(self._config['group_id']),
commits,
fail_on_error=False)

Expand Down Expand Up @@ -618,7 +609,7 @@ def _get_commit_offsets(self):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
self._config['group_id'],
kafka_bytestring(self._config['group_id']),
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
Expand Down