From d8a5f591cda3822a8347674c644b2bdd7ed0e172 Mon Sep 17 00:00:00 2001 From: ranrib Date: Wed, 27 Jan 2021 09:33:34 +0200 Subject: [PATCH] fix(kafka.py): avoid setting headers in Kafka V0/V1 --- epsagon/modules/kafka.py | 14 ++++++++------ tests/events/test_kafka.py | 35 +++++++++++++++++++++++++++++++++-- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/epsagon/modules/kafka.py b/epsagon/modules/kafka.py index e44a4565..dd4d4ce9 100644 --- a/epsagon/modules/kafka.py +++ b/epsagon/modules/kafka.py @@ -32,12 +32,14 @@ def _wrapper(wrapped, instance, args, kwargs): """KafkaProducer.send wrapper""" new_args, new_kwargs = _parse_args(*args, **kwargs) - # Adds epsagon header - if not new_kwargs.get('headers'): - new_kwargs['headers'] = [] - new_kwargs['headers'].append( - (EPSAGON_HEADER, get_epsagon_http_trace_id().encode()) - ) + # Adds epsagon header only on Kafka record V2. V0/V1 don't support it + # pylint: disable=protected-access + if instance._max_usable_produce_magic() == 2: + if not new_kwargs.get('headers'): + new_kwargs['headers'] = [] + new_kwargs['headers'].append( + (EPSAGON_HEADER, get_epsagon_http_trace_id().encode()) + ) return wrapper(KafkaEventFactory, wrapped, instance, new_args, new_kwargs) diff --git a/tests/events/test_kafka.py b/tests/events/test_kafka.py index dd146c6f..614ebbfc 100644 --- a/tests/events/test_kafka.py +++ b/tests/events/test_kafka.py @@ -7,14 +7,14 @@ TEST_URL = 'https://example.test/' -def test(*args, **kwargs): +def record_mock(*args, **kwargs): return [{}, False, False] @mock.patch('epsagon.trace.TraceFactory.add_event') @mock.patch('kafka.producer.kafka.KafkaProducer._wait_on_metadata') @mock.patch('kafka.producer.kafka.KafkaProducer._partition') -@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=test) +@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=record_mock) def test_sanity(append_mock, partition_mock, wait_on_metadata_mock, add_event_mock): retval = 'success' body = {'test': 1} @@ -49,3 +49,34 @@ def wrapped_function(): epsagon.constants.EPSAGON_HEADER in event.resource['metadata']['messaging.headers'] ) + + +@mock.patch('epsagon.trace.TraceFactory.add_event') +@mock.patch('kafka.producer.kafka.KafkaProducer._wait_on_metadata') +@mock.patch('kafka.producer.kafka.KafkaProducer._partition') +@mock.patch('kafka.producer.record_accumulator.RecordAccumulator.append', side_effect=record_mock) +def test_no_header_injection(append_mock, partition_mock, wait_on_metadata_mock, add_event_mock): + # Verify header is not injected in older kafka api versions (V1) + retval = 'success' + body = {'test': 1} + + @epsagon.wrappers.python_function.python_wrapper + def wrapped_function(): + producer = KafkaProducer( + bootstrap_servers=['host:10'], + client_id='test_client_id', + api_version=(0, 10, 0), + value_serializer=lambda x: json.dumps(x).encode('ascii'), + ) + response = producer.send('topic', body) + return retval + assert wrapped_function() == retval + wait_on_metadata_mock.assert_called() + partition_mock.assert_called() + append_mock.assert_called() + add_event_mock.assert_called() + event = add_event_mock.call_args_list[0].args[0] + assert ( + epsagon.constants.EPSAGON_HEADER not in + event.resource['metadata'] + )