Skip to content

Commit

Permalink
Protoype of a faster RPC protocol
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Mäder <tmader@redhat.com>
  • Loading branch information
tsmaeder authored and tortmayr committed Feb 22, 2022
1 parent 92b376d commit 76595fb
Show file tree
Hide file tree
Showing 13 changed files with 1,545 additions and 0 deletions.
10 changes: 10 additions & 0 deletions packages/core/src/common/message-rpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# message-rpc

An attempt to rewrite the theia RPC infrastructure with a couple of changes:

1. "Zero-copy" message writing and reading
2. Support for binary buffers without ever encoding them
3. Separate RPC server from RPC client
4. Use a unified "Channel" interface

A lot of this code is more or less copied from the current Theia code.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { expect } from 'chai';
import { ArrayBufferReadBuffer, ArrrayBufferWriteBuffer } from './array-buffer-message-buffer';

describe('array message buffer tests', () => {
it('basic read write test', () => {
const buffer = new ArrayBuffer(1024);
const writer = new ArrrayBufferWriteBuffer(buffer);

writer.writeByte(8);
writer.writeInt(10000);
writer.writeBytes(new Uint8Array([1, 2, 3, 4]));
writer.writeString('this is a string');
writer.writeString('another string');
writer.commit();

const written = writer.getCurrentContents();

const reader = new ArrayBufferReadBuffer(written);

expect(reader.readByte()).equal(8);
expect(reader.readInt()).equal(10000);
expect(reader.readBytes()).deep.equal(new Uint8Array([1, 2, 3, 4]).buffer);
expect(reader.readString()).equal('this is a string');
expect(reader.readString()).equal('another string');
});
});
124 changes: 124 additions & 0 deletions packages/core/src/common/message-rpc/array-buffer-message-buffer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { Emitter, Event } from '../event';
import { ReadBuffer, WriteBuffer } from './message-buffer';

export class ArrrayBufferWriteBuffer implements WriteBuffer {
constructor(private buffer: ArrayBuffer = new ArrayBuffer(1024), private offset: number = 0) {
}

private get msg(): DataView {
return new DataView(this.buffer);
}

ensureCapacity(value: number): WriteBuffer {
let newLength = this.buffer.byteLength;
while (newLength < this.offset + value) {
newLength *= 2;
}
if (newLength !== this.buffer.byteLength) {
const newBuffer = new ArrayBuffer(newLength);
new Uint8Array(newBuffer).set(new Uint8Array(this.buffer));
this.buffer = newBuffer;
}
return this;
}

writeByte(value: number): WriteBuffer {
this.ensureCapacity(1);
this.msg.setUint8(this.offset++, value);
return this;
}

writeInt(value: number): WriteBuffer {
this.ensureCapacity(4);
this.msg.setUint32(this.offset, value);
this.offset += 4;
return this;
}

writeString(value: string): WriteBuffer {
const encoded = this.encodeString(value);
this.writeBytes(encoded);
return this;
}

private encodeString(value: string): Uint8Array {
return new TextEncoder().encode(value);
}

writeBytes(value: ArrayBuffer): WriteBuffer {
this.ensureCapacity(value.byteLength + 4);
this.writeInt(value.byteLength);
new Uint8Array(this.buffer).set(new Uint8Array(value), this.offset);
this.offset += value.byteLength;
return this;
}

private onCommitEmitter = new Emitter<ArrayBuffer>();
get onCommit(): Event<ArrayBuffer> {
return this.onCommitEmitter.event;
}

commit(): void {
this.onCommitEmitter.fire(this.getCurrentContents());
}

getCurrentContents(): ArrayBuffer {
return this.buffer.slice(0, this.offset);
}
}

export class ArrayBufferReadBuffer implements ReadBuffer {
private offset: number = 0;

constructor(private readonly buffer: ArrayBuffer) {
}

private get msg(): DataView {
return new DataView(this.buffer);
}

readByte(): number {
return this.msg.getUint8(this.offset++);
}

readInt(): number {
const result = this.msg.getInt32(this.offset);
this.offset += 4;
return result;
}

readString(): string {
const len = this.msg.getUint32(this.offset);
this.offset += 4;
const result = this.decodeString(this.buffer.slice(this.offset, this.offset + len));
this.offset += len;
return result;
}

private decodeString(buf: ArrayBuffer): string {
return new TextDecoder().decode(buf);
}

readBytes(): ArrayBuffer {
const length = this.msg.getUint32(this.offset);
this.offset += 4;
const result = this.buffer.slice(this.offset, this.offset + length);
this.offset += length;
return result;
}
}
67 changes: 67 additions & 0 deletions packages/core/src/common/message-rpc/channel.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/********************************************************************************
* Copyright (C) 2021 Red Hat, Inc. and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { assert, expect, spy, use } from 'chai';
import * as spies from 'chai-spies';

import { ChannelMultiplexer, ChannelPipe } from './channel';
import { ReadBuffer } from './message-buffer';

use(spies);

describe('multiplexer test', () => {
it('multiplex message', async () => {
const pipe = new ChannelPipe();

const leftMultiplexer = new ChannelMultiplexer(pipe.left);
const rightMultiplexer = new ChannelMultiplexer(pipe.right);
const openChannelSpy = spy(() => {
});

rightMultiplexer.onDidOpenChannel(openChannelSpy);
leftMultiplexer.onDidOpenChannel(openChannelSpy);

const leftFirst = await leftMultiplexer.open('first');
const leftSecond = await leftMultiplexer.open('second');

const rightFirst = rightMultiplexer.getOpenChannel('first');
const rightSecond = rightMultiplexer.getOpenChannel('second');

assert.isNotNull(rightFirst);
assert.isNotNull(rightSecond);

const leftSecondSpy = spy((buf: ReadBuffer) => {
const message = buf.readString();
expect(message).equal('message for second');
});

leftSecond.onMessage(leftSecondSpy);

const rightFirstSpy = spy((buf: ReadBuffer) => {
const message = buf.readString();
expect(message).equal('message for first');
});

rightFirst!.onMessage(rightFirstSpy);

leftFirst.getWriteBuffer().writeString('message for first').commit();
rightSecond!.getWriteBuffer().writeString('message for second').commit();

expect(leftSecondSpy).to.be.called();
expect(rightFirstSpy).to.be.called();

expect(openChannelSpy).to.be.called.exactly(4);
})
});
Loading

0 comments on commit 76595fb

Please sign in to comment.