Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ddtrace/contrib/kafka/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,12 @@ def traced_poll(func, instance, args, kwargs):
):
span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
span.set_tag(kafkax.PARTITION, message.partition())
span.set_tag_str(kafkax.TOMBSTONE, str(len(message) == 0))
is_tombstone = False
try:
is_tombstone = len(message) == 0
except TypeError: # https://github.com/confluentinc/confluent-kafka-python/issues/1192
pass
span.set_tag_str(kafkax.TOMBSTONE, str(is_tombstone))
span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
span.set_tag(SPAN_MEASURED_KEY)
rate = config.kafka.get_analytics_sample_rate()
Expand Down
5 changes: 5 additions & 0 deletions releasenotes/notes/kafka-typeerror-b39a35c5338b05aa.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
kafka: This fix resolves an issue where using a Kafka ``DeserializingConsumer`` could result in
a crash when the configured deserializer returns a type without a ``__len__`` attribute.
2 changes: 1 addition & 1 deletion tests/contrib/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ def json_deserializer(as_bytes, ctx):
try:
return json.loads(as_bytes)
except json.decoder.JSONDecodeError:
return as_bytes
return # return a type that has no __len__ because such types caused a crash at one point

conf = {
"bootstrap.servers": BOOTSTRAP_SERVERS,
Expand Down