From a1869c4be5f47b4f6433610249aaf29af4ec95e5 Mon Sep 17 00:00:00 2001
From: Andre Araujo
Date: Wed, 15 Nov 2017 06:08:29 -0800
Subject: [PATCH] Introduce new fixtures to prepare for migration to pytest.

This commit adds new pytest fixtures in preparation for the migration
of unittest.TestCases to pytest test cases.

The handling of temporary directory creation was also changed so that
the pytest tmpdir fixture can be used after the migration.
---
 pylint.rc                         |   1 +
 test/conftest.py                  | 113 +++++++++--
 test/fixtures.py                  | 299 +++++++++++++++++++++++-------
 test/test_client_integration.py   |   2 +-
 test/test_consumer_integration.py |  45 ++---
 test/test_failover_integration.py |   3 +-
 test/test_producer_integration.py |  64 +++++--
 test/testutil.py                  |  89 +++++----
 tox.ini                           |   1 +
 9 files changed, 460 insertions(+), 157 deletions(-)

diff --git a/pylint.rc b/pylint.rc
index d13ef519e..d22e523ec 100644
--- a/pylint.rc
+++ b/pylint.rc
@@ -1,5 +1,6 @@
 [TYPECHECK]
 ignored-classes=SyncManager,_socketobject
+generated-members=py.*
 
 [MESSAGES CONTROL]
 disable=E1129
diff --git a/test/conftest.py b/test/conftest.py
index e85b977c8..d53ff23a9 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,38 +1,117 @@
 from __future__ import absolute_import
 
-import os
+import inspect
 
 import pytest
+from decorator import decorate
 
 from test.fixtures import KafkaFixture, ZookeeperFixture
-
+from test.testutil import kafka_version, random_string
 
 @pytest.fixture(scope="module")
 def version():
-    if 'KAFKA_VERSION' not in os.environ:
-        return ()
-    return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-
+    """Return the Kafka version set in the OS environment"""
+    return kafka_version()
 
 @pytest.fixture(scope="module")
-def zookeeper(version, request):
-    assert version
-    zk = ZookeeperFixture.instance()
-    yield zk
-    zk.close()
+def zookeeper():
+    """Return a Zookeeper fixture"""
+    zk_instance = ZookeeperFixture.instance()
+    yield zk_instance
+    zk_instance.close()
 
+@pytest.fixture(scope="module")
+def kafka_broker(kafka_broker_factory):
+    """Return a Kafka broker fixture"""
+    return kafka_broker_factory()[0]
 
 @pytest.fixture(scope="module")
-def kafka_broker(version, zookeeper, request):
-    assert version
-    k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
-                              partitions=4)
-    yield k
-    k.close()
+def kafka_broker_factory(version, zookeeper):
+    """Return a Kafka broker fixture factory"""
+    assert version, 'KAFKA_VERSION must be specified to run integration tests'
+
+    _brokers = []
+    def factory(**broker_params):
+        params = {} if broker_params is None else broker_params.copy()
+        params.setdefault('partitions', 4)
+        num_brokers = params.pop('num_brokers', 1)
+        brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
+                        for x in range(num_brokers))
+        _brokers.extend(brokers)
+        return brokers
+
+    yield factory
+
+    for broker in _brokers:
+        broker.close()
+
+@pytest.fixture
+def simple_client(kafka_broker, request, topic):
+    """Return a SimpleClient fixture"""
+    client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
+    client.ensure_topic_exists(topic)
+    yield client
+    client.close()
+
+@pytest.fixture
+def kafka_client(kafka_broker, request):
+    """Return a KafkaClient fixture"""
+    (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
+    yield client
+    client.close()
+
+@pytest.fixture
+def kafka_consumer(kafka_consumer_factory):
+    """Return a KafkaConsumer fixture"""
+    return kafka_consumer_factory()
+
+@pytest.fixture
+def kafka_consumer_factory(kafka_broker, topic, request):
+    """Return a KafkaConsumer factory fixture"""
+    _consumer = [None]
+
+    def factory(**kafka_consumer_params):
+        params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
+        params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+        _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+        return _consumer[0]
+
+    yield factory
+
+    if _consumer[0]:
+        _consumer[0].close()
+
+@pytest.fixture
+def kafka_producer(kafka_producer_factory):
+    """Return a KafkaProducer fixture"""
+    yield kafka_producer_factory()
+
+@pytest.fixture
+def kafka_producer_factory(kafka_broker, request):
+    """Return a KafkaProducer factory fixture"""
+    _producer = [None]
+
+    def factory(**kafka_producer_params):
+        params = {} if kafka_producer_params is None else kafka_producer_params.copy()
+        params.setdefault('client_id', 'producer_%s' % (request.node.name,))
+        _producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
+        return _producer[0]
+
+    yield factory
+
+    if _producer[0]:
+        _producer[0].close()
+
+@pytest.fixture
+def topic(kafka_broker, request):
+    """Return a topic fixture"""
+    topic_name = '%s_%s' % (request.node.name, random_string(10))
+    kafka_broker.create_topics([topic_name])
+    return topic_name
 
 @pytest.fixture
 def conn(mocker):
+    """Return a connection mocker fixture"""
     from kafka.conn import ConnectionStates
     from kafka.future import Future
     from kafka.protocol.metadata import MetadataResponse
diff --git a/test/fixtures.py b/test/fixtures.py
index 1c418fd7e..493a664a5 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -4,29 +4,55 @@
 import logging
 import os
 import os.path
-import shutil
+import random
+import socket
+import string
 import subprocess
-import tempfile
 import time
 import uuid
 
-from six.moves import urllib
+import py
+from six.moves import urllib, xrange
 from six.moves.urllib.parse import urlparse  # pylint: disable=E0611,F0401
 
+from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
+from kafka.client_async import KafkaClient
+from kafka.protocol.admin import CreateTopicsRequest
+from kafka.protocol.metadata import MetadataRequest
 from test.service import ExternalService, SpawnedService
-from test.testutil import get_open_port
-
 
 log = logging.getLogger(__name__)
 
+def random_string(length):
+    return "".join(random.choice(string.ascii_letters) for i in xrange(length))
+
+def version_str_to_list(version_str):
+    return tuple(map(int, version_str.split('.')))  # e.g., (0, 8, 1, 1)
+
+def version():
+    if 'KAFKA_VERSION' not in os.environ:
+        return ()
+    return version_str_to_list(os.environ['KAFKA_VERSION'])
+
+def get_open_port():
+    sock = socket.socket()
+    sock.bind(("", 0))
+    port = sock.getsockname()[1]
+    sock.close()
+    return port
 
 class Fixture(object):
     kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.2')
     scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
-    project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
-    kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
+    project_root = os.environ.get('PROJECT_ROOT',
+                                  os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+    kafka_root = os.environ.get("KAFKA_ROOT",
+                                os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
     ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
 
+    def __init__(self):
+        self.child = None
+
     @classmethod
     def download_official_distribution(cls,
                                        kafka_version=None,
@@ -71,31 +97,34 @@ def test_resource(cls, filename):
 
     @classmethod
     def kafka_run_class_args(cls, *args):
         result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
-        result.extend(args)
+        result.extend([str(arg) for arg in args])
         return result
 
     def kafka_run_class_env(self):
         env = os.environ.copy()
-        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % self.test_resource("log4j.properties")
+        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
+            self.test_resource("log4j.properties")
         return env
 
     @classmethod
     def render_template(cls, source_file, target_file, binding):
-        log.info('Rendering %s from template %s', target_file, source_file)
+        log.info('Rendering %s from template %s', target_file.strpath, source_file)
         with open(source_file, "r") as handle:
             template = handle.read()
             assert len(template) > 0, 'Empty template %s' % source_file
-        with open(target_file, "w") as handle:
+        with open(target_file.strpath, "w") as handle:
             handle.write(template.format(**binding))
             handle.flush()
             os.fsync(handle)
         # fsync directory for durability
         # https://blog.gocept.com/2013/07/15/reliable-file-updates-with-python/
-        dirfd = os.open(os.path.dirname(target_file), os.O_DIRECTORY)
+        dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
         os.fsync(dirfd)
         os.close(dirfd)
 
+    def dump_logs(self):
+        self.child.dump_logs()
 
 class ZookeeperFixture(Fixture):
     @classmethod
@@ -111,32 +140,36 @@ def instance(cls):
         fixture.open()
         return fixture
 
-    def __init__(self, host, port):
+    def __init__(self, host, port, tmp_dir=None):
+        super(ZookeeperFixture, self).__init__()
         self.host = host
         self.port = port
-        self.tmp_dir = None
-        self.child = None
+        self.tmp_dir = tmp_dir
 
     def kafka_run_class_env(self):
         env = super(ZookeeperFixture, self).kafka_run_class_env()
-        env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
+        env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
         return env
 
     def out(self, message):
         log.info("*** Zookeeper [%s:%s]: %s", self.host, self.port or '(auto)', message)
 
     def open(self):
-        self.tmp_dir = tempfile.mkdtemp()
+        if self.tmp_dir is None:
+            self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
+        self.tmp_dir.ensure(dir=True)
+
         self.out("Running local instance...")
         log.info("  host    = %s", self.host)
         log.info("  port    = %s", self.port or '(auto)')
-        log.info("  tmp_dir = %s", self.tmp_dir)
+        log.info("  tmp_dir = %s", self.tmp_dir.strpath)
 
         # Configure Zookeeper child process
         template = self.test_resource("zookeeper.properties")
-        properties = os.path.join(self.tmp_dir, "zookeeper.properties")
-        args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
+        properties = self.tmp_dir.join("zookeeper.properties")
+        args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain",
+                                         properties.strpath)
         env = self.kafka_run_class_env()
 
         # Party!
@@ -174,7 +207,7 @@ def close(self):
         self.child.stop()
         self.child = None
         self.out("Done!")
-        shutil.rmtree(self.tmp_dir)
+        self.tmp_dir.remove()
 
     def __del__(self):
         self.close()
@@ -182,9 +215,11 @@ def __del__(self):
 
 class KafkaFixture(Fixture):
     @classmethod
-    def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
+    def instance(cls, broker_id, zookeeper, zk_chroot=None,
                  host=None, port=None,
-                 transport='PLAINTEXT', replicas=1, partitions=2):
+                 transport='PLAINTEXT', replicas=1, partitions=2,
+                 sasl_mechanism='PLAIN', auto_create_topic=True, tmp_dir=None):
+
         if zk_chroot is None:
             zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
         if "KAFKA_URI" in os.environ:
@@ -195,19 +230,29 @@ def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None,
             if host is None:
                 host = "localhost"
             fixture = KafkaFixture(host, port, broker_id,
-                                   zk_host, zk_port, zk_chroot,
+                                   zookeeper, zk_chroot,
                                    transport=transport,
-                                   replicas=replicas, partitions=partitions)
+                                   replicas=replicas, partitions=partitions,
+                                   sasl_mechanism=sasl_mechanism,
+                                   auto_create_topic=auto_create_topic,
+                                   tmp_dir=tmp_dir)
+
             fixture.open()
         return fixture
 
-    def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
-                 replicas=1, partitions=2, transport='PLAINTEXT'):
+    def __init__(self, host, port, broker_id, zookeeper, zk_chroot,
+                 replicas=1, partitions=2, transport='PLAINTEXT',
+                 sasl_mechanism='PLAIN', auto_create_topic=True,
+                 tmp_dir=None):
+        super(KafkaFixture, self).__init__()
+
         self.host = host
         self.port = port
 
         self.broker_id = broker_id
+        self.auto_create_topic = auto_create_topic
         self.transport = transport.upper()
+        self.sasl_mechanism = sasl_mechanism.upper()
         self.ssl_dir = self.test_resource('ssl')
 
         # TODO: checking for port connection would be better than scanning logs
@@ -215,67 +260,55 @@ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
         # The logging format changed slightly in 1.0.0
         self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id
 
-        self.zk_host = zk_host
-        self.zk_port = zk_port
+        self.zookeeper = zookeeper
         self.zk_chroot = zk_chroot
+        # Add the attributes below for the template binding
+        self.zk_host = self.zookeeper.host
+        self.zk_port = self.zookeeper.port
 
         self.replicas = replicas
         self.partitions = partitions
 
-        self.tmp_dir = None
-        self.child = None
+        self.tmp_dir = tmp_dir
         self.running = False
 
+        self._client = None
+
+    def bootstrap_server(self):
+        return '%s:%d' % (self.host, self.port)
+
     def kafka_run_class_env(self):
         env = super(KafkaFixture, self).kafka_run_class_env()
-        env['LOG_DIR'] = os.path.join(self.tmp_dir, 'logs')
+        env['LOG_DIR'] = self.tmp_dir.join('logs').strpath
         return env
 
     def out(self, message):
         log.info("*** Kafka [%s:%s]: %s", self.host, self.port or '(auto)', message)
 
-    def open(self):
-        if self.running:
-            self.out("Instance already running")
-            return
-
-        self.tmp_dir = tempfile.mkdtemp()
-        self.out("Running local instance...")
-        log.info("  host       = %s", self.host)
-        log.info("  port       = %s", self.port or '(auto)')
-        log.info("  transport  = %s", self.transport)
-        log.info("  broker_id  = %s", self.broker_id)
-        log.info("  zk_host    = %s", self.zk_host)
-        log.info("  zk_port    = %s", self.zk_port)
-        log.info("  zk_chroot  = %s", self.zk_chroot)
-        log.info("  replicas   = %s", self.replicas)
-        log.info("  partitions = %s", self.partitions)
-        log.info("  tmp_dir    = %s", self.tmp_dir)
-
-        # Create directories
-        os.mkdir(os.path.join(self.tmp_dir, "logs"))
-        os.mkdir(os.path.join(self.tmp_dir, "data"))
-
+    def _create_zk_chroot(self):
         self.out("Creating Zookeeper chroot node...")
         args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
-                                         "-server", "%s:%d" % (self.zk_host, self.zk_port),
+                                         "-server",
+                                         "%s:%d" % (self.zookeeper.host,
+                                                    self.zookeeper.port),
                                          "create",
                                          "/%s" % self.zk_chroot,
                                          "kafka-python")
         env = self.kafka_run_class_env()
         proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 
-        if proc.wait() != 0:
+        if proc.wait() != 0 or proc.returncode != 0:
             self.out("Failed to create Zookeeper chroot node")
             self.out(proc.stdout.read())
             self.out(proc.stderr.read())
             raise RuntimeError("Failed to create Zookeeper chroot node")
-        self.out("Done!")
+        self.out("Kafka chroot created in Zookeeper!")
 
+    def start(self):
         # Configure Kafka child process
-        properties = os.path.join(self.tmp_dir, "kafka.properties")
+        properties = self.tmp_dir.join("kafka.properties")
         template = self.test_resource("kafka.properties")
-        args = self.kafka_run_class_args("kafka.Kafka", properties)
+        args = self.kafka_run_class_args("kafka.Kafka", properties.strpath)
         env = self.kafka_run_class_env()
 
         timeout = 5
@@ -305,14 +338,45 @@ def open(self):
                 backoff += 1
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')
+
+        (self._client,) = self.get_clients(1, '_internal_client')
+
         self.out("Done!")
         self.running = True
 
+    def open(self):
+        if self.running:
+            self.out("Instance already running")
+            return
+
+        # Create directories
+        if self.tmp_dir is None:
+            self.tmp_dir = py.path.local.mkdtemp() #pylint: disable=no-member
+        self.tmp_dir.ensure(dir=True)
+        self.tmp_dir.ensure('logs', dir=True)
+        self.tmp_dir.ensure('data', dir=True)
+
+        self.out("Running local instance...")
+        log.info("  host       = %s", self.host)
+        log.info("  port       = %s", self.port or '(auto)')
+        log.info("  transport  = %s", self.transport)
+        log.info("  broker_id  = %s", self.broker_id)
+        log.info("  zk_host    = %s", self.zookeeper.host)
+        log.info("  zk_port    = %s", self.zookeeper.port)
+        log.info("  zk_chroot  = %s", self.zk_chroot)
+        log.info("  replicas   = %s", self.replicas)
+        log.info("  partitions = %s", self.partitions)
+        log.info("  tmp_dir    = %s", self.tmp_dir.strpath)
+
+        self._create_zk_chroot()
+        self.start()
+
         atexit.register(self.close)
 
     def __del__(self):
         self.close()
 
-    def close(self):
+    def stop(self):
         if not self.running:
             self.out("Instance already stopped")
             return
@@ -320,6 +384,117 @@ def close(self):
         self.out("Stopping...")
         self.child.stop()
         self.child = None
-        self.out("Done!")
-        shutil.rmtree(self.tmp_dir)
         self.running = False
+        self.out("Stopped!")
+
+    def close(self):
+        self.stop()
+        if self.tmp_dir is not None:
+            self.tmp_dir.remove()
+            self.tmp_dir = None
+        self.out("Done!")
+
+    def dump_logs(self):
+        super(KafkaFixture, self).dump_logs()
+        self.zookeeper.dump_logs()
+
+    def _send_request(self, request, timeout=None):
+        def _failure(error):
+            raise error
+        retries = 10
+        while True:
+            node_id = self._client.least_loaded_node()
+            for ready_retry in range(40):
+                if self._client.ready(node_id, False):
+                    break
+                time.sleep(.1)
+            else:
+                raise RuntimeError('Could not connect to broker with node id %d' % (node_id,))
+
+            try:
+                future = self._client.send(node_id, request)
+                future.error_on_callbacks = True
+                future.add_errback(_failure)
+                return self._client.poll(future=future, timeout_ms=timeout)
+            except Exception as exc:
+                time.sleep(1)
+                retries -= 1
+                if retries == 0:
+                    raise exc
+                else:
+                    pass  # retry
+
+    def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
+        if num_partitions is None:
+            num_partitions = self.partitions
+        if replication_factor is None:
+            replication_factor = self.replicas
+
+        # Try different methods to create a topic, from the fastest to the slowest
+        if self.auto_create_topic and \
+           num_partitions == self.partitions and \
+           replication_factor == self.replicas:
+            self._send_request(MetadataRequest[0]([topic_name]))
+        elif version() >= (0, 10, 1, 0):
+            request = CreateTopicsRequest[0]([(topic_name, num_partitions,
+                                               replication_factor, [], [])], timeout_ms)
+            result = self._send_request(request, timeout=timeout_ms)
+            for topic_result in result[0].topic_error_codes:
+                error_code = topic_result[1]
+                if error_code != 0:
+                    raise errors.for_code(error_code)
+        else:
+            args = self.kafka_run_class_args('kafka.admin.TopicCommand',
+                                             '--zookeeper', '%s:%s/%s' % (self.zookeeper.host,
+                                                                          self.zookeeper.port,
+                                                                          self.zk_chroot),
+                                             '--create',
+                                             '--topic', topic_name,
+                                             '--partitions', num_partitions,
+                                             '--replication-factor', replication_factor)
+            if version() >= (0, 10):
+                args.append('--if-not-exists')
+            env = self.kafka_run_class_env()
+            proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+            ret = proc.wait()
+            if ret != 0 or proc.returncode != 0:
+                output = proc.stdout.read()
+                if b'kafka.common.TopicExistsException' not in output:
+                    self.out("Failed to create topic %s" % (topic_name,))
+                    self.out(output)
+                    self.out(proc.stderr.read())
+                    raise RuntimeError("Failed to create topic %s" % (topic_name,))
+
+    def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
+        for topic_name in topic_names:
+            self._create_topic(topic_name, num_partitions, replication_factor)
+
+    def get_clients(self, cnt=1, client_id=None):
+        if client_id is None:
+            client_id = 'client'
+        return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
+                                 bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
+
+    def get_consumers(self, cnt, topics, **params):
+        params.setdefault('client_id', 'consumer')
+        params.setdefault('heartbeat_interval_ms', 500)
+        params['bootstrap_servers'] = self.bootstrap_server()
+        client_id = params['client_id']
+        for x in range(cnt):
+            params['client_id'] = '%s_%s' % (client_id, random_string(4))
+            yield KafkaConsumer(*topics, **params)
+
+    def get_producers(self, cnt, **params):
+        params.setdefault('client_id', 'producer')
+        params['bootstrap_servers'] = self.bootstrap_server()
+        client_id = params['client_id']
+        for x in range(cnt):
+            params['client_id'] = '%s_%s' % (client_id, random_string(4))
+            yield KafkaProducer(**params)
+
+    def get_simple_client(self, **params):
+        params.setdefault('client_id', 'simple_client')
+        return SimpleClient(self.bootstrap_server(), **params)
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 742572d5e..df0faef69 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -17,7 +17,7 @@ def setUpClass(cls):  # noqa
             return
 
         cls.zk = ZookeeperFixture.instance()
-        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+        cls.server = KafkaFixture.instance(0, cls.zk)
 
     @classmethod
     def tearDownClass(cls):  # noqa
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 40eec1484..fe4e45495 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -21,9 +21,30 @@
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
 from test.testutil import (
-    KafkaIntegrationTestCase, kafka_versions, random_string, Timer
+    KafkaIntegrationTestCase, kafka_versions, random_string, Timer,
+    send_messages
 )
 
+def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
+    """Test KafkaConsumer
+    """
+    kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
+
+    send_messages(simple_client, topic, 0, range(0, 100))
+    send_messages(simple_client, topic, 1, range(100, 200))
+
+    cnt = 0
+    messages = {0: set(), 1: set()}
+    for message in kafka_consumer:
+        logging.debug("Consumed message %s", repr(message))
+        cnt += 1
+        messages[message.partition].add(message.offset)
+        if cnt >= 200:
+            break
+
+    assert len(messages[0]) == 100
+    assert len(messages[1]) == 100
+
 class TestConsumerIntegration(KafkaIntegrationTestCase):
     maxDiff = None
 
@@ -35,9 +56,9 @@ def setUpClass(cls):
 
         cls.zk = ZookeeperFixture.instance()
         chroot = random_string(10)
-        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port,
+        cls.server1 = KafkaFixture.instance(0, cls.zk,
                                             zk_chroot=chroot)
-        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port,
+        cls.server2 = KafkaFixture.instance(1, cls.zk,
                                             zk_chroot=chroot)
 
         cls.server = cls.server1  # Bootstrapping server
@@ -501,24 +522,6 @@ def test_fetch_buffer_size(self):
             messages = [ message for message in consumer ]
             self.assertEqual(len(messages), 2)
 
-    def test_kafka_consumer(self):
-        self.send_messages(0, range(0, 100))
-        self.send_messages(1, range(100, 200))
-
-        # Start a consumer
-        consumer = self.kafka_consumer(auto_offset_reset='earliest')
-        n = 0
-        messages = {0: set(), 1: set()}
-        for m in consumer:
-            logging.debug("Consumed message %s" % repr(m))
-            n += 1
-            messages[m.partition].add(m.offset)
-            if n >= 200:
-                break
-
-        self.assertEqual(len(messages[0]), 100)
-        self.assertEqual(len(messages[1]), 100)
-
     def test_kafka_consumer__blocking(self):
         TIMEOUT_MS = 500
         consumer = self.kafka_consumer(auto_offset_reset='earliest',
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 9141947ac..8531cfbe8 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -29,10 +29,9 @@ def setUp(self):
 
         # mini zookeeper, 3 kafka brokers
         self.zk = ZookeeperFixture.instance()
-        kk_args = [self.zk.host, self.zk.port]
         kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
                      'partitions': partitions}
-        self.brokers = [KafkaFixture.instance(i, *kk_args, **kk_kwargs)
+        self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs)
                         for i in range(replicas)]
 
         hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index a304e83b6..ca0da6abd 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -15,7 +15,50 @@
 from kafka.structs import FetchRequestPayload, ProduceRequestPayload
 
 from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, kafka_versions
+from test.testutil import KafkaIntegrationTestCase, kafka_versions, current_offset
+
+# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
+# while the migration to pytest is in progress
+def assert_produce_request(client, topic, messages, initial_offset, message_ct,
+                           partition=0):
+    """Verify the correctness of a produce request
+    """
+    produce = ProduceRequestPayload(topic, partition, messages=messages)
+
+    # There should only be one response message from the server.
+    # This will throw an exception if there's more than one.
+    resp = client.send_produce_request([produce])
+    assert_produce_response(resp, initial_offset)
+
+    assert current_offset(client, topic, partition) == initial_offset + message_ct
+
+def assert_produce_response(resp, initial_offset):
+    """Verify that a produce response is well-formed
+    """
+    assert len(resp) == 1
+    assert resp[0].error == 0
+    assert resp[0].offset == initial_offset
+
+def test_produce_many_simple(simple_client, topic):
+    """Test multiple produces using the SimpleClient
+    """
+    start_offset = current_offset(simple_client, topic, 0)
+
+    assert_produce_request(
+        simple_client, topic,
+        [create_message(("Test message %d" % i).encode('utf-8'))
+         for i in range(100)],
+        start_offset,
+        100,
+    )
+
+    assert_produce_request(
+        simple_client, topic,
+        [create_message(("Test message %d" % i).encode('utf-8'))
+         for i in range(100)],
+        start_offset+100,
+        100,
+    )
 
 class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
 
@@ -26,7 +69,7 @@ def setUpClass(cls):  # noqa
             return
 
         cls.zk = ZookeeperFixture.instance()
-        cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+        cls.server = KafkaFixture.instance(0, cls.zk)
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -36,23 +79,6 @@ def tearDownClass(cls):  # noqa
         cls.server.close()
         cls.zk.close()
 
-    def test_produce_many_simple(self):
-        start_offset = self.current_offset(self.topic, 0)
-
-        self.assert_produce_request(
-            [create_message(("Test message %d" % i).encode('utf-8'))
-             for i in range(100)],
-            start_offset,
-            100,
-        )
-
-        self.assert_produce_request(
-            [create_message(("Test message %d" % i).encode('utf-8'))
-             for i in range(100)],
-            start_offset+100,
-            100,
-        )
-
     def test_produce_10k_simple(self):
         start_offset = self.current_offset(self.topic, 0)
 
diff --git a/test/testutil.py b/test/testutil.py
index 0ec1cff7e..850e925a4 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -1,36 +1,20 @@
-import functools
-import logging
 import operator
 import os
-import random
 import socket
-import string
 import time
 import uuid
 
-from six.moves import xrange
+import decorator
+import pytest
 
 from . import unittest
-from kafka import SimpleClient
+from kafka import SimpleClient, create_message
 from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
-from kafka.structs import OffsetRequestPayload
-
-__all__ = [
-    'random_string',
-    'get_open_port',
-    'kafka_versions',
-    'KafkaIntegrationTestCase',
-    'Timer',
-]
-
-def random_string(l):
-    return "".join(random.choice(string.ascii_letters) for i in xrange(l))
+from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
+from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order
 
 def kafka_versions(*versions):
-    def version_str_to_list(s):
-        return list(map(int, s.split('.')))  # e.g., [0, 8, 1, 1]
-
     def construct_lambda(s):
         if s[0].isdigit():
             op_str = '='
@@ -54,25 +38,25 @@ def construct_lambda(s):
         }
         op = op_map[op_str]
         version = version_str_to_list(v_str)
-        return lambda a: op(version_str_to_list(a), version)
+        return lambda a: op(a, version)
 
     validators = map(construct_lambda, versions)
 
-    def kafka_versions(func):
-        @functools.wraps(func)
-        def wrapper(self):
-            kafka_version = os.environ.get('KAFKA_VERSION')
+    def real_kafka_versions(func):
+        def wrapper(func, *args, **kwargs):
+            version = kafka_version()
 
-            if not kafka_version:
-                self.skipTest("no kafka version set in KAFKA_VERSION env var")
+            if not version:
+                pytest.skip("no kafka version set in KAFKA_VERSION env var")
 
             for f in validators:
-                if not f(kafka_version):
-                    self.skipTest("unsupported kafka version")
+                if not f(version):
+                    pytest.skip("unsupported kafka version")
 
-            return func(self)
-        return wrapper
-    return kafka_versions
+            return func(*args, **kwargs)
+        return decorator.decorator(wrapper, func)
+
+    return real_kafka_versions
 
 def get_open_port():
     sock = socket.socket()
@@ -81,6 +65,40 @@ def get_open_port():
     sock.close()
     return port
 
+_MESSAGES = {}
+def msg(message):
+    """Format, encode and deduplicate a message
+    """
+    global _MESSAGES #pylint: disable=global-statement
+    if message not in _MESSAGES:
+        _MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))
+
+    return _MESSAGES[message].encode('utf-8')
+
+def send_messages(client, topic, partition, messages):
+    """Send messages to a topic's partition
+    """
+    messages = [create_message(msg(str(m))) for m in messages]
+    produce = ProduceRequestPayload(topic, partition, messages=messages)
+    resp, = client.send_produce_request([produce])
+    assert resp.error == 0
+
+    return [x.value for x in messages]
+
+def current_offset(client, topic, partition, kafka_broker=None):
+    """Get the current offset of a topic's partition
+    """
+    try:
+        offsets, = client.send_offset_request([OffsetRequestPayload(topic,
+                                                                    partition, -1, 1)])
+    except Exception:
+        # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
+        if kafka_broker:
+            kafka_broker.dump_logs()
+        raise
+    else:
+        return offsets.offsets[0]
+
 class KafkaIntegrationTestCase(unittest.TestCase):
     create_client = True
     topic = None
@@ -122,7 +140,8 @@ def tearDown(self):
 
     def current_offset(self, topic, partition):
         try:
-            offsets, = self.client.send_offset_request([OffsetRequestPayload(topic, partition, -1, 1)])
+            offsets, = self.client.send_offset_request([OffsetRequestPayload(topic,
+                                                                             partition, -1, 1)])
         except Exception:
             # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
             self.zk.child.dump_logs()
@@ -132,7 +151,7 @@ def current_offset(self, topic, partition):
         return offsets.offsets[0]
 
     def msgs(self, iterable):
-        return [ self.msg(x) for x in iterable ]
+        return [self.msg(x) for x in iterable]
 
     def msg(self, s):
         if s not in self._messages:
diff --git a/tox.ini b/tox.ini
index 35dc84207..ad95f9374 100644
--- a/tox.ini
+++ b/tox.ini
@@ -20,6 +20,7 @@ deps =
    xxhash
    crc32c
    py26: unittest2
+   decorator
commands =
    py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =
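
Note for reviewers (not part of the patch): the sketch below shows how a
migrated test could consume the new fixtures. The test module name and
message payloads are hypothetical; the fixtures themselves (topic,
kafka_producer, kafka_consumer_factory) and the broker bootstrapping are
the ones defined in test/conftest.py above.

    # test_fixture_usage.py -- hypothetical example, not included in this patch
    def test_produce_and_consume(topic, kafka_producer, kafka_consumer_factory):
        # `topic` is created on the broker by its fixture; the producer and
        # consumer are closed automatically by their fixtures on teardown.
        expected = set(('msg-%d' % i).encode('utf-8') for i in range(10))
        for value in expected:
            kafka_producer.send(topic, value)
        kafka_producer.flush()

        # Consume from the beginning of the topic and stop once every
        # produced payload has been seen, mirroring test_kafka_consumer above.
        consumer = kafka_consumer_factory(auto_offset_reset='earliest')
        received = set()
        for message in consumer:
            received.add(message.value)
            if len(received) == len(expected):
                break
        assert received == expected

Running such a test only requires KAFKA_VERSION to be set, since
kafka_broker_factory asserts on the version fixture before spawning the
Zookeeper and Kafka child processes.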