Skip to content

Commit

Permalink
feat: [BREAKING_CHANGE] refactor Messenger (#141)
Browse files Browse the repository at this point in the history
- broadcast is changed, it will send to all process
- send is changed, it won't emit message

Closes #28
  • Loading branch information
popomore authored and dead-horse committed Nov 3, 2016
1 parent 5900c1b commit a65f472
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 82 deletions.
87 changes: 44 additions & 43 deletions lib/core/messenger.js
Expand Up @@ -6,10 +6,13 @@ const sendmessage = require('sendmessage');
const EventEmitter = require('events');

class Messenger extends EventEmitter {

constructor() {
super();
this.pid = String(process.pid);
// app/agent 对方的进程,app.messenger.opids 就是 agent 的 pid
// pids of agent or app maneged by master
// - retrieve app worker pids when it's an agent worker
// - retrieve agent worker pids when it's an app worker
this.opids = [];
this.on('egg-pids', pids => {
this.opids = pids;
Expand All @@ -19,30 +22,23 @@ class Messenger extends EventEmitter {
}

/**
* 发送 message,广播
* @param {String} action 消息动作唯一标识
* @param {Object} data 广播的数据。
* Send message to all agent and app
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
broadcast(action, data) {
debug('[%s] broadcast %s with %j', this.pid, action, data);
sendmessage(process, {
action,
data,
});
this.emit(action, data);
this.sendToApp(action, data);
this.sendToAgent(action, data);
return this;
}

send(action, data) {
return this.broadcast(action, data);
}

/**
* 发送给指定的进程
* @param {String} pid 接收者进程 id
* @param {String} action 消息动作唯一标识
* @param {data} data 发送的消息数据。
* send message to the specified process
* @param {String} pid - the process id of the receiver
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendTo(pid, action, data) {
Expand All @@ -57,11 +53,11 @@ class Messenger extends EventEmitter {
}

/**
* 随机找一个进程发送
* - 如果在 app,直接发给 agent
* - 如果在 agent,会随机挑选一个 app 进程发送
* @param {String} action 消息动作唯一标识
* @param {data} data 发送的消息数据。
* send message to one app worker by random
* - if it's running in agent, it will send to one of app workers
* - if it's running in app, it will send to agent
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendRandom(action, data) {
Expand All @@ -72,46 +68,44 @@ class Messenger extends EventEmitter {
}

/**
* 发送消息给所有的 app 进程(agent 和 app 上都可以调用)
* @param {String} action 消息动作唯一标识
* @param {data} data 发送的消息数据。
* send message to app
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendToApp(action, data) {
debug('[%s] send %s with %j to all app', this.pid, action, data);
sendmessage(process, {
action,
data,
to: 'app',
});
this.send(action, data, 'app');
return this;
}

/**
* 发送消息给所有的 agent 进程(agent 和 app 上都可以调用)
* 在 worker 上调用时和 `send` 方法不同的是,不会同时在调用方触发该事件
* @param {String} action 消息动作唯一标识
* @param {data} data 发送的消息数据。
* send message to agent
* @param {String} action - message key
* @param {Object} data - message value
* @return {Messenger} this
*/
sendToAgent(action, data) {
debug('[%s] send %s with %j to all agent', this.pid, action, data);
this.send(action, data, 'agent');
return this;
}

/**
* @param {String} action - message key
* @param {Object} data - message value
* @param {String} to - let master know how to send message
* @return {Messenger} this
*/
send(action, data, to) {
sendmessage(process, {
action,
data,
to: 'agent',
to,
});
return this;
}

/**
* 处理 message 事件
* @method Messenger#on
* @param {Object} message 只处理符合格式的 message
* - {String} action
* - {Object} data
* @return {void}
*/
_onMessage(message) {
if (message && is.string(message.action)) {
debug('[%s] got message %s with %j, receiverPid: %s',
Expand All @@ -120,10 +114,17 @@ class Messenger extends EventEmitter {
}
}


close() {
process.removeListener('message', this._onMessage);
this.removeAllListeners();
}

/**
* @method Messenger#on
* @param {String} action - message key
* @param {Object} data - message value
*/
}

module.exports = Messenger;
Expand Down
14 changes: 14 additions & 0 deletions test/fixtures/apps/messenger-broadcast/agent.js
@@ -0,0 +1,14 @@
'use strict';

module.exports = function(agent) {
agent.messenger.on('egg-ready', () => {
agent.messenger.broadcast('broadcast', {
from: 'agent',
pid: process.pid,
});
});

agent.messenger.on('broadcast', info => {
console.log('agent %s receive message from %s pid %s', process.pid, info.from, info.pid);
});
};
14 changes: 14 additions & 0 deletions test/fixtures/apps/messenger-broadcast/app.js
@@ -0,0 +1,14 @@
'use strict';

module.exports = function(app) {
app.messenger.on('egg-ready', () => {
app.messenger.broadcast('broadcast', {
from: 'app',
pid: process.pid,
});
});

app.messenger.on('broadcast', info => {
console.log('app %s receive message from %s pid %s', process.pid, info.from, info.pid);
});
};
3 changes: 3 additions & 0 deletions test/fixtures/apps/messenger-broadcast/package.json
@@ -0,0 +1,3 @@
{
"name": "messenger-broadcast"
}
2 changes: 1 addition & 1 deletion test/lib/agent.test.js
Expand Up @@ -64,8 +64,8 @@ describe('test/lib/agent.test.js', () => {
const baseDir = utils.getFilepath('apps/agent-throw');
let app;
before(() => {
mm(process.env, 'EGG_LOG', 'none');
app = utils.cluster('apps/agent-throw');
app.debug();
return app.ready();
});
after(() => app.close());
Expand Down
2 changes: 1 addition & 1 deletion test/lib/core/agent_worker_client.test.js
Expand Up @@ -255,7 +255,7 @@ describe('test/lib/core/agent_worker_client.test.js', () => {
describe('agent sync callback', () => {
let app;
before(() => {
app = utils.app('apps/agent-app-sync');
app = utils.cluster('apps/agent-app-sync');
return app.ready();
});
after(() => app.close());
Expand Down
68 changes: 31 additions & 37 deletions test/lib/core/messenger.test.js
@@ -1,6 +1,7 @@
'use strict';

const mm = require('egg-mock');
const sleep = require('ko-sleep');
const utils = require('../../utils');
const Messenger = require('../../../lib/core/messenger');

Expand All @@ -13,35 +14,6 @@ describe('test/lib/core/messenger.test.js', () => {

afterEach(mm.restore);

describe('send()', () => {
it('should send demo message', done => {
// mock not childprocess
mm(process, 'send', null);
messenger.send('messenger-test-demo', { foo: 'haha' });
messenger.once('messenger-test-demo', data => {
data.should.eql({ foo: 'haha' });
done();
});
});

it('should mock process.send exists', done => {
mm(process, 'connected', true);
mm(process, 'send', msg => {
msg.should.eql({
action: 'message-test-send-demo',
data: {
foo: 'ok',
},
});
done();
});

messenger.send('message-test-send-demo', {
foo: 'ok',
});
});
});

describe('on(action, data)', () => {
it('should listen an action event', done => {
messenger.on('messenger-test-on-event', data => {
Expand Down Expand Up @@ -84,12 +56,12 @@ describe('test/lib/core/messenger.test.js', () => {

describe('cluster messenger', () => {
let app;
before(done => {
before(() => {
app = utils.cluster('apps/messenger');
app.coverage(false);
// 等 agent 接受消息
app.ready(() => setTimeout(done, 1000));
return app.ready();
});
before(() => sleep(1000));
after(() => app.close());

it('app should accept agent message', () => {
Expand All @@ -98,9 +70,6 @@ describe('test/lib/core/messenger.test.js', () => {
it('app should accept agent assgin pid message', () => {
app.expect('stdout', /\[app] agent-to-app agent msg \d+/);
});
it('app should accept itself message', () => {
app.expect('stdout', /\[app] app-to-agent app msg/);
});
it('agent should accept app message', () => {
app.expect('stdout', /\[agent] app-to-agent app msg/);
});
Expand All @@ -112,11 +81,37 @@ describe('test/lib/core/messenger.test.js', () => {
});
});

describe('broadcast()', () => {

let app;
before(() => {
mm.env('default');
app = utils.cluster('apps/messenger-broadcast', { workers: 2 });
app.coverage(false);
return app.ready();
});
before(() => sleep(1000));
after(() => app.close());

it('should broadcast each other', () => {
// app 26496 receive message from app pid 26495
// app 26496 receive message from app pid 26496
// app 26495 receive message from app pid 26495
// app 26495 receive message from app pid 26496
// app 26495 receive message from agent pid 26494
// app 26496 receive message from agent pid 26494
// agent 26494 receive message from app pid 26495
// agent 26494 receive message from app pid 26496
// agent 26494 receive message from agent pid 26494
const m = app.stdout.match(/(app|agent) \d+ receive message from (app|agent) pid \d+/g);
m.length.should.eql(9);
});
});

describe('sendRandom', () => {
let app;
before(() => {
mm.env('default');
mm.consoleLevel('NONE');
app = utils.cluster('apps/messenger-random', { workers: 4 });
app.coverage(false);
return app.ready();
Expand All @@ -141,7 +136,6 @@ describe('test/lib/core/messenger.test.js', () => {
let app;
before(() => {
mm.env('default');
mm.consoleLevel('NONE');
app = utils.cluster('apps/messenger-app-agent', { workers: 2 });
app.coverage(false);
return app.ready();
Expand Down

0 comments on commit a65f472

Please sign in to comment.