Skip to content

Commit

Permalink
Fix missing support in ConsumerGroup for fromOffset of earliest and none
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink committed Oct 13, 2016
1 parent 2f81843 commit f9f9f3e
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 28 deletions.
6 changes: 5 additions & 1 deletion README.md
Expand Up @@ -569,7 +569,11 @@ var options = {
// An array of partition assignment protocols ordered by preference.
// 'roundrobin' or 'range' string for built ins (see below to pass in custom assignment protocol)
protocol: ['roundrobin'],
fromOffset: 'latest', // for new groups read messages from the latest offsets (defaults to the earliest available offset)

// Offsets to use for new groups other options could be 'earliest' or 'none' (none will emit an error if no offsets were saved)
// equivalent to Java client's auto.offset.reset
fromOffset: 'latest', // default

migrateHLC: false, // for details please see Migration section below
migrateRolling: true
};
Expand Down
4 changes: 2 additions & 2 deletions example/consumerGroupMember.js
Expand Up @@ -5,8 +5,8 @@ var consumerOptions = {
host: '127.0.0.1:2181',
groupId: 'ExampleTestGroup',
sessionTimeout: 15000,
protocol: ['range'],
fromOffset: 'latest'
protocol: ['roundrobin'],
fromOffset: 'earliest' // equivalent of auto.offset.reset valid values are 'none', 'latest', 'earliest'
};

var topics = ['RebalanceTopic', 'RebalanceTest'];
Expand Down
43 changes: 30 additions & 13 deletions lib/consumerGroup.js
Expand Up @@ -13,7 +13,15 @@ const createTopicPartitionList = require('./utils').createTopicPartitionList;

const assert = require('assert');
const builtInProtocols = require('./assignment');
var LATEST_OFFSET = -1;

const LATEST_OFFSET = -1;
const EARLIEST_OFFSET = -2;
const ACCEPTED_FROM_OFFSET = {
latest: LATEST_OFFSET,
earliest: EARLIEST_OFFSET,
none: false
};

