Skip to content

Commit

Permalink
fix: single mode bugs (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
gxcsoccer committed Mar 1, 2019
1 parent b03e4c8 commit 6f9bf8e
Show file tree
Hide file tree
Showing 11 changed files with 554 additions and 377 deletions.
12 changes: 0 additions & 12 deletions lib/leader.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,6 @@ class Leader extends Base {
}

invoke(methodName, args, callback) {
if (!this._isReady) {
this.ready(err => {
if (err) {
if (callback) {
callback(err);
}
return;
}
this.invoke(methodName, args, callback);
});
return;
}
let method = this._realClient[methodName];
// compatible with generatorFunction
if (is.generatorFunction(method)) {
Expand Down
1 change: 1 addition & 0 deletions lib/symbol.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ exports.unSubscribeMethodName = Symbol.for('ClusterClient#unSubscribeMethodName'
exports.publishMethodName = Symbol.for('ClusterClient#publishMethodName');
exports.closeByUser = Symbol.for('ClusterClient#closeByUser');
exports.singleMode = Symbol.for('ClusterClient#singleMode');
exports.createClient = Symbol.for('ClusterClient#createClient');
209 changes: 209 additions & 0 deletions lib/wrapper/base.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
'use strict';

const debug = require('debug')('cluster-client');
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
const utils = require('../utils');
// Symbols
const {
init,
logger,
isReady,
innerClient,
subscribe,
unSubscribe,
publish,
invoke,
subInfo,
pubInfo,
closeHandler,
close,
singleMode,
createClient,
} = require('../symbol');

class WrapperBase extends Base {
/**
* Share Connection among Multi-Process Mode
*
* @param {Object} options
* - {Number} port - the port
* - {Transcode} transcode - serialze / deseriaze methods
* - {Boolean} isLeader - wehether is leader or follower
* - {Number} maxWaitTime - leader startup max time (ONLY effective on isLeader is true)
* - {Function} createRealClient - to create the real client instance
* @constructor
*/
constructor(options) {
super(options);
this[subInfo] = new Map();
this[pubInfo] = new Map();
this[init]().catch(err => { this.ready(err); });
}

get isClusterClientLeader() {
return this[innerClient] && this[innerClient].isLeader;
}

get [singleMode]() {
return false;
}

/**
* log instance
* @property {Logger} ClusterClient#[logger]
*/
get [logger]() {
return this.options.logger;
}

async [createClient]() {
throw new Error('not implement');
}

/**
* initialize, to leader or follower
*
* @return {void}
*/
async [init]() {
this[innerClient] = await this[createClient]();

// events delegate
utils.delegateEvents(this[innerClient], this);

// re init when connection is close
if (this[closeHandler]) {
this[innerClient].on('close', this[closeHandler]);
}

// wait leader/follower ready
await this[innerClient].ready();

// subscribe all
for (const registrations of this[subInfo].values()) {
for (const args of registrations) {
this[innerClient].subscribe(args[0], args[1]);
}
}
// publish all
for (const reg of this[pubInfo].values()) {
this[innerClient].publish(reg);
}

if (!this[isReady]) {
this[isReady] = true;
this.ready(true);
}
}

/**
* do subscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[subscribe](reg, listener) {
assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`);

debug('[ClusterClient:%s] subscribe %j', this.options.name, reg);
const key = this.options.formatKey(reg);
const registrations = this[subInfo].get(key) || [];
registrations.push([ reg, listener ]);
this[subInfo].set(key, registrations);

if (this[isReady]) {
this[innerClient].subscribe(reg, listener);
}
}

/**
* do unSubscribe
*
* @param {Object} reg - subscription info
* @param {Function} listener - callback function
* @return {void}
*/
[unSubscribe](reg, listener) {
debug('[ClusterClient:%s] unSubscribe %j', this.options.name, reg);
const key = this.options.formatKey(reg);
const registrations = this[subInfo].get(key) || [];
const newRegistrations = [];
if (listener) {
for (const arr of registrations) {
if (arr[1] !== listener) {
newRegistrations.push(arr);
}
}
}
this[subInfo].set(key, newRegistrations);

if (this[isReady]) {
this[innerClient].unSubscribe(reg, listener);
}
}

/**
* do publish
*
* @param {Object} reg - publish info
* @return {void}
*/
[publish](reg) {
debug('[ClusterClient:%s] publish %j', this.options.name, reg);
const key = this.options.formatKey(reg);
this[pubInfo].set(key, reg);

if (this[isReady]) {
this[innerClient].publish(reg);
}
}

/**
* invoke a method asynchronously
*
* @param {String} method - the method name
* @param {Array} args - the arguments list
* @param {Function} callback - callback function
* @return {void}
*/
[invoke](method, args, callback) {
if (!this[isReady]) {
this.ready(err => {
if (err) {
callback && callback(err);
return;
}
this[innerClient].invoke(method, args, callback);
});
return;
}

debug('[ClusterClient:%s] invoke method: %s, args: %j', this.options.name, method, args);
this[innerClient].invoke(method, args, callback);
}

async [close]() {
try {
// close after ready, in case of innerClient is initializing
await this.ready();
} catch (err) {
// ignore
}

const client = this[innerClient];
if (client) {
// prevent re-initializing
if (this[closeHandler]) {
client.removeListener('close', this[closeHandler]);
}
if (client.close) {
await utils.callFn(client.close.bind(client));
}
}
}
}

module.exports = WrapperBase;

0 comments on commit 6f9bf8e

Please sign in to comment.