Skip to content

Commit

Permalink
Consumer should throw an error if message exceeds fetchMaxBytes fixes… (
Browse files Browse the repository at this point in the history
#744)

* Consumer should throw an error if message exceeds fetchMaxBytes fixes #339
  • Loading branch information
hyperlink committed Aug 16, 2017
1 parent 0eb61c5 commit 04a248a
Show file tree
Hide file tree
Showing 3 changed files with 307 additions and 151 deletions.
15 changes: 15 additions & 0 deletions lib/errors/MessageSizeTooLargeError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
var util = require('util');

var MessageSizeTooLarge = function (vars) {
Error.captureStackTrace(this, this);
if (typeof vars === 'object') {
this.message = `Found a message larger than the maximum fetch size of this consumer on topic ${vars.topic} partition ${vars.partition} at fetch offset ${vars.offset}. Increase the fetch size, or decrease the maximum message size the broker will allow.`;
} else {
this.message = vars;
}
};

util.inherits(MessageSizeTooLarge, Error);
MessageSizeTooLarge.prototype.name = 'MessageSizeTooLarge';

module.exports = MessageSizeTooLarge;
13 changes: 13 additions & 0 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var ERROR_CODE = protocol.ERROR_CODE;
var GROUP_ERROR = protocol.GROUP_ERROR;
var PartitionMetadata = protocol.PartitionMetadata;
const API_KEY_TO_NAME = _.invert(REQUEST_TYPE);
const MessageSizeTooLarge = require('../errors/MessageSizeTooLargeError');

var API_VERSION = 0;
var REPLICA_ID = -1;
Expand Down Expand Up @@ -141,6 +142,7 @@ function _decodeFetchResponse (resp, cb, maxTickMessages) {
function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, highWaterOffset) {
var set = [];
var messageCount = 0;
const messageSetSize = messageSet.length;
while (messageSet.length > 0) {
var cur = 8 + 4 + 4 + 1 + 1 + 4 + 4;
Binary.parse(messageSet)
Expand Down Expand Up @@ -177,6 +179,17 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi
vars.value = null;
}

if (vars.attributes === 0 && vars.messageSize > messageSetSize) {
cb(
new MessageSizeTooLarge({
topic: topic,
offset: vars.offset,
partition: partition
})
);
return;
}

if (!vars.partial && vars.offset !== null) {
messageCount++;
set.push(vars.offset);
Expand Down
Loading

0 comments on commit 04a248a

Please sign in to comment.