const DEFAULTS = {
groupId: 'kafka-node-group',
// Auto commit config
Expand All @@ -26,7 +34,7 @@ const DEFAULTS = {
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 1024,
maxTickMessages: 1000,
fromOffset: false,
fromOffset: 'latest',
sessionTimeout: 30000,
retries: 10,
retryFactor: 1.8,
Expand All @@ -45,6 +53,10 @@ function ConsumerGroup (memberOptions, topics) {
memberOptions.ssl = {};
}

if (!(this.options.fromOffset in ACCEPTED_FROM_OFFSET)) {
throw new Error(`fromOffset ${this.options.fromOffset} should be either: ${Object.keys(ACCEPTED_FROM_OFFSET).join(', ')}`);
}

this.client = new Client(memberOptions.host, memberOptions.id, memberOptions.zk,
memberOptions.batch, memberOptions.ssl);

Expand Down Expand Up @@ -177,18 +189,18 @@ ConsumerGroup.prototype.handleJoinGroup = function (joinGroupResponse, callback)
callback(null, groupAssignment);
};

ConsumerGroup.prototype.saveLatestOffsets = function (topicPartitionList, callback) {
ConsumerGroup.prototype.saveDefaultOffsets = function (topicPartitionList, callback) {
var self = this;
const offsetPayload = _(topicPartitionList).cloneDeep().map(function (tp) {
tp.time = LATEST_OFFSET;
const offsetPayload = _(topicPartitionList).cloneDeep().map(tp => {
tp.time = ACCEPTED_FROM_OFFSET[this.options.fromOffset];
return tp;
});

self.getOffset().fetch(offsetPayload, function (error, result) {
if (error) {
return callback(error);
}
self.latestOffsets = _.mapValues(result, function (partitionOffsets) {
self.defaultOffsets = _.mapValues(result, function (partitionOffsets) {
return _.mapValues(partitionOffsets, _.first);
});
callback(null);
Expand All @@ -203,7 +215,7 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
debug('%s owns topics: ', self.client.clientId, syncGroupResponse.partitions);

const topicPartitionList = createTopicPartitionList(syncGroupResponse.partitions);
const useLatestOffsets = self.options.fromOffset === 'latest';
const useDefaultOffsets = self.options.fromOffset in ACCEPTED_FROM_OFFSET;

async.waterfall([
function (callback) {
Expand All @@ -218,6 +230,11 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)

if (noOffset) {
debug('No saved offsets');

if (self.options.fromOffset === 'none') {
return callback(new Error(`${self.client.clientId} owns topics and partitions which contains no saved offsets for group '${self.options.groupId}'`));
}

async.parallel([
function (callback) {
if (self.migrator) {
Expand All @@ -226,16 +243,16 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
callback(null);
},
function (callback) {
if (useLatestOffsets) {
return self.saveLatestOffsets(topicPartitionList, callback);
if (useDefaultOffsets) {
return self.saveDefaultOffsets(topicPartitionList, callback);
}
callback(null);
}
], function (error) {
if (error) {
return callback(error);
}
debug('%s latestOffset Response: %j', self.client.clientId, self.latestOffsets);
debug('%s defaultOffset Response for %s: %j', self.client.clientId, self.options.fromOffset, self.defaultOffsets);
callback(null, offsets);
});
} else {
Expand All @@ -247,7 +264,7 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
self.topicPayloads = self.buildPayloads(topicPartitionList).map(function (p) {
var offset = offsets[p.topic][p.partition];
if (offset === -1) { // -1 means no offset was saved for this topic/partition combo
offset = useLatestOffsets ? self.getLatestOffset(p, 0) : 0;
offset = useDefaultOffsets ? self.getDefaultOffset(p, 0) : 0;
if (self.migrator) {
offset = self.migrator.getOffset(p, offset);
}
Expand All @@ -263,8 +280,8 @@ ConsumerGroup.prototype.handleSyncGroup = function (syncGroupResponse, callback)
}
};

ConsumerGroup.prototype.getLatestOffset = function (tp, defaultOffset) {
return _.get(this.latestOffsets, [tp.topic, tp.partition], defaultOffset);
ConsumerGroup.prototype.getDefaultOffset = function (tp, defaultOffset) {
return _.get(this.defaultOffsets, [tp.topic, tp.partition], defaultOffset);
};

ConsumerGroup.prototype.getOffset = function () {
Expand Down
75 changes: 63 additions & 12 deletions test/test.consumerGroup.js
Expand Up @@ -73,6 +73,29 @@ describe('ConsumerGroup', function () {
}, 'SampleTopic');
sinon.assert.calledWithExactly(fakeClient, 'myhost', 'myClientId', undefined, undefined, ssl);
});

it('should throw an error if using an invalid fromOffset', function () {
[true, false, '', 0, 1, 'blah'].forEach(offset => {
should.throws(() => {
// eslint-disable-next-line no-new
new ConsumerGroup({
fromOffset: offset
});
});
});
});

it('should not throw an error if using an valid fromOffset', function () {
['earliest', 'latest', 'none'].forEach(offset => {
should.doesNotThrow(() => {
// eslint-disable-next-line no-new
new ConsumerGroup({
fromOffset: offset,
connectOnReady: false
}, 'TestTopic');
});
});
});
});

describe('#sendHeartbeats', function () {
Expand Down Expand Up @@ -165,13 +188,13 @@ describe('ConsumerGroup', function () {
};

sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);

sinon.assert.notCalled(consumerGroup.saveLatestOffsets);
sinon.assert.notCalled(consumerGroup.saveDefaultOffsets);
sinon.assert.notCalled(consumerGroup.migrator.saveHighLevelConsumerOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);
Expand All @@ -197,7 +220,7 @@ describe('ConsumerGroup', function () {
}
};

const latestOffsets = {
const defaultOffsets = {
TestTopic: {
0: 10,
2: 20,
Expand All @@ -222,16 +245,16 @@ describe('ConsumerGroup', function () {
}
};

consumerGroup.latestOffsets = latestOffsets;
consumerGroup.defaultOffsets = defaultOffsets;
consumerGroup.migrator.offsets = migrateOffsets;
sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);

sinon.assert.calledOnce(consumerGroup.saveLatestOffsets);
sinon.assert.calledOnce(consumerGroup.saveDefaultOffsets);
sinon.assert.calledOnce(consumerGroup.migrator.saveHighLevelConsumerOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);
Expand All @@ -245,6 +268,34 @@ describe('ConsumerGroup', function () {
});
});

describe('options.fromOffset is "none"', function () {
it('should yield error when there is not saved offsets', function (done) {
consumerGroup.options.fromOffset = 'none';
const syncGroupResponse = {
partitions: {
TestTopic: [0, 2, 3, 4]
}
};

const fetchOffsetResponse = {
TestTopic: {
0: 10,
2: -1,
3: -1,
4: -1
}
};

sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);
consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
should(ownsPartitions).be.undefined;
error.should.be.a.Error;
done();
});
});
});

describe('options.fromOffset is "latest"', function () {
it('should not fetch latestOffset if all offsets have saved previously', function (done) {
consumerGroup.options.fromOffset = 'latest';
Expand All @@ -265,12 +316,12 @@ describe('ConsumerGroup', function () {
};

sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);
sinon.assert.notCalled(consumerGroup.saveLatestOffsets);
sinon.assert.notCalled(consumerGroup.saveDefaultOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);

Expand Down Expand Up @@ -300,22 +351,22 @@ describe('ConsumerGroup', function () {
}
};

const latestOffsets = {
const defaultOffsets = {
TestTopic: {
0: 10,
2: 3,
4: 5000
}
};

consumerGroup.latestOffsets = latestOffsets;
consumerGroup.defaultOffsets = defaultOffsets;
sandbox.stub(consumerGroup, 'fetchOffset').yields(null, fetchOffsetResponse);
sandbox.stub(consumerGroup, 'saveLatestOffsets').yields(null);
sandbox.stub(consumerGroup, 'saveDefaultOffsets').yields(null);

consumerGroup.handleSyncGroup(syncGroupResponse, function (error, ownsPartitions) {
ownsPartitions.should.be.true;
sinon.assert.calledWith(consumerGroup.fetchOffset, syncGroupResponse.partitions);
sinon.assert.calledOnce(consumerGroup.saveLatestOffsets);
sinon.assert.calledOnce(consumerGroup.saveDefaultOffsets);

const topicPayloads = _(consumerGroup.topicPayloads);

Expand Down

0 comments on commit f9f9f3e

Please sign in to comment.