Skip to content

Commit

Permalink
✨ Add implementation for ws client
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Nov 10, 2020
1 parent fd4caf7 commit 782ebab
Showing 1 changed file with 106 additions and 8 deletions.
114 changes: 106 additions & 8 deletions elements/lisk-api-client/src/ws_client.ts
Expand Up @@ -14,25 +14,61 @@
*/

import * as WebSocket from 'isomorphic-ws';
import { EventEmitter } from 'events';

const CONNECTION_TIMEOUT = 2000;
const ACKNOWLEDGMENT_TIMEOUT = 2000;
const RESPONSE_TIMEOUT = 3000;

const timeout = async (ms: number, message?: string): Promise<void> =>
const timeout = async <T = void>(ms: number, message?: string): Promise<T> =>
new Promise((_, reject) => {
const id = setTimeout(() => {
clearTimeout(id);
reject(message ?? `Timed out in ${ms}ms.`);
}, ms);
});

type EventCallback = (...args: any[]) => void;

interface WSResponse {
id?: number | string | null;
method?: string;
params?: any;
result?: any;
error?: any;
}

interface Defer<T> {
promise: Promise<T>;
resolve: (result: T) => void;
reject: (error?: Error) => void;
}

const defer = <T>(): Defer<T> => {
let resolve!: (res: T) => void;
let reject!: (error?: Error) => void;

const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

return { promise, resolve, reject };
};

export class WSClient {
public isAlive = false;
private readonly _url: string;
private _ws?: WebSocket;
private _requestCounter = 0;
private _pendingRequests: {
[key: number]: Defer<any>;
} = {};
private readonly _emitter: EventEmitter;

public constructor(url: string) {
this._url = url;
this._emitter = new EventEmitter();
}

public async connect(): Promise<void> {
Expand All @@ -45,17 +81,24 @@ export class WSClient {
});
});

await Promise.race([
connect,
timeout(CONNECTION_TIMEOUT, `Could not connect in ${CONNECTION_TIMEOUT}ms`),
]);

this._ws.on('ping', () => {
this.isAlive = true;
});

return Promise.race([
connect,
timeout(CONNECTION_TIMEOUT, `Could not connect in ${CONNECTION_TIMEOUT}ms`),
]);
this._ws.on('message', data => {
this._handleMessage(data as string);
});
}

public async disconnect(): Promise<void> {
this._requestCounter = 0;
this._pendingRequests = {};

if (!this._ws) {
return Promise.resolve();
}
Expand All @@ -70,8 +113,63 @@ export class WSClient {
});
}

// public async invoke<T>(actionName: string, params?: Record<string, unknown>): Promise<T> {
// }
public async invoke<T>(actionName: string, params?: Record<string, unknown>): Promise<T> {
const request = {
jsonrpc: '2.9',
id: this._requestCounter,
method: actionName,
params: params ?? {},
};

// public subscribe(eventName: string, cb: EventCallback): void {}
const send = new Promise((resolve, reject) => {
this._ws?.send(JSON.stringify(request), (err): void => {
if (err) {
return reject(err);
}

return resolve();
});
});

await Promise.race([
send,
timeout(ACKNOWLEDGMENT_TIMEOUT, `Request is not acknowledged in ${ACKNOWLEDGMENT_TIMEOUT}ms`),
]);

const response = defer<T>();
this._pendingRequests[this._requestCounter] = response;
this._requestCounter += 1;

return Promise.race<T>([
response.promise,
timeout<T>(RESPONSE_TIMEOUT, `Response not received in ${RESPONSE_TIMEOUT}ms`),
]);
}

public subscribe(eventName: string, cb: EventCallback): void {
this._emitter.on(eventName, cb);
}

private _handleMessage(message: string): void {
const res = JSON.parse(message) as WSResponse;

// Its an event
if ((res.id === undefined || res.id === null) && res.method) {
this._emitter.emit(res.method, res.params);

// Its a response for a request
} else {
const id = typeof res.id === 'number' ? res.id : parseInt(res.id as string, 10);

if (this._pendingRequests[id]) {
if (res.error) {
this._pendingRequests[id].reject(res.error);
} else {
this._pendingRequests[id].resolve(res.result);
}

delete this._pendingRequests[id];
}
}
}
}

0 comments on commit 782ebab

Please sign in to comment.