Skip to content

Commit

Permalink
feat: impl subscribeTimeout (#56)
Browse files Browse the repository at this point in the history
If subscribe listener not be called before subscribeTimeout,
ApiClient will emit timeout error.
  • Loading branch information
killagu committed Oct 18, 2022
1 parent 04992f3 commit c9f389d
Show file tree
Hide file tree
Showing 17 changed files with 180 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
strategy:
fail-fast: false
matrix:
node-version: [8, 10]
node-version: [8, 10, 12, 14, 16]
os: [ubuntu-latest, windows-latest, macos-latest]

steps:
Expand Down
13 changes: 0 additions & 13 deletions .travis.yml

This file was deleted.

15 changes: 0 additions & 15 deletions appveyor.yml

This file was deleted.

2 changes: 1 addition & 1 deletion lib/api_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class APIClientBase extends Base {

close() {
if (is.function(this._client.close)) {
return this._client.close();
return this._client.close().then(() => cluster.close(this._client));
}
return cluster.close(this._client);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class Connection extends Base {
* - {Socket} socket - the socket instance
* - {Number} responseTimeout - the response timeout
* - {Transcode} transcode - serialze / deserialze methods
* @constructor
* @class
*/
constructor(options) {
super(options);
Expand Down
36 changes: 35 additions & 1 deletion lib/follower.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class Follower extends Base {
* - {Map} descriptors - interface descriptors
* - {Transcode} transcode - serialze / deserialze methods
* - {Number} responseTimeout - the timeout
* @constructor
* - {Number} subscribeTimeout - the first subscribe callback timeout
* @class
*/
constructor(options) {
// local address
Expand All @@ -27,6 +28,7 @@ class Follower extends Base {
this._subData = new Map();
this._transcode = options.transcode;
this._closeByUser = false;
this._subscribeTimeoutListeners = new Map();

this.on('request', req => this._handleRequest(req));
// avoid warning message
Expand Down Expand Up @@ -107,9 +109,41 @@ class Follower extends Base {
return '$$inner$$__' + this.options.formatKey(reg);
}

_clearSubscribeTimeout(key) {
if (!this._subscribeTimeoutListeners.has(key)) {
return;
}
const timeout = this._subscribeTimeoutListeners.get(key);
clearTimeout(timeout);
this._subscribeTimeoutListeners.delete(key);
}

_setSubscribeTimeout(key) {
this._clearSubscribeTimeout(key);
const start = Date.now();
const subscribeTimeout = this.options.subscribeTimeout;
const timeout = setTimeout(() => {
const error = new Error(`subscribe timeout for ${key}, cost: ${Date.now() - start}ms, expect is ${subscribeTimeout}ms`);
this.emit('error', error);
}, subscribeTimeout);
this._subscribeTimeoutListeners.set(key, timeout);
}

_listenSubscribeTimeout(key) {
this._setSubscribeTimeout(key);
const dataListener = () => {
this.removeListener(key, dataListener);
this._clearSubscribeTimeout(key);
};
this.on(key, dataListener);
}

subscribe(reg, listener) {
const key = this.formatKey(reg);
this.on(key, listener);
if (this.options.subscribeTimeout) {
this._listenSubscribeTimeout(key);
}

// no need duplicate subscribe
if (!this._subInfo.has(key)) {
Expand Down
4 changes: 2 additions & 2 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const defaultOptions = {
port: parseInt(process.env.NODE_CLUSTER_CLIENT_PORT) || 7777,
singleMode: process.env.NODE_CLUSTER_CLIENT_SINGLE_MODE === '1',
maxWaitTime: 30000,
connectTimeout: parseInt(process.env.NODE_CLUSTER_CLIENT_CONNECT_TIMEOUT) || 10000,
connectTimeout: parseInt(process.env.NODE_CLUSTER_CLIENT_CONNECT_TIMEOUT) || 10000,
responseTimeout: 3000,
heartbeatInterval: 20000,
autoGenerate: true,
Expand Down Expand Up @@ -43,7 +43,7 @@ class ClientGenerator {
* - {Function} decode - custom deserialize method
* - {Boolean} [isLeader|null] - specify whether current instance is leader
* - {Number} [maxWaitTime|30000] - leader startup max time (ONLY effective on isLeader is true)
* @constructor
* @class
*/
constructor(clientClass, options) {
this._clientClass = clientClass;
Expand Down
2 changes: 1 addition & 1 deletion lib/leader.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Leader extends Base {
* - {Boolean} isBroadcast - whether broadcast subscrption result to all followers or just one, default is true
* - {Number} heartbeatInterval - the heartbeat interval
* - {Function} createRealClient - to create the real client
* @constructor
* @class
*/
constructor(options) {
super(options);
Expand Down
2 changes: 1 addition & 1 deletion lib/protocol/packet.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class Packet {
* - @param {Number} timeout - The timeout
* - @param {Object} connObj - connection object
* - @param {Buffer} data - app data
* @constructor
* @class
*/
constructor(options) {
this.id = options.id;
Expand Down
2 changes: 1 addition & 1 deletion lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class ClusterServer extends Base {
/**
* Manage all TCP Connections,assign them to proper channel
*
* @constructor
* @class
* @param {Object} options
* - {net.Server} server - the server
* - {Number} port - the port
Expand Down
2 changes: 1 addition & 1 deletion lib/wrapper/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class WrapperBase extends Base {
* - {Boolean} isLeader - wehether is leader or follower
* - {Number} maxWaitTime - leader startup max time (ONLY effective on isLeader is true)
* - {Function} createRealClient - to create the real client instance
* @constructor
* @class
*/
constructor(options) {
super(options);
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"scripts": {
"autod": "autod",
"lint": "eslint . --ext .js",
"cov": "TEST_TIMEOUT=20000 egg-bin cov",
"cov": "egg-bin cov --timeout 20000",
"test": "npm run lint && npm run test-local",
"test-local": "egg-bin test",
"pkgfiles": "egg-bin pkgfiles --check",
Expand Down Expand Up @@ -67,6 +67,6 @@
"node": ">=8.0.0"
},
"ci": {
"version": "8, 10"
"version": "8, 10, 12, 14, 16"
}
}
48 changes: 48 additions & 0 deletions test/subscribe.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
'use strict';

const assert = require('assert');
const sleep = require('mz-modules/sleep');
const ApiClient = require('./supports/sub_timeout/api_client');

describe('test/subscrib.test.js', () => {
describe('timeout case', () => {
it('should timeout', async () => {
const leader = new ApiClient({
singleMode: false,
isLeader: true,
});
const follower = new ApiClient({
clusterOptions: {
subscribeTimeout: 1000,
},
singleMode: false,
isLeader: false,
});
await follower.ready();
const errors = [];
const values = [];
follower.on('error', err => {
errors.push(err);
});
follower.subscribe({
key: 'timeout:500',
}, value => {
values.push(value);
});
follower.subscribe({
key: 'timeout:1500',
}, value => {
values.push(value);
});
await sleep(2000);
assert.deepStrictEqual(values, [
'hello:500',
'hello:1500',
]);
assert(errors.length === 1);
assert(/subscribe timeout for/.test(errors[0]));
await follower.close();
await leader.close();
});
});
});
26 changes: 16 additions & 10 deletions test/supports/cluster_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function startServer(port) {
get clusterOptions() {
return {
port,
responseTimeout: 1000,
responseTimeout: 10000,
name: `cluster-server-test-${process.version}`,
};
}
Expand Down Expand Up @@ -55,7 +55,9 @@ function startServer(port) {

// Fork workers.
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
cluster.fork({
CLUSTER_PORT: port,
});
}

cluster.on('exit', (worker, code, signal) => {
Expand Down Expand Up @@ -105,11 +107,15 @@ function startServer(port) {
}

const server = net.createServer();
server.listen(0, () => {
const address = server.address();
console.log('using port =>', address.port);
server.close();
setTimeout(() => {
startServer(address.port);
}, 100);
});
if (cluster.isMaster) {
server.listen(0, () => {
const address = server.address();
console.log('using port =>', address.port);
server.close(() => {
startServer(address.port);
});
});
} else {
console.log('child process using port =>', process.env.CLUSTER_PORT);
startServer(+process.env.CLUSTER_PORT);
}
2 changes: 1 addition & 1 deletion test/supports/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class ClusterServer extends Base {
/**
* Manage all TCP Connections,assign them to proper channel
*
* @constructor
* @class
* @param {Object} options
* - {net.Server} server - the server
* - {Number} port - the port
Expand Down
40 changes: 40 additions & 0 deletions test/supports/sub_timeout/api_client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';

const DataClient = require('./data_client');
const APIClientBase = require('../../../lib/api_client');

class ApiClient extends APIClientBase {
constructor(options = {}) {
super(Object.assign({}, options, { initMethod: '_init' }));
}

get DataClient() {
return DataClient;
}

get clusterOptions() {
return {
port: parseInt(process.env.NODE_CLUSTER_CLIENT_PORT || 7777),
singleMode: false,
subscribeTimeout: 1000,
};
}

async _init() {
await this._client.ready();
}

subscribe(config, listener) {
this._client.subscribe(config, listener);
}

publish(reg) {
this._client.publish(reg);
}

close() {
return this._client.close();
}
}

module.exports = ApiClient;
29 changes: 29 additions & 0 deletions test/supports/sub_timeout/data_client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const Base = require('sdk-base');

class DataClient extends Base {
constructor(options = {}) {
super(options);
}

async _init() {
// ...
}

subscribe(reg, listener) {
const { key } = reg;
const match = key.match(/timeout:(\d+)/);
if (!match) {
throw new Error('not a timeout key');
}
setTimeout(() => {
listener('hello:' + match[1]);
}, +match[1]);
}

close() {
}
}

module.exports = DataClient;

0 comments on commit c9f389d

Please sign in to comment.