Skip to content

Commit

Permalink
Fix blocked ConsumerGroup fetch loop when fetch request yields an error
Browse files Browse the repository at this point in the history
closes #1179
  • Loading branch information
hyperlink committed Feb 22, 2019
1 parent 7c857d1 commit 8c0c22f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
7 changes: 6 additions & 1 deletion lib/consumerGroup.js
Expand Up @@ -815,7 +815,12 @@ ConsumerGroup.prototype.fetch = function () {
this.topicPayloads,
this.options.fetchMaxWaitMs,
this.options.fetchMinBytes,
this.options.maxTickMessages
this.options.maxTickMessages,
error => {
if (error) {
this._resetFetchState();
}
}
);
};

Expand Down
69 changes: 69 additions & 0 deletions test/test.consumerGroup.js
Expand Up @@ -12,6 +12,7 @@ const uuid = require('uuid');
const async = require('async');
const BrokerWrapper = require('../lib/wrapper/BrokerWrapper');
const FakeSocket = require('./mocks/mockSocket');
const { BrokerNotAvailableError } = require('../lib/errors');

describe('ConsumerGroup', function () {
describe('#constructor', function () {
Expand Down Expand Up @@ -1095,6 +1096,74 @@ describe('ConsumerGroup', function () {
});
});

describe('#fetch', function () {
let consumerGroup;

afterEach(function (done) {
consumerGroup.close(done);
});

it('should reset fetch pending state if fetch request fails', function () {
const topic = uuid.v4();
consumerGroup = new ConsumerGroup(
{
connectOnReady: false,
groupId: uuid.v4(),
autoCommit: false
},
[topic]
);

consumerGroup.ready = true;
consumerGroup.paused = false;
consumerGroup._isFetchPending = false;

const clientMock = sinon.mock(consumerGroup.client);

clientMock
.expects('sendFetchRequest')
.once()
.yields(new BrokerNotAvailableError('Test Error'));

const cgMock = sinon.mock(consumerGroup);
cgMock.expects('_resetFetchState').once();

consumerGroup.fetch();
clientMock.verify();
cgMock.verify();
});

it('should not reset fetch pending state if fetch request was successful', function () {
const topic = uuid.v4();
consumerGroup = new ConsumerGroup(
{
connectOnReady: false,
groupId: uuid.v4(),
autoCommit: false
},
[topic]
);

consumerGroup.ready = true;
consumerGroup.paused = false;
consumerGroup._isFetchPending = false;

const clientMock = sinon.mock(consumerGroup.client);

clientMock
.expects('sendFetchRequest')
.once()
.yields(null);

const cgMock = sinon.mock(consumerGroup);
cgMock.expects('_resetFetchState').never();

consumerGroup.fetch();
clientMock.verify();
cgMock.verify();
});
});

describe('#removeTopics', function () {
let topic, newTopic, testMessage, consumerGroup;

Expand Down

0 comments on commit 8c0c22f

Please sign in to comment.