Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion integration-test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ requests
flask
confluent-kafka
msgpack
sentry-sdk>=0.2.0
pytest-rerunfailures
pytest-xdist
git+https://github.com/untitaker/pytest-sentry#egg=pytest-sentry
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .fixtures.haproxy import haproxy # noqa
from .fixtures.mini_sentry import mini_sentry # noqa
from .fixtures.relay import relay # noqa
from .fixtures.processing import kafka_consumer, relay_with_processing # noqa
from .fixtures.processing import kafka_consumer, get_topic_name, processing_config, relay_with_processing, events_consumer, outcomes_consumer # noqa

@pytest.fixture
def random_port():
Expand Down
1 change: 0 additions & 1 deletion tests/integration/fixtures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import time

import requests
import sentry_sdk

session = requests.session()

Expand Down
199 changes: 112 additions & 87 deletions tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
@@ -1,130 +1,101 @@
import json
import msgpack

import pytest
import os
import confluent_kafka as kafka
from copy import deepcopy
from typing import Optional

from confluent_kafka.admin import AdminClient

topic_names = {
"events": "test-ingest-events",
"attachments": "test-ingest-attachments",
"transactions": "test-ingest-transactions",
"outcomes": "test-event-outcomes",
}

@pytest.fixture
def get_topic_name(worker_id):
"""
Generate a unique topic name for each test
"""

def _get_topic_name(topic: str, test_name: Optional[str]):
topic_name = topic_names.get(topic)
if topic_name is None:
raise ValueError("Invalid topic_name specified, check topic_names dict for accepted topic names.", topic)
if test_name is None:
return topic_name
else:
return "{}--{}".format(topic_name, test_name)
return lambda topic: f"semaphore-test-{topic}-{worker_id}"


def _processing_config(test_name: Optional[str], options=None):
@pytest.fixture
def processing_config(get_topic_name):
"""
Returns a minimal configuration for setting up a relay capable of processing
:param options: initial options to be merged
:return: the altered options
"""
# The Travis script sets the kafka bootstrap server into system environment variable.
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVER', '127.0.0.1:9092')

options = deepcopy(options) # avoid lateral effects

if options is None:
options = {}
if options.get('processing') is None:
options['processing'] = {}
processing = options['processing']
processing['enabled'] = True
if processing.get('kafka_config') is None:
processing['kafka_config'] = [
{'name': 'bootstrap.servers', 'value': bootstrap_servers},
# {'name': 'batch.size', 'value': '0'} # do not batch messages
]
if processing.get('topics') is None:
processing['topics'] = {
'events': _get_topic_name("events", test_name),
'attachments': _get_topic_name("attachments", test_name),
'transactions': _get_topic_name("transactions", test_name),
'outcomes': _get_topic_name("outcomes", test_name),
}
def inner(options=None):
# The Travis script sets the kafka bootstrap server into system environment variable.
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVER', '127.0.0.1:9092')

options = deepcopy(options) # avoid lateral effects

if options is None:
options = {}
if options.get('processing') is None:
options['processing'] = {}
processing = options['processing']
processing['enabled'] = True
if processing.get('kafka_config') is None:
processing['kafka_config'] = [
{'name': 'bootstrap.servers', 'value': bootstrap_servers},
# {'name': 'batch.size', 'value': '0'} # do not batch messages
]
if processing.get('topics') is None:
processing['topics'] = {
'events': get_topic_name("events"),
'attachments': get_topic_name("attachments"),
'transactions': get_topic_name("transactions"),
'outcomes': get_topic_name("outcomes"),
}

if not processing.get('redis'):
processing['redis'] = 'redis://127.0.0.1'
return options

return inner


if not processing.get('redis'):
processing['redis'] = 'redis://127.0.0.1'
return options
def _sync_wait_on_result(futures_dict):
"""
Synchronously waits on all futures returned by the admin_client api.
:param futures_dict: the api returns a dict of futures that can be awaited
"""
# just wait on all futures returned by the async operations of the admin_client
for f in futures_dict.values():
f.result(5) # wait up to 5 seconds for the admin operation to finish


