Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
*.log/
*es6/
*node_modules/
dist/
dist/*
*/dist/*
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@ Reliable Websocket Client Library

This library provides a reliable websocket connection. It sends heartbeats (pings), reconnects if connection is broken and also keeps a buffer for messages that were send while the connection is down and those messages are later sent when connection is up.

Buffer for message can be a ring buffer (older messages are dropped) or a Queue (newer messages are dropeed)

# Use
Use either @bhoos/websocket-node or @bhoos/websocket-browser

```ts
import {ReliableWS} from '@bhoos/websocket'
import {ReliableWS, BufferType} from '@bhoos/websocket-node'

const config = {
PING_INTERVAL: 1000, // ms
RECONNECT_INTERVAL: 5000, // ms
MSG_BUFFER_SIZE: 20,
PING_MESSAGE : 'ping'
PING_MESSAGE : 'ping',
BUFFER_TYPE: BufferType.RingBuffer
}

const agent = new ReliableWS('ws://localhost:3030/subscribe/', config);
Expand Down
26 changes: 26 additions & 0 deletions base/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"name": "@bhoos/websocket-base",
"version": "0.0.2",
"description": "reliable websocket client",
"repository": "https://github.com/Bhoos/websocket",
"author": "Bibek Panthi <bpanthi977@gmail.com>",
"license": "MIT",
"main": "es6/index.js",
"type": "module",
"files": [
"es6",
"dist"
],
"scripts": {
"build:cjs": "tsc",
"build:es6": "mkdir -p es6 && echo '{\"type\":\"module\"}' > es6/package.json && tsc --module es6 --outDir es6",
"build": "yarn build:cjs && yarn build:es6"
},
"publishConfig": {
"registry": "https://npm.pkg.github.com/"
},
"dependencies": {},
"devDependencies": {
"typescript": "^4.1.3"
}
}
65 changes: 65 additions & 0 deletions base/src/buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
export enum BufferType {
RingBuffer = 1,
FixedSizeQueue = 2
}

export class Buffer<T> {
size : number;
type : BufferType;
private count : number = 0 ;
private end : number = 0;
private buffer : T[];
constructor(size: number, type : BufferType) {
this.size = size;
this.type = type;
this.buffer = Array(size);
}


add(...items : T[]) : void{
if (this.type === BufferType.FixedSizeQueue){
const len = Math.min(items.length, this.size - this.count);
for (var i = 0; i < len; i++) {
this.buffer[this.end] = items[i];
this.end = this.end + 1;
}
this.count += len;
} else {
for (var i = 0; i < items.length; i++) {
this.buffer[this.end] = items[i];
this.end = (this.end+1) % this.size;
}
this.count = Math.min(this.count+items.length, this.size);
}
}

clear(): void {
this.count =0;
this.end = 0;
}

toArray() {
let pos = this.end - this.count;
if (pos < 0) {
pos = this.size + pos;
}

let res : T[] = [];
for (let i=0; i< this.count; i++) {
res.push(this.buffer[pos]);
pos = (pos + 1) % this.size;
}
return res;
}

forEach(fn : (el : T) => void) {
let pos = this.end - this.count;
if (pos < 0) {
pos = this.size - pos;
}
for (let i=0; i< this.count; i++) {
fn(this.buffer[pos]);
pos = (pos + 1) % this.size;
}
}
}
70 changes: 43 additions & 27 deletions src/index.ts → base/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import RingBuffer from 'ring-buffer-ts';
import {Buffer, BufferType} from './buffer.js';

export { BufferType }

export type Config = {
// all time intervals are in miliseconds
PING_INTERVAL: number; // Agents sends ping to management server every `PING_INTERVAL`
PING_MESSAGE: string;
RECONNECT_INTERVAL: number; // If in any case the connection is broken then a reconnect attempt is made every `RECONNECT_INTERVAL`
MSG_BUFFER_SIZE: number; // number of messages to remember and send later (if the connection doesnot exist, or is broken)
BUFFER_TYPE: BufferType;
};

interface Event {
type: string;
}

interface ErrorEvent extends Event {}
interface ErrorEvent extends Event {
}

interface CloseEvent extends Event {
wasClean: boolean;
Expand All @@ -33,34 +37,43 @@ export class ReliableWS<
ErrorEv extends ErrorEvent,
CloseEv extends CloseEvent,
MessageEv extends MessageEvent,
WSArgs
> {
private ws?: WebSocket;
private address: string;
private config: Config;

private wsOpen: boolean = false;
private onceOpened: boolean = false;
private msgBuffer: RingBuffer.RingBuffer<any>;
private msgBuffer: Buffer<any>;
private pingTimer?: ReturnType<typeof setInterval>;
private reconnectTimeout?: ReturnType<typeof setTimeout>;
private shuttingDown: boolean = false;
private wsargs? : WSArgs;
private tries: number = 0;

onopen: ((event: Ev) => void) | null = null;
onerror: ((event: ErrorEv) => void) | null = null;
onclose: ((event: CloseEv) => void) | null = null;
onmessage: ((event: MessageEv) => void) | null = null;
// this event is issued when the connection is disconnected
// reliable websocket will nonethless be trying connection attempts.
// tries: number of times connection has been tried to establish since start or after the last successfull connection
ondisconnect: ((event : CloseEv, tries : number) => void) | null = null;

constructor(address: string, options: Config) {
constructor(address: string, options: Config, wsargs?: WSArgs ) {
this.address = address;
this.config = options;
this.msgBuffer = new RingBuffer.RingBuffer(this.config.MSG_BUFFER_SIZE);
this.wsargs = wsargs;
this.msgBuffer = new Buffer(this.config.MSG_BUFFER_SIZE, this.config.BUFFER_TYPE);
this.setupConnection();
}

private setupConnection() {
if (this.shuttingDown) return;
this.changeConfig(this.config);
this.ws = new WebSocket(this.address);
//@ts-ignore
this.ws = new WebSocket(this.address, this.wsargs);

this.ws.onerror = _event => {
const event = _event as unknown as ErrorEv;
Expand All @@ -70,37 +83,40 @@ export class ReliableWS<
this.ws.onopen = _event => {
const event = _event as unknown as Ev;
this.wsOpen = true;
this.tries = 0;
if (this.onopen && !this.onceOpened) {
this.onceOpened = true;
this.onopen(event); // this is triggerred on the first time only
this.onceOpened = true;
this.onopen(event); // this is triggerred on the first time only
}

if (this.ws)
this.ws.onmessage = _event => {
const event = _event as unknown as MessageEv;
if (this.onmessage) this.onmessage(event);
};

const queued: string[] = this.msgBuffer.toArray();
this.ws.onmessage = _event => {
const event = _event as unknown as MessageEv;
if (this.onmessage) this.onmessage(event);
};

this.msgBuffer.forEach((msg) => {
this.ws?.send(msg);
})
this.msgBuffer.clear();
for (const msg of queued) {
this.ws?.send(msg);
}
if (this.shuttingDown) this.ws?.close();
};

this.ws.onclose = _event => {
const event = _event as unknown as CloseEv;
this.tries++;
if (this.shuttingDown) {
if (this.onclose) this.onclose(event);
return;
if (this.onclose) this.onclose(event);
return;
} else {
if (this.ondisconnect) this.ondisconnect(event, this.tries);
}
// this means connection was closed unexpetedly
this.wsOpen = false;
if (this.pingTimer) clearInterval(this.pingTimer);
this.reconnectTimeout = setTimeout(
this.setupConnection.bind(this),
this.config.RECONNECT_INTERVAL,
this.setupConnection.bind(this),
this.config.RECONNECT_INTERVAL,
);
};
}
Expand All @@ -112,30 +128,30 @@ export class ReliableWS<
}
if (this.config.PING_INTERVAL != 0)
this.pingTimer = setInterval(this.pingIfUp.bind(this), this.config.PING_INTERVAL);
if (this.msgBuffer.getSize() != this.config.MSG_BUFFER_SIZE) {
const newBuffer = new RingBuffer.RingBuffer<string>(this.config.MSG_BUFFER_SIZE);
if (this.msgBuffer.size != this.config.MSG_BUFFER_SIZE) {
const newBuffer = new Buffer<string>(this.config.MSG_BUFFER_SIZE, this.config.BUFFER_TYPE);
newBuffer.add(...this.msgBuffer.toArray());
this.msgBuffer = newBuffer;
}
}

private pingIfUp() {
this.ws?.send(this.config.PING_MESSAGE);
if (this.wsOpen)
this.ws?.send(this.config.PING_MESSAGE);
}

send(msg: any) {
if (this.wsOpen && this.ws) {
this.ws.send(msg);
} else if (this.msgBuffer.getSize() != 0) {
this.msgBuffer.add(msg);
}
this.msgBuffer.add(msg);
}

close() {
this.shuttingDown = true;
if (!this.onceOpened && this.config.RECONNECT_INTERVAL != 0)
console.debug(
'.close() called on Reliable Websocket, before any connection was successful. Be careful',
'.close() called on Reliable Websocket, before any connection was successful. Be careful',
);
if (this.pingTimer) clearInterval(this.pingTimer);
if (this.reconnectTimeout) clearTimeout(this.reconnectTimeout);
Expand Down
File renamed without changes.
19 changes: 19 additions & 0 deletions base/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"compileOnSave": true,
"compilerOptions": {
"moduleResolution": "node",
"strict": true,
"target": "es2017",
"lib": ["es2017", "DOM"],
"module": "commonjs",
"esModuleInterop": true,
"noImplicitAny": true,
"removeComments": true,
"preserveConstEnums": true,
"declaration": true,
"outDir": "dist",
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "src/node"]
}
23 changes: 23 additions & 0 deletions browser/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "@bhoos/websocket",
"version": "0.0.2",
"main": "index.js",
"license": "MIT",
"dependencies": {
"@bhoos/websocket-base": "^0.0.2"
},
"main": "es6/index.js",
"type": "module",
"files": [
"es6",
"dist"
],
"scripts": {
"build:cjs": "tsc",
"build:es6": "mkdir -p es6 && echo '{\"type\":\"module\"}' > es6/package.json && tsc --module es6 --outDir es6",
"build": "yarn build:cjs && yarn build:es6"
},
"publishConfig": {
"registry": "https://npm.pkg.github.com/"
}
}
4 changes: 4 additions & 0 deletions browser/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import { ReliableWS as WS } from '@bhoos/websocket-base'
export { Config, BufferType } from '@bhoos/websocket-base'
export class ReliableWS extends WS<Event, Event, CloseEvent, MessageEvent, string | string[]> {
}
19 changes: 19 additions & 0 deletions browser/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"compileOnSave": true,
"compilerOptions": {
"moduleResolution": "node",
"strict": true,
"target": "es2017",
"lib": ["es2017", "dom"],
"module": "commonjs",
"esModuleInterop": true,
"noImplicitAny": true,
"removeComments": true,
"preserveConstEnums": true,
"declaration": true,
"outDir": "dist",
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "src/node"]
}
2 changes: 1 addition & 1 deletion examples/browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
"vite": "^3.0.7"
},
"dependencies": {
"@bhoos/websocket": "^0.0.0"
"@bhoos/websocket": "^0.0.1"
}
}
17 changes: 11 additions & 6 deletions examples/browser/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
import {ReliableWS} from '@bhoos/websocket';
import {ReliableWS, BufferType} from '@bhoos/websocket';
const config = {
PING_INTERVAL: 1000, // ms
RECONNECT_INTERVAL: 5000, // ms
MSG_BUFFER_SIZE: 20,
PING_MESSAGE : 'ping'
MSG_BUFFER_SIZE: 2,
PING_MESSAGE : '{"type" : "ping"}',
BUFFER_TYPE: BufferType.FixedSizeQueue,
}

const agent = new ReliableWS('ws://localhost:3030/subscribe/', config);

agent.send(JSON.stringify({type: 'ping'}));
agent.send(JSON.stringify({type: 'msg1'}));
agent.send(JSON.stringify({type: 'msg2'}));
agent.send(JSON.stringify({type: 'msg3'}));
agent.send(JSON.stringify({type: 'msg4'}));
agent.send(JSON.stringify({type: 'msg5'}));
agent.onerror = (event) => {
console.log("Error: ", event.message);
console.log("Error: ", event);
}

agent.onopen = (event : Event) => {
console.log("Opened!!",event);
console.log("Opened!!", event);
}

agent.onmessage = (event) => {
Expand Down
Loading