Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #839 from Parsely/bugfix/pytest_skip
Browse files Browse the repository at this point in the history
upgrade pytest dependency
  • Loading branch information
Emmett J. Butler committed Jul 24, 2018
2 parents 0f9a0f8 + f771c04 commit 23a898b
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 83 deletions.
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
lz4==2.0.2
lz4tools==1.3.1.2
pytest
pytest==3.6.3
pytest-cov
python-snappy
mock
Expand Down
54 changes: 0 additions & 54 deletions tests/pykafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,54 +0,0 @@
import pytest


def patch_subclass(parent, skip_condition):
"""Work around a pytest.mark.skipif bug
https://github.com/pytest-dev/pytest/issues/568
The issue causes all subclasses of a TestCase subclass to be skipped if any one
of them is skipped.
This fix circumvents the issue by overriding Python's existing subclassing mechanism.
Instead of having `cls` be a subclass of `parent`, this decorator adds each attribute
of `parent` to `cls` without using Python inheritance. When appropriate, it also adds
a boolean condition under which to skip tests for the decorated class.
:param parent: The "superclass" from which the decorated class should inherit
its non-overridden attributes
:type parent: unittest2.TestCase
:param skip_condition: A boolean condition that, when True, will cause all tests in
the decorated class to be skipped
:type skip_condition: bool
"""
def patcher(cls):
def build_skipped_method(method, cls, cond=None):
if cond is None:
cond = False
if hasattr(method, "skip_condition"):
cond = cond or method.skip_condition(cls)

@pytest.mark.skipif(cond, reason="")
def _wrapper(self):
return method(self)
return _wrapper

# two passes over parent required so that skips have access to all class
# attributes
for attr in parent.__dict__:
if attr in cls.__dict__:
continue
if not attr.startswith("test_"):
setattr(cls, attr, parent.__dict__[attr])

for attr in cls.__dict__:
if attr.startswith("test_"):
setattr(cls, attr, build_skipped_method(cls.__dict__[attr],
cls, skip_condition))

for attr in parent.__dict__:
if attr.startswith("test_"):
setattr(cls, attr, build_skipped_method(parent.__dict__[attr],
cls, skip_condition))
return cls
return patcher
11 changes: 5 additions & 6 deletions tests/pykafka/rdkafka/test_simple_consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest
import unittest2

from tests.pykafka import test_simpleconsumer, test_balancedconsumer, patch_subclass
from tests.pykafka import test_simpleconsumer, test_balancedconsumer
from pykafka.utils.compat import range
try:
from pykafka.rdkafka import _rd_kafka # noqa
Expand All @@ -10,8 +9,8 @@
RDKAFKA = False # C extension not built


@patch_subclass(test_simpleconsumer.TestSimpleConsumer, not RDKAFKA)
class TestRdKafkaSimpleConsumer(unittest2.TestCase):
@pytest.mark.skipif(not RDKAFKA, reason="rdkafka")
class TestRdKafkaSimpleConsumer(test_simpleconsumer.TestSimpleConsumer):
USE_RDKAFKA = True

def test_update_cluster(self):
Expand Down Expand Up @@ -69,6 +68,6 @@ def _latest_partition_offsets_by_reading(consumer, n_reads):
return latest_offs


@patch_subclass(test_balancedconsumer.BalancedConsumerIntegrationTests, not RDKAFKA)
class RdkBalancedConsumerIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(not RDKAFKA, reason="rdkafka")
class RdkBalancedConsumerIntegrationTests(test_balancedconsumer.BalancedConsumerIntegrationTests):
USE_RDKAFKA = True
29 changes: 16 additions & 13 deletions tests/pykafka/test_balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
RangeProtocol)
from pykafka.test.utils import get_cluster, stop_cluster
from pykafka.utils.compat import range, iterkeys, iteritems
from tests.pykafka import patch_subclass


kafka_version_string = os.environ.get('KAFKA_VERSION', '0.8')
Expand Down Expand Up @@ -242,6 +241,8 @@ def test_a_rebalance_unblock_event(self):
https://github.com/Parsely/pykafka/issues/701
"""
if self.USE_GEVENT:
pytest.skip("Unresolved failure")
group = b'test_rebalance'
consumer_a = self.get_balanced_consumer(group, consumer_timeout_ms=-1)

Expand All @@ -264,7 +265,6 @@ def test_a_rebalance_unblock_event(self):

# consumer thread would die in case of any rebalancing errors
self.assertTrue(consumer_a_thread.is_alive() and consumer_b_thread.is_alive())
test_a_rebalance_unblock_event.skip_condition = lambda cls: cls.USE_GEVENT

def test_rebalance_callbacks(self):
def on_rebalance(cns, old_partition_offsets, new_partition_offsets):
Expand Down Expand Up @@ -419,6 +419,8 @@ def test_external_kazoo_client(self):
This currently doesn't assert anything, it just rules out any trivial
exceptions in the code path that uses an external KazooClient
"""
if self.MANAGED_CONSUMER:
pytest.skip("Managed consumer doesn't use zookeeper")
zk = KazooClient(self.kafka.zookeeper)
zk.start()

Expand All @@ -429,7 +431,6 @@ def test_external_kazoo_client(self):
use_rdkafka=self.USE_RDKAFKA)
[msg for msg in consumer]
consumer.stop()
test_external_kazoo_client.skip_condition = lambda cls: cls.MANAGED_CONSUMER

