Skip to content

Commit

Permalink
Use binary message RPC protocol for plugin API
Browse files Browse the repository at this point in the history
Refactors the plugin RPC protocol to make use of the new message-rpc introduced with eclipse-theia#11011/eclipse-theia#11228.
- Refactor plugin-ext RpcProtocol API to reuse the new message-rpc protocol
  - Remove custom RPC message encoding and handling reuse message-rpc
  - Implement `BatchingChannel` that queues messages and sends them accumulated on the next process.tick (replaces the old Multiplexer 
    implementation)
   - Refactors proxy handlers and remote target handlers
   - Use `Channel` instead of `MessageConnection` for creating new instances of RPCProtocol
   - Refactor `RpcMessageEncoder`/`RpcMessageDecoder` to enable overwritting of already registered value encoders/decoders. 
   - Add mode property to  base `RpcProtocol` to enable switching from a bidirectional RPC protocol to a client-only or server-only variant.
- Implement special message encoders and decoders for the plugin communication. (Replacement for the old `ObjectTransferrer` JSON replacers/revivers)
- Adapt `HostedPluginServer` and `HostedPluginClient` API to send/receive messages in binary format instead of strings. This enables direct writethrough of the binary messages received from the hosted plugin process.
- Adapt `hosted-plugin-process` and `plugin-host` to directly send binary messages via  `IpcChannel`/`BinaryMessagePipe`

- Remove incorrect (and unused) notification proxy identifiers and instantiation
  - NotificationExt was instantiated in the main context
  - There were unused notification proxy identifiers for main and ext in the wrong contexts

Part of eclipse-theia#10684
Fixes eclipse-theia#9514

Contributed on behalf of STMicroelectronics
  • Loading branch information
tortmayr committed Nov 29, 2022
1 parent 32deea2 commit 63a9a11
Show file tree
Hide file tree
Showing 22 changed files with 586 additions and 575 deletions.
25 changes: 20 additions & 5 deletions packages/core/src/common/message-rpc/channel.ts
Expand Up @@ -14,7 +14,6 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { injectable } from '../../../shared/inversify';
import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';
Expand Down Expand Up @@ -72,7 +71,6 @@ export type MessageProvider = () => ReadBuffer;
* Reusable abstract {@link Channel} implementation that sets up
* the basic channel event listeners and offers a generic close method.
*/
@injectable()
export abstract class AbstractChannel implements Channel {

onCloseEmitter: Emitter<ChannelCloseEvent> = new Emitter();
Expand Down Expand Up @@ -101,7 +99,21 @@ export abstract class AbstractChannel implements Channel {
}

abstract getWriteBuffer(): WriteBuffer;
}

/**
* A very basic {@link AbstractChannel} implementation which takes a function
* for retrieving the {@link WriteBuffer} as constructor argument.
*/
export class BasicChannel extends AbstractChannel {

constructor(protected writeBufferProvider: () => WriteBuffer) {
super();
}

getWriteBuffer(): WriteBuffer {
return this.writeBufferProvider();
}
}

