diff --git a/build-contracts/basics/index.js b/build-contracts/basics/index.js index adce0c1..067e8d1 100644 --- a/build-contracts/basics/index.js +++ b/build-contracts/basics/index.js @@ -38,7 +38,7 @@ describe('kafka-cache build-contract basics', function () { expect(value).to.deep.equal({ json: 'whadup' }); done(); }); - }, 1000); + }, 3000); }); }); }); diff --git a/build-contracts/docker-compose.yml b/build-contracts/docker-compose.yml index 9ef980e..3b7315b 100644 --- a/build-contracts/docker-compose.yml +++ b/build-contracts/docker-compose.yml @@ -27,16 +27,15 @@ services: - kafka environment: TOPIC_NAME: build-contracts.basics.001 - # TODO I think this test keeps hanging due to child processes from node-rdkafka - # so to get this to work we probably need to start cleaning stuff up. - # test-basics: - # build: - # context: ../ - # dockerfile: build-contracts/basics/Dockerfile - # labels: - # - com.yolean.build-contract - # links: - # - kafka + test-basics: + build: + context: ../ + dockerfile: build-contracts/basics/Dockerfile + labels: + - com.yolean.build-contract + links: + - kafka + - zookeeper unit-tests: build: context: ../ diff --git a/lib/kafka.js b/lib/kafka.js index d776545..d03c4ab 100644 --- a/lib/kafka.js +++ b/lib/kafka.js @@ -45,6 +45,8 @@ function createProducer(options) { producer.on('ready', () => resolve(producer)); producer.on('error', reject); + process.on('exit', () => producer.disconnect()); + producer.connect(); }); } @@ -60,7 +62,8 @@ function checkTopicExists(topic, options, callback) { consumer.connect({}, err => { if (err) { log.error(err); - log.error('Failed to check for topic existance!'); + log.error('Failed to check for topic existence!'); + consumer.disconnect(); return callback(err); } @@ -71,6 +74,8 @@ function checkTopicExists(topic, options, callback) { consumer.on('event.error', log.error.bind(null, 'checkTopicExists.event.error')); consumer.on('error', log.error.bind(null, 'checkTopicExists.error')); + process.on('exit', () => consumer.disconnect()); + consumer.on('ready', () => { log.debug({ topic }, 'checkTopicExists consumer ready'); async.retry( @@ -129,6 +134,8 @@ function stream(topic, consumerId, options, offset, callback) { consumer.on('data', callback); }); + process.on('exit', () => consumer.disconnect()); + return { endStream: () => { log.info({ topic, consumerId }, 'Ending stream');