Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement WSClient connection for APIClient - Closes #5928 #5964

Merged
merged 10 commits into from Nov 12, 2020
4 changes: 3 additions & 1 deletion elements/lisk-api-client/package.json
Expand Up @@ -38,9 +38,11 @@
"dependencies": {
"@liskhq/lisk-codec": "^0.1.0-alpha.0",
"@liskhq/lisk-cryptography": "^3.0.0-alpha.0",
"isomorphic-ws": "4.0.1",
"@liskhq/lisk-transactions": "^5.0.0-alpha.0",
"pm2-axon": "4.0.0",
"pm2-axon-rpc": "0.6.0"
"pm2-axon-rpc": "0.6.0",
"ws": "7.4.0"
shuse2 marked this conversation as resolved.
Show resolved Hide resolved
},
"devDependencies": {
"@liskhq/lisk-chain": "^0.2.0-alpha.2",
Expand Down
24 changes: 0 additions & 24 deletions elements/lisk-api-client/src/create_api_client.ts

This file was deleted.

Expand Up @@ -12,8 +12,10 @@
* Removal or modification of this copyright notice is prohibited.
*
*/

import { APIClient } from './api_client';
import { IPCChannel } from './ipc_channel';
import { WSChannel } from './ws_channel';

import { Channel } from './types';

export const createClient = async (channel: Channel): Promise<APIClient> => {
Expand All @@ -22,3 +24,17 @@ export const createClient = async (channel: Channel): Promise<APIClient> => {

return client;
};

export const createIPCClient = async (dataPath: string): Promise<APIClient> => {
const ipcChannel = new IPCChannel(dataPath);
await ipcChannel.connect();

return createClient(ipcChannel);
};

export const createWSClient = async (url: string): Promise<APIClient> => {
const wsChannel = new WSChannel(url);
await wsChannel.connect();

return createClient(wsChannel);
};
5 changes: 2 additions & 3 deletions elements/lisk-api-client/src/index.ts
Expand Up @@ -12,6 +12,5 @@
* Removal or modification of this copyright notice is prohibited.
*
*/
export { createWSClient } from './ws_client';
export { createClient } from './client';
export { createAPIClient } from './create_api_client';

export { createIPCClient, createWSClient, createClient } from './create_clients';
195 changes: 195 additions & 0 deletions elements/lisk-api-client/src/ws_channel.ts
@@ -0,0 +1,195 @@
/*
* Copyright © 2020 Lisk Foundation
*
* See the LICENSE file at the top-level directory of this distribution
* for licensing information.
*
* Unless otherwise agreed in a custom licensing agreement with the Lisk Foundation,
* no part of this software, including this file, may be copied, modified,
* propagated, or distributed except according to the terms contained in the
* LICENSE file.
*
* Removal or modification of this copyright notice is prohibited.
*
*/

import * as WebSocket from 'isomorphic-ws';
import { EventEmitter } from 'events';

const CONNECTION_TIMEOUT = 2000;
const ACKNOWLEDGMENT_TIMEOUT = 2000;
const RESPONSE_TIMEOUT = 3000;

const timeout = async <T = void>(ms: number, message?: string): Promise<T> =>
new Promise((_, reject) => {
const id = setTimeout(() => {
clearTimeout(id);
reject(new Error(message ?? `Timed out in ${ms}ms.`));
}, ms);
});

type EventCallback = (...args: any[]) => void;

interface WSResponse {
id?: number | string | null;
method: string;
params?: object;
result?: any;
error?: any;
}

interface Defer<T> {
promise: Promise<T>;
resolve: (result: T) => void;
reject: (error?: Error) => void;
}

const defer = <T>(): Defer<T> => {
let resolve!: (res: T) => void;
let reject!: (error?: Error) => void;

const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

return { promise, resolve, reject };
};

export class WSChannel {
public isAlive = false;
private readonly _url: string;
private _ws?: WebSocket;
private _requestCounter = 0;
private _pendingRequests: {
[key: number]: Defer<any>;
} = {};
private readonly _emitter: EventEmitter;

public constructor(url: string) {
this._url = url;
this._emitter = new EventEmitter();
}

public async connect(): Promise<void> {
this._ws = new WebSocket(this._url);

const connect = new Promise<void>(resolve => {
this._ws?.on('open', () => {
this.isAlive = true;
resolve();
});
});

const error = new Promise<void>((_, reject) => {
this._ws?.on('error', err => {
this.isAlive = false;
reject(err);
});
});

await Promise.race([
connect,
error,
timeout(CONNECTION_TIMEOUT, `Could not connect in ${CONNECTION_TIMEOUT}ms`),
]);

this._ws.on('ping', () => {
this.isAlive = true;
});

this._ws.on('message', data => {
this._handleMessage(data as string);
});
}

public async disconnect(): Promise<void> {
this._requestCounter = 0;
this._pendingRequests = {};

if (!this._ws) {
return Promise.resolve();
}

return new Promise<void>(resolve => {
this._ws?.on('close', () => {
this._ws?.terminate();
this.isAlive = false;
this._ws = undefined;
resolve();
});
this._ws?.close();
});
}

public async invoke<T>(actionName: string, params?: Record<string, unknown>): Promise<T> {
const request = {
jsonrpc: '2.0',
id: this._requestCounter,
method: actionName,
params: params ?? {},
};

const send = new Promise((resolve, reject) => {
this._ws?.send(JSON.stringify(request), (err): void => {
if (err) {
return reject(err);
}

return resolve();
});
});

await Promise.race([
send,
timeout(ACKNOWLEDGMENT_TIMEOUT, `Request is not acknowledged in ${ACKNOWLEDGMENT_TIMEOUT}ms`),
]);

const response = defer<T>();
this._pendingRequests[this._requestCounter] = response;
this._requestCounter += 1;

return Promise.race<T>([
response.promise,
timeout<T>(RESPONSE_TIMEOUT, `Response not received in ${RESPONSE_TIMEOUT}ms`),
]);
}

public subscribe(eventName: string, cb: EventCallback): void {
this._emitter.on(eventName, cb);
}

private _handleMessage(message: string): void {
const res = JSON.parse(message) as WSResponse;

// Its an event
if ((res.id === undefined || res.id === null) && res.method) {
this._emitter.emit(res.method, this._prepareEventInfo(res));

// Its a response for a request
} else {
const id = typeof res.id === 'number' ? res.id : parseInt(res.id as string, 10);

if (this._pendingRequests[id]) {
if (res.error) {
this._pendingRequests[id].reject(res.error);
} else {
this._pendingRequests[id].resolve(res.result);
}

delete this._pendingRequests[id];
}
}
}

// eslint-disable-next-line class-methods-use-this
private _prepareEventInfo(res: WSResponse): { module: string; name: string; data: object } {
const { method } = res;
const [moduleName, ...eventName] = method.split(':');
const module = moduleName;
const name = eventName.join(':');
const data = res.params ?? {};

return { module, name, data };
}
}
82 changes: 82 additions & 0 deletions elements/lisk-api-client/test/integration/ws_channel.spec.ts
@@ -0,0 +1,82 @@
/*
* Copyright © 2020 Lisk Foundation
*
* See the LICENSE file at the top-level directory of this distribution
* for licensing information.
*
* Unless otherwise agreed in a custom licensing agreement with the Lisk Foundation,
* no part of this software, including this file, may be copied, modified,
* propagated, or distributed except according to the terms contained in the
* LICENSE file.
*
* Removal or modification of this copyright notice is prohibited.
*/

import * as WebSocket from 'isomorphic-ws';
import { WSChannel } from '../../src/ws_channel';

jest.unmock('isomorphic-ws');

describe('WSChannel', () => {
describe('connect', () => {
it('should be connect to ws server', async () => {
const server = new WebSocket.Server({ path: '/my-path', port: 65535 });
const channel = new WSChannel('ws://localhost:65535/my-path');

try {
await expect(channel.connect()).resolves.toBeUndefined();
expect(server.clients.size).toEqual(1);
expect([...server.clients][0].readyState).toEqual(WebSocket.OPEN);
} finally {
server.close();
}
expect.assertions(3);
});

it('should timeout if ws server not responding', async () => {
const verifyClient = (_: any, done: (result: boolean) => void) => {
// Take more time to accept connection
setTimeout(() => {
done(true);
}, 3000);
};
const server = new WebSocket.Server({ path: '/my-path', port: 65535, verifyClient });
const channel = new WSChannel('ws://localhost:65535/my-path');

try {
await expect(channel.connect()).rejects.toThrow('Could not connect in 2000ms');
expect(server.clients.size).toEqual(0);
} finally {
// TODO: Found that unless we disconnect channel, sever.close keep open handles.
await channel.disconnect();
server.close();
}
expect.assertions(2);
}, 5000);

it('should throw error if server is not running', async () => {
const channel = new WSChannel('ws://localhost:65535/my-path');

await expect(channel.connect()).rejects.toThrow('connect ECONNREFUSED 127.0.0.1:65535');
});
});

describe('disconnect', () => {
it('should close ws connection', async () => {
const server = new WebSocket.Server({ path: '/my-path', port: 65535 });
const channel = new WSChannel('ws://localhost:65535/my-path');

await channel.connect();

try {
await expect(channel.disconnect()).resolves.toBeUndefined();
// WebSocket.Server.channels are not cleaned immediately
expect(server.clients.size).toEqual(1);
expect([...server.clients][0].readyState).toEqual(WebSocket.CLOSING);
} finally {
server.close();
}
expect.assertions(3);
});
});
});
Expand Up @@ -10,18 +10,20 @@
* LICENSE file.
*
* Removal or modification of this copyright notice is prohibited.
*
*/

import { APIClient } from './api_client';
import { Channel } from './types';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { EventEmitter } = require('events');

export const createWSClient = async (url: string): Promise<APIClient> => {
// FIXME: requires real implementation
const channel = ({ url } as unknown) as Channel;
await channel.connect();
const client = new APIClient(channel);
await client.init();
class WebSocket extends EventEmitter {
// eslint-disable-next-line @typescript-eslint/explicit-member-accessibility
send(message, cb) {
const data = JSON.parse(message);
cb();
setTimeout(() => {
this.emit('message', JSON.stringify({ ...data, result: message }));
}, 100);
}
}

return client;
};
module.exports = WebSocket;