diff --git a/.gitignore b/.gitignore index 640418c2b..c307cd1ac 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ dist/ .*.swp .*.un~ *.bak +.coverage +coverage.xml +*#*#* diff --git a/.travis.yml b/.travis.yml index 6183610b4..a277e4b3f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,30 +1,32 @@ language: python +sudo: false python: - "2.7" - +addons: + apt: + packages: + - libev-dev + - libsnappy-dev env: - - BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin + - TOXENV=py27 BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin + - TOXENV=py34 BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin + - TOXENV=pypy BROKERS=localhost:9092,localhost:9093,localhost:9094 ZOOKEEPER=localhost:2181 KAFKA_BIN=/home/travis/kafka-bin notifications: email: - keith@parsely.com - emmett@parsely.com -before_install: - - sudo apt-get install -qq libev-dev libsnappy-dev install: - - pip install python-snappy kazoo unittest2 mock nose - - pip install python-coveralls coverage nose-cov - - pip install testinstances - - python setup.py develop + - pip install python-coveralls kazoo tox before_script: - "python -m pykafka.test.kafka_instance 3 --download-dir /home/travis/kafka-bin &" - "sleep 10" script: - - nosetests -v --with-cov --cov pykafka --cover-branches --cov-config .coveragerc --logging-level=DEBUG + - tox # Calculate coverage on success after_success: diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 000000000..ab30e9ace --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include *.txt diff --git a/pykafka/__init__.py b/pykafka/__init__.py index 0af580709..4278d91b7 100644 --- a/pykafka/__init__.py +++ b/pykafka/__init__.py @@ -1,11 +1,11 @@ -from broker import Broker -from simpleconsumer import SimpleConsumer -from cluster import Cluster -from partition import Partition -from producer import Producer -from topic import Topic -from client import KafkaClient -from balancedconsumer import BalancedConsumer +from .broker import Broker +from .simpleconsumer import SimpleConsumer +from .cluster import Cluster +from .partition import Partition +from .producer import Producer +from .topic import Topic +from .client import KafkaClient +from .balancedconsumer import BalancedConsumer __version__ = '2.0.0' diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py index 17054496f..bd079bfae 100644 --- a/pykafka/balancedconsumer.py +++ b/pykafka/balancedconsumer.py @@ -33,6 +33,7 @@ from .exceptions import (KafkaException, PartitionOwnedError, ConsumerStoppedException) from .simpleconsumer import SimpleConsumer +from .utils.compat import range, get_bytes log = logging.getLogger(__name__) @@ -301,18 +302,20 @@ def _decide_partitions(self, participants): :type participants: Iterable of str """ # Freeze and sort partitions so we always have the same results - p_to_str = lambda p: '-'.join([p.topic.name, str(p.leader.id), str(p.id)]) + p_to_str = lambda p: b'-'.join([ + get_bytes(p.topic.name), bytes(p.leader.id), bytes(p.id)] + ) all_parts = self._topic.partitions.values() - all_parts.sort(key=p_to_str) + all_parts = sorted(all_parts, key=p_to_str) # get start point, # of partitions, and remainder - participants.sort() # just make sure it's sorted. + participants = sorted(participants) # just make sure it's sorted. 
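A minimal standalone sketch of the `_decide_partitions` slice arithmetic this hunk ports (the arithmetic itself continues just below); `assign_slice` and the consumer names are illustrative, not pykafka API. The explicit `int()` casts matter because `/` is true division under Python 3.

import math

def assign_slice(num_partitions, participants, consumer_id):
    """Return (start, count): the slice of the sorted partition list owned by consumer_id."""
    participants = sorted(participants)
    idx = participants.index(consumer_id)
    # Under true division, math.floor() plus the int() casts keep these values
    # usable as slice bounds on both interpreters.
    per_consumer = math.floor(num_partitions / len(participants))
    remainder = num_partitions % len(participants)
    start = int(per_consumer * idx + min(idx, remainder))
    count = int(per_consumer + (0 if idx + 1 > remainder else 1))
    return start, count

# 10 partitions over 3 consumers -> slices of 4, 3, 3 covering every partition once
assert [assign_slice(10, ['c0', 'c1', 'c2'], c) for c in ('c0', 'c1', 'c2')] == \
    [(0, 4), (4, 3), (7, 3)]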
idx = participants.index(self._consumer_id) parts_per_consumer = math.floor(len(all_parts) / len(participants)) remainder_ppc = len(all_parts) % len(participants) - start = parts_per_consumer * idx + min(idx, remainder_ppc) - num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1) + start = int(parts_per_consumer * idx + min(idx, remainder_ppc)) + num_parts = int(parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1)) # assign partitions from i*N to (i+1)*N - 1 to consumer Ci new_partitions = itertools.islice(all_parts, start, start + num_parts) @@ -320,6 +323,7 @@ def _decide_partitions(self, participants): log.info('Balancing %i participants for %i partitions.\nOwning %i partitions.', len(participants), len(all_parts), len(new_partitions)) log.debug('My partitions: %s', [p_to_str(p) for p in new_partitions]) + return new_partitions def _get_participants(self): @@ -343,7 +347,7 @@ def _get_participants(self): participants.append(id_) except NoNodeException: pass # disappeared between ``get_children`` and ``get`` - participants.sort() + participants = sorted(participants) return participants def _set_watches(self): @@ -409,13 +413,12 @@ def _rebalance(self): self._consumer_id, self._topic.name) ) - for i in xrange(self._rebalance_max_retries): + for i in range(self._rebalance_max_retries): try: # If retrying, be sure to make sure the # partition allocation is correct. participants = self._get_participants() partitions = self._decide_partitions(participants) - old_partitions = self._partitions - partitions self._remove_partitions(old_partitions) @@ -469,7 +472,7 @@ def _add_partitions(self, partitions): try: self._zookeeper.create( self._path_from_partition(p), - value=self._consumer_id, + value=get_bytes(self._consumer_id), ephemeral=True ) self._partitions.add(p) diff --git a/pykafka/broker.py b/pykafka/broker.py index 68b4879b1..0aafc4845 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -29,7 +29,7 @@ OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceResponse) - +from .utils.compat import range, iteritems log = logging.getLogger(__name__) @@ -274,7 +274,7 @@ def request_metadata(self, topics=None): :type topics: Iterable of int """ max_retries = 3 - for i in xrange(max_retries): + for i in range(max_retries): if i > 0: log.debug("Retrying") time.sleep(i) @@ -282,10 +282,10 @@ def request_metadata(self, topics=None): future = self._req_handler.request(MetadataRequest(topics=topics)) response = future.get(MetadataResponse) - for name, topic_metadata in response.topics.iteritems(): + for name, topic_metadata in iteritems(response.topics): if topic_metadata.err == LeaderNotAvailable.ERROR_CODE: log.warning("Leader not available for topic '%s'.", name) - for pid, partition_metadata in topic_metadata.partitions.iteritems(): + for pid, partition_metadata in iteritems(topic_metadata.partitions): if partition_metadata.err == LeaderNotAvailable.ERROR_CODE: log.warning("Leader not available for topic '%s' partition %d.", name, pid) diff --git a/pykafka/client.py b/pykafka/client.py index 0762959b4..61a8caf28 100644 --- a/pykafka/client.py +++ b/pykafka/client.py @@ -18,9 +18,9 @@ """ __all__ = ["KafkaClient"] -import handlers +from .handlers import ThreadingHandler import logging -from cluster import Cluster +from .cluster import Cluster try: import rd_kafka @@ -71,7 +71,7 @@ def __init__(self, self._source_address = source_address self._socket_timeout_ms = socket_timeout_ms self._offsets_channel_socket_timeout_ms = 
offsets_channel_socket_timeout_ms - self._handler = None if use_greenlets else handlers.ThreadingHandler() + self._handler = None if use_greenlets else ThreadingHandler() self._use_rdkafka = rd_kafka and not ignore_rdkafka if self._use_rdkafka: log.info('Using rd_kafka extensions.') diff --git a/pykafka/cluster.py b/pykafka/cluster.py index 2ca8368b4..55f6dc401 100644 --- a/pykafka/cluster.py +++ b/pykafka/cluster.py @@ -29,7 +29,7 @@ UnknownTopicOrPartition) from .protocol import ConsumerMetadataRequest, ConsumerMetadataResponse from .topic import Topic - +from .utils.compat import iteritems, range, get_bytes log = logging.getLogger(__name__) @@ -43,6 +43,9 @@ def __init__(self, cluster, *args, **kwargs): def __missing__(self, key): log.warning('Topic %s not found. Attempting to auto-create.', key) + + key = get_bytes(key) + if self._create_topic(key): return self[key] else: @@ -56,16 +59,18 @@ def _create_topic(self, topic_name): with settings and everything, we'll implement that. To expose just this now would be disingenuous, since it's features would be hobbled. """ + topic_name = get_bytes(topic_name) + if len(self._cluster.brokers) == 0: log.warning("No brokers found. This is probably because of " "KAFKA-2154, which will be fixed in Kafka 0.8.3") raise KafkaException("Unable to retrieve metdata. Can't auto-create topic. See log for details.") # Auto-creating will take a moment, so we try 5 times. - for i in xrange(5): + for i in range(5): # Auto-creating is as simple as issuing a metadata request # solely for that topic. The update is just to be sure # our `Cluster` knows about it. - self._cluster.brokers[self._cluster.brokers.keys()[0]].request_metadata(topics=[topic_name]) + self._cluster.brokers[list(self._cluster.brokers.keys())[0]].request_metadata(topics=[topic_name]) self._cluster.update() if topic_name in self: log.info('Topic %s successfully auto-created.', topic_name) @@ -155,7 +160,7 @@ def _get_metadata(self): for broker_str in brokers: try: h, p = broker_str.split(':') - broker = Broker(-1, h, p, self._handler, + broker = Broker(-1, h, int(p), self._handler, self._socket_timeout_ms, self._offsets_channel_socket_timeout_ms, buffer_size=1024 * 1024, @@ -189,7 +194,7 @@ def _update_brokers(self, broker_metadata): # Add/update current brokers if len(broker_metadata) > 0: log.info('Discovered %d brokers', len(broker_metadata)) - for id_, meta in broker_metadata.iteritems(): + for id_, meta in iteritems(broker_metadata): if id_ not in self._brokers: log.debug('Discovered broker id %s: %s:%s', id_, meta.host, meta.port) self._brokers[id_] = Broker.from_metadata( @@ -225,7 +230,7 @@ def _update_topics(self, metadata): # Add/update partition information if len(metadata) > 0: log.info("Discovered %d topics", len(metadata)) - for name, meta in metadata.iteritems(): + for name, meta in iteritems(metadata): if not self._should_exclude_topic(name): if name not in self._topics: self._topics[name] = Topic(self, meta) @@ -237,7 +242,7 @@ def _should_exclude_topic(self, topic_name): """Should this topic be excluded from the list shown to the client?""" if not self._exclude_internal_topics: return False - return topic_name.startswith("__") + return topic_name.startswith(b"__") def get_offset_manager(self, consumer_group): """Get the broker designated as the offset manager for this consumer group. 
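The `list(...keys())` wrapping in the cluster.py hunks above guards against Python 3 returning dict views rather than lists; a tiny sketch of the two patterns that would otherwise fail (the broker values here are made-up stand-ins):

import random

brokers = {0: 'broker-0', 1: 'broker-1', 2: 'broker-2'}

first_id = list(brokers.keys())[0]             # keys()[0] raises TypeError on Python 3
any_id = random.choice(list(brokers.keys()))   # random.choice() needs a real sequence
assert first_id in brokers and any_id in brokers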
@@ -251,10 +256,10 @@ def get_offset_manager(self, consumer_group): log.info("Attempting to discover offset manager for consumer group '%s'", consumer_group) # arbitrarily choose a broker, since this request can go to any - broker = self.brokers[random.choice(self.brokers.keys())] + broker = self.brokers[random.choice(list(self.brokers.keys()))] MAX_RETRIES = 5 - for i in xrange(MAX_RETRIES): + for i in range(MAX_RETRIES): if i > 0: log.debug("Retrying offset manager discovery") time.sleep(i * 2) diff --git a/pykafka/connection.py b/pykafka/connection.py index 5bcee51f2..fb24f6dc6 100644 --- a/pykafka/connection.py +++ b/pykafka/connection.py @@ -24,6 +24,7 @@ from .exceptions import SocketDisconnectedError from .utils.socket import recvall_into +from .utils.compat import buffer log = logging.getLogger(__name__) @@ -101,6 +102,7 @@ def request(self, request): bytes = request.get_bytes() if not self._socket: raise SocketDisconnectedError + self._socket.sendall(bytes) def response(self): @@ -110,6 +112,7 @@ def response(self): # Happens when broker has shut down self.disconnect() raise SocketDisconnectedError + size = struct.unpack('!i', size)[0] recvall_into(self._socket, self._buff, size) return buffer(self._buff[4:4 + size]) diff --git a/pykafka/handlers.py b/pykafka/handlers.py index fd0f198b9..2bdb7cf35 100644 --- a/pykafka/handlers.py +++ b/pykafka/handlers.py @@ -20,8 +20,7 @@ import atexit import functools import threading -import Queue - +from .utils.compat import Queue, Empty from collections import namedtuple @@ -69,8 +68,8 @@ def spawn(self, target, *args, **kwargs): class ThreadingHandler(Handler): """A handler. that uses a :class:`threading.Thread` to perform its work""" - QueueEmptyError = Queue.Empty - Queue = Queue.Queue + QueueEmptyError = Empty + Queue = Queue Event = threading.Event Lock = threading.Lock # turn off RLock's super annoying default logging @@ -130,10 +129,11 @@ def worker(): task = self._requests.get() try: self.connection.request(task.request) + if task.future: res = self.connection.response() task.future.set_response(res) - except Exception, e: + except Exception as e: if task.future: task.future.set_error(e) finally: diff --git a/pykafka/producer.py b/pykafka/producer.py index 71db9cfb6..f648b69be 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -41,7 +41,7 @@ ) from .partitioners import random_partitioner from .protocol import Message, ProduceRequest - +from .utils.compat import string_types, get_bytes, iteritems, range, itervalues log = logging.getLogger(__name__) @@ -193,9 +193,9 @@ def produce(self, message, partition_key=None): """ if not self._running: raise ProducerStoppedException() - partitions = self._topic.partitions.values() + partitions = list(self._topic.partitions.values()) partition_id = self._partitioner(partitions, partition_key).id - message_partition_tup = (partition_key, str(message)), partition_id + message_partition_tup = (partition_key, get_bytes(message)), partition_id self._produce(message_partition_tup) if self._synchronous: self._wait_all() @@ -249,8 +249,8 @@ def _get_partition_msgs(partition_id, req): """Get all the messages for the partitions from the request.""" messages = itertools.chain.from_iterable( mset.messages - for topic, partitions in req.msets.iteritems() - for p_id, mset in partitions.iteritems() + for topic, partitions in iteritems(req.msets) + for p_id, mset in iteritems(partitions) if p_id == partition_id ) for message in messages: @@ -264,8 +264,8 @@ def _get_partition_msgs(partition_id, req): # 
Figure out if we need to retry any messages # TODO: Convert to using utils.handle_partition_responses to_retry = [] - for topic, partitions in response.topics.iteritems(): - for partition, presponse in partitions.iteritems(): + for topic, partitions in iteritems(response.topics): + for partition, presponse in iteritems(partitions): if presponse.err == 0: # mark msg_count messages as successfully delivered msg_count = len(req.msets[topic][partition].messages) @@ -304,8 +304,8 @@ def _get_partition_msgs(partition_id, req): self._cluster.update() to_retry = [ ((message.partition_key, message.value), p_id) - for topic, partitions in req.msets.iteritems() - for p_id, mset in partitions.iteritems() + for topic, partitions in iteritems(req.msets) + for p_id, mset in iteritems(partitions) for message in mset.messages ] @@ -329,14 +329,14 @@ def _update_leaders(self): """ # empty queues and figure out updated partition leaders new_queue_contents = defaultdict(list) - for owned_broker in self._owned_brokers.itervalues(): + for owned_broker in itervalues(self._owned_brokers): owned_broker.lock.acquire() current_queue_contents = owned_broker.flush(0, release_pending=True) for kv, partition_id in current_queue_contents: partition_leader = self._topic.partitions[partition_id].leader new_queue_contents[partition_leader.id].append((kv, partition_id)) # retain locks for all brokers between these two steps - for owned_broker in self._owned_brokers.itervalues(): + for owned_broker in itervalues(self._owned_brokers): owned_broker.enqueue(new_queue_contents[owned_broker.broker.id], self._block_on_queue_full) owned_broker.resolve_event_state() @@ -349,7 +349,7 @@ def _wait_all(self): and have not yet been dequeued and sent to the broker """ log.info("Blocking until all messages are sent") - while any(q.message_is_pending() for q in self._owned_brokers.itervalues()): + while any(q.message_is_pending() for q in itervalues(self._owned_brokers)): time.sleep(.3) self._raise_worker_exceptions() @@ -434,7 +434,7 @@ def flush(self, linger_ms, release_pending=False): """ self._wait_for_flush_ready(linger_ms) with self.lock: - batch = [self.queue.pop() for _ in xrange(len(self.queue))] + batch = [self.queue.pop() for _ in range(len(self.queue))] if release_pending: self.messages_pending -= len(batch) if not self.slot_available.is_set(): diff --git a/pykafka/protocol.py b/pykafka/protocol.py index 04b52d2c0..0bfee6122 100644 --- a/pykafka/protocol.py +++ b/pykafka/protocol.py @@ -63,6 +63,7 @@ from .common import CompressionType, Message from .exceptions import ERROR_CODES, NoMessagesConsumedError from .utils import Serializable, compression, struct_helpers +from .utils.compat import iteritems, itervalues, buffer, get_bytes log = logging.getLogger(__name__) @@ -71,7 +72,7 @@ class Request(Serializable): """Base class for all Requests. Handles writing header information""" HEADER_LEN = 21 # constant for all messages - CLIENT_ID = 'pykafka' + CLIENT_ID = b'pykafka' def _write_header(self, buff, api_version=0, correlation_id=0): """Write the header for an outgoing message. 
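The `Message.pack_into` hunk that follows masks the checksum because `zlib.crc32` can return signed values on Python 2; a hedged sketch of that normalization, outside any pykafka class:

import struct
from zlib import crc32

payload = b'this is a test message'
crc = crc32(payload) & 0xffffffff       # force the unsigned 32-bit range on both interpreters
header = bytearray(4)
struct.pack_into('!I', header, 0, crc)  # '!i' would reject masked values >= 2**31
assert struct.unpack_from('!I', header, 0)[0] == crc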
@@ -191,16 +192,20 @@ def pack_into(self, buff, offset): if self.partition_key is None: fmt = '!BBii%ds' % len(self.value) args = (self.MAGIC, self.compression_type, -1, - len(self.value), self.value) + len(self.value), get_bytes(self.value)) else: fmt = '!BBi%dsi%ds' % (len(self.partition_key), len(self.value)) args = (self.MAGIC, self.compression_type, - len(self.partition_key), self.partition_key, - len(self.value), self.value) + len(self.partition_key), + get_bytes(self.partition_key), + len(self.value), + get_bytes(self.value)) + struct.pack_into(fmt, buff, offset + 4, *args) fmt_size = struct.calcsize(fmt) - crc = crc32(buffer(buff[(offset + 4):(offset + 4 + fmt_size)])) - struct.pack_into('!i', buff, offset, crc) + data = buffer(buff[(offset + 4):(offset + 4 + fmt_size)]) + crc = crc32(data) & 0xffffffff + struct.pack_into('!I', buff, offset, crc) class MessageSet(Serializable): @@ -285,6 +290,7 @@ def decode(cls, buff, partition_id=-1): attempted = True if len(buff) - offset < size: break + # TODO: Check we have all the requisite bytes message = Message.decode(buff[offset:offset + size], msg_offset, @@ -292,6 +298,7 @@ def decode(cls, buff, partition_id=-1): # print '[%d] (%s) %s' % (message.offset, message.partition_key, message.value) messages.append(message) offset += size + if len(messages) == 0 and attempted: raise NoMessagesConsumedError() return MessageSet(messages=messages) @@ -334,7 +341,14 @@ def __init__(self, topics=None): :param topics: Topics to query. Leave empty for all available topics. """ - self.topics = topics or [] + self._topics = topics or [] + + @property + def topics(self): + if self._topics: + return [get_bytes(t) for t in self._topics] + + return self._topics def __len__(self): """Length of the serialized message, in bytes""" @@ -352,6 +366,7 @@ def get_bytes(self): :rtype: :class:`bytearray` """ output = bytearray(len(self)) + self._write_header(output) struct.pack_into('!i', output, self.HEADER_LEN, len(self.topics)) offset = self.HEADER_LEN + 4 @@ -394,7 +409,6 @@ def __init__(self, buff): fmt = '[iSi] [hS [hii [i] [i] ] ]' response = struct_helpers.unpack_from(fmt, buff, 0) broker_info, topics = response - self.brokers = {} for (id_, host, port) in broker_info: self.brokers[id_] = BrokerMetadata(id_, host, port) @@ -454,11 +468,11 @@ def __init__(self, def __len__(self): """Length of the serialized message, in bytes""" size = self.HEADER_LEN + 2 + 4 + 4 # acks + timeout + len(topics) - for topic, parts in self.msets.iteritems(): + for topic, parts in iteritems(self.msets): # topic name size += 2 + len(topic) + 4 # topic name + len(parts) # partition + mset size + len(mset) - size += sum(4 + 4 + len(mset) for mset in parts.itervalues()) + size += sum(4 + 4 + len(mset) for mset in itervalues(parts)) return size @property @@ -471,8 +485,8 @@ def messages(self): """Iterable of all messages in the Request""" return itertools.chain.from_iterable( mset.messages - for topic, partitions in self.msets.iteritems() - for partition_id, mset in partitions.iteritems() + for topic, partitions in iteritems(self.msets) + for partition_id, mset in iteritems(partitions) ) def add_message(self, message, topic_name, partition_id): @@ -497,11 +511,13 @@ def get_bytes(self): struct.pack_into('!hii', output, offset, self.required_acks, self.timeout, len(self.msets)) offset += 10 - for topic_name, partitions in self.msets.iteritems(): + for topic_name, partitions in iteritems(self.msets): + topic_name = get_bytes(topic_name) fmt = '!h%dsi' % len(topic_name) - 
struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) + struct.pack_into(fmt, output, offset, len(topic_name), + topic_name, len(partitions)) offset += struct.calcsize(fmt) - for partition_id, message_set in partitions.iteritems(): + for partition_id, message_set in iteritems(partitions): mset_len = len(message_set) struct.pack_into('!ii', output, offset, partition_id, mset_len) offset += 8 @@ -617,7 +633,7 @@ def __len__(self): """Length of the serialized message, in bytes""" # replica + max wait + min bytes + len(topics) size = self.HEADER_LEN + 4 + 4 + 4 + 4 - for topic, parts in self._reqs.iteritems(): + for topic, parts in iteritems(self._reqs): # topic name + len(parts) size += 2 + len(topic) + 4 # partition + fetch offset + max bytes => for each partition @@ -641,11 +657,15 @@ def get_bytes(self): struct.pack_into('!iiii', output, offset, -1, self.timeout, self.min_bytes, len(self._reqs)) offset += 16 - for topic_name, partitions in self._reqs.iteritems(): + for topic_name, partitions in iteritems(self._reqs): + topic_name = get_bytes(topic_name) fmt = '!h%dsi' % len(topic_name) - struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) + struct.pack_into( + fmt, output, offset, len(topic_name), topic_name, + len(partitions) + ) offset += struct.calcsize(fmt) - for partition_id, (fetch_offset, max_bytes) in partitions.iteritems(): + for partition_id, (fetch_offset, max_bytes) in iteritems(partitions): struct.pack_into('!iqi', output, offset, partition_id, fetch_offset, max_bytes) offset += 16 @@ -749,7 +769,7 @@ def __len__(self): """Length of the serialized message, in bytes""" # Header + replicaId + len(topics) size = self.HEADER_LEN + 4 + 4 - for topic, parts in self._reqs.iteritems(): + for topic, parts in iteritems(self._reqs): # topic name + len(parts) size += 2 + len(topic) + 4 # partition + fetch offset + max bytes => for each partition @@ -772,12 +792,13 @@ def get_bytes(self): offset = self.HEADER_LEN struct.pack_into('!ii', output, offset, -1, len(self._reqs)) offset += 8 - for topic_name, partitions in self._reqs.iteritems(): + for topic_name, partitions in iteritems(self._reqs): fmt = '!h%dsi' % len(topic_name) struct.pack_into(fmt, output, offset, len(topic_name), - topic_name, len(partitions)) + get_bytes(topic_name), len(partitions)) + offset += struct.calcsize(fmt) - for pnum, (offsets_before, max_offsets) in partitions.iteritems(): + for pnum, (offsets_before, max_offsets) in iteritems(partitions): struct.pack_into('!iqi', output, offset, pnum, offsets_before, max_offsets) offset += 16 @@ -824,7 +845,7 @@ class ConsumerMetadataRequest(Request): """ def __init__(self, consumer_group): """Create a new consumer metadata request""" - self.consumer_group = consumer_group + self.consumer_group = get_bytes(consumer_group) def __len__(self): """Length of the serialized message, in bytes""" @@ -933,13 +954,13 @@ def __len__(self): size = self.HEADER_LEN + 2 + len(self.consumer_group) # + generation id + string size + consumer_id size + array length size += 4 + 2 + len(self.consumer_id) + 4 - for topic, parts in self._reqs.iteritems(): + for topic, parts in iteritems(self._reqs): # topic name + len(parts) size += 2 + len(topic) + 4 # partition + offset + timestamp => for each partition size += (4 + 8 + 8) * len(parts) # metadata => for each partition - for partition, (_, _, metadata) in parts.iteritems(): + for partition, (_, _, metadata) in iteritems(parts): size += 2 + len(metadata) return size @@ -958,19 +979,27 @@ 
def get_bytes(self): self._write_header(output, api_version=1) offset = self.HEADER_LEN fmt = '!h%dsih%dsi' % (len(self.consumer_group), len(self.consumer_id)) + + consumer_group = get_bytes(self.consumer_group) + consumer_id = get_bytes(self.consumer_id) + struct.pack_into(fmt, output, offset, - len(self.consumer_group), self.consumer_group, + len(consumer_group), + consumer_group, self.consumer_group_generation_id, - len(self.consumer_id), self.consumer_id, + len(consumer_id), consumer_id, len(self._reqs)) + offset += struct.calcsize(fmt) - for topic_name, partitions in self._reqs.iteritems(): + for topic_name, partitions in iteritems(self._reqs): + topic_name = get_bytes(topic_name) fmt = '!h%dsi' % len(topic_name) struct.pack_into(fmt, output, offset, len(topic_name), topic_name, len(partitions)) offset += struct.calcsize(fmt) - for pnum, (poffset, timestamp, metadata) in partitions.iteritems(): + for pnum, (poffset, timestamp, metadata) in iteritems(partitions): fmt = '!iqq' + metadata = get_bytes(metadata) struct.pack_into(fmt, output, offset, pnum, poffset, timestamp) offset += struct.calcsize(fmt) @@ -980,6 +1009,7 @@ def get_bytes(self): if metalen != -1: fmt += '%ds' % metalen pack_args = [fmt, output, offset, metalen, metadata] + struct.pack_into(*pack_args) offset += struct.calcsize(fmt) return output @@ -1054,7 +1084,7 @@ def __len__(self): """Length of the serialized message, in bytes""" # Header + consumer group + len(topics) size = self.HEADER_LEN + 2 + len(self.consumer_group) + 4 - for topic, parts in self._reqs.iteritems(): + for topic, parts in iteritems(self._reqs): # topic name + len(parts) size += 2 + len(topic) + 4 # partition => for each partition @@ -1077,13 +1107,14 @@ def get_bytes(self): offset = self.HEADER_LEN fmt = '!h%dsi' % len(self.consumer_group) struct.pack_into(fmt, output, offset, - len(self.consumer_group), self.consumer_group, + len(self.consumer_group), + get_bytes(self.consumer_group), len(self._reqs)) offset += struct.calcsize(fmt) - for topic_name, partitions in self._reqs.iteritems(): + for topic_name, partitions in iteritems(self._reqs): fmt = '!h%dsi' % len(topic_name) struct.pack_into(fmt, output, offset, len(topic_name), - topic_name, len(partitions)) + get_bytes(topic_name), len(partitions)) offset += struct.calcsize(fmt) for pnum in partitions: fmt = '!i' diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 812069305..74e5a0b8b 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -23,10 +23,10 @@ import time import threading from collections import defaultdict -from Queue import Queue, Empty from .common import OffsetType -from .utils.compat import Semaphore +from .utils.compat import (Semaphore, Queue, Empty, iteritems, itervalues, + range, iterkeys) from .exceptions import (OffsetOutOfRangeError, UnknownTopicOrPartition, OffsetMetadataTooLarge, OffsetsLoadInProgress, NotCoordinatorForConsumer, SocketDisconnectedError, @@ -162,12 +162,12 @@ def __init__(self, else: self._partitions = {topic.partitions[k]: OwnedPartition(p, self._messages_arrived) - for k, p in topic.partitions.iteritems()} + for k, p in iteritems(topic.partitions)} self._partitions_by_id = {p.partition.id: p - for p in self._partitions.itervalues()} + for p in itervalues(self._partitions)} # Organize partitions by leader for efficient queries self._partitions_by_leader = defaultdict(list) - for p in self._partitions.itervalues(): + for p in itervalues(self._partitions): self._partitions_by_leader[p.partition.leader].append(p) 
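The consumer's round-robin over `partition_cycle`, set up just below, moves from the removed `.next()` method to the `next()` builtin, which exists on both interpreters; a small illustration with placeholder partition names:

import itertools

partitions = ['part-0', 'part-1', 'part-2']
partition_cycle = itertools.cycle(partitions)

# iterator.next() was removed in Python 3; the next() builtin works everywhere.
assert [next(partition_cycle) for _ in range(4)] == ['part-0', 'part-1', 'part-2', 'part-0']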
self.partition_cycle = itertools.cycle(self._partitions.values()) @@ -240,13 +240,13 @@ def topic(self): def partitions(self): """A list of the partitions that this consumer consumes""" return {id_: partition.partition - for id_, partition in self._partitions_by_id.iteritems()} + for id_, partition in iteritems(self._partitions_by_id)} @property def held_offsets(self): """Return a map from partition id to held offset for each partition""" return {p.partition.id: p.last_offset_consumed - for p in self._partitions_by_id.itervalues()} + for p in itervalues(self._partitions_by_id)} def __del__(self): """Stop consumption and workers when object is deleted""" @@ -280,7 +280,7 @@ def fetcher(): log.debug("Fetcher thread exiting") log.info("Starting %s fetcher threads", self._num_consumer_fetchers) return [self._cluster.handler.spawn(fetcher) - for i in xrange(self._num_consumer_fetchers)] + for i in range(self._num_consumer_fetchers)] def __iter__(self): """Yield an infinite stream of messages until the consumer times out""" @@ -309,7 +309,7 @@ def consume(self, block=True): # least one message is waiting in some queue. message = None while not message: - owned_partition = self.partition_cycle.next() + owned_partition = next(self.partition_cycle) message = owned_partition.consume() return message else: @@ -341,7 +341,7 @@ def commit_offsets(self): reqs = [p.build_offset_commit_request() for p in self._partitions.values()] log.debug("Committing offsets for %d partitions to broker id %s", len(reqs), self._offset_manager.id) - for i in xrange(self._offsets_commit_max_retries): + for i in range(self._offsets_commit_max_retries): if i > 0: log.debug("Retrying") time.sleep(i * (self._offsets_channel_backoff_ms / 1000)) @@ -357,13 +357,15 @@ def commit_offsets(self): log.error("Error committing offsets for topic %s (errors: %s)", self._topic.name, {ERROR_CODES[err]: [op.partition.id for op, _ in parts] - for err, parts in parts_by_error.iteritems()}) + for err, parts in iteritems(parts_by_error)}) # retry only the partitions that errored if 0 in parts_by_error: parts_by_error.pop(0) - errored_partitions = [op for code, err_group in parts_by_error.iteritems() - for op, res in err_group] + errored_partitions = [ + op for code, err_group in iteritems(parts_by_error) + for op, res in err_group + ] reqs = [p.build_offset_commit_request() for p in errored_partitions] def fetch_offsets(self): @@ -390,7 +392,7 @@ def _handle_success(parts): log.debug("Fetching offsets for %d partitions from broker id %s", len(reqs), self._offset_manager.id) - for i in xrange(self._offsets_fetch_max_retries): + for i in range(self._offsets_fetch_max_retries): if i > 0: log.debug("Retrying offset fetch") @@ -408,7 +410,7 @@ def _handle_success(parts): log.error("Error fetching offsets for topic %s (errors: %s)", self._topic.name, {ERROR_CODES[err]: [op.partition.id for op, _ in parts] - for err, parts in parts_by_error.iteritems()}) + for err, parts in iteritems(parts_by_error)}) time.sleep(i * (self._offsets_channel_backoff_ms / 1000)) @@ -485,10 +487,10 @@ def _handle_success(parts): log.info("Resetting offsets for %s partitions", len(list(owned_partition_offsets))) - for i in xrange(self._offsets_reset_max_retries): + for i in range(self._offsets_reset_max_retries): # group partitions by leader by_leader = defaultdict(list) - for partition, offset in owned_partition_offsets.iteritems(): + for partition, offset in iteritems(owned_partition_offsets): # acquire lock for each partition to stop fetching during offset # reset if 
partition.fetch_lock.acquire(True): @@ -498,7 +500,7 @@ def _handle_success(parts): by_leader[partition.partition.leader].append((partition, offset)) # get valid offset ranges for each partition - for broker, offsets in by_leader.iteritems(): + for broker, offsets in iteritems(by_leader): reqs = [owned_partition.build_offset_request(offset) for owned_partition, offset in offsets] response = broker.request_offset_limits(reqs) @@ -511,17 +513,19 @@ def _handle_success(parts): if 0 in parts_by_error: # drop successfully reset partitions for next retry successful = [part for part, _ in parts_by_error.pop(0)] - map(owned_partition_offsets.pop, successful) + # py3 creates a generate so we need to evaluate this + # operation + list(map(owned_partition_offsets.pop, successful)) if not parts_by_error: continue log.error("Error resetting offsets for topic %s (errors: %s)", self._topic.name, {ERROR_CODES[err]: [op.partition.id for op, _ in parts] - for err, parts in parts_by_error.iteritems()}) + for err, parts in iteritems(parts_by_error)}) time.sleep(i * (self._offsets_channel_backoff_ms / 1000)) - for errcode, owned_partitions in parts_by_error.iteritems(): + for errcode, owned_partitions in iteritems(parts_by_error): if errcode != 0: for owned_partition in owned_partitions: owned_partition.fetch_lock.release() @@ -554,7 +558,7 @@ def _handle_success(parts): owned_partition.partition.id, owned_partition.message_count) - for broker, owned_partitions in self._partitions_by_leader.iteritems(): + for broker, owned_partitions in iteritems(self._partitions_by_leader): partition_reqs = {} for owned_partition in owned_partitions: # attempt to acquire lock, just pass if we can't @@ -571,7 +575,7 @@ def _handle_success(parts): if partition_reqs: try: response = broker.fetch_messages( - [a for a in partition_reqs.itervalues() if a], + [a for a in itervalues(partition_reqs) if a], timeout=self._fetch_wait_max_ms, min_bytes=self._fetch_min_bytes ) @@ -603,7 +607,7 @@ def _handle_success(parts): parts_by_error=parts_by_error, success_handler=_handle_success) # unlock the rest of the partitions - for owned_partition in partition_reqs.iterkeys(): + for owned_partition in iterkeys(partition_reqs): owned_partition.fetch_lock.release() diff --git a/pykafka/test/kafka_instance.py b/pykafka/test/kafka_instance.py index 2dfc0b4cc..fb6fae337 100644 --- a/pykafka/test/kafka_instance.py +++ b/pykafka/test/kafka_instance.py @@ -29,6 +29,8 @@ from testinstances import utils from testinstances.exceptions import ProcessNotStartingError from testinstances.managed_instance import ManagedInstance +from pykafka.utils.compat import range, get_bytes, get_string + log = logging.getLogger(__name__) @@ -80,7 +82,7 @@ def _run_topics_sh(self, args): """Run kafka-topics.sh with the provided list of arguments.""" binfile = os.path.join(self._bin_dir, 'bin/kafka-topics.sh') cmd = [binfile, '--zookeeper', self.zookeeper] + args - cmd = [str(c) for c in cmd] # execv needs only strings + cmd = [get_string(c) for c in cmd] # execv needs only strings log.debug('running: %s', ' '.join(cmd)) return subprocess.check_output(cmd) @@ -105,7 +107,7 @@ def flush(self): def list_topics(self): """Use kafka-topics.sh to get topic information.""" res = self._run_topics_sh(['--list']) - return res.strip().split('\n') + return res.strip().split(b'\n') def produce_messages(self, topic_name, messages, batch_size=200): """Produce some messages to a topic.""" @@ -114,10 +116,10 @@ def produce_messages(self, topic_name, messages, batch_size=200): 
'--broker-list', self.brokers, '--topic', topic_name, '--batch-size', batch_size] - cmd = [str(c) for c in cmd] # execv needs only strings + cmd = [get_string(c) for c in cmd] # execv needs only strings log.debug('running: %s', ' '.join(cmd)) proc = subprocess.Popen(cmd, stdin=subprocess.PIPE) - proc.communicate(input='\n'.join(messages)) + proc.communicate(input=get_bytes('\n'.join(messages))) if proc.poll() is None: proc.kill() @@ -128,12 +130,14 @@ class KafkaInstance(ManagedInstance): def __init__(self, num_instances=1, kafka_version='0.8.2.1', + scala_version='2.10', bin_dir='/tmp/kafka-bin', name='kafka', use_gevent=False): """Start kafkainstace with given settings""" self._num_instances = num_instances self._kafka_version = kafka_version + self._scala_version = scala_version self._bin_dir = bin_dir self._processes = [] self.zookeeper = None @@ -174,9 +178,15 @@ def _download_kafka(self): log.info('Downloading Kafka.') curr_dir = os.getcwd() os.chdir(self._bin_dir) - url = 'http://mirror.reverse.net/pub/apache/kafka/{version}/kafka_2.10-{version}.tgz'.format(version=self._kafka_version) + url_fmt = 'http://mirror.reverse.net/pub/apache/kafka/{kafka_version}/kafka_{scala_version}-{kafka_version}.tgz' + url = url_fmt.format( + scala_version=self._scala_version, + kafka_version=self._kafka_version + ) p1 = subprocess.Popen(['curl', '-vs', url], stdout=subprocess.PIPE) - p2 = subprocess.Popen(['tar', 'xvz', '-C', self._bin_dir, '--strip-components', '1'], stdin=p1.stdout, stdout=subprocess.PIPE) + p2 = subprocess.Popen(['tar', 'xvz', '-C', self._bin_dir, + '--strip-components', '1'], + stdin=p1.stdout, stdout=subprocess.PIPE) p1.stdout.close() output, err = p2.communicate() os.chdir(curr_dir) @@ -189,7 +199,7 @@ def _is_port_free(self, port): s = socket.create_connection(('localhost', port)) s.close() return False - except IOError, err: + except IOError as err: return err.errno == errno.ECONNREFUSED def _port_generator(self, start): @@ -215,7 +225,7 @@ def _start_process(self): # Process is started when the port isn't free anymore all_ports = [zk_port] + broker_ports - for i in xrange(10): + for i in range(10): if all(not self._is_port_free(port) for port in all_ports): log.info('Kafka cluster started.') return # hooray! success @@ -245,8 +255,8 @@ def _start_brokers(self): self._broker_procs = [] ports = self._port_generator(9092) used_ports = [] - for i in xrange(self._num_instances): - port = ports.next() + for i in range(self._num_instances): + port = next(ports) used_ports.append(port) log.info('Starting Kafka on port %i.', port) @@ -271,7 +281,7 @@ def _start_brokers(self): return used_ports def _start_zookeeper(self): - port = self._port_generator(2181).next() + port = next(self._port_generator(2181)) log.info('Starting zookeeper on port %i.', port) conf = os.path.join(self._conf_dir, 'zk.properties') @@ -309,7 +319,8 @@ def flush(self): def create_topic(self, topic_name, num_partitions, replication_factor): """Use kafka-topics.sh to create a topic.""" - return self.connection.create_topic(topic_name, num_partitions, replication_factor) + return self.connection.create_topic(topic_name, num_partitions, + replication_factor) def delete_topic(self, topic_name): return self.connection.delete_topic(topic_name) @@ -341,18 +352,18 @@ def produce_messages(self, topic_name, messages): def _catch_sigint(signum, frame): global _exiting _exiting = True - print 'SIGINT received.' 
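The `produce_messages` hunk above encodes its input because `Popen.communicate()` rejects text on Python 3 when the pipe is in binary mode; a sketch using `cat` as a stand-in for kafka-console-producer.sh (assumes a Unix host):

import subprocess

messages = ['msg 0', 'msg 1', 'msg 2']
# 'cat' simply echoes stdin back, standing in for the console producer here.
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
out, _ = proc.communicate(input='\n'.join(messages).encode('utf-8'))
assert out == b'msg 0\nmsg 1\nmsg 2'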
+ print('SIGINT received.') signal.signal(signal.SIGINT, _catch_sigint) cluster = KafkaInstance(num_instances=args.num_brokers, kafka_version=args.kafka_version, bin_dir=args.download_dir) - print 'Cluster started.' - print 'Brokers: {brokers}'.format(brokers=cluster.brokers) - print 'Zookeeper: {zk}'.format(zk=cluster.zookeeper) - print 'Waiting for SIGINT to exit.' + print('Cluster started.') + print('Brokers: {brokers}'.format(brokers=cluster.brokers)) + print('Zookeeper: {zk}'.format(zk=cluster.zookeeper)) + print('Waiting for SIGINT to exit.') while True: if _exiting: - print 'Exiting.' + print('Exiting.') sys.exit(0) time.sleep(1) diff --git a/pykafka/test/utils.py b/pykafka/test/utils.py index fde4e62e4..179601b3f 100644 --- a/pykafka/test/utils.py +++ b/pykafka/test/utils.py @@ -2,6 +2,16 @@ from pykafka.test.kafka_instance import KafkaInstance, KafkaConnection +try: + import unittest2 as unittest +except ImportError: + import unittest + +try: + from unittest import mock +except ImportError: + import mock + def get_cluster(): """Gets a Kafka cluster for testing, using one already running is possible. diff --git a/pykafka/topic.py b/pykafka/topic.py index 48164a750..8c3c3fd5f 100644 --- a/pykafka/topic.py +++ b/pykafka/topic.py @@ -26,6 +26,7 @@ from .producer import Producer from .protocol import PartitionOffsetRequest from .simpleconsumer import SimpleConsumer +from .utils.compat import iteritems, itervalues log = logging.getLogger(__name__) @@ -94,12 +95,12 @@ def fetch_offset_limits(self, offsets_before, max_offsets=1): :type max_offsets: int """ requests = defaultdict(list) # one request for each broker - for part in self.partitions.itervalues(): + for part in itervalues(self.partitions): requests[part.leader].append(PartitionOffsetRequest( self.name, part.id, offsets_before, max_offsets )) output = {} - for broker, reqs in requests.iteritems(): + for broker, reqs in iteritems(requests): res = broker.request_offset_limits(reqs) output.update(res.topics[self.name]) return output @@ -132,7 +133,7 @@ def update(self, metadata): brokers = self._cluster.brokers if len(p_metas) > 0: log.info("Adding %d partitions", len(p_metas)) - for id_, meta in p_metas.iteritems(): + for id_, meta in iteritems(p_metas): if meta.id not in self._partitions: log.debug('Adding partition %s/%s', self.name, meta.id) self._partitions[meta.id] = Partition( diff --git a/pykafka/utils/compat.py b/pykafka/utils/compat.py index 6fee61c97..99499aba5 100644 --- a/pykafka/utils/compat.py +++ b/pykafka/utils/compat.py @@ -1,16 +1,71 @@ import sys +import platform; __all__ = ['PY3', 'Semaphore'] PY3 = sys.version_info[0] >= 3 - +IS_PYPY = platform.python_implementation().lower() == 'pypy' + +def get_bytes(value): + if hasattr(value, 'encode'): + try: + value = value.encode('utf-8') + except: + # if we can't encode the value just pass it along + pass + + return value + +def get_string(value): + if hasattr(value, 'decode'): + try: + value = value.decode('utf-8') + except: + # if we can't decode the value just pass it along + pass + else: + value = str(value) + + return value if PY3: from threading import Semaphore + from queue import Queue, Empty + from io import BytesIO as StringIO + range = range + + def iteritems(d, **kw): + return iter(d.items(**kw)) + + def itervalues(d, **kw): + return iter(d.values(**kw)) + + def iterkeys(d, **kw): + return iter(d.keys(**kw)) + + def buffer(val): + return bytes(memoryview(val)) + + string_types = str, else: + range = xrange from threading import Condition, Lock # could use 
monotonic.monotonic() backport as well here... from time import time as _time + from Queue import Queue, Empty + from StringIO import StringIO + + def iteritems(d, **kw): + return iter(d.iteritems(**kw)) + + def itervalues(d, **kw): + return iter(d.itervalues(**kw)) + + def iterkeys(d, **kw): + return iter(d.iterkeys(**kw)) + + buffer = buffer + string_types = basestring, # -- begin unmodified backport of threading.Semaphore from Python 3.4 -- # class Semaphore: diff --git a/pykafka/utils/compression.py b/pykafka/utils/compression.py index 96e155441..533fda1b0 100644 --- a/pykafka/utils/compression.py +++ b/pykafka/utils/compression.py @@ -21,7 +21,7 @@ import logging import struct -from cStringIO import StringIO +from .compat import StringIO, range, get_bytes, buffer, IS_PYPY try: import snappy @@ -30,7 +30,7 @@ log = logging.getLogger(__name__) # constants used in snappy xerial encoding/decoding -_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1) +_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) _XERIAL_V1_FORMAT = 'bccccccBii' @@ -79,15 +79,22 @@ def encode_snappy(buff, xerial_compatible=False, xerial_blocksize=32 * 1024): Adapted from kafka-python https://github.com/mumrah/kafka-python/pull/127/files """ + buff = get_bytes(buff) + #snappy segfaults if it gets a read-only buffer on PyPy + if IS_PYPY: + buff = bytes(buff) if snappy is None: raise ImportError("Please install python-snappy") if xerial_compatible: def _chunker(): - for i in xrange(0, len(buff), xerial_blocksize): + for i in range(0, len(buff), xerial_blocksize): yield buff[i:i + xerial_blocksize] out = StringIO() - header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat - in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)]) + full_data = list(zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)) + header = b''.join( + [struct.pack('!' 
+ fmt, dat) for fmt, dat in full_data + ]) + out.write(header) for chunk in _chunker(): block = snappy.compress(chunk) @@ -113,7 +120,7 @@ def decode_snappy(buff): raise ImportError("Please install python-snappy") if _detect_xerial_stream(buff): out = StringIO() - body = buffer(buff[16:]) + body = bytes(buffer(buff[16:])) length = len(body) cursor = 0 while cursor < length: diff --git a/pykafka/utils/error_handlers.py b/pykafka/utils/error_handlers.py index 4ffa7132b..729ae1f87 100644 --- a/pykafka/utils/error_handlers.py +++ b/pykafka/utils/error_handlers.py @@ -18,6 +18,7 @@ """ __all__ = ["handle_partition_responses", "raise_error"] from collections import defaultdict +from .compat import iteritems def handle_partition_responses(error_handlers, @@ -48,7 +49,7 @@ def handle_partition_responses(error_handlers, if parts_by_error is None: parts_by_error = build_parts_by_error(response, partitions_by_id) - for errcode, parts in parts_by_error.iteritems(): + for errcode, parts in iteritems(parts_by_error): if errcode in error_handlers: error_handlers[errcode](parts) @@ -68,7 +69,7 @@ def build_parts_by_error(response, partitions_by_id): # group partition responses by error code parts_by_error = defaultdict(list) for topic_name in response.topics.keys(): - for partition_id, pres in response.topics[topic_name].iteritems(): + for partition_id, pres in iteritems(response.topics[topic_name]): owned_partition = None if partitions_by_id is not None: owned_partition = partitions_by_id[partition_id] diff --git a/pykafka/utils/struct_helpers.py b/pykafka/utils/struct_helpers.py index c6dbf7f23..c991d00e0 100644 --- a/pykafka/utils/struct_helpers.py +++ b/pykafka/utils/struct_helpers.py @@ -19,6 +19,7 @@ __all__ = ["unpack_from"] import itertools import struct +from .compat import range, get_bytes def unpack_from(fmt, buff, offset=0): @@ -45,6 +46,7 @@ def unpack_from(fmt, buff, offset=0): if fmt[0] in '!><': fmt = fmt[1:] # It's always network ordering + buff = get_bytes(buff) output = _unpack(fmt, buff, offset, 1)[0] # whole-message arrays come back weird @@ -68,6 +70,7 @@ def _unpack(fmt, buff, offset, count=1): """ items = [] array_fmt = None + buff = get_bytes(buff) for i, ch in enumerate(fmt): if array_fmt is not None: if ch == ']': @@ -92,7 +95,8 @@ def _unpack(fmt, buff, offset, count=1): items.append(None) continue ch = '%ds' % len_ - items.extend(struct.unpack_from('!' + ch, buff, offset)) + unpacked_data = struct.unpack_from('!' + ch, buff, offset) + items.extend(unpacked_data) offset += struct.calcsize(ch) return tuple(items), offset @@ -110,9 +114,10 @@ def _unpack_array(fmt, buff, offset, count): :type count: int """ output = [] - for i in xrange(count): + for i in range(count): item, offset = _unpack(fmt, buff, offset) output.append(item) if len(fmt) == 1: output = list(itertools.chain.from_iterable(output)) + return output, offset diff --git a/setup.cfg b/setup.cfg index accca897f..a5a5febbb 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,3 +2,32 @@ logging-clear-handlers = 1 verbosity = 2 detailed-errors = 1 + +[pytest] +norecursedirs = build docs/_build *.egg .tox *.venv requirements/ +addopts = + # Shows a line for every test + # You probably want to turn this off if you use pytest-sugar. + # Or you can keep it and run `py.test -q`. + --verbose + + # Shorter tracebacks are sometimes easier to read + # --tb=short + + # Turn on --capture to have brief, less noisy output. + # You will only see output if the test fails. 
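Stepping back to the `encode_snappy` change in pykafka/utils/compression.py above: the xerial framing is a fixed header followed by length-prefixed snappy blocks, and the switch to `b''.join` keeps the header assembly bytes-safe. A self-contained check of just the header bytes (no snappy needed):

import struct

# Header constants as used by encode_snappy above.
_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'

# Each field is packed separately, so every piece is already bytes and
# b''.join() can stitch them together on both interpreters.
header = b''.join(struct.pack('!' + fmt, dat)
                  for fmt, dat in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER))
assert header == b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'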
+ # Use --capture no (same as -s) if you want to see it all or have problems + # debugging. + # --capture=fd + # --capture=no + + # Show extra test summary info as specified by chars (f)ailed, (E)error, (s)skipped, (x)failed, (X)passed. + -rfEsxX + + # FIXME: This is commented out for now while doing TDD + # Measure code coverage + --cov=pykafka --cov-report=xml --cov-report=term-missing + + # Previous versions included the following, but it's a bad idea because it + # hard-codes the value and makes it hard to change from the command-line + # tests/ diff --git a/setup.py b/setup.py index 726e24802..d5e55c736 100755 --- a/setup.py +++ b/setup.py @@ -16,8 +16,10 @@ """ import re import sys +import os from setuptools import setup, find_packages +from setuptools.command.test import test as TestCommand # Get version without importing, which avoids dependency issues def get_version(): @@ -35,9 +37,35 @@ def get_version(): 'pyflakes' ] -tests_require = ['mock', 'nose', 'unittest2', 'python-snappy'] +def read_lines(fname): + with open(os.path.join(os.path.dirname(__file__), fname)) as f: + return f.readlines() + +tests_require = [ + x.strip() for x in read_lines('test-requirements.txt') if not x.startswith('-') +] + dependency_links = [] setup_requires = [] + +class PyTest(TestCommand): + user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")] + + def initialize_options(self): + TestCommand.initialize_options(self) + self.pytest_args = [] + + def finalize_options(self): + TestCommand.finalize_options(self) + self.test_args = [] + self.test_suite = True + + def run_tests(self): + # import here, cause outside the eggs aren't loaded + import pytest + errno = pytest.main(self.pytest_args) + sys.exit(errno) + if 'nosetests' in sys.argv[1:]: setup_requires.append('nose') @@ -65,6 +93,7 @@ def get_version(): 'docs': ['sphinx'] + tests_require, 'lint': lint_requires }, + cmdclass={'test': PyTest}, dependency_links=dependency_links, zip_safe=False, test_suite='nose.collector', diff --git a/test-requirements.txt b/test-requirements.txt new file mode 100644 index 000000000..5e6d5a69e --- /dev/null +++ b/test-requirements.txt @@ -0,0 +1,6 @@ +pytest +pytest-cov +python-snappy +#testinstances + +-e git+https://github.com/sontek/testinstances.git@fix_tests#egg=testinstances diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 000000000..5c136b325 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,2 @@ +import logging +logging.basicConfig(level=logging.DEBUG) diff --git a/tests/pykafka/test_balancedconsumer.py b/tests/pykafka/test_balancedconsumer.py index 528831bdb..e69aca9c2 100644 --- a/tests/pykafka/test_balancedconsumer.py +++ b/tests/pykafka/test_balancedconsumer.py @@ -1,20 +1,19 @@ import math import time -import mock -import unittest2 +from pykafka.test.utils import unittest, mock from pykafka import KafkaClient from pykafka.balancedconsumer import BalancedConsumer from pykafka.test.utils import get_cluster, stop_cluster - +from pykafka.utils.compat import range def buildMockConsumer(num_partitions=10, num_participants=1, timeout=2000): consumer_group = 'testgroup' topic = mock.Mock() topic.name = 'testtopic' topic.partitions = {} - for k in xrange(num_partitions): + for k in range(num_partitions): part = mock.Mock(name='part-{part}'.format(part=k)) part.id = k part.topic = topic @@ -29,7 +28,7 @@ def buildMockConsumer(num_partitions=10, num_participants=1, timeout=2000): consumer_timeout_ms=timeout), topic -class TestBalancedConsumer(unittest2.TestCase): +class 
TestBalancedConsumer(unittest.TestCase): @classmethod def setUpClass(cls): cls._consumer_timeout = 2000 @@ -56,18 +55,18 @@ def test_consume_graceful_stop(self): def test_decide_partitions(self): """Test partition assignment for a number of partitions/consumers.""" # 100 test iterations - for i in xrange(100): + for i in range(100): # Set up partitions, cluster, etc num_participants = i + 1 num_partitions = 100 - i - participants = ['test-debian:{p}'.format(p=p) - for p in xrange(num_participants)] + participants = sorted(['test-debian:{p}'.format(p=p) + for p in range(num_participants)]) cns, topic = buildMockConsumer(num_partitions=num_partitions, num_participants=num_participants) # Simulate each participant to ensure they're correct assigned_parts = [] - for p_id in xrange(num_participants): + for p_id in range(num_participants): cns._consumer_id = participants[p_id] # override consumer id # Decide partitions then validate @@ -78,28 +77,31 @@ def test_decide_partitions(self): idx = participants.index(cns._consumer_id) parts_per_consumer = num_partitions / num_participants parts_per_consumer = math.floor(parts_per_consumer) + num_parts = parts_per_consumer + (0 if (idx + 1 > remainder_ppc) else 1) - self.assertEqual(len(partitions), num_parts) + self.assertEqual(len(partitions), int(num_parts)) # Validate all partitions were assigned once and only once all_partitions = topic.partitions.values() - all_partitions.sort() - assigned_parts.sort() + all_partitions = sorted(all_partitions, key=lambda x: x.id) + assigned_parts = sorted(assigned_parts, key=lambda x: x.id) self.assertListEqual(assigned_parts, all_partitions) -class BalancedConsumerIntegrationTests(unittest2.TestCase): +class BalancedConsumerIntegrationTests(unittest.TestCase): maxDiff = None @classmethod def setUpClass(cls): cls.kafka = get_cluster() - cls.topic_name = 'test-data' + cls.topic_name = b'test-data' cls.kafka.create_topic(cls.topic_name, 3, 2) cls.client = KafkaClient(cls.kafka.brokers) - prod = cls.client.topics[cls.topic_name].get_producer(min_queued_messages=1) - for i in xrange(1000): + prod = cls.client.topics[cls.topic_name].get_producer( + min_queued_messages=1 + ) + for i in range(1000): prod.produce('msg {num}'.format(num=i)) @classmethod @@ -108,13 +110,17 @@ def tearDownClass(cls): def test_consume(self): try: - consumer_a = self.client.topics[self.topic_name].get_balanced_consumer('test_consume', zookeeper_connect=self.kafka.zookeeper) - consumer_b = self.client.topics[self.topic_name].get_balanced_consumer('test_consume', zookeeper_connect=self.kafka.zookeeper) + consumer_a = self.client.topics[self.topic_name].get_balanced_consumer( + 'test_consume', zookeeper_connect=self.kafka.zookeeper + ) + consumer_b = self.client.topics[self.topic_name].get_balanced_consumer( + 'test_consume', zookeeper_connect=self.kafka.zookeeper + ) # Consume from both a few times - messages = [consumer_a.consume() for i in xrange(1)] + messages = [consumer_a.consume() for i in range(1)] self.assertTrue(len(messages) == 1) - messages = [consumer_b.consume() for i in xrange(1)] + messages = [consumer_b.consume() for i in range(1)] self.assertTrue(len(messages) == 1) # Validate they aren't sharing partitions @@ -128,10 +134,14 @@ def test_consume(self): consumer_a._partitions | consumer_b._partitions, set(self.client.topics[self.topic_name].partitions.values()) ) + except Exception as e: + print(e) + raise + print(hi) finally: consumer_a.stop() consumer_b.stop() if __name__ == "__main__": - unittest2.main() + unittest.main() diff 
--git a/tests/pykafka/test_partition.py b/tests/pykafka/test_partition.py index c7b6d6528..7c004a215 100644 --- a/tests/pykafka/test_partition.py +++ b/tests/pykafka/test_partition.py @@ -1,10 +1,9 @@ -import unittest2 - +from pykafka.test.utils import unittest from pykafka import KafkaClient from pykafka.test.utils import get_cluster, stop_cluster -class TestPartitionInfo(unittest2.TestCase): +class TestPartitionInfo(unittest.TestCase): @classmethod def setUpClass(cls): cls.kafka = get_cluster() @@ -32,4 +31,4 @@ def test_can_get_latest_offset(self): self.assertTrue(partition.latest_available_offset()) if __name__ == "__main__": - unittest2.main() + unittest.main() diff --git a/tests/pykafka/test_producer.py b/tests/pykafka/test_producer.py index db21605a6..c49122d0d 100644 --- a/tests/pykafka/test_producer.py +++ b/tests/pykafka/test_producer.py @@ -1,15 +1,14 @@ from __future__ import division import time -import unittest2 from uuid import uuid4 from pykafka import KafkaClient from pykafka.exceptions import ProducerQueueFullError -from pykafka.test.utils import get_cluster, stop_cluster +from pykafka.test.utils import get_cluster, stop_cluster, unittest -class ProducerIntegrationTests(unittest2.TestCase): +class ProducerIntegrationTests(unittest.TestCase): maxDiff = None @classmethod @@ -36,7 +35,7 @@ def test_produce(self): # set a timeout so we don't wait forever if we break producer code message = self.consumer.consume() - self.assertTrue(message.value == payload) + assert message.value == payload def test_async_produce(self): payload = uuid4().bytes @@ -45,7 +44,7 @@ def test_async_produce(self): prod.produce(payload) message = self.consumer.consume() - self.assertTrue(message.value == payload) + assert message.value == payload def test_async_produce_context(self): """Ensure that the producer works as a context manager""" @@ -55,7 +54,7 @@ def test_async_produce_context(self): producer.produce(payload) message = self.consumer.consume() - self.assertTrue(message.value == payload) + assert message.value == payload def test_async_produce_queue_full(self): """Ensure that the producer raises an error when its queue is full""" @@ -96,11 +95,11 @@ def test_async_produce_thread_exception(self): def test_async_produce_unicode(self): """Ensure that the producer can handle unicode strings""" topic = self.client.topics[self.topic_name] - payload = u"tester" + payload = b"tester" with topic.get_producer(min_queued_messages=1) as producer: producer.produce(payload) message = self.consumer.consume() - self.assertTrue(message.value == payload) + assert message.value == payload if __name__ == "__main__": - unittest2.main() + unittest.main() diff --git a/tests/pykafka/test_protocol.py b/tests/pykafka/test_protocol.py index 4a19d50d8..72a3b06a7 100644 --- a/tests/pykafka/test_protocol.py +++ b/tests/pykafka/test_protocol.py @@ -1,11 +1,12 @@ -import unittest2 +from pykafka.test.utils import unittest from pykafka import exceptions from pykafka import protocol from pykafka.common import CompressionType +from pykafka.utils.compat import buffer -class TestMetadataAPI(unittest2.TestCase): +class TestMetadataAPI(unittest.TestCase): maxDiff = None def test_request(self): @@ -18,33 +19,33 @@ def test_request(self): def test_response(self): cluster = protocol.MetadataResponse( - 
buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') + buffer(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') ) - self.assertEqual(cluster.brokers[0].host, 'localhost') + self.assertEqual(cluster.brokers[0].host, b'localhost') self.assertEqual(cluster.brokers[0].port, 9092) - self.assertEqual(cluster.topics['test'].partitions[0].leader, + self.assertEqual(cluster.topics[b'test'].partitions[0].leader, cluster.brokers[0].id) - self.assertEqual(cluster.topics['test'].partitions[0].replicas, + self.assertEqual(cluster.topics[b'test'].partitions[0].replicas, [cluster.brokers[0].id]) - self.assertEqual(cluster.topics['test'].partitions[0].isr, + self.assertEqual(cluster.topics[b'test'].partitions[0].isr, [cluster.brokers[0].id]) def test_partition_error(self): # Response has a UnknownTopicOrPartition error for test/0 response = protocol.MetadataResponse( - buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') + buffer(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x00\x00\x04test\x00\x00\x00\x02\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') ) - self.assertEqual(response.topics['test'].partitions[0].err, 3) + self.assertEqual(response.topics[b'test'].partitions[0].err, 3) def test_topic_error(self): # Response has a UnknownTopicOrPartition error for test/0 response = protocol.MetadataResponse( - buffer('\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x03\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') + buffer(b'\x00\x00\x00\x01\x00\x00\x00\x00\x00\x09localhost\x00\x00#\x84\x00\x00\x00\x01\x00\x03\x00\x04test\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') ) - self.assertEqual(response.topics['test'].err, 3) + self.assertEqual(response.topics[b'test'].err, 3) -class TestProduceAPI(unittest2.TestCase): +class TestProduceAPI(unittest.TestCase): maxDiff = None test_messages = [ @@ -78,21 +79,21 @@ def test_snappy_compression(self): def test_partition_error(self): # Response has a UnknownTopicOrPartition error for test/0 response = protocol.ProduceResponse( - buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02') + 
buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02') ) - self.assertEqual(response.topics['test'][0].err, 3) + self.assertEqual(response.topics[b'test'][0].err, 3) def test_response(self): response = protocol.ProduceResponse( - buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02') + buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02') ) self.assertEqual( response.topics, - {'test': {0: protocol.ProducePartitionResponse(0, 2)}} + {b'test': {0: protocol.ProducePartitionResponse(0, 2)}} ) -class TestFetchAPI(unittest2.TestCase): +class TestFetchAPI(unittest.TestCase): maxDiff = None def test_request(self): @@ -107,36 +108,61 @@ def test_request(self): def test_partition_error(self): # Response has a UnknownTopicOrPartition error for test/0 response = protocol.FetchResponse( - buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message') + buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message') ) - self.assertEqual(response.topics['test'][0].err, 3) + self.assertEqual(response.topics[b'test'][0].err, 3) def test_response(self): resp = protocol.FetchResponse( - buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message') + buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x006\xa3 ^B\x00\x00\x00\x00\x00\x12test_partition_key\x00\x00\x00\x16this is a test message') ) - self.assertEqual(len(resp.topics['test'][0].messages), 1) - self.assertEqual(resp.topics['test'][0].max_offset, 2) - message = resp.topics['test'][0].messages[0] - self.assertEqual(message.value, 'this is a test message') - self.assertEqual(message.partition_key, 'test_partition_key') + self.assertEqual(len(resp.topics[b'test'][0].messages), 1) + self.assertEqual(resp.topics[b'test'][0].max_offset, 2) + message = resp.topics[b'test'][0].messages[0] + self.assertEqual(message.value, b'this is a test message') + self.assertEqual(message.partition_key, b'test_partition_key') self.assertEqual(message.compression_type, 0) self.assertEqual(message.offset, 1) def test_gzip_decompression(self): - msg = '\x00\x00\x00\x01\x00\ttest_gzip\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x9b\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x8f\xbb\xe7\x1f\xb8\x00\x01\xff\xff\xff\xff\x00\x00\x00\x81\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x00c`\x80\x03\r\xbe.I\x7f0\x8b%\xb18%\rH\x8b\x95dd\x16+\x00Q\xa2BIjq\x89Bnjqqbz*T=#\x10\x1b\xb2\xf3\xcb\xf4\x81y\x1c \x15\xf1\xd9\xa9\x95@\xb64\\_Nq>v\xcdL@\xac\x7f\xb5(\xd9\x98\x81\xe1?\x10\x00y\x8a`M)\xf9\xa9\xc5y\xea%\n\x19\x89e\xa9@\x9d\x05\x89E%\x99%\x99\xf9y\n@\x93\x01N1\x9f[\xac\x00\x00\x00' + msg = 
b'\x00\x00\x00\x01\x00\ttest_gzip\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x9b\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x8f\xbb\xe7\x1f\xb8\x00\x01\xff\xff\xff\xff\x00\x00\x00\x81\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x00c`\x80\x03\r\xbe.I\x7f0\x8b%\xb18%\rH\x8b\x95dd\x16+\x00Q\xa2BIjq\x89Bnjqqbz*T=#\x10\x1b\xb2\xf3\xcb\xf4\x81y\x1c \x15\xf1\xd9\xa9\x95@\xb64\\_Nq>v\xcdL@\xac\x7f\xb5(\xd9\x98\x81\xe1?\x10\x00y\x8a`M)\xf9\xa9\xc5y\xea%\n\x19\x89e\xa9@\x9d\x05\x89E%\x99%\x99\xf9y\n@\x93\x01N1\x9f[\xac\x00\x00\x00' response = protocol.FetchResponse(msg) + expected1 = { + 'partition_key': b'asdf', + 'compression_type': 0, + 'value': b'this is a test message', + 'offset': 0, + 'partition_id': 0, + 'partition': None + } self.assertDictEqual( - response.topics['test_gzip'][0].messages[0].__dict__, - {'partition_key': 'asdf', 'compression_type': 0, 'value': 'this is a test message', 'offset': 0, 'partition_id': 0, 'partition': None}, + response.topics[b'test_gzip'][0].messages[0].__dict__, + expected1 ) + expected2 = { + 'partition_key': b'test_key', + 'compression_type': 0, + 'value': b'this is also a test message', + 'offset': 1, + 'partition_id': 0, + 'partition': None + } self.assertDictEqual( - response.topics['test_gzip'][0].messages[1].__dict__, - {'partition_key': 'test_key', 'compression_type': 0, 'value': 'this is also a test message', 'offset': 1, 'partition_id': 0, 'partition': None}, + response.topics[b'test_gzip'][0].messages[1].__dict__, + expected2 ) + expected3 = { + 'partition_key': None, + 'compression_type': 0, + 'value': b"this doesn't have a partition key", + 'offset': 2, + 'partition_id': 0, + 'partition': None + } + self.assertDictEqual( - response.topics['test_gzip'][0].messages[2].__dict__, - {'partition_key': None, 'compression_type': 0, 'value': "this doesn't have a partition key", 'offset': 2, 'partition_id': 0, 'partition': None} + response.topics[b'test_gzip'][0].messages[2].__dict__, + expected3 ) return @@ -157,7 +183,7 @@ def snappy_decompression(self): ) -class TestOffsetAPI(unittest2.TestCase): +class TestOffsetAPI(unittest.TestCase): maxDiff = None def test_request(self): @@ -172,18 +198,18 @@ def test_request(self): def test_partition_error(self): # Response has a UnknownTopicOrPartition error for test/0 response = protocol.OffsetResponse( - buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02') + buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02') ) - self.assertEqual(response.topics['test'][0].err, 3) + self.assertEqual(response.topics[b'test'][0].err, 3) def test_response(self): resp = protocol.OffsetResponse( - buffer('\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02') + buffer(b'\x00\x00\x00\x01\x00\x04test\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02') ) - self.assertEqual(resp.topics['test'][0].offset, [2]) + self.assertEqual(resp.topics[b'test'][0].offset, [2]) -class TestOffsetCommitFetchAPI(unittest2.TestCase): +class TestOffsetCommitFetchAPI(unittest.TestCase): maxDiff = None def test_consumer_metadata_request(self): @@ -196,10 +222,10 @@ def test_consumer_metadata_request(self): def test_consumer_metadata_response(self): response = protocol.ConsumerMetadataResponse( - buffer('\x00\x00\x00\x00\x00\x00\x00\remmett-debian\x00\x00#\x84') + 
buffer(b'\x00\x00\x00\x00\x00\x00\x00\remmett-debian\x00\x00#\x84') ) self.assertEqual(response.coordinator_id, 0) - self.assertEqual(response.coordinator_host, 'emmett-debian') + self.assertEqual(response.coordinator_host, b'emmett-debian') self.assertEqual(response.coordinator_port, 9092) def test_offset_commit_request(self): @@ -214,9 +240,9 @@ def test_offset_commit_request(self): def test_offset_commit_response(self): response = protocol.OffsetCommitResponse( - buffer('\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00') + buffer(b'\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00') ) - self.assertEqual(response.topics['emmett.dummy'][0].err, 0) + self.assertEqual(response.topics[b'emmett.dummy'][0].err, 0) def test_offset_fetch_request(self): preq = protocol.PartitionOffsetFetchRequest('testtopic', 0) @@ -229,11 +255,11 @@ def test_offset_fetch_request(self): def test_offset_fetch_response(self): response = protocol.OffsetFetchResponse( - buffer('\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') + buffer(b'\x00\x00\x00\x01\x00\x0cemmett.dummy\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00') ) - self.assertEqual(response.topics['emmett.dummy'][0].metadata, '') - self.assertEqual(response.topics['emmett.dummy'][0].offset, 1) + self.assertEqual(response.topics[b'emmett.dummy'][0].metadata, b'') + self.assertEqual(response.topics[b'emmett.dummy'][0].offset, 1) if __name__ == '__main__': - unittest2.main() + unittest.main() diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py index 923fc5be2..580891240 100644 --- a/tests/pykafka/test_simpleconsumer.py +++ b/tests/pykafka/test_simpleconsumer.py @@ -1,14 +1,15 @@ from contextlib import contextmanager -import mock -import unittest2 +from pykafka.test.utils import unittest, mock + from uuid import uuid4 from pykafka import KafkaClient from pykafka.simpleconsumer import OwnedPartition, OffsetType from pykafka.test.utils import get_cluster, stop_cluster +from pykafka.utils.compat import range -class TestSimpleConsumer(unittest2.TestCase): +class TestSimpleConsumer(unittest.TestCase): maxDiff = None @classmethod @@ -26,7 +27,7 @@ def setUpClass(cls): for _ in range(3): cls.kafka.produce_messages( cls.topic_name, - ('msg {i}'.format(i=i) for i in xrange(batch))) + ('msg {i}'.format(i=i) for i in range(batch))) cls.client = KafkaClient(cls.kafka.brokers) @@ -44,7 +45,7 @@ def _get_simple_consumer(self, **kwargs): def test_consume(self): with self._get_simple_consumer() as consumer: - messages = [consumer.consume() for _ in xrange(self.total_msgs)] + messages = [consumer.consume() for _ in range(self.total_msgs)] self.assertEquals(len(messages), self.total_msgs) self.assertTrue(None not in messages) @@ -52,7 +53,7 @@ def test_offset_commit(self): """Check fetched offsets match pre-commit internal state""" with self._get_simple_consumer( consumer_group='test_offset_commit') as consumer: - [consumer.consume() for _ in xrange(100)] + [consumer.consume() for _ in range(100)] offsets_committed = consumer.held_offsets consumer.commit_offsets() @@ -64,7 +65,7 @@ def test_offset_resume(self): """Check resumed internal state matches committed offsets""" with self._get_simple_consumer( consumer_group='test_offset_resume') as consumer: - [consumer.consume() for _ in xrange(100)] + [consumer.consume() for _ in range(100)] offsets_committed = consumer.held_offsets 
consumer.commit_offsets() @@ -136,7 +137,7 @@ def test_reset_offsets(self): self.assertEqual(consumer.held_offsets[part_id], expected_offset) -class TestOwnedPartition(unittest2.TestCase): +class TestOwnedPartition(unittest.TestCase): def test_partition_saves_offset(self): msgval = "test" partition = mock.MagicMock() @@ -217,4 +218,4 @@ def test_partition_offset_counters(self): if __name__ == "__main__": - unittest2.main() + unittest.main() diff --git a/tests/pykafka/utils/test_compression.py b/tests/pykafka/utils/test_compression.py index 7b5bdbb39..f0e2308f3 100644 --- a/tests/pykafka/utils/test_compression.py +++ b/tests/pykafka/utils/test_compression.py @@ -1,10 +1,9 @@ -import unittest2 - +from pykafka.test.utils import unittest from pykafka.utils import compression -class CompressionTests(unittest2.TestCase): +class CompressionTests(unittest.TestCase): """Keeping these simple by verifying what goes in is what comes out.""" - text = "The man in black fled across the desert, and the gunslinger followed." + text = b"The man in black fled across the desert, and the gunslinger followed." def test_gzip(self): encoded = compression.encode_gzip(self.text) @@ -29,4 +28,4 @@ def test_snappy_xerial(self): if __name__ == '__main__': - unittest2.main() + unittest.main() diff --git a/tests/pykafka/utils/test_struct_helpers.py b/tests/pykafka/utils/test_struct_helpers.py index 7effba7ad..c19760db3 100644 --- a/tests/pykafka/utils/test_struct_helpers.py +++ b/tests/pykafka/utils/test_struct_helpers.py @@ -1,5 +1,4 @@ -import unittest - +from pykafka.test.utils import unittest from pykafka.utils import struct_helpers @@ -13,11 +12,11 @@ def test_basic_unpack(self): def test_string_encoding(self): output = struct_helpers.unpack_from('S', b'\x00\x04test') - self.assertEqual(output, ('test',)) + self.assertEqual(output, (b'test',)) def test_bytearray_unpacking(self): output = struct_helpers.unpack_from('Y', b'\x00\x00\x00\x04test') - self.assertEqual(output, ('test',)) + self.assertEqual(output, (b'test',)) def test_array_unpacking(self): output = struct_helpers.unpack_from( diff --git a/tox.ini b/tox.ini index 3e4e496d3..bddfc6a1b 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,14 @@ [tox] -envlist = py26, py27, pypy +envlist = py27, py34, pypy + +[testenv:py27] +deps = mock + +[testenv:pypy] +deps = mock [testenv] -commands = python setup.py test +commands = + pip install -r test-requirements.txt + pip install -e . + py.test {posargs}