/**
Expand Down Expand Up @@ -194,7 +206,7 @@ export class ChannelMultiplexer implements Disposable {
return this.handleClose(id);
}
case MessageTypes.Data: {
return this.handleData(id, buffer.sliceAtReadPosition());
return this.handleData(id, buffer);
}
}
}
Expand All @@ -206,7 +218,7 @@ export class ChannelMultiplexer implements Disposable {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
resolve!(channel);
resolve(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand Down Expand Up @@ -236,7 +248,7 @@ export class ChannelMultiplexer implements Disposable {
protected handleData(id: string, data: ReadBuffer): void {
const channel = this.openChannels.get(id);
if (channel) {
channel.onMessageEmitter.fire(() => data);
channel.onMessageEmitter.fire(() => data.sliceAtReadPosition());
}
}

Expand All @@ -263,6 +275,9 @@ export class ChannelMultiplexer implements Disposable {
}

open(id: string): Promise<Channel> {
if (this.openChannels.has(id)) {
throw new Error(`Another channel with the id '${id}' is already open.`);
}
const result = new Promise<Channel>((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
Expand Down
26 changes: 24 additions & 2 deletions packages/core/src/common/message-rpc/index.ts
Expand Up @@ -13,6 +13,28 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel';
export { AbstractChannel, Channel, ChannelCloseEvent, MessageProvider } from './channel';
export { ReadBuffer, WriteBuffer } from './message-buffer';
export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';

import { MsgPackExtensionManager } from './msg-pack-extension-manager';
import { ResponseError } from './rpc-message-encoder';

// Register custom msgPack extension for Errors.
MsgPackExtensionManager.getInstance().registerExtensions({
class: Error,
tag: 1,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
serialize: (error: any) => {
const { code, data, message, name } = error;
const stack = error.stacktrace ?? error.stack;
const isResponseError = error instanceof ResponseError;
return { code, data, message, name, stack, isResponseError };
},
deserialize: data => {
const error = data.isResponseError ? new ResponseError(data.code, data.message, data.data) : new Error(data.message);
error.name = data.name;
error.stack = data.stack;
return error;
}
});
70 changes: 70 additions & 0 deletions packages/core/src/common/message-rpc/msg-pack-extension-manager.ts
@@ -0,0 +1,70 @@
// *****************************************************************************
// Copyright (C) 2022 STMicroelectronics and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0.
//
// This Source Code may also be made available under the following Secondary
// Licenses when the conditions for such availability set forth in the Eclipse
// Public License v. 2.0 are satisfied: GNU General Public License, version 2
// with the GNU Classpath Exception which is available at
// https://www.gnu.org/software/classpath/license.html.
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************

import { addExtension } from 'msgpackr';

/**
* Handles the global registration of custom MsgPackR extensions
* required for the default RPC communication. MsgPackR extensions
* are installed globally on both ends of the communication channel.
* (frontend-backend, pluginExt-pluginMain).
* Is implemented as singleton as it is also used in plugin child processes which have no access to inversify.
*/
export class MsgPackExtensionManager {
private static readonly INSTANCE = new MsgPackExtensionManager();
public static getInstance(): MsgPackExtensionManager {
return this.INSTANCE;
}

private extensions = new Map<number, MsgPackExtension>();

private constructor() {
}

registerExtensions(...extensions: MsgPackExtension[]): void {
extensions.forEach(extension => {
if (extension.tag < 1 || extension.tag > 100) {
// MsgPackR reserves the tag range 1-100 for custom extensions.
throw new Error(`MsgPack extension tag should be a number from 1-100 but was '${extension.tag}'`);
}
if (this.extensions.has(extension.tag)) {
throw new Error(`Another MsgPack extension with the tag '${extension.tag}' is already registered`);
}
this.extensions.set(extension.tag, extension);
addExtension({
Class: extension.class,
type: extension.tag,
write: extension.serialize,
read: extension.deserialize
});
});
}

getExtension(tag: number): MsgPackExtension | undefined {
return this.extensions.get(tag);
}
}

export interface MsgPackExtension {
class: Function,
tag: number,
serialize(instance: unknown): unknown,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
deserialize(serialized: any): unknown
}

export type Constructor<T> = new (...params: unknown[]) => T;

26 changes: 3 additions & 23 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Expand Up @@ -15,7 +15,7 @@
// *****************************************************************************
/* eslint-disable @typescript-eslint/no-explicit-any */

import { addExtension, Packr as MsgPack } from 'msgpackr';
import { Packr as MsgPack } from 'msgpackr';
import { ReadBuffer, WriteBuffer } from './message-buffer';

/**
Expand Down Expand Up @@ -121,27 +121,10 @@ export interface RpcMessageEncoder {
}

export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: false });
// Add custom msgpackR extension for ResponseErrors.
addExtension({
Class: ResponseError,
type: 1,
write: (instance: ResponseError) => {
const { code, data, message, name, stack } = instance;
return { code, data, message, name, stack };
},
read: data => {
const error = new ResponseError(data.code, data.message, data.data);
error.name = data.name;
error.stack = data.stack;
return error;
}
});

export class MsgPackMessageEncoder implements RpcMessageEncoder {

constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {

}
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

cancel(buf: WriteBuffer, requestId: number): void {
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
Expand Down Expand Up @@ -169,13 +152,11 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
throw err;
}
}

}

export class MsgPackMessageDecoder implements RpcMessageDecoder {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {
constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }

}
decode<T = any>(buf: ReadBuffer): T {
const bytes = buf.readBytes();
return this.msgPack.decode(bytes);
Expand All @@ -184,5 +165,4 @@ export class MsgPackMessageDecoder implements RpcMessageDecoder {
parse(buffer: ReadBuffer): RpcMessage {
return this.decode(buffer);
}

}

0 comments on commit 63a9a11

Please sign in to comment.