Skip to content

Commit

Permalink
feat: support dynamic create comsumer and producer
Browse files Browse the repository at this point in the history
  • Loading branch information
dead-horse committed May 3, 2018
1 parent 06de2d6 commit 097d576
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 116 deletions.
29 changes: 29 additions & 0 deletions README.md
Expand Up @@ -123,6 +123,35 @@ const msg = new Message('TP_NAME', // topic
const sendResult = yield ctx.ons.send(msg);
```

## Dynamic Create Consumer and Producer

You can dynamic create comsumer or producer by your self.

```js
// app.js

module.exports = app => {
app.beforeStart(function* () {
// dynamic create a consumer, and subscribe topics use the subscriber in the second argument
yield app.createConsumer({
consumerGroup: 'consumerGroup',
topics: [
'TEST_TOPIC',
],
}, {
TEST_TOPIC: require('./ops/TEST_TOPIC'), // the subscriber for TEST_TOPIC
});
// dynamic create a producer
yield app.createProducer({
producerGroup: config.producerGroup,
topics: [
'TEST_TOPIC',
],
});
});
};
```

## Questions & Suggestions

Please open an issue [here](https://github.com/eggjs/egg/issues).
Expand Down
101 changes: 4 additions & 97 deletions app.js
@@ -1,103 +1,10 @@
'use strict';

const fs = require('fs');
const path = require('path');
const assert = require('assert');
const Message = require('ali-ons').Message;
const Consumer = require('ali-ons').Consumer;
const Producer = require('ali-ons').Producer;
const ONS = require('./lib/ons');

module.exports = app => {
const logger = app.getLogger('onsLogger');
const { sub, pub } = app.config.ons;
const defaultOptions = app.config.ons.default;
const httpclient = app.httpclient;
const consumerMap = new Map();
const producerMap = new Map();
const topic2Producer = new Map();
let appReady = false;

app.ready(() => {
appReady = true;
app.ons = new ONS(app);
app.beforeStart(function* () {
yield app.ons.init();
});

function errorHandler(err) {
// 应用启动前避免错误输出到标准输出
if (appReady) {
logger.error(err);
} else {
logger.warn(err);
}
}

for (const options of sub) {
const consumer = new Consumer(Object.assign({
httpclient,
logger,
}, defaultOptions, options));
consumer.on('error', errorHandler);
const key = `${consumer.consumerGroup}-${consumer.clientId}`;
assert(!consumerMap.has(key), `[egg-ons] duplicate consumer, consumerGroup=${consumer.consumerGroup}, clientId=${consumer.clientId}`);
consumerMap.set(key, consumer);

app.beforeStart(function* () {
yield consumer.ready();
logger.info('[egg-ons] consumer: %s is ready, messageModel: %s', consumer.consumerGroup, consumer.messageModel);
});
app.beforeClose(function* () {
yield consumer.close();
logger.info('[egg-ons] consumer: %s is closed, messageModel: %s', consumer.consumerGroup, consumer.messageModel);
});

const topics = options.topics || [];
for (const topic of topics) {
const filepath = path.join(app.config.baseDir, 'app/ons', topic + '.js');
if (!fs.existsSync(filepath)) {
app.coreLogger.warn('[egg-ons] CANNOT find the subscription logic in file:`%s` for topic=%s', filepath, topic);
continue;
}
const Subscriber = require(filepath);
consumer.subscribe(topic, Subscriber.subExpression || '*', function* (msg) {
const ctx = app.createAnonymousContext();
const subscriber = new Subscriber(ctx);
yield subscriber.subscribe(msg);
});
}
}

for (const options of pub) {
const producer = new Producer(Object.assign({
httpclient: app.httpclient,
logger,
}, defaultOptions, options));
producer.on('error', errorHandler);
assert(!producerMap.has(producer.producerGroup), `[egg-ons] duplicate producer, producerGroup=${producer.producerGroup}`);
producerMap.set(producer.producerGroup, producer);

const topics = options.topics || [];
for (const topic of topics) {
topic2Producer.set(topic, producer);
}

app.beforeStart(function* () {
yield producer.ready();
logger.info('[egg-ons] producer: %s is ready', producer.producerGroup);
});
app.beforeClose(function* () {
yield producer.close();
logger.info('[egg-ons] producer: %s is closed', producer.producerGroup);
});
}

app.ons = {
consumerMap,
producerMap,
Message,
* send(msg) {
assert(msg && msg.topic, '[egg-ons] send(msg) msg.topic is required');
const producer = topic2Producer.get(msg.topic);
assert(producer, `[egg-ons] CANNOT find producer for topic=${msg.topic}`);
return yield producer.send(msg);
},
};
};
113 changes: 113 additions & 0 deletions lib/ons.js
@@ -0,0 +1,113 @@
'use strict';

const Message = require('ali-ons').Message;
const Consumer = require('ali-ons').Consumer;
const Producer = require('ali-ons').Producer;
const path = require('path');
const assert = require('assert');

module.exports = class ONS {
constructor(app) {
this.app = app;
this.logger = app.getLogger('onsLogger');
this.config = app.config.ons;
app.ready(() => {
this.appReady = true;
});

this.consumerMap = new Map();
this.producerMap = new Map();
this.topic2Producer = new Map();
this.Message = Message;
}

* init() {
const { app } = this;
const { sub, pub } = this.config;
const directory = path.join(app.config.baseDir, 'app/ons');
app.loader.loadToApp(directory, 'ONSSubscribers', {
caseStyle(filepath) {
return filepath.substring(0, filepath.lastIndexOf('.')).split('/');
},
});
for (const options of sub) {
yield this.createConsumer(options, app.ONSSubscribers);
}

for (const options of pub) {
yield this.createProducer(options);
}
}

_errorHandler(err) {
// avoid output error message into stderr before app get ready
this.appReady ? this.logger.error(err) : this.logger.warn(err);
}

* createConsumer(options, Subscribers) {
const { app, consumerMap, logger, config } = this;
const consumer = new Consumer(Object.assign({
httpclient: app.httpclient,
logger,
}, config.default, options));
consumer.on('error', err => this._errorHandler(err));
const key = `${consumer.consumerGroup}-${consumer.clientId}`;
assert(!consumerMap.has(key), `[egg-ons] duplicate consumer, consumerGroup=${consumer.consumerGroup}, clientId=${consumer.clientId}`);
consumerMap.set(key, consumer);

yield consumer.ready();
logger.info('[egg-ons] consumer: %s is ready, messageModel: %s', consumer.consumerGroup, consumer.messageModel);

app.beforeClose(function* () {
yield consumer.close();
logger.info('[egg-ons] consumer: %s is closed, messageModel: %s', consumer.consumerGroup, consumer.messageModel);
});

const topics = options.topics || [];
for (const topic of topics) {
const Subscriber = Subscribers[topic];
if (!Subscriber) {
app.coreLogger.warn('[egg-ons] CANNOT find the subscription logic for topic=%s', topic);
continue;
}

consumer.subscribe(topic, Subscriber.subExpression || '*', function* (msg) {
const ctx = app.createAnonymousContext();
const subscriber = new Subscriber(ctx);
yield subscriber.subscribe(msg);
});
}
}

* createProducer(options) {
const { app, producerMap, logger, config, topic2Producer } = this;
const producer = new Producer(Object.assign({
httpclient: app.httpclient,
logger,
}, config.default, options));
producer.on('error', err => this._errorHandler(err));
assert(!producerMap.has(producer.producerGroup), `[egg-ons] duplicate producer, producerGroup=${producer.producerGroup}`);
producerMap.set(producer.producerGroup, producer);

const topics = options.topics || [];
for (const topic of topics) {
topic2Producer.set(topic, producer);
}

yield producer.ready();
logger.info('[egg-ons] producer: %s is ready', producer.producerGroup);

app.beforeClose(function* () {
yield producer.close();
logger.info('[egg-ons] producer: %s is closed', producer.producerGroup);
});
}

* send(msg) {
assert(msg && msg.topic, '[egg-ons] send(msg) msg.topic is required');
const producer = this.topic2Producer.get(msg.topic);
assert(producer, `[egg-ons] CANNOT find producer for topic=${msg.topic}`);
return yield producer.send(msg);
}
};

3 changes: 2 additions & 1 deletion package.json
Expand Up @@ -37,8 +37,9 @@
},
"files": [
"app",
"config",
"app.js",
"config"
"lib"
],
"repository": {
"type": "git",
Expand Down
11 changes: 11 additions & 0 deletions test/fixtures/apps/ons-test-dynamic/app.js
@@ -0,0 +1,11 @@
'use strict';

module.exports = app => {
app.onsMsgs = new Map();
app.beforeStart(function* () {
yield app.ons.createConsumer(app.config.ons.dynamicSub, {
TEST_TOPIC: require('./app/ons/TEST_TOPIC'),
});
yield app.ons.createProducer(app.config.ons.dynamicPub);
});
};
14 changes: 14 additions & 0 deletions test/fixtures/apps/ons-test-dynamic/app/ons/TEST_TOPIC.js
@@ -0,0 +1,14 @@
'use strict';

class TestSubscriber {
constructor(ctx) {
this.ctx = ctx;
this.app = ctx.app;
}

* subscribe(msg) {
this.app.onsMsgs.set(msg.msgId, msg);
}
}

module.exports = TestSubscriber;
25 changes: 25 additions & 0 deletions test/fixtures/apps/ons-test-dynamic/app/router.js
@@ -0,0 +1,25 @@
'use strict';

const assert = require('assert');
const sleep = require('mz-modules/sleep');

module.exports = app => {
app.get('/', function* () {
this.body = 'hi, ' + app.plugins.ons.name;
});

app.get('/sendMessage', function* () {
const Message = this.ons.Message;
const msg = new Message('TEST_TOPIC', // topic
'TagA', // tag
'Hello ONS !!!' // body
);
const sendResult = yield this.ons.send(msg);
assert.equal(sendResult.sendStatus, 'SEND_OK');

yield sleep(1000);

assert(app.onsMsgs.has(sendResult.msgId));
this.body = 'ok';
});
};
25 changes: 25 additions & 0 deletions test/fixtures/apps/ons-test-dynamic/config/config.unittest.js
@@ -0,0 +1,25 @@
'use strict';

const config = require('../../../../config');

exports.keys = '123456';

exports.ons = {
default: {
accessKey: config.accessKey,
secretKey: config.secretKey,
onsAddr: config.onsAddr,
},
dynamicSub: {
consumerGroup: config.consumerGroup,
topics: [
'TEST_TOPIC',
],
},
dynamicPub: {
producerGroup: config.producerGroup,
topics: [
'TEST_TOPIC',
],
},
};
4 changes: 4 additions & 0 deletions test/fixtures/apps/ons-test-dynamic/package.json
@@ -0,0 +1,4 @@
{
"name": "ons-test-dynamic",
"version": "0.0.1"
}

0 comments on commit 097d576

Please sign in to comment.