@pytest.fixture
def relay_with_processing(relay, mini_sentry, request):
def relay_with_processing(relay, mini_sentry, processing_config, get_topic_name):
"""
Creates a fixture that configures a relay with processing enabled and that forwards
requests to the test ingestion topics
"""

def inner(options=None):
test_name = request.node.name
options = _processing_config(test_name, options)
admin = _KafkaAdminWrapper(request, options)
admin.delete_events_topic()

return relay(mini_sentry, options=options)

return inner


class _KafkaAdminWrapper:
def __init__(self, request, options):
self.test_name = request.node.name
self.options = options
options = processing_config(options)

kafka_config = {}
for elm in options['processing']['kafka_config']:
kafka_config[elm['name']] = elm['value']

self.admin_client = AdminClient(kafka_config)

def delete_events_topic(self):
self._delete_topic("events")

def delete_outcomes_topic(self):
self._delete_topic("outcomes")

def _delete_topic(self, topic):
topic_name = _get_topic_name(topic, self.test_name)
try:
futures_dict = self.admin_client.delete_topics([topic_name])
self._sync_wait_on_result(futures_dict)
except Exception: # noqa
pass # noqa nothing to do (probably there was no topic to start with)
return relay(mini_sentry, options=options)

def _sync_wait_on_result(self, futures_dict):
"""
Synchronously waits on all futures returned by the admin_client api.
:param futures_dict: the api returns a dict of futures that can be awaited
"""
# just wait on all futures returned by the async operations of the admin_client
for f in futures_dict.values():
f.result(5) # wait up to 5 seconds for the admin operation to finish
return inner



@pytest.fixture
def kafka_consumer(request):
def kafka_consumer(request, get_topic_name, processing_config):
"""
Creates a fixture that, when called, returns an already subscribed kafka consumer.
"""

def inner(topic: str, options=None):
test_name = request.node.name
topics = [_get_topic_name(topic, test_name)]
options = _processing_config(test_name, options)
topics = [get_topic_name(topic)]
options = processing_config(options)
# look for the servers (it is the only config we are interested in)
servers = [elm['value'] for elm in options['processing']['kafka_config'] if elm['name'] == 'bootstrap.servers']
servers= [elm['value'] for elm in options['processing']['kafka_config'] if elm['name'] == 'bootstrap.servers']
if len(servers) < 1:
raise ValueError("Bad kafka_config, could not find 'bootstrap.servers'.\n"
"The configuration should have an entry of the format \n"
Expand All @@ -141,7 +112,61 @@ def inner(topic: str, options=None):

consumer = kafka.Consumer(settings)
consumer.subscribe(topics)
request.addfinalizer(consumer.unsubscribe)

while consumer.poll(timeout=0.1) is not None:
pass


return consumer

return inner


class ConsumerBase(object):
# First poll takes forever, the next ones are fast
timeout = 20

def poll(self):
rv = self.consumer.poll(timeout=self.timeout)
self.timeout = 5
return rv


@pytest.fixture
def outcomes_consumer(kafka_consumer):
return lambda: OutcomesConsumer(kafka_consumer("outcomes"))


class OutcomesConsumer(ConsumerBase):
def __init__(self, consumer):
self.consumer = consumer

def get_outcome(self):
outcome = self.poll()
assert outcome is not None
assert outcome.error() is None
return json.loads(outcome.value())

def assert_rate_limited(self):
outcome = self.get_outcome()
assert outcome['outcome'] == 2, outcome


@pytest.fixture
def events_consumer(kafka_consumer):
return lambda: EventsConsumer(kafka_consumer("events"))


class EventsConsumer(ConsumerBase):
def __init__(self, consumer):
self.consumer = consumer

def get_event(self):
event = self.poll()
assert event is not None
assert event.error() is None

v = msgpack.unpackb(event.value(), raw=False, use_list=False)
assert v['ty'][0] == 0, v['ty'] # KafkaMessageType::Event
return json.loads(v['payload'].decode("utf8")), v
Loading