Skip to content

Commit

Permalink
Backport fix moleculerjs#620
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMaz committed Dec 17, 2019
1 parent 938662d commit 411067c
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 9 deletions.
33 changes: 33 additions & 0 deletions dev/nats-wildcard.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const ServiceBroker = require("../src/service-broker");

const broker = new ServiceBroker({
nodeID: "broker-1",
transporter: {
type: "NATS"
},
metrics: true,
logLevel: "debug",
disableBalancer: true
});

broker.createService({
name: "test",

events: {
"config.site.**.changed": (payload) => {broker.logger.info(payload)},
"config.mail.**.changed": () => {},
"config.accounts.**.changed": () => {},
}
});

async function start() {
await broker.start();

broker.repl();

setInterval(() => {
broker.emit('config.site.test.changed', {data: 123})
}, 1000)
}

start();
7 changes: 2 additions & 5 deletions src/transit.js
Original file line number Diff line number Diff line change
Expand Up @@ -864,11 +864,8 @@ class Transit {
if (!this.connected || !this.isReady) return Promise.resolve();

const info = this.broker.getLocalNodeInfo();

let p = Promise.resolve();
if (!nodeID)
p = this.tx.makeBalancedSubscriptions();


const p = !nodeID && this.broker.options.disableBalancer ? this.tx.makeBalancedSubscriptions() : Promise.resolve();
return p.then(() => this.publish(new Packet(P.PACKET_INFO, nodeID, {
services: info.services,
ipList: info.ipList,
Expand Down
2 changes: 1 addition & 1 deletion src/transporters/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class NatsTransporter extends Transporter {
* @memberof NatsTransporter
*/
subscribeBalancedEvent(event, group) {
const topic = `${this.prefix}.${PACKET_EVENT}B.${group}.${event}`.replace(/\*\*/g, ">");
const topic = `${this.prefix}.${PACKET_EVENT}B.${group}.${event}`.replace(/\*\*.*$/g, ">");

this.subscriptions.push(this.client.subscribe(topic, { queue: group }, (msg) => this.incomingMessage(PACKET_EVENT, msg)));
}
Expand Down
24 changes: 23 additions & 1 deletion test/unit/transit.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -971,9 +971,12 @@ describe("Test Transit.sendNodeInfo", () => {
});
});

it("should call publish with correct params if has no nodeID", () => {
it("should call publish with correct params if has no nodeID & disableBalancer: true", () => {
// Set disableBalancer option
broker.options.disableBalancer = true
transit.publish.mockClear();
broker.getLocalNodeInfo.mockClear();
transit.tx.makeBalancedSubscriptions.mockClear();

return transit.sendNodeInfo().then(() => {
expect(transit.tx.makeBalancedSubscriptions).toHaveBeenCalledTimes(1);
Expand All @@ -987,6 +990,25 @@ describe("Test Transit.sendNodeInfo", () => {
});
});

it("should call publish with correct params if has no nodeID & disableBalancer: false", () => {
// Set disableBalancer option
broker.options.disableBalancer = false
transit.publish.mockClear();
broker.getLocalNodeInfo.mockClear();
transit.tx.makeBalancedSubscriptions.mockClear();

return transit.sendNodeInfo().then(() => {
expect(transit.tx.makeBalancedSubscriptions).toHaveBeenCalledTimes(0);
expect(transit.publish).toHaveBeenCalledTimes(1);
expect(broker.getLocalNodeInfo).toHaveBeenCalledTimes(1);
const packet = transit.publish.mock.calls[0][0];
expect(packet).toBeInstanceOf(P.Packet);
expect(packet.type).toBe(P.PACKET_INFO);
expect(packet.target).toBe();
expect(packet.payload.services).toEqual([]);
});
});

});

describe("Test Transit.sendPing", () => {
Expand Down
65 changes: 63 additions & 2 deletions test/unit/transporters/nats.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,68 @@ describe("Test NatsTransporter subscribe & publish", () => {
expect(transporter.subscriptions).toEqual([123]);
});

it("check subscribeBalancedEvent", () => {
describe("Test subscribeBalancedEvent", () => {

it("check subscription & unsubscription", () => {
let subCb;
transporter.client.subscribe = jest.fn((name, opts, cb) => {
subCb = cb;
return 125;
});
transporter.incomingMessage = jest.fn();

transporter.subscribeBalancedEvent("user.created", "mail");

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.mail.user.created", { queue: "mail" }, jasmine.any(Function));

// Test subscribe callback
subCb("{ sender: \"node1\" }");
expect(transporter.incomingMessage).toHaveBeenCalledTimes(1);
expect(transporter.incomingMessage).toHaveBeenCalledWith("EVENT", "{ sender: \"node1\" }");
expect(transporter.subscriptions).toEqual([125]);

// Test unsubscribeFromBalancedCommands
transporter.client.unsubscribe = jest.fn();
transporter.client.flush = jest.fn(cb => cb());

return transporter.unsubscribeFromBalancedCommands().catch(protectReject).then(() => {
expect(transporter.subscriptions).toEqual([]);
expect(transporter.client.unsubscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.unsubscribe).toHaveBeenCalledWith(125);
expect(transporter.client.flush).toHaveBeenCalledTimes(1);
});
});

it("check with '*' wildchar topic", () => {
transporter.client.subscribe = jest.fn();

transporter.subscribeBalancedEvent("user.*", "users");

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.users.user.*", { queue: "users" }, jasmine.any(Function));
});

it("check with '**' wildchar topic", () => {
transporter.client.subscribe = jest.fn();

transporter.subscribeBalancedEvent("user.**", "users");

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.users.user.>", { queue: "users" }, jasmine.any(Function));
});

it("check with '**' wildchar (as not last) topic", () => {
transporter.client.subscribe = jest.fn();

transporter.subscribeBalancedEvent("user.**.changed", "users");

expect(transporter.client.subscribe).toHaveBeenCalledTimes(1);
expect(transporter.client.subscribe).toHaveBeenCalledWith("MOL-TEST.EVENTB.users.user.>", { queue: "users" }, jasmine.any(Function));
});
});

/*it("check subscribeBalancedEvent", () => {
let subCb;
transporter.client.subscribe = jest.fn((name, opts, cb) => {
subCb = cb;
Expand Down Expand Up @@ -203,7 +264,7 @@ describe("Test NatsTransporter subscribe & publish", () => {
expect(transporter.client.unsubscribe).toHaveBeenCalledWith(125);
expect(transporter.client.flush).toHaveBeenCalledTimes(1);
});
});
});*/

it("check publish with target", () => {
transporter.serialize = jest.fn(() => Buffer.from("json data"));
Expand Down

0 comments on commit 411067c

Please sign in to comment.