Skip to content

Commit

Permalink
core: improve rpc protocol
Browse files Browse the repository at this point in the history
- Ensure that pending requests are properly rejected when the underlying service channel is closed.
- Remove obsolete id-property from `NotificationMessage`s. Ids are only required fro matching request-response pairs.
- Rename `JsonRpcProxyFactory` and related types to `RpcProxyFactory` (Rpc*). The naming scheme was a remainder of the old vscode jsonr-rpc based protocol.
  By simply using the `Rpc` suffix the class names are less misleading and protocol agnostic.
- Keep deprecated declarations of the old `JsonRpc*` namespace. The components are heavily used by adopters so we should maintain this deprecated symbols for a while to enable graceful migration without hard API breaks.
- Use a deferred in the `RpcProxyFactory` for protocol initialization

Fixes #12581
  • Loading branch information
tortmayr committed May 30, 2023
1 parent c1a2b7b commit 8bb9afa
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 63 deletions.
7 changes: 3 additions & 4 deletions packages/core/src/common/message-rpc/rpc-message-encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ export interface RequestMessage {

export interface NotificationMessage {
type: RpcMessageType.Notification;
id: number;
method: string;
args: any[];
}
Expand Down Expand Up @@ -111,7 +110,7 @@ export interface RpcMessageDecoder {
export interface RpcMessageEncoder {
cancel(buf: WriteBuffer, requestId: number): void;

notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void
notification(buf: WriteBuffer, method: string, args: any[]): void

request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void

Expand All @@ -130,8 +129,8 @@ export class MsgPackMessageEncoder implements RpcMessageEncoder {
cancel(buf: WriteBuffer, requestId: number): void {
this.encode<CancelMessage>(buf, { type: RpcMessageType.Cancel, id: requestId });
}
notification(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, id: requestId, method, args });
notification(buf: WriteBuffer, method: string, args: any[]): void {
this.encode<NotificationMessage>(buf, { type: RpcMessageType.Notification, method, args });
}
request(buf: WriteBuffer, requestId: number, method: string, args: any[]): void {
this.encode<RequestMessage>(buf, { type: RpcMessageType.Request, id: requestId, method, args });
Expand Down
14 changes: 9 additions & 5 deletions packages/core/src/common/message-rpc/rpc-protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ export class RpcProtocol {
this.encoder = options.encoder ?? new MsgPackMessageEncoder();
this.decoder = options.decoder ?? new MsgPackMessageDecoder();
this.toDispose.push(this.onNotificationEmitter);
channel.onClose(() => this.toDispose.dispose());
channel.onClose(event => {
this.pendingRequests.forEach(pending => pending.reject(new Error(event.reason)));
this.pendingRequests.clear();
this.toDispose.dispose();
});
this.toDispose.push(channel.onMessage(readBuffer => this.handleMessage(this.decoder.parse(readBuffer()))));
this.mode = options.mode ?? 'default';

Expand All @@ -98,7 +102,7 @@ export class RpcProtocol {
return;
}
case RpcMessageType.Notification: {
this.handleNotify(message.id, message.method, message.args);
this.handleNotify(message.method, message.args);
return;
}
}
Expand All @@ -116,7 +120,7 @@ export class RpcProtocol {
}
}
// If the message was not handled until here, it is incompatible with the mode.
console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}. ID: ${message.id}.`);
console.warn(`Received message incompatible with this RPCProtocol's mode '${this.mode}'. Type: ${message.type}`);
}

protected handleReply(id: number, value: any): void {
Expand Down Expand Up @@ -179,7 +183,7 @@ export class RpcProtocol {
}

const output = this.channel.getWriteBuffer();
this.encoder.notification(output, this.nextMessageId++, method, args);
this.encoder.notification(output, method, args);
output.commit();
}

Expand Down Expand Up @@ -226,7 +230,7 @@ export class RpcProtocol {
}
}

