Skip to content
This repository was archived by the owner on Jun 13, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions epsagon/modules/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ def _wrapper(wrapped, instance, args, kwargs):
"""KafkaProducer.send wrapper"""
new_args, new_kwargs = _parse_args(*args, **kwargs)

# Adds epsagon header
if not new_kwargs.get('headers'):
new_kwargs['headers'] = []
new_kwargs['headers'].append(
(EPSAGON_HEADER, get_epsagon_http_trace_id().encode())
)
# Adds epsagon header only on Kafka record V2. V0/V1 don't support it
# pylint: disable=protected-access
if instance._max_usable_produce_magic() == 2:
if not new_kwargs.get('headers'):
new_kwargs['headers'] = []
new_kwargs['headers'].append(
(EPSAGON_HEADER, get_epsagon_http_trace_id().encode())
)

return wrapper(KafkaEventFactory, wrapped, instance, new_args, new_kwargs)

Expand Down
35 changes: 33 additions & 2 deletions tests/events/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

TEST_URL = 'https://example.test/'

def test(*args, **kwargs):
def record_mock(*args, **kwargs):
return [{}, False, False]


@mock.patch('epsagon.trace.TraceFactory.add_event')
@mock.patch('kafka.producer.kafka.KafkaProducer._wait_on_metadata')
@mock.patch('kafka.producer.kafka.KafkaProducer._partition')
@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=test)
@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=record_mock)
def test_sanity(append_mock, partition_mock, wait_on_metadata_mock, add_event_mock):
retval = 'success'
body = {'test': 1}
Expand Down Expand Up @@ -49,3 +49,34 @@ def wrapped_function():
epsagon.constants.EPSAGON_HEADER in
event.resource['metadata']['messaging.headers']
)


@mock.patch('epsagon.trace.TraceFactory.add_event')
@mock.patch('kafka.producer.kafka.KafkaProducer._wait_on_metadata')
@mock.patch('kafka.producer.kafka.KafkaProducer._partition')
@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=record_mock)
def test_no_header_injection(append_mock, partition_mock, wait_on_metadata_mock, add_event_mock):
# Verify header is not injected in older kafka api versions (V1)
retval = 'success'
body = {'test': 1}

@epsagon.wrappers.python_function.python_wrapper
def wrapped_function():
producer = KafkaProducer(
bootstrap_servers=['host:10'],
client_id='test_client_id',
api_version=(0, 10, 0),
value_serializer=lambda x: json.dumps(x).encode('ascii'),
)
response = producer.send('topic', body)
return retval
assert wrapped_function() == retval
wait_on_metadata_mock.assert_called()
partition_mock.assert_called()
append_mock.assert_called()
add_event_mock.assert_called()
event = add_event_mock.call_args_list[0].args[0]
assert (
epsagon.constants.EPSAGON_HEADER not in
event.resource['metadata']
)