
KafkaClient.sendRequest stuck in ensureBrokerReady loop preventing broker failover due to missing metadata refresh #1041

Closed
poolik opened this issue Aug 2, 2018 · 4 comments

poolik commented Aug 2, 2018

Bug Report

We've been investigating why our HighLevelProducer sometimes stops producing when one broker goes down, while in other cases it keeps working. We discovered that the issue lies within KafkaClient.sendRequest getting stuck in ensureBrokerReady.

We have 3 brokers and are using a keyed producer with automatic retries on produce failure.

The "happy" (working) path on 1 broker going down with Debug logging:

message: 5413
  kafka-node:KafkaClient compressing messages if needed +1s
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["1"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient found 1 connected broker(s) +1ms
Received error: BrokerNotAvailableError: Broker not available (sendRequest)
  kafka-node:KafkaClient updating metadatas +19ms
  kafka-node:KafkaClient compressing messages if needed +19ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +1ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +0ms
  kafka-node:KafkaClient Sending versions request to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient Received versions response from toniss-mbp-2.lan:9092 +2ms
  kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":0,"usable":false},"22":{"min":0,"max":0,"usable":false},"23":{"min":0,"max":0,"usable":false},"24":{"min":0,"max":0,"usable":false},"25":{"min":0,"max":0,"usable":false},"26":{"min":0,"max":0,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":0,"usable":false},"29":{"min":0,"max":0,"usable":false},"30":{"min":0,"max":0,"usable":false},"31":{"min":0,"max":0,"usable":false},"32":{"min":0,"max":0,"usable":false},"33":{"min":0,"max":0,"usable":false},"34":{"min":0,"max":0,"usable":false},"35":{"min":0,"max":0,"usable":false},"36":{"min":0,"max":0,"usable":false},"37":{"min":0,"max":0,"usable":false},"produce":{"min":0,"max":5,"usable":2},"fetch":{"min":0,"max":6,"usable":2},"offset":{"min":0,"max":2,"usable":0},"metadata":{"min":0,"max":5,"usable":0},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":3,"usable":2},"offsetFetch":{"min":0,"max":3,"usable":1},"groupCoordinator":{"min":0,"max":1,"usable":0},"joinGroup":{"min":0,"max":2,"usable":0},"heartbeat":{"min":0,"max":1,"usable":0},"leaveGroup":{"min":0,"max":1,"usable":0},"syncGroup":{"min":0,"max":1,"usable":0},"describeGroups":{"min":0,"max":1,"usable":0},"listGroups":{"min":0,"max":1,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":false},"apiVersions":{"min":0,"max":1,"usable":0},"createTopics":{"min":0,"max":2,"usable":false},"deleteTopics":{"min":0,"max":1,"usable":false}} +0ms
  kafka-node:KafkaClient broker is now ready +16ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:Client refresh metadata currentAttempt 1 +0ms
  kafka-node:Client refresh metadata currentAttempt 2 +204ms
  kafka-node:Client refresh metadata currentAttempt 3 +401ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9093 +119ms
message: 5414
  kafka-node:KafkaClient compressing messages if needed +407ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:Client refresh metadata currentAttempt 4 +278ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9093 +323ms

Everything works as expected: producing message 5413 initially fails with BrokerNotAvailableError because broker 1 went down. We catch the error and retry sending the message. The BrokerNotAvailableError also triggers a metadata update from sendToBroker in KafkaClient.sendRequest:

if (!broker || !broker.isConnected()) {
  this.refreshBrokerMetadata();
  callback(new errors.BrokerNotAvailableError('Broker not available (sendRequest)'));
}

Kafka-node then receives the new partition assignments, and the retry succeeds because another (still up) broker is now the leader for the partition.
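
For context, our retry behaviour is roughly the following sketch (simplified, not our actual code; the topic name, retry limit and delay are placeholders):

// Simplified sketch of our application-level retry (illustrative only).
// On a produce error (BrokerNotAvailableError, TimeoutError, ...) we simply
// send the same message again, relying on kafka-node having refreshed its
// metadata in the meantime so the retry goes to the new partition leader.
function sendWithRetry (producer, message, key, attempt = 0) {
  const payload = [{ topic: 'test-topic', messages: message, key: String(key) }];
  producer.send(payload, (err) => {
    if (!err) return;
    console.error('Received error:', err);
    if (attempt < 10) {
      setTimeout(() => sendWithRetry(producer, message, key, attempt + 1), 200);
    }
  });
}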

The broken path when one broker goes down, with debug logging:

message: 5127
  kafka-node:KafkaClient compressing messages if needed +1s
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["2"] +0ms
  kafka-node:KafkaClient has apiSupport broker is ready +0ms
  kafka-node:KafkaClient Using V2 of produce +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +304ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +2ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
message: 5128
  kafka-node:KafkaClient compressing messages if needed +207ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +1ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +800ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1ms
Received error: TimeoutError: Request timed out after 10000ms
  kafka-node:KafkaClient compressing messages if needed +184ms
  kafka-node:KafkaClient checking payload topic/partitions has leaders +0ms
  kafka-node:KafkaClient found leaders for all +0ms
  kafka-node:KafkaClient grouped requests by 1 brokers ["0"] +0ms
  kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +821ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +0ms
  kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
  kafka-node:KafkaClient kafka-node-client reconnecting to toniss-mbp-2.lan:9092 +0ms
Received error: TimeoutError: Request timed out after 10000ms

Sending message 5127 to broker 2 succeeds, but before the next message is sent to broker 0, the retry function on broker 0's socket close is triggered, which creates a new BrokerWrapper that doesn't have apiSupport set:

socket.on('close', function (hadError) {
  self.emit('close', this);
  if (!hadError && self.closing) {
    logger.debug(`clearing ${this.addr} callback queue without error`);
    self.clearCallbackQueue(this);
  } else {
    self.clearCallbackQueue(
      this,
      this.error != null ? this.error : new errors.BrokerNotAvailableError('Broker not available')
    );
  }
  retry(this);
});

So when it then tries to send a message to broker 0, it triggers the waitUntilReady call in ensureBrokerReady in KafkaClient.sendRequest:
if (!broker.isReady()) {
  logger.debug('missing apiSupport waiting until broker is ready...');
  this.waitUntilReady(broker, callback);
}

waitUntilReady sees that apiSupport is not set and waits for it to be set, but because broker 0 is down that never happens, and the timeout is triggered here:
timeoutId = setTimeout(() => {
  this.removeListener(readyEventName, onReady);
  callback(new TimeoutError(`Request timed out after ${timeout}ms`));
}, timeout);

We again catch the TimeoutError and retry producing the message, but it gets stuck in the same place (in waitUntilReady) and we get another timeout, and so on. This continues indefinitely, or until broker 0 comes back up.

The root cause of the issue is that metadata is not refreshed in the broken path; a refresh would make the retried produce request go to the new leader of the partition. Instead, the client keeps trying to send it to the broker that is down.

Potential fix

I could fix it locally by simply also triggering this.refreshBrokerMetadata(); before the this.waitUntilReady(broker, callback); call, so that after the retry the message goes to the new leader. But I'm not sure this is the correct way to fix it, as I'm not that familiar with the code base yet.
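
As a sketch, the local change looks roughly like this (the surrounding ensureBrokerReady structure is paraphrased from memory, so take it as an illustration of the idea; only the refreshBrokerMetadata() call is the actual addition):

// Sketch of the local workaround inside KafkaClient.sendRequest.
// Only the refreshBrokerMetadata() line is new; the rest mirrors the existing flow.
const ensureBrokerReady = (broker, callback) => {
  if (!broker.isReady()) {
    logger.debug('missing apiSupport waiting until broker is ready...');
    // Added: refresh metadata so that a retried produce request can be routed
    // to the new partition leader instead of the broker that just went down.
    this.refreshBrokerMetadata();
    this.waitUntilReady(broker, callback);
  } else {
    callback(null);
  }
};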

It also looks a bit odd that sendToBroker in KafkaClient.sendRequest does its own broker.isReady() check and just returns an error:

if (!broker.isReady()) {
  callback(new Error('Broker is not ready (apiSuppport is not set)'));
  return;
}

whereas ensureBrokerReady waits for the broker to become ready. Maybe these should be unified?

An alternative solution would be to trigger a metadata refresh from the retry function that runs on socket close/end.
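
That alternative would look something like the sketch below, based on the socket close handler quoted above (the refreshBrokerMetadata() call is the only addition, and whether self has that method in this scope is an assumption on my part):

socket.on('close', function (hadError) {
  self.emit('close', this);
  if (!hadError && self.closing) {
    logger.debug(`clearing ${this.addr} callback queue without error`);
    self.clearCallbackQueue(this);
  } else {
    self.clearCallbackQueue(
      this,
      this.error != null ? this.error : new errors.BrokerNotAvailableError('Broker not available')
    );
    // Added: the broker went away unexpectedly, so partition leadership may have
    // moved; refresh metadata before the reconnect retry kicks in.
    self.refreshBrokerMetadata();
  }
  retry(this);
});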

Environment

  • Node version: 8.11.3
  • Kafka-node version: 2.6.1
  • Kafka version: 1.0.1

For specific cases also provide

  • Number of Brokers: 3
  • Number of partitions for topic: 13

Include Sample Code to reproduce behavior

It's not easy for me to add sample code as we've written our own small abstraction on top of HighLevelProducer, but the producer just sends a message every 1 s, keyed to an auto-incrementing number (so that messages go to different partitions). The test topic had 13 partitions.
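
A minimal sketch of what our harness does (not the real code; the topic name, broker list and intervals are placeholders):

const kafka = require('kafka-node');

// Minimal reproduction sketch: produce one message per second, keyed to an
// auto-incrementing counter so messages spread across the topic's partitions.
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092,localhost:9093,localhost:9094' });
const producer = new kafka.HighLevelProducer(client, { partitionerType: 3 }); // 3 = keyed partitioner

let counter = 0;
producer.on('ready', () => {
  setInterval(() => {
    counter++;
    const payload = [{ topic: 'test-topic', messages: `message: ${counter}`, key: String(counter) }];
    producer.send(payload, (err) => {
      // Our wrapper (sketched earlier) catches the error and retries the same message.
      if (err) console.error('Received error:', err);
      else console.log('message:', counter);
    });
  }, 1000);
});
producer.on('error', (err) => console.error('producer error', err));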

PS! This could explain some of the other issues reported about the producer not failing over to another broker, for example the last comments at the bottom of this issue: #948

poolik changed the title from "KafkaClient.sendRequest stuck in ensureBrokerReady loop preventing broker failover" to "KafkaClient.sendRequest stuck in ensureBrokerReady loop preventing broker failover due to missing metadata refresh" on Aug 2, 2018

hxinhan commented Nov 10, 2019

@hyperlink Is the issue solved by #1345?

@hyperlink

Yup, looks like it @HansonHH. 👍


JsonMa commented Nov 19, 2019

@hyperlink when will the version with this bug fix be released?


hyperlink commented Nov 19, 2019 via email
