Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions confluent_kafka/deserializing_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,23 +128,23 @@ def poll(self, timeout=-1):
return None

if msg.error() is not None:
raise ConsumeError(msg.error(), message=msg)
raise ConsumeError(msg.error(), kafka_message=msg)

ctx = SerializationContext(msg.topic(), MessageField.VALUE)
value = msg.value()
if self._value_deserializer is not None:
try:
value = self._value_deserializer(value, ctx)
except Exception as se:
raise ValueDeserializationError(exception=se, message=msg)
raise ValueDeserializationError(exception=se, kafka_message=msg)

key = msg.key()
ctx.field = MessageField.KEY
if self._key_deserializer is not None:
try:
key = self._key_deserializer(key, ctx)
except Exception as se:
raise KeyDeserializationError(exception=se, message=msg)
raise KeyDeserializationError(exception=se, kafka_message=msg)

msg.set_key(key)
msg.set_value(value)
Expand Down
103 changes: 55 additions & 48 deletions confluent_kafka/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,56 @@
# limitations under the License.
#
from confluent_kafka.cimpl import KafkaException, KafkaError

from confluent_kafka.serialization import SerializationError


class ConsumeError(KafkaException):
class _KafkaClientError(KafkaException):
"""
Wraps all errors encountered during the consumption of a message.

Note:
In the event of a serialization error the original message contents
may be retrieved from the ``message`` attribute.
Wraps all errors encountered by a Kafka Client

Args:
error_code (KafkaError): Error code indicating the type of error.
kafka_error (KafkaError): KafkaError instance.

exception(Exception, optional): The original exception

message (Message, optional): The Kafka Message returned from the broker.

kafka_message (Message, optional): The Kafka Message returned
by the broker.
"""
def __init__(self, error_code, exception=None, message=None):
if exception is not None:
kafka_error = KafkaError(error_code, repr(exception))
self.exception = exception
else:
kafka_error = KafkaError(error_code)
self.exception = None

super(ConsumeError, self).__init__(kafka_error)
self.message = message
def __init__(self, kafka_error, exception=None, kafka_message=None):
super(_KafkaClientError, self).__init__(kafka_error)
self.exception = exception
self.kafka_message = kafka_message

@property
def code(self):
return self.code()
return self.args[0].code()

@property
def name(self):
return self.name()
return self.args[0].name()


class ConsumeError(_KafkaClientError):
"""
Wraps all errors encountered during the consumption of a message.

Note:
In the event of a serialization error the original message
contents may be retrieved from the ``kafka_message`` attribute.

Args:
kafka_error (KafkaError): KafkaError instance.

exception(Exception, optional): The original exception

kafka_message (Message, optional): The Kafka Message
returned by the broker.

"""

def __init__(self, kafka_error, exception=None, kafka_message=None):
super(ConsumeError, self).__init__(kafka_error, exception, kafka_message)


class KeyDeserializationError(ConsumeError, SerializationError):
Expand All @@ -64,12 +76,15 @@ class KeyDeserializationError(ConsumeError, SerializationError):
Args:
exception(Exception, optional): The original exception

message (Message, optional): The Kafka Message returned from the broker.
kafka_message (Message, optional): The Kafka Message returned
by the broker.

"""
def __init__(self, exception=None, message=None):

def __init__(self, exception=None, kafka_message=None):
super(KeyDeserializationError, self).__init__(
KafkaError._KEY_DESERIALIZATION, exception=exception, message=message)
KafkaError(KafkaError._KEY_DESERIALIZATION, str(exception)),
exception=exception, kafka_message=kafka_message)


class ValueDeserializationError(ConsumeError, SerializationError):
Expand All @@ -80,41 +95,29 @@ class ValueDeserializationError(ConsumeError, SerializationError):
Args:
exception(Exception, optional): The original exception

message (Message, optional): The Kafka Message returned from the broker.
kafka_message (Message, optional): The Kafka Message returned
by the broker.

"""
def __init__(self, exception=None, message=None):

def __init__(self, exception=None, kafka_message=None):
super(ValueDeserializationError, self).__init__(
KafkaError._VALUE_DESERIALIZATION, exception=exception, message=message)
KafkaError(KafkaError._VALUE_DESERIALIZATION, str(exception)),
exception=exception, kafka_message=kafka_message)


