Skip to content

Commit

Permalink
KafkaClient should not refresh metadata when broker socket is closed …
Browse files Browse the repository at this point in the history
…due to being idle
  • Loading branch information
hyperlink committed Mar 26, 2019
1 parent fe5a64e commit 0f37dc0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
4 changes: 3 additions & 1 deletion lib/kafkaClient.js
Expand Up @@ -762,6 +762,7 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) {
});
socket.on('close', function (hadError) {
self.emit('close', this);
logger.debug(`${self.clientId} socket closed ${this.addr} (hadError: ${hadError})`);
if (!hadError && self.closing) {
logger.debug(`clearing ${this.addr} callback queue without error`);
self.clearCallbackQueue(this);
Expand All @@ -774,7 +775,8 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) {
error = new errors.SaslAuthenticationError(null, message);
} else {
error = new errors.BrokerNotAvailableError('Broker not available (socket closed)');
if (!self.connecting) {
if (!self.connecting && !brokerWrapper.isIdle()) {
logger.debug(`${self.clientId} schedule refreshBrokerMetadata()`);
setImmediate(function () {
self.refreshBrokerMetadata();
});
Expand Down
13 changes: 13 additions & 0 deletions test/test.kafkaClient.js
Expand Up @@ -222,6 +222,19 @@ describe('Kafka Client', function () {
done();
});
});

it('should not schedule metadata refresh when broker is closed due to being idle', function () {
const client = new Client({ autoConnect: false });
const brokerWrapper = client.createBroker('fakehost', 9092, true);

sandbox.stub(brokerWrapper, 'isIdle').returns(true);
sandbox.stub(client, 'refreshBrokerMetadata');
sandbox.useFakeTimers();

mockSocket.emit('close', false);
sandbox.clock.tick();
sinon.assert.notCalled(client.refreshBrokerMetadata);
});
});

describe('#sendRequest', function () {
Expand Down

0 comments on commit 0f37dc0

Please sign in to comment.