
flake8 (pep8 and pyflakes) clean-up #59

Merged
2 commits merged on Oct 4, 2013
42 changes: 21 additions & 21 deletions kafka/client.py
@@ -1,14 +1,10 @@
import base64
from collections import defaultdict
from functools import partial
from itertools import count, cycle
from itertools import count
import logging
from operator import attrgetter
import struct
import time
import zlib

from kafka.common import *
from kafka.common import ErrorMapping, TopicAndPartition
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol

@@ -212,8 +208,10 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
order of input payloads
"""

encoder = partial(KafkaProtocol.encode_produce_request,
acks=acks, timeout=timeout)
encoder = partial(
KafkaProtocol.encode_produce_request,
acks=acks,
timeout=timeout)

if acks == 0:
decoder = None
@@ -226,10 +224,10 @@ def send_produce_request(self, payloads=[], acks=1, timeout=1000,
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
raise Exception("ProduceRequest for %s failed with "
"errorcode=%d" % (
TopicAndPartition(resp.topic, resp.partition),
resp.error))
raise Exception(
"ProduceRequest for %s failed with errorcode=%d" %
(TopicAndPartition(resp.topic, resp.partition),
resp.error))

# Run the callback
if callback is not None:
@@ -251,17 +249,18 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,
max_wait_time=max_wait_time,
min_bytes=min_bytes)

resps = self._send_broker_aware_request(payloads, encoder,
KafkaProtocol.decode_fetch_response)
resps = self._send_broker_aware_request(
payloads, encoder,
KafkaProtocol.decode_fetch_response)

out = []
for resp in resps:
# Check for errors
if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
raise Exception("FetchRequest for %s failed with "
"errorcode=%d" % (
TopicAndPartition(resp.topic, resp.partition),
resp.error))
raise Exception(
"FetchRequest for %s failed with errorcode=%d" %
(TopicAndPartition(resp.topic, resp.partition),
resp.error))

# Run the callback
if callback is not None:
@@ -272,9 +271,10 @@ def send_fetch_request(self, payloads=[], fail_on_error=True,

def send_offset_request(self, payloads=[], fail_on_error=True,
callback=None):
resps = self._send_broker_aware_request(payloads,
KafkaProtocol.encode_offset_request,
KafkaProtocol.decode_offset_response)
resps = self._send_broker_aware_request(
payloads,
KafkaProtocol.encode_offset_request,
KafkaProtocol.decode_offset_response)

out = []
for resp in resps:
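
The produce and fetch paths above wrap the `KafkaProtocol` encoders with `functools.partial` so the request options are bound once and the broker-aware sender only has to supply the per-request routing arguments. A minimal sketch of that pattern, using a stand-in encoder rather than the real `KafkaProtocol.encode_produce_request`:

```python
from functools import partial


def encode_produce_request(client_id, correlation_id, payloads,
                           acks=1, timeout=1000):
    # Stand-in encoder: just describe what would be serialized.
    return "produce(acks=%d, timeout=%d, payloads=%r)" % (
        acks, timeout, payloads)


# Bind the request options once; the broker-aware sender later supplies
# only the per-request arguments (client_id, correlation_id, payloads).
encoder = partial(encode_produce_request, acks=0, timeout=500)

print(encoder("example-client", 1, ["msg-1", "msg-2"]))
```
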
20 changes: 14 additions & 6 deletions kafka/conn.py
@@ -3,6 +3,8 @@
import struct
from threading import local

from kafka.common import BufferUnderflowError


log = logging.getLogger("kafka")

@@ -12,7 +14,7 @@ class KafkaConnection(local):
A socket connection to a single Kafka broker

This class is _not_ thread safe. Each call to `send` must be followed
by a call to `recv` in order to get the correct response. Eventually,
by a call to `recv` in order to get the correct response. Eventually,
we can do something in here to facilitate multiplexed requests/responses
since the Kafka API includes a correlation id.
"""
@@ -43,7 +45,7 @@ def _consume_response(self):

def _consume_response_iter(self):
"""
This method handles the response header and error messages. It
This method handles the response header and error messages. It
then returns an iterator for the chunks of the response
"""
log.debug("Handling response from Kafka")
@@ -57,13 +59,15 @@ def _consume_response_iter(self):
messagesize = size - 4
log.debug("About to read %d bytes from Kafka", messagesize)

# Read the remainder of the response
# Read the remainder of the response
total = 0
while total < messagesize:
resp = self._sock.recv(self.bufsize)
log.debug("Read %d bytes from Kafka", len(resp))
if resp == "":
raise BufferUnderflowError("Not enough data to read this response")
raise BufferUnderflowError(
"Not enough data to read this response")

total += len(resp)
yield resp

@@ -75,9 +79,13 @@ def _consume_response_iter(self):

def send(self, request_id, payload):
"Send a request to Kafka"
log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))

log.debug(
"About to send %d bytes to Kafka, request %d" %
(len(payload), request_id))

sent = self._sock.sendall(payload)
if sent != None:
if sent is not None:
raise RuntimeError("Kafka went away")

def recv(self, request_id):
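
The `KafkaConnection` docstring above stresses that each `send` must be paired with a `recv` before another request goes out, since responses are matched to requests only by their order on the socket. A hedged sketch of that pairing, reusing the encode/decode helpers named in the `kafka/client.py` diff; the host, port and client id are assumptions for illustration:

```python
from kafka.conn import KafkaConnection
from kafka.protocol import KafkaProtocol

# Host and port are illustrative; the encode/decode names are the ones
# referenced in kafka/client.py above.
conn = KafkaConnection("localhost", 9092)

correlation_id = 1
request = KafkaProtocol.encode_offset_request("example-client",
                                              correlation_id)

# Every send must be followed by a recv before the next send: responses
# are matched to requests by their order on the socket, not (yet) by the
# correlation id.
conn.send(correlation_id, request)
data = conn.recv(correlation_id)
responses = list(KafkaProtocol.decode_offset_response(data))
```
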
53 changes: 30 additions & 23 deletions kafka/consumer.py
@@ -8,7 +8,7 @@

from kafka.common import (
ErrorMapping, FetchRequest,
OffsetRequest, OffsetFetchRequest, OffsetCommitRequest,
OffsetRequest, OffsetCommitRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData
)

@@ -223,11 +223,12 @@ def __init__(self, client, group, topic, auto_commit=True, partitions=None,
self.fetch_min_bytes = fetch_size_bytes
self.fetch_started = defaultdict(bool) # defaults to false

super(SimpleConsumer, self).__init__(client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
super(SimpleConsumer, self).__init__(
client, group, topic,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)

def provide_partition_info(self):
"""
@@ -275,8 +276,8 @@ def seek(self, offset, whence):

resps = self.client.send_offset_request(reqs)
for resp in resps:
self.offsets[resp.partition] = resp.offsets[0] + \
deltas[resp.partition]
self.offsets[resp.partition] = \
resp.offsets[0] + deltas[resp.partition]
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)

@@ -364,9 +365,10 @@ def __iter_partition__(self, partition, offset):
req = FetchRequest(
self.topic, partition, offset, self.client.bufsize)

(resp,) = self.client.send_fetch_request([req],
max_wait_time=self.fetch_max_wait_time,
min_bytes=fetch_size)
(resp,) = self.client.send_fetch_request(
[req],
max_wait_time=self.fetch_max_wait_time,
min_bytes=fetch_size)

assert resp.topic == self.topic
assert resp.partition == partition
@@ -376,18 +378,22 @@ def __iter_partition__(self, partition, offset):
for message in resp.messages:
next_offset = message.offset

# update the offset before the message is yielded. This is
# so that the consumer state is not lost in certain cases.
# For eg: the message is yielded and consumed by the caller,
# but the caller does not come back into the generator again.
# The message will be consumed but the status will not be
# updated in the consumer
# update the offset before the message is yielded. This
# is so that the consumer state is not lost in certain
# cases.
#
# For eg: the message is yielded and consumed by the
# caller, but the caller does not come back into the
# generator again. The message will be consumed but the
# status will not be updated in the consumer
self.fetch_started[partition] = True
self.offsets[partition] = message.offset
yield message
except ConsumerFetchSizeTooSmall, e:
log.warn("Fetch size is too small, increasing by 1.5x and retrying")
fetch_size *= 1.5
log.warn(
"Fetch size too small, increasing to %d (1.5x) and retry",
fetch_size)
continue
except ConsumerNoMoreData, e:
log.debug("Iteration was ended by %r", e)
@@ -429,11 +435,12 @@ def __init__(self, client, group, topic, auto_commit=True,
num_procs=1, partitions_per_proc=0):

# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(client, group, topic,
partitions=None,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
super(MultiProcessConsumer, self).__init__(
client, group, topic,
partitions=None,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)

# Variables for managing and controlling the data flow from
# consumer child process to master
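
The reflowed comment in `__iter_partition__` carries the key reasoning in this file: the offset is stored before the message is yielded so that the consumer's position survives even if the caller never re-enters the generator. A self-contained toy sketch of that ordering (the names below are illustrative, not the real consumer API):

```python
# The messages list stands in for a fetch response; offsets mirrors
# SimpleConsumer.offsets.
offsets = {0: 0}


def iter_partition(partition, messages):
    for offset, value in messages:
        # Update the saved position *before* handing the message to the
        # caller: if the caller never resumes the generator, the consumer
        # state still reflects the delivered message.
        offsets[partition] = offset
        yield value


it = iter_partition(0, [(1, "a"), (2, "b"), (3, "c")])
print(next(it))     # "a"
print(offsets[0])   # 1 -- already advanced, even if iteration stops here
```
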
38 changes: 19 additions & 19 deletions kafka/protocol.py
@@ -25,12 +25,12 @@ class KafkaProtocol(object):
This class does not have any state associated with it, it is purely
for organization.
"""
PRODUCE_KEY = 0
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
PRODUCE_KEY = 0
FETCH_KEY = 1
OFFSET_KEY = 2
METADATA_KEY = 3
OFFSET_COMMIT_KEY = 6
OFFSET_FETCH_KEY = 7
OFFSET_FETCH_KEY = 7

ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
@@ -120,8 +120,8 @@ def _decode_message_set_iter(cls, data):
yield OffsetAndMessage(offset, message)
except BufferUnderflowError:
if read_message is False:
# If we get a partial read of a message, but haven't yielded anyhting
# there's a problem
# If we get a partial read of a message, but haven't
# yielded anyhting there's a problem
raise ConsumerFetchSizeTooSmall()
else:
raise StopIteration()
@@ -274,14 +274,14 @@ def decode_fetch_response(cls, data):

for i in range(num_partitions):
((partition, error, highwater_mark_offset), cur) = \
relative_unpack('>ihq', data, cur)
relative_unpack('>ihq', data, cur)

(message_set, cur) = read_int_string(data, cur)

yield FetchResponse(
topic, partition, error,
highwater_mark_offset,
KafkaProtocol._decode_message_set_iter(message_set))
topic, partition, error,
highwater_mark_offset,
KafkaProtocol._decode_message_set_iter(message_set))

@classmethod
def encode_offset_request(cls, client_id, correlation_id, payloads=None):
@@ -321,7 +321,7 @@ def decode_offset_response(cls, data):

for i in range(num_partitions):
((partition, error, num_offsets,), cur) = \
relative_unpack('>ihi', data, cur)
relative_unpack('>ihi', data, cur)

offsets = []
for j in range(num_offsets):
@@ -383,17 +383,17 @@ def decode_metadata_response(cls, data):

for j in range(num_partitions):
((partition_error_code, partition, leader, numReplicas), cur) = \
relative_unpack('>hiii', data, cur)
relative_unpack('>hiii', data, cur)

(replicas, cur) = relative_unpack('>%di' % numReplicas,
data, cur)
(replicas, cur) = relative_unpack(
'>%di' % numReplicas, data, cur)

((num_isr,), cur) = relative_unpack('>i', data, cur)
(isr, cur) = relative_unpack('>%di' % num_isr, data, cur)

partition_metadata[partition] = \
PartitionMetadata(topic_name, partition, leader,
replicas, isr)
PartitionMetadata(
topic_name, partition, leader, replicas, isr)

topic_metadata[topic_name] = partition_metadata

@@ -531,7 +531,7 @@ def create_gzip_message(payloads, key=None):
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload) for payload in payloads])
[create_message(payload) for payload in payloads])

gzipped = gzip_encode(message_set)
codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP
@@ -552,7 +552,7 @@ def create_snappy_message(payloads, key=None):
key: bytes, a key used for partition routing (optional)
"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload) for payload in payloads])
[create_message(payload) for payload in payloads])

snapped = snappy_encode(message_set)
codec = KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY
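
`create_gzip_message` and `create_snappy_message` above mark the compressed message set by AND-ing a codec id into the attributes byte through `ATTRIBUTE_CODEC_MASK`. A short sketch of how those two bits are set and later inspected; the gzip/snappy id values are assumptions based on the Kafka wire format rather than taken from this diff:

```python
# ATTRIBUTE_CODEC_MASK and CODEC_NONE appear in the diff above; the
# gzip/snappy ids are assumed (1 = gzip, 2 = snappy).
ATTRIBUTE_CODEC_MASK = 0x03
CODEC_NONE = 0x00
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02

# Writing (as in create_gzip_message): keep only the low two bits.
attributes = ATTRIBUTE_CODEC_MASK & CODEC_GZIP

# Reading: mask the attributes byte to recover the codec and dispatch.
codec = attributes & ATTRIBUTE_CODEC_MASK
if codec == CODEC_GZIP:
    print("message set is gzip-compressed")
elif codec == CODEC_SNAPPY:
    print("message set is snappy-compressed")
else:
    print("message set is plain")
```
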
19 changes: 11 additions & 8 deletions kafka/queue.py
@@ -25,8 +25,9 @@ def __init__(self, client, topic, partition, out_queue, barrier,
Process.__init__(self)

def __str__(self):
return "[KafkaConsumerProcess: topic=%s, partition=%s, sleep=%s]" % \
(self.topic, self.partition, self.consumer_sleep)
return "[KafkaConsumerProcess: topic=%s, \
partition=%s, sleep=%s]" % \
(self.topic, self.partition, self.consumer_sleep)

def run(self):
self.barrier.wait()
@@ -70,10 +71,12 @@ def __init__(self, client, topic, in_queue, barrier,
Process.__init__(self)

def __str__(self):
return "[KafkaProducerProcess: topic=%s, flush_buffer=%s, \
flush_timeout=%s, timeout=%s]" % (
self.topic, self.producer_flush_buffer,
self.producer_flush_timeout, self.producer_timeout)
return "[KafkaProducerProcess: topic=%s, \
flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
(self.topic,
self.producer_flush_buffer,
self.producer_flush_timeout,
self.producer_timeout)

def run(self):
self.barrier.wait()
@@ -104,8 +107,8 @@ def flush(messages):
last_produce = time.time()

try:
msg = KafkaClient.create_message(self.in_queue.get(True,
self.producer_timeout))
msg = KafkaClient.create_message(
self.in_queue.get(True, self.producer_timeout))
messages.append(msg)

except Empty:
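
The producer process above drains its input queue with a blocking `get` that times out, so an `Empty` exception doubles as the flush trigger. A minimal sketch of that pattern under Python 2 (the `Queue` module the original code uses); the queue, timeout and flush helper below are illustrative:

```python
from Queue import Queue, Empty
import time

in_queue = Queue()
messages = []
timeout = 0.1          # illustrative flush timeout, in seconds
last_produce = 0.0


def flush(batch):
    print("flushing %d messages" % len(batch))
    del batch[:]


try:
    msg = in_queue.get(True, timeout)   # block, but never forever
    messages.append(msg)
except Empty:
    # Nothing arrived within the timeout; flush the pending batch.
    if messages:
        flush(messages)
        last_produce = time.time()
```
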