
Ksql-cli: When message with wrong format or blank line is published, ksql stops processing the entire stream/table altogether #314

Closed
hemalatha-amrutha opened this issue Sep 25, 2017 · 4 comments

hemalatha-amrutha commented Sep 25, 2017

If even a single message with a wrong format (or a blank line) is published, KSQL stops processing the entire stream/table. The only fix is to remove the faulty entry by recreating the topic and then recreating the stream/table; otherwise KSQL keeps throwing exceptions like the one below.

Steps to reproduce:
Create a table from another stream whose messages are JSON-formatted. Publish a blank line to the underlying topic and observe the exception in KSQL.
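A minimal sketch of the scenario (the stream, table, and topic names here are illustrative, not the reporter's actual setup):

```sql
-- Hypothetical names; assumes a JSON-formatted topic 'pageviews' exists.
CREATE STREAM pageviews_src (viewtime BIGINT, userid VARCHAR)
  WITH (kafka_topic='pageviews', value_format='JSON');

-- Derive a table from the stream.
CREATE TABLE pageviews_per_user AS
  SELECT userid, COUNT(*) FROM pageviews_src GROUP BY userid;

-- Then publish a single empty record to 'pageviews' (for example, press
-- Enter once in kafka-console-producer) and watch the KSQL server log.
```

The exception then looks like this: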

```
Exception in thread "ksql_query_10-f8dae1f2-2fc1-4d1e-a8d2-927da88bbab7-StreamThread-67" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=beacons-input2, partition=0, offset=3000046
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: [B@7f3b5dc3; line: 1, column: 0]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
at [Source: [B@7f3b5dc3; line: 1, column: 0]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3838)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2404)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:74)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:66)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:37)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)

```

@hjafarpour
Contributor

@hemalatha-amrutha This has been the default behavior in Kafka Streams; however, the 1.0 release of Kafka adds the option of specifying the behavior, as described in KIP-161 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers).
You should be able to set the "default.deserialization.exception.handler" config variable to the name of the LogAndContinueExceptionHandler class using the SET statement.
Please let us know if you encounter any issues using the above configuration with Kafka 1.0.
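For reference, a minimal sketch of that workaround (the exact property key accepted by SET may differ across KSQL versions; the handler class ships with Kafka Streams 1.0):

```sql
SET 'default.deserialization.exception.handler'='org.apache.kafka.streams.errors.LogAndContinueExceptionHandler';
```

With this handler, Kafka Streams logs the offending record and skips it rather than killing the stream thread.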

@hjafarpour hjafarpour self-assigned this Sep 25, 2017
@miguno
Contributor

miguno commented Sep 28, 2017

@hjafarpour: Moving forward, we should evaluate whether to change KSQL's default settings in this regard.

@badhiraj

Hi @hjafarpour, I am seeing a similar issue:

```
ksql> show streams;

 Stream Name | Kafka Topic | Format
------------------------------------
 PAGEVIEWS   | pageviews   | JSON

ksql> select * from PAGEVIEWS;
Failed to deserialize value for record. topic=pageviews, partition=0, offset=7107
Query terminated
```

Is it related? Sorry, I'm new to the community and the guidelines for asking questions.

@apurvam
Contributor

apurvam commented Feb 14, 2018

I believe the new default for KSQL is to log the error and continue when deserialization fails. Closing this out.

@apurvam apurvam closed this as completed Feb 14, 2018