Skip to content

Commit

Permalink
Use binary message RPC protocol for plugin API
Browse files Browse the repository at this point in the history
Refactors the plugin RPC protocol to make use of the new message-rpc introduced with eclipse-theia#11011/eclipse-theia#11228.
- Refactor plugin-ext RpcProtocol API to reuse the new message-rpc protocol
  - Remove custom RPC message encoding and handling reuse message-rpc
  - Implement `QueuingChannelMultiplexer` that queues messages and sends them accumulated on the next process.tick (replaces the old Multiplexer 
    implementation)
   - Refactors proxy handlers and remote target handlers
   - Use `Channel` instead of `MessageConnection` for creating new instances of RPCProtocol
   - Refactor `RpcMessageEncoder`/`RpcMessageDecoder` to enable overwritting of already registered value encoders/decoders. 
   - Add mode property to  base `RpcProtocol` to enable switching from a bidirectional RPC protocol to a client-only or server-only variant.
- Implement special message encoders and decoders for the plugin communication. (Replacement for the old `ObjectTransferrer` JSON replacers/revivers)
- Adapt `HostedPluginServer` and `HostedPluginClient` API to send/receive messages in binary format instead of strings. This enables direct writethrough of the binary messages received from the hosted plugin process.
- Adapt `hosted-plugin-process` and `plugin-host` to directly send binary messages via  `IpcChannel`/`BinaryMessagePipe`

- Remove incorrect (and unused) notification proxy identifiers and instantiation
  - NotificationExt was instantiated in the main context
  - There were unused notification proxy identifiers for main and ext in the wrong contexts

Part of eclipse-theia#10684
Fixes eclipse-theia#9514

Contributed on behalf of STMicroelectronics

Co-authored-by: Lucas Koehler <lkoehler@eclipsesource.com>
  • Loading branch information
tortmayr and lucas-koehler committed Jun 7, 2022
1 parent ac05b61 commit 28009a3
Show file tree
Hide file tree
Showing 20 changed files with 543 additions and 528 deletions.
22 changes: 16 additions & 6 deletions packages/core/src/common/message-rpc/channel.ts
Expand Up @@ -159,6 +159,10 @@ export class ChannelMultiplexer implements Disposable {

}

protected getUnderlyingWriteBuffer(): WriteBuffer {
return this.underlyingChannel.getWriteBuffer();
}

