Skip to content

Commit

Permalink
add kafka transporter tests
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Jan 24, 2018
1 parent 9b3f6b8 commit 2c7737d
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 54 deletions.
52 changes: 26 additions & 26 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
"tonicExampleFilename": "docs/runkit/simple.js",
"typings": "./index.d.ts",
"jest": {
"coverageDirectory": "../coverage",
"coverageReporters": [
"text",
"lcov"
],
"coveragePathIgnorePatterns": [
"/node_modules/",
"/test/services/",
Expand Down
36 changes: 24 additions & 12 deletions src/transporters/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

"use strict";

const _ = require("lodash");
const Promise = require("bluebird");
const Transporter = require("./base");
const { MoleculerError } = require("../errors");

/**
* Lightweight transporter for Kafka
Expand All @@ -26,12 +26,17 @@ class KafkaTransporter extends Transporter {
* @memberOf KafkaTransporter
*/
constructor(opts) {
if (typeof opts == "string")
if (typeof opts == "string") {
opts = { kafka: {
host: opts.replace("kafka://", "")
} };
} else if (!opts) {
opts = {
kafka: {}
};
}

opts.kafka = Object.assign({
opts.kafka = _.defaultsDeep(opts.kafka, {
host: undefined,
client: {
zkOptions: undefined,
Expand All @@ -41,27 +46,20 @@ class KafkaTransporter extends Transporter {
producer: {},
customPartitioner: undefined,

/*consumer: {
groupId: undefined, // No nodeID at here
encoding: "buffer",
fromOffset: false,
consumer: {
},
consumerPayloads: undefined,
*/

publish: {
partition: 0,
attributes: 0
}
}, opts.kafka);
});

super(opts);

this.client = null;
this.producer = null;
this.consumer = null;

this.topics = [];
}

/**
Expand Down Expand Up @@ -106,6 +104,7 @@ class KafkaTransporter extends Transporter {

// Create Producer
this.producer = new Kafka.Producer(this.client, opts.producer, opts.customPartitioner);
/* istanbul ignore next */
this.producer.on("error", e => {
this.logger.error("Kafka Producer error", e.message);
this.logger.debug(e);
Expand All @@ -117,6 +116,15 @@ class KafkaTransporter extends Transporter {
this.onConnected().then(resolve);
});

/* istanbul ignore next */
this.client.on("error", e => {
this.logger.error("Kafka Client error", e.message);
this.logger.debug(e);

if (!this.connected)
reject(e);
});

});
}

Expand Down Expand Up @@ -153,6 +161,7 @@ class KafkaTransporter extends Transporter {
return new Promise((resolve, reject) => {

this.producer.createTopics(topics, true, (err, data) => {
/* istanbul ignore next */
if (err) {
this.logger.error("Unable to create topics!", topics, err);
return reject(err);
Expand All @@ -169,6 +178,7 @@ class KafkaTransporter extends Transporter {
const Kafka = require("kafka-node");
this.consumer = new Kafka.ConsumerGroup(consumerOptions, topics);

/* istanbul ignore next */
this.consumer.on("error", e => {
this.logger.error("Kafka Consumer error", e.message);
this.logger.debug(e);
Expand Down Expand Up @@ -230,6 +240,7 @@ class KafkaTransporter extends Transporter {
* @memberOf KafkaTransporter
*/
publish(packet) {
/* istanbul ignore next */
if (!this.producer) return Promise.resolve();

return new Promise((resolve, reject) => {
Expand All @@ -240,6 +251,7 @@ class KafkaTransporter extends Transporter {
partition: this.opts.kafka.publish.partition,
attributes: this.opts.kafka.publish.attributes,
}], (err, result) => {
/* istanbul ignore next */
if (err) {
this.logger.error("Publish error", err);
reject(err);
Expand Down
32 changes: 17 additions & 15 deletions test/unit/transit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,23 +217,25 @@ describe("Test Transit.makeSubscriptions", () => {
const broker = new ServiceBroker({ nodeID: "node1", transporter: new FakeTransporter() });
const transit = broker.transit;

transit.subscribe = jest.fn(() => Promise.resolve());
transit.tx.makeSubscriptions = jest.fn(() => Promise.resolve());

it("should call subscribe with all topics", () => {
it("should call makeSubscriptions of transporter with all topics", () => {
return transit.makeSubscriptions().catch(protectReject).then(() => {
expect(transit.subscribe).toHaveBeenCalledTimes(12);
expect(transit.subscribe).toHaveBeenCalledWith("EVENT", "node1");
expect(transit.subscribe).toHaveBeenCalledWith("REQ", "node1");
expect(transit.subscribe).toHaveBeenCalledWith("RES", "node1");
expect(transit.subscribe).toHaveBeenCalledWith("DISCOVER");
expect(transit.subscribe).toHaveBeenCalledWith("DISCOVER", "node1");
expect(transit.subscribe).toHaveBeenCalledWith("INFO");
expect(transit.subscribe).toHaveBeenCalledWith("INFO", "node1");
expect(transit.subscribe).toHaveBeenCalledWith("DISCONNECT");
expect(transit.subscribe).toHaveBeenCalledWith("HEARTBEAT");
expect(transit.subscribe).toHaveBeenCalledWith("PING");
expect(transit.subscribe).toHaveBeenCalledWith("PING", "node1");
expect(transit.subscribe).toHaveBeenCalledWith("PONG", "node1");
expect(transit.tx.makeSubscriptions).toHaveBeenCalledTimes(1);
expect(transit.tx.makeSubscriptions).toHaveBeenCalledWith([
{"cmd": "EVENT", "nodeID": "node1" },
{"cmd": "REQ", "nodeID": "node1" },
{"cmd": "RES", "nodeID": "node1" },
{"cmd": "DISCOVER" },
{"cmd": "DISCOVER", "nodeID": "node1" },
{"cmd": "INFO" },
{"cmd": "INFO", "nodeID": "node1" },
{"cmd": "DISCONNECT" },
{"cmd": "HEARTBEAT" },
{"cmd": "PING"},
{"cmd": "PING", "nodeID": "node1" },
{"cmd": "PONG", "nodeID": "node1" }
]);
});
});

Expand Down
Loading

0 comments on commit 2c7737d

Please sign in to comment.