Skip to content

Commit 1d39610

Browse files
committed
feat(producer): class Producer init
1 parent 6a55696 commit 1d39610

File tree

2 files changed

+204
-0
lines changed

2 files changed

+204
-0
lines changed

lib/producer.js

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,112 @@
66
* @authro Zongmin Lei <leizongmin@gmail.com>
77
*/
88

9+
const EventEmitter = require('events').EventEmitter;
10+
const leiPromise = require('lei-promise');
11+
const utils = require('./utils');
12+
const debug = utils.debug('producer');
13+
14+
class Producer {
15+
16+
/**
17+
* Constructor
18+
*
19+
* @param {Object} options
20+
* - {String} queue
21+
* - {Number} maxAge
22+
* - {Object} redis
23+
* - {String} host
24+
* - {Number} port
25+
* - {Number} db
26+
* - {String} prefix
27+
*/
28+
constructor(options) {
29+
30+
if (!options.queue) throw new Error('missing queue name');
31+
32+
this._redis = utils.createRedisClient(options.redis);
33+
this.name = utils.generateClientId('producer');
34+
this._maxAge = options.maxAge || 0;
35+
this._redisPrefix = (options.redis && options.redis.prefix) || '';
36+
this.queue = options.queue;
37+
this._queueKey = this._redisPrefix + options.queue;
38+
39+
this._msgId = 0;
40+
this._msgCallback = new EventEmitter();
41+
const msgCallback = (id, err, result) => this._msgCallback.emit(id, err, result);
42+
43+
this._redis.subscribe(`${this._redisPrefix}CB:${this.name}`, (chennel, message) => {
44+
45+
// msg: success => id,s,data error => id,e,data
46+
const info = utils.splitString(message, 3);
47+
if (info.length !== 3) return;
48+
49+
const id = info[0];
50+
const type = info[1];
51+
52+
if (type === 's') {
53+
let data;
54+
try {
55+
data = JSON.parse(info[2]);
56+
} catch (err) {
57+
return msgCallback(id, new Error(`JSON.parse(data) fail: ${err.message}`));
58+
}
59+
msgCallback(id, null, {result: data});
60+
} else if (type === 'e') {
61+
const err = info[2];
62+
msgCallback(id, new Error(err));
63+
}
64+
65+
});
66+
67+
this.push = leiPromise.promisify(this.push.bind(this));
68+
this.exit = leiPromise.promisify(this.exit.bind(this));
69+
70+
}
71+
72+
/**
73+
* push to queue
74+
*
75+
* @param {Object} info
76+
* - {Object} data
77+
* - {Number} maxAge
78+
* @param {Function}
79+
*/
80+
push(info, callback) {
81+
82+
info.maxAge = ('maxAge' in info ? info.maxAge : this._maxAge);
83+
84+
const expire = utils.secondTimestamp() + info.maxAge;
85+
let data;
86+
try {
87+
data = JSON.stringify(info.data);
88+
} catch (err) {
89+
return callback(new Error(`JSON.stringify(data) fail: ${err.message}`));
90+
}
91+
92+
const id = utils.integerToBase64(++this._msgId);
93+
// msg: producer,id,expire,data
94+
const content = `${this.name},${id},${expire},${data}`;
95+
96+
this._redis.lpush(this._queueKey, content, err => {
97+
if (err) return callback(err);
98+
99+
this._msgCallback.once(id, callback);
100+
});
101+
102+
}
103+
104+
/**
105+
* exit
106+
*
107+
* @param {Function} callback
108+
*/
109+
exit(callback) {
110+
this._redis.end();
111+
this._msgCallback = null;
112+
callback && callback(null);
113+
}
114+
115+
}
116+
117+
module.exports = Producer;

lib/utils.js

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
'use strict';
2+
3+
/**
4+
* super-queue utils
5+
*
6+
* @authro Zongmin Lei <leizongmin@gmail.com>
7+
*/
8+
9+
const createDebug = require('debug');
10+
const Redis = require('ioredis');
11+
const utils = exports = module.exports = require('lei-utils').extend();
12+
13+
// if set environment variable NODE_TEST includes "super-queue" then IS_TEST=true
14+
const IS_TEST = (String(process.env.NODE_TEST).indexOf('super-queue') !== -1);
15+
16+
17+
/**
18+
* Create Debug Function
19+
*
20+
* @param {String} name
21+
* @return {Function}
22+
*/
23+
utils.debug = function (name) {
24+
return 'super-queue:' + name;
25+
};
26+
27+
/**
28+
* Generate Redis Client
29+
*
30+
* @param {Object} options
31+
* @return {Object}
32+
*/
33+
utils.createRedisClient = function (options) {
34+
if (IS_TEST) {
35+
return require('fakeredis').createClient();
36+
} else {
37+
return new Redis(options);
38+
}
39+
};
40+
41+
/**
42+
* Generate Client ID
43+
*
44+
* @param {String} type should be one of "producer", "consumer", "monitor"
45+
* @return {String}
46+
*/
47+
utils.generateClientId = function (type) {
48+
return type.slice(0, 1).toUpperCase() + (utils.md5(Date.now() + '' + Math.random()).slice(0, 9));
49+
};
50+
51+
/**
52+
* Get Second Timestamp
53+
*
54+
* @return {Number}
55+
*/
56+
utils.secondTimestamp = function () {
57+
return (Date.now() / 1000) << 0;
58+
};
59+
60+
/**
61+
* Integer Value to Base64 String
62+
*
63+
* @param {Number} value
64+
* @param {Number} byteLength
65+
* @return {String}
66+
*/
67+
utils.integerToBase64 = function (value, byteLength) {
68+
const buf = new Buffer(byteLength);
69+
buf.writeIntLE(value, 0, byteLength);
70+
return buf.toString('base64');
71+
};
72+
73+
/**
74+
* Split String by Separator
75+
*
76+
* @param {String} text
77+
* @param {String} separator
78+
* @param {Number} length
79+
* @return {Array}
80+
*/
81+
utils.splitString = function (text, separator, length) {
82+
const list = [];
83+
let lastIndex = 0;
84+
for (let i = 0; i < length - 1; i++) {
85+
let j = text.indexOf(separator, lastIndex);
86+
if (j === -1) {
87+
break;
88+
} else {
89+
list.push(text.slice(lastIndex, j));
90+
lastIndex = j + 1;
91+
}
92+
}
93+
list.push(text.slice(lastIndex, text.length));
94+
return list;
95+
};

0 commit comments

Comments
 (0)