def test_no_partitions(self):
"""Ensure a consumer assigned no partitions doesn't fail"""
Expand Down Expand Up @@ -461,6 +462,8 @@ def test_zk_conn_lost(self):
See also github issue #204.
"""
if self.MANAGED_CONSUMER:
pytest.skip("Managed consumer doesn't use zookeeper")
check_partitions = lambda c: c._get_held_partitions() == c._partitions
zk = self.get_zk()
zk.start()
Expand Down Expand Up @@ -498,7 +501,6 @@ def test_zk_conn_lost(self):
zk.stop()
except:
pass
test_zk_conn_lost.skip_condition = lambda cls: cls.MANAGED_CONSUMER

def wait_for_rebalancing(self, *balanced_consumers):
"""Test helper that loops while rebalancing is ongoing
Expand All @@ -520,21 +522,22 @@ def wait_for_rebalancing(self, *balanced_consumers):
raise AssertionError("Rebalancing failed")


@patch_subclass(BalancedConsumerIntegrationTests,
platform.python_implementation() == "PyPy" or gevent is None)
class BalancedConsumerGEventIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(platform.python_implementation() == "PyPy" or gevent is None,
reason="Unresolved crashes")
class BalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests):
USE_GEVENT = True


@patch_subclass(BalancedConsumerIntegrationTests, kafka_version < version_09)
class ManagedBalancedConsumerIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(kafka_version < version_09,
reason="Managed consumer unsupported until 0.9")
class ManagedBalancedConsumerIntegrationTests(BalancedConsumerIntegrationTests):
MANAGED_CONSUMER = True


@patch_subclass(
BalancedConsumerIntegrationTests,
platform.python_implementation() == "PyPy" or kafka_version < version_09 or gevent is None)
class ManagedBalancedConsumerGEventIntegrationTests(unittest2.TestCase):
@pytest.mark.skipif(platform.python_implementation() == "PyPy" or
kafka_version < version_09 or gevent is None,
reason="Unresolved crashes")
class ManagedBalancedConsumerGEventIntegrationTests(BalancedConsumerIntegrationTests):
MANAGED_CONSUMER = True
USE_GEVENT = True

Expand Down
8 changes: 4 additions & 4 deletions tests/pykafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from pykafka.common import CompressionType
from pykafka.producer import OwnedBroker
from pykafka.utils import serialize_utf8, deserialize_utf8
from tests.pykafka import patch_subclass

kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')

Expand Down Expand Up @@ -185,6 +184,8 @@ def test_async_produce_queue_full(self):

def test_async_produce_lingers(self):
"""Ensure that the context manager waits for linger_ms milliseconds"""
if self.USE_RDKAFKA:
pytest.skip("rdkafka uses different lingering mechanism")
linger = 3
consumer = self._get_consumer()
with self._get_producer(linger_ms=linger * 1000) as producer:
Expand All @@ -194,7 +195,6 @@ def test_async_produce_lingers(self):
self.assertTrue(int(time.time() - start) >= int(linger))
consumer.consume()
consumer.consume()
test_async_produce_lingers.skip_condition = lambda cls: RDKAFKA

def test_async_produce_thread_exception(self):
"""Ensure that an exception on a worker thread is raised to the main thread"""
Expand Down Expand Up @@ -380,8 +380,8 @@ def ensure_all_messages_consumed():
retry(ensure_all_messages_consumed, retry_time=15)


@patch_subclass(ProducerIntegrationTests, not RDKAFKA)
class TestRdKafkaProducer(unittest2.TestCase):
@pytest.mark.skipif(not RDKAFKA, reason="rdkafka")
class TestRdKafkaProducer(ProducerIntegrationTests):
USE_RDKAFKA = True


Expand Down
3 changes: 2 additions & 1 deletion tests/pykafka/test_simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ def test_reset_offsets(self):

def test_update_cluster(self):
"""Check that the consumer can initiate cluster updates"""
if self.USE_RDKAFKA:
pytest.skip("Unresolved crashes")
with self._get_simple_consumer() as consumer:
self.assertIsNotNone(consumer.consume())

Expand All @@ -262,7 +264,6 @@ def test_update_cluster(self):
# If the fetcher thread fell over during the cluster update
# process, we'd get an exception here:
self.assertIsNotNone(consumer.consume())
test_update_cluster.skip_condition = lambda cls: RDKAFKA

def test_consumer_lag(self):
"""Ensure that after consuming the entire topic, lag is 0"""
Expand Down
8 changes: 4 additions & 4 deletions tests/pykafka/utils/test_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ def test_snappy_xerial(self):
decoded = compression.decode_snappy(encoded)
self.assertEqual(self.text, decoded)

@pytest.mark.skipif(platform.python_implementation() == "PyPy",
reason="PyPy fails to compress large messages with Snappy")
def test_snappy_large_payload(self):
if platform.python_implementation() == "PyPy":
pytest.skip("PyPy fails to compress large messages with Snappy")
payload = b''.join([uuid4().bytes for i in range(10)])
c = compression.encode_snappy(payload)
self.assertEqual(compression.decode_snappy(c), payload)
Expand All @@ -46,9 +46,9 @@ def test_lz4(self):
decoded = compression.decode_lz4(encoded)
self.assertEqual(self.text, decoded)

@pytest.mark.skipif(platform.python_implementation() == "PyPy",
reason="lz4f is currently unsupported with PyPy")
def test_lz4f(self):
if platform.python_implementation() == "PyPy":
pytest.skip("lz4f is currently unsupported with PyPy")
encoded = lz4f.compressFrame(self.text)
self.assertNotEqual(self.text, encoded)

Expand Down

0 comments on commit 23a898b

Please sign in to comment.