Skip to content

Commit

Permalink
multiplex JSON-RPC connections over a single web socket
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Kosiakov <anton.kosyakov@typefox.io>
  • Loading branch information
akosyakov committed Apr 25, 2018
1 parent 0bad58b commit 0334978
Show file tree
Hide file tree
Showing 12 changed files with 431 additions and 174 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/browser/frontend-application-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { ThemingCommandContribution, ThemeService } from './theming';
import { ConnectionStatusService, FrontendConnectionStatusService, ApplicationConnectionStatusContribution } from './connection-status-service';
import { DiffUriLabelProviderContribution } from './diff-uris';
import { ApplicationServer, applicationPath } from "../common/application-protocol";
import { WebSocketConnectionProvider } from "./messaging/connection";
import { WebSocketConnectionProvider } from "./messaging";
import { AboutDialog, AboutDialogProps } from "./about-dialog";
import { EnvVariablesServer, envVariablesPath } from "./../common/env-variables";
import { FrontendApplicationStateService } from './frontend-application-state';
Expand Down
93 changes: 0 additions & 93 deletions packages/core/src/browser/messaging/connection.ts

This file was deleted.

4 changes: 2 additions & 2 deletions packages/core/src/browser/messaging/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* Copyright (C) 2017 TypeFox and others.
* Copyright (C) 2018 TypeFox and others.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*/

export * from './connection';
export * from './ws-connection-provider';
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { ContainerModule } from "inversify";
import { WebSocketConnectionProvider } from './connection';
import { WebSocketConnectionProvider } from './ws-connection-provider';

