HDFS connector issue #51

Closed
amitmalagi opened this Issue Apr 20, 2016 · 7 comments

amitmalagi commented Apr 20, 2016

The HDFS connector throws the exception below and halts execution if even one message in a Kafka topic is malformed. I found that kafka-avro-console-consumer continues processing subsequent messages when the --skip-message-on-error option is used. Could the same behaviour be provided in the HDFS connector?

Thanks.

Exception in thread "WorkerSinkTask-hdfs-sink-0" org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61
Caused by: java.io.EOFException
at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:944)
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:349)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:130)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:191)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:130)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:99)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
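
For reference, this is the console-consumer behaviour I mean (a sketch; exact flags depend on the Kafka/Confluent version, and the topic name and URLs are placeholders):

kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic my-topic \
  --property schema.registry.url=http://localhost:8081 \
  --skip-message-on-error

With --skip-message-on-error, a record that fails deserialization is skipped instead of halting the consumer.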

jcustenborder (Member) commented Apr 20, 2016

Hi @amitmalagi, I'm not sure we want to skip records on error. Do you have a good idea of how the invalid record got into the topic? What are you using to produce to the topic?

amitmalagi commented Apr 21, 2016

Hello @jcustenborder, the producer is a Node.js application. I am using the 'kafka-node' module to post messages to Kafka and the 'avsc' module to encode messages into Avro format. In some cases, if the message is larger than the buffer allocated for Avro encoding, the resulting record is invalid.
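
For anyone hitting the same thing, here is a minimal sketch of the failure mode (the schema and buffer size are illustrative, not our production code; assumes a recent version of avsc):

import * as avro from 'avsc';

const type = avro.Type.forSchema({
  type: 'record',
  name: 'Event',
  fields: [{ name: 'payload', type: 'string' }],
});

const record = { payload: 'x'.repeat(1000) };

// Buggy pattern: encoding into a fixed-size buffer. If the record does not
// fit, the buffer holds a truncated (invalid) Avro encoding, which later
// fails deserialization in the connector with an EOFException.
const buf = Buffer.alloc(64);
const pos = type.encode(record, buf, 0);
if (pos < 0) {
  // avsc signals "buffer too small" with a negative position. The fix we
  // applied was essentially this: let avsc allocate a correctly sized buffer.
  const safe = type.toBuffer(record);
}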

tony-lijinwen commented Aug 3, 2016

@amitmalagi, I encountered a similar issue. Did you resolve it?

amitmalagi commented Aug 4, 2016

@tony-lijinwen, I addressed this issue in our producer application.

renukaradhya commented Sep 26, 2016

I am facing the same issue. Please share the fix.

[2016-09-26 13:51:42,569] INFO WorkerSinkTask{id=elasticsearch-schema-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2016-09-26 13:52:34,437] INFO WorkerSinkTask{id=elasticsearch-schema-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2016-09-26 13:52:34,446] ERROR Task elasticsearch-schema-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Tseretyan commented Sep 26, 2016

That error occurs because your producer application sends messages in a format the consumer does not expect. The Confluent serializers use a specific wire format: the first byte is 0 (the "magic byte"), the next 4 bytes are the ID of the schema in Schema Registry (the schema must be registered for the same topic the messages are sent to), and the rest is the Avro-encoded message itself.
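
A minimal sketch of that framing in TypeScript/Node.js (the function name is illustrative; in practice you would let the Confluent serializer, or a client library that implements this format, do the framing):

// Sketch: frame an Avro-encoded payload in the Confluent wire format.
// `schemaId` must be the ID returned when the schema was registered in
// Schema Registry; `avroPayload` is the raw binary Avro encoding of the record.
function frameConfluentMessage(schemaId: number, avroPayload: Buffer): Buffer {
  const header = Buffer.alloc(5);
  header.writeUInt8(0, 0);          // magic byte: always 0
  header.writeInt32BE(schemaId, 1); // 4-byte big-endian schema ID
  return Buffer.concat([header, avroPayload]);
}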

cotedm (Contributor) commented Jan 9, 2017

@Tseretyan is correct, and this format is documented here. I'm not seeing anything outstanding here, so closing this out.

cotedm closed this Jan 9, 2017