protected handleMessage(buffer: ReadBuffer): void {
const type = buffer.readUint8();
const id = buffer.readString();
Expand All @@ -185,7 +189,7 @@ export class ChannelMultiplexer implements Disposable {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
resolve!(channel);
resolve(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand All @@ -199,7 +203,7 @@ export class ChannelMultiplexer implements Disposable {
// edge case: both side try to open a channel at the same time.
resolve(channel);
}
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.AckOpen).writeString(id).commit();
this.onOpenChannelEmitter.fire({ id, channel });
}
}
Expand All @@ -215,7 +219,9 @@ export class ChannelMultiplexer implements Disposable {
protected handleData(id: string, data: ReadBuffer): void {
const channel = this.openChannels.get(id);
if (channel) {
channel.onMessageEmitter.fire(() => data);
channel.onMessageEmitter.fire(() => data.sliceAtReadPosition());
} else {
console.warn(`Could not handle data for channel ${id} because it is not open`);
}
}

Expand All @@ -226,14 +232,14 @@ export class ChannelMultiplexer implements Disposable {
// Prepare the write buffer for the channel with the give, id. The channel id has to be encoded
// and written to the buffer before the actual message.
protected prepareWriteBuffer(id: string): WriteBuffer {
const underlying = this.underlyingChannel.getWriteBuffer();
const underlying = this.getUnderlyingWriteBuffer();
underlying.writeUint8(MessageTypes.Data);
underlying.writeString(id);
return underlying;
}

protected closeChannel(id: string): void {
this.underlyingChannel.getWriteBuffer()
this.getUnderlyingWriteBuffer()
.writeUint8(MessageTypes.Close)
.writeString(id)
.commit();
Expand All @@ -242,10 +248,14 @@ export class ChannelMultiplexer implements Disposable {
}

open(id: string): Promise<Channel> {
const existingChannel = this.getOpenChannel(id);
if (existingChannel) {
return Promise.resolve(existingChannel);
}
const result = new Promise<Channel>((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
this.underlyingChannel.getWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
this.getUnderlyingWriteBuffer().writeUint8(MessageTypes.Open).writeString(id).commit();
return result;
}

Expand Down
26 changes: 18 additions & 8 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Expand Up @@ -149,6 +149,10 @@ export class RpcMessageDecoder {
protected decoders: Map<number, ValueDecoder> = new Map();

constructor() {
this.registerDecoders();
}

protected registerDecoders(): void {
this.registerDecoder(ObjectType.JSON, {
read: buf => {
const json = buf.readString();
Expand Down Expand Up @@ -206,7 +210,6 @@ export class RpcMessageDecoder {
this.registerDecoder(ObjectType.Number, {
read: buf => buf.readNumber()
});

}

/**
Expand All @@ -215,9 +218,10 @@ export class RpcMessageDecoder {
* by retrieving the highest tag value and calculating the required Uint size to store it.
* @param tag the tag for which the decoder should be registered.
* @param decoder the decoder that should be registered.
* @param overwrite flag to indicate wether an existing registration with the same tag should be overwritten with the new registration.
*/
registerDecoder(tag: number, decoder: ValueDecoder): void {
if (this.decoders.has(tag)) {
registerDecoder(tag: number, decoder: ValueDecoder, overwrite = false): void {
if (!overwrite && this.decoders.has(tag)) {
throw new Error(`Decoder already registered: ${tag}`);
}
this.decoders.set(tag, decoder);
Expand Down Expand Up @@ -435,14 +439,20 @@ export class RpcMessageEncoder {
* After the successful registration the {@link tagIntType} is recomputed
* by retrieving the highest tag value and calculating the required Uint size to store it.
* @param tag the tag for which the encoder should be registered.
* @param decoder the encoder that should be registered.
* @param encoder the encoder that should be registered.
* @param overwrite to indicate wether an existing registration with the same tag should be overwritten with the new registration.
*/
registerEncoder<T>(tag: number, encoder: ValueEncoder): void {
if (this.registeredTags.has(tag)) {
registerEncoder<T>(tag: number, encoder: ValueEncoder, overwrite = false): void {
if (!overwrite && this.registeredTags.has(tag)) {
throw new Error(`Tag already registered: ${tag}`);
}
this.registeredTags.add(tag);
this.encoders.push([tag, encoder]);
if (!overwrite) {
this.registeredTags.add(tag);
this.encoders.push([tag, encoder]);
} else {
const overrideIndex = this.encoders.findIndex(existingEncoder => existingEncoder[0] === tag);
this.encoders[overrideIndex] = [tag, encoder];
}
}

cancel(buf: WriteBuffer, requestId: number): void {
Expand Down
78 changes: 52 additions & 26 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Expand Up @@ -24,7 +24,7 @@ import { RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from
import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';

/**
* Handles request messages received by the {@link RpcServer}.
* Handles request messages received by the {@link RPCProtocol}.
*/
export type RequestHandler = (method: string, args: any[]) => Promise<any>;

Expand All @@ -39,15 +39,20 @@ export interface RpcProtocolOptions {
/**
* The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used.
*/
decoder?: RpcMessageDecoder
decoder?: RpcMessageDecoder,
/**
* The runtime mode determines whether the RPC protocol is bi-directional (default) or acts as a client or server only.
*/
mode?: 'default' | 'clientOnly' | 'serverOnly'
}

/**
* Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send
* sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side.
* Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send
* requests and notifications to the remote side (i.e. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server).
* Clients can get a promise for a remote request result that will be either resolved or
* rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request
* Currently, there is no timeout handling for long running requests implemented.
* The bi-directional mode can be reconfigured using the {@link RpcProtocolOptions} to construct an RPC protocol instance that acts only as client or server instead.
*/
export class RpcProtocol {
static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token';
Expand All @@ -58,6 +63,7 @@ export class RpcProtocol {

protected readonly encoder: RpcMessageEncoder;
protected readonly decoder: RpcMessageDecoder;
protected readonly mode: 'default' | 'clientOnly' | 'serverOnly';

protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter();
protected readonly cancellationTokenSources = new Map<number, CancellationTokenSource>();
Expand All @@ -68,37 +74,50 @@ export class RpcProtocol {

protected toDispose = new DisposableCollection();

constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) {
constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler | undefined, options: RpcProtocolOptions = {}) {
this.encoder = options.encoder ?? new RpcMessageEncoder();
this.decoder = options.decoder ?? new RpcMessageDecoder();
this.toDispose.push(this.onNotificationEmitter);
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
channel.onClose(() => this.toDispose.dispose());
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
this.mode = options.mode ?? 'default';

if (this.mode !== 'clientOnly' && requestHandler === undefined) {
console.error('RPCProtocol was initialized without a request handler but was not set to clientOnly mode.');
}
}

handleMessage(message: RpcMessage): void {
switch (message.type) {
case RpcMessageType.Cancel: {
this.handleCancel(message.id);
break;
}
case RpcMessageType.Request: {
this.handleRequest(message.id, message.method, message.args);
break;
if (this.mode !== 'clientOnly') {
switch (message.type) {
case RpcMessageType.Cancel: {
this.handleCancel(message.id);
return;
}
case RpcMessageType.Request: {
this.handleRequest(message.id, message.method, message.args);
return;
}
case RpcMessageType.Notification: {
this.handleNotify(message.id, message.method, message.args);
return;
}
}
case RpcMessageType.Notification: {
this.handleNotify(message.id, message.method, message.args);
break;
}
case RpcMessageType.Reply: {
this.handleReply(message.id, message.res);
break;
}
case RpcMessageType.ReplyErr: {
this.handleReplyErr(message.id, message.err);
break;
}
if (this.mode !== 'serverOnly') {
switch (message.type) {
case RpcMessageType.Reply: {
this.handleReply(message.id, message.res);
return;
}
case RpcMessageType.ReplyErr: {
this.handleReplyErr(message.id, message.err);
return;
}
}
}
// If the message was not handled until here, it is incompatible with the mode.
console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`);
}

protected handleReply(id: number, value: any): void {
Expand Down Expand Up @@ -153,6 +172,13 @@ export class RpcProtocol {
}

sendNotification(method: string, args: any[]): void {
// If the notification supports a CancellationToken, it needs to be treated like a request
// because cancellation does not work with the simplified "fire and forget" approach of simple notifications.
if (args.length && CancellationToken.is(args[args.length - 1])) {
this.sendRequest(method, args);
return;
}

const output = this.channel.getWriteBuffer();
this.encoder.notification(output, this.nextMessageId++, method, args);
output.commit();
Expand Down Expand Up @@ -191,7 +217,7 @@ export class RpcProtocol {
}

try {
const result = await this.requestHandler(method, args);
const result = await this.requestHandler!(method, args);
this.cancellationTokenSources.delete(id);
this.encoder.replyOK(output, id, result);
output.commit();
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/node/messaging/index.ts
Expand Up @@ -16,3 +16,4 @@

export * from './messaging-service';
export * from './ipc-connection-provider';
export * from './ipc-channel';
2 changes: 0 additions & 2 deletions packages/plugin-ext/src/common/plugin-api-rpc.ts
Expand Up @@ -1839,7 +1839,6 @@ export const PLUGIN_RPC_CONTEXT = {
STATUS_BAR_MESSAGE_REGISTRY_MAIN: <ProxyIdentifier<StatusBarMessageRegistryMain>>createProxyIdentifier<StatusBarMessageRegistryMain>('StatusBarMessageRegistryMain'),
ENV_MAIN: createProxyIdentifier<EnvMain>('EnvMain'),
NOTIFICATION_MAIN: createProxyIdentifier<NotificationMain>('NotificationMain'),
NOTIFICATION_EXT: createProxyIdentifier<NotificationExt>('NotificationExt'),
TERMINAL_MAIN: createProxyIdentifier<TerminalServiceMain>('TerminalServiceMain'),
TREE_VIEWS_MAIN: createProxyIdentifier<TreeViewsMain>('TreeViewsMain'),
PREFERENCE_REGISTRY_MAIN: createProxyIdentifier<PreferenceRegistryMain>('PreferenceRegistryMain'),
Expand Down Expand Up @@ -1871,7 +1870,6 @@ export const MAIN_RPC_CONTEXT = {
QUICK_OPEN_EXT: createProxyIdentifier<QuickOpenExt>('QuickOpenExt'),
WINDOW_STATE_EXT: createProxyIdentifier<WindowStateExt>('WindowStateExt'),
NOTIFICATION_EXT: createProxyIdentifier<NotificationExt>('NotificationExt'),
NOTIFICATION_MAIN: createProxyIdentifier<NotificationMain>('NotificationMain'),
WORKSPACE_EXT: createProxyIdentifier<WorkspaceExt>('WorkspaceExt'),
TEXT_EDITORS_EXT: createProxyIdentifier<TextEditorsExt>('TextEditorsExt'),
EDITORS_AND_DOCUMENTS_EXT: createProxyIdentifier<EditorsAndDocumentsExt>('EditorsAndDocumentsExt'),
Expand Down
8 changes: 4 additions & 4 deletions packages/plugin-ext/src/common/plugin-protocol.ts
Expand Up @@ -813,7 +813,7 @@ export function buildFrontendModuleName(plugin: PluginPackage | PluginModel): st

export const HostedPluginClient = Symbol('HostedPluginClient');
export interface HostedPluginClient {
postMessage(pluginHost: string, message: string): Promise<void>;
postMessage(pluginHost: string, buffer: Uint8Array): Promise<void>;

log(logPart: LogPart): void;

Expand Down Expand Up @@ -858,7 +858,7 @@ export interface HostedPluginServer extends JsonRpcServer<HostedPluginClient> {

getExtPluginAPI(): Promise<ExtPluginApi[]>;

onMessage(targetHost: string, message: string): Promise<void>;
onMessage(targetHost: string, message: Uint8Array): Promise<void>;

}

Expand Down Expand Up @@ -895,9 +895,9 @@ export interface PluginServer {
export const ServerPluginRunner = Symbol('ServerPluginRunner');
export interface ServerPluginRunner {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
acceptMessage(pluginHostId: string, jsonMessage: string): boolean;
acceptMessage(pluginHostId: string, jsonMessage: Uint8Array): boolean;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onMessage(pluginHostId: string, jsonMessage: string): void;
onMessage(pluginHostId: string, jsonMessage: Uint8Array): void;
setClient(client: HostedPluginClient): void;
setDefault(defaultRunner: ServerPluginRunner): void;
clientClosed(): void;
Expand Down

0 comments on commit 28009a3

Please sign in to comment.