Skip to content

Commit

Permalink
[BREAKING] feat: migrating from generators to async/await (#36)
Browse files Browse the repository at this point in the history
feat: migrating from generators to async/await
  • Loading branch information
gxcsoccer committed Mar 6, 2018
1 parent e32f0ef commit 2574eae
Show file tree
Hide file tree
Showing 21 changed files with 432 additions and 284 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
sudo: false
language: node_js
node_js:
- '6'
- '8'
- '9'
install:
- npm i npminstall && npminstall
script:
Expand Down
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
environment:
matrix:
- nodejs_version: '6'
- nodejs_version: '8'
- nodejs_version: '9'

install:
- ps: Install-Product node $env:nodejs_version
Expand Down
121 changes: 57 additions & 64 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

const debug = require('debug')('cluster-client');
const co = require('co');
const is = require('is-type-of');
const Base = require('sdk-base');
const assert = require('assert');
Expand Down Expand Up @@ -46,9 +45,9 @@ class ClusterClient extends Base {
this[logger].warn('[ClusterClient:%s] %s closed, and try to init it again', this.options.name, this[innerClient].isLeader ? 'leader' : 'follower');
this[isReady] = false;
this.ready(false);
this[init]();
this[init]().catch(err => { this.ready(err); });
};
this[init]();
this[init]().catch(err => { this.ready(err); });
}

