Fix ConsumerGroup receiving wrong offsets for compressed messages
hyperlink committed Apr 15, 2019
1 parent 4a5a767 commit f0118de
Showing 3 changed files with 137 additions and 4 deletions.
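Background for the fix: with Kafka message format v1 (magic byte 1), a compressed batch arrives as a single wrapper message whose offset is the absolute offset of the last inner message, while the inner messages carry relative offsets. kafka-node was emitting the inner messages with their raw offsets, so a ConsumerGroup could commit an offset the broker never assigned and later fail with offsetOutOfRange. As the protocol.js diff below shows, the commit threads the wrapper's offset (lastOffset) into the recursive decodeMessageSet call and re-bases each inner offset by counting back from the end. A minimal sketch of that arithmetic, using illustrative names rather than the library's exact code:

    // Re-base inner message offsets against the wrapper's absolute offset:
    // with n inner messages, the i-th one ends up at lastOffset - (n - 1 - i).
    function fixUpInnerOffsets (innerMessages, lastOffset) {
      let remaining = innerMessages.length - 1;
      for (const message of innerMessages) {
        message.offset = lastOffset - remaining--;
      }
      return innerMessages;
    }

    // Three inner messages in a wrapper at offset 42 become 40, 41, 42:
    console.log(fixUpInnerOffsets([{}, {}, {}], 42).map(m => m.offset));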
29 changes: 26 additions & 3 deletions lib/protocol/protocol.js
@@ -317,12 +317,14 @@ function _decodeFetchResponse (resp, cb, maxTickMessages, version) {
}
}

-function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWaterOffset, topics) {
+function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWaterOffset, topics, lastOffset) {
const messageSetSize = messageSet.length;
// TODO: this is broken logic. It overwrites the previous partition's HWO.
// Need to refactor this on the next major API bump.
topics[topic].highWaterOffset = highWaterOffset;

+let innerMessages = [];
+
while (messageSet.length > 0) {
var cur = 8 + 4 + 4 + 1 + 1 + 4 + 4;
var partial = false;
@@ -379,6 +381,7 @@ function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWate
const offset = vars.offset;
const value = vars.value;
const key = vars.key;
+const magicByte = vars.magicByte;
var codec = getCodec(vars.attributes);
if (!codec) {
const message = {
@@ -394,10 +397,17 @@ function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWate
message.timestamp = new Date(vars.timestamp);
}

-return enqueue((next) => {
+if (lastOffset != null) {
+// offsets need fixing, so skip enqueueing until all inner messages are collected
+innerMessages.push(message);
+return;
+}
+
+enqueue((next) => {
emit(null, message);
next(null);
});
+return;
}
enqueue((next) => {
codec.decode(value, function (error, inlineMessageSet) {
@@ -408,7 +418,7 @@ function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWate
}
decodeMessageSet(topic, partition, inlineMessageSet, (cb) => {
cb(_.noop);
-}, emit, highWaterOffset, topics, offset);
+}, emit, highWaterOffset, topics, magicByte === 1 ? offset : null);

// Delay 1 tick as this isn't counted to max tick messages, give a breather
process.nextTick(() => next(null));
@@ -420,6 +430,19 @@ function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWate
if (partial) break;
messageSet = messageSet.slice(cur);
}

+if (lastOffset != null && innerMessages.length) {
+// contains inner messages, need to fix up offsets
+let len = innerMessages.length - 1;
+for (const message of innerMessages) {
+const offset = lastOffset - len--;
+message.offset = offset;
+enqueue((next) => {
+emit(null, message);
+next(null);
+});
+}
+}
}

function encodeMetadataRequest (clientId, correlationId, topics) {
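Note that the recursive call above passes the wrapper offset through only when magicByte === 1. With message format v0, brokers rewrote every inner message's offset to an absolute value during recompression, so there is nothing to correct; with v1, only the wrapper's offset is absolute. Passing null makes decodeMessageSet take the immediate-enqueue path instead of buffering into innerMessages. A hedged sketch of that dispatch (illustrative helper name, not part of the library):

    // magicByte 0 -> inner offsets are already absolute, nothing to fix.
    // magicByte 1 -> only the wrapper offset is absolute, so pass it down
    // and let the inner decode re-base each message against it.
    function lastOffsetForInnerSet (magicByte, wrapperOffset) {
      return magicByte === 1 ? wrapperOffset : null;
    }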
31 changes: 31 additions & 0 deletions test/helpers/sendMessageEach.js
@@ -0,0 +1,31 @@
'use strict';

const KafkaClient = require('../../lib/kafkaClient');
const HighLevelProducer = require('../../lib/highLevelProducer');
const async = require('async');
const uuid = require('uuid');

function sendMessage (message, topic, done, attributes = 0) {
var client = new KafkaClient({ kafkaHost: '127.0.0.1:9092' });
var producer = new HighLevelProducer(client, { requireAcks: 1 });

client.on('connect', function () {
async.each(
message,
function (message, callback) {
producer.send([{ topic: topic, messages: message, key: uuid.v4(), attributes, timestamp: Date.now() }], callback);
},
function (error) {
if (error) {
done(error);
} else {
producer.close(function () {
done(null);
});
}
}
);
});
}

module.exports = sendMessage;
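Unlike the existing sendMessage helper, which hands the whole array to one producer.send() call so the broker packs it into a single compressed wrapper, this helper issues one produce request per message via async.each, leaving the consumer many small wrappers whose offsets each need fixing up; the tests below exercise both shapes. The attributes argument defaults to 0 (no producer-side compression), so compression comes from the topic's compression.type=gzip setting. A hypothetical usage, assuming a local broker on 127.0.0.1:9092 and an existing gzip-compressed topic:

    const sendMessageEach = require('./helpers/sendMessageEach');

    // Each string goes out in its own produce request, so the broker
    // typically stores one compressed wrapper per message.
    sendMessageEach(['a', 'b', 'c'], 'my-gzip-topic', function (error) {
      if (error) throw error;
      console.log('all messages produced');
    });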
81 changes: 80 additions & 1 deletion test/test.consumerGroup.js
@@ -4,10 +4,11 @@ const sinon = require('sinon');
const should = require('should');
const ConsumerGroup = require('../lib/consumerGroup');
const sendMessage = require('./helpers/sendMessage');
const sendMessageEach = require('./helpers/sendMessageEach');
const _ = require('lodash');
const proxyquire = require('proxyquire').noCallThru();
const EventEmitter = require('events').EventEmitter;

const createTopic = require('../docker/createTopic');
const uuid = require('uuid');
const async = require('async');
const BrokerWrapper = require('../lib/wrapper/BrokerWrapper');
@@ -92,6 +93,84 @@ describe('ConsumerGroup', function () {
});
});

describe('Compression', function () {
function verifyMessagesAndOffsets (topic, messages, done) {
const consumer = new ConsumerGroup(
{ kafkaHost: '127.0.0.1:9092', groupId: uuid.v4(), fromOffset: 'earliest' },
topic
);
let verifyOffset = 0;
const allMessages = messages.slice(0);
consumer.on('offsetOutOfRange', done);
consumer.on('message', function (message) {
message.offset.should.be.equal(verifyOffset++);
message.partition.should.be.equal(0);
message.value.should.be.equal(allMessages[message.offset]);
if (_.pull(messages, message.value).length === 0) {
setTimeout(function () {
consumer.close(done);
}, 50);
}
});
}

describe('with topic config enabled send batch', function () {
let topic, messages;
before(function () {
if (process.env.KAFKA_VERSION === '0.9') {
this.skip();
}

topic = uuid.v4();
messages = _.times(25, function () {
return new Array(100).join(Math.random().toString(36));
});
return createTopic(topic, 1, 1, 'compression.type=gzip').then(function () {
return new Promise(function (resolve, reject) {
sendMessage(messages, topic, function (error) {
if (error) {
return reject(error);
}
resolve();
});
});
});
});

it('should not throw offsetOutOfRange error', function (done) {
verifyMessagesAndOffsets(topic, messages, done);
});
});

describe('with topic config enabled send each', function () {
let topic, messages;
before(function () {
if (process.env.KAFKA_VERSION === '0.9') {
this.skip();
}

topic = uuid.v4();
messages = _.times(25, function () {
return new Array(100).join(Math.random().toString(36));
});
return createTopic(topic, 1, 1, 'compression.type=gzip').then(function () {
return new Promise(function (resolve, reject) {
sendMessageEach(messages, topic, function (error) {
if (error) {
return reject(error);
}
resolve();
});
});
});
});

it('should not throw offsetOutOfRange error', function (done) {
verifyMessagesAndOffsets(topic, messages, done);
});
});
});

describe('Topic partition change detection', function () {
let ConsumerGroup = null;
let consumerGroup = null;