export const messagingFrontendModule = new ContainerModule(bind => {
bind(WebSocketConnectionProvider).toSelf().inSingletonScope();
Expand Down
138 changes: 138 additions & 0 deletions packages/core/src/browser/messaging/ws-connection-provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright (C) 2018 TypeFox and others.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*/

import { injectable, interfaces } from "inversify";
import { createWebSocketConnection, Logger, ConsoleLogger } from "vscode-ws-jsonrpc/lib";
import { ConnectionHandler, JsonRpcProxyFactory, JsonRpcProxy } from "../../common";
import { WebSocketChannel } from "../../common/messaging/web-socket-channel";
import { Endpoint } from "../endpoint";
const ReconnectingWebSocket = require('reconnecting-websocket');

export interface WebSocketOptions {
/**
* True by default.
*/
reconnecting?: boolean;
}

@injectable()
export class WebSocketConnectionProvider {

static createProxy<T extends object>(container: interfaces.Container, path: string, target?: object): JsonRpcProxy<T> {
return container.get(WebSocketConnectionProvider).createProxy<T>(path, target);
}

protected channelIdSeq = 0;
protected readonly socket: WebSocket;
protected readonly channels = new Map<number, WebSocketChannel>();

constructor() {
const url = this.createWebSocketUrl(WebSocketChannel.wsPath);
const socket = this.createWebSocket(url);
socket.onerror = console.error;
socket.onclose = ({ code, reason }) => {
for (const channel of this.channels.values()) {
channel.fireClose(code, reason);
}
this.channels.clear();
};
socket.onmessage = ({ data }) => {
const message: WebSocketChannel.Message = JSON.parse(data);
const channel = this.channels.get(message.id);
if (channel) {
channel.handleMessage(message);
} else {
console.error('The ws channel does not exist', message.id);
}
};
this.socket = socket;
}

/**
* Create a proxy object to remote interface of T type
* over a web socket connection for the given path.
*
* An optional target can be provided to handle
* notifications and requests from a remote side.
*/
createProxy<T extends object>(path: string, target?: object): JsonRpcProxy<T> {
const factory = new JsonRpcProxyFactory<T>(target);
this.listen({
path,
onConnection: c => factory.listen(c)
});
return factory.createProxy();
}

/**
* Install a connection handler for the given path.
*/
listen(handler: ConnectionHandler, options?: WebSocketOptions): void {
if (this.socket.readyState === WebSocket.OPEN) {
this.openChannel(handler, options);
} else {
this.socket.addEventListener('open', () => this.openChannel(handler, options), { once: true });
}
}

protected openChannel(handler: ConnectionHandler, options?: WebSocketOptions): void {
const id = this.channelIdSeq++;
const channel = this.createChannel(id);
this.channels.set(id, channel);
channel.onOpen(() => {
const connection = createWebSocketConnection(channel, this.createLogger());
connection.onDispose(() => this.closeChannel(id, handler, options));
handler.onConnection(connection);
});
channel.open(handler.path);
}

protected createChannel(id: number): WebSocketChannel {
return new WebSocketChannel(id, content => this.socket.send(content));
}

protected createLogger(): Logger {
return new ConsoleLogger();
}

protected closeChannel(id: number, handler: ConnectionHandler, options?: WebSocketOptions): void {
const channel = this.channels.get(id);
if (channel) {
this.channels.delete(id);
if (this.socket.readyState < WebSocket.CLOSING) {
channel.close();
}
}
const { reconnecting } = { reconnecting: true, ...options };
if (reconnecting) {
this.listen(handler, options);
}
}

/**
* Creates a websocket URL to the current location
*/
protected createWebSocketUrl(path: string): string {
const endpoint = new Endpoint({ path });
return endpoint.getWebSocketUrl().toString();
}

/**
* Creates a web socket for the given url
*/
protected createWebSocket(url: string): WebSocket {
return new ReconnectingWebSocket(url, undefined, {
maxReconnectionDelay: 10000,
minReconnectionDelay: 1000,
reconnectionDelayGrowFactor: 1.3,
connectionTimeout: 10000,
maxRetries: Infinity,
debug: false
});
}

}
129 changes: 129 additions & 0 deletions packages/core/src/common/messaging/web-socket-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (C) 2018 TypeFox and others.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*/

// tslint:disable:no-any

import { IWebSocket } from "vscode-ws-jsonrpc/lib/socket/socket";
import { Disposable, DisposableCollection } from "../disposable";

export class WebSocketChannel implements IWebSocket {

static wsPath = '/services';

protected readonly toDispose = new DisposableCollection();

constructor(
readonly id: number,
protected readonly doSend: (content: string) => void
) {
this.toDispose.push(Disposable.NULL);
}

dispose(): void {
this.toDispose.dispose();
}

protected checkNotDisposed(): void {
if (this.toDispose.disposed) {
throw new Error('The channel has been disposed.');
}
}

handleMessage(message: WebSocketChannel.Message) {
if (message.kind === 'ready') {
this.fireOpen();
} else if (message.kind === 'data') {
this.fireMessage(message.content);
} else if (message.kind === 'close') {
this.fireClose(1000, '');
}
}

open(path: string): void {
this.checkNotDisposed();
this.doSend(JSON.stringify(<WebSocketChannel.OpenMessage>{
kind: 'open',
id: this.id,
path
}));
}

ready(): void {
this.checkNotDisposed();
this.doSend(JSON.stringify(<WebSocketChannel.ReadyMessage>{
kind: 'ready',
id: this.id
}));
}

send(content: string): void {
this.checkNotDisposed();
this.doSend(JSON.stringify(<WebSocketChannel.DataMessage>{
kind: 'data',
id: this.id,
content
}));
}

close(): void {
this.checkNotDisposed();
this.doSend(JSON.stringify(<WebSocketChannel.CloseMessage>{
kind: 'close',
id: this.id
}));
}

protected fireOpen: () => void = () => { };
onOpen(cb: () => void): void {
this.checkNotDisposed();
this.fireOpen = cb;
this.toDispose.push(Disposable.create(() => this.fireOpen = () => { }));
}

protected fireMessage: (data: any) => void = () => { };
onMessage(cb: (data: any) => void): void {
this.checkNotDisposed();
this.fireMessage = cb;
this.toDispose.push(Disposable.create(() => this.fireMessage = () => { }));
}

fireError: (reason: any) => void = () => { };
onError(cb: (reason: any) => void): void {
this.checkNotDisposed();
this.fireError = cb;
this.toDispose.push(Disposable.create(() => this.fireError = () => { }));
}

fireClose: (code: number, reason: string) => void = () => { };
onClose(cb: (code: number, reason: string) => void): void {
this.checkNotDisposed();
this.fireClose = cb;
this.toDispose.push(Disposable.create(() => this.fireClose = () => { }));
}

}
export namespace WebSocketChannel {
export interface OpenMessage {
kind: 'open'
id: number
path: string
}
export interface ReadyMessage {
kind: 'ready'
id: number
}
export interface DataMessage {
kind: 'data'
id: number
content: string
}
export interface CloseMessage {
kind: 'close'
id: number
}
export type Message = OpenMessage | ReadyMessage | DataMessage | CloseMessage;
}
Loading

0 comments on commit 0334978

Please sign in to comment.