Skip to content

Commit

Permalink
Add timeout waiting for broker to be ready fixes #750
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink committed Aug 30, 2017
1 parent a580e80 commit 40e7ba1
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 6 deletions.
32 changes: 26 additions & 6 deletions lib/kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,8 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) {
if (error) {
logger.error('error initialize broker after reconnect', error);
} else {
brokerWrapper.emit('ready');
const readyEventName = brokerWrapper.getReadyEventName();
self.emit(readyEventName);
}
self.emit('reconnect');
});
Expand All @@ -485,7 +486,8 @@ KafkaClient.prototype.createBroker = function (host, port, longpolling) {
if (error) {
logger.error('error initialize broker after connect', error);
} else {
brokerWrapper.emit('ready');
const readyEventName = brokerWrapper.getReadyEventName();
self.emit(readyEventName);
}
self.emit('connect');
});
Expand Down Expand Up @@ -609,6 +611,27 @@ function getSupportedForRequestType (broker, requestType) {
};
}

KafkaClient.prototype.waitUntilReady = function (broker, callback) {
let timeoutId = null;

function onReady () {
logger.debug('broker is now ready');
clearTimeout(timeoutId);
timeoutId = null;
callback(null);
}

const timeout = this.options.requestTimeout;
const readyEventName = broker.getReadyEventName();

timeoutId = setTimeout(() => {
this.removeListener(readyEventName, onReady);
callback(new TimeoutError(`Request timed out after ${timeout}ms`));
}, timeout);

this.once(readyEventName, onReady);
};

KafkaClient.prototype.sendRequest = function (request, callback) {
logger.debug('sending request');
const payloads = this.payloadsByLeader(request.data.payloads);
Expand Down Expand Up @@ -644,10 +667,7 @@ KafkaClient.prototype.sendRequest = function (request, callback) {
const broker = this.brokerForLeader(leader);
if (broker.apiSupport == null) {
logger.debug('missing apiSupport waiting until broker is ready...');
broker.once('ready', function () {
logger.debug('broker is now ready');
callback(null);
});
this.waitUntilReady(broker, callback);
} else {
logger.debug('has apiSupport broker is ready');
callback(null);
Expand Down
5 changes: 5 additions & 0 deletions lib/wrapper/BrokerWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ var BrokerWrapper = function (socket, noAckBatchOptions, idleConnectionMs) {

util.inherits(BrokerWrapper, EventEmitter);

BrokerWrapper.prototype.getReadyEventName = function () {
const lp = this.socket.longpolling ? '-longpolling' : '';
return `${this.socket.addr}${lp}-ready`;
};

BrokerWrapper.prototype.isConnected = function () {
return !this.socket.destroyed && !this.socket.closing && !this.socket.error;
};
Expand Down
53 changes: 53 additions & 0 deletions test/test.kafkaClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,59 @@ describe('Kafka Client', function () {
});
});

describe('#waitUntilReady', function () {
let sandbox, client, clock;

before(function () {
sandbox = sinon.sandbox.create();
clock = sandbox.useFakeTimers();
client = new Client({ kafkaHost: '127.0.0.1:9092', autoConnect: false, requestTimeout: 4000 });
});

afterEach(function () {
sandbox.restore();
});

it('should yield error timeout if broker is not ready by requestTimeout', function (done) {
const fakeBroker = new BrokerWrapper(new FakeSocket());
const readyKey = 'broker.host-ready';

sandbox.stub(fakeBroker, 'getReadyEventName').returns(readyKey);
sandbox.spy(client, 'removeListener');
sandbox.spy(client, 'once');

client.waitUntilReady(fakeBroker, function (error) {
error.should.not.be.empty;
error.should.be.an.instanceOf(TimeoutError);
sinon.assert.calledOnce(fakeBroker.getReadyEventName);
sinon.assert.calledWith(client.removeListener, readyKey, sinon.match.func);
sinon.assert.calledWith(client.once, readyKey, sinon.match.func);
done();
});

clock.tick(client.options.requestTimeout + 1);
});

it('should yield if broker is ready before requestTimeout', function (done) {
const fakeBroker = new BrokerWrapper(new FakeSocket());
const readyKey = 'broker.host-ready';

sandbox.stub(fakeBroker, 'getReadyEventName').returns(readyKey);
sandbox.spy(client, 'removeListener');
sandbox.spy(client, 'once');

client.waitUntilReady(fakeBroker, function (error) {
should(error).be.empty;
sinon.assert.calledOnce(fakeBroker.getReadyEventName);
sinon.assert.calledWith(client.once, readyKey, sinon.match.func);
done();
});

clock.tick(client.options.requestTimeout - 1);
client.emit(readyKey);
});
});

describe('#wrapTimeoutIfNeeded', function () {
let sandbox, wrapTimeoutIfNeeded, client, clock;

Expand Down

0 comments on commit 40e7ba1

Please sign in to comment.