Skip to content

Commit

Permalink
Introduce new fixtures to prepare for migration to pytest.
Browse files Browse the repository at this point in the history
This commits adds new pytest fixtures in prepation for the
migration of unittest.TestCases to pytest test cases. The handling
of temporary dir creation was also changed so that we can use
the pytest tmpdir fixture after the migration.
  • Loading branch information
asdaraujo authored and jeffwidman committed Feb 21, 2018
1 parent 0f5d35f commit a1869c4
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 157 deletions.
1 change: 1 addition & 0 deletions pylint.rc
@@ -1,5 +1,6 @@
[TYPECHECK]
ignored-classes=SyncManager,_socketobject
generated-members=py.*

[MESSAGES CONTROL]
disable=E1129
113 changes: 96 additions & 17 deletions test/conftest.py
@@ -1,38 +1,117 @@
from __future__ import absolute_import

import os
import inspect

import pytest
from decorator import decorate

from test.fixtures import KafkaFixture, ZookeeperFixture

from test.testutil import kafka_version, random_string

@pytest.fixture(scope="module")
def version():
if 'KAFKA_VERSION' not in os.environ:
return ()
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))

"""Return the Kafka version set in the OS environment"""
return kafka_version()

@pytest.fixture(scope="module")
def zookeeper(version, request):
assert version
zk = ZookeeperFixture.instance()
yield zk
zk.close()
def zookeeper():
"""Return a Zookeeper fixture"""
zk_instance = ZookeeperFixture.instance()
yield zk_instance
zk_instance.close()

@pytest.fixture(scope="module")
def kafka_broker(kafka_broker_factory):
"""Return a Kafka broker fixture"""
return kafka_broker_factory()[0]

@pytest.fixture(scope="module")
def kafka_broker(version, zookeeper, request):
assert version
k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
partitions=4)
yield k
k.close()
def kafka_broker_factory(version, zookeeper):
"""Return a Kafka broker fixture factory"""
assert version, 'KAFKA_VERSION must be specified to run integration tests'

_brokers = []
def factory(**broker_params):
params = {} if broker_params is None else broker_params.copy()
params.setdefault('partitions', 4)
num_brokers = params.pop('num_brokers', 1)
brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
for x in range(num_brokers))
_brokers.extend(brokers)
return brokers

yield factory

for broker in _brokers:
broker.close()

@pytest.fixture
def simple_client(kafka_broker, request, topic):
"""Return a SimpleClient fixture"""
client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
client.ensure_topic_exists(topic)
yield client
client.close()

@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
yield client
client.close()

@pytest.fixture
def kafka_consumer(kafka_consumer_factory):
"""Return a KafkaConsumer fixture"""
return kafka_consumer_factory()

@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
_consumer = [None]

def factory(**kafka_consumer_params):
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
return _consumer[0]

yield factory

if _consumer[0]:
_consumer[0].close()

@pytest.fixture
def kafka_producer(kafka_producer_factory):
"""Return a KafkaProducer fixture"""
yield kafka_producer_factory()

@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
"""Return a KafkaProduce factory fixture"""
_producer = [None]

def factory(**kafka_producer_params):
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
_producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
return _producer[0]

yield factory

if _producer[0]:
_producer[0].close()

@pytest.fixture
def topic(kafka_broker, request):
"""Return a topic fixture"""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
return topic_name

@pytest.fixture
def conn(mocker):
"""Return a connection mocker fixture"""
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse
Expand Down

0 comments on commit a1869c4

Please sign in to comment.