Skip to content

Commit

Permalink
Add test to compress each message to verify it decodes correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink committed Apr 8, 2019
1 parent f2d0844 commit b9395ed
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 30 deletions.
15 changes: 15 additions & 0 deletions docker/docker-compose.2.1.yml
@@ -0,0 +1,15 @@
version: '2'
services:
kafka:
image: wurstmeister/kafka:2.12-2.1.1
environment:
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093,SASL_PLAINTEXT://:9094"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9092,SSL://${KAFKA_ADVERTISED_HOST_NAME}:9093,SASL_PLAINTEXT://${KAFKA_ADVERTISED_HOST_NAME}:9094"
KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "PLAINTEXT"
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
KAFKA_SUPER_USERS: "User:admin,User:broker"
KAFKA_OPTS: "-Djava.security.auth.login.config=/var/private/sasl/sasl.conf"
volumes:
- ./docker/start-kafka.sh:/usr/bin/start-kafka.sh

30 changes: 30 additions & 0 deletions test/helpers/sendMessageEach.js
@@ -0,0 +1,30 @@
'use strict';

const KafkaClient = require('../../lib/kafkaClient');
const HighLevelProducer = require('../../lib/highLevelProducer');
const async = require('async');

/**
 * Produces each element of `messages` in its own send request so every
 * message is compressed individually (attributes: 1 selects gzip in
 * kafka-node), then closes the producer.
 *
 * @param {string[]} messages - message payloads; each is sent separately
 * @param {string} topic - destination Kafka topic
 * @param {Function} done - node-style callback; called with the first send
 *   error, or with null after the producer has closed cleanly
 */
function sendMessage (messages, topic, done) {
  const client = new KafkaClient({ kafkaHost: '127.0.0.1:9092' });
  const producer = new HighLevelProducer(client, { requireAcks: 1 });

  client.on('connect', function () {
    async.each(
      messages,
      // One produce request per message so compression is applied per message.
      function (message, callback) {
        producer.send([{ topic: topic, messages: message, attributes: 1 }], callback);
      },
      function (error) {
        if (error) {
          done(error);
        } else {
          // Only report success once the producer has fully closed.
          producer.close(function () {
            done(null);
          });
        }
      }
    );
  });
}

module.exports = sendMessage;
112 changes: 82 additions & 30 deletions test/test.consumer.js
Expand Up @@ -13,6 +13,7 @@ var InvalidConfigError = require('../lib/errors/InvalidConfigError');

const createTopic = require('../docker/createTopic');
const sendMessage = require('./helpers/sendMessage');
const sendMessageEach = require('./helpers/sendMessageEach');
const _ = require('lodash');

var client, producer, offset;
Expand Down Expand Up @@ -135,42 +136,93 @@ describe('Consumer', function () {
});

describe('Compression', function () {
let topic, messages;

before(function () {
if (process.env.KAFKA_VERSION === '0.9') {
this.skip();
}

topic = uuid.v4();
messages = _.times(10, uuid.v4);
return createTopic(topic, 1, 1, 'compression.type=gzip').then(function () {
return new Promise(function (resolve, reject) {
sendMessage(messages, topic, function (error) {
if (error) {
return reject(error);
}
resolve();
// Topic-level compression: the broker compresses via `compression.type=gzip`
// set on the topic; messages themselves are sent uncompressed in one batch.
describe('by topic', function () {
let topic, messages;
before(function () {
// Compressed-topic offset semantics differ on 0.9; skip there.
if (process.env.KAFKA_VERSION === '0.9') {
this.skip();
}

topic = uuid.v4();
// Ten random payloads, each ~99 repetitions of a short random string,
// large enough for gzip to actually compress.
messages = _.times(10, function () {
return new Array(100).join(Math.random().toString(36));
});
// Create a gzip-compressed topic, then seed it before any test runs.
return createTopic(topic, 1, 1, 'compression.type=gzip').then(function () {
return new Promise(function (resolve, reject) {
sendMessage(messages, topic, function (error) {
if (error) {
return reject(error);
}
resolve();
});
});
});
});

it('should not throw offsetOutOfRange error', function (done) {
const client = new Client({ kafkaHost: '127.0.0.1:9092' });
const consumer = new Consumer(client, [
{
topic,
partition: 0
}
]);
// Offsets of a gzip batch must still arrive contiguously from 0.
let verifyOffset = 0;
// Any offsetOutOfRange event fails the test (done receives the error).
consumer.on('offsetOutOfRange', done);
consumer.on('message', function (message) {
message.offset.should.be.equal(verifyOffset++);
message.partition.should.be.equal(0);
// _.pull mutates `messages`; once every seeded payload has been seen,
// wait briefly (to let a late offsetOutOfRange surface) before closing.
if (_.pull(messages, message.value).length === 0) {
setTimeout(function () {
consumer.close(done);
}, 50);
}
});
});
});

it('should not throw offsetOutOfRange error', function (done) {
const client = new Client({ kafkaHost: '127.0.0.1:9092' });
const consumer = new Consumer(client, [
{
topic,
partition: 0
}
]);
consumer.on('offsetOutOfRange', done);
consumer.on('message', function (message) {
if (_.pull(messages, message.value).length === 0) {
setTimeout(function () {
consumer.close(done);
}, 50);
// Message-level compression: the topic has no compression config; instead
// each message is produced gzip-compressed individually (sendMessageEach
// sends one request per message with attributes: 1).
describe('by message', function () {
let topic, messages;
before(function () {
// Compressed-message offset semantics differ on 0.9; skip there.
if (process.env.KAFKA_VERSION === '0.9') {
this.skip();
}

topic = uuid.v4();
// Ten random payloads, each ~99 repetitions of a short random string,
// large enough for gzip to actually compress.
messages = _.times(10, function () {
return new Array(100).join(Math.random().toString(36));
});
// Plain topic (no compression.type); compression comes from the producer.
return createTopic(topic, 1, 1).then(function () {
return new Promise(function (resolve, reject) {
sendMessageEach(messages, topic, function (error) {
if (error) {
return reject(error);
}
resolve();
});
});
});
});

it('should not throw offsetOutOfRange error', function (done) {
const client = new Client({ kafkaHost: '127.0.0.1:9092' });
const consumer = new Consumer(client, [
{
topic,
partition: 0
}
]);
// Offsets of per-message gzip records must still arrive contiguously from 0.
let verifyOffset = 0;
// Any offsetOutOfRange event fails the test (done receives the error).
consumer.on('offsetOutOfRange', done);
consumer.on('message', function (message) {
message.offset.should.be.equal(verifyOffset++);
message.partition.should.be.equal(0);
// _.pull mutates `messages`; once every seeded payload has been seen,
// wait briefly (to let a late offsetOutOfRange surface) before closing.
if (_.pull(messages, message.value).length === 0) {
setTimeout(function () {
consumer.close(done);
}, 50);
}
});
});
});
});
Expand Down

0 comments on commit b9395ed

Please sign in to comment.