diff --git a/ddtrace/contrib/kafka/patch.py b/ddtrace/contrib/kafka/patch.py index 36303361ae6..677bc612693 100644 --- a/ddtrace/contrib/kafka/patch.py +++ b/ddtrace/contrib/kafka/patch.py @@ -39,8 +39,8 @@ def __bool__(self): class TracedConsumer(confluent_kafka.Consumer): - def __init__(self, config): - super(TracedConsumer, self).__init__(config) + def __init__(self, config, *args, **kwargs): + super(TracedConsumer, self).__init__(config, *args, **kwargs) self._group_id = config["group.id"] def poll(self, timeout=1): diff --git a/releasenotes/notes/pass-args-kwargs-to-kafka-f3ca8128a2c4d612.yaml b/releasenotes/notes/pass-args-kwargs-to-kafka-f3ca8128a2c4d612.yaml new file mode 100644 index 00000000000..a3402d09293 --- /dev/null +++ b/releasenotes/notes/pass-args-kwargs-to-kafka-f3ca8128a2c4d612.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + kafka: Fixes ``TypeError` raised when artibrary keyword arguments are passed to ``confluent_kafka.Consumer`` diff --git a/tests/contrib/kafka/test_kafka.py b/tests/contrib/kafka/test_kafka.py index e2c26f4159e..637b6c8aab3 100644 --- a/tests/contrib/kafka/test_kafka.py +++ b/tests/contrib/kafka/test_kafka.py @@ -1,3 +1,5 @@ +import logging + import confluent_kafka import pytest import six @@ -63,6 +65,21 @@ def consumer(tracer): _consumer.close() +def test_consumer_created_with_logger_does_not_raise(tracer): + """Test that adding a logger to a Consumer init does not raise any errors.""" + logger = logging.getLogger() + # regression test for DataDog/dd-trace-py/issues/5873 + consumer = confluent_kafka.Consumer( + { + "bootstrap.servers": BOOTSTRAP_SERVERS, + "group.id": GROUP_ID, + "auto.offset.reset": "earliest", + }, + logger=logger, + ) + consumer.close() + + @pytest.mark.parametrize("tombstone", [False, True]) @pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"]) def test_message(producer, consumer, tombstone):