
S3 sink crashes on null-valued tombstone messages. #42

Closed
ldcasillas-progreso opened this issue May 4, 2017 · 14 comments
@ldcasillas-progreso

If we try to use the Confluent S3 sink (with the Avro Converter) to save a topic that uses null-valued messages as tombstones for log compaction, we get exceptions like the following. These exceptions kill the task, and repeat whenever we try to restart it.

[2017-05-03 14:31:56,348] INFO Opening record writer for: topics/my_instance.my_database.my_table/partition=1/my_instance.my_database.my_table+1+0001400000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)
[2017-05-03 14:31:56,422] ERROR Task s3-my-instance-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:213)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:163)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
at io.confluent.connect.storage.schema.StorageSchemaCompatibility.validateAndCheck(StorageSchemaCompatibility.java:75)
at io.confluent.connect.storage.schema.StorageSchemaCompatibility.shouldChangeSchema(StorageSchemaCompatibility.java:91)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:184)
... 12 more
[2017-05-03 14:31:56,422] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2017-05-03 14:31:56,422] INFO WorkerSinkTask{id=s3-data-dev-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-05-03 14:31:56,429] ERROR Task s3-data-dev-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
[2017-05-03 14:31:56,429] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)

For the particular streams I'm dealing with, it would be safe for the sink to simply ignore the tombstone messages, but the bigger problem seems to be that there is no way for connectors (neither sources nor sinks) to distinguish tombstones (which should be ignored) from null-valued data (which should be persisted).

In the meantime, there should arguably be a configurable option to ignore messages with null values.
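The configurable option suggested here would essentially amount to filtering tombstones out of each batch before writing. A minimal sketch of that filtering logic, using a simplified stand-in `Record` class instead of the real `org.apache.kafka.connect.sink.SinkRecord` (all names here are illustrative, not the connector's actual API):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a Kafka Connect SinkRecord; the real class lives in
// org.apache.kafka.connect.sink. A null value marks a tombstone.
class Record {
    final String key;
    final Object value;

    Record(String key, Object value) {
        this.key = key;
        this.value = value;
    }
}

public class TombstoneFilter {
    // Drop records whose value is null, mimicking what a hypothetical
    // "ignore tombstones" option could do before records reach the writer.
    static List<Record> dropTombstones(List<Record> batch) {
        List<Record> kept = new ArrayList<>();
        for (Record r : batch) {
            if (r.value != null) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

With such a filter applied in the sink's put path, the schema-compatibility check would never see a schema-less null value, so the `SchemaProjectorException` above would not be triggered.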

Related issue on third-party source connector:

@niketanand123

Hi,
Did you find a solution for handling the delete case in the S3 sink connector? What kind of solution do you recommend?

@benhowes

benhowes commented Jan 7, 2019

@ldcasillas-progreso What did you end up doing for this issue? I've hit the same problem!

@benhowes

benhowes commented Jan 8, 2019

What we've done is set "tombstones.on.delete": false in the connector config to prevent the tombstones from being produced in the first place (available in Debezium 0.7.0 onward), so this problem doesn't come up.
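For anyone following this workaround: the property goes in the Debezium source connector's configuration, not the S3 sink's. A minimal sketch (the connector name and class below are illustrative; only the `tombstones.on.delete` line is the actual setting being discussed):

```json
{
  "name": "my-debezium-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tombstones.on.delete": "false"
  }
}
```

Note the trade-off: with tombstones disabled at the source, log compaction will no longer remove deleted keys from the topic.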

@liukrimhrim
Member

liukrimhrim commented Apr 9, 2019

I'm debugging this, but I cannot reproduce the error. As long as "format.class" is set to "io.confluent.connect.s3.format.json.JsonFormat", the connector can handle null Avro records, with or without the AvroConverter. When "format.class" is set to "io.confluent.connect.s3.format.avro.AvroFormat", it throws an error, but a different one from yours. Could you be more specific about the scenario in which you encounter the error? Thank you!

@cyrusv
Contributor

cyrusv commented Jul 25, 2019

cyrusv closed this as completed Jul 25, 2019
@max-polyakov

@cyrusv Unfortunately, the TombstoneHandler can only ignore null-valued messages. In my case, though, they cannot be ignored, since a null value is how a deleted record is signaled. I use "io.confluent.connect.s3.format.json.JsonFormat" and expect to see an empty JSON object in the output file.
Any help would be much appreciated.

@cyrusv
Contributor

cyrusv commented Feb 25, 2020

@max-polyakov, I understand your use case. We're exploring some upgrades in the S3 connector that will make this possible, but it's a big investment. Stay tuned!

@darrenhaken

@cyrusv any updates on this?

@dosvath
Contributor

dosvath commented Aug 17, 2020

> @cyrusv Unfortunately, the TombstoneHandler can only ignore null-valued messages. In my case, though, they cannot be ignored, since a null value is how a deleted record is signaled. I use "io.confluent.connect.s3.format.json.JsonFormat" and expect to see an empty JSON object in the output file. Any help would be much appreciated.

@darrenhaken thanks for the comment, let me check with @kkonstantine on this issue. To confirm I understand correctly: the current behavior.on.null.values options, FAIL and IGNORE, do not work for your use case, and you'd like something more like WRITE, which would write an empty file.
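For later readers: the behavior.on.null.values option mentioned above is set in the S3 sink connector's configuration. A minimal sketch (connector name is illustrative; only the `behavior.on.null.values` line is the setting under discussion, and per this thread it supports FAIL and IGNORE):

```json
{
  "name": "my-s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "behavior.on.null.values": "ignore"
  }
}
```

With "ignore", tombstones are silently skipped instead of killing the task as in the original report; neither value writes anything to S3 for a tombstone.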

@miguelbirdie

miguelbirdie commented Dec 17, 2021

> …options, FAIL and IGNORE, do not work for your use case, and you'd like something more like WRITE, which would write an empty file.
@dosvath, any progress here? It would be great to have some mechanism that writes an empty file to S3 when we receive a tombstone.

@cyrusv
Contributor

cyrusv commented Dec 17, 2021

@miguelbirdie, I understand the request, but it's not on our roadmap. We would welcome a PR, and I'd review one if someone from the community puts it together!

@bigman3

bigman3 commented Jan 26, 2022

@cyrusv I see PR #472.
Any news on this, @kppullin?

@ferozed

ferozed commented Mar 24, 2022

@cyrusv do you have a design for this case? What do you think should happen?

@cyrusv
Contributor

cyrusv commented Mar 25, 2022

@NathanNam, @kkonstantine, any input here?
