Skip to content

Commit

Permalink
Merge pull request #4 from Yolean/disconnect-on-process-exit
Browse files Browse the repository at this point in the history
Disconnect on process exit
  • Loading branch information
atamon committed Dec 18, 2017
2 parents 666a1c2 + f34e55d commit 66f3b8a
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion build-contracts/basics/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe('kafka-cache build-contract basics', function () {
expect(value).to.deep.equal({ json: 'whadup' });
done();
});
}, 1000);
}, 3000);
});
});
});
Expand Down
19 changes: 9 additions & 10 deletions build-contracts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ services:
- kafka
environment:
TOPIC_NAME: build-contracts.basics.001
# TODO I think this test keeps hanging due to child processes from node-rdkafka
# so to get this to work we probably need to start cleaning stuff up.
# test-basics:
# build:
# context: ../
# dockerfile: build-contracts/basics/Dockerfile
# labels:
# - com.yolean.build-contract
# links:
# - kafka
test-basics:
build:
context: ../
dockerfile: build-contracts/basics/Dockerfile
labels:
- com.yolean.build-contract
links:
- kafka
- zookeeper
unit-tests:
build:
context: ../
Expand Down
9 changes: 8 additions & 1 deletion lib/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ function createProducer(options) {
producer.on('ready', () => resolve(producer));
producer.on('error', reject);

process.on('exit', () => producer.disconnect());

producer.connect();
});
}
Expand All @@ -60,7 +62,8 @@ function checkTopicExists(topic, options, callback) {
consumer.connect({}, err => {
if (err) {
log.error(err);
log.error('Failed to check for topic existance!');
log.error('Failed to check for topic existence!');
consumer.disconnect();
return callback(err);
}

Expand All @@ -71,6 +74,8 @@ function checkTopicExists(topic, options, callback) {
consumer.on('event.error', log.error.bind(null, 'checkTopicExists.event.error'));
consumer.on('error', log.error.bind(null, 'checkTopicExists.error'));

process.on('exit', () => consumer.disconnect());

consumer.on('ready', () => {
log.debug({ topic }, 'checkTopicExists consumer ready');
async.retry(
Expand Down Expand Up @@ -129,6 +134,8 @@ function stream(topic, consumerId, options, offset, callback) {
consumer.on('data', callback);
});

process.on('exit', () => consumer.disconnect());

return {
endStream: () => {
log.info({ topic, consumerId }, 'Ending stream');
Expand Down

0 comments on commit 66f3b8a

Please sign in to comment.