Skip to content

Commit

Permalink
Refresh broker metadata when producer send fails fixes #798
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink committed Nov 22, 2017
1 parent e6f64b7 commit a755217
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/kafkaClient.js
Expand Up @@ -731,6 +731,7 @@ KafkaClient.prototype.sendRequest = function (request, callback) {

const broker = this.brokerForLeader(leader);
if (!broker || !broker.isConnected()) {
this.refreshBrokerMetadata();
callback(new errors.BrokerNotAvailableError('Broker not available (sendRequest)'));
return;
}
Expand Down
60 changes: 60 additions & 0 deletions test/test.kafkaClient.js
Expand Up @@ -49,6 +49,66 @@ describe('Kafka Client', function () {
});
});

describe('#sendRequest', function () {
let sandbox;
before(function () {
sandbox = sinon.sandbox.create();
});

afterEach(function () {
sandbox.restore();
});

it('should call refreshBrokerMetadata if broker is not connected', function (done) {
const client = new Client({
autoConnect: false,
kafkaHost: 'localhost:9092'
});

const fakeSocket = new FakeSocket();
fakeSocket.destroyed = true;

const fakeBroker = new BrokerWrapper(fakeSocket);
fakeBroker.apiSupport = {};

const BrokerNotAvailableError = require('../lib/errors').BrokerNotAvailableError;

sandbox.stub(client, 'leaderByPartition').returns('1001');
sandbox.stub(client, 'brokerForLeader').returns(fakeBroker);
sandbox.stub(client, 'refreshBrokerMetadata');

const request = {
type: 'produce',
data: {
payloads: [
{
topic: 'test-topic',
partition: 0,
messages: [
{
magic: 0,
attributes: 0,
key: 'test-key',
value: 'test-message',
timestamp: 1511365962702
}
]
}
]
},
args: [1, 100],
requireAcks: 1
};

client.sendRequest(request, function (error) {
sinon.assert.calledOnce(client.refreshBrokerMetadata);
error.should.not.be.empty;
error.should.be.an.instanceOf(BrokerNotAvailableError);
done();
});
});
});

describe('Versions', function () {
let client;
before(function () {
Expand Down

0 comments on commit a755217

Please sign in to comment.