Skip to content

Commit

Permalink
feat: support cpu single mode (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer committed Feb 26, 2019
1 parent d853c68 commit 7f3765d
Show file tree
Hide file tree
Showing 18 changed files with 802 additions and 376 deletions.
4 changes: 2 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
'use strict';

const assert = require('assert');
const is = require('is-type-of');
const cluster = require('./lib');
const symbols = require('./lib/symbol');
const ClusterClient = require('./lib/client');
const APIClientBase = require('./lib/api_client');

/**
Expand All @@ -22,7 +22,7 @@ module.exports = cluster;
* @return {Promise} returns a promise which will be resolved after fully closed
*/
module.exports.close = client => {
assert(client instanceof ClusterClient, '[cluster#close] client should be instanceof ClusterClient');
assert(is.function(client[symbols.close]), '[cluster#close] client should be instanceof ClusterClient');
return client[symbols.close]();
};

Expand Down
10 changes: 7 additions & 3 deletions lib/follower.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ class Follower extends Base {
return super.send(...args);
}

formatKey(reg) {
return '$$inner$$__' + this.options.formatKey(reg);
}

subscribe(reg, listener) {
const key = this.options.formatKey(reg);
const key = this.formatKey(reg);
this.on(key, listener);

// no need duplicate subscribe
Expand Down Expand Up @@ -132,7 +136,7 @@ class Follower extends Base {
}

unSubscribe(reg, listener) {
const key = this.options.formatKey(reg);
const key = this.formatKey(reg);
if (listener) {
this.removeListener(key, listener);
} else {
Expand Down Expand Up @@ -174,7 +178,7 @@ class Follower extends Base {
for (const arg of args) {
const argBuf = this._transcode.encode(arg);
const len = argBuf.length;
const buf = new Buffer(4 + len);
const buf = Buffer.alloc(4 + len);
buf.writeInt32BE(len, 0);
argBuf.copy(buf, 4, 0, len);
arr.push(buf);
Expand Down
17 changes: 10 additions & 7 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ const is = require('is-type-of');
const symbols = require('./symbol');
const logger = require('./default_logger');
const transcode = require('./default_transcode');
const ClusterClient = require('./client');
const SingleClient = require('./wrapper/single');
const ClusterClient = require('./wrapper/cluster');
const { formatKey } = require('./utils');

const defaultOptions = {
port: parseInt(process.env.NODE_CLUSTER_CLIENT_PORT || 7777),
singleMode: process.env.NODE_CLUSTER_CLIENT_SINGLE_MODE === '1',
maxWaitTime: 30000,
responseTimeout: 3000,
heartbeatInterval: 20000,
Expand All @@ -25,9 +27,9 @@ const autoGenerateMethods = [
'close',
];

class ClientWrapper {
class ClientGenerator {
/**
* Cluster Client Wrapper
* Cluster Client Generator
*
* @param {Function} clientClass - the client class
* @param {Object} options
Expand Down Expand Up @@ -57,7 +59,7 @@ class ClientWrapper {
*
* @param {String} name - property name
* @param {Object} value - property value
* @return {ClientWrapper} self
* @return {ClientGenerator} self
*/
override(name, value) {
this._descriptors.set(name, {
Expand All @@ -72,7 +74,7 @@ class ClientWrapper {
*
* @param {String} from - method name
* @param {String} to - delegate to subscribe|publish|invoke
* @return {ClientWrapper} self
* @return {ClientGenerator} self
*/
delegate(from, to) {
to = to || 'invoke';
Expand Down Expand Up @@ -102,7 +104,8 @@ class ClientWrapper {
return Reflect.construct(clientClass, args);
}

const client = new ClusterClient(Object.assign({
const ClientWrapper = this._options.singleMode ? SingleClient : ClusterClient;
const client = new ClientWrapper(Object.assign({
createRealClient,
descriptors: this._descriptors,
}, this._options));
Expand Down Expand Up @@ -203,5 +206,5 @@ class ClientWrapper {
}

module.exports = function(clientClass, options) {
return new ClientWrapper(clientClass, options);
return new ClientGenerator(clientClass, options);
};
21 changes: 11 additions & 10 deletions lib/leader.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class Leader extends Base {
this.ready(true);
}
});
} else {
this._isReady = true;
this.ready(true);
}

this._handleConnection = this._handleConnection.bind(this);
Expand Down Expand Up @@ -87,6 +90,10 @@ class Leader extends Base {
return this.options.logger;
}

formatKey(reg) {
return '$$inner$$__' + this.options.formatKey(reg);
}

subscribe(reg, listener) {
const transcode = this._transcode;
const conn = Object.create(Base.prototype, {
Expand Down Expand Up @@ -124,7 +131,7 @@ class Leader extends Base {
}

unSubscribe(reg, listener) {
const key = this.options.formatKey(reg);
const key = this.formatKey(reg);
const connKeySet = this._subConnMap.get(key) || new Set();
const newConnKeySet = new Set();
for (const connKey of connKeySet.values()) {
Expand Down Expand Up @@ -173,7 +180,7 @@ class Leader extends Base {
}

_doSubscribe(reg, conn) {
const key = this.options.formatKey(reg);
const key = this.formatKey(reg);
const callback = err => {
if (err) {
this._errorHandler(err);
Expand Down Expand Up @@ -232,13 +239,7 @@ class Leader extends Base {
}

_findMethodName(type) {
for (const method of this.options.descriptors.keys()) {
const descriptor = this.options.descriptors.get(method);
if (descriptor.type === 'delegate' && descriptor.to === type) {
return method;
}
}
return null;
return utils.findMethodName(this.options.descriptors, type);
}

// handle new socket connect
Expand Down Expand Up @@ -277,7 +278,7 @@ class Leader extends Base {

_handleUnSubscribe(req, conn) {
const connObj = req.connObj || {};
const key = this.options.formatKey(connObj.reg);
const key = this.formatKey(connObj.reg);
const connKeySet = this._subConnMap.get(key) || new Set();
connKeySet.delete(conn.key);
this._subConnMap.set(key, connKeySet);
Expand Down
10 changes: 5 additions & 5 deletions lib/protocol/packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ class Packet {
this.type = options.type;
this.timeout = options.timeout;
this.connObj = options.connObj;
this.data = typeof options.data === 'string' ? new Buffer(options.data) : options.data;
this.data = typeof options.data === 'string' ? Buffer.from(options.data) : options.data;
}

get isResponse() {
return this.type === Constant.RESPONSE;
}

encode() {
const header = new Buffer([ Constant.VERSION, this.type, 0, 0 ]);
const connBuf = new Buffer(JSON.stringify(this.connObj));
const header = Buffer.from([ Constant.VERSION, this.type, 0, 0 ]);
const connBuf = Buffer.from(JSON.stringify(this.connObj));
const appLen = this.data ? this.data.length : 0;

byteBuffer.reset();
Expand All @@ -81,13 +81,13 @@ class Packet {
const connLength = buf.readInt32BE(16);
const appLength = buf.readInt32BE(20);

const connBuf = new Buffer(connLength);
const connBuf = Buffer.alloc(connLength);
buf.copy(connBuf, 0, 24, 24 + connLength);
const connObj = JSON.parse(connBuf);

let data;
if (appLength) {
data = new Buffer(appLength);
data = Buffer.alloc(appLength);
buf.copy(data, 0, 24 + connLength, 24 + connLength + appLength);
}
return {
Expand Down
5 changes: 5 additions & 0 deletions lib/symbol.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ exports.subInfo = Symbol.for('ClusterClient#subInfo');
exports.pubInfo = Symbol.for('ClusterClient#pubInfo');
exports.closeHandler = Symbol.for('ClusterClient#closeHandler');
exports.close = Symbol.for('ClusterClient#close');
exports.subscribeMethodName = Symbol.for('ClusterClient#subscribeMethodName');
exports.unSubscribeMethodName = Symbol.for('ClusterClient#unSubscribeMethodName');
exports.publishMethodName = Symbol.for('ClusterClient#publishMethodName');
exports.closeByUser = Symbol.for('ClusterClient#closeByUser');
exports.singleMode = Symbol.for('ClusterClient#singleMode');
10 changes: 10 additions & 0 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,13 @@ exports.callFn = async function(fn, args) {
}
return r;
};

exports.findMethodName = (descriptors, type) => {
for (const method of descriptors.keys()) {
const descriptor = descriptors.get(method);
if (descriptor.type === 'delegate' && descriptor.to === type) {
return method;
}
}
return null;
};
79 changes: 52 additions & 27 deletions lib/client.js → lib/wrapper/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ const debug = require('debug')('cluster-client');
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
const utils = require('./utils');
const Leader = require('./leader');
const Follower = require('./follower');
const ClusterServer = require('./server');
const utils = require('../utils');
const Leader = require('../leader');
const Follower = require('../follower');
const ClusterServer = require('../server');
const {
init,
logger,
Expand All @@ -21,8 +21,8 @@ const {
pubInfo,
closeHandler,
close,
} = require('./symbol');

singleMode,
} = require('../symbol');

class ClusterClient extends Base {
/**
Expand All @@ -40,6 +40,7 @@ class ClusterClient extends Base {
super(options);
this[subInfo] = new Map();
this[pubInfo] = new Map();
this[singleMode] = false;

this[closeHandler] = () => {
this[logger].warn('[ClusterClient:%s] %s closed, and try to init it again', this.options.name, this[innerClient].isLeader ? 'leader' : 'follower');
Expand All @@ -48,6 +49,9 @@ class ClusterClient extends Base {
this[init]().catch(err => { this.ready(err); });
};
this[init]().catch(err => { this.ready(err); });

// avoid warning message
this.setMaxListeners(100);
}

get isClusterClientLeader() {
Expand Down Expand Up @@ -102,10 +106,17 @@ class ClusterClient extends Base {
await this[innerClient].ready();

// subscribe all
for (const registrations of this[subInfo].values()) {
for (const args of registrations) {
this[innerClient].subscribe(args[0], args[1]);
}
for (const key of this[subInfo].keys()) {
const info = this[subInfo].get(key);
const reg = info.reg;
this[innerClient].subscribe(reg, data => {
this[subInfo].set(key, {
reg,
inited: true,
data,
});
this.emit(key, data);
});
}
// publish all
for (const reg of this[pubInfo].values()) {
Expand All @@ -130,12 +141,29 @@ class ClusterClient extends Base {

debug('[ClusterClient:%s] subscribe %j', this.options.name, reg);
const key = this.options.formatKey(reg);
const registrations = this[subInfo].get(key) || [];
registrations.push([ reg, listener ]);
this[subInfo].set(key, registrations);

if (this[isReady]) {
this[innerClient].subscribe(reg, listener);
this.on(key, listener);

const info = this[subInfo].get(key);
if (!info) {
this[subInfo].set(key, {
reg,
inited: false,
data: null,
});
if (this[isReady]) {
this[innerClient].subscribe(reg, data => {
this[subInfo].set(key, {
reg,
inited: true,
data,
});
this.emit(key, data);
});
}
} else if (info.inited) {
process.nextTick(() => {
listener(info.data);
});
}
}

Expand All @@ -149,19 +177,16 @@ class ClusterClient extends Base {
[unSubscribe](reg, listener) {
debug('[ClusterClient:%s] unSubscribe %j', this.options.name, reg);
const key = this.options.formatKey(reg);
const registrations = this[subInfo].get(key) || [];
const newRegistrations = [];
if (listener) {
for (const arr of registrations) {
if (arr[1] !== listener) {
newRegistrations.push(arr);
}
}
this.removeListener(key, listener);
} else {
this.removeAllListeners(key);
}
this[subInfo].set(key, newRegistrations);

if (this[isReady]) {
this[innerClient].unSubscribe(reg, listener);
if (this.listenerCount(key) === 0) {
this[subInfo].delete(key);
if (this[isReady]) {
this[innerClient].unSubscribe(reg);
}
}
}

Expand Down

0 comments on commit 7f3765d

Please sign in to comment.