get isClusterClientLeader() {
Expand All @@ -68,59 +67,55 @@ class ClusterClient extends Base {
*
* @return {void}
*/
[init]() {
co(function* () {
const name = this.options.name;
const port = this.options.port;
let server;
if (this.options.isLeader === true) {
server = yield ClusterServer.create(name, port);
if (!server) {
throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
}
} else if (this.options.isLeader === false) {
// wait for leader active
yield ClusterServer.waitFor(port, this.options.maxWaitTime);
} else {
debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
server = yield ClusterServer.create(name, port);
async [init]() {
const name = this.options.name;
const port = this.options.port;
let server;
if (this.options.isLeader === true) {
server = await ClusterServer.create(name, port);
if (!server) {
throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
}
} else if (this.options.isLeader === false) {
// wait for leader active
await ClusterServer.waitFor(port, this.options.maxWaitTime);
} else {
debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
server = await ClusterServer.create(name, port);
}

if (server) {
this[innerClient] = new Leader(Object.assign({ server }, this.options));
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
} else {
this[innerClient] = new Follower(this.options);
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
}
if (server) {
this[innerClient] = new Leader(Object.assign({ server }, this.options));
debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
} else {
this[innerClient] = new Follower(this.options);
debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
}

// events delegate
utils.delegateEvents(this[innerClient], this);
// events delegate
utils.delegateEvents(this[innerClient], this);

// re init when connection is close
this[innerClient].on('close', this[closeHandler]);
// re init when connection is close
this[innerClient].on('close', this[closeHandler]);

// wait leader/follower ready
yield this[innerClient].ready();
// wait leader/follower ready
await this[innerClient].ready();

// subscribe all
for (const registrations of this[subInfo].values()) {
for (const args of registrations) {
this[innerClient].subscribe(args[0], args[1]);
}
}
// publish all
for (const reg of this[pubInfo].values()) {
this[innerClient].publish(reg);
// subscribe all
for (const registrations of this[subInfo].values()) {
for (const args of registrations) {
this[innerClient].subscribe(args[0], args[1]);
}
}
// publish all
for (const reg of this[pubInfo].values()) {
this[innerClient].publish(reg);
}

if (!this[isReady]) {
this[isReady] = true;
this.ready(true);
}
}.bind(this)).catch(err => {
this.ready(err);
});
if (!this[isReady]) {
this[isReady] = true;
this.ready(true);
}
}

/**
Expand Down Expand Up @@ -210,24 +205,22 @@ class ClusterClient extends Base {
this[innerClient].invoke(method, args, callback);
}

[close]() {
return co(function* () {
try {
// close after ready, in case of innerClient is initializing
yield this.ready();
} catch (err) {
// ignore
}
async [close]() {
try {
// close after ready, in case of innerClient is initializing
await this.ready();
} catch (err) {
// ignore
}

const client = this[innerClient];
if (client) {
// prevent re-initializing
client.removeListener('close', this[closeHandler]);
if (client.close) {
yield utils.callFn(client.close.bind(client));
}
const client = this[innerClient];
if (client) {
// prevent re-initializing
client.removeListener('close', this[closeHandler]);
if (client.close) {
await utils.callFn(client.close.bind(client));
}
}.bind(this));
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ class ClientWrapper {

for (const key of keys) {
const descriptor = Reflect.getOwnPropertyDescriptor(proto, key);
if (descriptor.value && is.generatorFunction(descriptor.value)) {
if (descriptor.value &&
(is.generatorFunction(descriptor.value) || is.asyncFunction(descriptor.value))) {
this.delegate(key);
}
}
Expand Down
45 changes: 22 additions & 23 deletions lib/leader.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ class Leader extends Base {
});
}

this._closeHandler = this._handleClose.bind(this);
this._handleConnection = this._handleConnection.bind(this);

// subscribe its own channel
this._server.on(`${this.options.name}_connection`, this._handleConnection);
this._server.once('close', () => { this.emit('server_closed'); });
this.on('server_closed', this._closeHandler);
this.on('server_closed', () => {
this._handleClose().catch(err => { this.emit('error', err); });
});

// maxIdleTime is 3 times of heartbeatInterval
const heartbeatInterval = this.options.heartbeatInterval;
Expand Down Expand Up @@ -247,7 +248,7 @@ class Leader extends Base {
const conn = new Connection({
socket,
name: this.options.name,
logger: this.options.logger,
logger: this.logger,
transcode: this.options.transcode,
requestTimeout: this.options.requestTimeout,
});
Expand Down Expand Up @@ -393,43 +394,41 @@ class Leader extends Base {
});
}

* _handleClose() {
async _handleClose() {
debug('[Leader:%s] leader server is closed', this.options.name);
// close the real client
if (this._realClient) {
const originClose = this._findMethodName('close');
if (originClose) {
// support common function, generatorFunction, and function returning a promise
yield utils.callFn(this._realClient[originClose].bind(this._realClient));
await utils.callFn(this._realClient[originClose].bind(this._realClient));
}
}
clearInterval(this._heartbeatTimer);
this._heartbeatTimer = null;
this.emit('close');
}

close() {
async close() {
this._closeByUser = true;
return co(function* () {
debug('[Leader:%s] try to close leader', this.options.name);
// 1. stop listening to server channel
this._server.removeListener(`${this.options.name}_connection`, this._handleConnection);

// 2. close all mock connections
for (const conn of this._connections.values()) {
if (conn.isMock) {
conn.emit('close');
}
debug('[Leader:%s] try to close leader', this.options.name);
// 1. stop listening to server channel
this._server.removeListener(`${this.options.name}_connection`, this._handleConnection);

// 2. close all mock connections
for (const conn of this._connections.values()) {
if (conn.isMock) {
conn.emit('close');
}
}

// 3. close server
// CANNOT close server directly by server.close(), other cluster clients may be using it
this.removeAllListeners('server_closed');
yield ClusterServer.close(this.options.name, this._server);
// 3. close server
// CANNOT close server directly by server.close(), other cluster clients may be using it
this.removeAllListeners('server_closed');
await ClusterServer.close(this.options.name, this._server);

// 5. close real client
yield this._handleClose();
}.bind(this));
// 5. close real client
await this._handleClose();
}
}

Expand Down
33 changes: 16 additions & 17 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const debug = require('debug')('cluster-client:server');
const net = require('net');
const Base = require('sdk-base');
const sleep = require('mz-modules/sleep');
const Packet = require('./protocol/packet');

// share memory in current process
Expand All @@ -19,10 +20,8 @@ if (global.typeSet) {
global.typeSet = typeSet = new Set();
}

const sleep = timeout => cb => setTimeout(cb, timeout);

function claimServer(port) {
return cb => {
return new Promise((resolve, reject) => {
const server = net.createServer();
server.listen({
port,
Expand All @@ -33,26 +32,26 @@ function claimServer(port) {

function onError(err) {
debug('listen %s error: %s', port, err);
cb(err);
reject(err);
}

server.on('error', onError);
server.on('listening', () => {
server.removeListener('error', onError);
debug('listen %s success', port);
cb(null, server);
resolve(server);
});
};
});
}

function tryToConnect(port) {
return cb => {
return new Promise(resolve => {
const socket = net.connect(port, '127.0.0.1');
debug('try to connecting %s', port);
let success = false;
socket.on('connect', () => {
success = true;
cb(null, true);
resolve(true);
// disconnect
socket.end();
debug('test connected %s success, end now', port);
Expand All @@ -61,9 +60,9 @@ function tryToConnect(port) {
debug('test connect %s error: %s, success: %s', port, err, success);
// if success before, ignore it
if (success) return;
cb(null, false);
resolve(false);
});
};
});
}

class ClusterServer extends Base {
Expand Down Expand Up @@ -169,7 +168,7 @@ class ClusterServer extends Base {
* @param {Number} port - the port
* @return {ClusterServer} server
*/
static* create(name, port) {
static async create(name, port) {
const key = `${name}@${port}`;
let instance = serverMap.get(port);
if (instance && !instance.isClosed) {
Expand All @@ -181,7 +180,7 @@ class ClusterServer extends Base {
}
// compete for the local port, if got => leader, otherwise follower
try {
const server = yield claimServer(port);
const server = await claimServer(port);
instance = new ClusterServer({ server, port });
typeSet.add(key);
serverMap.set(port, instance);
Expand All @@ -200,7 +199,7 @@ class ClusterServer extends Base {
}
}

static* close(name, server) {
static async close(name, server) {
const port = server._port;

// remove from typeSet, so other client can occupy
Expand All @@ -217,7 +216,7 @@ class ClusterServer extends Base {
// close server if no one is listening on this port any more
if (!listening) {
const server = serverMap.get(port);
if (server) yield server.close();
if (server) await server.close();
}
}

Expand All @@ -228,18 +227,18 @@ class ClusterServer extends Base {
* @param {Number} timeout - the max wait time
* @return {void}
*/
static* waitFor(port, timeout) {
static async waitFor(port, timeout) {
const start = Date.now();
let connect = false;
while (!connect) {
connect = yield tryToConnect(port);
connect = await tryToConnect(port);

// if timeout, throw error
if (Date.now() - start > timeout) {
throw new Error(`[ClusterClient] leader does not be active in ${timeout}ms on port:${port}`);
}
if (!connect) {
yield sleep(3000);
await sleep(3000);
}
}
}
Expand Down
Loading

0 comments on commit 2574eae

Please sign in to comment.