diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6328224a59b89..1875691d5ce93 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
[Breaking Changes:](#breaking_changes_1.33.0)
- [core] returns of many methods of `MenuModelRegistry` changed from `CompositeMenuNode` to `MutableCompoundMenuNode`. To mutate a menu, use the `updateOptions` method or add a check for `instanceof CompositeMenuNode`, which will be true in most cases.
+- [plugin-ext] refactored the plugin RPC API - now also reuses the msgpackR based RPC protocol that is better suited for handling binary data and enables message tunneling [#11228](https://github.com/eclipse-theia/theia/pull/11261). All plugin protocol types now use `UInt8Array` as type for message parameters instead of `string` - Contributed on behalf of STMicroelectronics.
## v1.32.0 - 11/24/2022
diff --git a/packages/core/src/common/message-rpc/channel.ts b/packages/core/src/common/message-rpc/channel.ts
index cff56221df2a8..5cc433fbb68d0 100644
--- a/packages/core/src/common/message-rpc/channel.ts
+++ b/packages/core/src/common/message-rpc/channel.ts
@@ -14,7 +14,6 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-import { injectable } from '../../../shared/inversify';
import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';
@@ -72,7 +71,6 @@ export type MessageProvider = () => ReadBuffer;
* Reusable abstract {@link Channel} implementation that sets up
* the basic channel event listeners and offers a generic close method.
*/
-@injectable()
export abstract class AbstractChannel implements Channel {
onCloseEmitter: Emitter = new Emitter();
@@ -101,7 +99,21 @@ export abstract class AbstractChannel implements Channel {
}
abstract getWriteBuffer(): WriteBuffer;
+}
+
+/**
+ * A very basic {@link AbstractChannel} implementation which takes a function
+ * for retrieving the {@link WriteBuffer} as constructor argument.
+ */
+export class BasicChannel extends AbstractChannel {
+
+ constructor(protected writeBufferProvider: () => WriteBuffer) {
+ super();
+ }
+ getWriteBuffer(): WriteBuffer {
+ return this.writeBufferProvider();
+ }
}
/**
@@ -194,7 +206,7 @@ export class ChannelMultiplexer implements Disposable {
return this.handleClose(id);
}
case MessageTypes.Data: {
- return this.handleData(id, buffer.sliceAtReadPosition());
+ return this.handleData(id, buffer);
}
}
}
@@ -206,7 +218,7 @@ export class ChannelMultiplexer implements Disposable {
const channel = this.createChannel(id);
this.pendingOpen.delete(id);
this.openChannels.set(id, channel);
- resolve!(channel);
+ resolve(channel);
this.onOpenChannelEmitter.fire({ id, channel });
}
}
@@ -236,7 +248,7 @@ export class ChannelMultiplexer implements Disposable {
protected handleData(id: string, data: ReadBuffer): void {
const channel = this.openChannels.get(id);
if (channel) {
- channel.onMessageEmitter.fire(() => data);
+ channel.onMessageEmitter.fire(() => data.sliceAtReadPosition());
}
}
@@ -263,6 +275,9 @@ export class ChannelMultiplexer implements Disposable {
}
open(id: string): Promise {
+ if (this.openChannels.has(id)) {
+ throw new Error(`Another channel with the id '${id}' is already open.`);
+ }
const result = new Promise((resolve, reject) => {
this.pendingOpen.set(id, resolve);
});
diff --git a/packages/core/src/common/message-rpc/index.ts b/packages/core/src/common/message-rpc/index.ts
index 39a19eb44fa2f..9dde128942026 100644
--- a/packages/core/src/common/message-rpc/index.ts
+++ b/packages/core/src/common/message-rpc/index.ts
@@ -13,6 +13,10 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
-export { Channel, AbstractChannel, ChannelCloseEvent, MessageProvider } from './channel';
+export { AbstractChannel, Channel, ChannelCloseEvent, MessageProvider } from './channel';
export { ReadBuffer, WriteBuffer } from './message-buffer';
+export { RequestHandler, RpcProtocol, RpcProtocolOptions } from './rpc-protocol';
+
+import { registerMsgPackExtensions } from './rpc-message-encoder';
+
+registerMsgPackExtensions();
diff --git a/packages/core/src/common/message-rpc/msg-pack-extension-manager.ts b/packages/core/src/common/message-rpc/msg-pack-extension-manager.ts
new file mode 100644
index 0000000000000..65df6981894f9
--- /dev/null
+++ b/packages/core/src/common/message-rpc/msg-pack-extension-manager.ts
@@ -0,0 +1,70 @@
+// *****************************************************************************
+// Copyright (C) 2022 STMicroelectronics and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+
+import { addExtension } from 'msgpackr';
+
+/**
+ * Handles the global registration of custom MsgPackR extensions
+ * required for the default RPC communication. MsgPackR extensions
+ * are installed globally on both ends of the communication channel.
+ * (frontend-backend, pluginExt-pluginMain).
+ * Is implemented as singleton as it is also used in plugin child processes which have no access to inversify.
+ */
+export class MsgPackExtensionManager {
+ private static readonly INSTANCE = new MsgPackExtensionManager();
+ public static getInstance(): MsgPackExtensionManager {
+ return this.INSTANCE;
+ }
+
+ private extensions = new Map();
+
+ private constructor() {
+ }
+
+ registerExtensions(...extensions: MsgPackExtension[]): void {
+ extensions.forEach(extension => {
+ if (extension.tag < 1 || extension.tag > 100) {
+ // MsgPackR reserves the tag range 1-100 for custom extensions.
+ throw new Error(`MsgPack extension tag should be a number from 1-100 but was '${extension.tag}'`);
+ }
+ if (this.extensions.has(extension.tag)) {
+ throw new Error(`Another MsgPack extension with the tag '${extension.tag}' is already registered`);
+ }
+ this.extensions.set(extension.tag, extension);
+ addExtension({
+ Class: extension.class,
+ type: extension.tag,
+ write: extension.serialize,
+ read: extension.deserialize
+ });
+ });
+ }
+
+ getExtension(tag: number): MsgPackExtension | undefined {
+ return this.extensions.get(tag);
+ }
+}
+
+export interface MsgPackExtension {
+ class: Function,
+ tag: number,
+ serialize(instance: unknown): unknown,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ deserialize(serialized: any): unknown
+}
+
+export type Constructor = new (...params: unknown[]) => T;
+
diff --git a/packages/core/src/common/message-rpc/rpc-message-encoder.ts b/packages/core/src/common/message-rpc/rpc-message-encoder.ts
index f8a91857d8ec6..b6886d908f002 100644
--- a/packages/core/src/common/message-rpc/rpc-message-encoder.ts
+++ b/packages/core/src/common/message-rpc/rpc-message-encoder.ts
@@ -15,8 +15,9 @@
// *****************************************************************************
/* eslint-disable @typescript-eslint/no-explicit-any */
-import { addExtension, Packr as MsgPack } from 'msgpackr';
+import { Packr as MsgPack } from 'msgpackr';
import { ReadBuffer, WriteBuffer } from './message-buffer';
+import { MsgPackExtensionManager } from './msg-pack-extension-manager';
/**
* This code lets you encode rpc protocol messages (request/reply/notification/error/cancel)
@@ -121,27 +122,10 @@ export interface RpcMessageEncoder {
}
export const defaultMsgPack = new MsgPack({ moreTypes: true, encodeUndefinedAsNil: false, bundleStrings: false });
-// Add custom msgpackR extension for ResponseErrors.
-addExtension({
- Class: ResponseError,
- type: 1,
- write: (instance: ResponseError) => {
- const { code, data, message, name, stack } = instance;
- return { code, data, message, name, stack };
- },
- read: data => {
- const error = new ResponseError(data.code, data.message, data.data);
- error.name = data.name;
- error.stack = data.stack;
- return error;
- }
-});
export class MsgPackMessageEncoder implements RpcMessageEncoder {
- constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {
-
- }
+ constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }
cancel(buf: WriteBuffer, requestId: number): void {
this.encode(buf, { type: RpcMessageType.Cancel, id: requestId });
@@ -169,13 +153,11 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
throw err;
}
}
-
}
export class MsgPackMessageDecoder implements RpcMessageDecoder {
- constructor(protected readonly msgPack: MsgPack = defaultMsgPack) {
+ constructor(protected readonly msgPack: MsgPack = defaultMsgPack) { }
- }
decode(buf: ReadBuffer): T {
const bytes = buf.readBytes();
return this.msgPack.decode(bytes);
@@ -184,5 +166,25 @@ export class MsgPackMessageDecoder implements RpcMessageDecoder {
parse(buffer: ReadBuffer): RpcMessage {
return this.decode(buffer);
}
+}
+export function registerMsgPackExtensions(): void {
+ // Register custom msgPack extension for Errors.
+ MsgPackExtensionManager.getInstance().registerExtensions({
+ class: Error,
+ tag: 1,
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ serialize: (error: any) => {
+ const { code, data, message, name } = error;
+ const stack = error.stacktrace ?? error.stack;
+ const isResponseError = error instanceof ResponseError;
+ return { code, data, message, name, stack, isResponseError };
+ },
+ deserialize: data => {
+ const error = data.isResponseError ? new ResponseError(data.code, data.message, data.data) : new Error(data.message);
+ error.name = data.name;
+ error.stack = data.stack;
+ return error;
+ }
+ });
}
diff --git a/packages/core/src/common/message-rpc/rpc-protocol.ts b/packages/core/src/common/message-rpc/rpc-protocol.ts
index 61be9c73d3881..4dbf422e0b877 100644
--- a/packages/core/src/common/message-rpc/rpc-protocol.ts
+++ b/packages/core/src/common/message-rpc/rpc-protocol.ts
@@ -16,15 +16,14 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { CancellationToken, CancellationTokenSource } from '../cancellation';
-import { DisposableCollection } from '../disposable';
+import { Disposable, DisposableCollection } from '../disposable';
import { Emitter, Event } from '../event';
import { Deferred } from '../promise-util';
import { Channel } from './channel';
import { MsgPackMessageDecoder, MsgPackMessageEncoder, RpcMessage, RpcMessageDecoder, RpcMessageEncoder, RpcMessageType } from './rpc-message-encoder';
-import { Uint8ArrayWriteBuffer } from './uint8-array-message-buffer';
/**
- * Handles request messages received by the {@link RpcServer}.
+ * Handles request messages received by the {@link RPCProtocol}.
*/
export type RequestHandler = (method: string, args: any[]) => Promise;
@@ -39,15 +38,20 @@ export interface RpcProtocolOptions {
/**
* The message decoder that should be used. If `undefined` the default {@link RpcMessageDecoder} will be used.
*/
- decoder?: RpcMessageDecoder
+ decoder?: RpcMessageDecoder,
+ /**
+ * The runtime mode determines whether the RPC protocol is bi-directional (default) or acts as a client or server only.
+ */
+ mode?: 'default' | 'clientOnly' | 'serverOnly'
}
/**
- * Establish a bi-directional RPC protocol on top of a given channel. Bi-directional means to send
- * sends requests and notifications to the remote side as well as receiving requests and notifications from the remote side.
+ * Establish a RPC protocol on top of a given channel. By default the rpc protocol is bi-directional, meaning it is possible to send
+ * requests and notifications to the remote side (i.e. acts as client) as well as receiving requests and notifications from the remote side (i.e. acts as a server).
* Clients can get a promise for a remote request result that will be either resolved or
* rejected depending on the success of the request. Keeps track of outstanding requests and matches replies to the appropriate request
* Currently, there is no timeout handling for long running requests implemented.
+ * The bi-directional mode can be reconfigured using the {@link RpcProtocolOptions} to construct an RPC protocol instance that acts only as client or server instead.
*/
export class RpcProtocol {
static readonly CANCELLATION_TOKEN_KEY = 'add.cancellation.token';
@@ -58,6 +62,7 @@ export class RpcProtocol {
protected readonly encoder: RpcMessageEncoder;
protected readonly decoder: RpcMessageDecoder;
+ protected readonly mode: 'default' | 'clientOnly' | 'serverOnly';
protected readonly onNotificationEmitter: Emitter<{ method: string; args: any[]; }> = new Emitter();
protected readonly cancellationTokenSources = new Map();
@@ -68,37 +73,50 @@ export class RpcProtocol {
protected toDispose = new DisposableCollection();
- constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler, options: RpcProtocolOptions = {}) {
+ constructor(public readonly channel: Channel, public readonly requestHandler: RequestHandler | undefined, options: RpcProtocolOptions = {}) {
this.encoder = options.encoder ?? new MsgPackMessageEncoder();
this.decoder = options.decoder ?? new MsgPackMessageDecoder();
this.toDispose.push(this.onNotificationEmitter);
- this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
channel.onClose(() => this.toDispose.dispose());
+ this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
+ this.mode = options.mode ?? 'default';
+
+ if (this.mode !== 'clientOnly' && requestHandler === undefined) {
+ console.error('RPCProtocol was initialized without a request handler but was not set to clientOnly mode.');
+ }
}
handleMessage(message: RpcMessage): void {
- switch (message.type) {
- case RpcMessageType.Cancel: {
- this.handleCancel(message.id);
- break;
- }
- case RpcMessageType.Request: {
- this.handleRequest(message.id, message.method, message.args);
- break;
- }
- case RpcMessageType.Notification: {
- this.handleNotify(message.id, message.method, message.args);
- break;
+ if (this.mode !== 'clientOnly') {
+ switch (message.type) {
+ case RpcMessageType.Cancel: {
+ this.handleCancel(message.id);
+ return;
+ }
+ case RpcMessageType.Request: {
+ this.handleRequest(message.id, message.method, message.args);
+ return;
+ }
+ case RpcMessageType.Notification: {
+ this.handleNotify(message.id, message.method, message.args);
+ return;
+ }
}
- case RpcMessageType.Reply: {
- this.handleReply(message.id, message.res);
- break;
- }
- case RpcMessageType.ReplyErr: {
- this.handleReplyErr(message.id, message.err);
- break;
+ }
+ if (this.mode !== 'serverOnly') {
+ switch (message.type) {
+ case RpcMessageType.Reply: {
+ this.handleReply(message.id, message.res);
+ return;
+ }
+ case RpcMessageType.ReplyErr: {
+ this.handleReplyErr(message.id, message.err);
+ return;
+ }
}
}
+ // If the message was not handled until here, it is incompatible with the mode.
+ console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`);
}
protected handleReply(id: number, value: any): void {
@@ -126,13 +144,13 @@ export class RpcProtocol {
}
sendRequest(method: string, args: any[]): Promise {
- const id = this.nextMessageId++;
- const reply = new Deferred();
-
// The last element of the request args might be a cancellation token. As these tokens are not serializable we have to remove it from the
// args array and the `CANCELLATION_TOKEN_KEY` string instead.
const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;
+ const id = this.nextMessageId++;
+ const reply = new Deferred();
+
if (cancellationToken) {
args.push(RpcProtocol.CANCELLATION_TOKEN_KEY);
}
@@ -153,6 +171,13 @@ export class RpcProtocol {
}
sendNotification(method: string, args: any[]): void {
+ // If the notification supports a CancellationToken, it needs to be treated like a request
+ // because cancellation does not work with the simplified "fire and forget" approach of simple notifications.
+ if (args.length && CancellationToken.is(args[args.length - 1])) {
+ this.sendRequest(method, args);
+ return;
+ }
+
const output = this.channel.getWriteBuffer();
this.encoder.notification(output, this.nextMessageId++, method, args);
output.commit();
@@ -167,7 +192,6 @@ export class RpcProtocol {
protected handleCancel(id: number): void {
const cancellationTokenSource = this.cancellationTokenSources.get(id);
if (cancellationTokenSource) {
- this.cancellationTokenSources.delete(id);
cancellationTokenSource.cancel();
}
}
@@ -185,14 +209,14 @@ export class RpcProtocol {
}
try {
- const result = await this.requestHandler(method, args);
+ const result = await this.requestHandler!(method, args);
this.cancellationTokenSources.delete(id);
this.encoder.replyOK(output, id, result);
output.commit();
} catch (err) {
// In case of an error the output buffer might already contains parts of an message.
// => Dispose the current buffer and retrieve a new, clean one for writing the response error.
- if (output instanceof Uint8ArrayWriteBuffer) {
+ if (Disposable.is(output)) {
output.dispose();
}
const errorOutput = this.channel.getWriteBuffer();
diff --git a/packages/core/src/node/messaging/index.ts b/packages/core/src/node/messaging/index.ts
index fd161d93a9df0..23da1fe350586 100644
--- a/packages/core/src/node/messaging/index.ts
+++ b/packages/core/src/node/messaging/index.ts
@@ -16,3 +16,4 @@
export * from './messaging-service';
export * from './ipc-connection-provider';
+export * from './ipc-channel';
diff --git a/packages/plugin-ext/src/common/index.ts b/packages/plugin-ext/src/common/index.ts
index f77f053e2d32d..d39f1fc134210 100644
--- a/packages/plugin-ext/src/common/index.ts
+++ b/packages/plugin-ext/src/common/index.ts
@@ -18,3 +18,7 @@
export * from './plugin-protocol';
export * from './plugin-api-rpc';
export * from './plugin-ext-api-contribution';
+
+import { registerMsgPackExtensions } from './rpc-protocol';
+
+registerMsgPackExtensions();
diff --git a/packages/plugin-ext/src/common/plugin-api-rpc.ts b/packages/plugin-ext/src/common/plugin-api-rpc.ts
index 9ec9307f24ad5..a77d09cee6151 100644
--- a/packages/plugin-ext/src/common/plugin-api-rpc.ts
+++ b/packages/plugin-ext/src/common/plugin-api-rpc.ts
@@ -1924,7 +1924,6 @@ export const PLUGIN_RPC_CONTEXT = {
STATUS_BAR_MESSAGE_REGISTRY_MAIN: >createProxyIdentifier('StatusBarMessageRegistryMain'),
ENV_MAIN: createProxyIdentifier('EnvMain'),
NOTIFICATION_MAIN: createProxyIdentifier('NotificationMain'),
- NOTIFICATION_EXT: createProxyIdentifier('NotificationExt'),
TERMINAL_MAIN: createProxyIdentifier('TerminalServiceMain'),
TREE_VIEWS_MAIN: createProxyIdentifier('TreeViewsMain'),
PREFERENCE_REGISTRY_MAIN: createProxyIdentifier('PreferenceRegistryMain'),
@@ -1956,7 +1955,6 @@ export const MAIN_RPC_CONTEXT = {
QUICK_OPEN_EXT: createProxyIdentifier('QuickOpenExt'),
WINDOW_STATE_EXT: createProxyIdentifier('WindowStateExt'),
NOTIFICATION_EXT: createProxyIdentifier('NotificationExt'),
- NOTIFICATION_MAIN: createProxyIdentifier('NotificationMain'),
WORKSPACE_EXT: createProxyIdentifier('WorkspaceExt'),
TEXT_EDITORS_EXT: createProxyIdentifier('TextEditorsExt'),
EDITORS_AND_DOCUMENTS_EXT: createProxyIdentifier('EditorsAndDocumentsExt'),
diff --git a/packages/plugin-ext/src/common/plugin-protocol.ts b/packages/plugin-ext/src/common/plugin-protocol.ts
index d4aeea1ac05cc..f4e84cbbf9d77 100644
--- a/packages/plugin-ext/src/common/plugin-protocol.ts
+++ b/packages/plugin-ext/src/common/plugin-protocol.ts
@@ -838,7 +838,7 @@ export function buildFrontendModuleName(plugin: PluginPackage | PluginModel): st
export const HostedPluginClient = Symbol('HostedPluginClient');
export interface HostedPluginClient {
- postMessage(pluginHost: string, message: string): Promise;
+ postMessage(pluginHost: string, buffer: Uint8Array): Promise;
log(logPart: LogPart): void;
@@ -901,7 +901,7 @@ export interface HostedPluginServer extends JsonRpcServer {
getExtPluginAPI(): Promise;
- onMessage(targetHost: string, message: string): Promise;
+ onMessage(targetHost: string, message: Uint8Array): Promise;
}
@@ -944,9 +944,9 @@ export interface PluginServer {
export const ServerPluginRunner = Symbol('ServerPluginRunner');
export interface ServerPluginRunner {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
- acceptMessage(pluginHostId: string, jsonMessage: string): boolean;
+ acceptMessage(pluginHostId: string, jsonMessage: Uint8Array): boolean;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
- onMessage(pluginHostId: string, jsonMessage: string): void;
+ onMessage(pluginHostId: string, jsonMessage: Uint8Array): void;
setClient(client: HostedPluginClient): void;
setDefault(defaultRunner: ServerPluginRunner): void;
clientClosed(): void;
diff --git a/packages/plugin-ext/src/common/proxy-handler.ts b/packages/plugin-ext/src/common/proxy-handler.ts
new file mode 100644
index 0000000000000..deb5ebfa62c0e
--- /dev/null
+++ b/packages/plugin-ext/src/common/proxy-handler.ts
@@ -0,0 +1,126 @@
+/********************************************************************************
+ * Copyright (C) 2022 STMicroelectronics and others.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0.
+ *
+ * This Source Code may also be made available under the following Secondary
+ * Licenses when the conditions for such availability set forth in the Eclipse
+ * Public License v. 2.0 are satisfied: GNU General Public License, version 2
+ * with the GNU Classpath Exception which is available at
+ * https://www.gnu.org/software/classpath/license.html.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+ ********************************************************************************/
+/* eslint-disable @typescript-eslint/no-explicit-any */
+import { Channel, RpcProtocol, RpcProtocolOptions } from '@theia/core/';
+import { RpcMessageDecoder, RpcMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder';
+import { Deferred } from '@theia/core/lib/common/promise-util';
+
+export interface RpcHandlerOptions {
+ id: string
+ encoder: RpcMessageEncoder,
+ decoder: RpcMessageDecoder
+}
+export interface ProxyHandlerOptions extends RpcHandlerOptions {
+ channelProvider: () => Promise,
+}
+
+export interface InvocationHandlerOptions extends RpcHandlerOptions {
+ target: any
+}
+/**
+ * A proxy handler that will send any method invocation on the proxied object
+ * as a rcp protocol message over a channel.
+ */
+export class ClientProxyHandler implements ProxyHandler {
+ private rpcDeferred: Deferred = new Deferred();
+ private isRpcInitialized = false;
+
+ readonly id: string;
+ private readonly channelProvider: () => Promise;
+ private readonly encoder: RpcMessageEncoder;
+ private readonly decoder: RpcMessageDecoder;
+
+ constructor(options: ProxyHandlerOptions) {
+ Object.assign(this, options);
+ }
+
+ private initializeRpc(): void {
+ const clientOptions: RpcProtocolOptions = { encoder: this.encoder, decoder: this.decoder, mode: 'clientOnly' };
+ this.channelProvider().then(channel => {
+ const rpc = new RpcProtocol(channel, undefined, clientOptions);
+ this.rpcDeferred.resolve(rpc);
+ this.isRpcInitialized = true;
+ });
+ }
+
+ get(target: any, name: string, receiver: any): any {
+ if (!this.isRpcInitialized) {
+ this.initializeRpc();
+ }
+
+ if (target[name] || name.charCodeAt(0) !== 36 /* CharCode.DollarSign */) {
+ return target[name];
+ }
+ const isNotify = this.isNotification(name);
+ return (...args: any[]) => {
+ const method = name.toString();
+ return this.rpcDeferred.promise.then(async (connection: RpcProtocol) => {
+ if (isNotify) {
+ connection.sendNotification(method, args);
+ } else {
+ return await connection.sendRequest(method, args) as Promise;
+ }
+ });
+ };
+ }
+
+ /**
+ * Return whether the given property represents a notification. If true,
+ * the promise returned from the invocation will resolve immediately to `undefined`
+ *
+ * A property leads to a notification rather than a method call if its name
+ * begins with `notify` or `on`.
+ *
+ * @param p - The property being called on the proxy.
+ * @return Whether `p` represents a notification.
+ */
+ protected isNotification(p: PropertyKey): boolean {
+ let propertyString = p.toString();
+ if (propertyString.charCodeAt(0) === 36/* CharCode.DollarSign */) {
+ propertyString = propertyString.substring(1);
+ }
+ return propertyString.startsWith('notify') || propertyString.startsWith('on');
+ }
+}
+
+export class RpcInvocationHandler {
+ readonly id: string;
+ readonly target: any;
+
+ private rpcDeferred: Deferred = new Deferred();
+ private readonly encoder: RpcMessageEncoder;
+ private readonly decoder: RpcMessageDecoder;
+
+ constructor(options: InvocationHandlerOptions) {
+ Object.assign(this, options);
+ }
+
+ listen(channel: Channel): void {
+ const serverOptions: RpcProtocolOptions = { encoder: this.encoder, decoder: this.decoder, mode: 'serverOnly' };
+ const server = new RpcProtocol(channel, (method: string, args: any[]) => this.handleRequest(method, args), serverOptions);
+ server.onNotification((e: { method: string, args: any }) => this.onNotification(e.method, e.args));
+ this.rpcDeferred.resolve(server);
+ }
+
+ protected handleRequest(method: string, args: any[]): Promise {
+ return this.rpcDeferred.promise.then(() => this.target[method](...args));
+ }
+
+ protected onNotification(method: string, args: any[]): void {
+ this.target[method](...args);
+ }
+}
+
diff --git a/packages/plugin-ext/src/common/rpc-protocol.ts b/packages/plugin-ext/src/common/rpc-protocol.ts
index efd5b9503bee3..dd47b8f5ae550 100644
--- a/packages/plugin-ext/src/common/rpc-protocol.ts
+++ b/packages/plugin-ext/src/common/rpc-protocol.ts
@@ -22,14 +22,17 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
+import { Channel, Disposable, DisposableCollection, ReadBuffer, WriteBuffer } from '@theia/core';
import { Emitter, Event } from '@theia/core/lib/common/event';
-import { DisposableCollection, Disposable } from '@theia/core/lib/common/disposable';
-import { Deferred } from '@theia/core/lib/common/promise-util';
+import { ChannelMultiplexer, MessageProvider } from '@theia/core/lib/common/message-rpc/channel';
+import { MsgPackMessageDecoder, MsgPackMessageEncoder } from '@theia/core/lib/common/message-rpc/rpc-message-encoder';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
+import { ClientProxyHandler, RpcInvocationHandler } from './proxy-handler';
+import { MsgPackExtensionManager } from '@theia/core/lib/common/message-rpc/msg-pack-extension-manager';
import { URI as VSCodeURI } from '@theia/core/shared/vscode-uri';
import URI from '@theia/core/lib/common/uri';
-import { CancellationToken, CancellationTokenSource } from '@theia/core/shared/vscode-languageserver-protocol';
-import { Range, Position } from '../plugin/types-impl';
import { BinaryBuffer } from '@theia/core/lib/common/buffer';
+import { Range, Position } from '../plugin/types-impl';
export interface MessageConnection {
send(msg: string): void;
@@ -76,49 +79,29 @@ export namespace ConnectionClosedError {
}
export class RPCProtocolImpl implements RPCProtocol {
-
- private readonly locals = new Map();
+ private readonly locals = new Map();
private readonly proxies = new Map();
- private lastMessageId = 0;
- private readonly cancellationTokenSources = new Map();
- private readonly pendingRPCReplies = new Map>();
- private readonly multiplexer: RPCMultiplexer;
-
- private replacer: (key: string | undefined, value: any) => any;
- private reviver: (key: string | undefined, value: any) => any;
+ private readonly multiplexer: ChannelMultiplexer;
+ private readonly encoder = new MsgPackMessageEncoder();
+ private readonly decoder = new MsgPackMessageDecoder();
private readonly toDispose = new DisposableCollection(
Disposable.create(() => { /* mark as no disposed */ })
);
- constructor(connection: MessageConnection, transformations?: {
- replacer?: (key: string | undefined, value: any) => any,
- reviver?: (key: string | undefined, value: any) => any
- }) {
- this.toDispose.push(
- this.multiplexer = new RPCMultiplexer(connection)
- );
- this.multiplexer.onMessage(msg => this.receiveOneMessage(msg));
- this.toDispose.push(Disposable.create(() => {
- this.proxies.clear();
- for (const reply of this.pendingRPCReplies.values()) {
- reply.reject(ConnectionClosedError.create());
- }
- this.pendingRPCReplies.clear();
- }));
-
- this.reviver = transformations?.reviver || ObjectsTransferrer.reviver;
- this.replacer = transformations?.replacer || ObjectsTransferrer.replacer;
- }
-
- private get isDisposed(): boolean {
- return this.toDispose.disposed;
+ constructor(channel: Channel) {
+ this.toDispose.push(this.multiplexer = new ChannelMultiplexer(new BatchingChannel(channel)));
+ this.toDispose.push(Disposable.create(() => this.proxies.clear()));
}
dispose(): void {
this.toDispose.dispose();
}
+ protected get isDisposed(): boolean {
+ return this.toDispose.disposed;
+ }
+
getProxy(proxyId: ProxyIdentifier): T {
if (this.isDisposed) {
throw ConnectionClosedError.create();
@@ -131,274 +114,125 @@ export class RPCProtocolImpl implements RPCProtocol {
return proxy;
}
- set(identifier: ProxyIdentifier, instance: R): R {
- if (this.isDisposed) {
- throw ConnectionClosedError.create();
- }
- this.locals.set(identifier.id, instance);
- if (Disposable.is(instance)) {
- this.toDispose.push(instance);
- }
- this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id)));
- return instance;
- }
-
- private createProxy(proxyId: string): T {
- const handler = {
- get: (target: any, name: string) => {
- if (!target[name] && name.charCodeAt(0) === 36 /* CharCode.DollarSign */) {
- target[name] = (...myArgs: any[]) =>
- this.remoteCall(proxyId, name, myArgs);
- }
- return target[name];
- }
- };
+ protected createProxy(proxyId: string): T {
+ const handler = new ClientProxyHandler({ id: proxyId, encoder: this.encoder, decoder: this.decoder, channelProvider: () => this.multiplexer.open(proxyId) });
return new Proxy(Object.create(null), handler);
}
- private remoteCall(proxyId: string, methodName: string, args: any[]): Promise {
+ set(identifier: ProxyIdentifier, instance: R): R {
if (this.isDisposed) {
- return Promise.reject(ConnectionClosedError.create());
- }
- const cancellationToken: CancellationToken | undefined = args.length && CancellationToken.is(args[args.length - 1]) ? args.pop() : undefined;
- if (cancellationToken && cancellationToken.isCancellationRequested) {
- return Promise.reject(canceled());
- }
-
- const callId = String(++this.lastMessageId);
- const result = new Deferred();
-
- if (cancellationToken) {
- args.push('add.cancellation.token');
- cancellationToken.onCancellationRequested(() =>
- this.multiplexer.send(this.cancel(callId))
- );
+ throw ConnectionClosedError.create();
}
+ const invocationHandler = this.locals.get(identifier.id);
+ if (!invocationHandler) {
+ const handler = new RpcInvocationHandler({ id: identifier.id, target: instance, encoder: this.encoder, decoder: this.decoder });
- this.pendingRPCReplies.set(callId, result);
- this.multiplexer.send(this.request(callId, proxyId, methodName, args));
- return result.promise;
- }
-
- private receiveOneMessage(rawmsg: string): void {
- if (this.isDisposed) {
- return;
- }
- try {
- const msg = JSON.parse(rawmsg, this.reviver);
-
- switch (msg.type) {
- case MessageType.Request:
- this.receiveRequest(msg);
- break;
- case MessageType.Reply:
- this.receiveReply(msg);
- break;
- case MessageType.ReplyErr:
- this.receiveReplyErr(msg);
- break;
- case MessageType.Cancel:
- this.receiveCancel(msg);
- break;
+ const channel = this.multiplexer.getOpenChannel(identifier.id);
+ if (channel) {
+ handler.listen(channel);
+ } else {
+ const channelOpenListener = this.multiplexer.onDidOpenChannel(event => {
+ if (event.id === identifier.id) {
+ handler.listen(event.channel);
+ channelOpenListener.dispose();
+ }
+ });
}
- } catch (e) {
- // exception does not show problematic content: log it!
- console.log('failed to parse message: ' + rawmsg);
- throw e;
- }
-
- }
-
- private receiveCancel(msg: CancelMessage): void {
- const cancellationTokenSource = this.cancellationTokenSources.get(msg.id);
- if (cancellationTokenSource) {
- cancellationTokenSource.cancel();
- }
- }
-
- private receiveRequest(msg: RequestMessage): void {
- const callId = msg.id;
- const proxyId = msg.proxyId;
- // convert `null` to `undefined`, since we don't use `null` in internal plugin APIs
- const args = msg.args.map(arg => arg === null ? undefined : arg); // eslint-disable-line no-null/no-null
-
- const addToken = args.length && args[args.length - 1] === 'add.cancellation.token' ? args.pop() : false;
- if (addToken) {
- const tokenSource = new CancellationTokenSource();
- this.cancellationTokenSources.set(callId, tokenSource);
- args.push(tokenSource.token);
- }
- const invocation = this.invokeHandler(proxyId, msg.method, args);
-
- invocation.then(result => {
- this.cancellationTokenSources.delete(callId);
- this.multiplexer.send(this.replyOK(callId, result));
- }, error => {
- this.cancellationTokenSources.delete(callId);
- this.multiplexer.send(this.replyErr(callId, error));
- });
- }
-
- private receiveReply(msg: ReplyMessage): void {
- const callId = msg.id;
- const pendingReply = this.pendingRPCReplies.get(callId);
- if (!pendingReply) {
- return;
- }
- this.pendingRPCReplies.delete(callId);
- pendingReply.resolve(msg.res);
- }
-
- private receiveReplyErr(msg: ReplyErrMessage): void {
- const callId = msg.id;
- const pendingReply = this.pendingRPCReplies.get(callId);
- if (!pendingReply) {
- return;
- }
- this.pendingRPCReplies.delete(callId);
-
- let err: Error | undefined = undefined;
- if (msg.err && msg.err.$isError) {
- err = new Error();
- err.name = msg.err.name;
- err.message = msg.err.message;
- err.stack = msg.err.stack;
- }
- pendingReply.reject(err);
- }
-
- private invokeHandler(proxyId: string, methodName: string, args: any[]): Promise {
- try {
- return Promise.resolve(this.doInvokeHandler(proxyId, methodName, args));
- } catch (err) {
- return Promise.reject(err);
- }
- }
-
- private doInvokeHandler(proxyId: string, methodName: string, args: any[]): any {
- const actor = this.locals.get(proxyId);
- if (!actor) {
- throw new Error('Unknown actor ' + proxyId);
- }
- const method = actor[methodName];
- if (typeof method !== 'function') {
- throw new Error('Unknown method ' + methodName + ' on actor ' + proxyId);
- }
- return method.apply(actor, args);
- }
- private cancel(req: string): string {
- return `{"type":${MessageType.Cancel},"id":"${req}"}`;
- }
-
- private request(req: string, rpcId: string, method: string, args: any[]): string {
- return `{"type":${MessageType.Request},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${JSON.stringify(args, this.replacer)}}`;
- }
-
- private replyOK(req: string, res: any): string {
- if (typeof res === 'undefined') {
- return `{"type":${MessageType.Reply},"id":"${req}"}`;
- }
- return `{"type":${MessageType.Reply},"id":"${req}","res":${safeStringify(res, this.replacer)}}`;
- }
+ this.locals.set(identifier.id, handler);
+ if (Disposable.is(instance)) {
+ this.toDispose.push(instance);
+ }
+ this.toDispose.push(Disposable.create(() => this.locals.delete(identifier.id)));
- private replyErr(req: string, err: any): string {
- err = typeof err === 'string' ? new Error(err) : err;
- if (err instanceof Error) {
- return `{"type":${MessageType.ReplyErr},"id":"${req}","err":${safeStringify(transformErrorForSerialization(err))}}`;
}
- return `{"type":${MessageType.ReplyErr},"id":"${req}","err":null}`;
+ return instance;
}
}
-function canceled(): Error {
- const error = new Error('Canceled');
- error.name = error.message;
- return error;
-}
-
/**
- * Sends/Receives multiple messages in one go:
+ * Wraps and underlying channel to send/receive multiple messages in one go:
* - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`.
* - each incoming message is handled in a separate `process.nextTick`.
*/
-class RPCMultiplexer implements Disposable, MessageConnection {
-
- private readonly connection: MessageConnection;
- private readonly sendAccumulatedBound: () => void;
-
- private messagesToSend: string[];
+export class BatchingChannel implements Channel {
+ protected messagesToSend: Uint8Array[] = [];
- private readonly messageEmitter = new Emitter();
- private readonly toDispose = new DisposableCollection();
+ constructor(protected underlyingChannel: Channel) {
+ underlyingChannel.onMessage(msg => this.handleMessages(msg()));
+ }
- constructor(connection: MessageConnection) {
- this.connection = connection;
- this.sendAccumulatedBound = this.sendAccumulated.bind(this);
+ protected onMessageEmitter: Emitter = new Emitter();
+ get onMessage(): Event {
+ return this.onMessageEmitter.event;
+ };
- this.toDispose.push(Disposable.create(() => this.messagesToSend = []));
- this.toDispose.push(this.connection.onMessage((msg: string) => {
- const messages = JSON.parse(msg);
- for (const message of messages) {
- this.messageEmitter.fire(message);
- }
- }));
- this.toDispose.push(this.messageEmitter);
+ readonly onClose = this.underlyingChannel.onClose;
+ readonly onError = this.underlyingChannel.onError;
+ close(): void {
+ this.underlyingChannel.close();
+ this.onMessageEmitter.dispose();
this.messagesToSend = [];
}
- dispose(): void {
- this.toDispose.dispose();
- }
-
- get onMessage(): Event {
- return this.messageEmitter.event;
+ getWriteBuffer(): WriteBuffer {
+ const writer = new Uint8ArrayWriteBuffer();
+ writer.onCommit(buffer => this.commitSingleMessage(buffer));
+ return writer;
}
- private sendAccumulated(): void {
- const tmp = this.messagesToSend;
- this.messagesToSend = [];
- this.connection.send(JSON.stringify(tmp));
- }
+ protected commitSingleMessage(msg: Uint8Array): void {
- public send(msg: string): void {
- if (this.toDispose.disposed) {
- throw ConnectionClosedError.create();
- }
if (this.messagesToSend.length === 0) {
if (typeof setImmediate !== 'undefined') {
- setImmediate(this.sendAccumulatedBound);
+ setImmediate(() => this.sendAccumulated());
} else {
- setTimeout(this.sendAccumulatedBound, 0);
+ setTimeout(() => this.sendAccumulated(), 0);
}
}
this.messagesToSend.push(msg);
}
+
+ protected sendAccumulated(): void {
+ const cachedMessages = this.messagesToSend;
+ this.messagesToSend = [];
+ const writer = this.underlyingChannel.getWriteBuffer();
+
+ if (cachedMessages.length > 0) {
+ writer.writeLength(cachedMessages.length);
+ cachedMessages.forEach(msg => {
+ writer.writeBytes(msg);
+ });
+
+ }
+ writer.commit();
+ }
+
+ protected handleMessages(buffer: ReadBuffer): void {
+ // Read in the list of messages and dispatch each message individually
+ const length = buffer.readLength();
+ if (length > 0) {
+ for (let index = 0; index < length; index++) {
+ const message = buffer.readBytes();
+ this.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message));
+ }
+ }
+ }
}
-/**
- * These functions are responsible for correct transferring objects via rpc channel.
- *
- * To reach that some specific kind of objects is converted to json in some custom way
- * and then, after receiving, revived to objects again,
- * so there is feeling that object was transferred via rpc channel.
- *
- * To distinguish between regular and altered objects, field $type is added to altered ones.
- * Also value of that field specifies kind of the object.
- */
-export namespace ObjectsTransferrer {
-
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- export function replacer(key: string | undefined, value: any): any {
- if (value instanceof URI) {
- return {
- $type: SerializedObjectType.THEIA_URI,
- data: value.toString()
- } as SerializedObject;
- } else if (value instanceof Range) {
- const range = value as Range;
- const serializedValue = {
+export function registerMsgPackExtensions(): void {
+ MsgPackExtensionManager.getInstance().registerExtensions(
+ {
+ class: URI,
+ tag: 2,
+ serialize: (instance: URI) => instance.toString(),
+ deserialize: data => new URI(data)
+ },
+ {
+ class: Range,
+ tag: 3,
+ serialize: (range: Range) => ({
start: {
line: range.start.line,
character: range.start.character
@@ -407,140 +241,33 @@ export namespace ObjectsTransferrer {
line: range.end.line,
character: range.end.character
}
- };
- return {
- $type: SerializedObjectType.THEIA_RANGE,
- data: JSON.stringify(serializedValue)
- } as SerializedObject;
- } else if (value && value['$mid'] === 1) {
- // Given value is VSCode URI
- // We cannot use instanceof here because VSCode URI has toJSON method which is invoked before this replacer.
- const uri = VSCodeURI.revive(value);
- return {
- $type: SerializedObjectType.VSCODE_URI,
- data: uri.toString()
- } as SerializedObject;
- } else if (value instanceof BinaryBuffer) {
- const bytes = [...value.buffer.values()];
- return {
- $type: SerializedObjectType.TEXT_BUFFER,
- data: JSON.stringify({ bytes })
- };
- }
-
- return value;
- }
-
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- export function reviver(key: string | undefined, value: any): any {
- if (isSerializedObject(value)) {
- switch (value.$type) {
- case SerializedObjectType.THEIA_URI:
- return new URI(value.data);
- case SerializedObjectType.VSCODE_URI:
- return VSCodeURI.parse(value.data);
- case SerializedObjectType.THEIA_RANGE:
- // eslint-disable-next-line @typescript-eslint/no-explicit-any
- const obj: any = JSON.parse(value.data);
- const start = new Position(obj.start.line, obj.start.character);
- const end = new Position(obj.end.line, obj.end.character);
- return new Range(start, end);
- case SerializedObjectType.TEXT_BUFFER:
- const data: { bytes: number[] } = JSON.parse(value.data);
- return BinaryBuffer.wrap(Uint8Array.from(data.bytes));
+ }),
+ deserialize: data => {
+ const start = new Position(data.start.line, data.start.character);
+ const end = new Position(data.end.line, data.end.character);
+ return new Range(start, end);
+ }
+ },
+ {
+ class: VSCodeURI,
+ tag: 4,
+ // eslint-disable-next-line arrow-body-style
+ serialize: (instance: URI) => {
+ return instance.toString();
+ },
+ deserialize: data => VSCodeURI.parse(data)
+ },
+ {
+ class: BinaryBuffer,
+ tag: 5,
+ // eslint-disable-next-line arrow-body-style
+ serialize: (instance: BinaryBuffer) => {
+ return instance.buffer;
+ },
+ // eslint-disable-next-line arrow-body-style
+ deserialize: buffer => {
+ return BinaryBuffer.wrap(buffer);
}
}
-
- return value;
- }
-
-}
-
-interface SerializedObject {
- $type: SerializedObjectType;
- data: string;
-}
-
-enum SerializedObjectType {
- THEIA_URI,
- VSCODE_URI,
- THEIA_RANGE,
- TEXT_BUFFER
-}
-
-function isSerializedObject(obj: unknown): obj is SerializedObject {
- const serializedObject = obj as SerializedObject;
- return !!obj && typeof obj === 'object' && serializedObject.$type !== undefined && serializedObject.data !== undefined;
-}
-
-export const enum MessageType {
- Request = 1,
- Reply = 2,
- ReplyErr = 3,
- Cancel = 4,
- Terminate = 5,
- Terminated = 6
-}
-
-class CancelMessage {
- type: MessageType.Cancel;
- id: string;
-}
-
-class RequestMessage {
- type: MessageType.Request;
- id: string;
- proxyId: string;
- method: string;
- args: any[];
-}
-
-class ReplyMessage {
- type: MessageType.Reply;
- id: string;
- res: any;
-}
-
-class ReplyErrMessage {
- type: MessageType.ReplyErr;
- id: string;
- err: SerializedError;
-}
-
-type RPCMessage = RequestMessage | ReplyMessage | ReplyErrMessage | CancelMessage;
-
-export interface SerializedError {
- readonly $isError: true;
- readonly name: string;
- readonly message: string;
- readonly stack: string;
-}
-
-export function transformErrorForSerialization(error: Error): SerializedError {
- if (error instanceof Error) {
- const { name, message } = error;
- const stack: string = (error).stacktrace || error.stack;
- return {
- $isError: true,
- name,
- message,
- stack
- };
- }
-
- // return as is
- return error;
-}
-
-interface JSONStringifyReplacer {
- (key: string, value: any): any;
-}
-
-function safeStringify(obj: any, replacer?: JSONStringifyReplacer): string {
- try {
- return JSON.stringify(obj, replacer);
- } catch (err) {
- console.error('error stringifying response: ', err);
- return 'null';
- }
+ );
}
diff --git a/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts b/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts
index 3c80f8d33d410..6fb3a0c55932b 100644
--- a/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts
+++ b/packages/plugin-ext/src/hosted/browser/hosted-plugin-watcher.ts
@@ -21,7 +21,8 @@ import { LogPart } from '../../common/types';
@injectable()
export class HostedPluginWatcher {
- private onPostMessage = new Emitter<{ pluginHostId: string, message: string }>();
+ private onPostMessage = new Emitter<{ pluginHostId: string, message: Uint8Array }>();
+
private onLogMessage = new Emitter();
private readonly onDidDeployEmitter = new Emitter();
@@ -31,7 +32,7 @@ export class HostedPluginWatcher {
const messageEmitter = this.onPostMessage;
const logEmitter = this.onLogMessage;
return {
- postMessage(pluginHostId, message: string): Promise {
+ postMessage(pluginHostId, message: Uint8Array): Promise {
messageEmitter.fire({ pluginHostId, message });
return Promise.resolve();
},
@@ -43,7 +44,7 @@ export class HostedPluginWatcher {
};
}
- get onPostMessageEvent(): Event<{ pluginHostId: string, message: string }> {
+ get onPostMessageEvent(): Event<{ pluginHostId: string, message: Uint8Array }> {
return this.onPostMessage.event;
}
diff --git a/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts b/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts
index e2c167f02dc68..62627760d3eb2 100644
--- a/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts
+++ b/packages/plugin-ext/src/hosted/browser/hosted-plugin.ts
@@ -66,6 +66,8 @@ import { StandaloneServices } from '@theia/monaco-editor-core/esm/vs/editor/stan
import { ILanguageService } from '@theia/monaco-editor-core/esm/vs/editor/common/languages/language';
import { LanguageService } from '@theia/monaco-editor-core/esm/vs/editor/common/services/languageService';
import { Measurement, Stopwatch } from '@theia/core/lib/common';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
+import { BasicChannel } from '@theia/core/lib/common/message-rpc/channel';
export type PluginHost = 'frontend' | string;
export type DebugActivationEvent = 'onDebugResolve' | 'onDebugInitialConfigurations' | 'onDebugAdapterProtocolTracker' | 'onDebugDynamicConfigurations';
@@ -534,18 +536,25 @@ export class HostedPluginSupport {
}
protected createServerRpc(pluginHostId: string): RPCProtocol {
- const emitter = new Emitter();
+
+ const channel = new BasicChannel(() => {
+ const writer = new Uint8ArrayWriteBuffer();
+ writer.onCommit(buffer => {
+ this.server.onMessage(pluginHostId, buffer);
+ });
+ return writer;
+ });
+
+ // Create RPC protocol before adding the listener to the watcher to receive the watcher's cached messages after the rpc protocol was created.
+ const rpc = new RPCProtocolImpl(channel);
+
this.watcher.onPostMessageEvent(received => {
if (pluginHostId === received.pluginHostId) {
- emitter.fire(received.message);
- }
- });
- return new RPCProtocolImpl({
- onMessage: emitter.event,
- send: message => {
- this.server.onMessage(pluginHostId, message);
+ channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(received.message));
}
});
+
+ return rpc;
}
protected async updateStoragePath(): Promise {
diff --git a/packages/plugin-ext/src/hosted/browser/plugin-worker.ts b/packages/plugin-ext/src/hosted/browser/plugin-worker.ts
index 874ad820448e6..6065211c436e3 100644
--- a/packages/plugin-ext/src/hosted/browser/plugin-worker.ts
+++ b/packages/plugin-ext/src/hosted/browser/plugin-worker.ts
@@ -13,8 +13,9 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
+import { BasicChannel } from '@theia/core/lib/common/message-rpc/channel';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
import { injectable } from '@theia/core/shared/inversify';
-import { Emitter } from '@theia/core/lib/common/event';
import { RPCProtocol, RPCProtocolImpl } from '../../common/rpc-protocol';
@injectable()
@@ -25,22 +26,27 @@ export class PluginWorker {
public readonly rpc: RPCProtocol;
constructor() {
- const emitter = new Emitter();
-
this.worker = new Worker(new URL('./worker/worker-main',
// @ts-expect-error (TS1343)
// We compile to CommonJS but `import.meta` is still available in the browser
import.meta.url));
- this.worker.onmessage = m => emitter.fire(m.data);
- this.worker.onerror = e => console.error(e);
+ const channel = new BasicChannel(() => {
+ const writer = new Uint8ArrayWriteBuffer();
+ writer.onCommit(buffer => {
+ this.worker.postMessage(buffer);
+ });
+ return writer;
+ });
+
+ this.rpc = new RPCProtocolImpl(channel);
- this.rpc = new RPCProtocolImpl({
- onMessage: emitter.event,
- send: (m: string) => {
- this.worker.postMessage(m);
- }
+ // eslint-disable-next-line arrow-body-style
+ this.worker.onmessage = buffer => channel.onMessageEmitter.fire(() => {
+ return new Uint8ArrayReadBuffer(buffer.data);
});
+
+ this.worker.onerror = e => channel.onErrorEmitter.fire(e);
}
}
diff --git a/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts b/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts
index 4176d07b499c0..bd6827e1d0f85 100644
--- a/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts
+++ b/packages/plugin-ext/src/hosted/browser/worker/worker-main.ts
@@ -13,28 +13,29 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-
-import { Emitter } from '@theia/core/lib/common/event';
-import { RPCProtocolImpl } from '../../../common/rpc-protocol';
-import { PluginManagerExtImpl } from '../../../plugin/plugin-manager';
-import { MAIN_RPC_CONTEXT, Plugin, emptyPlugin, TerminalServiceExt } from '../../../common/plugin-api-rpc';
-import { createAPIFactory } from '../../../plugin/plugin-context';
-import { getPluginId, PluginMetadata } from '../../../common/plugin-protocol';
+// eslint-disable-next-line import/no-extraneous-dependencies
+import 'reflect-metadata';
+import { BasicChannel } from '@theia/core/lib/common/message-rpc/channel';
+import { Uint8ArrayReadBuffer, Uint8ArrayWriteBuffer } from '@theia/core/lib/common/message-rpc/uint8-array-message-buffer';
import * as theia from '@theia/plugin';
-import { PreferenceRegistryExtImpl } from '../../../plugin/preference-registry';
+import { emptyPlugin, MAIN_RPC_CONTEXT, Plugin, TerminalServiceExt } from '../../../common/plugin-api-rpc';
import { ExtPluginApi } from '../../../common/plugin-ext-api-contribution';
-import { createDebugExtStub } from './debug-stub';
+import { getPluginId, PluginMetadata } from '../../../common/plugin-protocol';
+import { RPCProtocolImpl } from '../../../common/rpc-protocol';
+import { ClipboardExt } from '../../../plugin/clipboard-ext';
import { EditorsAndDocumentsExtImpl } from '../../../plugin/editors-and-documents';
-import { WorkspaceExtImpl } from '../../../plugin/workspace';
import { MessageRegistryExt } from '../../../plugin/message-registry';
-import { WorkerEnvExtImpl } from './worker-env-ext';
-import { ClipboardExt } from '../../../plugin/clipboard-ext';
+import { createAPIFactory } from '../../../plugin/plugin-context';
+import { PluginManagerExtImpl } from '../../../plugin/plugin-manager';
import { KeyValueStorageProxy } from '../../../plugin/plugin-storage';
+import { PreferenceRegistryExtImpl } from '../../../plugin/preference-registry';
+import { SecretsExtImpl } from '../../../plugin/secrets-ext';
+import { TerminalServiceExtImpl } from '../../../plugin/terminal-ext';
import { WebviewsExtImpl } from '../../../plugin/webviews';
+import { WorkspaceExtImpl } from '../../../plugin/workspace';
+import { createDebugExtStub } from './debug-stub';
import { loadManifest } from './plugin-manifest-loader';
-import { TerminalServiceExtImpl } from '../../../plugin/terminal-ext';
-import { reviver } from '../../../plugin/types-impl';
-import { SecretsExtImpl } from '../../../plugin/secrets-ext';
+import { WorkerEnvExtImpl } from './worker-env-ext';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const ctx = self as any;
@@ -42,22 +43,20 @@ const ctx = self as any;
const pluginsApiImpl = new Map();
const pluginsModulesNames = new Map();
-const emitter = new Emitter();
-const rpc = new RPCProtocolImpl({
- onMessage: emitter.event,
- send: (m: string) => {
- ctx.postMessage(m);
- },
-},
-{
- reviver: reviver
+const channel = new BasicChannel(() => {
+ const writeBuffer = new Uint8ArrayWriteBuffer();
+ writeBuffer.onCommit(buffer => {
+ ctx.postMessage(buffer);
+ });
+ return writeBuffer;
});
-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
addEventListener('message', (message: any) => {
- emitter.fire(message.data);
+ channel.onMessageEmitter.fire(() => new Uint8ArrayReadBuffer(message.data));
});
+const rpc = new RPCProtocolImpl(channel);
+
const scripts = new Set();
function initialize(contextPath: string, pluginMetadata: PluginMetadata): void {
diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts
index 2c0db80da4a72..85df499084cae 100644
--- a/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts
+++ b/packages/plugin-ext/src/hosted/node/hosted-plugin-process.ts
@@ -14,16 +14,18 @@
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-import * as cp from 'child_process';
-import { injectable, inject, named } from '@theia/core/shared/inversify';
-import { ILogger, ConnectionErrorHandler, ContributionProvider, MessageService } from '@theia/core/lib/common';
+import { ConnectionErrorHandler, ContributionProvider, ILogger, MessageService } from '@theia/core/lib/common';
+import { Deferred } from '@theia/core/lib/common/promise-util';
import { createIpcEnv } from '@theia/core/lib/node/messaging/ipc-protocol';
-import { HostedPluginClient, ServerPluginRunner, PluginHostEnvironmentVariable, DeployedPlugin, PLUGIN_HOST_BACKEND, PluginIdentifiers } from '../../common/plugin-protocol';
-import { MessageType } from '../../common/rpc-protocol';
+import { inject, injectable, named } from '@theia/core/shared/inversify';
+import * as cp from 'child_process';
import { HostedPluginCliContribution } from './hosted-plugin-cli-contribution';
-import * as psTree from 'ps-tree';
-import { Deferred } from '@theia/core/lib/common/promise-util';
import { HostedPluginLocalizationService } from './hosted-plugin-localization-service';
+import { ProcessTerminatedMessage, ProcessTerminateMessage } from './hosted-plugin-protocol';
+import { BinaryMessagePipe } from '@theia/core/lib/node/messaging/binary-message-pipe';
+import { DeployedPlugin, HostedPluginClient, PluginHostEnvironmentVariable, PluginIdentifiers, PLUGIN_HOST_BACKEND, ServerPluginRunner } from '../../common/plugin-protocol';
+import psTree = require('ps-tree');
+import { Duplex } from 'stream';
export interface IPCConnectionOptions {
readonly serverName: string;
@@ -60,6 +62,7 @@ export class HostedPluginProcess implements ServerPluginRunner {
protected readonly localizationService: HostedPluginLocalizationService;
private childProcess: cp.ChildProcess | undefined;
+ private messagePipe?: BinaryMessagePipe;
private client: HostedPluginClient;
private terminatingPluginServer = false;
@@ -82,14 +85,14 @@ export class HostedPluginProcess implements ServerPluginRunner {
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
- public acceptMessage(pluginHostId: string, message: string): boolean {
+ public acceptMessage(pluginHostId: string, message: Uint8Array): boolean {
return pluginHostId === 'main';
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
- public onMessage(pluginHostId: string, jsonMessage: string): void {
- if (this.childProcess) {
- this.childProcess.send(jsonMessage);
+ public onMessage(pluginHostId: string, message: Uint8Array): void {
+ if (this.messagePipe) {
+ this.messagePipe.send(message);
}
}
@@ -106,12 +109,12 @@ export class HostedPluginProcess implements ServerPluginRunner {
const waitForTerminated = new Deferred();
cp.on('message', message => {
const msg = JSON.parse(message as string);
- if ('type' in msg && msg.type === MessageType.Terminated) {
+ if (ProcessTerminatedMessage.is(msg)) {
waitForTerminated.resolve();
}
});
const stopTimeout = this.cli.pluginHostStopTimeout;
- cp.send(JSON.stringify({ type: MessageType.Terminate, stopTimeout }));
+ cp.send(JSON.stringify({ type: ProcessTerminateMessage.TYPE, stopTimeout }));
const terminateTimeout = this.cli.pluginHostTerminateTimeout;
if (terminateTimeout) {
@@ -156,9 +159,11 @@ export class HostedPluginProcess implements ServerPluginRunner {
logger: this.logger,
args: []
});
- this.childProcess.on('message', message => {
+
+ this.messagePipe = new BinaryMessagePipe(this.childProcess.stdio[4] as Duplex);
+ this.messagePipe.onMessage(buffer => {
if (this.client) {
- this.client.postMessage(PLUGIN_HOST_BACKEND, message as string);
+ this.client.postMessage(PLUGIN_HOST_BACKEND, buffer);
}
});
}
@@ -184,7 +189,11 @@ export class HostedPluginProcess implements ServerPluginRunner {
silent: true,
env: env,
execArgv: [],
- stdio: ['pipe', 'pipe', 'pipe', 'ipc']
+ // 5th element MUST be 'overlapped' for it to work properly on Windows.
+ // 'overlapped' works just like 'pipe' on non-Windows platforms.
+ // See: https://nodejs.org/docs/latest-v14.x/api/child_process.html#child_process_options_stdio
+ // Note: For some reason `@types/node` does not know about 'overlapped'.
+ stdio: ['pipe', 'pipe', 'pipe', 'ipc', 'overlapped' as 'pipe']
};
const inspectArgPrefix = `--${options.serverName}-inspect`;
const inspectArg = process.argv.find(v => v.startsWith(inspectArgPrefix));
diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts
new file mode 100644
index 0000000000000..429c2168d92ac
--- /dev/null
+++ b/packages/plugin-ext/src/hosted/node/hosted-plugin-protocol.ts
@@ -0,0 +1,49 @@
+// *****************************************************************************
+// Copyright (C) 2022 STMicroelectronics and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0.
+//
+// This Source Code may also be made available under the following Secondary
+// Licenses when the conditions for such availability set forth in the Eclipse
+// Public License v. 2.0 are satisfied: GNU General Public License, version 2
+// with the GNU Classpath Exception which is available at
+// https://www.gnu.org/software/classpath/license.html.
+//
+// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
+// *****************************************************************************
+
+// Custom message protocol between `HostedPluginProcess` and its `PluginHost` child process.
+
+/**
+ * Sent to initiate termination of the counterpart process.
+ */
+export interface ProcessTerminateMessage {
+ type: typeof ProcessTerminateMessage.TYPE,
+ stopTimeout?: number
+}
+
+export namespace ProcessTerminateMessage {
+ export const TYPE = 0;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ export function is(object: any): object is ProcessTerminateMessage {
+ return typeof object === 'object' && object.type === TYPE;
+ }
+}
+
+/**
+ * Sent to inform the counter part process that the process termination has been completed.
+ */
+export interface ProcessTerminatedMessage {
+ type: typeof ProcessTerminateMessage.TYPE,
+}
+
+export namespace ProcessTerminatedMessage {
+ export const TYPE = 1;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ export function is(object: any): object is ProcessTerminateMessage {
+ return typeof object === 'object' && object.type === TYPE;
+ }
+}
+
diff --git a/packages/plugin-ext/src/hosted/node/hosted-plugin.ts b/packages/plugin-ext/src/hosted/node/hosted-plugin.ts
index 119822628bd1e..81d03a156ea20 100644
--- a/packages/plugin-ext/src/hosted/node/hosted-plugin.ts
+++ b/packages/plugin-ext/src/hosted/node/hosted-plugin.ts
@@ -71,7 +71,7 @@ export class HostedPluginSupport {
}
}
- onMessage(pluginHostId: string, message: string): void {
+ onMessage(pluginHostId: string, message: Uint8Array): void {
// need to perform routing
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if (this.pluginRunners.length > 0) {
diff --git a/packages/plugin-ext/src/hosted/node/plugin-host.ts b/packages/plugin-ext/src/hosted/node/plugin-host.ts
index fa54f21beb537..24b67a6304b83 100644
--- a/packages/plugin-ext/src/hosted/node/plugin-host.ts
+++ b/packages/plugin-ext/src/hosted/node/plugin-host.ts
@@ -13,11 +13,12 @@
//
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// *****************************************************************************
-
-import { Emitter } from '@theia/core/lib/common/event';
-import { RPCProtocolImpl, MessageType, ConnectionClosedError } from '../../common/rpc-protocol';
+// eslint-disable-next-line import/no-extraneous-dependencies
+import 'reflect-metadata';
+import { ConnectionClosedError, RPCProtocolImpl } from '../../common/rpc-protocol';
+import { ProcessTerminatedMessage, ProcessTerminateMessage } from './hosted-plugin-protocol';
import { PluginHostRPC } from './plugin-host-rpc';
-import { reviver } from '../../plugin/types-impl';
+import { IPCChannel } from '@theia/core/lib/node';
console.log('PLUGIN_HOST(' + process.pid + ') starting instance');
@@ -74,18 +75,8 @@ process.on('rejectionHandled', (promise: Promise) => {
});
let terminating = false;
-const emitter = new Emitter();
-const rpc = new RPCProtocolImpl({
- onMessage: emitter.event,
- send: (m: string) => {
- if (process.send && !terminating) {
- process.send(m);
- }
- }
-},
-{
- reviver: reviver
-});
+const channel = new IPCChannel();
+const rpc = new RPCProtocolImpl(channel);
process.on('message', async (message: string) => {
if (terminating) {
@@ -93,10 +84,9 @@ process.on('message', async (message: string) => {
}
try {
const msg = JSON.parse(message);
- if ('type' in msg && msg.type === MessageType.Terminate) {
+ if (ProcessTerminateMessage.is(msg)) {
terminating = true;
- emitter.dispose();
- if ('stopTimeout' in msg && typeof msg.stopTimeout === 'number' && msg.stopTimeout) {
+ if (msg.stopTimeout) {
await Promise.race([
pluginHostRPC.terminate(),
new Promise(resolve => setTimeout(resolve, msg.stopTimeout))
@@ -106,10 +96,9 @@ process.on('message', async (message: string) => {
}
rpc.dispose();
if (process.send) {
- process.send(JSON.stringify({ type: MessageType.Terminated }));
+ process.send(JSON.stringify({ type: ProcessTerminatedMessage.TYPE }));
}
- } else {
- emitter.fire(message);
+
}
} catch (e) {
console.error(e);
diff --git a/packages/plugin-ext/src/hosted/node/plugin-service.ts b/packages/plugin-ext/src/hosted/node/plugin-service.ts
index fcd24918ed2f6..4789ffb38f0ac 100644
--- a/packages/plugin-ext/src/hosted/node/plugin-service.ts
+++ b/packages/plugin-ext/src/hosted/node/plugin-service.ts
@@ -164,7 +164,7 @@ export class HostedPluginServerImpl implements HostedPluginServer {
return Promise.all(plugins.map(plugin => this.localizationService.localizePlugin(plugin)));
}
- onMessage(pluginHostId: string, message: string): Promise {
+ onMessage(pluginHostId: string, message: Uint8Array): Promise {
this.hostedPlugin.onMessage(pluginHostId, message);
return Promise.resolve();
}
diff --git a/packages/plugin-ext/src/main/browser/main-context.ts b/packages/plugin-ext/src/main/browser/main-context.ts
index 74288f2d67b36..eee7fd8cd7335 100644
--- a/packages/plugin-ext/src/main/browser/main-context.ts
+++ b/packages/plugin-ext/src/main/browser/main-context.ts
@@ -56,7 +56,6 @@ import { CustomEditorsMainImpl } from './custom-editors/custom-editors-main';
import { SecretsMainImpl } from './secrets-main';
import { WebviewViewsMainImpl } from './webview-views/webview-views-main';
import { MonacoLanguages } from '@theia/monaco/lib/browser/monaco-languages';
-import { NotificationExtImpl } from '../../plugin/notification';
import { UntitledResourceResolver } from '@theia/core/lib/common/resource';
import { ThemeService } from '@theia/core/lib/browser/theming';
@@ -110,9 +109,6 @@ export function setUpPluginApi(rpc: RPCProtocol, container: interfaces.Container
const notificationMain = new NotificationMainImpl(rpc, container);
rpc.set(PLUGIN_RPC_CONTEXT.NOTIFICATION_MAIN, notificationMain);
- const notificationExt = new NotificationExtImpl(rpc);
- rpc.set(MAIN_RPC_CONTEXT.NOTIFICATION_EXT, notificationExt);
-
const terminalMain = new TerminalServiceMainImpl(rpc, container);
rpc.set(PLUGIN_RPC_CONTEXT.TERMINAL_MAIN, terminalMain);
diff --git a/packages/plugin-ext/src/plugin/types-impl.ts b/packages/plugin-ext/src/plugin/types-impl.ts
index 4cfbe31eb1dcd..6405cf910bd43 100644
--- a/packages/plugin-ext/src/plugin/types-impl.ts
+++ b/packages/plugin-ext/src/plugin/types-impl.ts
@@ -30,22 +30,8 @@ import { startsWithIgnoreCase } from '@theia/core/lib/common/strings';
import { SymbolKind } from '../common/plugin-api-rpc-model';
import { FileSystemProviderErrorCode, markAsFileSystemProviderError } from '@theia/filesystem/lib/common/files';
import * as paths from 'path';
-import { ObjectsTransferrer } from '../common/rpc-protocol';
import { es5ClassCompat } from '../common/types';
-/**
- * A reviver that takes URI's transferred via JSON.stringify() and makes
- * instances of our local plugin API URI class (below)
- */
-// eslint-disable-next-line @typescript-eslint/no-explicit-any
-export function reviver(key: string | undefined, value: any): any {
- const revived = ObjectsTransferrer.reviver(key, value);
- if (CodeURI.isUri(revived)) {
- return URI.revive(revived);
- }
- return revived;
-}
-
/**
* This is an implementation of #theia.Uri based on vscode-uri.
* This is supposed to fix https://github.com/eclipse-theia/theia/issues/8752