Cassandra sink - resiliency #85

Closed
paddyvishnubhatt opened this Issue Nov 27, 2016 · 1 comment


paddyvishnubhatt commented Nov 27, 2016

I'm seeing the error below and the task fails to recover or restart.

The most likely cause is bad messages coming into Kafka from the source, which would be difficult to check for every message.

Is there any thought of (or likelihood of) building in some resiliency, e.g. a -k style flag to keep going in spite of errors and to track the error counts in a management/OAMP interface?

(Or is there a workaround to restart and continue?)

org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.lang.ArrayIndexOutOfBoundsException: -35
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
[2016-11-27 00:54:34,684] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-11-27 00:54:34,684] INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:95)
[2016-11-27 00:54:34,685] INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:166)
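
For anyone looking for the manual restart the log refers to, the failed task can be restarted through the Kafka Connect REST API. A minimal sketch, assuming the worker listens on the default port 8083 and the connector is named cassandra-sink (both assumptions); note that a plain restart will hit the same bad record again unless that record is skipped or removed:

```java
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical one-off helper: restart task 0 of the failed connector via the
// Kafka Connect REST API. Worker address and connector name are assumptions.
public class RestartSinkTask {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8083/connectors/cassandra-sink/tasks/0/restart");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        // getResponseCode() sends the request; the endpoint takes an empty body.
        System.out.println("Restart returned HTTP " + conn.getResponseCode());
        conn.disconnect();
    }
}
```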

Collaborator

stheppi commented Nov 27, 2016

This error has nothing to do with the resilience and recoverability of the sink. The error happens before execution is handed over to the sink code, in the conversion from the raw bytes to an Avro record and then to a Struct.
How do you create the records? Pushing Avro records without going through the Schema Registry won't work.
If you want to do what you describe, you will have to provide your own Avro converter; look at io.confluent.connect.avro.AvroConverter. However, I wouldn't advise something like that. Such an error should be caught and its root cause understood. If you use the Avro serializer (Confluent's implementation) together with the Schema Registry, you should never end up with a producer sending bad records.
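
For reference, a minimal sketch of what such a "lenient" converter might look like, wrapping io.confluent.connect.avro.AvroConverter and dropping records that fail to deserialize. The class name is hypothetical, and as noted above this approach is not advised:

```java
import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

import io.confluent.connect.avro.AvroConverter;

// Sketch only: delegates to the Confluent AvroConverter and swallows
// deserialization failures instead of letting them kill the task.
public class LenientAvroConverter implements Converter {

    private final AvroConverter delegate = new AvroConverter();
    private long errorCount = 0;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        return delegate.fromConnectData(topic, schema, value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        try {
            return delegate.toConnectData(topic, value);
        } catch (DataException | SerializationException e) {
            // Count and drop the record that could not be deserialized to Avro.
            errorCount++;
            return SchemaAndValue.NULL;
        }
    }
}
```

The worker would point value.converter at this class; the sink then has to tolerate the null records that replace dropped messages.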

stheppi closed this Nov 27, 2016
