4 changes: 2 additions & 2 deletions ddtrace/contrib/kafka/patch.py
@@ -39,8 +39,8 @@ def __bool__(self):


class TracedConsumer(confluent_kafka.Consumer):
def __init__(self, config):
super(TracedConsumer, self).__init__(config)
def __init__(self, config, *args, **kwargs):
super(TracedConsumer, self).__init__(config, *args, **kwargs)
self._group_id = config["group.id"]

def poll(self, timeout=1):
@@ -0,0 +1,4 @@
---
fixes:
- |
      kafka: Fixes ``TypeError`` raised when arbitrary keyword arguments are passed to ``confluent_kafka.Consumer``
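
For context, a minimal sketch of the usage this fix enables (the broker address and group id are illustrative assumptions, not values from the patch): keyword-only arguments such as ``logger`` are now forwarded to ``confluent_kafka.Consumer`` by the traced wrapper instead of raising a ``TypeError``.

import logging

import confluent_kafka

# With the patched TracedConsumer, extra keyword arguments like ``logger``
# are passed through to the underlying confluent_kafka.Consumer.
consumer = confluent_kafka.Consumer(
    {"bootstrap.servers": "localhost:9092", "group.id": "example-group"},
    logger=logging.getLogger(__name__),
)
consumer.close()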
17 changes: 17 additions & 0 deletions tests/contrib/kafka/test_kafka.py
@@ -1,3 +1,5 @@
import logging

import confluent_kafka
import pytest
import six
@@ -63,6 +65,21 @@ def consumer(tracer):
_consumer.close()


def test_consumer_created_with_logger_does_not_raise(tracer):
"""Test that adding a logger to a Consumer init does not raise any errors."""
logger = logging.getLogger()
# regression test for DataDog/dd-trace-py/issues/5873
consumer = confluent_kafka.Consumer(
{
"bootstrap.servers": BOOTSTRAP_SERVERS,
"group.id": GROUP_ID,
"auto.offset.reset": "earliest",
},
logger=logger,
)
consumer.close()


@pytest.mark.parametrize("tombstone", [False, True])
@pytest.mark.snapshot(ignores=["metrics.kafka.message_offset"])
def test_message(producer, consumer, tombstone):