
Commit 4e5acab
Unpend tests
hyperlink committed Mar 17, 2017
1 parent 424925e commit 4e5acab
Showing 5 changed files with 92 additions and 30 deletions.
16 changes: 16 additions & 0 deletions docker/createTopic.js
@@ -0,0 +1,16 @@
'use strict';

const execa = require('execa');
const assert = require('assert');

function createTopic (topicName, partitions, replicas) {
  assert(topicName);
  assert(partitions && partitions > 0);
  assert(replicas && replicas > 0);
  const topic = `${topicName}:${partitions}:${replicas}`;
  const createResult = execa('docker-compose', ['exec', 'kafka', 'bash', '-c', `KAFKA_CREATE_TOPICS=${topic} KAFKA_PORT=9092 /usr/bin/create-topics.sh`]);
  createResult.stdout.pipe(process.stdout);
  return createResult;
}

module.exports = createTopic;
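
The helper hands back execa's result, which doubles as a promise for the process exit, so a test can pipe the docker-compose output and still wait for the topic to exist. A usage sketch (not part of the diff) mirroring the before() hook added to test.consumer.js below; the 30-second timeout is an assumption, not something this commit sets:

// Usage sketch only; mirrors the before() hook in test.consumer.js.
const createTopic = require('../docker/createTopic');

before(function () {
  this.timeout(30000); // assumed: creating topics through docker-compose can be slow
  const now = Date.now();
  // Two test topics, each with 2 partitions and a replication factor of 1.
  return Promise.all([
    createTopic(`_test_topic_1_${now}`, 2, 1),
    createTopic(`_test_topic_2_${now}`, 2, 1)
  ]);
});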
1 change: 1 addition & 0 deletions package.json
@@ -42,6 +42,7 @@
"eslint-plugin-dependencies": "^2.2.0",
"eslint-plugin-promise": "^3.4.0",
"eslint-plugin-standard": "^2.0.1",
"execa": "^0.6.1",
"istanbul": "^0.4.4",
"mocha": "^3.1.0",
"nsp": "^2.6.2",
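execa lands in devDependencies because the new docker/createTopic.js helper shells out to docker-compose through it. execa returns a child process that also behaves like a promise, which is what lets the helper pipe stdout while callers chain on completion; a minimal sketch, assuming a running docker-compose setup:

const execa = require('execa');

// The return value is a ChildProcess with a promise interface,
// so stdout can be piped while the caller waits for the exit.
const ps = execa('docker-compose', ['ps']);
ps.stdout.pipe(process.stdout);
ps.then(function () {
  console.log('docker-compose ps finished');
});
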
47 changes: 33 additions & 14 deletions test/test.consumer.js
@@ -508,14 +508,33 @@ describe('Consumer', function () {
});
});

-  xdescribe('#pauseTopics|resumeTopics', function () {
+  describe('#pauseTopics|resumeTopics', function () {
+    const now = Date.now();
+    const topic1 = `_test_topic_1_${now}`;
+    const topic2 = `_test_topic_2_${now}`;
+
+    const createTopic = require('../docker/createTopic');
+
+    before(function () {
+      return Promise.all([createTopic(topic1, 2, 1), createTopic(topic2, 2, 1)]).then(function () {
+        return new Promise(function (resolve, reject) {
+          producer.send([{ topic: topic2, messages: 'hello kafka' }], function (error) {
+            if (error) {
+              return reject(error);
+            }
+            resolve();
+          });
+        });
+      });
+    });
+
    it('should pause or resume the topics', function (done) {
-      var client = new Client(host);
+      var client = createClient();
      var topics = [
-        {topic: EXISTS_TOPIC_1, partition: 0},
-        {topic: EXISTS_TOPIC_1, partition: 1},
-        {topic: EXISTS_TOPIC_2, partition: 0},
-        {topic: EXISTS_TOPIC_2, partition: 1}
+        {topic: topic1, partition: 0},
+        {topic: topic1, partition: 1},
+        {topic: topic2, partition: 0},
+        {topic: topic2, partition: 1}
      ];
      var consumer = new Consumer(client, topics, {});
      consumer.on('error', function () {});
@@ -530,20 +549,20 @@ describe('Consumer', function () {
      }

      consumer.payloads.should.eql(topics);
-      consumer.pauseTopics([EXISTS_TOPIC_1, { topic: EXISTS_TOPIC_2, partition: 0 }]);
-      consumer.payloads.map(normalize).should.eql([{ topic: EXISTS_TOPIC_2, partition: 1 }]);
+      consumer.pauseTopics([topic1, { topic: topic2, partition: 0 }]);
+      consumer.payloads.map(normalize).should.eql([{ topic: topic2, partition: 1 }]);

-      consumer.resumeTopics([{topic: EXISTS_TOPIC_1, partition: 0}]);
+      consumer.resumeTopics([{topic: topic1, partition: 0}]);
      consumer.payloads.map(normalize).sort(compare).should.eql([
-        {topic: EXISTS_TOPIC_1, partition: 0},
-        {topic: EXISTS_TOPIC_2, partition: 1}
+        {topic: topic1, partition: 0},
+        {topic: topic2, partition: 1}
      ]);
      consumer.pausedPayloads.map(normalize).sort(compare).should.eql([
-        {topic: EXISTS_TOPIC_1, partition: 1},
-        {topic: EXISTS_TOPIC_2, partition: 0}
+        {topic: topic1, partition: 1},
+        {topic: topic2, partition: 0}
      ]);

-      consumer.resumeTopics([EXISTS_TOPIC_1, EXISTS_TOPIC_2]);
+      consumer.resumeTopics([topic1, topic2]);
      consumer.payloads.sort(compare).should.eql(topics);
      consumer.pausedPayloads.should.eql([]);
      consumer.once('message', function () {
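As the re-enabled test exercises, pauseTopics and resumeTopics accept either a bare topic name, which affects every partition of that topic, or a { topic, partition } object targeting a single partition; paused entries move from consumer.payloads into consumer.pausedPayloads and back. A condensed sketch of that contract, using hypothetical topics 'a' and 'b' that are each subscribed on partitions 0 and 1:

consumer.pauseTopics(['a', { topic: 'b', partition: 0 }]);
// consumer.payloads now holds only { topic: 'b', partition: 1 };
// the other three payloads sit in consumer.pausedPayloads.

consumer.resumeTopics([{ topic: 'a', partition: 0 }]);
// a:0 is fetchable again; a:1 and b:0 stay paused.

consumer.resumeTopics(['a', 'b']);
// Everything is resumed and consumer.pausedPayloads is empty again.
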
6 changes: 3 additions & 3 deletions test/test.partitioner.js
@@ -57,10 +57,10 @@ describe('Partitioner', function () {
      partitions[5].should.equal(2);
    });

-    xit('should not modify different partitioners', function () {
+    it('should not modify different partitioners', function () {
      var partitioner2 = new CyclicPartitioner();
-      var partitions1 = getPartitions(partitioner, [0, 1, 2], 1);
-      var partitions2 = getPartitions(partitioner2, [0, 1, 2], 1);
+      var partitions1 = getPartitions(partitioner, [0, 1, 2], 3);
+      var partitions2 = getPartitions(partitioner2, [0, 1, 2], 3);
      partitions1.should.have.length(3);
      partitions2.should.have.length(3);
      partitions1[0].should.equal(0);
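The un-skipped partitioner test makes three picks per partitioner, so the third argument to the getPartitions helper has to be 3 rather than 1 for the length-3 assertions to hold. The helper itself sits outside this hunk; a sketch of the shape it presumably has, calling getPartition the requested number of times (an assumption, not code from this commit):

// Assumed shape of the test helper; it is not shown in this diff.
function getPartitions (partitioner, partitions, times) {
  var results = [];
  for (var i = 0; i < times; i++) {
    results.push(partitioner.getPartition(partitions));
  }
  return results;
}

// Two independent CyclicPartitioner instances each start their own cycle,
// so three picks over [0, 1, 2] should yield 0, 1, 2 from both.
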
52 changes: 39 additions & 13 deletions test/test.producer.js
@@ -5,6 +5,7 @@ var Producer = kafka.Producer;
var uuid = require('uuid');
var Client = kafka.Client;
var KeyedMessage = kafka.KeyedMessage;
+const async = require('async');

var client, producer, noAckProducer, producerKeyed;

@@ -38,14 +39,14 @@ var host = process.env['KAFKA_TEST_HOST'] || '';
noAckProducer = new Producer(client, { requireAcks: 0 });
producerKeyed = new Producer(client, { partitionerType: Producer.PARTITIONER_TYPES.keyed });

-    producer.on('ready', function () {
-      producerKeyed.on('ready', function () {
-        producer.createTopics([EXISTS_TOPIC_3], true, function (err) {
-          if (err) return done(err);
-          done();
-        });
-      });
-    });
+    async.series([
+      function (callback) {
+        producer.once('ready', callback);
+      },
+      function (callback) {
+        producer.createTopics([EXISTS_TOPIC_3], true, callback);
+      }
+    ], done);
});

after(function (done) {
@@ -184,11 +185,36 @@
});
});

-  xit('should send message to partition determined by keyed partitioner', function (done) {
-    producerKeyed.send([{ key: '12345', topic: EXISTS_TOPIC_3, messages: 'hello kafka' }], function (err, message) {
-      message.should.be.ok;
-      message[EXISTS_TOPIC_3].should.have.property('1', 0);
-      done(err);
+  describe('Keyed Partitioner', function () {
+    const createTopic = require('../docker/createTopic');
+    const topicWithTwoPartitions = uuid.v4();
+    let client, keyed;
+
+    before(function () {
+      var clientId = 'kafka-node-client-' + uuid.v4();
+      return createTopic(topicWithTwoPartitions, 2, 1).then(function () {
+        return new Promise(function (resolve, reject) {
+          client = new Client(host, clientId, undefined, undefined, sslOptions);
+          keyed = new Producer(client, { partitionerType: Producer.PARTITIONER_TYPES.keyed });
+          client.once('connect', function () {
+            client.refreshMetadata([topicWithTwoPartitions], function (error) {
+              if (error) {
+                return reject(error);
+              }
+              resolve();
+            });
+          });
+        });
+      });
+    });
+
+    it('should send message to partition determined by keyed partitioner', function (done) {
+      keyed.send([{ key: '12345', topic: topicWithTwoPartitions, messages: 'hello kafka' }], function (err, message) {
+        console.log(message);
+        message.should.be.ok;
+        message[topicWithTwoPartitions].should.have.property('1', 0);
+        done(err);
      });
    });
  });
});
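The keyed-partitioner test now gets its own two-partition topic and a fresh client whose metadata is refreshed before sending, so the partitioner has partition information to map the key against. The assertion message[topicWithTwoPartitions].should.have.property('1', 0) expects the key '12345' to land on partition 1 at offset 0. Roughly, a keyed partitioner hashes the key modulo the partition count; an illustrative sketch, not kafka-node's actual hash:

// Illustrative only; this is not kafka-node's hashing function.
function pickPartition (key, partitions) {
  var hash = 0;
  for (var i = 0; i < key.length; i++) {
    hash = ((hash << 5) - hash + key.charCodeAt(i)) | 0; // simple string hash
  }
  return partitions[Math.abs(hash) % partitions.length];
}

// With two partitions, repeated sends with the same key always hit the same partition.
pickPartition('12345', [0, 1]);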
