diff --git a/.gitignore b/.gitignore index 9384e9e..f911958 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ *.log/ *es6/ *node_modules/ +dist/ +dist/* +*/dist/* diff --git a/README.md b/README.md index f05752e..52a6889 100644 --- a/README.md +++ b/README.md @@ -3,15 +3,20 @@ Reliable Websocket Client Library This library provides a reliable websocket connection. It sends heartbeats (pings), reconnects if connection is broken and also keeps a buffer for messages that were send while the connection is down and those messages are later sent when connection is up. +Buffer for message can be a ring buffer (older messages are dropped) or a Queue (newer messages are dropeed) + # Use +Use either @bhoos/websocket-node or @bhoos/websocket-browser + ```ts -import {ReliableWS} from '@bhoos/websocket' +import {ReliableWS, BufferType} from '@bhoos/websocket-node' const config = { PING_INTERVAL: 1000, // ms RECONNECT_INTERVAL: 5000, // ms MSG_BUFFER_SIZE: 20, - PING_MESSAGE : 'ping' + PING_MESSAGE : 'ping', + BUFFER_TYPE: BufferType.RingBuffer } const agent = new ReliableWS('ws://localhost:3030/subscribe/', config); diff --git a/base/package.json b/base/package.json new file mode 100644 index 0000000..d529b83 --- /dev/null +++ b/base/package.json @@ -0,0 +1,26 @@ +{ + "name": "@bhoos/websocket-base", + "version": "0.0.2", + "description": "reliable websocket client", + "repository": "https://github.com/Bhoos/websocket", + "author": "Bibek Panthi ", + "license": "MIT", + "main": "es6/index.js", + "type": "module", + "files": [ + "es6", + "dist" + ], + "scripts": { + "build:cjs": "tsc", + "build:es6": "mkdir -p es6 && echo '{\"type\":\"module\"}' > es6/package.json && tsc --module es6 --outDir es6", + "build": "yarn build:cjs && yarn build:es6" + }, + "publishConfig": { + "registry": "https://npm.pkg.github.com/" + }, + "dependencies": {}, + "devDependencies": { + "typescript": "^4.1.3" + } +} diff --git a/base/src/buffer.ts b/base/src/buffer.ts new file mode 100644 index 0000000..f840cb9 --- /dev/null +++ b/base/src/buffer.ts @@ -0,0 +1,65 @@ +export enum BufferType { + RingBuffer = 1, + FixedSizeQueue = 2 +} + +export class Buffer { + size : number; + type : BufferType; + private count : number = 0 ; + private end : number = 0; + private buffer : T[]; + constructor(size: number, type : BufferType) { + this.size = size; + this.type = type; + this.buffer = Array(size); + } + + + add(...items : T[]) : void{ + if (this.type === BufferType.FixedSizeQueue){ + const len = Math.min(items.length, this.size - this.count); + for (var i = 0; i < len; i++) { + this.buffer[this.end] = items[i]; + this.end = this.end + 1; + } + this.count += len; + } else { + for (var i = 0; i < items.length; i++) { + this.buffer[this.end] = items[i]; + this.end = (this.end+1) % this.size; + } + this.count = Math.min(this.count+items.length, this.size); + } + } + + clear(): void { + this.count =0; + this.end = 0; + } + + toArray() { + let pos = this.end - this.count; + if (pos < 0) { + pos = this.size + pos; + } + + let res : T[] = []; + for (let i=0; i< this.count; i++) { + res.push(this.buffer[pos]); + pos = (pos + 1) % this.size; + } + return res; + } + + forEach(fn : (el : T) => void) { + let pos = this.end - this.count; + if (pos < 0) { + pos = this.size - pos; + } + for (let i=0; i< this.count; i++) { + fn(this.buffer[pos]); + pos = (pos + 1) % this.size; + } + } +} diff --git a/src/index.ts b/base/src/index.ts similarity index 65% rename from src/index.ts rename to base/src/index.ts index 138e822..e181eed 100644 --- a/src/index.ts +++ b/base/src/index.ts @@ -1,4 +1,6 @@ -import RingBuffer from 'ring-buffer-ts'; +import {Buffer, BufferType} from './buffer.js'; + +export { BufferType } export type Config = { // all time intervals are in miliseconds @@ -6,13 +8,15 @@ export type Config = { PING_MESSAGE: string; RECONNECT_INTERVAL: number; // If in any case the connection is broken then a reconnect attempt is made every `RECONNECT_INTERVAL` MSG_BUFFER_SIZE: number; // number of messages to remember and send later (if the connection doesnot exist, or is broken) + BUFFER_TYPE: BufferType; }; interface Event { type: string; } -interface ErrorEvent extends Event {} +interface ErrorEvent extends Event { +} interface CloseEvent extends Event { wasClean: boolean; @@ -33,6 +37,7 @@ export class ReliableWS< ErrorEv extends ErrorEvent, CloseEv extends CloseEvent, MessageEv extends MessageEvent, + WSArgs > { private ws?: WebSocket; private address: string; @@ -40,27 +45,35 @@ export class ReliableWS< private wsOpen: boolean = false; private onceOpened: boolean = false; - private msgBuffer: RingBuffer.RingBuffer; + private msgBuffer: Buffer; private pingTimer?: ReturnType; private reconnectTimeout?: ReturnType; private shuttingDown: boolean = false; + private wsargs? : WSArgs; + private tries: number = 0; onopen: ((event: Ev) => void) | null = null; onerror: ((event: ErrorEv) => void) | null = null; onclose: ((event: CloseEv) => void) | null = null; onmessage: ((event: MessageEv) => void) | null = null; + // this event is issued when the connection is disconnected + // reliable websocket will nonethless be trying connection attempts. + // tries: number of times connection has been tried to establish since start or after the last successfull connection + ondisconnect: ((event : CloseEv, tries : number) => void) | null = null; - constructor(address: string, options: Config) { + constructor(address: string, options: Config, wsargs?: WSArgs ) { this.address = address; this.config = options; - this.msgBuffer = new RingBuffer.RingBuffer(this.config.MSG_BUFFER_SIZE); + this.wsargs = wsargs; + this.msgBuffer = new Buffer(this.config.MSG_BUFFER_SIZE, this.config.BUFFER_TYPE); this.setupConnection(); } private setupConnection() { if (this.shuttingDown) return; this.changeConfig(this.config); - this.ws = new WebSocket(this.address); + //@ts-ignore + this.ws = new WebSocket(this.address, this.wsargs); this.ws.onerror = _event => { const event = _event as unknown as ErrorEv; @@ -70,37 +83,40 @@ export class ReliableWS< this.ws.onopen = _event => { const event = _event as unknown as Ev; this.wsOpen = true; + this.tries = 0; if (this.onopen && !this.onceOpened) { - this.onceOpened = true; - this.onopen(event); // this is triggerred on the first time only + this.onceOpened = true; + this.onopen(event); // this is triggerred on the first time only } if (this.ws) - this.ws.onmessage = _event => { - const event = _event as unknown as MessageEv; - if (this.onmessage) this.onmessage(event); - }; - - const queued: string[] = this.msgBuffer.toArray(); + this.ws.onmessage = _event => { + const event = _event as unknown as MessageEv; + if (this.onmessage) this.onmessage(event); + }; + + this.msgBuffer.forEach((msg) => { + this.ws?.send(msg); + }) this.msgBuffer.clear(); - for (const msg of queued) { - this.ws?.send(msg); - } if (this.shuttingDown) this.ws?.close(); }; this.ws.onclose = _event => { const event = _event as unknown as CloseEv; + this.tries++; if (this.shuttingDown) { - if (this.onclose) this.onclose(event); - return; + if (this.onclose) this.onclose(event); + return; + } else { + if (this.ondisconnect) this.ondisconnect(event, this.tries); } // this means connection was closed unexpetedly this.wsOpen = false; if (this.pingTimer) clearInterval(this.pingTimer); this.reconnectTimeout = setTimeout( - this.setupConnection.bind(this), - this.config.RECONNECT_INTERVAL, + this.setupConnection.bind(this), + this.config.RECONNECT_INTERVAL, ); }; } @@ -112,30 +128,30 @@ export class ReliableWS< } if (this.config.PING_INTERVAL != 0) this.pingTimer = setInterval(this.pingIfUp.bind(this), this.config.PING_INTERVAL); - if (this.msgBuffer.getSize() != this.config.MSG_BUFFER_SIZE) { - const newBuffer = new RingBuffer.RingBuffer(this.config.MSG_BUFFER_SIZE); + if (this.msgBuffer.size != this.config.MSG_BUFFER_SIZE) { + const newBuffer = new Buffer(this.config.MSG_BUFFER_SIZE, this.config.BUFFER_TYPE); newBuffer.add(...this.msgBuffer.toArray()); this.msgBuffer = newBuffer; } } private pingIfUp() { - this.ws?.send(this.config.PING_MESSAGE); + if (this.wsOpen) + this.ws?.send(this.config.PING_MESSAGE); } send(msg: any) { if (this.wsOpen && this.ws) { this.ws.send(msg); - } else if (this.msgBuffer.getSize() != 0) { - this.msgBuffer.add(msg); } + this.msgBuffer.add(msg); } close() { this.shuttingDown = true; if (!this.onceOpened && this.config.RECONNECT_INTERVAL != 0) console.debug( - '.close() called on Reliable Websocket, before any connection was successful. Be careful', + '.close() called on Reliable Websocket, before any connection was successful. Be careful', ); if (this.pingTimer) clearInterval(this.pingTimer); if (this.reconnectTimeout) clearTimeout(this.reconnectTimeout); diff --git a/src/utils.ts b/base/src/utils.ts similarity index 100% rename from src/utils.ts rename to base/src/utils.ts diff --git a/base/tsconfig.json b/base/tsconfig.json new file mode 100644 index 0000000..3a0c781 --- /dev/null +++ b/base/tsconfig.json @@ -0,0 +1,19 @@ +{ + "compileOnSave": true, + "compilerOptions": { + "moduleResolution": "node", + "strict": true, + "target": "es2017", + "lib": ["es2017", "DOM"], + "module": "commonjs", + "esModuleInterop": true, + "noImplicitAny": true, + "removeComments": true, + "preserveConstEnums": true, + "declaration": true, + "outDir": "dist", + "sourceMap": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "src/node"] +} diff --git a/browser/package.json b/browser/package.json new file mode 100644 index 0000000..b4f4191 --- /dev/null +++ b/browser/package.json @@ -0,0 +1,23 @@ +{ + "name": "@bhoos/websocket", + "version": "0.0.2", + "main": "index.js", + "license": "MIT", + "dependencies": { + "@bhoos/websocket-base": "^0.0.2" + }, + "main": "es6/index.js", + "type": "module", + "files": [ + "es6", + "dist" + ], + "scripts": { + "build:cjs": "tsc", + "build:es6": "mkdir -p es6 && echo '{\"type\":\"module\"}' > es6/package.json && tsc --module es6 --outDir es6", + "build": "yarn build:cjs && yarn build:es6" + }, + "publishConfig": { + "registry": "https://npm.pkg.github.com/" + } +} diff --git a/browser/src/index.ts b/browser/src/index.ts new file mode 100644 index 0000000..2565500 --- /dev/null +++ b/browser/src/index.ts @@ -0,0 +1,4 @@ +import { ReliableWS as WS } from '@bhoos/websocket-base' +export { Config, BufferType } from '@bhoos/websocket-base' +export class ReliableWS extends WS { +} diff --git a/browser/tsconfig.json b/browser/tsconfig.json new file mode 100644 index 0000000..b043889 --- /dev/null +++ b/browser/tsconfig.json @@ -0,0 +1,19 @@ +{ + "compileOnSave": true, + "compilerOptions": { + "moduleResolution": "node", + "strict": true, + "target": "es2017", + "lib": ["es2017", "dom"], + "module": "commonjs", + "esModuleInterop": true, + "noImplicitAny": true, + "removeComments": true, + "preserveConstEnums": true, + "declaration": true, + "outDir": "dist", + "sourceMap": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "src/node"] +} diff --git a/examples/browser/package.json b/examples/browser/package.json index 6bbc644..295c232 100644 --- a/examples/browser/package.json +++ b/examples/browser/package.json @@ -13,6 +13,6 @@ "vite": "^3.0.7" }, "dependencies": { - "@bhoos/websocket": "^0.0.0" + "@bhoos/websocket": "^0.0.1" } } diff --git a/examples/browser/src/main.ts b/examples/browser/src/main.ts index 87629d7..9df3015 100644 --- a/examples/browser/src/main.ts +++ b/examples/browser/src/main.ts @@ -1,20 +1,25 @@ -import {ReliableWS} from '@bhoos/websocket'; +import {ReliableWS, BufferType} from '@bhoos/websocket'; const config = { PING_INTERVAL: 1000, // ms RECONNECT_INTERVAL: 5000, // ms - MSG_BUFFER_SIZE: 20, - PING_MESSAGE : 'ping' + MSG_BUFFER_SIZE: 2, + PING_MESSAGE : '{"type" : "ping"}', + BUFFER_TYPE: BufferType.FixedSizeQueue, } const agent = new ReliableWS('ws://localhost:3030/subscribe/', config); -agent.send(JSON.stringify({type: 'ping'})); +agent.send(JSON.stringify({type: 'msg1'})); +agent.send(JSON.stringify({type: 'msg2'})); +agent.send(JSON.stringify({type: 'msg3'})); +agent.send(JSON.stringify({type: 'msg4'})); +agent.send(JSON.stringify({type: 'msg5'})); agent.onerror = (event) => { - console.log("Error: ", event.message); + console.log("Error: ", event); } agent.onopen = (event : Event) => { - console.log("Opened!!",event); + console.log("Opened!!", event); } agent.onmessage = (event) => { diff --git a/examples/node/package.json b/examples/node/package.json index 7893edd..e28b69d 100644 --- a/examples/node/package.json +++ b/examples/node/package.json @@ -11,7 +11,7 @@ "test": "yarn build && node es6/index.js" }, "dependencies": { - "@bhoos/websocket": "^0.0.0", + "@bhoos/websocket-node": "^0.0.1", "ws": "^8.8.1" }, "devDependencies": { diff --git a/examples/node/src/index.ts b/examples/node/src/index.ts index 6b73707..d72e16a 100644 --- a/examples/node/src/index.ts +++ b/examples/node/src/index.ts @@ -1,9 +1,10 @@ -import {ReliableWS} from '@bhoos/websocket'; +import {ReliableWS, BufferType} from '@bhoos/websocket-node'; const config = { - PING_INTERVAL: 1000, // ms + PING_INTERVAL: 0, // ms RECONNECT_INTERVAL: 5000, // ms MSG_BUFFER_SIZE: 20, - PING_MESSAGE : 'ping' + PING_MESSAGE : 'ping', + BUFFER_TYPE: BufferType.RingBuffer } const agent = new ReliableWS('ws://localhost:3030/subscribe/', config); @@ -14,8 +15,8 @@ agent.onerror = (event) => { } agent.onopen = (event) => { - console.log("Opened!!"); - agent.close(); + console.log("Opened!!", event.type); +// agent.close(); } agent.onmessage = (event) => { @@ -23,7 +24,7 @@ agent.onmessage = (event) => { }; agent.onclose = (ev) => { - console.log("closed"); + console.log("closed", ev.code, ev.reason); } -setTimeout(() => {console.log('1s') }, 1000); +// setTimeout(() => {console.log('1s') }, 1000); diff --git a/node/package.json b/node/package.json new file mode 100644 index 0000000..3d34b45 --- /dev/null +++ b/node/package.json @@ -0,0 +1,26 @@ +{ + "name": "@bhoos/websocket-node", + "version": "0.0.2", + "main": "es6/index.js", + "license": "MIT", + "dependencies": { + "@bhoos/websocket-base": "^0.0.2", + "ws": "^8.8.1" + }, + "type": "module", + "files": [ + "es6", + "dist" + ], + "scripts": { + "build:cjs": "tsc", + "build:es6": "mkdir -p es6 && echo '{\"type\":\"module\"}' > es6/package.json && tsc --module es6 --outDir es6", + "build": "yarn build:cjs && yarn build:es6" + }, + "publishConfig": { + "registry": "https://npm.pkg.github.com/" + }, + "devDependencies": { + "@types/ws": "^8.5.3" + } +} diff --git a/node/src/index.ts b/node/src/index.ts new file mode 100644 index 0000000..4466487 --- /dev/null +++ b/node/src/index.ts @@ -0,0 +1,8 @@ +import WebSocket from 'ws'; +import { ClientOptions, Event, CloseEvent, MessageEvent, ErrorEvent} from 'ws'; +import { ReliableWS as WS, BufferType } from '@bhoos/websocket-base' +//@ts-ignore +global.WebSocket = WebSocket +export class ReliableWS extends WS { +} +export {BufferType} diff --git a/tsconfig.json b/node/tsconfig.json similarity index 88% rename from tsconfig.json rename to node/tsconfig.json index f2adc85..414ff0b 100644 --- a/tsconfig.json +++ b/node/tsconfig.json @@ -4,14 +4,14 @@ "moduleResolution": "node", "strict": true, "target": "es2017", - "lib": ["es2017", "DOM"], + "lib": ["es2017"], "module": "commonjs", "esModuleInterop": true, "noImplicitAny": true, "removeComments": true, "preserveConstEnums": true, "declaration": true, - "outDir": "es6", + "outDir": "dist", "sourceMap": true }, "include": ["src/**/*"], diff --git a/package.json b/package.json index 40cc663..a191667 100644 --- a/package.json +++ b/package.json @@ -1,30 +1,15 @@ { "name": "@bhoos/websocket", - "version": "0.0.1", - "description": "reliable websocket client", - "repository": "https://github.com/Bhoos/websocket", - "publishConfig": { - "registry": "https://npm.pkg.github.com/" - }, - "author": "Bibek Panthi ", - "license": "MIT", - "browser": "es6/browser.js", - "main": "es6/node.js", - "type": "module", - "files": [ - "es6", - "dist" + "private": true, + "workspaces": [ + "base", + "node", + "browser" ], "scripts": { - "build:cjs": "tsc", - "build:es6": "mkdir -p es6 && echo '{\"type\":\"module\"}' > es6/package.json && tsc --module es6 --outDir es6", - "build": "yarn build:cjs && yarn build:es6" - }, - "dependencies": { - "ring-buffer-ts": "^1.0.3" + "build": "lerna run build" }, "devDependencies": { - "@types/ws": "^8.5.3", - "typescript": "^4.1.3" + "lerna": "^4.0.0" } } diff --git a/src/browser.ts b/src/browser.ts deleted file mode 100644 index 8a2362f..0000000 --- a/src/browser.ts +++ /dev/null @@ -1,4 +0,0 @@ -import { ReliableWS as WS } from './index.js' -export { Config } from './index.js' -export class ReliableWS extends WS { -} diff --git a/src/node.ts b/src/node.ts deleted file mode 100644 index 095c8ab..0000000 --- a/src/node.ts +++ /dev/null @@ -1,6 +0,0 @@ -import {WebSocket, Event, CloseEvent, MessageEvent, ErrorEvent} from 'ws'; -import { Config, ReliableWS as WS } from './index.js' -//@ts-ignore -global.WebSocket = WebSocket -export class ReliableWS extends WS { -}