diff --git a/pylint.rc b/pylint.rc
index d13ef519e..d22e523ec 100644
--- a/pylint.rc
+++ b/pylint.rc
@@ -1,5 +1,6 @@
 [TYPECHECK]
 ignored-classes=SyncManager,_socketobject
+generated-members=py.*
 
 [MESSAGES CONTROL]
 disable=E1129
diff --git a/test/conftest.py b/test/conftest.py
index e85b977c8..d53ff23a9 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,38 +1,117 @@
 from __future__ import absolute_import
 
-import os
+import inspect
 
 import pytest
+from decorator import decorate
 
 from test.fixtures import KafkaFixture, ZookeeperFixture
-
+from test.testutil import kafka_version, random_string
 
 @pytest.fixture(scope="module")
 def version():
-    if 'KAFKA_VERSION' not in os.environ:
-        return ()
-    return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-
+    """Return the Kafka version set in the OS environment"""
+    return kafka_version()
 
 @pytest.fixture(scope="module")
-def zookeeper(version, request):
-    assert version
-    zk = ZookeeperFixture.instance()
-    yield zk
-    zk.close()
+def zookeeper():
+    """Return a Zookeeper fixture"""
+    zk_instance = ZookeeperFixture.instance()
+    yield zk_instance
+    zk_instance.close()
 
+@pytest.fixture(scope="module")
+def kafka_broker(kafka_broker_factory):
+    """Return a Kafka broker fixture"""
+    return kafka_broker_factory()[0]
 
 @pytest.fixture(scope="module")
-def kafka_broker(version, zookeeper, request):
-    assert version
-    k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
-                              partitions=4)
-    yield k
-    k.close()
+def kafka_broker_factory(version, zookeeper):
+    """Return a Kafka broker fixture factory"""
+    assert version, 'KAFKA_VERSION must be specified to run integration tests'
+
+    _brokers = []
+    def factory(**broker_params):
+        params = {} if broker_params is None else broker_params.copy()
+        params.setdefault('partitions', 4)
+        num_brokers = params.pop('num_brokers', 1)
+        brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
+                        for x in range(num_brokers))
+        _brokers.extend(brokers)
+        return brokers
+    yield factory
+
+    for broker in _brokers:
+        broker.close()
+
+@pytest.fixture
+def simple_client(kafka_broker, request, topic):
+    """Return a SimpleClient fixture"""
+    client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
+    client.ensure_topic_exists(topic)
+    yield client
+    client.close()
+
+@pytest.fixture
+def kafka_client(kafka_broker, request):
+    """Return a KafkaClient fixture"""
+    (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
+    yield client
+    client.close()
+
+@pytest.fixture
+def kafka_consumer(kafka_consumer_factory):
+    """Return a KafkaConsumer fixture"""
+    return kafka_consumer_factory()
+
+@pytest.fixture
+def kafka_consumer_factory(kafka_broker, topic, request):
+    """Return a KafkaConsumer factory fixture"""
+    _consumer = [None]
+
+    def factory(**kafka_consumer_params):
+        params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
+        params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+        return _consumer[0]
+
+    yield factory
+
+    if _consumer[0]:
+        _consumer[0].close()
+
+@pytest.fixture
+def kafka_producer(kafka_producer_factory):
+    """Return a KafkaProducer fixture"""
+    yield kafka_producer_factory()
+
+@pytest.fixture
+def kafka_producer_factory(kafka_broker, request):
+    """Return a KafkaProducer factory fixture"""
+    _producer = [None]
+
+    def factory(**kafka_producer_params):
+        params = {} if kafka_producer_params is None else kafka_producer_params.copy()
+        params.setdefault('client_id', 'producer_%s' % (request.node.name,))
+        _producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
+        return _producer[0]
+
+    yield factory
+
+    if _producer[0]:
+        _producer[0].close()
+
+@pytest.fixture
+def topic(kafka_broker, request):
+    """Return a topic fixture"""
+    topic_name = '%s_%s' % (request.node.name, random_string(10))
+    kafka_broker.create_topics([topic_name])
+    return topic_name
 
 @pytest.fixture
 def conn(mocker):
+    """Return a connection mocker fixture"""
    from kafka.conn import ConnectionStates
    from kafka.future import Future
    from kafka.protocol.metadata import MetadataResponse
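Taken together, these conftest fixtures compose by argument name. A minimal sketch of how a test might consume them (the test name and payload here are hypothetical, not part of this patch; fixture teardown closes each client after the test):

# Hypothetical example: pytest injects `topic`, `kafka_producer` and
# `kafka_consumer_factory` by name.
def test_round_trip(topic, kafka_producer, kafka_consumer_factory):
    consumer = kafka_consumer_factory(auto_offset_reset='earliest')
    kafka_producer.send(topic, b'hello fixtures')
    kafka_producer.flush()
    message = next(consumer)   # KafkaConsumer is an iterator
    assert message.value == b'hello fixtures'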
diff --git a/test/fixtures.py b/test/fixtures.py
index 1c418fd7e..493a664a5 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,29 +4,55 @@
 import logging
 import os
 import os.path
-import shutil
+import random
+import socket
+import string
 import subprocess
-import tempfile
 import time
 import uuid
 
-from six.moves import urllib
+import py
+from six.moves import urllib, xrange
 from six.moves.urllib.parse import urlparse  # pylint: disable=E0611,F0401
 
+from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
+from kafka.client_async import KafkaClient
+from kafka.protocol.admin import CreateTopicsRequest
+from kafka.protocol.metadata import MetadataRequest
 from test.service import ExternalService, SpawnedService
-from test.testutil import get_open_port
-
 
 log = logging.getLogger(__name__)
 
+def random_string(length):
+    return "".join(random.choice(string.ascii_letters) for i in xrange(length))
+
+def version_str_to_list(version_str):
+    return tuple(map(int, version_str.split('.')))  # e.g., (0, 8, 1, 1)
+
+def version():
+    if 'KAFKA_VERSION' not in os.environ:
+        return ()
+    return version_str_to_list(os.environ['KAFKA_VERSION'])
+
+def get_open_port():
+    sock = socket.socket()
+    sock.bind(("", 0))
+    port = sock.getsockname()[1]
+    sock.close()
+    return port
 
 class Fixture(object):
     kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2')
     scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
-    project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
-    kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
+    project_root = os.environ.get('PROJECT_ROOT',
+                                  os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+    kafka_root = os.environ.get("KAFKA_ROOT",
+                                os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
     ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
 
+    def __init__(self):
+        self.child = None
+
     @classmethod
     def download_official_distribution(cls,
                                        kafka_version=None,
@@ -71,31 +97,34 @@ def test_resource(cls, filename):
     @classmethod
     def kafka_run_class_args(cls, *args):
         result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
-        result.extend(args)
+        result.extend([str(arg) for arg in args])
         return result
 
     def kafka_run_class_env(self):
         env = os.environ.copy()
-        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties")
+        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
+            self.test_resource("log4j.properties")
         return env
 
     @classmethod
     def render_template(cls, source_file, target_file, binding):
-        log.info('Rendering %s from template %s', target_file, source_file)
+        log.info('Rendering %s from template %s', target_file.strpath, source_file)
         with open(source_file, "r") as handle:
             template = handle.read()
             assert len(template) > 0, 'Empty template %s' % source_file
-        with open(target_file, "w") as handle:
+        with open(target_file.strpath, "w") as handle:
             handle.write(template.format(**binding))
             handle.flush()
             os.fsync(handle)
         # fsync directory for durability
         # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/
-        dirfd = os.open(os.path.dirname(target_file), os.O_DIRECTORY)
+        dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
        os.fsync(dirfd)
        os.close(dirfd)
 
+    def dump_logs(self):
+        self.child.dump_logs()
+
 
 class ZookeeperFixture(Fixture):
     @classmethod
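The render_template change is the first appearance of the patch's central refactor: temp directories move from plain path strings to py.path.local objects. A rough, illustrative mapping of the calls involved (all of them appear later in this patch):

import py

tmp_dir = py.path.local.mkdtemp()               # was: tempfile.mkdtemp()
properties = tmp_dir.join("kafka.properties")   # was: os.path.join(tmp_dir, "kafka.properties")
path_str = properties.strpath                   # plain string for open() / subprocess args
tmp_dir.ensure('logs', dir=True)                # was: os.mkdir(os.path.join(tmp_dir, "logs"))
tmp_dir.remove()                                # was: shutil.rmtree(tmp_dir)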
@@ -111,32 +140,36 @@ def instance(cls):
         fixture.open()
         return fixture
 
-    def __init__(self, host, port):
+    def __init__(self, host, port, tmp_dir=None):
+        super(ZookeeperFixture, self).__init__()
         self.host = host
         self.port = port
-        self.tmp_dir = None
-        self.child = None
+        self.tmp_dir = tmp_dir
 
     def kafka_run_class_env(self):
         env = super(ZookeeperFixture, self).kafka_run_class_env()
-        env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
+        env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
         return env
 
     def out(self, message):
         log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message)
 
     def open(self):
-        self.tmp_dir = tempfile.mkdtemp()
+        if self.tmp_dir is None:
+            self.tmp_dir = py.path.local.mkdtemp()  # pylint: disable=no-member
+        self.tmp_dir.ensure(dir=True)
+
         self.out("Running local instance...")
         log.info("  host    = %s", self.host)
         log.info("  port    = %s", self.port or '(auto)')
-        log.info("  tmp_dir = %s", self.tmp_dir)
+        log.info("  tmp_dir = %s", self.tmp_dir.strpath)
 
         # Configure Zookeeper child process
         template = self.test_resource("zookeeper.properties")
-        properties = os.path.join(self.tmp_dir, "zookeeper.properties")
-        args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
+        properties = self.tmp_dir.join("zookeeper.properties")
+        args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain",
+                                         properties.strpath)
         env = self.kafka_run_class_env()
 
         # Party!
@@ -174,7 +207,7 @@ def close(self):
         self.child.stop()
         self.child = None
         self.out("Done!")
-        shutil.rmtree(self.tmp_dir)
+        self.tmp_dir.remove()
 
     def __del__(self):
         self.close()
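Because tmp_dir is now an optional constructor argument, a caller can own the scratch directory's lifecycle instead of letting open() create one. A hedged sketch of direct construction (hypothetical values; instance(), with its ZOOKEEPER_URI handling, remains the normal entry point):

import py
from test.fixtures import ZookeeperFixture, get_open_port

scratch = py.path.local.mkdtemp()   # caller-owned scratch directory
zk = ZookeeperFixture('127.0.0.1', get_open_port(), tmp_dir=scratch)
zk.open()    # renders zookeeper.properties into `scratch` and spawns the child
# ... exercise zk.host / zk.port ...
zk.close()   # stops the child, then removes `scratch` via tmp_dir.remove()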
@@ -182,9 +215,11 @@ def __del__(self):
 
 class KafkaFixture(Fixture):
     @classmethod
-    def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
+    def instance(cls, broker_id, zookeeper, zk_chroot=None,
                  host=None, port=None,
-                 transport='PLAINTEXT', replicas=1, partitions=2):
+                 transport='PLAINTEXT', replicas=1, partitions=2,
+                 sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
         if "KAFKA_URI" in os.environ:
@@ -195,19 +230,29 @@ def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
             if host is None:
                 host = "localhost"
             fixture = KafkaFixture(host, port, broker_id,
-                                   zk_host, zk_port, zk_chroot,
+                                   zookeeper, zk_chroot,
                                    transport=transport,
-                                   replicas=replicas, partitions=partitions)
+                                   replicas=replicas, partitions=partitions,
+                                   sasl_mechanism=sasl_mechanism,
+                                   auto_create_topic=auto_create_topic,
+                                   tmp_dir=tmp_dir)
+
             fixture.open()
         return fixture
 
-    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
-                 replicas=1, partitions=2, transport='PLAINTEXT'):
+    def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
+                 replicas=1, partitions=2, transport='PLAINTEXT',
+                 sasl_mechanism='PLAIN', auto_create_topic=True,
+                 tmp_dir=None):
+        super(KafkaFixture, self).__init__()
+
         self.host = host
         self.port = port
 
         self.broker_id = broker_id
+        self.auto_create_topic = auto_create_topic
         self.transport = transport.upper()
+        self.sasl_mechanism = sasl_mechanism.upper()
         self.ssl_dir = self.test_resource('ssl')
 
         # TODO: checking for port connection would be better than scanning logs
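This signature change ripples through every test module below: callers now hand over the ZookeeperFixture itself instead of unpacking its address. An illustrative before/after:

from test.fixtures import ZookeeperFixture, KafkaFixture

zk = ZookeeperFixture.instance()

# Before this patch the caller unpacked the address:
#   broker = KafkaFixture.instance(0, zk.host, zk.port, partitions=4)
# Now the fixture keeps a reference to `zk`, re-derives zk_host/zk_port for
# the template binding, and can forward dump_logs() to the Zookeeper child:
broker = KafkaFixture.instance(0, zk, partitions=4)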
started" % broker_id - self.zk_host = zk_host - self.zk_port = zk_port + self.zookeeper = zookeeper self.zk_chroot = zk_chroot + # Add the attributes below for the template binding + self.zk_host = self.zookeeper.host + self.zk_port = self.zookeeper.port self.replicas = replicas self.partitions = partitions - self.tmp_dir = None - self.child = None + self.tmp_dir = tmp_dir self.running = False + self._client = None + + def bootstrap_server(self): + return '%s:%d' % (self.host, self.port) + def kafka_run_class_env(self): env = super(KafkaFixture, self).kafka_run_class_env() - env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs') + env['LOG_DIR'] = self.tmp_dir.join('logs').strpath return env def out(self, message): log.info("*** Kafka [%s:%s]: %s", self.host, self.port or '(auto)', message) - def open(self): - if self.running: - self.out("Instance already running") - return - - self.tmp_dir = tempfile.mkdtemp() - self.out("Running local instance...") - log.info(" host = %s", self.host) - log.info(" port = %s", self.port or '(auto)') - log.info(" transport = %s", self.transport) - log.info(" broker_id = %s", self.broker_id) - log.info(" zk_host = %s", self.zk_host) - log.info(" zk_port = %s", self.zk_port) - log.info(" zk_chroot = %s", self.zk_chroot) - log.info(" replicas = %s", self.replicas) - log.info(" partitions = %s", self.partitions) - log.info(" tmp_dir = %s", self.tmp_dir) - - # Create directories - os.mkdir(os.path.join(self.tmp_dir, "logs")) - os.mkdir(os.path.join(self.tmp_dir, "data")) - + def _create_zk_chroot(self): self.out("Creating Zookeeper chroot node...") args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain", - "-server", "%s:%d" % (self.zk_host, self.zk_port), + "-server", + "%s:%d" % (self.zookeeper.host, + self.zookeeper.port), "create", "/%s" % self.zk_chroot, "kafka-python") env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if proc.wait() != 0: + if proc.wait() != 0 or proc.returncode != 0: self.out("Failed to create Zookeeper chroot node") self.out(proc.stdout.read()) self.out(proc.stderr.read()) raise RuntimeError("Failed to create Zookeeper chroot node") - self.out("Done!") + self.out("Kafka chroot created in Zookeeper!") + def start(self): # Configure Kafka child process - properties = os.path.join(self.tmp_dir, "kafka.properties") + properties = self.tmp_dir.join("kafka.properties") template = self.test_resource("kafka.properties") - args = self.kafka_run_class_args("kafka.Kafka", properties) + args = self.kafka_run_class_args("kafka.Kafka", properties.strpath) env = self.kafka_run_class_env() timeout = 5 @@ -305,14 +338,45 @@ def open(self): backoff += 1 else: raise RuntimeError('Failed to start KafkaInstance before max_timeout') + + (self._client,) = self.get_clients(1, '_internal_client') + self.out("Done!") self.running = True + + def open(self): + if self.running: + self.out("Instance already running") + return + + # Create directories + if self.tmp_dir is None: + self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member + self.tmp_dir.ensure(dir=True) + self.tmp_dir.ensure('logs', dir=True) + self.tmp_dir.ensure('data', dir=True) + + self.out("Running local instance...") + log.info(" host = %s", self.host) + log.info(" port = %s", self.port or '(auto)') + log.info(" transport = %s", self.transport) + log.info(" broker_id = %s", self.broker_id) + log.info(" zk_host = %s", self.zookeeper.host) + log.info(" zk_port = %s", self.zookeeper.port) + log.info(" zk_chroot 
= %s", self.zk_chroot) + log.info(" replicas = %s", self.replicas) + log.info(" partitions = %s", self.partitions) + log.info(" tmp_dir = %s", self.tmp_dir.strpath) + + self._create_zk_chroot() + self.start() + atexit.register(self.close) def __del__(self): self.close() - def close(self): + def stop(self): if not self.running: self.out("Instance already stopped") return @@ -320,6 +384,117 @@ def close(self): self.out("Stopping...") self.child.stop() self.child = None - self.out("Done!") - shutil.rmtree(self.tmp_dir) self.running = False + self.out("Stopped!") + + def close(self): + self.stop() + if self.tmp_dir is not None: + self.tmp_dir.remove() + self.tmp_dir = None + self.out("Done!") + + def dump_logs(self): + super(KafkaFixture, self).dump_logs() + self.zookeeper.dump_logs() + + def _send_request(self, request, timeout=None): + def _failure(error): + raise error + retries = 10 + while True: + node_id = self._client.least_loaded_node() + for ready_retry in range(40): + if self._client.ready(node_id, False): + break + time.sleep(.1) + else: + raise RuntimeError('Could not connect to broker with node id %d' % (node_id,)) + + try: + future = self._client.send(node_id, request) + future.error_on_callbacks = True + future.add_errback(_failure) + return self._client.poll(future=future, timeout_ms=timeout) + except Exception as exc: + time.sleep(1) + retries -= 1 + if retries == 0: + raise exc + else: + pass # retry + + def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000): + if num_partitions is None: + num_partitions = self.partitions + if replication_factor is None: + replication_factor = self.replicas + + # Try different methods to create a topic, from the fastest to the slowest + if self.auto_create_topic and \ + num_partitions == self.partitions and \ + replication_factor == self.replicas: + self._send_request(MetadataRequest[0]([topic_name])) + elif version() >= (0, 10, 1, 0): + request = CreateTopicsRequest[0]([(topic_name, num_partitions, + replication_factor, [], [])], timeout_ms) + result = self._send_request(request, timeout=timeout_ms) + for topic_result in result[0].topic_error_codes: + error_code = topic_result[1] + if error_code != 0: + raise errors.for_code(error_code) + else: + args = self.kafka_run_class_args('kafka.admin.TopicCommand', + '--zookeeper', '%s:%s/%s' % (self.zookeeper.host, + self.zookeeper.port, + self.zk_chroot), + '--create', + '--topic', topic_name, + '--partitions', self.partitions \ + if num_partitions is None else num_partitions, + '--replication-factor', self.replicas \ + if replication_factor is None \ + else replication_factor) + if version() >= (0, 10): + args.append('--if-not-exists') + env = self.kafka_run_class_env() + proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + ret = proc.wait() + if ret != 0 or proc.returncode != 0: + output = proc.stdout.read() + if not 'kafka.common.TopicExistsException' in output: + self.out("Failed to create topic %s" % (topic_name,)) + self.out(output) + self.out(proc.stderr.read()) + raise RuntimeError("Failed to create topic %s" % (topic_name,)) + + def create_topics(self, topic_names, num_partitions=None, replication_factor=None): + for topic_name in topic_names: + self._create_topic(topic_name, num_partitions, replication_factor) + + def get_clients(self, cnt=1, client_id=None): + if client_id is None: + client_id = 'client' + return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)), + 
+                                 bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
+
+    def get_consumers(self, cnt, topics, **params):
+        params.setdefault('client_id', 'consumer')
+        params.setdefault('heartbeat_interval_ms', 500)
+        params['bootstrap_servers'] = self.bootstrap_server()
+        client_id = params['client_id']
+        for x in range(cnt):
+            params['client_id'] = '%s_%s' % (client_id, random_string(4))
+            yield KafkaConsumer(*topics, **params)
+
+    def get_producers(self, cnt, **params):
+        params.setdefault('client_id', 'producer')
+        params['bootstrap_servers'] = self.bootstrap_server()
+        client_id = params['client_id']
+        for x in range(cnt):
+            params['client_id'] = '%s_%s' % (client_id, random_string(4))
+            yield KafkaProducer(**params)
+
+    def get_simple_client(self, **params):
+        params.setdefault('client_id', 'simple_client')
+        return SimpleClient(self.bootstrap_server(), **params)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 742572d5e..df0faef69 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -17,7 +17,7 @@ def setUpClass(cls):  # noqa
             return
 
         cls.zk = ZookeeperFixture.instance()
-        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+        cls.server = KafkaFixture.instance(0, cls.zk)
 
     @classmethod
     def tearDownClass(cls):  # noqa
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 40eec1484..fe4e45495 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -21,9 +21,30 @@
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (
-    KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+    KafkaIntegrationTestCase, kafka_versions, random_string, Timer,
+    send_messages
 )
 
+def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
+    """Test KafkaConsumer"""
+    kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+
+    send_messages(simple_client, topic, 0, range(0, 100))
+    send_messages(simple_client, topic, 1, range(100, 200))
+
+    cnt = 0
+    messages = {0: set(), 1: set()}
+    for message in kafka_consumer:
+        logging.debug("Consumed message %s", repr(message))
+        cnt += 1
+        messages[message.partition].add(message.offset)
+        if cnt >= 200:
+            break
+
+    assert len(messages[0]) == 100
+    assert len(messages[1]) == 100
+
 
 class TestConsumerIntegration(KafkaIntegrationTestCase):
     maxDiff = None
@@ -35,9 +56,9 @@ def setUpClass(cls):
         cls.zk = ZookeeperFixture.instance()
         chroot = random_string(10)
-        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port,
+        cls.server1 = KafkaFixture.instance(0, cls.zk,
                                             zk_chroot=chroot)
-        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port,
+        cls.server2 = KafkaFixture.instance(1, cls.zk,
                                             zk_chroot=chroot)
 
         cls.server = cls.server1  # Bootstrapping server
@@ -501,24 +522,6 @@ def test_fetch_buffer_size(self):
         messages = [ message for message in consumer ]
         self.assertEqual(len(messages), 2)
 
-    def test_kafka_consumer(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Start a consumer
-        consumer = self.kafka_consumer(auto_offset_reset='earliest')
-        n = 0
-        messages = {0: set(), 1: set()}
-        for m in consumer:
-            logging.debug("Consumed message %s" % repr(m))
-            n += 1
-            messages[m.partition].add(m.offset)
-            if n >= 200:
-                break
-
-        self.assertEqual(len(messages[0]), 100)
-        self.assertEqual(len(messages[1]), 100)
-
     def test_kafka_consumer__blocking(self):
         TIMEOUT_MS = 500
        consumer = self.kafka_consumer(auto_offset_reset='earliest',
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 9141947ac..8531cfbe8 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -29,10 +29,9 @@ def setUp(self):
 
         # mini zookeeper, 3 kafka brokers
         self.zk = ZookeeperFixture.instance()
-        kk_args = [self.zk.host, self.zk.port]
         kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
                      'partitions': partitions}
-        self.brokers = [KafkaFixture.instance(i, *kk_args, **kk_kwargs)
+        self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs)
                         for i in range(replicas)]
 
         hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index a304e83b6..ca0da6abd 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -15,7 +15,50 @@
 from kafka.structs import FetchRequestPayload, ProduceRequestPayload
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, kafka_versions
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+
+
+# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
+# while the migration to pytest is in progress
+def assert_produce_request(client, topic, messages, initial_offset, message_ct,
+                           partition=0):
+    """Verify the correctness of a produce request"""
+    produce = ProduceRequestPayload(topic, partition, messages=messages)
+
+    # There should only be one response message from the server.
+    # This will throw an exception if there's more than one.
+    resp = client.send_produce_request([produce])
+    assert_produce_response(resp, initial_offset)
+
+    assert current_offset(client, topic, partition) == initial_offset + message_ct
+
+
+def assert_produce_response(resp, initial_offset):
+    """Verify that a produce response is well-formed"""
+    assert len(resp) == 1
+    assert resp[0].error == 0
+    assert resp[0].offset == initial_offset
+
+
+def test_produce_many_simple(simple_client, topic):
+    """Test multiple produces using the SimpleClient"""
+    start_offset = current_offset(simple_client, topic, 0)
+
+    assert_produce_request(
+        simple_client, topic,
+        [create_message(("Test message %d" % i).encode('utf-8'))
+         for i in range(100)],
+        start_offset,
+        100,
+    )
+
+    assert_produce_request(
+        simple_client, topic,
+        [create_message(("Test message %d" % i).encode('utf-8'))
+         for i in range(100)],
+        start_offset+100,
+        100,
+    )
 
 
 class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@@ -26,7 +69,7 @@ def setUpClass(cls):  # noqa
             return
 
         cls.zk = ZookeeperFixture.instance()
-        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+        cls.server = KafkaFixture.instance(0, cls.zk)
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -36,23 +79,6 @@ def tearDownClass(cls):  # noqa
         cls.server.close()
         cls.zk.close()
 
-    def test_produce_many_simple(self):
-        start_offset = self.current_offset(self.topic, 0)
-
-        self.assert_produce_request(
-            [create_message(("Test message %d" % i).encode('utf-8'))
-             for i in range(100)],
-            start_offset,
-            100,
-        )
-
-        self.assert_produce_request(
-            [create_message(("Test message %d" % i).encode('utf-8'))
-             for i in range(100)],
-            start_offset+100,
-            100,
-        )
-
     def test_produce_10k_simple(self):
         start_offset = self.current_offset(self.topic, 0)
diff --git a/test/testutil.py b/test/testutil.py
index 0ec1cff7e..850e925a4 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,36 +1,20 @@
-import functools
-import logging
 import operator
 import os
-import random
 import socket
-import string
 import time
 import uuid
 
-from six.moves import xrange
+import decorator
+import pytest
 
 from . import unittest
-from kafka import SimpleClient
+from kafka import SimpleClient, create_message
 from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
-from kafka.structs import OffsetRequestPayload
-
-__all__ = [
-    'random_string',
-    'get_open_port',
-    'kafka_versions',
-    'KafkaIntegrationTestCase',
-    'Timer',
-]
-
-def random_string(l):
-    return "".join(random.choice(string.ascii_letters) for i in xrange(l))
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
+from test.fixtures import random_string, version_str_to_list, version as kafka_version  # pylint: disable=wrong-import-order
 
 def kafka_versions(*versions):
-    def version_str_to_list(s):
-        return list(map(int, s.split('.')))  # e.g., [0, 8, 1, 1]
-
     def construct_lambda(s):
         if s[0].isdigit():
             op_str = '='
@@ -54,25 +38,25 @@ def construct_lambda(s):
         }
         op = op_map[op_str]
         version = version_str_to_list(v_str)
-        return lambda a: op(version_str_to_list(a), version)
+        return lambda a: op(a, version)
 
     validators = map(construct_lambda, versions)
 
-    def kafka_versions(func):
-        @functools.wraps(func)
-        def wrapper(self):
-            kafka_version = os.environ.get('KAFKA_VERSION')
+    def real_kafka_versions(func):
+        def wrapper(func, *args, **kwargs):
+            version = kafka_version()
 
-            if not kafka_version:
-                self.skipTest("no kafka version set in KAFKA_VERSION env var")
+            if not version:
+                pytest.skip("no kafka version set in KAFKA_VERSION env var")
 
             for f in validators:
-                if not f(kafka_version):
-                    self.skipTest("unsupported kafka version")
+                if not f(version):
+                    pytest.skip("unsupported kafka version")
 
-            return func(self)
-        return wrapper
-    return kafka_versions
+            return func(*args, **kwargs)
+        return decorator.decorator(wrapper, func)
+
+    return real_kafka_versions
 
 def get_open_port():
     sock = socket.socket()
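The rewritten kafka_versions compares version tuples directly and skips via pytest instead of unittest; decorator.decorator preserves the wrapped function's signature, which is what lets pytest keep injecting fixtures into decorated tests. Usage is unchanged: version expressions carry an optional operator prefix parsed by construct_lambda, with '=' implied when the expression starts with a digit. A hypothetical example:

@kafka_versions('>=0.10.1.0', '!=1.0.0')
def test_needs_recent_broker(kafka_broker):
    # runs only when KAFKA_VERSION is at least 0.10.1.0 and not exactly
    # 1.0.0; otherwise the wrapper raises pytest.skip()
    ...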
@@ -81,6 +65,40 @@ def get_open_port():
     sock.close()
     return port
 
+_MESSAGES = {}
+def msg(message):
+    """Format, encode and deduplicate a message"""
+    global _MESSAGES  # pylint: disable=global-statement
+    if message not in _MESSAGES:
+        _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))
+
+    return _MESSAGES[message].encode('utf-8')
+
+def send_messages(client, topic, partition, messages):
+    """Send messages to a topic's partition"""
+    messages = [create_message(msg(str(m))) for m in messages]
+    produce = ProduceRequestPayload(topic, partition, messages=messages)
+    resp, = client.send_produce_request([produce])
+    assert resp.error == 0
+
+    return [x.value for x in messages]
+
+def current_offset(client, topic, partition, kafka_broker=None):
+    """Get the current offset of a topic's partition"""
+    try:
+        offsets, = client.send_offset_request([OffsetRequestPayload(topic,
+                                                                    partition, -1, 1)])
+    except Exception:
+        # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
+        if kafka_broker:
+            kafka_broker.dump_logs()
+        raise
+    else:
+        return offsets.offsets[0]
+
 class KafkaIntegrationTestCase(unittest.TestCase):
     create_client = True
     topic = None
@@ -122,7 +140,8 @@ def tearDown(self):
 
     def current_offset(self, topic, partition):
         try:
-            offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)])
+            offsets, = self.client.send_offset_request([OffsetRequestPayload(topic,
+                                                                             partition, -1, 1)])
         except Exception:
             # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
             self.zk.child.dump_logs()
@@ -132,7 +151,7 @@ def current_offset(self, topic, partition):
         return offsets.offsets[0]
 
     def msgs(self, iterable):
-        return [ self.msg(x) for x in iterable ]
+        return [self.msg(x) for x in iterable]
 
     def msg(self, s):
         if s not in self._messages:
diff --git a/tox.ini b/tox.ini
index 35dc84207..ad95f9374 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,6 +20,7 @@ deps =
     xxhash
     crc32c
     py26: unittest2
+    decorator
 commands =
     py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
 setenv =