class ProduceError(KafkaException):
class ProduceError(_KafkaClientError):
"""
Wraps all errors encountered when Producing messages.

Args:
error_code (KafkaError): Error code indicating the type of error.
kafka_error (KafkaError): KafkaError instance.

exception(Exception, optional): The original exception.

"""
def __init__(self, error_code, exception=None):
if exception is not None:
kafka_error = KafkaError(error_code, repr(exception))
self.exception = exception
else:
kafka_error = KafkaError(error_code)
self.exception = None

super(ProduceError, self).__init__(kafka_error)

@property
def code(self):
return self.code()

@property
def name(self):
return self.name()
def __init__(self, kafka_error, exception=None):
super(ProduceError, self).__init__(kafka_error, exception, None)


class KeySerializationError(ProduceError, SerializationError):
Expand All @@ -124,9 +127,11 @@ class KeySerializationError(ProduceError, SerializationError):
Args:
exception (Exception): The exception that occurred during serialization.
"""

def __init__(self, exception=None):
super(KeySerializationError, self).__init__(
KafkaError._KEY_SERIALIZATION, exception=exception)
KafkaError(KafkaError._KEY_SERIALIZATION, str(exception)),
exception=exception)


class ValueSerializationError(ProduceError, SerializationError):
Expand All @@ -136,6 +141,8 @@ class ValueSerializationError(ProduceError, SerializationError):
Args:
exception (Exception): The exception that occurred during serialization.
"""

def __init__(self, exception=None):
super(ValueSerializationError, self).__init__(
KafkaError._VALUE_SERIALIZATION, exception=exception)
KafkaError(KafkaError._VALUE_SERIALIZATION, str(exception)),
exception=exception)
44 changes: 44 additions & 0 deletions tests/integration/consumer/test_consumer_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
#

import pytest
from confluent_kafka.cimpl import TopicPartition, OFFSET_END

from confluent_kafka.error import ConsumeError
from confluent_kafka.serialization import StringSerializer


def test_consume_error(kafka_cluster):
"""
Tests to ensure librdkafka errors are propagated as
an instance of ConsumeError.
"""
topic = kafka_cluster.create_topic("test_commit_transaction")
consumer_conf = {'enable.partition.eof': True}

producer = kafka_cluster.producer()
producer.produce(topic=topic, value="a")
producer.flush()

consumer = kafka_cluster.consumer(consumer_conf,
value_deserializer=StringSerializer())
consumer.assign([TopicPartition(topic, 0, OFFSET_END)])

with pytest.raises(ConsumeError, match="No more messages"):
# Trigger EOF error
consumer.poll()
71 changes: 71 additions & 0 deletions tests/test_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
#

from confluent_kafka.cimpl import KafkaError
from confluent_kafka.error import ConsumeError, \
ProduceError


def test_new_consume_error_constant():
ce = ConsumeError(KafkaError(KafkaError._PARTITION_EOF))

assert ce.code == KafkaError._PARTITION_EOF
assert ce.name == u'_PARTITION_EOF'


def test_new_consume_error_caused_by():
ce = ConsumeError(KafkaError(KafkaError.INVALID_CONFIG),
exception=ValueError())

assert ce.code == KafkaError.INVALID_CONFIG
assert ce.name == u'INVALID_CONFIG'
assert isinstance(ce.exception, ValueError)


def test_new_consume_error_custom_message():
ce = ConsumeError(KafkaError(KafkaError._KEY_SERIALIZATION,
"Unable to serialize key"))

assert ce.code == KafkaError._KEY_SERIALIZATION
assert ce.name == u'_KEY_SERIALIZATION'
assert ce.args[0].str() == "Unable to serialize key"


def test_new_produce_error_constant():
pe = ProduceError(KafkaError(KafkaError._PARTITION_EOF))

assert pe.code == KafkaError._PARTITION_EOF
assert pe.name == u'_PARTITION_EOF'


def test_new_produce_error_caused_by():
pe = ProduceError(KafkaError(KafkaError.INVALID_CONFIG),
exception=ValueError())

assert pe.code == KafkaError.INVALID_CONFIG
assert pe.name == u'INVALID_CONFIG'
assert isinstance(pe.exception, ValueError)


def test_new_produce_error_custom_message():
pe = ProduceError(KafkaError(KafkaError._KEY_SERIALIZATION,
"Unable to serialize key"))

assert pe.code == KafkaError._KEY_SERIALIZATION
assert pe.name == u'_KEY_SERIALIZATION'
assert pe.args[0].str() == "Unable to serialize key"