Skip to content

Commit

Permalink
Fix ready event emitting for producers (#1349)
Browse files Browse the repository at this point in the history
Make sure the ready event is emitted asynchronously so that it can be
caught by listeners.

Signed-off-by: Raymond Feng <enjoyjava@gmail.com>
  • Loading branch information
raymondfeng authored and hyperlink committed Nov 4, 2019
1 parent 6d58279 commit fcc8aef
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
8 changes: 7 additions & 1 deletion lib/baseProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ BaseProducer.prototype.connect = function () {
// emiter...
var self = this;
this.ready = this.client.ready;
if (this.ready) self.emit('ready');
if (this.ready) {
// Emit the ready event in next tick to give consumers a chance to set up
// a `ready` listener
setImmediate(function () {
self.emit('ready');
});
}
this.client.on('ready', function () {
if (!self.ready) {
self.ready = true;
Expand Down
27 changes: 27 additions & 0 deletions test/test.baseProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,33 @@ const async = require('async');
const should = require('should');

describe('BaseProducer', function () {
describe('ready event', function () {
const KAFKA_HOST = 'localhost:9092';
let client;
before(function () {
client = new KafkaClient({
kafkaHost: KAFKA_HOST
});
});

it('can listen on the ready event before the client is connected', function (done) {
const producer = new BaseProducer(client, {}, BaseProducer.PARTITIONER_TYPES.default);
producer.once('ready', function () {
should(producer.ready).be.true;
done();
});
});

it('can listen on the ready event after the client is connected', function (done) {
should(client.ready).be.true;
const producer = new BaseProducer(client, {}, BaseProducer.PARTITIONER_TYPES.default);
producer.once('ready', function () {
should(producer.ready).be.true;
done();
});
});
});

describe('encoding and decoding key attribute', function () {
const KAFKA_HOST = 'localhost:9092';
let consumerGroup, topic, producer;
Expand Down

0 comments on commit fcc8aef

Please sign in to comment.