
Failed Authentication: Too many connects #28

Closed

rakrak98 opened this issue Jul 8, 2021 · 35 comments


rakrak98 commented Jul 8, 2021

What does this error indicate? The logic of my application is that I have two producers and one consumer running in parallel with each other; could that be what is causing this issue? This is the first time I have seen this error:

22:03:32.012 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Failed authentication with example.kafka.us-east-1.amazonaws.com/10.1.1.132 ([446c81dc-9ab3-4d4b-b174-4ecd9baa406c]: Too many connects)
22:03:32.046 [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node -1 (example.kafka.us-east-1.amazonaws.com/10.1.1.132:9098) failed authentication due to: [446c81dc-9ab3-4d4b-b174-4ecd9baa406c]: Too many connects


liko9 commented Jul 13, 2021

This error is the result of too many connection requests arriving at the same time for your broker's instance size. You probably want to set reconnect.backoff.ms to something higher than the default. See this link for more information: https://docs.aws.amazon.com/msk/latest/developerguide/limits.html
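
For illustration, a minimal producer sketch (hypothetical, not from this issue; the endpoint and topic are placeholders) with the reconnect backoff raised:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackoffProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "example.kafka.us-east-1.amazonaws.com:9098"); // placeholder endpoint
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Standard aws-msk-iam-auth client settings.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "AWS_MSK_IAM");
        props.put("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.put("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

        // Raise the reconnect backoff well above the default so parallel clients
        // stay under the broker's IAM connection-rate quota.
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "60000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}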

@sayantacC
Contributor

"Too many connects" is a sign that one or more IAM clients are trying to connect to a particular broker too many times per second and the broker is protecting itself.
What is the size of the brokers on the cluster (t3.small / m5.xl, ...)?
What is reconnect.backoff.ms set to on the clients? Setting this higher should help clients back off and retry connections so that the broker does not need to reject new connections because of the connection rate.

Please note this error is not about the total number of connections per broker but the rate of new IAM connections per broker.
See https://docs.aws.amazon.com/msk/latest/developerguide/limits.html for the limit on the rate of new IAM connections per broker for different broker sizes.

@rakrak98
Author

@sayantacC @liko9 Thanks a lot for the detailed answer! Making this change worked for me. Maybe I missed it, but was this limits page referenced in the IAM documentation? I think it would be helpful to have that. Thanks again!


BhuviTheDataGuy commented Aug 25, 2021

I have increased this value, but I am still getting the error. I'm running Kafka Connect in distributed mode, and this is the error from the Kafka Connect service.

org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
        at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.configure(KafkaOffsetBackingStore.java:86)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:112)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: [b823641a-a49b-4326-80bd-4c4a00b4e375]: Too many connects
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
        ... 4 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [b823641a-a49b-4326-80bd-4c4a00b4e375]: Too many connects

@sayantacC
Contributor

@BhuviTheDataGuy The number of Kafka Connect workers will increase the rate of new connections to the Kafka brokers. Depending on how many workers you have, you might need to set reconnect.backoff.ms even higher. What is the number of workers in your setup, and the value of reconnect.backoff.ms?
What is the type of broker you are using? (t3.small / m5.xl)


BhuviTheDataGuy commented Aug 26, 2021

Instance type - t3.small (2 brokers)
reconnect.backoff.ms 100000

@sayantacC
Contributor

@BhuviTheDataGuy Sorry for the delayed response. If you update the broker type to a larger instance, does the problem go away?


mfbieber commented Jan 1, 2022

We are encountering the same problems. Setting reconnect.backoff.ms to e.g. 10000 doesn't make a difference, since the exception that is thrown (SaslAuthenticationException) is not retryable (see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L808) and ultimately leads to the creation of a new client, not a reconnect.

When would the reconnect take place? As I went through the implementation, what I see is:

  1. that startConnect() in ConnectDistributed is calling the constructor of Worker
  2. the constructor of Worker calls ConnectUtils.lookupKafkaClusterId(config)
  3. that method calls Admin.create(config.originals())
  4. if you follow the calls from there, you will see that you end up not retrying upon obtaining SaslAuthenticationException (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L808)

Even if the retry worked, several AdminClients are created, all of which connect to the MSK cluster. Since this is not a reconnect, the reconnect.backoff.ms settings cannot remediate it. There is no mechanism in the Kafka code that would globally restrict these connections to happen only every x seconds. Unless I am overlooking something, MSK Connect should only work by chance with t3.small instances.

So either AWS removes the limit on t3.small for IAM connections or the Kafka clients throw a different exception :-/
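
For code you control (unlike MSK Connect), one workaround is to wrap the AdminClient call in an explicit retry loop, since the client itself treats SaslAuthenticationException as fatal. A sketch under that assumption; the attempt count, delay, and method name are illustrative, not from Kafka:

import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.SaslAuthenticationException;

public class AdminRetryExample {
    private static final int MAX_ATTEMPTS = 5;    // illustrative
    private static final long DELAY_MS = 10_000L; // illustrative

    static String lookupClusterIdWithRetry(Properties props) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try (Admin admin = Admin.create(props)) {
                return admin.describeCluster().clusterId().get();
            } catch (Exception e) {
                // "Too many connects" surfaces as a SaslAuthenticationException,
                // usually wrapped in an ExecutionException.
                boolean throttled = e instanceof SaslAuthenticationException
                        || e.getCause() instanceof SaslAuthenticationException;
                if (!throttled || attempt >= MAX_ATTEMPTS) {
                    throw e;
                }
                Thread.sleep(DELAY_MS); // wait out the broker's connection-rate window
            }
        }
    }
}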

See parts of our logs using AWS MSK Connect:

[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:53,059] INFO Creating Kafka admin client (org.apache.kafka.connect.util.ConnectUtils:49)
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:53,061] INFO AdminClientConfig values:
...
[Worker-05ea3408948fa0a4c] 	reconnect.backoff.max.ms = 10000
[Worker-05ea3408948fa0a4c] 	reconnect.backoff.ms = 10000
[Worker-05ea3408948fa0a4c] 	request.timeout.ms = 30000
[Worker-05ea3408948fa0a4c] 	retries = 2147483647
[Worker-05ea3408948fa0a4c] 	retry.backoff.ms = 10000
...
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:54,269] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:86)
[Worker-05ea3408948fa0a4c] org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:140)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.connect.runtime.Worker.<init>(Worker.java:127)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:118)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
[Worker-05ea3408948fa0a4c] Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: [e4afe53f-73b5-4b94-9ac3-30d737071e56]: Too many connects
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
[Worker-05ea3408948fa0a4c] 	at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
[Worker-05ea3408948fa0a4c] 	... 5 more
[Worker-05ea3408948fa0a4c] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [e4afe53f-73b5-4b94-9ac3-30d737071e56]: Too many connects
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:54,281] INFO Stopped http_0.0.0.08083@68631b1d{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:381)
[Worker-05ea3408948fa0a4c] [2022-01-01 22:41:54,283] INFO Stopped https_0.0.0.08443@611d0763{SSL, (ssl, http/1.1)}{0.0.0.0:8443} (org.eclipse.jetty.server.AbstractConnector:381)
[Worker-05ea3408948fa0a4c] MSK Connect encountered errors and failed.

Since this forces us to use the m5.large instance or not use IAM, I wrote this question in the AWS re:Post site to hopefully obtain help from the AWS community on this: https://repost.aws/questions/QU3qd8DVjTR4qHH_4Zf4KtuA/msk-connect-on-t-3-small-fails-due-to-not-retryable-sasl-authentication-exception-reconnect-backoff-ms-worker-configuration-will-not-help-can-aws-remove-the-connection-limit


liko9 commented Jan 3, 2022

@mfbieber What do your Kafka Connect configurations look like? Obviously, please mask anything sensitive, but I'm keenly interested in how you are setting the backoff parameter. One element of Kafka Connect which isn't immediately obvious is the number of different contexts it runs within. In my testing, I had to set the backoff for two additional contexts - Producer and Consumer. From what you shared, it appears that your AdminClientConfig took the parameter as expected, but I'd be curious to see if the others did as well (my guess is that they did not).

Also, you are referencing trunk in your code investigation - did you compile the latest version from trunk yourself, or are you using a specific version of Apache Kafka?
In my successful tests against a t3.small MSK cluster, I had the following parameters set:

reconnect.backoff.ms=1000
reconnect.backoff.max.ms=60000
producer.reconnect.backoff.ms=1000
producer.reconnect.backoff.max.ms=60000
consumer.reconnect.backoff.ms=1000
consumer.reconnect.backoff.max.ms=60000

I believe this is because the default of the backoff timing is 50ms, which is way too quick for MSK using IAM. I also set the backoff.max.ms higher than my backoff.ms so that it could happen more than once instead of failing immediately. (reference: https://kafka.apache.org/documentation/#producerconfigs_reconnect.backoff.max.ms)
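
As a rough illustration of the schedule those two settings produce (per the linked docs, the backoff doubles per consecutive failure up to the max, plus ~20% random jitter, which this sketch omits):

public class BackoffScheduleExample {
    public static void main(String[] args) {
        long backoff = 1_000L;    // reconnect.backoff.ms
        final long max = 60_000L; // reconnect.backoff.max.ms
        for (int failure = 1; failure <= 8; failure++) {
            System.out.printf("failure %d -> wait ~%d ms%n", failure, backoff);
            backoff = Math.min(backoff * 2, max); // exponential growth, capped
        }
        // Prints 1000, 2000, 4000, ... capped at 60000.
    }
}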


mfbieber commented Jan 4, 2022

Hi @liko9 , thanks for your interest and your answer! I tried your configuration, but I am seeing the same errors as before with this worker configuration:

  key.converter=org.apache.kafka.connect.storage.StringConverter
  value.converter=org.apache.kafka.connect.storage.StringConverter
  reconnect.backoff.ms=10000
  reconnect.backoff.max.ms=60000
  producer.reconnect.backoff.ms=10000
  producer.reconnect.backoff.max.ms=60000
  consumer.reconnect.backoff.ms=10000
  consumer.reconnect.backoff.max.ms=60000

Also, because I am unsure of how to configure the connector, I added those values to the connector configuration (where should they actually go? I would have thought the worker, but I am just starting out with Kafka):

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "flush.size": "2",
  "format.bytearray.extension": ".txt",
  "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
  "locale": "en-US",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DailyPartitioner",
  "retry.backoff.ms": "10000",
  "reconnect.backoff.ms": "10000",
  "reconnect.backoff.max.ms": "60000",
  "producer.reconnect.backoff.ms": "10000",
  "producer.reconnect.backoff.max.ms": "60000",
  "consumer.reconnect.backoff.ms": "10000",
  "consumer.reconnect.backoff.max.ms": "60000",
  "s3.bucket.name": "${s3_destination_bucket}",
  "s3.compression.type": "gzip",
  "s3.part.retries": "10",
  "s3.region": "${region}",
  "s3.retry.backoff.ms": "10000",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "tasks.max": "1",
  "timestamp.extractor": "Record",
  "timezone": "UTC",
  "topics": "${topics}",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}

In the connector log, I also see:

...
[Worker-0a0d8cc42ea8ff79d] [2022-01-04 22:13:29,076] WARN The configuration 'consumer.reconnect.backoff.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
...
[Worker-0a0d8cc42ea8ff79d] [2022-01-04 22:13:29,080] WARN The configuration 'consumer.reconnect.backoff.max.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
...
[Worker-0a0d8cc42ea8ff79d] [2022-01-04 22:13:29,088] WARN The configuration 'producer.reconnect.backoff.max.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
...
[Worker-0a0d8cc42ea8ff79d] [2022-01-04 22:13:29,089] WARN The configuration 'producer.reconnect.backoff.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig:369)
...

From what you shared, it appears that your AdminClientConfig took the parameter as expected, but I'd be curious to see if the others did as well (my guess is that they did not).

I believe I am seeing the AdminClientConfig only. That one is crashing (as described), and I believe that may prevent any producers and consumers from being started.

Also, you are referencing trunk in your code investigation - did you compile the latest version from trunk yourself, or are you using a specific version of Apache Kafka?

The Kafka Connect version is 2.7.1 and the cluster's Kafka version is 2.8.1.
We didn't compile anything, we are using the managed MSK for the Kafka Cluster and configuring the connector via the AWS CLI, providing a bunch of JSON data to configure everything.
We simply checked out the code and wanted to understand why the AdminClient connects that often. We wanted to see whether there is a retry upon rejection because of the IAM limitation, and whether the reconnect mechanism is wrapped around those AdminClient creations with their connection attempts.

I believe it is easier if you see the full Kafka Connect Worker log, I've attached it here. That contains no production-relevant information:
20220104T2210Z_2dda6191.log.gz

I really hope that I am missing something in the configuration.


liko9 commented Jan 5, 2022

@mfbieber First off, thank you for your detailed descriptions and full logs. If more people were willing to provide that, it would make troubleshooting far, far easier.

I previously overlooked that you are using MSK Connect. I do see in the logs the same as you, that the AdminClientConfig has accepted the parameters you provided, and these do match my working configuration. However, my working configuration is not from MSK Connect but rather from Kafka Connect running on an EC2 instance (created prior to the availability of MSK Connect).

You are correct that because the AdminClient is unable to check its internal topics (the config, offset, and status storage topics) in MSK due to the "too many connects" error, it does not continue to where the producer or consumer are initialized.

There is an automatic retry / reconnection attempt that the reconnect.backoff parameters are used by. I'm at a loss as to why this isn't working within MSK Connect - I'd suggest opening a support case to see if the service team can see anything that we're not seeing in the logs.

The parameters are worker level parameters (not connector parameters), so you did that in the proper place. The two things you could try while you're waiting on support are to try to set reconnect.backoff.ms to 10000 (make it 10s instead of 1s - this is a totally silly idea but it might be interesting if the behavior changes) or you could try running Kafka Connect yourself in EC2. I'm sorry that I couldn't be of further help.

@Gatsby-Lee

Hello All,

I am having this issue as well.

(Not working) Test1: t3.small + no custom config for reconnect.backoff

I reached out to AWS Support and was told that the issue is related to the TCP connection limit
( https://docs.aws.amazon.com/msk/latest/developerguide/limits.html ).
AWS Support asked me to increase the instance type to m5.large (why?).
I was not convinced, since I only use one MSK connector (using IAM).

(Not working) Test2: t3.small + overriding these reconnect.backoff

  • producer.reconnect.backoff.max.ms=1200000
  • producer.reconnect.backoff.ms=1000

I wasn't convinced by what AWS Support said, so I tried with the updated config.
However, it failed with the logs below, and googling the error got me here.

[Worker-0e73633c0bd7070df] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [551e2ed2-38d0-4069-aefa-26fb70ee52ed]: Too many connects

Honestly, it doesn't make sense that I have to increase the machine type because t3.small can't handle the IAM auth requests from one MSK connector.

@sayantacC
Contributor

Although, sadly, I do not yet have a solution to offer, I want to reopen the issue to raise visibility.

sayantacC reopened this Jan 7, 2022
@Gatsby-Lee

@sayantacC
Thank you.
I created an AWS Support ticket. The AWS Support team told me that they will reach out to the internal team to check how to make it work with t3.small and the given throttle.

@Gatsby-Lee

To give a little more detail: I only use one MSK Connector, which uses "tasks.max=1" and connects to the MSK Cluster.


mfbieber commented Jan 8, 2022

@liko9: thanks for verifying!

@Gatsby-Lee: thank you. We will also invest some $s in support and open a ticket for the t3.small limitation. The more people request it, the higher it might get on their priority list - that's what I heard.

The next (but not preferred) option would have been to use SASL SCRAM, configure ACLs and use the public endpoint to connect to the MSK cluster. I tested it, but unfortunately, it is not possible to provide security.* and sasl.* configuration options for the workers (https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html).

I am going to create one more ticket for that too.

Also, I will create another ticket for an option to delete worker configurations and custom plugins, which is not possible at the moment: https://stackoverflow.com/questions/70025964/how-to-delete-a-worker-and-a-plugin-on-aws-msk-connect

@Gatsby-Lee

@mfbieber
It seems you and I are in a similar context :)

I escalated the two limitations you mentioned.
In my case, I have one more limitation: I can't override "offset.storage.topic" for the MSK connector.
It is already created by the MSK connector with a new name.
I am not sure if it is one of your blockers.

I will share if I have more updates about the TCP connection throttle issue with t3.small.

For fun, I wrote about the issue here.
https://medium.com/@life-is-short-so-enjoy-it/tcp-connection-throttle-in-amazon-msk-iam-access-control-c7d2102e2cc3


mfbieber commented Jan 8, 2022

Thanks, @Gatsby-Lee. No, that offset.storage.topic setting is not one of our issues.

I will also share any updates I get here with you guys.


liko9 commented Jan 11, 2022

This seems to be exclusive to MSK Connect, as I do not experience the same issue when running Kafka Connect with IAM authentication against MSK (it seems to respect the reconnect.backoff parameters as expected). Have either of you tried Kafka Connect without MSK Connect?

@Gatsby-Lee

@liko9 No, I haven't tried it.


waynetaylor commented Jan 13, 2022

Hi all, I have this same problem on our dev cluster setting up MSK Kafka Connect.

I have IAM auth working when running on EC2 or ECS, but with MSK Kafka Connect this remains a problem. Upgrading the instance types to m5.large did work around the problem - but this means I have a really expensive dev environment for very low-volume producers and consumers.

Here is the custom worker config:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
reconnect.backoff.ms=1000
consumer.reconnect.backoff.max.ms=60000
consumer.reconnect.backoff.ms=1000
reconnect.backoff.max.ms=60000
producer.reconnect.backoff.max.ms=60000
producer.reconnect.backoff.ms=1000

Errors from logs:



[Worker-0ecb1447a552fbbc6] [2022-01-13 03:32:44,592] INFO Kafka version: 2.7.1 (org.apache.kafka.common.utils.AppInfoParser:119)
[Worker-0ecb1447a552fbbc6] [2022-01-13 03:32:44,593] INFO Kafka commitId: unknown (org.apache.kafka.common.utils.AppInfoParser:120)
[Worker-0ecb1447a552fbbc6] [2022-01-13 03:32:44,593] INFO Kafka startTimeMs: 1642044764592 (org.apache.kafka.common.utils.AppInfoParser:121)
[Worker-0ecb1447a552fbbc6] [2022-01-13 03:32:45,066] INFO [AdminClient clientId=adminclient-7] Failed authentication with mycluster/INTERNAL_IP ([b1bd4809-4694-4ed6-bc13-394737824d04]: Too many connects) (org.apache.kafka.common.network.Selector:616)
[Worker-0ecb1447a552fbbc6] [2022-01-13 03:32:45,068] ERROR [AdminClient clientId=adminclient-7] Connection to node 3 (mycluster/INTERNAL_IP) failed authentication due to: [b1bd4809-4694-4ed6-bc13-394737824d04]: Too many connects (org.apache.kafka.clients.NetworkClient:771)
[Worker-0ecb1447a552fbbc6] [2022-01-13 03:32:45,069] WARN [AdminClient clientId=adminclient-7] Metadata update failed due to authentication error (org.apache.kafka.clients.admin.internals.AdminMetadataManager:232)
[Worker-0ecb1447a552fbbc6] org.apache.kafka.common.errors.SaslAuthenticationException: [b1bd4809-4694-4ed6-bc13-394737824d04]: Too many connects


pmalon commented Jan 27, 2022

@mfbieber were you able to get MSK Kafka Connect working with t3.small instances?

@mfbieber

@pmalon: no, but I have received feedback from the AWS Support team:

  • The team is working on solutions to increase the connection limit for t3 instances. Possibly this feature will be available for new t3 clusters later in Q1 2022.
  • Currently, SASL/SCRAM authentication is not supported on MSK Connect connectors. This will be available by the end of the year 2022.

We will probably either wait until it works with t3 or implement a hacky solution involving a public cluster with self-built mechanisms to secure it. It simply depends on the timing and we would rather wait for the IAM + t3 solution.

@averemee-si

This seems to be exclusive to MSK Connect, as I do not experience the same issue when running Kafka Connect with IAM authentication against MSK (it seems to respect the reconnect.backoff parameters as expected). Have either of you tried Kafka Connect without MSK Connect?

@liko9 I got the same error with pure Kafka Connect, without MSK Connect.

@Gatsby-Lee

@mfbieber
IAM auth in MSK Connect doesn't look stable.
Today, one of my Connectors (running the MongoDB Kafka Source Connector) died because it somehow failed IAM auth in the middle of running.

Since the current MSK Connect doesn't allow overriding the offset.storage.topic, I wasn't able to restart the Connector from where it failed.

@dude0001
Contributor

We just went live running ~80 Debezium connectors in a Kafka Connect ECS cluster, not MSK Connect, as that was going to be much more expensive for our use case. Our MSK cluster is running 3 m5.large brokers, and I currently have ~7 connectors experiencing this issue.

Contributor

dude0001 commented Feb 28, 2022

@sayantacC or anyone else that might have insight, is there any discussion or progress internally related to this issue? I feel like something is severely wrong with the library here, or the allowed number of connects per second is too low for our broker types. This is causing us severe headaches and a lack of confidence in the solution in production. I am going to open a support ticket with AWS, but I'd like to keep the conversation going here as well.

I want to stress, I do not think this is an issue with only MSK Connect or using MSK with small broker sizes. We are spending some good money on a large MSK cluster and Kafka Connect in ECS, and constantly seeing this problem.

Below is our setup. We are currently only producing data and don't even have our consumers wired up yet. We see a sustained 30k messages per second, with spikes up to 70k-90k per second, with the brokers staying at ~50-75% CPU usage in the Kafka cluster. However, our connectors keep failing with the error below. I have tried to set the retry and backoff as suggested in this thread. The longest we've gone without connectors failing for this is maybe 12 hours. Also, after restarting connectors, they sometimes seem to fail faster than the max timeout we have set. It sort of feels like these settings aren't being honored, as others have said above.

It is frustrating to have to go ask for more brokers or larger brokers when the cluster seems to be performing more than sufficiently for our load otherwise. Explaining that we need more brokers to handle IAM auth is a very tough sell. I did load testing at 60-100k messages per second and was not running into this problem, so again, it is hard to explain why I need to spend more on brokers now, and I'm not even sure that would solve our issue.

What can we do to troubleshoot further or get you more information? Should we just stop trying to use IAM auth? I see how we can monitor total TCP connections in our cluster in CloudWatch metrics, and we are staying below quota there. How do we monitor new connections per second, which seems to be where the problem is? And can we aggregate this by the source of the connection to help pinpoint where the issue is coming from?

Kafka:
MSK 3 broker kafka.m5.large

Broker config:

auto.create.topics.enable = false
delete.topic.enable = true
default.replication.factor = 3
log.cleanup.policy = delete
log.retention.hours = 168
log.retention.bytes = -1
# Allow occasional huge transactions to be ingested from CDC. The expected average message size is much smaller.
message.max.bytes = 26214400
min.insync.replicas = 2
offsets.topic.replication.factor = 3

Kafka Connect:
ECS, EC2 launch type (m5a.large), 25 workers running 75 Debezium connectors streaming ~100 tables each.

connect-distributed.properties:

topic.creation.enable=true

config.storage.replication.factor=3
offset.storage.replication.factor=3
offset.storage.partitions=25
status.storage.replication.factor=3
status.storage.partitions=5

# The converters specify the format of data in Kafka and how to translate it into Connect data.
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# AWS MSK IAM authentication
ssl.truststore.location=/tmp/kafka.client.truststore.jks
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=AWS_MSK_IAM
consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=AWS_MSK_IAM
producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

# Connection Retry
# When using IAM auth MSK throttles the total number of open TCP connections as well as new TCP connections being opened per second.
# Because of this throttling, we have connectors failing for unretryable connection errors. Increasing the
# backoff and ultimate timeout period is attempting to work around these issues. See
# https://github.com/aws/aws-msk-iam-auth/issues/28
# https://docs.aws.amazon.com/msk/latest/developerguide/limits.html
reconnect.backoff.ms=5000
reconnect.backoff.max.ms=900000
producer.reconnect.backoff.ms=5000
producer.reconnect.backoff.max.ms=900000
consumer.reconnect.backoff.ms=5000
consumer.reconnect.backoff.max.ms=900000

# kafka connect rest extension
rest.extension.classes=com.zotecpartners.kafka.connect.rest.ZotecConnectRestExtension

Error:

org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
	at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:284)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:338)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [b1f87579-2113-455d-87ce-85fc89ac43bd]: Too many connects

Contributor

sayantacC commented Mar 1, 2022

@dude0001 Thanks for reporting your problem in such detail, and I am sorry about the pain this issue has been causing you. We are looking into the pain points with using IAM and Kafka Connect.

In the meantime, I have captured my best understanding of this issue based on some digging, and I have some suggestions based on it.

Background

As you are painfully aware, each MSK broker imposes a limit on the connection creation rate by IAM clients. When this limit is breached, the broker rejects the next connection with a "Too many connects" error message that gets encapsulated in a SaslAuthenticationException on the client.

By default, Kafka Connect fails the connector task immediately on any error. As a result, any SaslAuthenticationException causes the connector task to fail immediately.

However, this behavior can be changed by setting the value of the field errors.tolerance. From Kafka Connect's documentation for errors.tolerance: [Define] Behavior for tolerating errors during connector operation. 'none' is the default value and signals that any error will result in an immediate connector task failure; 'all' changes the behavior to skip over problematic records.

My reading of the Kafka Connect code seems to indicate that setting errors.tolerance=all will handle errors communicating with Kafka as well. Warning: This will lead to records not being written to the Kafka cluster.

Suggestion

Would you be willing to try modifying your Connect's error handling by setting:

errors.tolerance=all

errors.log.enable=true
errors.log.include.messages=false

errors.retry.timeout=600000
errors.retry.delay.max.ms=30000

I will point out for completeness that it is possible to include the messages while logging the errors as well: errors.log.include.messages=true.

Monitoring

The ConnectionCreationRate is a metric that is reported per broker per listener (IAM in this case).
However, it is only available when PER_BROKER monitoring is enabled.
Please note that the user is charged for enabling PER_BROKER monitoring.
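
If it helps, a sketch of enabling PER_BROKER monitoring with the AWS SDK for Java v2 (my assumption of the SDK calls; the cluster ARN is a placeholder, and the same change can be made from the console or CLI):

import software.amazon.awssdk.services.kafka.KafkaClient;
import software.amazon.awssdk.services.kafka.model.EnhancedMonitoring;

public class EnablePerBrokerMonitoringExample {
    public static void main(String[] args) {
        try (KafkaClient kafka = KafkaClient.create()) {
            String arn = "arn:aws:kafka:us-east-1:123456789012:cluster/example/..."; // placeholder
            // updateMonitoring requires the cluster's current configuration version.
            String currentVersion = kafka.describeCluster(b -> b.clusterArn(arn))
                    .clusterInfo().currentVersion();
            kafka.updateMonitoring(b -> b
                    .clusterArn(arn)
                    .currentVersion(currentVersion)
                    .enhancedMonitoring(EnhancedMonitoring.PER_BROKER));
        }
    }
}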

@Gatsby-Lee

@sayantacC

Well, IMHO, your suggestion works only for cases where it is OK to lose some data.

In my case, your suggestion raises data integrity questions.

Contributor

dude0001 commented Mar 3, 2022

Is there any other logging or anything else needed to reproduce and troubleshoot the issue? It seems easy enough to reproduce already, but if I can help, I am glad to do it. Having to buy more compute in an already large cluster just for IAM auth feels bad, and there has to be another solution.

Thank you for pointing me to the metric. I missed this, and now I can at least monitor. There still doesn't seem to be a way to aggregate this by source so that we can pinpoint where we are opening a lot of connections quickly and try to optimize.

The "'all' changes the behavior to skip over problematic records" part of the suggested error tolerance work around does seem problematic for us as well. I appreciate the idea but I am still researching that.

Right now, the only solution I have found is to automate restarting the failed connectors with a Lambda running on a cron schedule, but that has its own consequences for us as well.
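
For anyone taking the same route, the restart itself is just a call to the standard Kafka Connect REST API; a minimal sketch of what such a scheduled job does (the worker URL and connector name are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestartFailedConnectorExample {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        String base = "http://connect.example.internal:8083"; // placeholder worker URL
        String name = "my-debezium-connector";                // placeholder connector name

        // POST /connectors/{name}/restart restarts the connector; individual failed
        // tasks can be restarted via POST /connectors/{name}/tasks/{id}/restart.
        HttpRequest restart = HttpRequest.newBuilder()
                .uri(URI.create(base + "/connectors/" + name + "/restart"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> resp = http.send(restart, HttpResponse.BodyHandlers.ofString());
        System.out.println("restart -> HTTP " + resp.statusCode());
    }
}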

@sayantacC
Contributor

@Gatsby-Lee
You are right and I should have been more explicit than simply quoting the documentation for the relevant field. I have edited my comment and added a warning.

Although it is unlikely to be useful, I will still point out for completeness that it is possible to include the messages while logging the errors as well: errors.log.include.messages=true.

@sayantacC
Contributor

@dude0001 We are internally looking into the problem of rejecting IAM connections with Too many connects.
If you have not already done so and have the ability, please reach out to AWS support and open a case.

In this case, there isn't a Kafka metric from the broker side that can aggregate by something like client-id since the broker is rejecting the connection before it can really learn about the client-id. I will continue to look for any other mechanism of finding this information (such as logging).

I wanted to point out that there are some JMX based metrics on all Kafka producer/consumer/connect/streams instances: connection-creation-rate and connection-creation-total. If you already have a setup for monitoring your connect instances, you might already have those metrics.
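
As a sketch, those client-side metrics can be read in-process over JMX; the client-id below is a placeholder, and consumers/Connect expose analogous MBeans under their own domains:

import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConnectionRateProbeExample {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Producer metrics MBean; the client-id must match your producer's.
        ObjectName name = new ObjectName(
                "kafka.producer:type=producer-metrics,client-id=producer-1");
        Object rate = server.getAttribute(name, "connection-creation-rate");
        Object total = server.getAttribute(name, "connection-creation-total");
        System.out.println("new connections/sec = " + rate + ", total = " + total);
    }
}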

I have updated my comment suggesting the error.tolerance field to add a warning.

@HunterSherms

I'm going to dog pile on this same issue with another "this isn't just the small instance types" or "this isn't just MSK connect" example. We are running with kafka.m5.2xlarge instances and self-hosted Kafka Connect and see this error on every deploy due to all of the new instances coming up and making new connections. It is resulting in downtime and the need to restart all connectors manually after a deploy.

It makes IAM-based auth nearly unusable for any larger deployment of Kafka Connect.

@plazma-prizma
Contributor

AWS MSK made an improvement to connection burst rates in IAMAuthAgent. AWS needs to deploy the new version to your Amazon MSK clusters to mitigate the issue on your behalf. You can request this from AWS support.

The deployment requires a rolling restart of all the brokers in the cluster. During deployment, one broker at a time will be unavailable for read/write operations. Clients will still be able to communicate with the cluster if you follow the Amazon MSK best practices [1] to avoid any availability loss during the planned upgrade.

[1] https://docs.aws.amazon.com/msk/latest/developerguide/bestpractices.html

@grsubramanian
Contributor

We have recently addressed this issue in MSK. We have also updated documentation at https://docs.aws.amazon.com/msk/latest/developerguide/limits.html#msk-provisioned-quota. If you still need help, please create a support case.
