
Failed to commit offsets for WorkerSourceTask #161

Closed
iceNuts opened this issue Nov 7, 2016 · 22 comments

iceNuts commented Nov 7, 2016

Hi there,

I am encountering this issue:
[2016-11-07 10:52:00,974] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 386204 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 10:52:01,031] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
[2016-11-07 10:54:01,032] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 415429 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 10:54:01,057] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
[2016-11-07 10:56:01,058] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 413813 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 10:56:01,079] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
[2016-11-07 10:58:01,080] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 405908 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 10:58:01,100] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
[2016-11-07 11:00:01,101] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 406106 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 11:00:01,121] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
[2016-11-07 11:02:01,122] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 406438 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 11:02:01,142] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
[2016-11-07 11:04:01,143] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 405912 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 11:04:01,165] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)
[2016-11-07 11:06:01,166] ERROR Failed to flush WorkerSourceTask{id=qa-job-exporter16-0}, timed out while waiting for producer to flush outstanding 405430 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2016-11-07 11:06:01,186] ERROR Failed to commit offsets for WorkerSourceTask{id=qa-job-exporter16-0} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)

There is data in the topic, so I guess the fetch speed is much faster than the write speed. But why does the commit fail? Can anyone explain this or help?

Thanks.


lianrao commented Jan 18, 2017

I encountered the same problem.

@wenjiezhang2013

I think this issue is not specific to the JDBC connector; I was testing the Couchbase connector and had the same issue, and I tried both standalone and distributed modes with no luck.

Some of the log lines show the commit succeeding in milliseconds:

[2017-01-24 00:21:53,707] INFO Finished WorkerSourceTask{id=test-couchbase-8} commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:21:53,710] INFO Finished WorkerSourceTask{id=test-couchbase-7} commitOffsets successfully in 3 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:22:02,922] INFO Finished WorkerSourceTask{id=test-couchbase-6} commitOffsets successfully in 9214 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:22:12,222] INFO Finished WorkerSourceTask{id=test-couchbase-17} commitOffsets successfully in 6 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:22:12,230] INFO Finished WorkerSourceTask{id=test-couchbase-19} commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:22:21,457] INFO Finished WorkerSourceTask{id=test-couchbase-10} commitOffsets successfully in 2 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:22:21,457] INFO Finished WorkerSourceTask{id=test-couchbase-11} commitOffsets successfully in 0 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:25:07,409] INFO Finished WorkerSourceTask{id=test-couchbase-16} commitOffsets successfully in 2 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-01-24 00:27:52,782] INFO Finished WorkerSourceTask{id=test-couchbase-19} commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)

While others show:

[2017-01-24 00:23:07,820] ERROR Failed to flush WorkerSourceTask{id=test-couchbase-19}, timed out while waiting for producer to flush outstanding 41 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:289)
[2017-01-24 00:23:07,821] ERROR Failed to commit offsets for WorkerSourceTask{id=test-couchbase-19} (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:109)

And I also see weird OOM exceptions.

The whole situation makes me think Kafka Connect might not be production ready yet.

ewencp (Contributor) commented Feb 3, 2017

This error message indicates that too much data is being buffered to flush it to Kafka before the timeout is reached. This is a general issue that can occur in Connect and not specific to any one connector, but some connectors may be more prone to the issue.

If the number of messages it lists as outstanding is small (as in @wenjiezhang2013's case) this means it's just missing the timeout. To address this, you can give it a slightly bigger timeout by increasing offset.flush.timeout.ms in your worker config.

If the number is very large (and you don't have tiny messages), then probably too much data is being allowed to buffer up compared to the rate at which it can be pushed to Kafka. In that case, to ensure the ability to commit offsets in a timely fashion, you should reduce how much data is buffered by overriding the producer's buffer.memory setting (by setting producer.buffer.memory in your worker config).

Finally, note that while it isn't a sign of a healthy connector, if it shows up very occasionally that can be ok. We'll try to commit offsets again, so the primary risk is a small window where you could see a larger number of duplicates in the case of a crash.

@iceNuts in your case the number of outstanding messages is quite large, so either the buffer is too large or your messages are probably really small.

@wenjiezhang2013 It is being used at scale in production. This is a case where some config tweaking is required to match the particular workload (many users never hit this issue), which is something you'll need to do with pretty much any software operating at scale.
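
For reference, a minimal worker-config sketch of the two overrides described above (the values are only illustrative, not recommendations; tune them to your workload):

# connect-distributed.properties (or connect-standalone.properties)
# give the producer more time to flush before the offset commit gives up (default is 5000 ms)
offset.flush.timeout.ms=10000
# cap how much data the source task's producer may buffer (producer default is 33554432 bytes, ~32 MB)
producer.buffer.memory=4194304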

ewencp closed this as completed Feb 3, 2017
@wenjiezhang2013

@ewencp I did set offset.flush.timeout.ms=30000, but it still happened. And, as I mentioned in my previous comment, I can't understand why it happens in standalone mode as well, since in that case the offset data is stored locally.

ewencp (Contributor) commented Feb 3, 2017

@wenjiezhang2013 Regardless of where offsets are committed, the timeout policy still applies. If we failed to flush all the data in standalone mode, we would not write the offsets to disk since we could not be sure that the data had successfully made it to the output system.

You hadn't previously mentioned increasing offset.flush.timeout.ms. When you increased it, did the number of outstanding messages decrease? As long as data was successfully being delivered to Kafka, it should have -- that number is simply determined by unacked messages to Kafka. If it is not decreasing even if you increase offset.flush.timeout.ms that means messages aren't being acked for some reason (which in turn suggests there is some issue with your Kafka cluster's health).

If the number is decreasing after adjusting that parameter, then the adjustment helped but you probably have large messages such that your producer buffer is still quite full. If you're dealing with large messages, you might want to follow the second set of suggestions I gave and reduce the producer buffer size to reduce the maximum amount of outstanding data you can have.
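
To make that relationship concrete, a rough back-of-the-envelope illustration (all of these numbers are assumptions for the sake of the example, not measurements from this issue):

# ~1 KB average record size, default producer buffer.memory of 33554432 bytes (~32 MB)
#   -> up to roughly 32,000 records can sit in the buffer
# if the brokers ack roughly 2,000 records/s, draining a full buffer takes ~16 s
#   -> well over the default offset.flush.timeout.ms of 5000 ms, so the flush times out
# lowering buffer.memory (or raising the timeout) closes that gap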


pradeepcheers commented Jul 28, 2017

I got the same error, then changed the worker properties as suggested by @ewencp above, and it is working fine. Thanks

offset.flush.timeout.ms=50000
buffer.memory=100


jurgispods commented Nov 7, 2017

Thanks @ewencp, your answers helped me identify the problem: one of our brokers was not healthy and, as a result, messages were not ack'ed, leading to failed offset commits.


tungntt commented Apr 17, 2018

Hi everyone, I also got the same issue. However, it still happens even though I configured the worker properties similarly to @pradeepcheers and @ewencp:
Failed to flush, timed out while waiting for producer to flush outstanding 2 messages

As the error shows, there is not much data, but the failure still occurs. I don't know why.
Please help me review this issue again.
Thanks

@aviadhai

@pederpansen how did you find the broker that wasn't healthy? We are facing the same issue and I believe we have the same problem, but the Kafka metrics seem healthy.
How did you figure out which broker was problematic?

@jurgispods

@aviadhai This might not be the answer you hoped for, but: logs :)

@eyalba

eyalba commented Aug 12, 2018

Hi @pederpansen - can you share the log messages that helped you identify the unhealthy broker?
When I went over the relevant logs I only saw INFO messages reporting normal Kafka activity, without any indication of an issue I needed to dig deeper into. Thanks.

@jurgispods

@eyalba I'm sorry, but I don't have those logs anymore. I'm sure you would see error logs or exceptions if the broker was unhealthy. Since that is not the case, the problem must be somewhere else.

@benskilljar

FWIW, I encountered the same "Failed to commit offsets" issue when trying to configure SSL for a Kafka Connect worker running a JDBC source connector.

Specifically, I was trying to run a Kafka Connect worker in a container with parent confluentinc/cp-kafka-connect:4.1.2.

I was exporting these env variables in my entrypoint script, which enabled me to create a connector, but the connector couldn't actually push anything from my database into Kafka.

export CONNECT_BOOTSTRAP_SERVERS="${KAFKA_URL}"
export CONNECT_SECURITY_PROTOCOL="SSL"
export CONNECT_SSL_CLIENT_AUTH="required"
export CONNECT_SSL_TRUSTSTORE_LOCATION="${trustStoreFile}"
export CONNECT_SSL_TRUSTSTORE_PASSWORD="${trustStorePassword}"
export CONNECT_SSL_TRUSTSTORE_TYPE="${trustStoreType}"
export CONNECT_SSL_KEYSTORE_LOCATION="${keyStoreFile}"
export CONNECT_SSL_KEYSTORE_PASSWORD="${keyStorePassword}"
export CONNECT_SSL_KEYSTORE_TYPE="${keyStoreType}"

The solution was to add CONNECT_PRODUCER_ settings as well:

export CONNECT_PRODUCER_BOOTSTRAP_SERVERS="${KAFKA_URL}"
export CONNECT_PRODUCER_SECURITY_PROTOCOL="SSL"
export CONNECT_PRODUCER_SSL_CLIENT_AUTH="required"
export CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION="${trustStoreFile}"
export CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD="${trustStorePassword}"
export CONNECT_PRODUCER_SSL_TRUSTSTORE_TYPE="${trustStoreType}"
export CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION="${keyStoreFile}"
export CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD="${keyStorePassword}"
export CONNECT_PRODUCER_SSL_KEYSTORE_TYPE="${keyStoreType}"

Apparently the top-level SSL connection settings only cover the non-connector-specific interactions with Kafka (e.g. creating a connector and writing to the config & status topics), but don't get automatically applied to the connectors (e.g. writing messages to the offset topic)?
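
(For reference, the cp-kafka-connect image turns CONNECT_-prefixed environment variables into worker config properties by dropping the CONNECT_ prefix, lowercasing, and replacing underscores with dots, so the producer overrides above correspond roughly to worker-config entries like these; the angle-bracket values are the same placeholders as in the export lines:)

producer.bootstrap.servers=<KAFKA_URL>
producer.security.protocol=SSL
producer.ssl.truststore.location=<trustStoreFile>
producer.ssl.truststore.password=<trustStorePassword>
producer.ssl.keystore.location=<keyStoreFile>
producer.ssl.keystore.password=<keyStorePassword>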

@FrancoisPoinsot

If anyone ends up here:
I got the same error. @benskilljar's answer resolved part of my logs.
The other part was caused by Kafka Connect being unable to create the output topic of the JDBC task.

In my case this was caused by some changes to our broker configuration.

The exact same error message was output, and creating the topic manually solved those cases.
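
(If you need to create the missing output topic by hand, a minimal sketch; the topic name, partition count, and replication factor are placeholders, and older Kafka versions take --zookeeper instead of --bootstrap-server:)

kafka-topics --bootstrap-server broker:9092 --create \
  --topic my-jdbc-output-topic --partitions 3 --replication-factor 3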


anupshirolkar commented May 16, 2019

Hi,

Not sure whether this thread is still followed but I will post my problem anyway.

We have created a new source connector that connects to a CRM system, pulls data, and publishes it to a Kafka topic as usual.
This connector runs in a Kafka Connect setup in distributed mode. We are running Connect in a Docker container.

I am facing the same issue as mentioned above, but with a variation in how it ends. I have already tried reducing buffer.memory and increasing offset.flush.timeout.ms as follows:

offset.flush.timeout.ms=60000
buffer.memory=10000

The values given here are the latest I tried, but I have tried a few others as well.

However, the problem still persists. I have tried setting the attributes in the Kafka Connect environment and also in my connector's REST definition, but I think only the ones set in the Kafka Connect environment take effect.

The interesting thing is that the connector does not hit any errors while performing the initial bulk load from the CRM, which is around 10K records. That all works smoothly; after that, it struggles with offset commit failures and message flushing for any updates performed on the source system.

The initial bulk load behaviour is consistent each time. I have performed these tests a few dozen times now.

The issue then proceeds as follows: the Connect producer keeps failing to commit the offsets and flush the messages for 10-15 minutes:

Failed to flush, timed out while waiting for producer to flush outstanding 4 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
ERROR WorkerSourceTask{id=task-name-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)

Finally, this behaviour always ends with a NETWORK_EXCEPTION as follows:

Got error produce response with correlation id 1507 on topic-partition topic_name-2, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
And then the messages are flushed and appear in the topic.

Initially, I thought the Kafka cluster or topic was not reachable from the connector, but that is not the case, as other connectors (e.g. JDBC connectors) work fine while this one struggles to flush/commit.

Because the failure loop always ends with a NETWORK_EXCEPTION and the messages then go through, I think the Connect task's connection to the specific partition gets restarted when the NETWORK_EXCEPTION occurs.

What can I do to at least shorten the time until Kafka Connect refreshes the connection to a topic partition, so that after the connection restart my messages make it to the topic within a minute rather than 10-15 minutes?

I tried lowering connections.max.idle.ms to 60000 to make the connection refresh within a minute, but it does not work. I am not sure whether connections.max.idle.ms is even related to refreshing the connection to a topic partition.

Please suggest a solution.
Thanks.

@anupshirolkar

My team and I did quite a few experiments with the connector to find a workaround for the issue. Finally, we found one: we placed the Kafka Connect instance in the same private address space as the Kafka cluster, and also used offset.flush.timeout.ms=20000 and buffer.memory=20000 along with acks=1.
The issue has not appeared since this change, so I am concluding the analysis with the understanding that Kafka Connect has some bug with message commit and flushing which can be triggered for any connector. However, this bug can be affected by the latency between the Kafka cluster and the Kafka Connect instance.
So, to work around the issue, the buffer and flush settings should be altered to flush frequently, and a low latency should be maintained between Kafka Connect and Kafka.
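
(For anyone wanting to try the same workaround, a sketch of the worker settings described above; it assumes the acks and buffer values are applied through the producer. prefix, and note that producer.acks=1 gives weaker delivery guarantees than acks=all:)

offset.flush.timeout.ms=20000
producer.buffer.memory=20000
producer.acks=1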


zakibenz commented Jun 19, 2019

I am running into the same issue.
I've tried tuning the offset.flush.timeout.ms and buffer.memory properties, but it does not help.
Any suggestions?

@contacttapan

+1

@gtyanchev

I was experiencing the same problem and increasing the replication factor of the topic from 1 to 3 fixed it for me.
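
(Raising the replication factor of an existing topic is done with a partition reassignment; a rough sketch, where the topic name, partition list, and broker ids are placeholders and older Kafka versions take --zookeeper instead of --bootstrap-server:)

cat > increase-rf.json <<'EOF'
{"version":1,"partitions":[
  {"topic":"my-topic","partition":0,"replicas":[1,2,3]},
  {"topic":"my-topic","partition":1,"replicas":[2,3,1]},
  {"topic":"my-topic","partition":2,"replicas":[3,1,2]}
]}
EOF
kafka-reassign-partitions --bootstrap-server broker:9092 \
  --reassignment-json-file increase-rf.json --execute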

@singaretti

(quoting @ewencp's explanation above)

I had tuned producer.buffer.memory for another process I was working on, and that's why I hit this "timed out while waiting for producer to flush outstanding" issue; after commenting out the producer.buffer.memory parameter, everything is working fine.


gkartz commented Dec 9, 2019

Can someone tell me where the worker config file is located?

@nitinr708

It would be in /apps/confluent-/etc/kafka/ (i.e. under etc/kafka/ in the Confluent installation directory).
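
(As a usage sketch, the worker config is whatever properties file you pass when starting Connect; the paths here are illustrative:)

# distributed mode
connect-distributed /etc/kafka/connect-distributed.properties
# standalone mode takes the worker config first, then one or more connector configs
connect-standalone /etc/kafka/connect-standalone.properties my-connector.properties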