protected async handleNotify(id: number, method: string, args: any[]): Promise<void> {
protected async handleNotify(method: string, args: any[]): Promise<void> {
if (this.toDispose.disposed) {
return;
}
Expand Down
149 changes: 95 additions & 54 deletions packages/core/src/common/messaging/proxy-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,30 @@ import { Emitter, Event } from '../event';
import { Channel } from '../message-rpc/channel';
import { RequestHandler, RpcProtocol } from '../message-rpc/rpc-protocol';
import { ConnectionHandler } from './handler';
import { Deferred } from '../promise-util';

export type JsonRpcServer<Client> = Disposable & {
export type RpcServer<Client> = Disposable & {
/**
* If this server is a proxy to a remote server then
* a client is used as a local object
* to handle JSON-RPC messages from the remote server.
* to handle RPC messages from the remote server.
*/
setClient(client: Client | undefined): void;
getClient?(): Client | undefined;
};

export interface JsonRpcConnectionEventEmitter {
export interface RpcConnectionEventEmitter {
readonly onDidOpenConnection: Event<void>;
readonly onDidCloseConnection: Event<void>;
}
export type JsonRpcProxy<T> = T & JsonRpcConnectionEventEmitter;

export class JsonRpcConnectionHandler<T extends object> implements ConnectionHandler {
export type RpcProxy<T> = T & RpcConnectionEventEmitter;

export class RpcConnectionHandler<T extends object> implements ConnectionHandler {
constructor(
readonly path: string,
readonly targetFactory: (proxy: JsonRpcProxy<T>) => any,
readonly factoryConstructor: new () => JsonRpcProxyFactory<T> = JsonRpcProxyFactory
readonly targetFactory: (proxy: RpcProxy<T>) => any,
readonly factoryConstructor: new () => RpcProxyFactory<T> = RpcProxyFactory
) { }

onConnection(connection: Channel): void {
Expand All @@ -54,20 +56,30 @@ export class JsonRpcConnectionHandler<T extends object> implements ConnectionHan
factory.listen(connection);
}
}

/**
* Factory for creating a new {@link RpcConnection} for a given chanel and {@link RequestHandler}.
* Factory for creating a new {@link RpcProtocol} for a given chanel and {@link RequestHandler}.
*/
export type RpcConnectionFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;
export type RpcProtocolFactory = (channel: Channel, requestHandler: RequestHandler) => RpcProtocol;

const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);
const defaultRpcProtocolFactory: RpcProtocolFactory = (channel, requestHandler) => new RpcProtocol(channel, requestHandler);

export interface RpcProxyFactoryOptions {
rpcProtocolFactory?: RpcProtocolFactory,
/**
* Used to identify proxy calls that should be sent as notification. If method starts with one of the given
* keys it is treated as a notification
*/
notificationKeys?: string[]
}

/**
* Factory for JSON-RPC proxy objects.
* Factory for RPC proxy objects.
*
* A JSON-RPC proxy exposes the programmatic interface of an object through
* JSON-RPC. This allows remote programs to call methods of this objects by
* sending JSON-RPC requests. This takes place over a bi-directional stream,
* where both ends can expose an object and both can call methods each other's
* A RPC proxy exposes the programmatic interface of an object through
* Theia's RPC protocol. This allows remote programs to call methods of this objects by
* sending RPC requests. This takes place over a bi-directional stream,
* where both ends can expose an object and both can call methods on each other's
* exposed object.
*
* For example, assuming we have an object of the following type on one end:
Expand All @@ -76,87 +88,87 @@ const defaultRPCConnectionFactory: RpcConnectionFactory = (channel, requestHandl
* bar(baz: number): number { return baz + 1 }
* }
*
* which we want to expose through a JSON-RPC interface. We would do:
* which we want to expose through a JSON interface. We would do:
*
* let target = new Foo()
* let factory = new JsonRpcProxyFactory<Foo>('/foo', target)
* let factory = new RpcProxyFactory<Foo>('/foo', target)
* factory.onConnection(connection)
*
* The party at the other end of the `connection`, in order to remotely call
* methods on this object would do:
*
* let factory = new JsonRpcProxyFactory<Foo>('/foo')
* let factory = new RpcProxyFactory<Foo>('/foo')
* factory.onConnection(connection)
* let proxy = factory.createProxy();
* let result = proxy.bar(42)
* // result is equal to 43
*
* One the wire, it would look like this:
*
* --> {"jsonrpc": "2.0", "id": 0, "method": "bar", "params": {"baz": 42}}
* <-- {"jsonrpc": "2.0", "id": 0, "result": 43}
* --> { "type":"1", "id": 1, "method": "bar", "args": [42]}
* <-- { "type":"3", "id": 1, "res": 43}
*
* Note that in the code of the caller, we didn't pass a target object to
* JsonRpcProxyFactory, because we don't want/need to expose an object.
* RpcProxyFactory, because we don't want/need to expose an object.
* If we had passed a target object, the other side could've called methods on
* it.
*
* @param <T> - The type of the object to expose to JSON-RPC.
* @param <T> - The type of the object to expose to RPC.
*/

export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
export class RpcProxyFactory<T extends object> implements ProxyHandler<T> {

protected readonly onDidOpenConnectionEmitter = new Emitter<void>();
protected readonly onDidCloseConnectionEmitter = new Emitter<void>();

protected connectionPromiseResolve: (connection: RpcProtocol) => void;
protected connectionPromise: Promise<RpcProtocol>;
protected rpcDeferred = new Deferred<RpcProtocol>();

/**
* Build a new JsonRpcProxyFactory.
* Build a new RpcProxyFactory.
*
* @param target - The object to expose to JSON-RPC methods calls. If this
* @param target - The object to expose to RPC methods calls. If this
* is omitted, the proxy won't be able to handle requests, only send them.
*/
constructor(public target?: any, protected rpcConnectionFactory = defaultRPCConnectionFactory) {
constructor(public target?: any, protected rpcProtocolFactory = defaultRpcProtocolFactory) {
this.waitForConnection();
}

protected waitForConnection(): void {
this.connectionPromise = new Promise(resolve =>
this.connectionPromiseResolve = resolve
);
this.connectionPromise.then(connection => {
connection.channel.onClose(() => {
this.rpcDeferred.promise.then(protocol => {
protocol.channel.onClose(() => {
this.onDidCloseConnectionEmitter.fire(undefined);
// Wait for connection in case the backend reconnects
this.waitForConnection();
});
protocol.channel.onError(err => {
if (this.rpcDeferred.state !== 'resolved') {
this.rpcDeferred.reject(err);
}
});
this.onDidOpenConnectionEmitter.fire(undefined);
});
}

/**
* Connect a MessageConnection to the factory.
* Connect a {@link Channel} to the factory by creating an {@link RpcProtocol} on top of it.
*
* This connection will be used to send/receive JSON-RPC requests and
* This protocol will be used to send/receive RPC requests and
* response.
*/
listen(channel: Channel): void {
const connection = this.rpcConnectionFactory(channel, (meth, args) => this.onRequest(meth, ...args));
connection.onNotification(event => this.onNotification(event.method, ...event.args));
const protocol = this.rpcProtocolFactory(channel, (meth, args) => this.onRequest(meth, ...args));
protocol.onNotification(event => this.onNotification(event.method, ...event.args));

this.connectionPromiseResolve(connection);
this.rpcDeferred.resolve(protocol);
}

/**
* Process an incoming JSON-RPC method call.
* Process an incoming RPC method call.
*
* onRequest is called when the JSON-RPC connection received a method call
* request. It calls the corresponding method on [[target]].
* onRequest is called when the RPC connection received a method call
* request. It calls the corresponding method on [[target]].
*
* The return value is a Promise object that is resolved with the return
* value of the method call, if it is successful. The promise is rejected
* value of the method call, if it is successful. The promise is rejected
* if the called method does not exist or if it throws.
*
* @returns A promise of the method call completion.
Expand All @@ -181,7 +193,7 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
}

/**
* Process an incoming JSON-RPC notification.
* Process an incoming RPC notification.
*
* Same as [[onRequest]], but called on incoming notifications rather than
* methods calls.
Expand All @@ -194,37 +206,37 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {

/**
* Create a Proxy exposing the interface of an object of type T. This Proxy
* can be used to do JSON-RPC method calls on the remote target object as
* can be used to do RPC method calls on the remote target object as
* if it was local.
*
* If `T` implements `JsonRpcServer` then a client is used as a target object for a remote target object.
* If `T` implements `RpcServer` then a client is used as a target object for a remote target object.
*/
createProxy(): JsonRpcProxy<T> {
createProxy(): RpcProxy<T> {
const result = new Proxy<T>(this as any, this);
return result as any;
}

/**
* Get a callable object that executes a JSON-RPC method call.
* Get a callable object that executes a RPC method call.
*
* Getting a property on the Proxy object returns a callable that, when
* called, executes a JSON-RPC call. The name of the property defines the
* called, executes a RPC call. The name of the property defines the
* method to be called. The callable takes a variable number of arguments,
* which are passed in the JSON-RPC method call.
* which are passed in the RPC method call.
*
* For example, if you have a Proxy object:
*
* let fooProxyFactory = JsonRpcProxyFactory<Foo>('/foo')
* let fooProxyFactory = RpcProxyFactory<Foo>('/foo')
* let fooProxy = fooProxyFactory.createProxy()
*
* accessing `fooProxy.bar` will return a callable that, when called,
* executes a JSON-RPC method call to method `bar`. Therefore, doing
* executes a RPC method call to method `bar`. Therefore, doing
* `fooProxy.bar()` will call the `bar` method on the remote Foo object.
*
* @param target - unused.
* @param p - The property accessed on the Proxy object.
* @param receiver - unused.
* @returns A callable that executes the JSON-RPC call.
* @returns A callable that executes the RPC call.
*/
get(target: T, p: PropertyKey, receiver: any): any {
if (p === 'setClient') {
Expand All @@ -249,7 +261,7 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {
return (...args: any[]) => {
const method = p.toString();
const capturedError = new Error(`Request '${method}' failed`);
return this.connectionPromise.then(connection =>
return this.rpcDeferred.promise.then(connection =>
new Promise<void>((resolve, reject) => {
try {
if (isNotify) {
Expand Down Expand Up @@ -308,3 +320,32 @@ export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {

}

/**
* @deprecated since 1.39.0 use `RpcConnectionEventEmitter` instead
*/
export type JsonRpcConnectionEventEmitter = RpcConnectionEventEmitter;

/**
* @deprecated since 1.39.0 use `RpcServer` instead
*/
export type JsonRpcServer<Client> = RpcServer<Client>;

/**
* @deprecated since 1.39.0 use `RpcProxy` instead
*/
export type JsonRpcProxy<T> = RpcProxy<T>;

/**
* @deprecated since 1.39.0 use `RpcConnectionHandler` instead
*/
export class JsonRpcConnectionHandler<T extends object> extends RpcConnectionHandler<T> {

}

/**
* @deprecated since 1.39.0 use `JsonRpcProxyFactory` instead
*/
export class JsonRpcProxyFactory<T extends object> extends RpcProxyFactory<T> {

}

0 comments on commit 8bb9afa

Please sign in to comment.