-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
multiplex JSON-RPC connections over a single web socket
Signed-off-by: Anton Kosiakov <anton.kosyakov@typefox.io>
- Loading branch information
Showing
8 changed files
with
339 additions
and
135 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,8 @@ | ||
/* | ||
* Copyright (C) 2017 TypeFox and others. | ||
* Copyright (C) 2018 TypeFox and others. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
|
||
export * from './connection'; | ||
export * from './ws-connection-provider'; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
138 changes: 138 additions & 0 deletions
138
packages/core/src/browser/messaging/ws-connection-provider.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
* Copyright (C) 2018 TypeFox and others. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
|
||
import { injectable, interfaces } from "inversify"; | ||
import { createWebSocketConnection, Logger, ConsoleLogger } from "vscode-ws-jsonrpc/lib"; | ||
import { ConnectionHandler, JsonRpcProxyFactory, JsonRpcProxy } from "../../common"; | ||
import { WebSocketChannel } from "../../common/messaging/web-socket-channel"; | ||
import { Endpoint } from "../endpoint"; | ||
const ReconnectingWebSocket = require('reconnecting-websocket'); | ||
|
||
export interface WebSocketOptions { | ||
/** | ||
* True by default. | ||
*/ | ||
reconnecting?: boolean; | ||
} | ||
|
||
@injectable() | ||
export class WebSocketConnectionProvider { | ||
|
||
static createProxy<T extends object>(container: interfaces.Container, path: string, target?: object): JsonRpcProxy<T> { | ||
return container.get(WebSocketConnectionProvider).createProxy<T>(path, target); | ||
} | ||
|
||
protected channelIdSeq = 0; | ||
protected readonly socket: WebSocket; | ||
protected readonly channels = new Map<number, WebSocketChannel>(); | ||
|
||
constructor() { | ||
const url = this.createWebSocketUrl('/services'); | ||
const socket = this.createWebSocket(url); | ||
socket.onerror = console.error; | ||
socket.onclose = ({ code, reason }) => { | ||
for (const channel of this.channels.values()) { | ||
channel.fireClose(code, reason); | ||
} | ||
this.channels.clear(); | ||
}; | ||
socket.onmessage = ({ data }) => { | ||
const message: WebSocketChannel.Message = JSON.parse(data); | ||
const channel = this.channels.get(message.id); | ||
if (channel) { | ||
channel.handleMessage(message); | ||
} else { | ||
console.error('The ws channel does not exist', message.id); | ||
} | ||
}; | ||
this.socket = socket; | ||
} | ||
|
||
/** | ||
* Create a proxy object to remote interface of T type | ||
* over a web socket connection for the given path. | ||
* | ||
* An optional target can be provided to handle | ||
* notifications and requests from a remote side. | ||
*/ | ||
createProxy<T extends object>(path: string, target?: object): JsonRpcProxy<T> { | ||
const factory = new JsonRpcProxyFactory<T>(target); | ||
this.listen({ | ||
path, | ||
onConnection: c => factory.listen(c) | ||
}); | ||
return factory.createProxy(); | ||
} | ||
|
||
/** | ||
* Install a connection handler for the given path. | ||
*/ | ||
listen(handler: ConnectionHandler, options?: WebSocketOptions): void { | ||
if (this.socket.readyState === WebSocket.OPEN) { | ||
this.openChannel(handler, options); | ||
} else { | ||
this.socket.addEventListener('open', () => this.openChannel(handler, options), { once: true }); | ||
} | ||
} | ||
|
||
protected openChannel(handler: ConnectionHandler, options?: WebSocketOptions): void { | ||
const id = this.channelIdSeq++; | ||
const channel = this.createChannel(id); | ||
this.channels.set(id, channel); | ||
channel.onOpen(() => { | ||
const connection = createWebSocketConnection(channel, this.createLogger()); | ||
connection.onDispose(() => this.closeChannel(id, handler, options)); | ||
handler.onConnection(connection); | ||
}); | ||
channel.open(handler.path); | ||
} | ||
|
||
protected createChannel(id: number): WebSocketChannel { | ||
return new WebSocketChannel(id, content => this.socket.send(content)); | ||
} | ||
|
||
protected createLogger(): Logger { | ||
return new ConsoleLogger(); | ||
} | ||
|
||
protected closeChannel(id: number, handler: ConnectionHandler, options?: WebSocketOptions): void { | ||
const channel = this.channels.get(id); | ||
if (channel) { | ||
this.channels.delete(id); | ||
if (this.socket.readyState < WebSocket.CLOSING) { | ||
channel.close(); | ||
} | ||
} | ||
const { reconnecting } = { reconnecting: true, ...options }; | ||
if (reconnecting) { | ||
this.listen(handler, options); | ||
} | ||
} | ||
|
||
/** | ||
* Creates a websocket URL to the current location | ||
*/ | ||
protected createWebSocketUrl(path: string): string { | ||
const endpoint = new Endpoint({ path }); | ||
return endpoint.getWebSocketUrl().toString(); | ||
} | ||
|
||
/** | ||
* Creates a web socket for the given url | ||
*/ | ||
protected createWebSocket(url: string): WebSocket { | ||
return new ReconnectingWebSocket(url, undefined, { | ||
maxReconnectionDelay: 10000, | ||
minReconnectionDelay: 1000, | ||
reconnectionDelayGrowFactor: 1.3, | ||
connectionTimeout: 10000, | ||
maxRetries: Infinity, | ||
debug: false | ||
}); | ||
} | ||
|
||
} |
127 changes: 127 additions & 0 deletions
127
packages/core/src/common/messaging/web-socket-channel.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
/* | ||
* Copyright (C) 2018 TypeFox and others. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 | ||
*/ | ||
|
||
// tslint:disable:no-any | ||
|
||
import { IWebSocket } from "vscode-ws-jsonrpc/lib/socket/socket"; | ||
import { Disposable, DisposableCollection } from "../disposable"; | ||
|
||
export class WebSocketChannel implements IWebSocket { | ||
|
||
protected readonly toDispose = new DisposableCollection(); | ||
|
||
constructor( | ||
readonly id: number, | ||
protected readonly doSend: (content: string) => void | ||
) { | ||
this.toDispose.push(Disposable.NULL); | ||
} | ||
|
||
dispose(): void { | ||
this.toDispose.dispose(); | ||
} | ||
|
||
protected checkNotDisposed(): void { | ||
if (this.toDispose.disposed) { | ||
throw new Error('The channel has been disposed.'); | ||
} | ||
} | ||
|
||
handleMessage(message: WebSocketChannel.Message) { | ||
if (message.kind === 'ready') { | ||
this.fireOpen(); | ||
} else if (message.kind === 'data') { | ||
this.fireMessage(message.content); | ||
} else if (message.kind === 'close') { | ||
this.fireClose(1000, ''); | ||
} | ||
} | ||
|
||
open(path: string): void { | ||
this.checkNotDisposed(); | ||
this.doSend(JSON.stringify(<WebSocketChannel.OpenMessage>{ | ||
kind: 'open', | ||
id: this.id, | ||
path | ||
})); | ||
} | ||
|
||
ready(): void { | ||
this.checkNotDisposed(); | ||
this.doSend(JSON.stringify(<WebSocketChannel.ReadyMessage>{ | ||
kind: 'ready', | ||
id: this.id | ||
})); | ||
} | ||
|
||
send(content: string): void { | ||
this.checkNotDisposed(); | ||
this.doSend(JSON.stringify(<WebSocketChannel.DataMessage>{ | ||
kind: 'data', | ||
id: this.id, | ||
content | ||
})); | ||
} | ||
|
||
close(): void { | ||
this.checkNotDisposed(); | ||
this.doSend(JSON.stringify(<WebSocketChannel.CloseMessage>{ | ||
kind: 'close', | ||
id: this.id | ||
})); | ||
} | ||
|
||
protected fireOpen: () => void = () => { }; | ||
onOpen(cb: () => void): void { | ||
this.checkNotDisposed(); | ||
this.fireOpen = cb; | ||
this.toDispose.push(Disposable.create(() => this.fireOpen = () => { })); | ||
} | ||
|
||
protected fireMessage: (data: any) => void = () => { }; | ||
onMessage(cb: (data: any) => void): void { | ||
this.checkNotDisposed(); | ||
this.fireMessage = cb; | ||
this.toDispose.push(Disposable.create(() => this.fireMessage = () => { })); | ||
} | ||
|
||
fireError: (reason: any) => void = () => { }; | ||
onError(cb: (reason: any) => void): void { | ||
this.checkNotDisposed(); | ||
this.fireError = cb; | ||
this.toDispose.push(Disposable.create(() => this.fireError = () => { })); | ||
} | ||
|
||
fireClose: (code: number, reason: string) => void = () => { }; | ||
onClose(cb: (code: number, reason: string) => void): void { | ||
this.checkNotDisposed(); | ||
this.fireClose = cb; | ||
this.toDispose.push(Disposable.create(() => this.fireClose = () => { })); | ||
} | ||
|
||
} | ||
export namespace WebSocketChannel { | ||
export interface OpenMessage { | ||
kind: 'open' | ||
id: number | ||
path: string | ||
} | ||
export interface ReadyMessage { | ||
kind: 'ready' | ||
id: number | ||
} | ||
export interface DataMessage { | ||
kind: 'data' | ||
id: number | ||
content: string | ||
} | ||
export interface CloseMessage { | ||
kind: 'close' | ||
id: number | ||
} | ||
export type Message = OpenMessage | ReadyMessage | DataMessage | CloseMessage; | ||
} |
Oops, something went wrong.