New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-5157: Options for handling corrupt data during deserialization #3423
KAFKA-5157: Options for handling corrupt data during deserialization #3423
Conversation
More tests coming, but basic structure should be in place. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@mjsax @dguy @bbejeck @guozhangwang this is the implementation for KIP-161. Have a look when you can. Thanks. |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch. Some minor comments.
@@ -239,6 +241,13 @@ | |||
private static final String STATE_DIR_DOC = "Directory location for state store."; | |||
|
|||
/** | |||
* {@code default.deserialization.exception.handler} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can you please add this in alphabetic order. Thx.
@@ -339,6 +348,11 @@ | |||
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, | |||
Importance.MEDIUM, | |||
CommonClientConfigs.SECURITY_PROTOCOL_DOC) | |||
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit. As above. Alphabetic within the "medium" group.
@@ -339,6 +348,11 @@ | |||
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, | |||
Importance.MEDIUM, | |||
CommonClientConfigs.SECURITY_PROTOCOL_DOC) | |||
.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, | |||
Type.CLASS, | |||
LogAndFailExceptionHandler.class.getName(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove getName()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They all have getName
. Should I remove all? Otherwise it will look inconsistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me. Not sure why we started with this annoying .getName()
in the first place...
private static final Logger log = LoggerFactory.getLogger(StreamThread.class); | ||
|
||
@Override | ||
public DeserializationHandlerResponse handle(ProcessorContext context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: add final
} | ||
|
||
@Override | ||
public void configure(Map<String, ?> configs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
@@ -129,6 +135,55 @@ public void shouldProcessRecordsForOtherTopic() throws Exception { | |||
assertEquals(0, sourceOne.numReceived); | |||
} | |||
|
|||
@Test(expected = StreamsException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not use expected
here but try { globalStateTask.update(...); fail(...): } catch(ignore) {}
pattern. Also, the ConsumerRecord()
should be instantiated outside/before the try-catch blog.
} | ||
|
||
|
||
@Test(expected = StreamsException.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
@Test(expected = StreamsException.class) | ||
public void shouldThrowOnNegativeTimestamp() { | ||
final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList( | ||
new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); | ||
|
||
final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), | ||
new MockSourceNode<>(topics, intDeserializer, intDeserializer), | ||
new FailOnInvalidTimestamp()); | ||
new FailOnInvalidTimestamp(), new LogAndContinueExceptionHandler(), null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: please one argument per line.
@@ -158,7 +189,7 @@ public void shouldDropOnNegativeTimestamp() { | |||
|
|||
final RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), | |||
new MockSourceNode<>(topics, intDeserializer, intDeserializer), | |||
new LogAndSkipOnInvalidTimestamp()); | |||
new LogAndSkipOnInvalidTimestamp(), new LogAndContinueExceptionHandler(), null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above.
@@ -205,7 +206,7 @@ public ProcessorTopologyTestDriver(final StreamsConfig config, | |||
final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); | |||
globalStateTask = new GlobalStateUpdateTask(globalTopology, | |||
new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), | |||
stateManager | |||
stateManager, new LogAndContinueExceptionHandler() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guess you missed this one :)
…zation-exceptions
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@dguy @guozhangwang any further comments? Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @enothereska - i've left a few comments
final Exception exception) { | ||
|
||
log.warn("Deserialization exception {}. Processor context is {} and record is {}", | ||
exception.toString(), context, record); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want the exception stacktrace?
final ConsumerRecord<byte[], byte[]> record, | ||
final Exception exception) { | ||
|
||
log.warn("Deserialization exception {}. Processor context is {} and record is {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
} catch (Exception e) { | ||
DeserializationExceptionHandler.DeserializationHandlerResponse response = | ||
deserializationExceptionHandler.handle(processorContext, rawRecord, e); | ||
if (response.id == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL.id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just if(response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL)
?
try { | ||
return deserialize(rawRecord); | ||
} catch (Exception e) { | ||
DeserializationExceptionHandler.DeserializationHandlerResponse response = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: final
// catch and process if we have a deserialization handler | ||
try { | ||
return deserialize(rawRecord); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a metric for records skipped due to deserialization errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes good idea. I'll piggyback on this KIP.
final byte[] key, | ||
final byte[] recordValue, | ||
boolean failExpected) { | ||
ConsumerRecord record = new ConsumerRecord<>("t2", 1, 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: final
@@ -140,14 +142,45 @@ public void shouldThrowStreamsExceptionWhenValueDeserializationFails() throws Ex | |||
queue.addRawRecords(records); | |||
} | |||
|
|||
@Test | |||
public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception { | |||
RecordQueue queue = new RecordQueue(new TopicPartition(topics[0], 1), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe extract to a field as it is also used in the test below
public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext, | ||
ConsumerRecord<byte[], byte[]> rawRecord) { | ||
|
||
if (deserializationExceptionHandler == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we assume and/or ensure that this is non-null? Then we won't need this check here and can just have a single path through this method. I think we default to LogAndFail...
if nothing is set - right?
…zation-exceptions
Refer to this link for build results (access rights to CI server needed): |
Thanks @dguy |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @enothereska. I've left one comment. I think the only other thing remaining is whether or not we should implement the pattern for setting state that @guozhangwang suggested
public DeserializationHandlerResponse handle(final ProcessorContext context, | ||
final ConsumerRecord<byte[], byte[]> record, | ||
final Exception exception) { | ||
StringWriter sWriter = new StringWriter(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just pass the exception as the last parameter to i.e, log.warn(" p1: {} p2: {}", p1, p2, exception)
Also, just realised that ProcessorContextImpl
doesn't have a toString
. Perhaps we just need to log the taskId, topic, partition, and offset? i.e.,
log.warn("Exception caught during Deserialization, taskId: {}, topic:{}, partition:{}, offset:{}", context.taskId(), record.topic(), record.partition(), record.offset(), exception)
Same in the other exception handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I pass just the exception, it reverts to exception.toString(). It won't print the stack trace. As a typical NPE example:
- with just exception passed:
java.lang.NullPointerException
- with stack trace:```
java.lang.NullPointerException
at org.rocksdb.RocksDB.get(RocksDB.java:791)
at org.apache.kafka.streams.state.internals.RocksDBStore.getInternal(RocksDBStore.java:235)
at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:219)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.delete(ChangeLoggingKeyValueBytesStore.java:80)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.delete(ChangeLoggingKeyValueStore.java:96)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we add a toString
to ProcessorContextImpl
? That sounds useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I pass just the exception, it reverts to exception.toString(). It won't print the stack trace.
My guess is that you are adding a {}
for the exception, in which case it probably will just print the string. You don't need to add it. For example, from another one we have:
log.warn("{} Failed offset commits {}: ", logPrefix, consumedOffsetsAndMetadata, e)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having toString
on ProcessorContextImpl
might be useful, but it also might be too much information for this particular log message
Thanks @dguy I'm not sure what this refers to. I looked back but I don't see @guozhangwang 's comment. Could you elaborate? Thanks. |
@enothereska doh! I've got my PRs crossed! ignore me |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 more comment otherwise LGTM. @mjsax can you please make another pass over it?
@@ -80,8 +88,14 @@ public TopicPartition partition() { | |||
* @return the size of this queue | |||
*/ | |||
public int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) { | |||
ConsumerRecord<Object, Object> record = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any reason for this variable to not be declared inside the loop as it was previously
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some comments.
import org.apache.kafka.common.Configurable; | ||
import org.apache.kafka.streams.processor.ProcessorContext; | ||
|
||
public interface DeserializationExceptionHandler extends Configurable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have proper class JavaDocs for all public interfaces.
|
||
public interface DeserializationExceptionHandler extends Configurable { | ||
/** | ||
* Inspect a record and the exception received |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: .
missing at the end
public final short id; | ||
|
||
DeserializationHandlerResponse(int id, String name) { | ||
this.id = (short) id; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iIf we go with short
, the parameter should be short
, too?
/** the permanent and immutable id of an API--this can't change ever */ | ||
public final short id; | ||
|
||
DeserializationHandlerResponse(int id, String name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add final
final Exception exception) { | ||
|
||
log.warn("Exception caught during Deserialization, " + | ||
"taskId: {}, topic:{}, partition:{}, offset:{}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: some whitespaces are missing
final Exception exception) { | ||
|
||
log.warn("Exception caught during Deserialization, " + | ||
"taskId: {}, topic:{}, partition:{}, offset:{}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: fix indention and some whitespaces are missing
@@ -63,7 +56,7 @@ public GlobalStateUpdateTask(final ProcessorTopology topology, | |||
for (final String storeName : storeNames) { | |||
final String sourceTopic = storeNameToTopic.get(storeName); | |||
final SourceNode source = topology.source(sourceTopic); | |||
deserializers.put(sourceTopic, new SourceNodeAndDeserializer(source, new SourceNodeRecordDeserializer(source))); | |||
deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, this.deserializationExceptionHandler)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove this
final DeserializationExceptionHandler.DeserializationHandlerResponse response = | ||
deserializationExceptionHandler.handle(processorContext, rawRecord, e); | ||
if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) { | ||
throw e; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we wrap this with a StreamsException
? This way, we could add a detailed error message and explain in the msg text that uses can set a different exception handler etc (similar to timestamp extractor?) -- so users don't need to ask at the mailing list :)
@@ -205,7 +206,7 @@ public ProcessorTopologyTestDriver(final StreamsConfig config, | |||
final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); | |||
globalStateTask = new GlobalStateUpdateTask(globalTopology, | |||
new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), | |||
stateManager | |||
stateManager, new LogAndContinueExceptionHandler() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guess you missed this one :)
@@ -186,6 +187,8 @@ public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix) | |||
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); | |||
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); | |||
this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); | |||
this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: remove empty line
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
…zation-exceptions
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
|
retest this please |
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
@dguy I don't have anything else to add to this. Fixing the QueryableState in separate PR. Thanks. Also I just merged with trunk. |
…zation-exceptions
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
@guozhangwang any more comments? |
retest this please |
Refer to this link for build results (access rights to CI server needed): |
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @enothereska, LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments on this PR. Sorry for the late reply.
sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record(); | ||
} | ||
} | ||
return null; |
This is the implementation of KIP-161: https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers